Ingest Millions of Messages in the Queue using Serverless

While working on one of our Serverless projects, we came across a problem where we needed to ingest millions of messages from an external API into an Azure Storage Queue. We made a series of changes to our approach before reaching the solution that worked for us, and I believe it can be helpful to anyone who’s struggling with the same problem.

We started with an approach where we executed a Serverless function through an HTTP endpoint. We call it the Ingestion Function. The ingestion function would hit an external API to get the messages and try to ingest them into the Queue.

Here is what our initial architecture looked like:

If you look at the architecture diagram, the single ingestion function is a bottleneck, as it can only scale up to specific limits. Also, we’re not utilizing the real strength of serverless, which is many small parallel invocations. Therefore, we decided to go with an approach where we could scale the ingestion function out to multiple invocations, giving us as much scalability as needed.

The idea was to divide the total number of messages into multiple chunks (of 5000 in our case) and then pass those chunks to the ingestion function so that it could finally ingest the messages into the Queue.

We created another serverless function, we call it Chunking Function, to divide the messages into chunks using this helper function:

def chunking_of_list(data):
    """Return the list split into chunks of 5000."""
    return [data[x:x+5000] for x in range(0, len(data), 5000)]
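As a quick illustration of what the helper produces (restated here with a size parameter so the snippet is self-contained):

```python
def chunking_of_list(data, size=5000):
    """Split a list into consecutive chunks of at most `size` items."""
    return [data[x:x+size] for x in range(0, len(data), size)]

# 12,000 IDs become three chunks: two full ones and a remainder.
chunks = chunking_of_list(list(range(12000)))
print([len(c) for c in chunks])  # → [5000, 5000, 2000]
```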

And then uploaded each chunk in a separate file into Azure Blob Storage using this code:

import logging

def upload_chunks(idLists):
  st = storage.Storage(container_name=constant.CHUNK_CONTAINER)

  for index, singleList in enumerate(idLists):
     logging.info('Uploading file{0} to the {1} blob storage'.format(index, constant.CHUNK_CONTAINER))
     st.upload_chunk('file' + str(index), singleList)

Finally, we set up the ingestion function to listen to Azure Blob Storage file-upload events. As soon as a file gets uploaded, the ingestion function downloads it, reads it, and ingests the messages into the Queue. As desired, we now have multiple invocations of the ingestion function working in parallel, and therefore we achieved the scalability we needed.
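The body of the ingestion function can be sketched roughly as follows. This is a simplified illustration, not our production code: it assumes each chunk file is a JSON-encoded list of messages, and takes the queue’s send function as a parameter (in practice that would be a wrapper around the Azure Storage Queue client).

```python
import json

def ingest_chunk(blob_bytes, send_message):
    """Parse one uploaded chunk file and push every message to the queue.

    blob_bytes   -- raw contents of the blob (assumed: a JSON-encoded list)
    send_message -- callable that enqueues a single string message
    """
    messages = json.loads(blob_bytes)
    for message in messages:
        send_message(json.dumps(message))
    return len(messages)
```

Because each blob-upload event triggers its own invocation, every chunk of 5000 is processed independently and in parallel.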

Here’s what the final architecture looked like:

We essentially followed a fan-out architecture, where we fan out our workload to multiple Serverless invocations instead of one.

Peace ✌

Persist Private IP in AWS Auto Scaling Group

Recently, we were moving our web application, which was running on a single EC2 instance, to a highly available and fault-tolerant architecture. For that, we decided to pre-bake an AMI, launch it in an Auto Scaling group, and attach it to a target group behind an Elastic Load Balancer. We had another server in the same VPC but not in the target group behind the ELB. The requirement was to access our web application (the single EC2 instance we launched using the ASG and attached to the ELB) privately within the VPC, due to HIPAA regulatory compliance.

In other words, we needed a way to persist a private IP on the EC2 instance across scale-in and scale-out events. We did some research and came up with a solution: create an instance-launch life cycle hook, capture the event using an AWS CloudWatch Rule, and use AWS Lambda as the target to attach a secondary ENI to the newly launched EC2 instance in the Auto Scaling group.

Thanks to the AWS Knowledge Center, we were able to modify their solution as per our needs.

But How Does It Work? 🤔

We needed a way to give the secondary ENI’s description to the Lambda so that it could attach that specific ENI to the newly launched instance. Therefore, we decided to get the ENI description from an EC2 instance tag. For that, we created a launch configuration for the Auto Scaling group with a tag named Eth1, and gave it a value which is, in fact, the description of the secondary ENI.

1. Create a life cycle hook on the EC2 instance launch event.

life cycle hook
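With boto3, step 1 can be sketched as below. The hook name and timeout are illustrative, and the client is passed in as a parameter so the snippet stays testable; in a real setup script you would use `boto3.client('autoscaling')`.

```python
def create_launch_hook(autoscaling, group_name):
    """Hold each new instance in Pending:Wait so the Lambda has time
    to attach the secondary ENI before the instance goes in service."""
    autoscaling.put_lifecycle_hook(
        LifecycleHookName='attach-secondary-eni',   # illustrative name
        AutoScalingGroupName=group_name,
        LifecycleTransition='autoscaling:EC2_INSTANCE_LAUNCHING',
        HeartbeatTimeout=300,
        DefaultResult='ABANDON')  # don't go in service if the attach never happens
```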

2. Create an AWS CloudWatch Rule with the Lambda as a target.

Event pattern with Lambda as a Target
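Step 2’s event pattern matches the lifecycle action emitted when the ASG launches an instance. Here is a sketch of creating the rule and wiring the Lambda target with boto3; the rule name and ARN are placeholders, and the client is injected for testability.

```python
import json

def create_rule(events_client, lambda_arn, group_name):
    """CloudWatch Rule that fires on the ASG launch lifecycle action
    and invokes the Lambda. (Sketch; identifiers are illustrative.)"""
    pattern = {
        "source": ["aws.autoscaling"],
        "detail-type": ["EC2 Instance-launch Lifecycle Action"],
        "detail": {"AutoScalingGroupName": [group_name]},
    }
    events_client.put_rule(Name='asg-launch-to-lambda',
                           EventPattern=json.dumps(pattern))
    events_client.put_targets(Rule='asg-launch-to-lambda',
                              Targets=[{'Id': '1', 'Arn': lambda_arn}])
    return pattern
```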

Here is the code for the Lambda.
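A minimal sketch of what such a handler can look like. The clients are injected as parameters for clarity and testability (in a real Lambda you would create them inside the handler with `boto3.client`), and it assumes the Eth1 tag on the instance holds the secondary ENI’s description, as set up earlier.

```python
def lambda_handler(event, context, ec2, autoscaling):
    """Attach the tagged secondary ENI to the newly launched instance,
    then let the Auto Scaling group continue the launch. (Sketch.)"""
    detail = event['detail']
    instance_id = detail['EC2InstanceId']

    # The Eth1 tag on the instance holds the description of the secondary ENI.
    tags = ec2.describe_tags(Filters=[
        {'Name': 'resource-id', 'Values': [instance_id]},
        {'Name': 'key', 'Values': ['Eth1']}])['Tags']
    description = tags[0]['Value']

    # Find the matching, unattached ENI and attach it as eth1.
    eni = ec2.describe_network_interfaces(Filters=[
        {'Name': 'description', 'Values': [description]},
        {'Name': 'status', 'Values': ['available']}])['NetworkInterfaces'][0]
    ec2.attach_network_interface(
        NetworkInterfaceId=eni['NetworkInterfaceId'],
        InstanceId=instance_id,
        DeviceIndex=1)

    # Complete the life cycle hook so the instance goes in service.
    autoscaling.complete_lifecycle_action(
        LifecycleHookName=detail['LifecycleHookName'],
        AutoScalingGroupName=detail['AutoScalingGroupName'],
        LifecycleActionToken=detail['LifecycleActionToken'],
        LifecycleActionResult='CONTINUE',
        InstanceId=instance_id)
```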

Feel free to contribute and change as per your needs.