Publishing messages to RabbitMQ from PostgreSQL

One part of the rewrite of all of my sites is to make dynamic content more real-time, specifically using Websockets, so that content is as close as live as it’s possible. For this I’m using the current RabbitMQ release running within a docker container with the web_stomp plugin enabled. With this I can then have a webpage connect over websockets directly into the message broker and listen for messages.

Now the problem. Most events are defined as some change within a database so I now need a method to allow a database trigger to be able to send a message to RabbitMQ without hindering performance – more so when some tables are being updated 20-40 times a second.

Fortunately PostgreSQL provides us with asynchronous notifications via the LISTEN and NOTIFY commands, so we can add a notify command to the trigger to a named queue and then have a separate process listen to that queue. All I needed to implement was an application that did the listening.

notify-rabbit

This is a simple standalone nodejs application that runs within a Docker container. With a simple(ish) bit of configuration it can connect to one or more databases and listen for notification events. When those events are received it will then send them straight to a RabbitMQ topic. All client’s need to do is to bind to that topic with a specific routing key to receive the messages.

Firstly you need to be running Docker and retrieve the current version of the tool:

docker pull area51/node-rabbit

Next you need to create the config file & put it somewhere where the tool can read it. A sample is on GitHub but I’ll go through the basics here.

The config.yaml file consists of three seconds, one to define the database(s) to connect to, one the RabbitMQ instance(s) and one that defines the notify queue(s) to listen two.

databases

This section contains connection details to connect to your databases:

databases:
    testDB:
        enabled: true
        host: localhost
        port: 5432
        database: postgres
        user: postgres
        password: postgres
        ssl: false

Here we have just one database configured called testDB which will be referred to later.

rabbit

This section defines details of the rabbitmq instances you want to connect to. It simply consists of a name for the instance and the connection URI to connect to it.

rabbit:
    testRabbit: amqp://guest:password@localhost

Note: You can put an IP address here instead of the hostname. If it’s an IPv6 address then wrap it within a pair of [ ].

notify

This section defines which databases you want to listen to for notifications. You usually have one entry per database (but you are not limited to this).

Simple messages

For the simple usecase of sending all messages as-is to a single routingKey.

notify:
    -
        enabled: true
        database: testdb
        name: rabbit
        handlers:
            rabbit:
                instance: testRabbit
                key: job.status

Here we are telling the application to listen for notifications sent to the ‘rabbit’ queue on the “testdb” database. All messages received would be sent as-is to the testRabbit RabbitMQ instance with the routing key ‘job.status’.

Then from PostgreSQL you can use the following to send the message.

SELECT pg_notify('rabbit','My message');

Set the routing key in PostgreSQL

More often you need PostgreSQL to define the routing key based on the data being notified. To do this we need to send the message from PostgreSQL as a JSON object with the routing key defined in one property and the message to send in another:

notify:
    -
        enabled: true
        database: testdb
        name: rabbit
        json: true
        handlers:
            rabbit:
                instance: testRabbit
                routingKey: key
                payload: body

Here we are telling the application to expect a JSON object from PostgreSQL with two properties.

  • “key” will contain the routing key to use
  • “body” will contain the message to send.

For example:

SELECT pg_notify('rabbit','{"key":"job.status","body": "My message"}');

This will then send a JSON string “My message” to rabbit with the routing key “job.status”.

You can send structured data as well, as long as it’s valid JSON:

SELECT pg_notify('rabbit','{"key":"td.SA","body":{"CT_MSG":{"time":"1349696911000", "area_id":"SA", "msg_type":"CT", "report_time":"1249"}}');

This will then send a JSON object {“CT_MSG”:{“time”:”1349696911000″, “area_id”:”SA”, “msg_type”:”CT”, “report_time”:”1249″}} with a routing key of “td.SA”/

Now the payload entry is optional. If you don’t define one then the entire original message is sent to Rabbit, including the original routing key property.

Running

Now you have it configued it’s time to rest:

docker run -it --rm -v $(pwd)/config.yaml:/opt/config.yaml:ro area51/node-notify

If all goes well you should not see any errors. If you connect to RabbitMQ’s Mnagement console and look under Connections you should then see something like this:

Example RabbitMQ Connection

The label “Notify timetable reportengine” here will be different, “timetable” being the database name in the config and “reportengine” the notify queue name it’s listening on. If you have multiple notify entries then there will be one connection for each.

If all goes well press ^C to close the app and run it in the background:

docker run -d -v $(pwd)/config.yaml:/opt/config.yaml:ro area51/node-notify

Adding a trigger to a database

The final part is a real-life example of a trigger which issues notifications. Now I could have done a simple example but thought I might as well put a real life one instead. It’s a bit long but it should show what you can do more than a simple one.

Here we add triggers to two tables in our report engine. This engine manages regular jobs that run at specific intervals and with these triggers we send one of two events to a webpage showing the job status:

  • “DELETED” removes the entry from the page when the job is made private.
  • All  other messages are updates, e.g. the job is running, or it has completed successfully or has failed.
CREATE OR REPLACE FUNCTION reportengine.notifystate(_id INTEGER)
RETURNS VOID AS $$
DECLARE
    rec TEXT;
BEGIN
    SELECT INTO rec row_to_json(t)::TEXT FROM (
        SELECT
            n.name,
            m.disabled AS DISABLED,
            s.status,
            to_char(s.lastrun, 'YYYY-MM-DDThh24:MI:SSZ') AS "lastRun",
            to_char(s.nextrun, 'YYYY-MM-DDThh24:MI:SSZ') AS "nextRun",
            to_char(s.lastpass, 'YYYY-MM-DDThh24:MI:SSZ') AS "lastPass",
            to_char(s.lastfail, 'YYYY-MM-DDThh24:MI:SSZ') AS "lastFail",
            CASE
                WHEN s.status = 'RUNNING' AND s.nextRun IS NOT NULL AND t.showruntime THEN
                    (now()::TIMESTAMP WITHOUT TIME ZONE -s.nextRun)::TEXT
                WHEN s.lastrun IS NULL THEN
                    NULL
                WHEN s.lastfail IS NULL OR s.lastpass > s.lastfail THEN
                    (s.lastpass - s.lastrun)::TEXT
                WHEN s.lastfail IS NOT NULL THEN
                    (s.lastfail - s.lastrun)::TEXT
                ELSE
                    NULL
                END AS duration,
            m.description AS description,
            t.name AS type
        FROM reportengine.name n
            INNER JOIN reportengine.status s ON n.id=s.id
            INNER JOIN reportengine.meta m ON n.id=m.id
            INNER JOIN reportengine.jobtype t ON m.type=t.id
        WHERE n.id = _id AND m.public
        LIMIT 1
    ) t;
    IF FOUND THEN
        PERFORM pg_notify('reportengine', rec);
    END IF;
END
$$ LANGUAGE plpgsql;

-- Notify that a job should be deleted. This is usually
-- an actual delete or it's been made private
CREATE OR REPLACE FUNCTION reportengine.notifydelete(_id INTEGER)
RETURNS VOID AS $$
DECLARE
    rec TEXT;
BEGIN
    SELECT INTO rec row_to_json(t)::TEXT FROM (
        SELECT
            n.name,
            'DELETED' AS type
        FROM reportengine.name n
        WHERE n.id = _id
        LIMIT 1
    ) t;
    IF FOUND THEN
        PERFORM pg_notify('reportengine', rec);
    END IF;
END
$$ LANGUAGE plpgsql;

-- Notify when a job status is updated issue a notification

CREATE OR REPLACE FUNCTION reportengine.notifyupdate()
RETURNS TRIGGER AS $$
BEGIN
    PERFORM reportengine.notifystate(NEW.id::INTEGER);
    RETURN NEW;
END
$$ LANGUAGE plpgsql;

CREATE TRIGGER report_notifystate
    AFTER UPDATE ON reportengine.status
    FOR EACH ROW EXECUTE PROCEDURE reportengine.notifyupdate();

-- Delete if made private or update if public

CREATE OR REPLACE FUNCTION reportengine.notifydisabled()
RETURNS TRIGGER AS $$
DECLARE
    rec RECORD;
BEGIN
    IF NEW.public THEN
        PERFORM reportengine.notifystate(NEW.id::INTEGER);
    ELSE
        PERFORM reportengine.notifydelete(NEW.id::INTEGER);
    END IF;
    RETURN NEW;
END
$$ LANGUAGE plpgsql;

CREATE TRIGGER report_notifydisabled
    AFTER UPDATE ON reportengine.meta
    FOR EACH ROW EXECUTE PROCEDURE reportengine.notifydisabled();

config.yaml example in full

databases:
    testDB:
        enabled: true
        host: localhost
        port: 5432
        database: postgres
        user: postgres
        password: postgres
        ssl: false

rabbit:
    testRabbit: amqp://guest:password@localhost
notify:
    -
        enabled: true
        database: testdb
        name: rabbit
        handlers:
            rabbit:
                instance: testRabbit
                key: job.status
    -
        enabled: true
        database: testdb
        name: rabbit
        json: true
        handlers:
            rabbit:
                instance: testRabbit
                routingKey: key
                payload: body

Links:

Author: petermount1

Prolific Open Source developer who also works in the online gaming industry during the day. Develops in Java, Go, Python & any other language as necessary. Still develops on retro machines like BBC Micro & Amiga A1200

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

%d bloggers like this: