Tuesday, 7 September 2010

Using logging with multiprocessing

There can be a few gotchas when using logging with the multiprocessing module. For example, if you want to write rotated log files from your multi-process application, a naïve implementation might just configure a RotatingFileHandler directly in each process. This means multiple processes will try to write to the same file concurrently, which will almost certainly corrupt the log through interleaved writes by different processes.

Note that logging to the same rotated files from multiple threads in a single-process application would be fine; the logging package uses threading locks to ensure that no log corruption occurs. There's no equivalent cross-platform synchronisation for processes in the stdlib, however; that's why you can get corruption with multi-process applications.

To circumvent this problem, you can use a multiprocessing Queue and a listener process which waits for logging events sent to the queue. When events arrive, the listener pops them off the queue and processes them; as it's the only process which writes to the files directly, there are no contention issues to cause corruption. The other processes just need to configure a QueueHandler, which will send logging events via the queue to the listener process.

The plan is to add QueueHandler to Python 3.2, but the implementation here is simple enough and should be copy-pastable into your own code for use with earlier Python versions.

The script is fairly well annotated so I'll say no more.

#!/usr/bin/env python
# Copyright (C) 2010 Vinay Sajip. All Rights Reserved.
#
# Permission to use, copy, modify, and distribute this software and its
# documentation for any purpose and without fee is hereby granted,
# provided that the above copyright notice appear in all copies and that
# both that copyright notice and this permission notice appear in
# supporting documentation, and that the name of Vinay Sajip
# not be used in advertising or publicity pertaining to distribution
# of the software without specific, written prior permission.
# VINAY SAJIP DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING
# ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
# VINAY SAJIP BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR
# ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER
# IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
#
"""
An example script showing how to use logging with multiprocessing.

The basic strategy is to set up a listener process which can have any logging
configuration you want - in this example, writing to rotated log files. Because
only the listener process writes to the log files, you don't have file
corruption caused by multiple processes trying to write to the file.

The listener process is initialised with a queue, and waits for logging events
(LogRecords) to appear in the queue. When they do, they are processed according
to whatever logging configuration is in effect for the listener process.

Other processes can delegate all logging to the listener process. They can have
a much simpler logging configuration: just one handler, a QueueHandler, needs
to be added to the root logger. Other loggers in the configuration can be set
up with levels and filters to achieve the logging verbosity you need.

A QueueHandler processes events by sending them to the multiprocessing queue
that it's initialised with.

In this demo, there are some worker processes which just log some test messages
and then exit.

This script was tested on Ubuntu Jaunty and Windows 7.

Copyright (C) 2010 Vinay Sajip. All Rights Reserved.
"""
# You'll need these imports in your own code
import logging
import logging.handlers
import multiprocessing

# Next two import lines for this demo only
from random import choice, random
import time

class QueueHandler(logging.Handler):
    """
    This is a logging handler which sends events to a multiprocessing queue.
    
    The plan is to add it to Python 3.2, but this can be copy-pasted into
    user code for use with earlier Python versions.
    """

    def __init__(self, queue):
        """
        Initialise an instance, using the passed queue.
        """
        logging.Handler.__init__(self)
        self.queue = queue
        
    def emit(self, record):
        """
        Emit a record.

        Writes the LogRecord to the queue.
        """
        try:
            ei = record.exc_info
            if ei:
                dummy = self.format(record) # just to get traceback text into record.exc_text
                record.exc_info = None  # not needed any more
            self.queue.put_nowait(record)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)
#
# Because you'll want to define the logging configurations for listener and workers, the
# listener and worker process functions take a configurer parameter which is a callable
# for configuring logging for that process. These functions are also passed the queue,
# which they use for communication.
#
# In practice, you can configure the listener however you want, but note that in this
# simple example, the listener does not apply level or filter logic to received records.
# In a real application, you would probably want to do this logic in the worker processes,
# to avoid sending events between processes which would only be filtered out anyway.
#
# The size of the rotated files is made small so you can see the results easily.
def listener_configurer():
    root = logging.getLogger()
    h = logging.handlers.RotatingFileHandler('/tmp/mptest.log', 'a', 300, 10)
    f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
    h.setFormatter(f)
    root.addHandler(h)

# This is the listener process top-level loop: wait for logging events
# (LogRecords) on the queue and handle them; quit when you get a None instead
# of a LogRecord.
def listener_process(queue, configurer):
    configurer()
    while True:
        try:
            record = queue.get()
            if record is None: # We send this as a sentinel to tell the listener to quit.
                break
            logger = logging.getLogger(record.name)
            logger.handle(record) # No level or filter logic applied - just do it!
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            import sys, traceback
            print >> sys.stderr, 'Whoops! Problem:'
            traceback.print_exc(file=sys.stderr)

# Arrays used for random selections in this demo

LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING,
          logging.ERROR, logging.CRITICAL]

LOGGERS = ['a.b.c', 'd.e.f']

MESSAGES = [
    'Random message #1',
    'Random message #2',
    'Random message #3',
]

# The worker configuration is done at the start of the worker process run.
# Note that on Windows you can't rely on fork semantics, so each process
# will run the logging configuration code when it starts.
def worker_configurer(queue):
    h = QueueHandler(queue) # Just the one handler needed
    root = logging.getLogger()
    root.addHandler(h)
    root.setLevel(logging.DEBUG) # send all messages, for demo; no other level or filter logic applied.

# This is the worker process top-level loop, which just logs ten events with
# random intervening delays before terminating.
# The print messages are just so you know it's doing something!
def worker_process(queue, configurer):
    configurer(queue)
    name = multiprocessing.current_process().name
    print('Worker started: %s' % name)
    for i in range(10):
        time.sleep(random())
        logger = logging.getLogger(choice(LOGGERS))
        level = choice(LEVELS)
        message = choice(MESSAGES)
        logger.log(level, message)
    print('Worker finished: %s' % name)

# Here's where the demo gets orchestrated. Create the queue, create and start
# the listener, create ten workers and start them, wait for them to finish,
# then send a None to the queue to tell the listener to finish.
def main():
    queue = multiprocessing.Queue(-1)
    listener = multiprocessing.Process(target=listener_process,
                                       args=(queue, listener_configurer))
    listener.start()
    workers = []
    for i in range(10):
        worker = multiprocessing.Process(target=worker_process,
                                         args=(queue, worker_configurer))
        workers.append(worker)
        worker.start()
    for w in workers:
        w.join()
    queue.put_nowait(None)
    listener.join()

if __name__ == '__main__':
    main()

10 comments:

  1. "There's no equivalent cross-platform synhronisation for processes in the stdlib" - Hey Vinay - we do have cross platform locks in the stdlib, they're included as part of the multiprocessing package.

  2. Hey Vinay -

    Nice, but seems like I should integrate QueueHandler with my recipe at http://stackoverflow.com/questions/641420/how-should-i-log-while-using-multiprocessing-in-python/894284#894284, as it is less heavyweight than the example above - presented as a fully functional handler of its own, it's configurable via fileConfig() alone, and also doesn't need to spawn a child process to handle events, instead just using a daemon thread, though that might just be an artifact of how you are demonstrating QueueHandler here.

  3. Also note a recent fix in my approach, which is that I don't only blow away exc_info from the record; I also blow away record.args, by applying record.msg = record.msg % record.args and then setting record.args = None. This is so that unpickleable things don't get sent to the multiprocessing queue and cause hard-to-track errors, and it also reduces the load on pickle as well as the size of the message sent over the pipe.

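A minimal sketch of the approach described in the comment above - pre-formatting the message and stripping attributes that may not pickle before the record is queued. It builds on the QueueHandler class from the script above; the class name and details are illustrative, not code from the linked recipe or from the stdlib:

class PicklableQueueHandler(QueueHandler):
    """
    Pre-format each record before queueing so only picklable data is sent.
    """
    def emit(self, record):
        try:
            if record.exc_info:
                self.format(record)      # caches traceback text in record.exc_text
                record.exc_info = None   # traceback objects don't pickle
            # Merge args into the message now, so arbitrary (possibly
            # unpicklable) argument objects never cross the process boundary.
            record.msg = record.getMessage()
            record.args = None
            self.queue.put_nowait(record)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)
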
  4. @Jesse: I do realise that locks are available, but I thought they were contingent on sem_open() being available (a non-zero HAVE_SEM_OPEN), which I thought was absent from some platforms. I believe there were problems on some FreeBSD versions; is that no longer the case?

    Also, is it better to implement this kind of processing using multiprocessing locks, or to use the message passing approach which I've used here? Since processes by default don't share memory etc., I would assume the message passing would be more performant; I don't want to give people reasons to complain about logging performance ;-)

    @Mike: Agreed about my example being more heavyweight, but the approach supports not just RotatingFileHandler but also TimedRotatingFileHandler, FileHandler and any user-defined subclasses thereof, optionally using fileConfig or dictConfig to configure. To my mind that flexibility justifies the slightly heavier approach.

    Of course in the demonstration I've used a listener process, but that's just happenstance, as you've surmised. I know it's perfectly valid to use a listener thread as you did in your post. I may expand on the demonstration to illustrate this; but I think we're into the realms of things which the application developer, rather than the library developer, is better placed to decide. For example, another alternative would be for one of the existing processes to do the listening, but exactly which one might depend on application specifics. It doesn't seem good policy to make these kinds of decisions in the logging package itself.

    Re. the record.args, I agree with your analysis but I can't use that exact implementation in the stdlib, because logging supports arbitrary objects as messages, not just strings. I will address this for the QueueHandler implementation which I check into SVN.

    Guys, thanks for the feedback - much appreciated.

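For readers who prefer the daemon-thread variant discussed in these comments, here is a minimal sketch, reusing listener_configurer, worker_process and worker_configurer from the script above: the main process configures the rotating file handler itself and drains the queue in a thread rather than in a separate listener process. The structure is illustrative only, not the recipe linked above nor anything in the stdlib.

import threading

def listener_thread(queue):
    # Same loop as listener_process, but running in a thread of the main
    # process, which has already configured the file handlers.
    while True:
        record = queue.get()
        if record is None:  # sentinel: time for the listener to quit
            break
        logging.getLogger(record.name).handle(record)

def main_with_listener_thread():
    queue = multiprocessing.Queue(-1)
    listener_configurer()  # the main process writes the rotated files itself
    lt = threading.Thread(target=listener_thread, args=(queue,))
    lt.daemon = True
    lt.start()
    workers = [multiprocessing.Process(target=worker_process,
                                       args=(queue, worker_configurer))
               for i in range(10)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    queue.put_nowait(None)  # stop the listener thread
    lt.join()
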
  5. Yes; it can have issues on platforms where shared semaphores aren't included - good reminder. As for the queue-based approach - you're right, it's a good idea; wasn't disagreeing there :)

  6. This comment has been removed by the author.

  7. Hi Vinay -

    I've started to play around with this and the code you provide throws an exception. FWIW this is Python 2.6 (as provided by Apple) on SL

    Worker started: Process-11
    Traceback (most recent call last):
      File "/System/Library/Frameworks/Python.framework/Versions/2.6/lib/python2.6/logging/handlers.py", line 74, in emit
        if self.shouldRollover(record):
      File "/System/Library/Frameworks/Python.framework/Versions/2.6/lib/python2.6/logging/handlers.py", line 145, in shouldRollover
        msg = "%s\n" % self.format(record)
      File "/System/Library/Frameworks/Python.framework/Versions/2.6/lib/python2.6/logging/__init__.py", line 637, in format
        return fmt.format(record)
      File "/System/Library/Frameworks/Python.framework/Versions/2.6/lib/python2.6/logging/__init__.py", line 428, in format
        s = self._fmt % record.__dict__
    KeyError: 'processName'
    .
    .
    .

    One for each worker..

    Thanks

  8. @Steven: The "processName" attribute in the LogRecord doesn't work in Pythons earlier than 2.6.2; if your Apple-provided version is older than this, then you won't have "processName" - but you should still be able to use "process" to get the pid of the process where the logging call occurred.

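If you're stuck on a Python older than 2.6.2, a simple workaround along these lines is to use the pid rather than the process name in the listener's format string - a sketch of the one change needed to listener_configurer from the script above:

def listener_configurer():
    root = logging.getLogger()
    h = logging.handlers.RotatingFileHandler('/tmp/mptest.log', 'a', 300, 10)
    # %(process)d (the pid) is available in older versions, unlike %(processName)s
    f = logging.Formatter('%(asctime)s %(process)-6d %(name)s %(levelname)-8s %(message)s')
    h.setFormatter(f)
    root.addHandler(h)
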
  9. Hey Vinay,

    Yup I figured that out - thanks.

  10. It works on Python 2.7 / Windows 7
    Thank you!
