Deep Dive of the Distributed Job Scheduler that powers over 2 Billion daily jobs at PhonePe

Photo by Agê Barros on Unsplash

Deep Dive of the Distributed Job Scheduler that powers over 2 Billion daily jobs at PhonePe

Architectural deep dive of Clockwork - the in-house distributed job scheduler at PhonePe

While working at PhonePe, I had the privilege of working on Clockwork - the system that powers job scheduling across PhonePe. During an internal knowledge transfer session, I presented an architectural overview of the system to the team. After receiving very positive feedback post the session, I decided to write a blog post for the PhonePe tech blog, explaining the internals of PhonePe to the external audience. This article is the uncut version of what I initially wrote for the blog.

Preface

If you have ever missed your morning flight because your alarm never rang, you understand the importance of reliable alarms.

At PhonePe, we ensure that your alarms always ring at the time you want them to ring. Consider a scenario that happens daily at PhonePe - Merchant Settlements. If a merchant has performed multiple transactions during the day, at the end of the day, we want to ensure the final amount gets credited to their account. Any delay in the jobs getting triggered can cause a delay in the merchant settlement which can lead to a loss of customer trust.

Consider another scenario - Coupons invalidation. If you have received a coupon in PhonePe, you must have noticed that the coupon has a fixed validity. Now multiply this by hundreds of thousands or even millions of coupons. This necessitates a platform that can reliably schedule tasks at a predetermined schedule to invalidate all the coupons.

Patterns similar to this keep presenting themselves across different systems and at incredible scale.

The obvious approach would have been to use embedded schedulers, i.e. a scheduler service that runs within the client application and allows the client to schedule jobs in the future. However, this presents a set of challenges, especially the lack of fault tolerance. What will happen to the scheduled job in case the client application goes down? To solve this, we would need persistence, but it would need a lot of complex coordination across hundreds of stateless containers. Why? Because in most cases, we would not want the same job to be executed by multiple containers. Then there is the issue of scale – we can have situations where hundreds of millions of such tasks can be triggered in the span of a few hours which could result in millions of notifications being generated at constant rates nudging users about coupons nearing expiration.

To ensure coordination amongst containers, we would have to employ complicated strategies like partitioning of jobs and leader-election amongst task executor instances in all services handling these kinds of use cases. While possible, this would add a lot of complexity to the service containers themselves, making Garbage Collection, Thread pool, and auto-scaler tuning extremely difficult due to all containers doing mixed workloads, and adding significantly to the storage layer requirements for them.

As a design principle at PhonePe, we keep individual systems simple and build up complexity in layers, similar to building complex command chains by piping simple commands on Unix or GNU/Linux.

As this seemed like a fairly popular requirement across many systems we decided to build a centralized platform that can allow any clients to onboard and schedule jobs in the future without doing any kind of heavy lifting on their own. In this post, we will take a look at the internals of Clockwork - the system that powers job scheduling across various teams at PhonePe.

Scale

  • Over 2 Billion callbacks are made daily as per the schedule defined.

  • Capability to handle over 100,000 job schedules per second with a single-digit millisecond latency.

  • No lag in the job execution at p99 which in the worst case can extend to 1 minute.

What is Clockwork?

Clockwork is a Distributed, Durable, and Fault-Tolerant Task Scheduler. Wow, that's a handful! Let's dissect it!

  • Task Scheduler - In Linux, a job that needs execution at a specific point in time can be scheduled using the at command. In the case of recurring jobs, crontab can be used. Clockwork was designed based on that ideology, allowing clients to submit jobs that can be executed as per their schedule (once or repeated). Instead of executing arbitrary Java code, we limit it to only providing an HTTP callback to the provided URL endpoint at the specified time duration. Clients can schedule jobs that can be executed once, or at fixed intervals e.g. after every one day or daily at 5 PM.

  • Distributed - To support high throughput of callbacks (100K RPS), we would need a service that can scale horizontally.

  • Durable - Any submitted task is stored in a durable storage allowing Clockwork to recover from failures.

  • Fault Tolerant - Any failure during job execution is handled gracefully per the client configuration. If a client wants the job to be retried, it is automatically retried upon failure as per its retry strategy.

Architecture - 1000-foot view

  • Clients schedule Jobs.

  • Jobs get executed.

  • Clients receive callbacks.

  • Profit 💰

Whenever a client schedules jobs, we store the job details in HBase and immediately send an acknowledgment back to the client.

Asynchronously, Clockwork keeps on performing HBase scans to find the list of eligible jobs that need execution. Once found, those jobs are immediately pushed to RabbitMQ (RMQ) to avoid blocking the scanner threads. As the callback is an HTTP callback to a URL endpoint, it can be time-consuming, hence it's important to decouple the job execution from the job extraction. Actual callbacks to the clients are performed in different threads after extracting the Job details from RMQ.

Worker threads subscribe to client-specific queues on RabbitMQ. In case of new messages, they are notified. Upon notification, a callback is made to the client as per the job details present in the message. If the callback is successful, an acknowledgment is sent to the RMQ which removes the message from the queue (acks the message). In case of a failure during callback (either because the downstream is down or an unexpected response code was received), retries are performed based on client-specific configuration.

Why HBase?

HBase is a key-value store structured as a sparse, distributed, persistent, multidimensional sorted map. This means that each cell is indexed by a RowKey, ColumnKey, and timestamp. Additionally, the rows are sorted by row keys. This allows us to efficiently query for rows by a specific key and run scans based on a start/stop key. We rely heavily on scans to find the candidate set of jobs whose scheduled execution time is less than or equal to the current time and callbacks are to be sent for.

Why RabbitMQ?

RabbitMQ is a messaging broker - an intermediary for messaging. It gives applications a common platform to send/receive messages and provides a durable place to store messages until consumed.

Architecture - Deep Dive

Clockwork service can be divided into 5 modules - each entrusted to perform a single responsibility.

Job Acceptor

Job Acceptor is a Client Facing Module. Its responsibility is to accept and validate the incoming requests from clients, persist the job details in HBase, and return an acknowledgment to the client. While persisting job details, a random Partition ID is assigned to the Job ID. We will cover the role of partitions in the subsequent sections.

Job Extractor

The Job Extractor’s responsibility is to find jobs that are eligible for execution. If the scheduled execution time of a job <= current time, a job becomes eligible. It finds eligible plans by running an HBase scan query between a time range. Once an eligible plan is found, the plans are pushed to RMQ one by one (without waiting for the job execution) to perform the next scan as soon as possible.

Leader Elector

At any point in time, there are multiple instances of Clockwork running. Each instance runs job extractors for all clients as all our containers are stateless. This poses a problem - if all of these extractors are trying to find eligible jobs for the same client at a given point in time, they all will get the same data, which will result in duplicate executions of the same job, something that cannot be allowed by any chance. The leader elector’s responsibility is to assign a leader amongst multiple clockwork instances for every client. The leader for a client assigns the partitions to the workers (extractors) running across different instances of clockwork.

  • During the application startup, the application instance registers itself with Zookeeper with a unique worker ID.

  • It then proceeds to check if there is a leader already elected for a client. If not, it tries to become a leader of a client.

  • The client leader moves on to perform the partition assignment amongst existing clockwork instances (workers).

If you are still reading this article (kudos), you must have heard about the term Partition ID mentioned earlier. Why is it required? To support clients that need a lot of concurrent callbacks. Partitioning allows us to increase the number of concurrent scans that can be performed while ensuring a job only comes up in a single scan. If we have 64 partitions, we can perform 64 concurrent scans, allowing a higher rate of throughput.

But with great power comes great responsibility! We still have to ensure that no two instances scan the same partition. Otherwise, it will lead to double execution of the same job! This is where the partition assignment comes into the picture. The leader instance is responsible for assigning partitions to the workers - in a round-robin manner. This ensures fairness in partition distribution and ensures no two workers read the same partition.

RMQ Publisher

Once the list of eligible plans is fetched and some validations are performed, the messages are pushed to RMQ which acts as our message broker. While publishing the message we use Rate limiter.

Rate Limiter ensures that we don't publish more than what we can consume - otherwise, it can lead to the instability of the RMQ cluster - because of a huge backlog of messages. This is achieved by dynamically pausing scans if the queue size goes to a certain configurable threshold. Once the client can receive callbacks, and the queue size starts reducing, subsequent scans start pushing data into the queues.

We had to redesign this rate limiter to handle spiky traffic. Some of the clients schedule a lot of messages (>100k) that need execution at the same time. This leads to the fast producer - slow consumer problem, as the Job Extractor keeps finding the eligible jobs and enqueues to the RMQ queues so rapidly that the RMQ consumers are unable to catch up - leading to flow control and cluster instability across the RMQ cluster. To combat this, we used the Guava Rate Limiter to limit the publish rate to a limit that we know our 5-node RMQ cluster can handle (~100k consumer acknowledgement per second).

A lot of our workflows are time-sensitive, and delayed callbacks are sometimes not useful. Clockwork provides a way for clients to specify this time limit by setting a relevancy window. If the client is slow in accepting callbacks (slow consumer problem) or the scans were paused/slow during a time interval (slow publisher problem), expired callbacks will not be sent even when the system stabilizes. Besides this, we also support callback sidelining. This provides a way for clients to avoid getting overwhelmed by clockwork callbacks right after they recover.

RMQ Consumer

It listens to the incoming messages in the RMQ Queues and executes them by making an HTTP call to the specific URL endpoint. Any failure while making the call is handled by client-specific retry strategies.

Some clients don't want to retry and just want to drop failed messages, whereas some want to perform retries based on exponential backoff with random jitter. In case the retries are exhausted and the callback still fails, the message is pushed to a Dead-Letter-Queue. It's kept there until the messages are moved back to the main queue or the messages expire.

Conclusion

Clockwork's architecture is tailored and scaled to our specific needs and has enabled us to manage the enormous volume of tasks that PhonePe needs to handle efficiently. This system allows us to offer our customers seamless and uninterrupted service.

As engineers, we know that growth comes with challenges to infrastructure and system quality. At PhonePe, we are constantly seeking solutions to maintain that quality while managing costs and accommodating ever-increasing surges in traffic. Our goal is to share our learnings with the larger engineering community so that we can all learn how to address the challenges of growth and adapt our systems accordingly.

The link to the official blog article can be found here.

Please feel free to ask any questions you might have in the comments.

Did you find this article valuable?

Support Snehasish Roy by becoming a sponsor. Any amount is appreciated!