Simple thread pools

It's easy to write multi-threaded programs in Python. I've always done it using a pool of worker threads that get their input from one queue and send their output to another. Another thread uses the workers' output queue as its own input queue and processes the results in some manner, e.g. by inserting them into a database. Here's a basic framework for doing this:

"""A minimal thread-pool framework built on two queues.

Worker threads take jobs from an input queue and put results on an output
queue; a single Recorder thread consumes that output queue. ``None`` is the
end-of-work sentinel on both queues.
"""
try:
    import queue  # Python 3
except ImportError:  # Python 2 fallback
    import Queue as queue
import random
import threading
import time


class Worker(threading.Thread):
    """A worker thread that processes jobs until it receives ``None``.

    Subclass and override ``_process_job`` to do real work.
    """

    # Upper bound, in seconds, of the simulated work delay used by the demo
    # ``_process_job``. Set to 0 to skip the delay.
    max_delay = 1.0

    def __init__(self, input, output):
        """Create a worker.

        :param input: queue the worker takes jobs from; a ``None`` job stops it.
        :param output: queue results are put on, or ``None`` to discard them.
        """
        # The threading docs require the base constructor to run before
        # anything else is done to the thread.
        super(Worker, self).__init__()
        self._get_job = input.get
        if output is not None:
            self._put_job = output.put
        else:
            # With no output queue, drop results rather than failing with
            # AttributeError the first time a job completes.
            self._put_job = self._discard

    @staticmethod
    def _discard(result):
        """Drop *result*; used when the worker has no output queue."""

    def run(self):
        """Get jobs and process them; stop when a ``None`` job arrives."""
        while True:
            job = self._get_job()
            if job is None:
                break
            self._process_job(job)

    def _process_job(self, job):
        """Do useful work here; the demo sleeps briefly and emits ``job + 1``."""
        time.sleep(random.random() * self.max_delay)
        self._put_job(job + 1)


class Recorder(Worker):
    """A consumer thread that prints every result it receives."""

    def _process_job(self, job):
        """Override Worker's _process_job method: just print our input."""
        print(job)


def main(num_workers=20):
    """Run the demo pool: *num_workers* workers and one recording thread.

    :param num_workers: number of worker threads; ``2 * num_workers`` jobs
        are submitted.
    """
    job_queue = queue.Queue(0)  # maxsize 0 means unbounded
    results_queue = queue.Queue(0)

    # Create our pool of worker threads, keeping references so we can join().
    workers = [Worker(job_queue, results_queue) for _ in range(num_workers)]
    for worker in workers:
        worker.start()

    # Create our single recording thread.
    recorder = Recorder(results_queue, None)
    recorder.start()

    # Give the workers some numbers to crunch.
    for x in range(num_workers * 2):
        job_queue.put(x)

    # One end-of-job marker per worker, so every worker exits its loop.
    for _ in range(num_workers):
        job_queue.put(None)

    # Wait for all workers to finish (replaces fragile activeCount polling).
    for worker in workers:
        worker.join()

    # All results are queued now; tell the recording thread it can stop.
    results_queue.put(None)
    recorder.join()


if __name__ == '__main__':
    main()

As you can see, this doesn't do anything useful, but you could easily subclass Worker and override its _process_job method to make it do what you want. Any questions?

