In Python, thread safe does not mean fork safe

We discovered this in a not-so-easily-reproducible way. Here at Mahalo we use this wonderful distributed task queue called Celery. Upon restarting the celeryd server, rarely (until recently), some of the workers would throw a MemcachedError during their first task. The cause of the issue would be receiving a “STORED” response to a memcached_get (wtf?!), some other variant of invalid response, or an EINPROGRESS (which means the socket is already busy doing something). Now another tidbit: we also use pylibmc, a wrapper around the libmemcached C library, for which we’ve written a custom Django cache interface.

Sounds like a threading/concurrency issue, no?

The first thing to try was switching our custom interface to use ThreadMappedPool instead of the plain Client from pylibmc. It is thread safe, after all.
Well, its thread safety comes from using thread.get_ident() as a key to a dictionary of pylibmc Clients. According to the docs, thread.get_ident() is a “magic cookie” (oh yay, magic!) that can be recycled after the thread using it exits. Sounds all fine and dandy; we don’t mind Clients being reused after a thread exits. ThreadMappedPool seemed to solve the issue for us (well, mask it anyway)… until recently, that is. Now the real issue comes to light.
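To make the ident-keyed dictionary concrete, here is a minimal sketch of that pattern. The class and method names are hypothetical stand-ins, not pylibmc’s actual implementation; only the idea of mapping thread.get_ident() to a per-thread client comes from the docs described above.

```python
try:
    import thread  # Python 2
except ImportError:
    import _thread as thread  # Python 3 renamed the module


class ThreadMappedPoolSketch(object):
    """Hypothetical stand-in for a thread-mapped pool: one client
    per thread ident, created lazily on first use."""

    def __init__(self, make_client):
        self.make_client = make_client
        self.clients = {}

    def get_client(self):
        ident = thread.get_ident()  # the "magic cookie" keying the dict
        if ident not in self.clients:
            self.clients[ident] = self.make_client()
        return self.clients[ident]


pool = ThreadMappedPoolSketch(lambda: object())
a = pool.get_client()
b = pool.get_client()
print(a is b)  # same thread ident, so the same client is reused
```

Within one thread this always hands back the same client, which is exactly why it is thread safe: two real threads get two distinct idents and therefore two distinct clients.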

Before, tasks that used a cache client were staggered out enough that one worker could “pop” a Client from the ThreadMappedPool, do some operations, and disconnect the Client (thus invalidating the connection for the other workers as well) before another tried using it again (making it reconnect). Recently we added a couple of tasks that only handle some cache updating, and after Celery wasn’t running for a few minutes, a bunch of these queued up. Upon restarting Celery, most of the workers grabbed one or more of these tasks and tried processing them. Since these are orders of magnitude faster than the previous tasks that used cache, they all tried using the same pylibmc Client at the same time, causing the lovely invalid-response errors even though we were using a thread-safe client.

EXCEPT Celery doesn’t use threading! D’OH!

Celery uses multiprocessing to create a Pool of “Workers.”

On non-Windows systems, multiprocessing uses the fork() system call to create new processes. Forking (simplified, and from my understanding) copies the parent process’s memory into a new process, which continues running from the same point. This causes a big issue if there are open files and/or sockets (such as database or memcached connections), since the parent and child process(es) now share the same descriptors to these resources. More on this in the Python multiprocessing documentation.
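The shared-descriptor problem can be demonstrated in a few lines. This is a POSIX-only sketch (os.fork() is unavailable on Windows) using a local socketpair as a stand-in for something like a memcached connection; it is an illustration of the fork() behavior, not anything from Celery or pylibmc:

```python
import os
import socket

# A socket created BEFORE fork() is inherited by the child: parent and
# child hold descriptors to the same underlying kernel connection.
parent_sock, child_sock = socket.socketpair()

pid = os.fork()
if pid == 0:
    # Child process: writes through the inherited descriptor.
    parent_sock.sendall(b"from-child ")
    os._exit(0)

os.waitpid(pid, 0)                 # let the child finish first
parent_sock.sendall(b"from-parent")

# Both writes arrive interleaved on the ONE shared connection -- the
# same way two forked Celery workers can stomp on one memcached socket.
data = child_sock.recv(1024)
print(data)
```

With a real memcached socket the interleaving is worse: one process’s response can land in another process’s read, which is exactly the “STORED” reply to a memcached_get described earlier.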

Celery (now djcelery) recognized this issue and closes database and cache client connections at the beginning of every task, to do “everything necessary for Django to work in a long-living, multiprocessing environment.” However, in our case the cache client was never being disconnected, because Celery would only call .close() if “memcached” appeared in the Django CACHE_BACKEND setting. Since we rolled a custom interface named “pylibmc_backend”, Celery didn’t recognize it as a memcached client. Oops…
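The failure mode boils down to a substring check. Here is a paraphrased sketch of that logic (the function name and example backend strings are hypothetical, not djcelery’s actual source):

```python
def closes_cache_connection(cache_backend):
    # Paraphrase of the behavior described above: the cache client is
    # only disconnected when the CACHE_BACKEND setting string mentions
    # "memcached". A custom backend name silently fails the check.
    return "memcached" in cache_backend

print(closes_cache_connection("memcached://127.0.0.1:11211/"))
print(closes_cache_connection("myapp.pylibmc_backend://127.0.0.1:11211/"))
```

The first backend passes the check and gets disconnected before each task; the second (a custom name like ours) never does, so every forked worker keeps using the connection inherited from the parent.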

Note: django-pylibmc does not suffer from this issue.

So, back to the original point of this post: “thread safe does not mean fork safe.” Here’s a simple test showing that using thread.get_ident() as an identifier for thread safety does not also make something fork safe (hence ThreadMappedPool still wasn’t solving our Celery problem).

=== Multiprocessing Pool ===
(os.pid: 30856)(os.ppid: 30855)(thread.get_ident: 140682307016448)
(os.pid: 30857)(os.ppid: 30855)(thread.get_ident: 140682307016448)
(os.pid: 30860)(os.ppid: 30855)(thread.get_ident: 140682307016448)
(os.pid: 30859)(os.ppid: 30855)(thread.get_ident: 140682307016448)
(os.pid: 30858)(os.ppid: 30855)(thread.get_ident: 140682307016448)
(os.pid: 30856)(os.ppid: 30855)(thread.get_ident: 140682307016448)
(os.pid: 30857)(os.ppid: 30855)(thread.get_ident: 140682307016448)
(os.pid: 30860)(os.ppid: 30855)(thread.get_ident: 140682307016448)
(os.pid: 30859)(os.ppid: 30855)(thread.get_ident: 140682307016448)
(os.pid: 30858)(os.ppid: 30855)(thread.get_ident: 140682307016448)

=== Threading ===
(os.pid: 30855)(os.ppid: 29274)(thread.get_ident: 140682262529808)
(os.pid: 30855)(os.ppid: 29274)(thread.get_ident: 140682254137104)
(os.pid: 30855)(os.ppid: 29274)(thread.get_ident: 140682245744400)
(os.pid: 30855)(os.ppid: 29274)(thread.get_ident: 140682237351696)
(os.pid: 30855)(os.ppid: 29274)(thread.get_ident: 140682228958992)
sleeping 2s...
(os.pid: 30855)(os.ppid: 29274)(thread.get_ident: 140682237351696)
(os.pid: 30855)(os.ppid: 29274)(thread.get_ident: 140682228958992)
(os.pid: 30855)(os.ppid: 29274)(thread.get_ident: 140682262529808)
(os.pid: 30855)(os.ppid: 29274)(thread.get_ident: 140682245744400)
(os.pid: 30855)(os.ppid: 29274)(thread.get_ident: 140682254137104)

And the Python script used to generate this very simple test:

from multiprocessing import Pool
import os
import thread
import threading
from time import sleep

def my_ident(a=None):
    # Report which OS process and which thread ident this call runs under.
    print "(os.pid: %s)(os.ppid: %s)(thread.get_ident: %s)" % (
        os.getpid(), os.getppid(), thread.get_ident())
    sleep(1)

if __name__ == '__main__':
    print "=== Multiprocessing Pool ==="
    # Five forked worker processes: different pids, identical thread idents.
    p = Pool(processes=5)
    p.map(my_ident, range(10))

    print "\n=== Threading ==="
    # Ten threads in one process: same pid, distinct (but recyclable) idents.
    for i in range(1, 11):
        t = threading.Thread(target=my_ident)
        t.start()
        if i % 5 == 0:
            sleep(2)  # let the first batch exit so their idents can be recycled
            print "sleeping 2s..."