Notifications over Websockets with Http4s and Skunk

Notifications over Websockets with Http4s and Skunk

Sometimes our applications have asynchronous processes that require users to wait for results. Two examples in Groundwork are processing imagery so users can see map layers for their projects and exporting STAC catalogs of users’ imagery and labels. It’s nice for users to receive information about the status of these processes without having to refresh the page.

You can make this work with polling from a front-end application if you’re not worried that too many status checks will make your database sad. There are also plenty of third-party services specifically tailored to sending notifications or that look like notification services if you squint at them, like Intercom. In recent R&D though, I wanted to try out a different strategy. I wanted to figure out whether I could send notifications from the server using only PostgreSQL and Scala.

Here’s what the application does in GIF form:

server demo
Notifications over websockets from the Http4s server

The application backing the example above is available on GitHub.

The database

This project used PostgreSQL as its database. The basic database schema was borrowed from the world countries database in the doobie documentation. I chose it because it has only a few tables, it’s publicly available, and it will be familiar to anyone who’s read through the doobie documentation.

I added a trigger on the city table that runs a function to notify a channel matching the new city’s country code every time a row is added. The city table ends up looking like this:

pgsockets4s=# \d city
                       Table "public.city"
   Column    |       Type        | Collation | Nullable | Default
-------------+-------------------+-----------+----------+---------
 id          | integer           |           | not null |
 name        | character varying |           | not null |
 countrycode | character(3)      |           | not null |
 district    | character varying |           | not null |
 population  | integer           |           | not null |
Indexes:
    "city_pkey" PRIMARY KEY, btree (id)
Referenced by:
    TABLE "country" CONSTRAINT "country_capital_fkey" FOREIGN KEY (capital) REFERENCES city(id)
Triggers:
    new_city_trigger AFTER INSERT OR UPDATE OF id ON city FOR EACH ROW EXECUTE PROCEDURE new_city_notify()

The new_city_notify() function mentioned at the bottom is also pretty straightforward:

CREATE OR REPLACE FUNCTION new_city_notify()
?RETURNS trigger AS
$$
BEGIN
    PERFORM pg_notify(lower(NEW.countrycode), row_to_json(NEW)::text);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

This function creates a JSON representation of the new city’s row and sends it to a channel matching the new city’s country code. NEW refers to the row that’s been inserted and has a schema matching the row’s table.row_to_json converts the inserted row into a JSON record, and ::text lets PostgreSQL know that I just care about the string containing that JSON.

The notification webservice

I built the webservice with http4s. Helpfully, the http4sopen source community maintains an example of a websocket server.

The crux of the example is that if I can produce a Stream[F, WebSocketFrame] from my PostgreSQL database, I can hook up the async notifications in the database to my webservice. For that task, I turned to skunk. skunk is a new database library for Scala that targets only PostgreSQL, instead of any JDBC-compliant database. Scala developers and other people at Azavea who are interested have been reading Practical Functional Programming in Scala by Gabriel Volpe, so we have some recent experience with skunk and http4s that I thought would help me here.

skunk provides a way to listen to PostgreSQL notification channels. The return type is a Stream[F, Notification], which looks a lot like the Stream[F, WebSocketFrame] that I needed. Streams can be mapped, Notifications can be converted to Strings, and WebSocketFrames have a String constructor, so the integration was amazingly easy:

val sess: Session[F] = ???
val identifier: Identifer = ???
session.channel(identifer).listen(1).map(notificationData => Text(s"Ping! $notificationData"))

In the application, it looks a little more complex, but not much. The extra work around the call to listen is present to handle cases where a user provides a URL parameter that isn’t a valid PostgreSQL identifier.

Putting it all together

The rest was configuration. skunk needs a way to obtain sessions from a database. Fortunately, that’s easy to set up with my world countries database in docker-compose. Constructing a session pool in skunk requires a simple call to Session.pooled[F] for some effect type F with your standard database connection parameters ー database name, user, password, and address ー that were easy to match up with what I had in my docker-compose.yml file.

What’s missing here?

Should you clone the repo, throw it into a container, and deploy it to your managed Kubernetes service right now? Probably not!

Since this was an experiment, I didn’t handle some important concerns that you’d probably have with a production-grade notification service.

First, there’s no concept of dead letters in PostgreSQL as far as I’m aware, so if PostgreSQL sends a notification that no one is listening for, nothing cares and no one finds out.

Secondly, I have no idea how well this setup performs under a more substantial load than the tests I ran on my computer. While it held up fine against inserting a country every quarter of a second and notifying 14 consumers, I don’t know that that means that it could handle twenty-five, or one hundred, or one thousand. I assume it’s not one thousand? But I don’t know.

Third, there are a number of caveats in the WebSocketBuilder class that make it seem like a server running for long enough would slowly accumulate connections that it doesn’t know it should close.

But it worked! It is 100% possible to service asynchronous notifications over websockets using PostgreSQL and Scala. If you’d like to try it out locally, you can clone the repository and follow the instructions in the README.