Wednesday, 22 September 2010

Improved QueueHandler, QueueListener: dealing with handlers that block

tl;dr: Sometimes you have to get your logging handlers to do their work without blocking the thread you’re logging from. This post discusses one way in which you can do that, and presents a new QueueListener class as well as some improvements to the QueueHandler class discussed in earlier posts. A working script is provided so you can play with these ideas yourself.

Sometimes, you’ll work with code that runs in threads which must do their processing quickly. This is common in Web applications, though of course it also occurs in other scenarios. You may well need to log messages from those threads; however, if you are a library developer, you typically won’t have control over which handlers are configured by the running application. What happens if some of the configured handlers can block for long periods, causing delays which hold the threads up longer than is acceptable?

The most common culprit which demonstrates sluggish behaviour is the SMTPHandler: sending emails can take a long time, for a number of reasons outside the developer’s control (for example, a poorly performing mail or network infrastructure). But almost any network-based handler can block: even a SocketHandler operation may do a DNS query under the hood which is too slow (and this query can be deep in the socket library code, below the Python layer, and outside your control).

One answer which pushes itself to the fore is to arrange things so that the time-consuming operation happens on a separate thread. This has been suggested to me once or twice, and suggestions have been made to perhaps provide threaded versions of the existing Handler classes or incorporate threading into some of the existing classes. That didn’t sound like a great idea to me, and for one reason or another I haven’t been able to address it until now.

I recently posted about the addition of a QueueHandler class to Python 3.2, which came about as a result of Mike Bayer of SQLAlchemy fame pointing me to a Stack Overflow answer he had given to a question about using logging with multiprocessing.

Mike’s solution was a specialized handler for use with multiprocessing which delegated to a RotatingFileHandler, which worked for his use case but was not general enough to put in the stdlib. So I came up with QueueHandler, which works not only with multiprocessing queues but also in-process thread-safe queues as implemented in the Queue module (renamed to queue in more recent Pythons). QueueHandler is also easy to subclass, for example to send logging events to a ZeroMQ socket, as described in this post.

QueueHandler also forms the basis of dealing with handlers that block. Before we look at one way to solve that problem, I should mention that the QueueHandler implementation which will be in 3.2 got a slight improvement: a new prepare method was added and the emit method was modified to use it. The docstrings give the reason for this minor refactoring:

    def prepare(self, record):
        """
        Prepares a record for queuing. The object returned by this
        method is enqueued.
        
        The base implementation formats the record to merge the message
        and arguments, and removes unpickleable items from the record
        in-place.
        
        You might want to override this method if you want to convert
        the record to a dict or JSON string, or send a modified copy
        of the record while leaving the original intact.
        """
        # The format operation gets traceback text into record.exc_text
        # (if there's exception data), and also puts the message into
        # record.message. We can then use this to replace the original
        # msg + args, as these might be unpickleable. We also zap the
        # exc_info attribute, as it's no longer needed and, if not None,
        # will typically not be pickleable.
        self.format(record)
        record.msg = record.message
        record.args = None
        record.exc_info = None
        return record

    def emit(self, record):
        """
        Emit a record.

        Writes the LogRecord to the queue, preparing it first.
        """
        try:
            self.enqueue(self.prepare(record))
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)

The whole class is to be found here.
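
To illustrate the kind of override the prepare docstring mentions, here’s a minimal sketch of a subclass whose prepare method enqueues a JSON string rather than the record itself. The JSONQueueHandler name and the choice of fields are purely illustrative, not part of the stdlib:

import json

class JSONQueueHandler(QueueHandler):
    """Hypothetical subclass which enqueues a JSON string instead of a LogRecord."""
    def prepare(self, record):
        # Format first so that record.message (and exc_text, if there is
        # exception data) are populated, just as the base implementation does.
        self.format(record)
        return json.dumps({
            'name': record.name,
            'level': record.levelname,
            'message': record.message,
            'created': record.created,
        })

Of course, whatever reads from the queue would then need to expect JSON strings rather than LogRecords.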

Moving on to how to solve the problem of blocking handlers, the answer is in part to attach only a QueueHandler to those loggers which are accessed from performance-critical threads. They simply write to their queue, which can be sized to a large enough capacity or initialized with no upper bound to its size. The write to the queue will typically be accepted quickly, though you will probably need to catch the queue.Full exception as a precaution in your code. If you are a library developer with performance-critical threads in your code, be sure to document this (together with a suggestion to attach only QueueHandlers to your loggers) for the benefit of other developers who will use your code.
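
As a sketch of that precaution, assuming a bounded queue, you could override enqueue to discard events rather than block when the queue is full. The DropOnFullQueueHandler name is just for illustration:

import queue

class DropOnFullQueueHandler(QueueHandler):
    """Hypothetical variant which drops events instead of blocking
    when a bounded queue is full."""
    def enqueue(self, record):
        try:
            # put_nowait raises queue.Full immediately rather than blocking.
            self.queue.put_nowait(record)
        except queue.Full:
            pass  # or count/report the dropped event in whatever way suits you

Whether dropping events, blocking briefly with a timeout, or something else is appropriate will depend on your application.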

The second part of the solution is QueueListener, which has been designed as the counterpart to QueueHandler. A QueueListener is very simple: it’s passed a queue and a handler, and it fires up an internal thread which listens to its queue for LogRecords sent from QueueHandlers (or any other source of LogRecords, for that matter). It should be relatively easy to subclass QueueListener to listen using other types of queue – for example, ZeroMQ sockets – though we won’t cover that in this post (it would use a similar approach to the one described here for our QueueHandler subclass ZeroMQSocketHandler). Here’s QueueListener:


import queue  # needed for queue.Empty below; on Python 2.x this module is called Queue
import threading

class QueueListener(object):

    _sentinel = None
    
    def __init__(self, queue, handler):
        """
        Initialise an instance with the specified queue and
        handler.
        """
        self.queue = queue
        self.handler = handler
        self._stop = threading.Event()
        self._thread = None

    def dequeue(self, block):
        """
        Dequeue a record and return it, optionally blocking.

        The base implementation uses get. You may want to override this method
        if you want to use timeouts or work with custom queue implementations.
        """
        return self.queue.get(block)

    def start(self):
        """
        Start the listener.
        
        This starts up a background thread to monitor the queue for
        LogRecords to process.
        """
        self._thread = t = threading.Thread(target=self._monitor)
        t.setDaemon(True)
        t.start()

    def _monitor(self):
        """
        Monitor the queue for records, and ask the handler
        to deal with them.

        This method runs on a separate, internal thread.
        The thread will terminate if it sees a sentinel object in the queue.
        """
        q = self.queue
        has_task_done = hasattr(q, 'task_done')
        while not self._stop.isSet():
            try:
                record = self.dequeue(True)
                if record is self._sentinel:
                    break
                self.handler.handle(record)
                if has_task_done:
                    q.task_done()
            except queue.Empty:
                pass
        # There might still be records in the queue.
        while True:
            try:
                record = self.dequeue(False)
                if record is self._sentinel:
                    break
                self.handler.handle(record)
                if has_task_done:
                    q.task_done()
            except queue.Empty:
                break

    def stop(self):
        """
        Stop the listener.
        
        This asks the thread to terminate, and then waits for it to do so.
        """
        self._stop.set()
        self.queue.put_nowait(self._sentinel)
        self._thread.join()
        self._thread = None


Since the queue should only ever get LogRecords put into it, it seems reasonable to use None as a sentinel to terminate the thread. Of course, you can set a different sentinel if you wish.
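
For example, a subclass could use a unique object instead; the class name here is illustrative:

class CustomSentinelListener(QueueListener):
    # A unique marker object which can never be mistaken for a real record.
    _sentinel = object()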

The advantage of having a separate QueueListener class is that you can use the same instance to service multiple QueueHandlers. This is more resource-friendly than, say, having threaded versions of the existing handler classes, which would eat up one thread per handler for no particular benefit.
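
For instance, here’s a minimal sketch (the logger names are made up) of several QueueHandlers, attached to different loggers, all feeding a single queue serviced by one QueueListener:

import logging
import queue

q = queue.Queue(-1)  # no upper bound on size
listener = QueueListener(q, logging.StreamHandler())
listener.start()

# Each logger gets its own QueueHandler, but they all share one queue
# and hence one listener thread.
for name in ('app.db', 'app.web', 'app.worker'):
    logging.getLogger(name).addHandler(QueueHandler(q))

logging.getLogger('app.web').error('request failed')
listener.stop()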

Here’s a simple snippet showing how to use QueueHandler and QueueListener together:

import logging
import queue

# QueueHandler and QueueListener are the classes discussed above.

def main():
    q = queue.Queue(-1)
    qh = QueueHandler(q)
    h = logging.StreamHandler()
    ql = QueueListener(q, h)
    root = logging.getLogger()
    root.addHandler(qh)
    ql.start()
    f = logging.Formatter('%(threadName)s: %(message)s')
    h.setFormatter(f)
    # The log output will display the thread which generated
    # the event (the main thread) rather than the internal
    # thread which monitors the internal queue. This is what
    # you want to happen.
    root.warning('Look out!')
    ql.stop()

if __name__ == '__main__':
    main()


This should be self-explanatory, but of course please feel free to post a comment if you need clarification of anything.

The whole script is here, and if you run it you should see something like the following:

MainThread: Look out!

Notice that QueueListener is not even especially logging-specific: you can pass it as a handler any object that has a handle method which takes a single argument, and that method will be passed any non-sentinel object which appears on the queue.
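
For instance, here’s a toy sketch (nothing in it is logging-specific) where the “handler” just collects whatever turns up on the queue:

import queue

class Collector(object):
    """Any object with a handle method taking a single argument will do."""
    def __init__(self):
        self.items = []

    def handle(self, item):
        self.items.append(item)

q = queue.Queue()
collector = Collector()
listener = QueueListener(q, collector)
listener.start()
q.put('not a LogRecord at all')
listener.stop()
print(collector.items)  # ['not a LogRecord at all']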

You should be able to paste QueueHandler and QueueListener into your own code, as you may well be using Python versions earlier than 3.2 :-)

The plan is to add QueueListener to logging.handlers, so before 3.2 enters beta, I’d be grateful for any comments or suggestions you have about any of this stuff.
