Building a Distributed Job Scheduler from Scratch (Part 3)
A multipart hands on series about building a distributed job scheduler from scratch.
Recap
Welcome back to the third part of our tutorial series on building a distributed job scheduler! In the previous installment, we designed a durable storage layer to persist job details. Now it's time to model how jobs are executed, including jobs that run more than once.
Modeling Job Execution
Recapping the discussion from the first part of the series, our platform should support three types of jobs:
Once - These jobs need to be scheduled only once at a specified date and time, such as scheduling a job for August 1, 2023, at 23:00.
Repeated - Repeated jobs occur within a defined date range, with a specified time interval between each occurrence. For example, scheduling a job to run every 30 minutes between August 1 and August 31, 2023.
Recurring - Recurring jobs are scheduled for specific dates and times, e.g., on August 1, 2023, at 16:00, and on August 5, 2023, at 12:30.
In other words, there is a one-to-many relationship between a job and its executions.
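To keep the rest of this post readable, here is a rough recap of the job hierarchy from the earlier parts. The field names below are inferred from how they are used later in this post, so treat this as a sketch rather than the exact classes:
// Rough recap of the job hierarchy (field names inferred from later usage)
public abstract class Job implements Serializable {
    String id;                 // unique job id
    String callbackUrl;        // endpoint to invoke when the job runs
    int successStatusCode;     // HTTP status that marks a successful run
}
public class ExactlyOnceJob extends Job {
    LocalDateTime dateTime;    // the single scheduled instant
}
public class RepeatedJob extends Job {
    LocalDateTime startTime;               // start of the date range
    LocalDateTime endTime;                 // end of the date range (assumed)
    long repeatInterval;                   // e.g. 30
    TemporalUnit repeatIntervalTimeUnit;   // e.g. ChronoUnit.MINUTES
}
public class RecurringJob extends Job {
    SortedSet<LocalDateTime> dateTimes;    // explicit list of run times
}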
Introducing the Plan Class
Since a job can be executed multiple times, we need to introduce another model to track the execution history of a job.
Whenever a job is scheduled, we will insert a Plan corresponding to the Job into the Plan table. We will continuously monitor the Plan table to fetch eligible Plans. Once a Plan is fetched, we will also fetch the corresponding Job details and generate the next Plan. For example, for a recurring Job scheduled on August 1 and August 5, only the Plan with the expected execution on August 1 is created initially; once that Plan has been executed, the next Plan with the expected execution on August 5 is generated. This is known as lazy evaluation, and it prevents unnecessary insertions into the Plan table for a repeated job that runs for years.
// @Builder and @Getter (Lombok) are assumed here, since Plan.builder() and the getters are used below
@Builder
@Getter
public class Plan implements Serializable {
    String planId;                        // unique id of this execution plan
    String jobId;                         // the job this plan belongs to
    LocalDateTime expectedExecutionTime;  // when this plan is expected to run
}
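To make the lazy-evaluation idea concrete, here is a rough sketch of how the next Plan could be derived for a recurring job once the current one has run. The helper name is hypothetical; the real scheduling loop only arrives in Part 4:
// Hypothetical helper: after executing `current`, derive the next Plan for a
// recurring job by picking the first scheduled time after the current one.
// Returns null when the job has no further occurrences.
public Plan nextPlanFor(RecurringJob job, Plan current) {
    return job.getDateTimes().stream()
            .filter(dt -> dt.isAfter(current.getExpectedExecutionTime()))
            .findFirst()
            .map(dt -> Plan.builder()
                    .planId(UUID.randomUUID().toString())
                    .jobId(job.getId())
                    .expectedExecutionTime(dt)
                    .build())
            .orElse(null);
}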
Updating the JobDAO Class
The JobDAO class needs to be updated to create and store a Plan whenever a Job is registered. getJobDetails() has also been updated to return all the PlanIDs mapped to the provided JobId.
public void registerJob(Job job) throws IOException {
// Create and store the Job in HBase
byte[] row = Bytes.toBytes(job.getId());
Put put = new Put(row);
put.addColumn(columnFamily.getBytes(), data.getBytes(), SerializationUtils.serialize(job));
hBaseManager.put(table, put);
// Create and store the Plan in HBase
Plan plan = planDAO.storePlan(job);
PlanIDs planIDs = PlanIDs.builder()
.plans(List.of(plan.getPlanId()))
.build();
Put jobPlanPut = new Put(row);
jobPlanPut.addColumn(columnFamily.getBytes(), plans.getBytes(), SerializationUtils.serialize(planIDs));
hBaseManager.put(table, jobPlanPut);
log.info("Generated JobId {} and PlanID {}", job.getId(), plan.getPlanId());
}
@VisibleForTesting
public JobDetails getJobDetails(String id) throws IOException {
Result result = hBaseManager.get(table, id);
byte[] jobValue = result.getValue(columnFamily.getBytes(), data.getBytes());
Job job = (Job) SerializationUtils.deserialize(jobValue);
byte[] plansMapping = result.getValue(columnFamily.getBytes(), plans.getBytes());
PlanIDs planIDs = (PlanIDs) SerializationUtils.deserialize(plansMapping);
return JobDetails.builder()
.job(job)
.planIDs(planIDs)
.build();
}
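One thing the snippet above relies on but does not show is the PlanIDs wrapper. It is assumed to be nothing more than a serializable holder for the plan ids mapped to a job, along these lines:
// Assumed shape of PlanIDs: a serializable holder for the plan ids of a job.
// @Builder and @Getter (Lombok) mirror how it is used in JobDAO above.
@Builder
@Getter
public class PlanIDs implements Serializable {
    List<String> plans;
}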
Introducing the PlanDAO Class
This class is similar to JobDAO and is responsible for the CRUD operations on the Plan entity.
public class PlanDAO {
HBaseManager hBaseManager;
String columnFamily = "cf";
String data = "data";
String tableName = "planDetails";
Table table;
public PlanDAO() throws IOException {
hBaseManager = new HBaseManager();
hBaseManager.ensureTable(tableName, columnFamily);
table = hBaseManager.getTable(tableName);
}
public Plan storePlan(Job job) throws IOException {
// Visitor pattern: generate a Plan without switching on the concrete Job type
Plan plan = job.accept(new PlanGenerator());
byte[] row = Bytes.toBytes(plan.getPlanId());
Put value = new Put(row);
value.addColumn(columnFamily.getBytes(), data.getBytes(), SerializationUtils.serialize(plan));
hBaseManager.put(table, value);
return plan;
}
public Plan getPlanDetails(String planId) throws IOException {
Result result = hBaseManager.get(table, planId);
byte[] value = result.getValue(columnFamily.getBytes(), data.getBytes());
return (Plan) SerializationUtils.deserialize(value);
}
}
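Both DAOs delegate the low-level work to the HBaseManager wrapper from the storage layer built earlier in this series. As a refresher, a minimal version could look like the sketch below, written against the HBase 2.x client API; the actual implementation from the previous part may differ in its details:
// Minimal sketch of the HBaseManager wrapper assumed by the DAOs above
public class HBaseManager {
    private final Connection connection;
    public HBaseManager() throws IOException {
        // Reads hbase-site.xml from the classpath
        connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
    }
    // Creates the table with the given column family if it does not exist yet
    public void ensureTable(String tableName, String columnFamily) throws IOException {
        try (Admin admin = connection.getAdmin()) {
            TableName name = TableName.valueOf(tableName);
            if (!admin.tableExists(name)) {
                admin.createTable(TableDescriptorBuilder.newBuilder(name)
                        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(columnFamily))
                        .build());
            }
        }
    }
    public Table getTable(String tableName) throws IOException {
        return connection.getTable(TableName.valueOf(tableName));
    }
    public void put(Table table, Put put) throws IOException {
        table.put(put);
    }
    public Result get(Table table, String rowKey) throws IOException {
        return table.get(new Get(Bytes.toBytes(rowKey)));
    }
}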
Visitor Pattern in Real Life
If you are with me so far, you must have noticed a sneaky new class called PlanGenerator.
What does it do? Based on the type of Job, it returns a Plan.
Why do we need a visitor? Without it, we would either have to write if/else logic that checks the concrete type of each Job to generate a Plan, or reach for the Strategy pattern instead.
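For contrast, the if/else alternative would look roughly like the hypothetical method below; every new job type forces another instanceof branch and another cast:
// Hypothetical alternative, shown only for contrast: the type checks and casts
// that the visitor lets us avoid. Every new job type means editing this method.
public Plan generatePlan(Job job) {
    if (job instanceof ExactlyOnceJob) {
        ExactlyOnceJob once = (ExactlyOnceJob) job;
        return Plan.builder()
                .jobId(once.getId())
                .planId(UUID.randomUUID().toString())
                .expectedExecutionTime(once.getDateTime())
                .build();
    } else if (job instanceof RecurringJob) {
        // ...similar branch, reading ((RecurringJob) job).getDateTimes().first()
    } else if (job instanceof RepeatedJob) {
        // ...similar branch, computing startTime plus the repeat interval
    }
    throw new IllegalArgumentException("Unknown job type: " + job.getClass());
}
With the visitor, each job type instead gets its own strongly typed visit overload: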
public interface JobVisitor<T> {
T visit(ExactlyOnceJob job);
T visit(RecurringJob job);
T visit(RepeatedJob job);
}
public class PlanGenerator implements JobVisitor<Plan> {
@Override
public Plan visit(ExactlyOnceJob job) {
return Plan.builder()
.jobId(job.getId())
.planId(getRandomID())
.expectedExecutionTime(job.getDateTime())
.build();
}
@Override
public Plan visit(RecurringJob job) {
return Plan.builder()
.jobId(job.getId())
.planId(getRandomID())
.expectedExecutionTime(job.getDateTimes().first())
.build();
}
@Override
public Plan visit(RepeatedJob job) {
return Plan.builder()
.jobId(job.getId())
.planId(getRandomID())
.expectedExecutionTime(getNextExecutionTime(job.getStartTime(), job.getRepeatIntervalTimeUnit(), job.getRepeatInterval()))
.build();
}
private String getRandomID() {
return UUID.randomUUID().toString();
}
private LocalDateTime getNextExecutionTime(LocalDateTime start, TemporalUnit repeatIntervalTimeUnit, long repeatInterval) {
return start.plus(repeatInterval, repeatIntervalTimeUnit);
}
}
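The dispatch works because each concrete Job class implements accept by calling visitor.visit(this); since this has the concrete type inside that class, the compiler binds the matching overload (classic double dispatch). A small usage sketch, with hypothetical values and an assumed dateTime builder field:
// Hypothetical usage: the first Plan for a one-off job lands on the job's
// scheduled instant. accept(...) dispatches to visit(ExactlyOnceJob).
ExactlyOnceJob job = ExactlyOnceJob.builder()
        .id(UUID.randomUUID().toString())
        .callbackUrl("http://localhost:8080/test")
        .dateTime(LocalDateTime.of(2023, 8, 1, 23, 0))
        .build();
Plan plan = job.accept(new PlanGenerator());
// plan.getExpectedExecutionTime() -> 2023-08-01T23:00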
Tests
Finally, the tests to verify the integration.
public class JobDAOTest {
JobDAO jobDAO;
PlanDAO planDAO;
@Before
public void setUp() throws IOException {
jobDAO = new JobDAO();
planDAO = new PlanDAO();
}
@Test
public void testRegisterJob() throws IOException {
String jobId = UUID.randomUUID().toString();
ExactlyOnceJob exactlyOnceJob = ExactlyOnceJob.builder()
.id(jobId)
.callbackUrl("http://localhost:8080/test")
.successStatusCode(500)
.build();
Assertions.assertDoesNotThrow(() -> jobDAO.registerJob(exactlyOnceJob));
JobDetails jobDetails = jobDAO.getJobDetails(jobId);
Assertions.assertEquals(exactlyOnceJob, jobDetails.getJob());
PlanIDs planIDs = jobDetails.getPlanIDs();
Assertions.assertNotNull(planIDs);
String planId = planIDs.getPlans().get(0);
Plan plan = planDAO.getPlanDetails(planId);
Assertions.assertEquals(planId, plan.getPlanId());
Assertions.assertEquals(jobId, plan.getJobId());
}
}
Appendix
Project Structure
Conclusion
Congratulations! In this third part of our tutorial series, we've made significant progress. We modeled multiple executions of the same job, implemented the necessary code, applied the Visitor pattern in real life, and validated everything through test cases. But the journey doesn't end here. In the next installment, part 4, we'll tackle an even trickier problem: identifying which Plans are due and executing them. Take a pause and think about the challenges involved in fetching eligible Plans from the Plan table. Stay tuned for more exciting insights!