A simple and dirty batch job scheduler in python

Last week I needed to feed jobs defined in an internal web interface to the computation engine. Below is the code I wrote, which looks at the MySQL table for the defined jobs, and executes a command line code. Can it be done better? let me know.

Let's import the necessary libraries first:

#!/usr/bin/python
import sys, os, time
from time import gmtime, strftime
import string
import MySQLdb as mdb
import atexit
import threading
import subprocess
import shlex

Next, define how to connect to mysql:

# Connect to DB -----------
db_host = "127.0.0.1"
db_db = "test"
db_user = "root"
db_password = "tooth"

con = mdb.connect(db_host, db_user,
        db_password, db_db);

app_path = "/Users/danialt/software/"
db_port = 3306
pr = {}

This is the function that basically manages how the tasks get executed; it asks what is there and then runs it, and waits for it to finish:

def assignTask():
    while(True):

        if (not isAnythingRunning()):
            startingDir = os.getcwd()

            currentRunningJobID = getNextComputeID()

            if currentRunningJobID != None:
                print 'Next job is ' + str(currentRunningJobID)

                JAVA_COMMAND=r"java -Djobid="+str(currentRunningJobID) + " -jar " + str(app_path)+"myapp.jar"
                cmdline = shlex.split(JAVA_COMMAND)


        print 'Running job #' + str(currentRunningJobID)

        devnull = open('/dev/null', 'w')
        pr_ = subprocess.Popen(cmdline, env={'CLASSPATH':str(app_path)}, cwd=str(app_path), bufsize=1)
        pr[(currentRunningJobID)] = pr_

        # check if the job is finished, if not wait
        pr_.wait()


        # wait a bit
        time.sleep(5)

def isAnythingRunning():
    with con:
        cur = con.cursor()
        cur.execute("Select count(job_ID) as runningJobCount from Compute_jobs where status=1")
        c = cur.fetchone()
        return c[0]

def cleanUpTheCancelledProcesses():
    cur = con.cursor()
    cur.execute("Select job_ID, progress from Compute_jobs where status = 3")
    numrows = int(cur.rowcount)

    for i in range(numrows):
        row = cur.fetchone()
        pr__ = (row[i][0].toInt())

def getNextComputeID():
    cur = con.cursor()
    cur.execute("Select job_ID from Compute_jobs where status = 0 order by created_time ASC limit 1")
    row = cur.fetchone()
    if row is not None:
        return row[0]
    else:
        return None

# let the ball roll
if __name__ == "__main__":
    hw_thread = threading.Thread(target = assignTask)
    hw_thread.daemon = True
    hw_thread.start()
    try:
        time.sleep(500000)
    except KeyboardInterrupt:
        print '\nGoodbye!'

The MySQL table looks like this:

mysql> describe Compute_jobs;

+------------------+--------------+------+-----+-------------------+----------------+
| Field            | Type         | Null | Key | Default           | Extra          |
+------------------+--------------+------+-----+-------------------+----------------+
| job_ID           | int(11)      | NO   | PRI | NULL              | auto_increment |
| name             | varchar(255) | NO   |     |                   |                |
| progress         | int(11)      | NO   |     | 0                 |                |
| result_file_path | text         | NO   |     | NULL              |                |
| created_time     | timestamp    | NO   |     | CURRENT_TIMESTAMP |                |
| started_time     | timestamp    | YES  |     | NULL              |                |
| completed_time   | timestamp    | YES  |     | NULL              |                |
| notes            | text         | YES  |     | NULL              |                |
| status           | tinyint(1)   | NO   |     | 0                 |                |
+------------------+--------------+------+-----+-------------------+----------------+

How it works

  • The web interface defines a job linked to a job_ID
  • The Python code is running on background, wherever as long as it has access to MySQL
  • The Python code picks up the job and executes the app, and waits for it to finish.
  • The app sets the status to 1 (running) and 2 (completed), or 3 (cancelled)
  • The loop starts again.