Collision Detection and History with SQLAlchemy

Balrog is one of the more crucial systems that Release Engineering works on. Many of our automated builds send data to it, and all Firefox installations in the wild regularly query it to look for updates. It is an SQLAlchemy-based app, but because of its importance it became clear early in development that we had a couple of requirements that went beyond those of most other SQLAlchemy apps, specifically:

  • Collision Detection: Changes to the database must always be done safely. Balrog must not allow one change to silently override another one.
  • Full History: Balrog must provide complete auditability and history for all changes. We must be able to associate every change with an account and a timestamp.

Implementing these two requirements ended up being an interesting project, and I'd like to share the details of how it all works.

Collision Detection

Anyone who's used Bugzilla for a while has probably encountered its "mid-air collision" screen before.



Bugzilla shows this screen when you try to change a bug before loading changes someone else has made. While it's annoying when this screen slows you down, it's important that Bugzilla doesn't let you unknowingly overwrite other folks' changes. This is very similar to what we wanted to do in Balrog, except that we needed to enforce it at the API level, not just in the UI. In fact, we decided it was best to enforce it at the lowest level possible to minimize the chance of needing to duplicate it in different parts of the app.

To do this, we started by creating a thin wrapper around SQLAlchemy which ensures that each table has a "data_version" column, and requires an "old_data_version" to be passed when doing an UPDATE or DELETE. Here's a slimmed down version of how it works with UPDATE:

from sqlalchemy import Column, Integer


class OutdatedDataError(Exception):
    """Raised when old_data_version doesn't match the row's data_version."""


class AUSTable(object):
    """Base class for Balrog tables. Subclasses must create self.table as an
    SQLAlchemy Table object prior to calling AUSTable.__init__()."""
    def __init__(self, engine):
        self.engine = engine
        # Ensure that the table has a data_version column on it.
        self.table.append_column(Column("data_version", Integer, nullable=False))

    def update(self, where, what, old_data_version):
        # Enforce the data_version check at the query level to eliminate
        # the possibility of a race condition between the time we can
        # retrieve the current data_version, and when we can update the row.
        # Build a new list rather than mutating the caller's.
        where = where + [self.table.c.data_version == old_data_version]

        # engine.begin() commits on success and rolls back on any exception.
        with self.engine.begin() as transaction:
            # select() is not shown in this slimmed down version.
            row = self.select(where=where, transaction=transaction)
            row["data_version"] += 1
            for col in what:
                row[col] = what[col]

            # Include the bumped data_version in the values being written.
            values = dict(what, data_version=row["data_version"])
            query = self.table.update().values(values)
            for cond in where:
                query = query.where(cond)
            ret = transaction.execute(query)

            if ret.rowcount != 1:
                raise OutdatedDataError("Failed to update row, old_data_version doesn't match data_version")

And one of our concrete tables:

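from sqlalchemy import Column, String, Table, Text


class Releases(AUSTable):
    def __init__(self, engine, metadata):
        self.table = Table("releases", metadata,
            Column("name", String(100), primary_key=True),
            Column("product", String(15), nullable=False),
            Column("data", Text(), nullable=False),
        )
        # The base class stores the engine and adds the data_version column.
        AUSTable.__init__(self, engine)

    def updateRelease(self, name, old_data_version, product, data):
        what = {
            "product": product,
            "data": data,
        }
        # Columns live under table.c; table.name is just the string "releases".
        self.update(where=[self.table.c.name == name], what=what, old_data_version=old_data_version)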

As you can see, the data_version check is added as a clause to the UPDATE statement - so there's no way we can race with other changes. The usual workflow for callers is to retrieve the current version of the data, modify it, and pass it back along with the old data_version (most of the time retrieval and pass back happen through the REST API). It's worth pointing out that a client could pass a different value as old_data_version in an attempt to thwart the system. This is something we explicitly do not try to protect against (and honestly, I don't think we could) -- data_version is a protection against accidental races, not against malicious changes.
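To see the retrieve, modify, pass back workflow end to end, here is a minimal self-contained sketch. It is not Balrog's code: it uses the standard library's sqlite3 module instead of SQLAlchemy so it runs without dependencies, and the update_release helper and the "Firefox-34.0" release name are made up for illustration.

```python
import sqlite3


class OutdatedDataError(Exception):
    """Raised when old_data_version no longer matches the stored row."""


def update_release(conn, name, data, old_data_version):
    # Hypothetical helper. The version check is part of the UPDATE's WHERE
    # clause, so the compare and the write happen in a single statement.
    with conn:  # commits on success, rolls back on exception
        cur = conn.execute(
            "UPDATE releases SET data = ?, data_version = data_version + 1 "
            "WHERE name = ? AND data_version = ?",
            (data, name, old_data_version),
        )
        if cur.rowcount != 1:
            raise OutdatedDataError("old_data_version doesn't match data_version")


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE releases (name TEXT PRIMARY KEY, data TEXT NOT NULL, "
    "data_version INTEGER NOT NULL)"
)
conn.execute("INSERT INTO releases VALUES ('Firefox-34.0', 'original', 1)")
conn.commit()

# Two callers both read the row while it is at data_version 1.
seen_by_a = seen_by_b = 1

# A's change goes through and bumps data_version to 2.
update_release(conn, "Firefox-34.0", "change from A", seen_by_a)

try:
    # B is still holding data_version 1, so its UPDATE matches no rows.
    update_release(conn, "Firefox-34.0", "change from B", seen_by_b)
    collided = False
except OutdatedDataError:
    collided = True  # B must re-read the row and try again

final_row = conn.execute(
    "SELECT data, data_version FROM releases WHERE name = 'Firefox-34.0'"
).fetchone()
```

The second caller's change is rejected rather than silently replacing the first one, which is exactly the behaviour the data_version clause is there to guarantee.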

Full History

Having history of all changes to Balrog's database is not terribly important on a day-to-day basis, but when we have issues related to updates it's extremely important that we're able to look back in time and see why a particular issue happened, how long it existed for, and who made the change. Like collision detection, this is implemented at a low level of Balrog to make sure it's difficult to bypass when writing new code.

To achieve it we create a History table for each primary data table. For example, we have both "releases" and "releases_history" tables. In addition to all of the Releases columns, the associated History table also has columns for the account that made each change and a timestamp of when it was made. Whenever an INSERT, UPDATE, or DELETE is executed, the History table has a new row inserted with the full contents of the new version. These are done in a single transaction to make sure it is an all-or-nothing operation.

Building on the code from above, here's a simplified version of how we implement History:

class AUSTable(object):
    """Base class for Balrog tables. Subclasses must create self.table as an
    SQLAlchemy Table object prior to calling AUSTable.__init__()."""
    def __init__(self, engine, history=True, versioned=True):
        self.engine = engine
        self.versioned = versioned
        # Versioned tables (generally, non-History tables) need a data_version.
        if versioned:
            self.table.append_column(Column("data_version", Integer, nullable=False))

        # Well defined interface to the primary_key columns, needed by History tables.
        self.primary_key = []
        for col in self.table.get_children():
            if col.primary_key:
                self.primary_key.append(col)

        if history:
            self.history = History(self.table.metadata, self)
        else:
            self.history = None

    def update(self, where, what, old_data_version=None, changed_by=None):
        # Because we're a base for History tables as well as normal tables
        # these must be optional parameters, but enforced when the features
        # are enabled.
        if self.history and not changed_by:
            raise ValueError("changed_by must be passed for Tables that have history")
        if self.versioned and not old_data_version:
            raise ValueError("update: old_data_version must be passed for Tables that are versioned")

        # Enforce the data_version check at the query level to eliminate
        # the possibility of a race condition between the time we can
        # retrieve the current data_version, and when we can update the row.
        # Only versioned tables have a data_version column to check.
        where = list(where)
        if self.versioned:
            where.append(self.table.c.data_version == old_data_version)

        # engine.begin() commits on success and rolls back on any exception,
        # so a failed version check also discards the history row.
        with self.engine.begin() as transaction:
            row = self.select(where=where, transaction=transaction)
            values = dict(what)
            if self.versioned:
                row["data_version"] += 1
                values["data_version"] = row["data_version"]
            for col in what:
                row[col] = what[col]

            query = self.table.update().values(values)
            for cond in where:
                query = query.where(cond)
            ret = transaction.execute(query)
            if self.history:
                transaction.execute(self.history.forUpdate(row, changed_by))
            if ret.rowcount != 1:
                raise OutdatedDataError("Failed to update row, old_data_version doesn't match data_version")


class History(AUSTable):
    def __init__(self, metadata, baseTable):
        self.baseTable = baseTable
        self.table = Table("%s_history" % baseTable.table.name, metadata,
            Column("change_id", Integer(), primary_key=True, autoincrement=True),
            Column("changed_by", String(100), nullable=False),
            Column("timestamp", BigInteger(), nullable=False),
        )

        self.base_primary_key = [pk.name for pk in baseTable.primary_key]
        # In addition to the above columns, we need a copy of each Column
        # from our base table.
        for col in baseTable.table.get_children():
            newcol = col.copy()
            # We have our own primary_key Column, and don't want our
            # base table's PK to be part of it.
            if col.primary_key:
                newcol.primary_key = False
            # And while the base table's primary key is always required for
            # history rows, all other columns (including those that are
            # required in the base table) must be nullable.
            else:
                newcol.nullable = True
            self.table.append_column(newcol)

        AUSTable.__init__(self, baseTable.engine, history=False, versioned=False)

    def forUpdate(self, rowData, changed_by):
        row = {}
        # Copy in the data that's about to be updated in the base table...
        for k in rowData:
            row[k] = rowData[k]
        # ...and add the extra data that we need to track history accurately.
        row["changed_by"] = changed_by
        row["timestamp"] = time.time()
        return self.table.insert().values(row)

A key thing to notice here is that the History tables are maintained automatically with only a minor tweak to the query interface (the addition of "changed_by"). And while not shown here, it's important to note that the History table objects are not queryable directly through any exposed API. Even if an attacker got access to Balrog's REST API with admin permissions, they could not delete rows from those tables.
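The all-or-nothing behaviour is easiest to see in isolation. The following is a rough sketch rather than Balrog's implementation: it uses the standard library's sqlite3 module instead of SQLAlchemy, and the update_with_history helper, the account names, and the release name are all hypothetical. As in the code above, the history row is written before the version check, and the rollback is what keeps a rejected change out of the history.

```python
import sqlite3
import time


def update_with_history(conn, name, data, old_data_version, changed_by):
    # Hypothetical helper: one transaction covers both statements.
    with conn:  # commits on success, rolls back everything on exception
        cur = conn.execute(
            "UPDATE releases SET data = ?, data_version = data_version + 1 "
            "WHERE name = ? AND data_version = ?",
            (data, name, old_data_version),
        )
        # Record the full contents of the new version of the row.
        conn.execute(
            "INSERT INTO releases_history "
            "(changed_by, timestamp, name, data, data_version) "
            "VALUES (?, ?, ?, ?, ?)",
            (changed_by, int(time.time()), name, data, old_data_version + 1),
        )
        if cur.rowcount != 1:
            # Raising here rolls back the history INSERT as well.
            raise ValueError("old_data_version doesn't match data_version")


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE releases (name TEXT PRIMARY KEY, data TEXT NOT NULL, "
    "data_version INTEGER NOT NULL)"
)
conn.execute(
    "CREATE TABLE releases_history ("
    "change_id INTEGER PRIMARY KEY AUTOINCREMENT, "
    "changed_by TEXT NOT NULL, timestamp INTEGER NOT NULL, "
    "name TEXT NOT NULL, data TEXT, data_version INTEGER)"
)
conn.execute("INSERT INTO releases VALUES ('Firefox-34.0', 'v1', 1)")
conn.commit()

update_with_history(conn, "Firefox-34.0", "v2", 1, "alice")

try:
    # Stale data_version: the UPDATE matches nothing, so nothing is kept.
    update_with_history(conn, "Firefox-34.0", "v3", 1, "bob")
except ValueError:
    pass

history = conn.execute(
    "SELECT changed_by, data, data_version FROM releases_history "
    "ORDER BY change_id"
).fetchall()
current = conn.execute("SELECT data, data_version FROM releases").fetchone()
```

Only the successful change leaves a history row behind; the rejected one disappears along with its failed UPDATE.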

If you'd like to see the complete implementation of either of these, you can find it over in the Balrog repository.

Enhancements

These things were implemented a few years ago, and since then we've discovered a couple of rough edges that would be nice to fix.

The biggest complaint is that the History tables are extremely inefficient. Many of our Release objects are a few hundred kilobytes, which means every change to them (thousands per day) significantly grows the releases_history table. We've dealt with this by limiting how long we keep history for certain types of releases, but that's far from ideal. We'd love to have a more efficient way of storing history. We've discussed storing history as diffs rather than full copies, or compressing the data before inserting the rows, but haven't settled on anything yet. If you have any ideas about this we'd love to hear them!
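As a rough illustration of the compression idea only (nothing here is something Balrog does), the sketch below compresses a JSON blob with the standard library's zlib before it would be stored. The pack and unpack helpers and the shape of the release data are assumptions made up for the example.

```python
import json
import zlib


def pack(blob):
    """Hypothetical helper: compress JSON text before storing a history row."""
    return zlib.compress(blob.encode("utf-8"), 9)


def unpack(stored):
    """Hypothetical helper: recover the original JSON text."""
    return zlib.decompress(stored).decode("utf-8")


# Made-up release data: many locales with near-identical structure.
release = {
    "name": "Firefox-34.0",
    "locales": {
        "locale-%03d" % i: {"buildID": "20141125180439", "filesize": 40000000 + i}
        for i in range(500)
    },
}
blob = json.dumps(release, sort_keys=True)
stored = pack(blob)

round_trips = unpack(stored) == blob
original_size = len(blob.encode("utf-8"))
stored_size = len(stored)
```

Repetitive data like this compresses well, but the trade-off is that a compressed column is opaque to SQL, so history could no longer be searched by content without unpacking each row first.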

I mentioned earlier how annoying it is when Bugzilla throws you a mid-air collision, and it's no different in Balrog. We get hundreds of them a day when the l10n repacks for different locales all try to update the same Releases. These can be dealt with by retrying, but it's very inefficient. We might be able to do better here if we inspected the details of changes that collide, and only rejected them if they try to modify the same parts of an object.
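One way that idea could look is a three-way merge: compare both the stored object and the incoming change against the version the caller started from, and reject only when they touched the same key differently. This is a sketch under simplifying assumptions, not a design we've settled on: it only looks at top-level keys of plain dicts, and the merge, changed_keys, and MergeConflict names are invented for the example.

```python
class MergeConflict(Exception):
    """Raised when two changes touch the same key in different ways."""


_MISSING = object()  # sentinel for "key not present"


def changed_keys(base, new):
    """Return the top-level keys whose values differ between base and new."""
    keys = set(base) | set(new)
    return {k for k in keys if base.get(k, _MISSING) != new.get(k, _MISSING)}


def merge(base, current, incoming):
    """Apply incoming's changes (relative to base) on top of current."""
    ours = changed_keys(base, incoming)
    theirs = changed_keys(base, current)
    # Both sides changed the key, and they did not end up with the same value.
    conflicts = {
        k for k in ours & theirs
        if current.get(k, _MISSING) != incoming.get(k, _MISSING)
    }
    if conflicts:
        raise MergeConflict(sorted(conflicts))
    merged = dict(current)
    for k in ours:
        if k in incoming:
            merged[k] = incoming[k]
        else:
            merged.pop(k, None)  # incoming deleted this key
    return merged


# Both repacks started from the same version of the release.
base = {"de": "build-1", "fr": "build-1"}
# The "de" repack landed first, so this is what the database now holds.
current = {"de": "build-2", "fr": "build-1"}
# The "fr" repack arrives with a stale data_version but touches only "fr".
incoming = {"de": "build-1", "fr": "build-2"}

merged = merge(base, current, incoming)

try:
    # A second change to "de" made from the old base is a real collision.
    merge(base, current, {"de": "build-3", "fr": "build-1"})
    rejected = False
except MergeConflict:
    rejected = True
```

With something like this, the two locale repacks would both succeed without a retry, while two different edits to the same locale would still be rejected. It does require keeping (or looking up in history) the version the caller started from.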

Finally, all of this awesome collision detection and history code is in no way tied to Balrog - the classes that implement it are already very generic. I would love to pull out these features and ship them as their own module, which Balrog (and hopefully others!) can then consume.
