Using Lambda And The New Firehose Console To Transform Data

November 26, 2020

Objective

Show how to create a delivery stream that ingests sample data, transforms it, and stores both the source and the transformed data.

Executive Summary

Amazon Kinesis Firehose is one of the easiest ways to prepare and load streaming data into the AWS ecosystem. Firehose was first released in October 2015, and it has evolved from a simple solution that stores your data without any modification into a delivery stream with transformation features. In July 2017, the delivery stream console was updated to offer more options and so reduce the amount of work required to store and transform your data. This post is a guide to setting up a proof of concept that will:

  1. Filter and transform sample data with an AWS Lambda function and store the results in S3.
  2. Keep the sample data in S3 for future analysis.
  3. Check the capabilities of the console, like encryption and compression.
  4. Take advantage of the Firehose sample data producer (you won’t need to create any script).

Prerequisites

  • You will need an AWS account.

Steps

STEP 0: ACCESS THE KINESIS FIREHOSE SERVICE

  1. Log in to the AWS console.
  2. Search for the Kinesis service with the “Find a service …” text box, or find it as an item in the “Analytics” list.
  3. Click on “Create delivery stream”. If you have not created a Kinesis stream before, you will need to press “Get Started” first.

STEP 1: NAME AND SOURCE

  1. Select a name for your delivery stream. For this demo I will use “deliveryStream2018”.
  2. Choose a “Source”. For this demo select “Direct PUT or other sources”. Essentially you have two options here: use a Kinesis Stream as the input for the delivery stream, or send the records by other means:
    • PUT API: You will use this option if your custom application will feed the delivery stream directly with the AWS SDK.
    • Kinesis Agent: Use the agent to send information from logs produced by your applications, in other words, the agent will track the changes in your log files and send the information to the delivery stream.
    • AWS IoT: If you have an IoT ecosystem, you can use the rules to send messages to your Firehose stream.
    • CloudWatch Logs: Sends any incoming log events that match a defined filter to your delivery stream.
    • CloudWatch Events: Deliver information of events when a CloudWatch rule is matched.
  3. Click “Next”.
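
For the PUT API option, a custom producer hands Firehose one record at a time as a blob of bytes. The following is a minimal sketch of the request parameters, assuming the PutRecord shape used by the AWS SDK (`DeliveryStreamName` plus `Record.Data`). The helper name `buildPutRecordParams` is hypothetical, and the sketch only builds the object; it does not call AWS.

```javascript
// Hypothetical helper: builds the parameter object a producer would hand to
// the Firehose PutRecord operation. It does not contact AWS.
function buildPutRecordParams(deliveryStreamName, record) {
    // Firehose treats Data as opaque bytes, so the record is serialized first.
    const data = Buffer.from(JSON.stringify(record), 'utf8');
    return {
        DeliveryStreamName: deliveryStreamName,
        Record: { Data: data },
    };
}

// Example values taken from this tutorial: the stream name and a stock record.
const params = buildPutRecordParams('deliveryStream2018', {
    ticker_symbol: 'NGC',
    sector: 'HEALTHCARE',
    change: -0.08,
    price: 4.73,
});
console.log(params.DeliveryStreamName, params.Record.Data.toString('utf8'));
```

In this tutorial the demo data producer plays that role, so no producer code is needed to follow the remaining steps.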

STEP 2: TRANSFORM RECORDS

  1. Once data is available in a delivery stream, we can invoke a Lambda function to transform it. To our relief, AWS offers some ready-to-use blueprints that you can adapt to your data format. In this tutorial, we will transform the sample data offered by Firehose, so select “Enabled”.
  2. Select “Create New”.
  3. You will see a list of blueprints. We will process custom data, so select the first one, “General Firehose Processing”. You will be taken to a new page; do not close the previous one, because we will come back to it.
  4. The Lambda “Create function” page will open.
  5. Choose a “Name” for your function.
  6. In the “Role” dropdown, select “Create new role from template(s)”. This creates a new role that allows this Lambda function to log to CloudWatch. Choose a “Role name”; you may want to remember it so that you can delete it quickly when we are done with the tutorial.
  7. Leave the “Policy templates” field empty.
  8. Once you are ready select “Create function” and wait for the editor to appear.

STEP 2.1: INTO THE LAMBDA REALM

  1. Scroll down until you see the “Function code” section.
  2. Change “Runtime” to “Node.js 8.10”.
  3. The “index.js” file should be available to edit. If it is not, open it by double-clicking the file name on the left side.
  4. Remove all the code, then copy the following function and paste it into the editor:
'use strict';
console.log('Loading function');

/* Stock ticker format parser. It matches one JSON record such as:
   {"ticker_symbol":"NGC","sector":"HEALTHCARE","change":-0.08,"price":4.73} */
const parser = /^{"ticker_symbol":"[A-Z]+","sector":"[A-Z]+","change":[-.0-9]+,"price":[-.0-9]+}/i;

exports.handler = (event, context, callback) => {
    let success = 0; // Number of transformed entries
    let failure = 0; // Number of invalid entries found
    let dropped = 0; // Number of dropped entries

    /* Process the list of records and transform them */
    const output = event.records.map((record) => {
        // Firehose passes each record's data Base64-encoded
        const entry = Buffer.from(record.data, 'base64').toString('utf8');
        console.log('Entry: ', entry);
        const match = parser.exec(entry);
        if (match) {
            // match[0] holds the matched JSON text
            const parsed_match = JSON.parse(match[0]);
            const milliseconds = new Date().getTime();
            /* Add timestamp and convert to CSV, one line per record */
            const result = `${milliseconds},${parsed_match.ticker_symbol},${parsed_match.sector},${parsed_match.change},${parsed_match.price}\n`;
            const payload = Buffer.from(result, 'utf8').toString('base64');
            // Filter: only RETAIL records are transformed, the rest are dropped
            if (parsed_match.sector !== 'RETAIL') {
                /* Dropped event, notify and leave the record intact */
                dropped++;
                return {
                    recordId: record.recordId,
                    result: 'Dropped',
                    data: record.data,
                };
            }
            else {
                /* Transformed event */
                success++;
                return {
                    recordId: record.recordId,
                    result: 'Ok',
                    data: payload,
                };
            }
        }
        else {
            /* Failed event, notify the error and leave the record intact */
            console.log('Failed event : ' + record.data);
            failure++;
            return {
                recordId: record.recordId,
                result: 'ProcessingFailed',
                data: record.data,
            };
        }
    });
    console.log(`Processing completed. Successful records ${success}, dropped ${dropped}, failed ${failure}.`);
    callback(null, { records: output });
};

 

  1. Go back to the function menu (the header) and look for the dropdown where you can create a new test; it is right before the “Test” button. Select “Configure Test Event” in the dropdown. A secondary window will appear.
  2. Select “Create new test event” to create a new test and “Kinesis Firehose” as “Event template”.
  3. Select an “Event name”.
  4. Copy and paste the next JSON object into the editor to use it as the input for your test:
    {
      "records": [
        {
          "recordId": "49583354031560888214100043296632351296610463251381092354000000",
          "approximateArrivalTimestamp": 1523204766865,
          "data": "eyJ0aWNrZXJfc3ltYm9sIjoiTkdDIiwic2VjdG9yIjoiSEVBTFRIQ0FSRSIsImNoYW5nZSI6LTAuMDgsInByaWNlIjo0LjczfQ=="
        }
      ],
      "region": "us-east-1",
      "deliveryStreamArn": "arn:aws:kinesis:EXAMPLE",
      "invocationId": "invocationIdExample"
    }

     

The data attribute is Base64-encoded, which is how Firehose hands record data to the Lambda function. Decoded, the value of this data is:

{
   "ticker_symbol":"NGC",
   "sector":"HEALTHCARE",
   "change":-0.08,
   "price":4.73
}
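
The decoding can be reproduced outside Lambda with Node's built-in `Buffer`. This is a small sketch, not part of the function itself; `encodeRecord` is a hypothetical helper name, included in case you want to build test events from your own records.

```javascript
// The "data" value from the test event above.
const encoded = 'eyJ0aWNrZXJfc3ltYm9sIjoiTkdDIiwic2VjdG9yIjoiSEVBTFRIQ0FSRSIsImNoYW5nZSI6LTAuMDgsInByaWNlIjo0LjczfQ==';

// Decode Base64 to text, then parse the JSON text into an object.
const decodedText = Buffer.from(encoded, 'base64').toString('utf8');
const stock = JSON.parse(decodedText);
console.log(stock.ticker_symbol, stock.sector, stock.price);

// Going the other way: encode any record so it can be pasted into the
// "data" attribute of a test event. encodeRecord is a hypothetical helper.
function encodeRecord(record) {
    return Buffer.from(JSON.stringify(record), 'utf8').toString('base64');
}
console.log(encodeRecord({ ticker_symbol: 'ABC', sector: 'RETAIL', change: 0.1, price: 12.5 }));
```

Encoding a record of your own this way lets you exercise the other branches of the function, not only the one the supplied test record reaches.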

 

  1. Select “Create”. You will be taken back to the Function editor.
  2. Make sure to press “Save” to save your changes in the editor.
  3. Now run your test by selecting it in the dropdown and pressing “Test”.
  4. You should quickly get green results. Check the details of the execution to know more.
    1. If you expand the “Details” section you will be able to see the output.
    2. You may want to look at the Base64 decoded object.
    3. In this case, the function keeps and transforms only the stocks whose sector is RETAIL. The record that we are using for testing has HEALTHCARE as its sector, so it ends as a “Dropped” record, which indicates that it will not be part of the transformation set, but it did not cause an error.
    4. A record that will be part of the transformation set will have a result attribute of “Ok”.
    5. You can remove the filter if you want to transform all your data.
  5. Now you can go back to the Kinesis Firehose tab. You can return to the Lambda tab later if you want to dig deeper.
  6. Back in the Firehose delivery stream wizard, close the “Choose Lambda blueprint” dialog.
  7. Select your newly created function in the “Lambda function” dropdown; refresh if necessary.
  8. Ignore the timeout warning. This Lambda function does not require much time to execute, so keep going and select “Next”.
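
When reading the test output, it helps to know what Firehose expects back: every incoming `recordId` must be returned, with a `result` of exactly `Ok`, `Dropped`, or `ProcessingFailed` and a Base64 string in `data`. The checker below is a hedged sketch of that contract; `checkTransformResponse` and the sample objects are hypothetical, made up for illustration.

```javascript
// Result values Firehose accepts from a transformation function.
const VALID_RESULTS = ['Ok', 'Dropped', 'ProcessingFailed'];

// Hypothetical checker: compares a function response against the event that
// produced it and returns a list of human-readable problems.
function checkTransformResponse(event, response) {
    const problems = [];
    const returned = new Map(response.records.map((r) => [r.recordId, r]));
    for (const input of event.records) {
        const out = returned.get(input.recordId);
        if (!out) {
            problems.push(`missing recordId ${input.recordId}`);
        } else if (!VALID_RESULTS.includes(out.result)) {
            problems.push(`invalid result "${out.result}" for ${input.recordId}`);
        } else if (typeof out.data !== 'string') {
            problems.push(`data is not a string for ${input.recordId}`);
        }
    }
    return problems;
}

// Made-up event and responses; 'e30=' is the Base64 form of "{}".
const sampleEvent = { records: [{ recordId: 'r1', data: 'e30=' }] };
const goodResponse = { records: [{ recordId: 'r1', result: 'Dropped', data: 'e30=' }] };
const badResponse = { records: [{ recordId: 'r1', result: 'OK', data: 'e30=' }] };
console.log(checkTransformResponse(sampleEvent, goodResponse)); // no problems
console.log(checkTransformResponse(sampleEvent, badResponse));  // flags the "OK" spelling
```

The result values are case-sensitive, which is why the function returns 'Ok' rather than 'OK'.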

STEP 3: CHOOSE A DESTINATION

We have configured a serverless function to transform our records, but we have not yet chosen where to store them, or whether to keep the raw records. In this case, we will store both the transformed and the raw records.

  1. Select “Amazon S3” as destination for simplicity. This will be the service where we will store our transformed data.
  2. Select an existing bucket or create one.
  3. You may select a prefix for your files; I will use “transformed” to distinguish them from the source files. Firehose will add a timestamp automatically in any case.
  4. In “S3 backup”, select “Enable” to store the raw data too. Select the destination bucket or create one; you may select a prefix for this too.
  5. Go ahead and press “Next”.
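
To know where to look in the bucket later, here is a rough sketch of how the prefix and the automatic timestamp combine. It assumes the default Firehose behavior of adding a UTC time prefix in the form YYYY/MM/DD/HH; `expectedKeyPrefix` is a hypothetical helper, and the rest of the object name that Firehose generates is not modeled.

```javascript
// Hypothetical helper: approximates the folder part of the S3 object key,
// assuming Firehose appends a UTC time prefix (YYYY/MM/DD/HH) to your prefix.
function expectedKeyPrefix(customPrefix, date) {
    const pad = (n) => String(n).padStart(2, '0');
    const timePrefix = [
        date.getUTCFullYear(),
        pad(date.getUTCMonth() + 1), // getUTCMonth() is zero-based
        pad(date.getUTCDate()),
        pad(date.getUTCHours()),
    ].join('/');
    return `${customPrefix}${timePrefix}/`;
}

const when = new Date(Date.UTC(2018, 3, 8, 16, 26)); // 8 April 2018, 16:26 UTC
console.log(expectedKeyPrefix('transformed/', when)); // transformed/2018/04/08/16/
console.log(expectedKeyPrefix('transformed', when));  // transformed2018/04/08/16/
```

The second call shows why a trailing slash matters: without it, the prefix fuses with the year instead of forming its own folder.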

STEP 4: CONFIGURE SETTINGS

  1. Leave your S3 buffer conditions as they are. They set the maximum amount of time that may pass and the maximum quantity of data that may be gathered before Firehose delivers the buffered records to S3. This is an OR condition, meaning that delivery happens as soon as either limit is reached.
  2. If you want to save space and secure your data, you can select your desired compression and encryption options. I am using the defaults for this tutorial.
  3. Error logging is enabled by default. You can keep it like that in case you want to debug your code later.
  4. We need an IAM role to access the corresponding resources from Firehose, like S3. In “IAM role”, choose “Create new, or Choose”. A new tab will open.
  5. As we have selected S3 in the previous steps, the IAM policy that we need has already been prepared for us. Review it if you are interested and press “Allow”. The role will be created and the tab will be closed.
  6. The new role will be listed in the “IAM role” dropdown. You can select a different one if needed.
  7. Select “Next” when ready.
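
The OR condition on the buffer settings in item 1 can be modeled in a few lines. This is only an illustrative sketch: the limits of 5 MB and 300 seconds are assumed example values (use the ones shown in your console), and `shouldFlush` is a hypothetical name, not part of any AWS API.

```javascript
// Assumed example limits; replace them with the values from your console.
const bufferHints = { sizeInMBs: 5, intervalInSeconds: 300 };

// Hypothetical model of the OR condition: the buffer is flushed as soon as
// either the size limit or the time limit has been reached.
function shouldFlush(bufferedBytes, elapsedSeconds, hints) {
    const sizeReached = bufferedBytes >= hints.sizeInMBs * 1024 * 1024;
    const timeReached = elapsedSeconds >= hints.intervalInSeconds;
    return sizeReached || timeReached;
}

console.log(shouldFlush(1024, 10, bufferHints));            // false: small and recent
console.log(shouldFlush(1024, 300, bufferHints));           // true: interval reached
console.log(shouldFlush(6 * 1024 * 1024, 10, bufferHints)); // true: size reached
```

With a low-volume source such as the demo data, the time limit is normally the one that triggers first, so results can take a few minutes to appear.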

STEP 5: REVIEW YOUR CONFIGURATION

  1. Take a moment to check the options that you have indicated. When ready, select “Create delivery stream”.
  2. You will be taken to the “Firehose delivery stream” page. You should see your new stream become active after a few seconds.

STEP 6: TEST YOUR WORK

Firehose allows you to send demo data to your stream, let’s try it out.

  1. Select your stream radio button to enable the “Test with demo data” button.
  2. Click the “Test with demo data” button. You will see the “Test with demo data” section.
  3. Select “Start sending demo data”.
  4. Do not leave this page until you complete the next steps, but be sure to stop the demo to save money once you see the results in your S3 bucket(s). If you close the tab, the demo data should stop too.
  5. On this same page, scroll down and check the “Monitoring” tab. Wait two minutes and use the refresh button to see the changes in the metrics.
  6. Wait up to 5 minutes, then check your bucket for results. They will be inside folders representing the date. Download the files produced and see the results. Your “source_records” folder has the backup data.
  7. What if something goes wrong? Where are the logs? You can check your logs in CloudWatch. In the “Monitoring” tab, you will see a link to the CloudWatch console. Once there, select “Logs” on the menu, then look for your Lambda or Firehose logs in the list.
  8. Go back to the Firehose tab and select “Stop sending demo data”.
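
Once you have downloaded a transformed file, each line should follow the CSV template used in the Lambda function: timestamp, ticker symbol, sector, change, price. The sketch below reads one such line back into an object. `parseTransformedLine` is a hypothetical helper and the sample line is made up for illustration; if you enabled compression, decompress the file first.

```javascript
// Hypothetical helper: turns one line of the transformed output back into an
// object. Field order follows the template in the Lambda function:
// timestamp, ticker_symbol, sector, change, price.
function parseTransformedLine(line) {
    const [timestamp, tickerSymbol, sector, change, price] = line.trim().split(',');
    return {
        timestamp: new Date(Number(timestamp)), // milliseconds since the epoch
        tickerSymbol,
        sector,
        change: Number(change),
        price: Number(price),
    };
}

// Made-up sample line for illustration, not real output.
const row = parseTransformedLine('1523204766865,ABC,RETAIL,-0.08,12.5\n');
console.log(row.tickerSymbol, row.sector, row.change, row.price);
console.log(row.timestamp.toISOString());
```

The files under the backup folder keep the original JSON records, so they can be read with `JSON.parse` instead.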

Cleaning

Once you feel comfortable with the flow and the services used in this tutorial, it is a good idea to delete these resources. If you are under the Free Tier, you will only incur costs when your Firehose delivery stream is being fed, and if you are outside of the Lambda and S3 free tier limits, so as long as you are not producing and inserting data into the stream, you will not be charged. Still, it is a good idea to remove everything when you are done.

DELETE THE DELIVERY STREAM

  1. Go to the Firehose console page.
  2. Select your delivery stream.
  3. Press the “Delete” or “Delete Delivery Stream” button, depending on where you are in the console.

DELETE S3 FILES AND/OR BUCKET

  1. Go to the S3 console page.
  2. Note: To select an item in S3, do not click on the link; select the row or the checkbox.
  3. If you want to remove only the files, open the bucket, select the folders inside it and, on the “More” menu, select “Delete”.
  4. If you want to delete the bucket too, go back to the S3 console and select the destination bucket that you have used for this tutorial. Press the “Delete Bucket” button.

DELETE THE LAMBDA FUNCTION

  1. Access the Lambda console.
  2. On the left menu, select “Functions”.
  3. Select your Lambda function and in the “Actions” menu, select “Delete”.
  4. You can also delete the function directly from the Function editor using “Actions” and then “Delete function”.

DELETE THE ROLES

  1. Remember that you have created two roles during this tutorial, one for Lambda and one for Firehose.
  2. Access the IAM console.
  3. Select “Roles” on the left menu.
  4. The name of the Lambda role was chosen by you in a previous step; look for it and select it. If you are not sure about it, you can check the creation time of the roles by using the gear on top of the list to show extra information. This role and the Firehose role should have been created around the same time.
  5. The role created for Firehose should be named “firehose_delivery_role” unless you have chosen a different name.
  6. To delete them, select them using the checkboxes next to the item and then click on “Delete Role”. You will be presented with information about the roles to confirm they are the ones that you want.

Conclusion

Firehose is a fully managed service that automatically scales to match your throughput requirements without any ongoing administration. You can extend its capabilities with Lambda functions, as we have demonstrated in this tutorial: we ingested data from a system that produces sample stock records, filtered and transformed it into a different format, and kept a copy of the raw data in S3 for future analysis.
