Queue Messaging (via SQS and other AWS services)
In this article I will explain the common uses of message queues and provide some examples based on AWS Simple Queue Service (SQS).
What are messaging queues?
A "queue" is a kind of middleware: an intermediate store to which producers submit requests ("messages") for processing, and from which consumers later retrieve and process those requests asynchronously. There are many queue solutions out there, but all of them share similar concepts.
Basic Concepts
A "queue" is a standalone component providing APIs for publishing, receiving and acknowledging messages. On one side of the queue, multiple producers publish messages via the "publish" API. On the other side, consumers constantly poll the queue for incoming messages in an endless loop using the "receive" API (or are triggered by incoming messages in the case of serverless functions, such as AWS Lambda), process each received message and "acknowledge" it on successful processing.
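Here is a minimal sketch of both sides of this interaction, using Python and boto3 against SQS. The queue URL is a placeholder and process() is just a stand-in for your own handling logic:

```python
import boto3

QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue"  # placeholder
sqs = boto3.client("sqs")

def process(body):
    print("processing:", body)   # stands in for your actual handling logic

# Producer side: publish a message.
sqs.send_message(QueueUrl=QUEUE_URL, MessageBody='{"orderId": 42}')

# Consumer side: endless polling loop.
while True:
    # Long polling: wait up to 20 seconds for messages to arrive.
    response = sqs.receive_message(
        QueueUrl=QUEUE_URL,
        MaxNumberOfMessages=10,
        WaitTimeSeconds=20,
    )
    for message in response.get("Messages", []):
        process(message["Body"])
        # Acknowledging in SQS means deleting the message.
        sqs.delete_message(
            QueueUrl=QUEUE_URL, ReceiptHandle=message["ReceiptHandle"]
        )
```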
This indirect communication pattern enables the following benefits:
Load Balancing
To enable load balancing with simple "direct" communication (such as REST calls), a load balancer of some kind must be deployed in front of the consumers and made responsible for distributing requests.
With queue messaging, load balancing happens naturally (without the need for a load balancer), since each consumer receives messages by constantly polling the queue.
High Availability
In microservice systems, services call each other in order to accomplish a task. With "direct" communication, a failure of any service in the chain fails the entire request.
Systems that use queue messaging are much more highly available: each request is simply enqueued, so it will succeed as long as the queue is available, even if all of the consumers and/or their dependencies are down at that moment. Messages in the queue will be processed later, asynchronously, when the consumers become available again.
This is a good place to explain the "Invisibility Window" concept: when a consumer "receives" a message from the queue, the message is not actually removed from the queue; rather, it becomes invisible to all other consumers for a predefined duration, which is the Invisibility Window (SQS calls it the "visibility timeout"). Only if the message is processed successfully does the consumer acknowledge it ("delete" it in SQS), and only then is it actually removed from the queue. But if the consumer fails to process the message for some reason (for example, because it crashed), then after the Invisibility Window expires the message becomes visible again, and other consumers get an opportunity to receive and process it once more.
Sometimes, for legitimate reasons, message processing takes longer than the initially defined Invisibility Window. In that case the message becomes visible again, another consumer receives it, and we end up with the same message processed twice! The solution is to monitor message processing time and extend the Invisibility Window when needed. Queues usually provide a dedicated API which consumers can use to extend the window while processing is in progress (in SQS you use the ChangeMessageVisibility API for that). If you want to learn more about the Invisibility Window concept in SQS, here is a nice explanation.
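A common way to implement this is a "heartbeat" that keeps extending the window while processing runs. Below is a rough sketch with boto3; QUEUE_URL, message and process() are the placeholders from the earlier example:

```python
import threading
import boto3

sqs = boto3.client("sqs")

def keep_invisible(queue_url, receipt_handle, stop_event, extend_by=60):
    # Heartbeat: while processing is in progress, keep pushing the message's
    # Invisibility Window forward in chunks of `extend_by` seconds.
    while not stop_event.wait(timeout=extend_by / 2):
        sqs.change_message_visibility(
            QueueUrl=queue_url,
            ReceiptHandle=receipt_handle,
            VisibilityTimeout=extend_by,
        )

# Inside the consumer, around the processing of a received `message`:
stop = threading.Event()
heartbeat = threading.Thread(
    target=keep_invisible, args=(QUEUE_URL, message["ReceiptHandle"], stop)
)
heartbeat.start()
try:
    process(message["Body"])   # potentially long processing
finally:
    stop.set()                 # stop extending once processing is over
    heartbeat.join()
```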
However, we usually don't want messages that repeatedly fail processing to keep circulating in the system forever. The solution to this problem is a "Dead Letter Queue" (DLQ): a special queue to which messages are moved automatically based on a policy you provide. For example, in SQS you may define a policy which says: "if a specific message has been received 5 times and still not deleted, move it to the specified Dead Letter Queue". For SQS, you can read more about the DLQ concept here.
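In SQS, such a policy is called a redrive policy and is attached to the source queue. A minimal sketch of wiring it up with boto3 (the queue names here are made up) might look like this:

```python
import json
import boto3

sqs = boto3.client("sqs")

# The DLQ itself is just a regular queue.
dlq_url = sqs.create_queue(QueueName="orders-dlq")["QueueUrl"]
dlq_arn = sqs.get_queue_attributes(
    QueueUrl=dlq_url, AttributeNames=["QueueArn"]
)["Attributes"]["QueueArn"]

# Main queue: if a message is received 5 times and still not deleted,
# SQS moves it to the DLQ automatically.
sqs.create_queue(
    QueueName="orders",
    Attributes={
        "RedrivePolicy": json.dumps(
            {"deadLetterTargetArn": dlq_arn, "maxReceiveCount": "5"}
        )
    },
)
```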
A good practice is to closely monitor Dead Letter Queues, since a message arriving there usually means a processing failure and probably requires special attention. In AWS, you can define CloudWatch alarms to get alerted when messages arrive in the DLQ.
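For example, an alarm on the DLQ's ApproximateNumberOfMessagesVisible metric fires as soon as anything lands there. A rough boto3 sketch, assuming an existing SNS topic for alerts (the ARN below is a placeholder):

```python
import boto3

cloudwatch = boto3.client("cloudwatch")
ALERTS_TOPIC_ARN = "arn:aws:sns:us-east-1:123456789012:ops-alerts"  # placeholder

# Alert as soon as any message becomes visible in the DLQ.
cloudwatch.put_metric_alarm(
    AlarmName="orders-dlq-not-empty",
    Namespace="AWS/SQS",
    MetricName="ApproximateNumberOfMessagesVisible",
    Dimensions=[{"Name": "QueueName", "Value": "orders-dlq"}],
    Statistic="Maximum",
    Period=300,
    EvaluationPeriods=1,
    Threshold=0,
    ComparisonOperator="GreaterThanThreshold",
    AlarmActions=[ALERTS_TOPIC_ARN],
)
```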
Scalability
Contrary to "direct" communication systems, where each new consumer must be registered with the load balancer, scaling in queue systems is very easy: just start a new consumer process and it will begin polling the queue for new messages. To optimize your running costs, you can scale the number of producers and consumers independently (possibly based on different system metrics and using different machine sizes), all according to your system's needs (I explain more about this in the next section).
“Deflating” system load
Consider the following example: you are running in the cloud a system composed of web services, each able to receive requests over a REST endpoint. Processing each request takes on average 1 second, which means that with 10 service instances the throughput of your system is limited to about 10 requests per second.
As long as your system receives up to 10 requests per second, everything is fine. But what if, for some reason, an unexpected peak in activity occurs and the rate of incoming requests increases to, say, 50 requests per second? Without any steps taken, your system will very quickly drain all available threads and eventually become unavailable.
One possible solution is to implement some kind of scaling out and (dynamically) increase the number of running service instances. But there are a few issues with this approach:
- Starting a new service instance may take some time. Until all the new instances spin up, your system will remain unavailable and some requests will get lost. You can always run several extra service instances, but then you constantly pay for capacity you don't need, and you still haven't fully mitigated the risk, since the activity peak may simply be too high for those extra instances.
- What if the load on your system increases to 100 requests per second? 1000? Are you going to run 1000 service instances to handle all these requests? Such an approach is probably not optimal, since these processing machines are usually not cheap. Often it is simply not possible, because your services communicate with other system components (other services, databases and so on), and even if you manage to scale on your end, the next link in the chain becomes the bottleneck and the whole system still becomes unavailable.
- After the activity peak ends and system load returns to normal, you would like to kill all the unneeded service instances (because you don't want to pay for idle resources). How do you decide when to kill them? Immediately after the load drops, or do you keep them running for a while, just to be sure? In any case, it is probably unrealistic to have exactly the needed number of running services at any given moment, and you will leak money during these transitions.
An alternative approach is to use a queue. Instead of trying to process a request at the moment it arrives (synchronously), the request is submitted to a queue and processed later, asynchronously. With this approach, you decouple your exposed API (which receives the request) from the actual request processors ("consumers"):
- The API handler (this is our "producer") receives a request and submits it into the queue as a message. Submitting a message to a queue is an easy task, usually taking just tens of milliseconds. Queues usually provide high throughput (SQS provides nearly unlimited throughput), so you can keep submitting messages at any desired rate.
- The desired number of consumers constantly pull and process messages from the queue at a steady rate.
- The queue serves as a kind of buffer between the API and the consumers, thus "deflating" system load and keeping the consumers (and the whole system behind them) unaffected by activity peaks.
An ideal candidate for implementing the API handlers is AWS Lambda. Lambda is scalable by definition (each web request can trigger a new Lambda instance) and able to scale to any desired level. So you can implement a tiny Lambda whose whole job is to push the received message into a queue (plus maybe perform some preliminary checks before that). You can handle any kind of activity peak, your system remains available at all times, and you pay only for what you actually use! Simple and effective.
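A minimal sketch of such a handler, assuming an API Gateway proxy integration and a QUEUE_URL environment variable configured on the function (the orderId check is just an illustrative validation):

```python
# Tiny Lambda handler: validate the request and push it into the queue;
# the actual processing happens later, asynchronously.
import json
import os
import boto3

sqs = boto3.client("sqs")
QUEUE_URL = os.environ["QUEUE_URL"]  # assumed to be configured on the function

def handler(event, context):
    payload = json.loads(event.get("body") or "{}")
    if "orderId" not in payload:           # a preliminary check (example field)
        return {"statusCode": 400, "body": "orderId is required"}

    sqs.send_message(QueueUrl=QUEUE_URL, MessageBody=json.dumps(payload))
    # 202 Accepted: the request was enqueued and will be processed asynchronously.
    return {"statusCode": 202, "body": json.dumps({"status": "queued"})}
```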
As for consumers, I would suggest running a number of AWS Fargate tasks. You can always control the number of running consumers, for example based on the number of pending messages in the queue. For the consumers I suggest choosing small and cheap machine types; usually 2 CPUs is enough. Such small consumer units allow more granular control over scaling (it is better to have many small and cheap boxes than a few large and expensive ones).
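One simple way to drive that scaling decision is to read the queue's ApproximateNumberOfMessages attribute and derive a desired consumer count from it. This is a naive sketch of the idea only; in a real system you would feed the result into your Fargate service's desired count or an auto-scaling policy:

```python
import boto3

sqs = boto3.client("sqs")

def desired_consumer_count(queue_url, messages_per_consumer=100,
                           min_consumers=1, max_consumers=20):
    # Naive scaling rule: roughly one consumer per N pending messages,
    # clamped between a minimum and a maximum.
    attrs = sqs.get_queue_attributes(
        QueueUrl=queue_url,
        AttributeNames=["ApproximateNumberOfMessages"],
    )["Attributes"]
    backlog = int(attrs["ApproximateNumberOfMessages"])
    desired = backlog // messages_per_consumer + 1
    return max(min_consumers, min(max_consumers, desired))
```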
How to choose which queue to use?
Choose the one which best fits your use cases. Queues usually differ in the following properties:
- Durability: some queues are "in-memory" stores, while others persist each message to some kind of file system. For example, SQS persists every message to storage, so even if SQS goes down, messages will not be lost.
- Management: some queues are intended to be deployed and managed on your own, while others (such as SQS) are managed services, which means you don't need to worry about deployment and administration; someone else does that for you, and you just consume the service. On paper, a managed service is more expensive to run (10%–30% more) compared to "deploy and manage on your own" solutions. However, when you consider the scale and availability levels required by today's SaaS solutions, and all the administration such systems need, consuming a managed solution actually ends up much cheaper than running one on your own, and that is before considering the whole cloud ecosystem integrated around it. In my opinion, unless there are legacy or perhaps regulatory reasons, running a self-managed queue solution for production systems is usually a bad idea.
- Queue features: while all queues share similar concepts, they do differ in the features they offer. SQS provides its own set of features, such as FIFO queues, server-side encryption, integration with other AWS services (such as SNS and Lambda) and more.
- Delivery guarantees: each queue type provides different guarantees about message delivery, and these usually translate into different trade-offs in throughput and availability. For example, "standard" SQS guarantees "at least once" delivery (meaning that on some rare occasions the same message may be delivered more than once; there are ways to overcome this, and we will discuss them shortly), while "FIFO" SQS guarantees "exactly once" processing.
- Queue clients: a queue provides an API which is consumed via dedicated clients. Choose a queue that has a client for the programming language(s) of your components.
Common patterns
Here are two common messaging patterns which contain some of the fundamental "building blocks" that you can break apart and combine differently according to your use cases. I tried to keep things as simple as possible for the sake of example, but obviously for real production systems you may need to add further details.
Publish / Subscribe
This is probably one of the most common messaging patterns: imagine you have a service which needs to notify multiple other services in the system about something. Implementing such a notification system with "direct" communication usually means the source service has to send a notification to each destination service, one by one. This approach has several considerable drawbacks:
- The source service needs to "know" all of its subscribers, which produces coupling. If at some later point a new service needs to subscribe for notifications, the source service has to be made aware of it.
- Sending notifications one by one is not a scalable approach. Suppose your service has a thousand subscribers: sending notifications one by one may take quite some time. And what if, while the notifications are being sent, one of the services is unavailable?
Using a queue in this situation is the classical way to solve these drawbacks. Many queues have "topic" subscription functionality, where consumers subscribe to a specific "topic" and receive the messages sent to it. In AWS, to achieve this functionality, SQS needs to be combined with another service, SNS:
In the pattern above, the "source" service (producer) publishes a message to a specific topic. Each "destination" service has its own queue, which is subscribed to that topic and receives a copy of the published message. After a message arrives in a queue, its consumers receive and process it (as described in the previous sections).
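With boto3, wiring the fan-out looks roughly like this. The topic and queue ARNs are placeholders, and the SQS access policy that allows SNS to deliver into the queue is omitted for brevity:

```python
import boto3

sns = boto3.client("sns")

# Placeholders: an existing SNS topic and a destination service's queue ARN.
TOPIC_ARN = "arn:aws:sns:us-east-1:123456789012:tenant-events"
QUEUE_ARN = "arn:aws:sqs:us-east-1:123456789012:billing-service-queue"

# One-time setup: subscribe the destination queue to the topic.
sns.subscribe(
    TopicArn=TOPIC_ARN,
    Protocol="sqs",
    Endpoint=QUEUE_ARN,
    Attributes={"RawMessageDelivery": "true"},
)

# Producer: publish once; every subscribed queue receives its own copy.
sns.publish(TopicArn=TOPIC_ARN, Message='{"event": "user-registered"}')
```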
Advantages of this approach are:
- The producer is not aware of its subscribers. At any given moment new subscribers may subscribe to the desired topic without affecting the rest of the system.
- The notification-sending process is not affected by the number of subscribers.
- The system is highly available: even if some or even all of the consumers are unavailable, publishing the notification will still succeed.
Distributed Workflow
Imagine you have a multi-tenant SaaS system, and on tenant registration different microservices in the system need to perform different tasks related to tenant creation (e.g. allocate resources, set up databases, send emails to administrators and so on). The order of actions is important; for example, the system must send the email to administrators only after the database has been initialized. And most importantly, you need to know when all the needed tasks have been completed and the tenant is ready for use.
Implementing such a system using "direct" communication is very tricky:
- Keeping requests pending while the tasks complete will easily bring you to timeouts if the operation takes longer than the configured timeout (usually less than 30 seconds). Also, keeping thread(s) waiting for requests to complete is not a scalable approach and will quickly drain your service of threads under frequent requests. Polling services for completion is not scalable for the same reason.
- If any of the services fails to handle the request, or is unavailable at the time of the request, the entire request fails and the system state may become corrupted (because of a partially created tenant).
One possible way to make things better is to use messaging in order to communicate asynchronously between the services:
This is how it is proposed to work (a code sketch of a single step follows the list):
A "workflow" is a list of actors which are supposed to perform the needed actions in order of appearance. The list contains the queue address of each actor (the Queue URL in SQS), in the desired order.
- Initially, when the services are deployed, they use a Workflow Service (I am not aware of such a service off the shelf, but it is fairly simple to implement on your own) to subscribe to the desired workflow (e.g. "TenantCreation"). As part of the subscription, each service provides the address of its queue. During subscription, a service may ask to be placed after or before some other service if a particular order of execution is required.
- When a new tenant has to be created, the Flow Initiator (probably some kind of frontend or API) fetches the tenant-creation workflow from the Workflow Service. The Flow Initiator adds itself as the last entry in the list, and then sends the workflow message to the first queue address in the list (in our example, "Queue 1").
- One of the Queue 1 consumers receives the message, handles it, and then sends it to the next subscriber in the list (in our example, "Queue 2").
- After all the subscribers listed in the workflow have done their part, the workflow message is eventually sent to the last queue in the list: the queue of the Flow Initiator.
- When the Flow Initiator receives the workflow message (and finds itself listed as the last entry), it finalizes the flow (in our example, marks the tenant as ready to use). All done!
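A single step of such a workflow might look like the sketch below. The message format (a queues list, a step index, a tenantId) and do_my_part() are assumptions made up for illustration, not part of any standard:

```python
import json
import boto3

sqs = boto3.client("sqs")

def do_my_part(tenant_id):
    print("doing my part for tenant", tenant_id)   # this service's own task

# A single workflow step: handle the message, then pass it on to the next
# queue in the list. The message carries the ordered list of queue URLs and
# the index of the current step.
def handle_workflow_message(message):
    workflow = json.loads(message["Body"])
    do_my_part(workflow["tenantId"])

    next_step = workflow["step"] + 1
    if next_step < len(workflow["queues"]):
        workflow["step"] = next_step
        sqs.send_message(
            QueueUrl=workflow["queues"][next_step],
            MessageBody=json.dumps(workflow),
        )
    # The last queue in the list belongs to the Flow Initiator, which
    # finalizes the flow when the message reaches it.
```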
Common Pitfalls and Best Practices
Metadata
Almost all queues (including SQS) allow you to pass message attributes along with the message itself, and this is the best way to pass message metadata such as the message type, the message version (explained in the next section) and others. Always pass message metadata via the attributes (and not as part of the message body), since this allows you to make decisions before processing the message body.
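In SQS this maps to the MessageAttributes parameter. A small sketch (QUEUE_URL is a placeholder and the attribute names are just examples):

```python
import boto3

sqs = boto3.client("sqs")
QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue"  # placeholder

# Metadata travels in MessageAttributes, not in the body.
sqs.send_message(
    QueueUrl=QUEUE_URL,
    MessageBody='{"itemId": 42}',
    MessageAttributes={
        "messageType": {"DataType": "String", "StringValue": "ItemDeleted"},
        "messageVersion": {"DataType": "String", "StringValue": "2"},
        "flowId": {"DataType": "String", "StringValue": "c0ffee-1234"},
    },
)

# Consumers must ask for the attributes explicitly when receiving.
response = sqs.receive_message(
    QueueUrl=QUEUE_URL,
    MessageAttributeNames=["All"],
    MaxNumberOfMessages=1,
)
```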
Versioning
Over time things change, and the structure of your messages will change with them. Since messages are processed asynchronously, during the transition period you may find both "new" and "old" messages in the queue at the same time. The best way to handle such migrations without service interruption is simply to apply a "message version" to each message.
In case of a change, your zero-downtime deployment strategy should be as follows:
- Deploy a new version of your consumers, able to handle both the "old" and the "new" message versions.
- Deploy a new version of your producers, producing messages of the "new" version.
- At some later point, when all the "old" messages have been processed, deprecate the "old" message version and deploy consumers able to handle only "new" messages.
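The first step might look like the following consumer-side dispatch, assuming the version travels in a messageVersion attribute as suggested above (handle_v1/handle_v2 are placeholders for your own handlers):

```python
import json

# Consumer from step 1 of the migration: it understands both message versions,
# so "old" and "new" messages can coexist in the queue during the transition.
def handle(message):
    attrs = message.get("MessageAttributes", {})
    version = attrs.get("messageVersion", {}).get("StringValue", "1")

    if version == "1":
        handle_v1(json.loads(message["Body"]))   # legacy structure
    elif version == "2":
        handle_v2(json.loads(message["Body"]))   # new structure
    else:
        raise ValueError(f"Unsupported message version: {version}")
```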
Tracing
Troubleshooting messaging in large systems is a very complex task, since things happen asynchronously, in different parts of the system, and possibly out of order.
Logging is your best friend here. I recommend passing a FlowId as part of the message metadata and logging it everywhere, similar to what is described in my other article.
Large Messages
Most queues limit the maximum message size to a relatively small value, usually up to 1MB (the SQS limit is 256KB). If you have to pass larger messages (which is considered a bad practice, but sometimes there is no other choice), the solution is to store the message body in some temporary storage and pass only a link to it inside the message.
The flow in this situation is as follows:
- The producer stores the message body in some intermediate storage.
- The producer publishes the message into the queue, but instead of the actual message body it sends a link to the stored object.
- The consumer receives the message and realizes that instead of the actual message body there is a link (possibly by using a dedicated message attribute).
- The consumer downloads the message body from the provided link and deletes it from storage on successful processing (or, depending on the storage, the stored object has an expiration and auto-deletes).
In SQS you can use the Extended Client, which does all of this for you (it uses S3 as the intermediate storage).
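If you implement the pattern yourself, a rough sketch with boto3 and S3 as the intermediate storage could look like this; the bucket name, QUEUE_URL, message and process() are placeholders:

```python
import json
import uuid
import boto3

s3 = boto3.client("s3")
sqs = boto3.client("sqs")
BUCKET = "my-large-messages"   # placeholder bucket, ideally with a lifecycle rule

# Producer: store the large body in S3, send only a pointer.
large_payload = b"<several hundred KB of data>"   # placeholder payload
key = str(uuid.uuid4())
s3.put_object(Bucket=BUCKET, Key=key, Body=large_payload)
sqs.send_message(
    QueueUrl=QUEUE_URL,
    MessageBody=json.dumps({"s3Bucket": BUCKET, "s3Key": key}),
    MessageAttributes={
        "bodyLocation": {"DataType": "String", "StringValue": "s3"}
    },
)

# Consumer: detect the pointer, download the real body, delete it when done.
pointer = json.loads(message["Body"])
obj = s3.get_object(Bucket=pointer["s3Bucket"], Key=pointer["s3Key"])
process(obj["Body"].read())
s3.delete_object(Bucket=pointer["s3Bucket"], Key=pointer["s3Key"])
```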
Idempotency
Depending on the queue's guarantees, on some (rare) occasions the same message may be received more than once. The best way to handle this is to design your consumers to be idempotent, which means that the same operation executed more than once produces the same result.
Example: let's assume that when a "delete item" message is received, the consumer needs to delete a record from the database. If the same "delete item" message is received twice, the second time the record will no longer be present in the database. Idempotent handling in this situation means executing a "delete if exists" operation and acknowledging the message even if no actual "delete" was performed.
Sometimes, in order to achieve idempotency, tracking of received messages is required. One way to do that is to calculate a hash (e.g. SHA-256) of every received message and then query some centralized storage (Redis or AWS ElastiCache would be ideal for that) to check whether the message has been processed before. If the hash already exists in the storage, acknowledge the message and do nothing; otherwise process the message, acknowledge it and then store the message hash. This way you keep track of all received and processed messages and never process the same message twice. SQS FIFO queues offer such functionality out of the box, via message deduplication.
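A rough sketch of such hash tracking with Redis; the endpoint is a placeholder, and acknowledge()/process() stand for your own delete-from-queue and handling logic:

```python
import hashlib
import redis

r = redis.Redis(host="my-elasticache-endpoint", port=6379)  # placeholder endpoint

def handle_once(message):
    digest = hashlib.sha256(message["Body"].encode("utf-8")).hexdigest()

    if r.exists(digest):
        # Already processed before: acknowledge (delete) and do nothing.
        acknowledge(message)
        return

    process(message["Body"])
    acknowledge(message)
    # Remember the hash, with a TTL so the store does not grow forever.
    r.set(digest, 1, ex=7 * 24 * 3600)
```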
Error Handling
As we discussed above, if a message is not acknowledged within the specified Invisibility Window, it becomes visible again and is processed once more. Sometimes you want that, and sometimes you don't. You need to carefully analyze the possible exceptions that may be thrown during processing and decide. For example, for temporary environment errors such as out-of-memory or network issues, you definitely want to propagate the exception out and give the message a chance to be processed again. However, for permanent errors, where trying again makes no sense, you have to acknowledge the message in order to prevent additional retries.
Inside the consumer's queue-polling loop, process each message in a separate try/catch block.
Wrap all queue API operations in a try/catch block as well.
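Putting the last two points together, a consumer loop might look roughly like this; PermanentError is an assumed application-specific exception raised by process() when a retry cannot help:

```python
import logging
import boto3

sqs = boto3.client("sqs")
QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue"  # placeholder

class PermanentError(Exception):
    """Raised by process() when retrying cannot help (e.g. malformed message)."""

while True:
    try:
        response = sqs.receive_message(
            QueueUrl=QUEUE_URL, MaxNumberOfMessages=10, WaitTimeSeconds=20
        )
    except Exception:
        logging.exception("Failed to poll the queue")   # e.g. transient network error
        continue

    for message in response.get("Messages", []):
        try:
            process(message["Body"])   # your handling logic
        except PermanentError:
            # Retrying makes no sense: fall through and acknowledge below,
            # after logging (or forwarding the message somewhere for analysis).
            logging.exception("Permanent failure, dropping message")
        except Exception:
            # Transient failure: do NOT delete; after the Invisibility Window
            # expires the message will be retried (and may eventually hit the DLQ).
            logging.exception("Transient failure, message will be retried")
            continue

        sqs.delete_message(
            QueueUrl=QUEUE_URL, ReceiptHandle=message["ReceiptHandle"]
        )
```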
Security
It's crucial to properly secure your queue, since your (possibly sensitive) traffic flows through it. Each queue provides different security measures. As for SQS, you have a few options:
Authorization
As with other AWS resources, you have the standard IAM role-based approach, with which you can grant granular access to your SQS queue. You can even provide secure access to different AWS accounts!
Imagine you have two separate AWS accounts. The first belongs to the "consumer": this is where the SQS queue is defined, and the consumer receives messages from that queue. The second belongs to the producer: the one which needs to publish messages into the consumer's queue. You need to securely grant "publish" permissions to the producer, which lives in a different AWS account. Obviously, creating a dedicated IAM user in the consumer's account and handing its credentials to the producer is a bad idea, since you will have no control over these credentials and who actually uses them. A better approach is to use the STS AssumeRole mechanism:
- Inside the SQS owner's account (the "consumer"), a dedicated Producer role is created. Its policy specifies the allowed "producer" account id. You control exactly which permissions you grant to that role (in our example, probably only "publish").
- Inside the "producer" account, calling AssumeRole on the Producer role issues temporary credentials, which can then be used to call SQS according to the granted permissions.
This approach is much more secure, since no other account is able to call AssumeRole on that Producer role, and the consumer has full control over the granted access.
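From the producer's side, the flow with boto3 looks roughly like this (the role ARN, account ids and queue URL are placeholders):

```python
import boto3

# In the producer's account: assume the Producer role defined in the
# consumer's account.
sts = boto3.client("sts")
creds = sts.assume_role(
    RoleArn="arn:aws:iam::111122223333:role/sqs-producer-role",
    RoleSessionName="cross-account-publisher",
)["Credentials"]

# Use the temporary credentials to publish into the consumer's queue.
sqs = boto3.client(
    "sqs",
    aws_access_key_id=creds["AccessKeyId"],
    aws_secret_access_key=creds["SecretAccessKey"],
    aws_session_token=creds["SessionToken"],
)
sqs.send_message(
    QueueUrl="https://sqs.us-east-1.amazonaws.com/111122223333/consumer-queue",
    MessageBody='{"event": "hello from another account"}',
)
```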
Encryption
SQS supports encryption both in transit (TLS) and at rest (Server-Side Encryption). Always nice to have!
Integrity Verification
It is always a good practice to have multiple levels of protection, not because we want to make things harder for a potential attacker, but because additional protection levels help limit the exposure in case of human error (which is usually related to misconfiguration).
One such additional level might be message integrity validation:
- Before publishing a message, the producer calculates a hash of the message and then signs (e.g. encrypts) it using a private key. The message signature is sent as part of the message metadata (as a message attribute).
- On message receipt, the consumer calculates the message hash on its side as well, decrypts the received hash and compares the two. This way both the message integrity and the identity of the sender are verified.
In AWS, you can use the built-in signing functionality of KMS: it allows you to sign and verify. Using the same "AssumeRole" approach (but this time in the reverse direction), the producer may create a dedicated role for the consumer with the "Verify" permission. When the producer needs to sign a hash, it uses its own KMS key to create the signature. When the consumer needs to verify the signature, it uses the producer's KMS (since both the private and public keys are stored there) and makes the verification call via "AssumeRole".
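A rough sketch of the sign/verify calls with boto3 and an asymmetric KMS signing key; the key alias is a placeholder, and in the cross-account setup the consumer would build its kms client from AssumeRole credentials as described above:

```python
import base64
import hashlib
import boto3

kms = boto3.client("kms")
KEY_ID = "alias/message-signing-key"   # placeholder asymmetric signing key

# Producer side: hash the body, sign the digest, pass the signature as an attribute.
body = '{"event": "tenant-created"}'
digest = hashlib.sha256(body.encode("utf-8")).digest()
signature = kms.sign(
    KeyId=KEY_ID,
    Message=digest,
    MessageType="DIGEST",
    SigningAlgorithm="RSASSA_PKCS1_V1_5_SHA_256",
)["Signature"]
signature_attr = base64.b64encode(signature).decode("ascii")   # goes into MessageAttributes

# Consumer side: recompute the digest and verify the signature against
# the producer's key (here using the same client for brevity).
result = kms.verify(
    KeyId=KEY_ID,
    Message=hashlib.sha256(body.encode("utf-8")).digest(),
    MessageType="DIGEST",
    Signature=base64.b64decode(signature_attr),
    SigningAlgorithm="RSASSA_PKCS1_V1_5_SHA_256",
)
assert result["SignatureValid"]
```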
A word of caution: a while ago, I took part in implementing such a message signature verification process between two systems. Both systems, producer and consumer, were running Java 11. On both sides we used exactly the same algorithm to calculate the hash, but for some reason we kept producing different hashes and the verification kept failing. It took us some time to discover that different JVM implementations may calculate hashes differently, even when the same Java version and the same hash algorithm are used (crazy but true)! Eventually we found a hashing algorithm which always produced the same result regardless of the JVM vendor; I'm not sure now which one it was, I think it was MD5. Check it carefully!
Conclusion
Queues are a complex subject and many things can easily go wrong. However, for some use cases there is nothing better than a messaging queue. I hope the information I shared in this article helps you better understand how queues work in general and avoid potential pitfalls.
If you liked this article — please “clap” below. Thank you.