David Cramer's Blog

Large SQL Result Sets in Django

One of the repetitive tasks that I always seem to have, is handling large amounts of data in chunks. That is, typically I loop through every row in the database, but the result set is too large, or too slow to take into memory all at once. After reading my code a few too many times, I realized how lazy I had been on the first iteration, and decided to dry it up.

So, the result, is a couple of useful classes for managing large QuerySet’s, and cursors in Django.

First let’s handle iterating over a large QuerySet. Let’s just say we have a million rows in our database, and need to write each one to the file. Now in some situations, this could be a locking query, which isn’t good. In others, it’s simply just too slow and network heavy to send all million rows over the network. What we do instead, which is a little bit slower (overall), is query for a chunk of data at a time, and simply yield the current results.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class IterableQuerySet(object):
    """Allows iteration over a QuerySet breaking it off into smaller chunks."""
    def __init__(self, queryset, batch=10000):
        self.batch = batch
        self.queryset = queryset

    def __iter__(self):
        at = 0

        results = self.queryset[at:at+self.batch]
        while results:
            for result in results:
                yield result
            at += self.batch
            results = self.queryset[at:at+self.batch]
Now one of the other common tasks we had with similar applications, was doing something similar but with cursors. So instead of writing the same class, but for a cursor, we instead made a new class which would allow a cursor to easily act like a QuerySet. This one’s a bit more complex, but also allows you to use a Paginator class on it.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import re

CURSOR_UNION_REGEX = re.compile(r'^SELECT .*? FROM (.*)$', re.I)

class QuerySetCursor(object):
    def __init__(self, connection, sql, params=[], model_class=None):
        self._result_cache = None
        self._offset = 0
        self._limit = None
        self._params = params
        self._connection = connection
        self._sql = sql
        self._model_class = model_class

    def __getitem__(self, k):
        if not isinstance(k, (slice, int, long)):
            raise TypeError
        assert (not isinstance(k, slice) and (k >= 0)) \
            or (isinstance(k, slice) and (k.start is None or k.start >= 0) and (k.stop is None or k.stop >= 0)), \
                "Negative indexing is not supported."
        if type(k) == slice:
            if self._offset < k.start or k.stop-k.start > self._limit:
                self._result_cache = None
            else:
                if k not in range(self._offset, self._limit+self._offset):
                    self._result_cache = None
        if self._result_cache is None:
            if type(k) == slice:
                self._offset = k.start
                self._limit = k.stop-k.start
                return self._get_results()
            else:
                self._offset = k
                self._limit = 1
                return self._get_results()[0]
        else:
            return self._result_cache[k]

    def __len__(self):
        return len(self._get_data())

    def __iter__(self):
        return iter(self._get_data())

    def _get_data(self):
        if self._result_cache is None:
            self._result_cache = list(self._get_results())
        return self._result_cache

    def _get_results(self):
        if self._limit:
            query = self._sql + ' LIMIT %s, %s'
            params = self._params + [self._offset, self._offset+self._limit]
        else:
            params = self._params
            query = self._sql

        cursor = self._connection.cursor()
        try:
            cursor.execute(query, params)
            results = cursor.fetchall()
            if self._model_class:
                results = [self._model_class(*r) for r in results]
        finally:
            cursor.close()
        return results

    def count(self):
        statements = self._sql.split(' UNION ')
        prepared = []
        for statement in statements:
            end_of_stmt = CURSOR_UNION_REGEX.match(statement)
            if not end_of_stmt:
                raise Exception("Error getting SQL")
            prepared.append("SELECT COUNT(1) FROM %s" % (end_of_stmt.group(1),))
        query = 'SELECT (%s)' % (') + ('.join(prepared),)
        cursor = self._connection.cursor()
        try:
            cursor.execute(query, self._params)
            results = cursor.fetchone()[0]
        finally:
            cursor.close()
        return results
Want to combine both results?
1
2
for r in IterableQuerySet(QuerySetCursor(connection, sql, params, MyModelClass)):
    print r

I believe there’s also some optimization that can be doing with cursor.fetchmany() to also chunk the SQL in memory, but I’ve yet to research how the cursor classes work internally.