Building a Distributed Job Scheduler from Scratch (Part 3)

A multipart hands on series about building a distributed job scheduler from scratch.

Recap

Welcome back to the third part of our tutorial series on building a distributed job scheduler! In our previous installment, we deep-dived into our storage system by designing a durable storage system to store job details effectively. Now it's time to model repeated jobs and handle job executions.

Modeling Job Execution

Recapping the discussion from the first part of the series

Our platform should support three types of jobs:

  • Once - These jobs need to be scheduled only once at a specified date and time, such as scheduling a job for August 1, 2023, at 23:00.

  • Repeated - Repeated jobs occur within a defined date range, with a specified time interval between each occurrence. For example, scheduling a job to run every 30 minutes between August 1 and August 31, 2023.

  • Recurring - Recurring jobs are scheduled for specific dates and times, e.g., on August 1, 2023, at 16:00, and on August 5, 2023, at 12:30.

This indicates that there is a one-to-many relationship between jobs and the way they are executed.

Introducing the Plan Class

Since a job can be executed multiple times, we need to introduce another model to track the execution history of a job.

Whenever a job is scheduled, we will insert a Plan corresponding to the Job in the Plan Table. We will monitor the Plan table continuously to fetch eligible Plans. Once a Plan is fetched, we will fetch the corresponding Job details too, and generate the next Plan e.g. in case of a recurring Job scheduled on August 1 and on August 5 - Initially, the Plan with the expected execution on August 1 will be created. Once that plan is executed, we will generate the next plan with expected execution on August 5 will be created. This is known as lazy evaluation and will prevent unnecessary insertions into the Plans table in case of a repeated job that runs for years.

public class Plan implements Serializable {
    String planId;
    String jobId;
    LocalDateTime expectedExecutionTime;
}

Updating the JobDAO Class

JobDAO class needs to be updated to account for the creation and storage of newly created Plan table. getJobDetails() has also been updated to return all the PlanIDs mapped with the provided JobId.

public void registerJob(Job job) throws IOException {
    // Create and store the Job in HBase
    byte[] row = Bytes.toBytes(job.getId());
    Put put = new Put(row);
    put.addColumn(columnFamily.getBytes(), data.getBytes(), SerializationUtils.serialize(job));
    hBaseManager.put(table, put);

    // Create and store the Plan in HBase
    Plan plan = planDAO.storePlan(job);
    PlanIDs planIDs = PlanIDs.builder()
            .plans(List.of(plan.getPlanId()))
            .build();

    Put jobPlanPut = new Put(row);
    jobPlanPut.addColumn(columnFamily.getBytes(), plans.getBytes(), SerializationUtils.serialize(planIDs));
    hBaseManager.put(table, jobPlanPut);
    log.info("Generated JobId {} and PlanID {}", job.getId(), plan.getPlanId());
}

@VisibleForTesting
public JobDetails getJobDetails(String id) throws IOException {
    Result result = hBaseManager.get(table, id);

    byte[] jobValue = result.getValue(columnFamily.getBytes(), data.getBytes());
    Job job = (Job) SerializationUtils.deserialize(jobValue);

    byte[] plansMapping = result.getValue(columnFamily.getBytes(), plans.getBytes());
    PlanIDs planIDs = (PlanIDs) SerializationUtils.deserialize(plansMapping);

    return JobDetails.builder()
            .job(job)
            .planIDs(planIDs)
            .build();
}

Introducing the PlanDAO Class

This will be similar to JobDAO and will be responsible for dealing with the CRUD operations related to Plan entity.

public class PlanDAO {
    HBaseManager hBaseManager;
    String columnFamily = "cf";
    String data = "data";
    String tableName = "planDetails";
    Table table;

    public PlanDAO() throws IOException {
        hBaseManager = new HBaseManager();
        hBaseManager.ensureTable(tableName, columnFamily);
        table = hBaseManager.getTable(tableName);
    }

    public Plan storePlan(Job job) throws IOException {
        // Use of VISITOR pattern to extend functionality of an existing class
        Plan plan = job.accept(new PlanGenerator());

        byte[] row = Bytes.toBytes(plan.getPlanId());
        Put value = new Put(row);
        value.addColumn(columnFamily.getBytes(), data.getBytes(), SerializationUtils.serialize(plan));
        hBaseManager.put(table, value);
        return plan;
    }

    public Plan getPlanDetails(String planId) throws IOException {
        Result result = hBaseManager.get(table, planId);
        byte[] value = result.getValue(columnFamily.getBytes(), data.getBytes());
        return (Plan) SerializationUtils.deserialize(value);
    }
}

Visitor Pattern in Real Life

If you are with me so far, you must have noticed a sneaky new class called PlanGenerator.

What does it do? Based on the type of Job, it returns a Plan.

Why do we need a visitor? If we had not gone with this approach, then either we would have to use if/else logic to generate a Plan based on the instance of Job object or use Strategy pattern to generate a Plan.

public interface JobVisitor<T> {
    T visit(ExactlyOnceJob job);

    T visit(RecurringJob job);

    T visit(RepeatedJob job);
}

public class PlanGenerator implements JobVisitor<Plan> {
    @Override
    public Plan visit(ExactlyOnceJob job) {
        return Plan.builder()
                .jobId(job.getId())
                .planId(getRandomID())
                .expectedExecutionTime(job.getDateTime())
                .build();
    }

    @Override
    public Plan visit(RecurringJob job) {
        return Plan.builder()
                .jobId(job.getId())
                .planId(getRandomID())
                .expectedExecutionTime(job.getDateTimes().first())
                .build();
    }

    @Override
    public Plan visit(RepeatedJob job) {
        return Plan.builder()
                .jobId(job.getId())
                .planId(getRandomID())
                .expectedExecutionTime(getNextExecutionTime(job.getStartTime(), job.getRepeatIntervalTimeUnit(), job.getRepeatInterval()))
                .build();
    }

    private String getRandomID() {
        return UUID.randomUUID().toString();
    }

    private LocalDateTime getNextExecutionTime(LocalDateTime start, TemporalUnit repeatIntervalTimeUnit, long repeatInterval) {
        return start.plus(repeatInterval, repeatIntervalTimeUnit);
    }
}

Tests

Finally, the tests to verify the integration.

public class JobDAOTest {

    JobDAO jobDAO;
    PlanDAO planDAO;

    @Before
    public void setUp() throws IOException {
        jobDAO = new JobDAO();
        planDAO = new PlanDAO();
    }

    @Test
    public void testRegisterJob() throws IOException {
        JobDAO jobDAO = new JobDAO();
        String jobId = UUID.randomUUID().toString();
        ExactlyOnceJob exactlyOnceJob = ExactlyOnceJob.builder()
                .id(jobId)
                .callbackUrl("http://localhost:8080/test")
                .successStatusCode(500)
                .build();

        Assertions.assertDoesNotThrow(() -> jobDAO.registerJob(exactlyOnceJob));
        JobDetails jobDetails = jobDAO.getJobDetails(jobId);
        Assertions.assertTrue(exactlyOnceJob.equals(jobDetails.getJob()));
        PlanIDs planIDs = jobDetails.getPlanIDs();
        Assertions.assertNotNull(planIDs);

        String planId = planIDs.getPlans().get(0);
        Plan plan = planDAO.getPlanDetails(planId);
        Assertions.assertEquals(planId, plan.getPlanId());
        Assertions.assertEquals(jobId, plan.getJobId());
    }
}

Appendix

Project Structure


Conclusion

Congratulations! In this third part of our tutorial series, we've made significant progress. We modeled multiple executions of the same job, implemented the necessary code, used Strategy pattern in real life and validated it through test cases. But the journey doesn't end here. In the next installment, which we'll cover in part 4, we'll delve into an even more complicated part - identifying which Plans to execute and executing them. Do take a pause and think about the various challenges that can come in fetching Plans from the Plan table. Stay tuned for more exciting insights!


References

Did you find this article valuable?

Support Snehasish by becoming a sponsor. Any amount is appreciated!