Thursday, January 27, 2011

Twisted adbapi : asynchronous database access


First of all, read the official Twisted documentation, which describes adbapi this way:
"It is a non-blocking interface to the standardized DB-API 2.0 API, which allows you to access a number of different RDBMSes."

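For context, here is a minimal sketch of the blocking DB-API 2.0 calls that adbapi wraps, using the standard sqlite3 module and an in-memory database. The `greetings` table and the `blocking_query` helper are made up for illustration. adbapi runs this same execute / fetchall sequence in a worker thread and hands you the rows through a Deferred instead of a return value.

```python
import sqlite3


def blocking_query():
    """Plain DB-API 2.0 usage: every call blocks until SQLite answers."""
    conn = sqlite3.connect(":memory:")
    try:
        curs = conn.cursor()
        curs.execute("CREATE TABLE greetings (word text)")
        curs.execute("INSERT INTO greetings VALUES (?)", ("hello",))
        curs.execute("SELECT word FROM greetings")
        # fetchall() returns a list of row tuples
        return curs.fetchall()
    finally:
        conn.close()


rows = blocking_query()
```
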
When I first used this module I ran into some problems I didn't expect, so I decided to write an example tutorial covering the basics.

Here we will build a simple SQLite database and access it through a main DB class.
Let's say we store user information in a database and want to retrieve an Avatar object built from that information.

First step : building our test database
We will use the sqlite3 library to build our database.

import tempfile
import sqlite3

USERS = (('yoen', 'Van der Veld', 'Yoen',),
         ('esteban', 'Garcia Marquez', 'Estebán',),
         ('mohamed', 'Al Ghâlib', 'Mohamed',),)

def setup_db():
    """ Setup our sqlite database """
    # tempfile gives us a fresh file name, so we don't overwrite an existing file
    myfilename = tempfile.mktemp('.sqlite', 'test-bdd', '/tmp')
    conn = sqlite3.connect(myfilename)
    curs = conn.cursor()
    curs.execute("CREATE TABLE users (login text unique, name text, "
                 "forname text)")
    for login, name, forname in USERS:
        curs.execute("INSERT INTO users VALUES (?, ?, ?)", (login,
                                                name, forname))
    # Without a commit the inserted rows are lost when the connection closes
    conn.commit()
    conn.close()
    return myfilename

dbname = setup_db()

At this point we have a working database file, dbname, with one table 'users' containing three columns : 'login' (unique), 'name' and 'forname'.
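
A quick way to check that description is to poke at the schema with the plain sqlite3 module. The sketch below recreates the same table in an in-memory database (an assumption made to keep it self-contained, rather than reusing dbname) and shows that the unique constraint on 'login' rejects a duplicate. `duplicate_login_rejected` is a hypothetical helper name.

```python
import sqlite3


def duplicate_login_rejected():
    """Return True if inserting the same login twice is refused."""
    conn = sqlite3.connect(":memory:")
    try:
        curs = conn.cursor()
        curs.execute("CREATE TABLE users (login text unique, name text, "
                     "forname text)")
        curs.execute("INSERT INTO users VALUES (?, ?, ?)",
                     ("yoen", "Van der Veld", "Yoen"))
        try:
            # Same login again: the unique constraint should refuse it
            curs.execute("INSERT INTO users VALUES (?, ?, ?)",
                         ("yoen", "Other", "Name"))
        except sqlite3.IntegrityError:
            return True
        return False
    finally:
        conn.close()


rejected = duplicate_login_rejected()
```
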

Second step : the Avatar Object

class Avatar:
    def __init__(self, login, name, forname):
        self.login = login
        self.name = name
        self.forname = forname

    def render(self):
        msg = "Mr '%s %s' is connected under '%s'" % (self.forname,
                                            self.name, self.login)
        return msg

Nothing special to say here; we don't need more for our example.

Third step : the base connector
This class will interact with the database and provide a get_user_avatar method.

from twisted.enterprise.adbapi import ConnectionPool

class DBPool:
    """ Sqlite connection pool """
    def __init__(self, dbname):
        self.dbname = dbname
        self.__dbpool = ConnectionPool('sqlite3', self.dbname)

    def shutdown(self):
        """
            Shutdown function
            It's a required task to shutdown the database connection pool:
            garbage collector doesn't shutdown associated thread
        """
        self.__dbpool.close()

    def build_avatar(self, dbentries):
        """ Build avatar from dbentries """
        login, name, forname = dbentries[0]
        return Avatar(login, name, forname)

    def get_user_avatar(self, login):
        """ Build associated avatar object """
        query = 'SELECT * from `users` where login=?'
        return self.__dbpool.runQuery(query, (login,)).addCallback(
                                                    self.build_avatar)
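
One thing to keep in mind with build_avatar: runQuery passes the callback the result of the cursor's fetchall(), that is a list of row tuples, and an empty list when no row matches. So dbentries[0] raises IndexError for an unknown login. The sketch below shows that shape with plain sqlite3 on an in-memory table; `first_row_or_none` is a hypothetical helper, not part of the class above.

```python
import sqlite3


def first_row_or_none(rows):
    """Return the first row, or None when the query matched nothing."""
    return rows[0] if rows else None


conn = sqlite3.connect(":memory:")
curs = conn.cursor()
curs.execute("CREATE TABLE users (login text unique, name text, forname text)")
curs.execute("INSERT INTO users VALUES (?, ?, ?)",
             ("yoen", "Van der Veld", "Yoen"))

query = "SELECT * FROM users WHERE login=?"
known = curs.execute(query, ("yoen",)).fetchall()      # one row tuple
unknown = curs.execute(query, ("nobody",)).fetchall()  # empty list
conn.close()
```

A guard like this in the callback lets you decide what an unknown login should mean (None, a default Avatar, or a custom error) instead of surfacing a bare IndexError.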

Now we have everything we need to retrieve avatars from our database.
Some points worth underlining :
  • The ConnectionPool starts automatically when Twisted's standard reactor starts

  • The ConnectionPool has to be stopped after use to avoid the "thread.error: can't start new thread" error

Last step : let's try it

from twisted.internet.defer import DeferredList
from twisted.internet import reactor
def printResult(result):
    # A DeferredList fires with a list of (success, value) pairs
    for r in result:
        print(r[1])

# We initialize the db pool with our dbname retrieved in the first step.
dbpool = DBPool(dbname)
# A DeferredList suits this kind of case better than chained callbacks
ret_render = lambda avatar: avatar.render()
deferreds = [dbpool.get_user_avatar(login).addCallback(ret_render)
             for login in ('yoen', 'esteban', 'mohamed',)]

dlist = DeferredList(deferreds)
dlist.addCallback(printResult)
# We ask our pool to shutdown all the initialized connections
dlist.addCallback(lambda _: dbpool.shutdown())
# Stop the reactor after 4 seconds so we get control back :-)
reactor.callLater(4, reactor.stop)
reactor.run()