Stream queries

Watch SQL queries in drift

A core feature of drift is that every query can be turned into an auto-updating stream. This works regardless of whether the query returns a single row or multiple rows, and whether it reads from a single table or joins several.

Basics

In drift, a runnable query is represented by the Selectable<T> interface, which has the following methods:

  • Future<List<T>> get(): Runs the query once, returning all rows.
  • Future<T> getSingle(): Runs the query once and asserts that it yields exactly one row, which is returned.
  • Future<T?> getSingleOrNull(): Like getSingle(), but allows returning null for empty result sets.

Each of these methods has a matching watch variant that returns a stream:

  • Stream<List<T>> watch(): Watches the query, returning all rows.
  • Stream<T> watchSingle(): Watches the query, asserting that a single row is reported each time the query runs.
  • Stream<T?> watchSingleOrNull(): Like watchSingle(), but returning empty result sets as null.
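
One way to picture how the three watch variants relate is as mappings over the stream of row lists. The following is a conceptual sketch in plain Dart and not drift's actual implementation; the helpers asSingle and asSingleOrNull are hypothetical names introduced for illustration, and Stream.fromIterable stands in for the successive results of a watched query.

```dart
import 'dart:async';

/// Hypothetical helper modelling watchSingle() on top of a stream of row lists.
/// `list.single` throws a StateError unless the list has exactly one element,
/// which surfaces as an error event on the resulting stream.
Stream<T> asSingle<T>(Stream<List<T>> rows) {
  return rows.map((list) => list.single);
}

/// Hypothetical helper modelling watchSingleOrNull(): an empty result becomes
/// null, while more than one row is still treated as an error.
Stream<T?> asSingleOrNull<T>(Stream<List<T>> rows) {
  return rows.map((list) => list.isEmpty ? null : list.single);
}

Future<void> main() async {
  // Stand-in for two successive results of a watched query.
  final results = [
    ['buy milk'],
    <String>[],
  ];

  // Emits the single row first, then null for the empty result.
  await for (final item in asSingleOrNull(Stream.fromIterable(results))) {
    print(item);
  }
}
```

Passing the same input to asSingle would deliver the first row and then an error for the empty result, which mirrors the "asserting" wording used for watchSingle() above.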

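All drift APIs for building queries return a Selectable that can be watched. The three variants below define the same query with the Dart query builder, the manager API, and a custom SQL statement: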

Selectable<TodoItem> allItemsAfter(DateTime min) {
  return select(todoItems)
    ..where((row) => row.createdAt.isBiggerThanValue(min));
}

Selectable<TodoItem> allItemsAfter(DateTime min) {
  return managers.todoItems.filter((c) => c.createdAt.isAfter(min));
}

Selectable<TodoItem> allItemsAfter(DateTime min) {
  return customSelect(
    'SELECT * FROM todo_items WHERE created_at > ?',
    variables: [Variable.withDateTime(min)],
    readsFrom: {todoItems}, // (1)
  ).map((row) => todoItems.map(row.data));
}

  1. Drift needs to know which tables are involved in a query to watch them. This is inferred automatically in most cases, but it has to be stated explicitly with readsFrom for custom queries.

When defining a SELECT statement in a drift file, drift generates a method in the database class returning a Selectable. For instance, this definition:

allItemsAfter: SELECT * FROM todo_items WHERE created_at > :min;

makes drift generate this method:

Selectable<TodoItem> allItemsAfter({required DateTime min}) {
    // ...
}

Regardless of the method used, a stream can then be created with allItemsAfter(value).watch(). Because streams are a common building block in Dart, most frameworks can consume them directly.

Every drift stream emits an up-to-date result once it is listened to, so you receive a snapshot even if the tables never change and don't have to combine get() and watch().
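
Consuming a query stream needs nothing beyond the standard Stream API. The sketch below is a plain-Dart illustration under stated assumptions: fakeWatch is a hypothetical stand-in for allItemsAfter(value).watch() that emits the current snapshot first and a later result afterwards, so the example runs without a database, and describeResults is likewise a made-up helper.

```dart
import 'dart:async';

/// Hypothetical stand-in for `allItemsAfter(value).watch()`: emits the current
/// snapshot first, then a new list after a later change.
Stream<List<String>> fakeWatch() async* {
  yield ['write docs']; // initial snapshot, delivered after listening starts
  yield ['write docs', 'review PR']; // result after a later insert
}

/// Collects one summary line per result list emitted by [results].
Future<List<String>> describeResults(Stream<List<String>> results) async {
  final lines = <String>[];
  await for (final rows in results) {
    final titles = rows.join(', ');
    lines.add('${rows.length} item(s): $titles');
  }
  return lines;
}

Future<void> main() async {
  final lines = await describeResults(fakeWatch());
  lines.forEach(print);
}
```

The stand-in stream completes after two events, which is what lets the await for loop finish. A real query stream generally keeps emitting for as long as it is listened to, so application code would more likely call listen and cancel the subscription when done, or hand the stream to a framework widget such as Flutter's StreamBuilder.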

Advanced uses

In addition to listening on queries, you can also listen for update events on tables directly:

Future<void> listenForUpdates() async {
  final stream = tableUpdates(
    TableUpdateQuery.onTable(todoItems, limitUpdateKind: UpdateKind.update),
  );

  await for (final event in stream) {
    print('Update on todos table: $event');
  }
}

Note that the entire query stream functionality is implemented in drift itself, so stream updates are a heuristic that might fire more often than necessary. It's also possible to mark a table as updated manually:

void markUpdated() {
  notifyUpdates({TableUpdate.onTable(todoItems, kind: UpdateKind.insert)});
}

Caveats

While streams are useful for automatically getting updates for whatever queries you're running, it's important to understand their functionality and limitations. Stream queries are implemented as a heuristic in drift: for each active stream, drift tracks which tables it's listening on (information that is available from the query builder). Whenever an insert, an update, or a deletion is made through drift APIs, the associated queries are rescheduled and will run again.

This means that:

  1. Other uses of the database, e.g. a native SQLite client, will not trigger stream query updates. You can manually inject updates as a workaround.
  2. Stream queries generally update more often than they have to, since drift can't filter for updates on specific rows only. This is usually not a problem, but something to be aware of: stream queries should return relatively few rows and not be too computationally expensive to execute.
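
Both caveats follow from invalidating at the level of whole tables. The toy model below makes that visible in plain Dart. It is a deliberately simplified sketch and not drift's implementation: ToyStreamStore, its methods, and runDemo are hypothetical names, tables are plain lists of integers, and queries are re-run synchronously instead of being scheduled.

```dart
/// Toy model of table-level invalidation. NOT drift's implementation.
class ToyStreamStore {
  final Map<String, List<int>> tables = {'todo_items': []};
  final List<_Watcher> _watchers = [];

  /// Registers a query with the tables it reads and runs it once immediately.
  void watch(Set<String> readsFrom, void Function() runQuery) {
    _watchers.add(_Watcher(readsFrom, runQuery));
    runQuery();
  }

  /// A write made through the store: changes data, then invalidates the table.
  void insert(String table, int value) {
    tables[table]!.add(value);
    notifyUpdates(table);
  }

  /// Re-runs every query reading from [table], whichever rows changed.
  void notifyUpdates(String table) {
    for (final watcher in _watchers) {
      if (watcher.readsFrom.contains(table)) watcher.runQuery();
    }
  }
}

class _Watcher {
  final Set<String> readsFrom;
  final void Function() runQuery;
  _Watcher(this.readsFrom, this.runQuery);
}

/// Returns how often the watched query had run after each step.
List<int> runDemo() {
  final store = ToyStreamStore();
  final checkpoints = <int>[];
  var runs = 0;
  var lastResult = <int>[];

  // Watched query: all values greater than 100.
  store.watch({'todo_items'}, () {
    runs++;
    lastResult = store.tables['todo_items']!.where((v) => v > 100).toList();
  });
  checkpoints.add(runs); // initial snapshot

  store.insert('todo_items', 1); // row does not match, query re-runs anyway
  checkpoints.add(runs);

  store.tables['todo_items']!.add(500); // "external" write, goes unnoticed
  checkpoints.add(runs);

  store.notifyUpdates('todo_items'); // manual notification as a workaround
  checkpoints.add(runs);

  assert(lastResult.length == 1); // the external row is visible only now
  return checkpoints;
}

void main() {
  print(runDemo()); // [1, 2, 2, 3]
}
```

The second checkpoint shows a re-run for a row the query would never return, and the third shows a write that bypasses the store leaving the watcher stale until an update is injected by hand.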