One of the repetitive tasks that I always seem to have, is handling large amounts of data in chunks. That is, typically I loop through every row in the database, but the result set is too large, or too slow to take into memory all at once. After reading my code a few too many times, I realized how lazy I had been on the first iteration, and decided to dry it up.
So, the result, is a couple of useful classes for managing large QuerySet’s, and cursors in Django.
First let’s handle iterating over a large QuerySet. Let’s just say we have a million rows in our database, and need to write each one to the file. Now in some situations, this could be a locking query, which isn’t good. In others, it’s simply just too slow and network heavy to send all million rows over the network. What we do instead, which is a little bit slower (overall), is query for a chunk of data at a time, and simply yield the current results.
123456789101112131415
classIterableQuerySet(object):"""Allows iteration over a QuerySet breaking it off into smaller chunks."""def__init__(self,queryset,batch=10000):self.batch=batchself.queryset=querysetdef__iter__(self):at=0results=self.queryset[at:at+self.batch]whileresults:forresultinresults:yieldresultat+=self.batchresults=self.queryset[at:at+self.batch]
Now one of the other common tasks we had with similar applications, was doing something similar but with cursors. So instead of writing the same class, but for a cursor, we instead made a new class which would allow a cursor to easily act like a QuerySet. This one’s a bit more complex, but also allows you to use a Paginator class on it.
importreCURSOR_UNION_REGEX=re.compile(r'^SELECT .*? FROM (.*)$',re.I)classQuerySetCursor(object):def__init__(self,connection,sql,params=[],model_class=None):self._result_cache=Noneself._offset=0self._limit=Noneself._params=paramsself._connection=connectionself._sql=sqlself._model_class=model_classdef__getitem__(self,k):ifnotisinstance(k,(slice,int,long)):raiseTypeErrorassert(notisinstance(k,slice)and(k>=0)) \
or(isinstance(k,slice)and(k.startisNoneork.start>=0)and(k.stopisNoneork.stop>=0)), \
"Negative indexing is not supported."iftype(k)==slice:ifself._offset<k.startork.stop-k.start>self._limit:self._result_cache=Noneelse:ifknotinrange(self._offset,self._limit+self._offset):self._result_cache=Noneifself._result_cacheisNone:iftype(k)==slice:self._offset=k.startself._limit=k.stop-k.startreturnself._get_results()else:self._offset=kself._limit=1returnself._get_results()[0]else:returnself._result_cache[k]def__len__(self):returnlen(self._get_data())def__iter__(self):returniter(self._get_data())def_get_data(self):ifself._result_cacheisNone:self._result_cache=list(self._get_results())returnself._result_cachedef_get_results(self):ifself._limit:query=self._sql+' LIMIT %s, %s'params=self._params+[self._offset,self._offset+self._limit]else:params=self._paramsquery=self._sqlcursor=self._connection.cursor()try:cursor.execute(query,params)results=cursor.fetchall()ifself._model_class:results=[self._model_class(*r)forrinresults]finally:cursor.close()returnresultsdefcount(self):statements=self._sql.split(' UNION ')prepared=[]forstatementinstatements:end_of_stmt=CURSOR_UNION_REGEX.match(statement)ifnotend_of_stmt:raiseException("Error getting SQL")prepared.append("SELECT COUNT(1) FROM %s"%(end_of_stmt.group(1),))query='SELECT (%s)'%(') + ('.join(prepared),)cursor=self._connection.cursor()try:cursor.execute(query,self._params)results=cursor.fetchone()[0]finally:cursor.close()returnresults
I believe there’s also some optimization that can be doing with cursor.fetchmany() to also chunk the SQL in memory, but I’ve yet to research how the cursor classes work internally.