Hello friends, this is going to be a very interesting post in which I will prepare data for machine learning. I will show you how to parse JSON data received from an API, stream it using a Kinesis stream, transform it using the Kinesis Analytics service, and finally use Kinesis Firehose to transfer and store the data on S3.
This article assumes that you have a basic understanding of Amazon EC2, S3 and AWS Lambda. It is not a beginner-level article.
The first step in this process is to get the data from an API. We will be using this URL to get JSON data: https://randomuser.me/api/ . If you open this URL, you will see that it returns new data every time you refresh it. I will poll this API to collect a good amount of data to process and store.
Getting the Data from the API Using EC2
I will spin up an EC2 instance. This EC2 instance needs internet access to poll the API, and it also needs access to the Amazon Kinesis service; we will discuss this later. If you want to see how to create an EC2 instance with internet access, have a look at these articles: https://www.dotnetforall.com/aws-cloud-ec2-linux-apache-web-server/ and https://www.dotnetforall.com/creating-your-first-aws-vpc-for-publicly-accessible-resource-on-internet/.
Once you have an EC2 instance ready, you need to install nvm and Node.js on it, as I am using a Node.js script to poll the API and send the data to Kinesis.
Run the commands below to install Node:
curl -o- https://raw.githubusercontent.com/creationix/nvm/v0.33.2/install.sh | bash
nvm install node
You also need to install the aws-sdk module from the Node package manager on the EC2 instance.
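Assuming npm came along with the Node installation above, this is a one-liner:

npm install aws-sdk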
After Node is installed on the EC2 instance, copy the following script to the instance and save it.
const https = require('https');
const aws = require('aws-sdk');

const kinesis = new aws.Kinesis({ region: 'us-east-1' });
const app = {};

app.handler = function () {
    // Poll the API every second and push each result to the Kinesis stream.
    const timerId = setInterval(() => pollURL(function (result, error) {
        if (error) {
            console.log('error');
        } else {
            console.log('Final result is ' + JSON.stringify(result));
            // Random partition key; with a single shard any key will do.
            const sensor = 'sensor-' + Math.floor(Math.random() * 100000);
            const recordsParams = {
                Data: JSON.stringify(result),
                StreamName: 'DataStreamForUserAPI',
                PartitionKey: sensor
            };
            console.log(recordsParams);
            kinesis.putRecord(recordsParams, function (err, data) {
                if (err) {
                    console.log('Error');
                    console.log(err.message);
                } else {
                    console.log('Sent');
                }
            });
        }
    }), 1000);

    // Stop polling after 15 minutes (900,000 ms).
    setTimeout(() => {
        clearInterval(timerId);
        console.log('stop');
    }, 900000);
};

const pollURL = function (callback) {
    const url = 'https://randomuser.me/api/';
    https.get(url, (res) => {
        let body = '';
        res.on('data', (chunk) => { body += chunk; });
        res.on('end', () => {
            const result = JSON.parse(body);
            callback(result);
        });
    }).on('error', (error) => {
        callback(null, error);
    });
};

app.handler();
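Save the script under any name you like; I will assume poll.js here (the filename is my choice, nothing depends on it). We will run it later with:

node poll.js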
A brief description of the above code: it uses the aws-sdk Node module to get a client for the Kinesis API. I have named my Kinesis stream DataStreamForUserAPI.
In the pollURL method, I continuously poll the API to get the data, sending one record to the Kinesis API every second. I will run the code for 15 minutes, which at one record per second yields roughly 900 records, enough data to end up in S3.
The next part of the process is to set up a Kinesis Stream to ingest the data and make it ready for processing.
Setting Up a Kinesis Stream for Data Ingestion
AWS Kinesis is a fully managed streaming service provided by Amazon. It can ingest any amount of data, which can then be consumed by other services to find trends and organize the data.
Go to the Kinesis service in the AWS console, create a data stream by selecting “Ingest and process streaming data with Kinesis streams”, and click “Create Data Stream”.
As you can see in the figure below, I have named the stream “DataStreamForUserAPI”, the same name used in the code above to send data to. Since the volume of data produced by the Node.js script is small, one shard is enough, as shown in the figure below.
Finally, click “Create Kinesis Stream”.
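If you prefer the command line, the same stream can be created with the AWS CLI (assuming your credentials and default region are already configured):

aws kinesis create-stream --stream-name DataStreamForUserAPI --shard-count 1
aws kinesis describe-stream-summary --stream-name DataStreamForUserAPI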
Kinesis Analytics
The next step is to create a Kinesis Analytics application to ingest the data and write real-time queries that produce a subset of it. As you can see by fetching data from the user API URL provided above, it returns a lot of data with many fields, but we only need FirstName, LastName, Gender, Latitude, Longitude and Age.
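For orientation, the API payload is shaped roughly like the abbreviated sketch below (trimmed to the fields we care about; see https://randomuser.me for the authoritative schema):

{
  "results": [
    {
      "gender": "female",
      "name": { "title": "Ms", "first": "paula", "last": "domingue" },
      "location": { "coordinates": { "latitude": "-20.5403", "longitude": "125.9552" } },
      "dob": { "age": 51 }
    }
  ],
  "info": { "results": 1 }
}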
Let's start by SSHing into the EC2 instance and running the Node.js script. Once the script starts, we can see the data being sent, as I have put in some logs, shown in the figure below.
Go back to the dashboard of the Kinesis service and click “Create Analytics Service”. Give the application a name and description, pick whichever runtime you are comfortable with (I will use SQL for this application), and create the application.
Once our application is created, we need to hook in a data source. This data source will provide our streaming data; in our case it is the Kinesis stream we created above. The Kinesis stream dropdown will come pre-populated with our stream.
On the same page there is another option, “Record pre-processing with AWS Lambda”, which we need to keep disabled. Finally, under “Access Permission”, select “Create / update IAM role kinesis-analytics-data-transformation-application-us-east-1”, as this Analytics application needs permission to access the Kinesis stream.
On the same page we have an option to discover the schema. This helps us see our data in tabular format. For this we need some live data flowing into our Kinesis stream, which in turn is the data source for the Kinesis Analytics service. Once you click “Discover Schema” you can see live data in the Analytics service, as shown in the figure below.
Click “Save and Continue”. Now we want to write real-time SQL queries on our streaming data, which we can do in the next screen.
In the next screen you will see a section named “Real time analytics”. Click “Go to SQL Editor” as shown in the screen below.
This is where we write the query that forms a subset of the data based on our condition. If you are not familiar with writing such queries, I have written the query below.
CREATE OR REPLACE STREAM "USER_DATA" (
    first     VARCHAR(16),
    last      VARCHAR(16),
    age       INTEGER,
    gender    VARCHAR(16),
    latitude  FLOAT,
    longitude FLOAT
);

CREATE OR REPLACE PUMP "STREAM_PUMP" AS
    INSERT INTO "USER_DATA"
    SELECT STREAM "first", "last", "age", "gender", "latitude", "longitude"
    FROM "SOURCE_SQL_STREAM_001"
    WHERE "age" >= 21;
In the above query I have created an in-application stream named “USER_DATA” with a schema containing the fields we need in our output.
The second statement creates a pump that inserts the data from our live source stream into our custom stream, keeping only users aged 21 or older.
Once we are done, we can save and run the SQL query and see the current live data from the API in the console. As you can see, these are the columns we selected in the query.
Once we start receiving data from the stream, we need to persist it in a data store. We will use a Kinesis Firehose delivery stream for this.
Click on the Destination tab shown in the above figure.
Once we click “Connect to Destination”, we have a couple of options to select from. We have to select “Kinesis Firehose Delivery Stream”, and since we do not have one yet, we need to create it: click “Create New”.
Once we are on the delivery stream page, we need to provide a name; I will call it “Data-Delivery-Stream”. Select the “Direct PUT or other sources” option in the Choose Source section, as the data is coming from Kinesis Analytics. Refer to the figure below.
Scroll down and click Next. At this point Kinesis Firehose can format or transform records before delivering them to S3. We have already reduced the records to the specific columns we need using Kinesis Analytics, so no column-level conversion is required here.
But there is one more piece of transformation we need: appending a newline to the end of each record so that the records in S3 are properly delimited.
In the “Transform source records with AWS Lambda” section, select the “Enabled” radio button and click “Create New” to create a Lambda function. Check the figure below.
Kinesis Firehose provides ready-made Lambda blueprints to ease the process. We will select “General Firehose processing” from these.
Once we are in the Lambda function console, give the function a name (I have named mine “new-line-function”) and select “Create a new role with basic Lambda permissions” as the execution role. Scroll down and click “Create Function”.
Now we need to make some modifications to the Lambda function. The record data Firehose hands to the function is base64-encoded, so we decode it, append a newline character ('\n') at the end, and re-encode it before returning, as shown in the code below.
exports.handler = async (event, context) => {
    /* Process the list of records: decode each payload, append a newline,
       and re-encode it before handing the record back to Firehose. */
    const output = event.records.map((record) => {
        const payload = Buffer.from(record.data, 'base64').toString('utf8');
        return {
            recordId: record.recordId,
            result: 'Ok',
            data: Buffer.from(payload + '\n', 'utf8').toString('base64'),
        };
    });
    console.log(`Processing completed. Successful records ${output.length}.`);
    return { records: output };
};
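Before deploying, you can sanity-check the function locally with a hand-built event (the payload below is hypothetical, not a real Firehose event):

// Append this to the bottom of the file and run it with node.
const testEvent = {
    records: [{ recordId: '1', data: Buffer.from('{"FIRST":"test"}').toString('base64') }]
};
exports.handler(testEvent).then((res) =>
    console.log(Buffer.from(res.records[0].data, 'base64').toString()));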
There is one more setting to change: increase the timeout of the function to 1 minute, in case there is any delay in data transmission.
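The timeout can be changed on the function's configuration page, or with the AWS CLI (using the function name from above):

aws lambda update-function-configuration --function-name new-line-function --timeout 60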
Go back to the Firehose screen and select the newly created Lambda function, as shown in the figure below.
Keep “Convert Record format” as Disabled and click Next.
In the “Choose Destination” section select “Amazon S3”, and create a new S3 bucket from the same page. I have named my bucket “my-user-data-new”. Keep the other settings as they are and click Next.
In the configure section shown in the figure below, tighten the buffer settings so that Firehose does not accumulate data for too long; the smallest allowed values are a 1 MB buffer size and a 60-second buffer interval.
In the IAM role section, create a new role to give the Firehose service access to the S3 bucket. Finally, click Next, review your settings, and click “Create Delivery Stream”.
Select the newly created Firehose stream in the Kinesis Analytics destination section, where we started a couple of sections above.
Choose the in-application stream “USER_DATA” that we created in the SQL query and select JSON as the output format. Click “Save And Continue”. It might take some time to create the connection.
Once the whole setup is done, go to the EC2 instance and start the Node.js script to begin sending data to the Kinesis stream.
After some time you can go to the S3 bucket, open it, and browse to the root folder. You will see many files in the bucket.
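You can also list and fetch the objects from the command line (bucket name as chosen earlier; the object key below is just a placeholder):

aws s3 ls s3://my-user-data-new --recursive
aws s3 cp s3://my-user-data-new/<object-key> ./sample.json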
Download any one of them and you should find the data and fields you selected in the Kinesis Analytics SQL query. Have a look at the data below from my S3 bucket.
{"FIRST":"paula","LAST":"domingue","AGE":51,"GENDER":"female","LATITUDE":-20.5403,"LONGITUDE":125.9552}
{"FIRST":"ayse","LAST":"aubert","AGE":44,"GENDER":"female","LATITUDE":-48.7075,"LONGITUDE":179.9309}
{"FIRST":"emilia","LAST":"jarvi","AGE":65,"GENDER":"female","LATITUDE":-15.1059,"LONGITUDE":36.5814}
Conclusion:
This was a long article about streaming live data and transforming it to our needs using various AWS services. As you can see, I haven't used any on-premises services; everything was taken care of by AWS. That is the power of the cloud.