One part of the rewrite of all of my sites is to make dynamic content more real-time, specifically using WebSockets, so that content is as close to live as possible. For this I'm using the current RabbitMQ release running within a Docker container with the web_stomp plugin enabled. With this a webpage can connect over WebSockets directly to the message broker and listen for messages.
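To give an idea of the client side, here's a minimal sketch using the stomp.js library. It assumes a recent RabbitMQ where web_stomp exposes a plain WebSocket on its default port 15674 at /ws, the default guest account, and an example 'job.status' routing key on the amq.topic exchange:

// Connect to RabbitMQ's Web STOMP endpoint (default port 15674)
var client = Stomp.client('ws://localhost:15674/ws');

client.connect('guest', 'guest', function () {
  // Bind to the amq.topic exchange with an example routing key
  client.subscribe('/exchange/amq.topic/job.status', function (message) {
    console.log('Received:', message.body);
  });
});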
Now the problem. Most events are defined as some change within a database, so I needed a way for a database trigger to send a message to RabbitMQ without hindering performance, more so when some tables are being updated 20-40 times a second.
Fortunately PostgreSQL provides asynchronous notifications via the LISTEN and NOTIFY commands, so the trigger can issue a NOTIFY on a named channel and a separate process can LISTEN on that channel. All I needed to implement was an application that did the listening.
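If you've not met them before, LISTEN and NOTIFY are easy to try from two psql sessions ('demo' here is just an example channel name):

-- Session 1: start listening on a channel
LISTEN demo;

-- Session 2: send a notification on that channel
NOTIFY demo, 'hello';

-- Session 1 then reports something like:
-- Asynchronous notification "demo" with payload "hello" received.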
notify-rabbit
This is a simple standalone Node.js application that runs within a Docker container. With a simple(ish) bit of configuration it can connect to one or more databases and listen for notification events. When those events are received it sends them straight to a RabbitMQ topic. All clients need to do is bind to that topic with a specific routing key to receive the messages.
Firstly you need to be running Docker and retrieve the current version of the tool:
docker pull area51/node-notify
Next you need to create the config file and put it somewhere the tool can read it. A sample is on GitHub but I'll go through the basics here.
The config.yaml file consists of three sections: one defining the database(s) to connect to, one the RabbitMQ instance(s), and one the notification channel(s) to listen on.
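In outline the file looks like this, with each section covered below:

databases:
  # connection details for each database
rabbit:
  # connection URIs for each RabbitMQ instance
notify:
  # which channels to listen on and where to route the messages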
databases
This section contains connection details to connect to your databases:
databases:
  testDB:
    enabled: true
    host: localhost
    port: 5432
    database: postgres
    user: postgres
    password: postgres
    ssl: false
Here we have just one database configured called testDB which will be referred to later.
rabbit
This section defines the RabbitMQ instances you want to connect to. It simply consists of a name for each instance and the connection URI used to reach it.
rabbit:
  testRabbit: amqp://guest:password@localhost
Note: You can put an IP address here instead of the hostname. If it’s an IPv6 address then wrap it within a pair of [ ].
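For example, using the IPv6 loopback address the rabbit section would look like this:

rabbit:
  testRabbit: amqp://guest:password@[::1]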
notify
This section defines which databases you want to listen to for notifications. You usually have one entry per database (but you are not limited to this).
Simple messages
The simplest use case is sending all messages as-is to a single routing key:
notify:
  - enabled: true
    database: testDB
    name: rabbit
    handlers:
      rabbit:
        instance: testRabbit
        key: job.status
Here we are telling the application to listen for notifications sent to the 'rabbit' channel on the testDB database. All messages received are sent as-is to the testRabbit RabbitMQ instance with the routing key 'job.status'.
Then from PostgreSQL you can use the following to send the message.
SELECT pg_notify('rabbit','My message');
Set the routing key in PostgreSQL
More often you need PostgreSQL to define the routing key based on the data being notified. To do this we need to send the message from PostgreSQL as a JSON object with the routing key defined in one property and the message to send in another:
notify:
  - enabled: true
    database: testDB
    name: rabbit
    json: true
    handlers:
      rabbit:
        instance: testRabbit
        routingKey: key
        payload: body
Here we are telling the application to expect a JSON object from PostgreSQL with two properties.
- “key” will contain the routing key to use
- “body” will contain the message to send.
For example:
SELECT pg_notify('rabbit','{"key":"job.status","body": "My message"}');
This will then send the JSON string "My message" to Rabbit with the routing key "job.status".
You can send structured data as well, as long as it’s valid JSON:
SELECT pg_notify('rabbit','{"key":"td.SA","body":{"CT_MSG":{"time":"1349696911000", "area_id":"SA", "msg_type":"CT", "report_time":"1249"}}}');
This will then send the JSON object {"CT_MSG":{"time":"1349696911000", "area_id":"SA", "msg_type":"CT", "report_time":"1249"}} with a routing key of "td.SA".
Now the payload entry is optional. If you don’t define one then the entire original message is sent to Rabbit, including the original routing key property.
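For example, with the payload entry removed from the config above, the earlier notification would arrive at Rabbit verbatim as:

{"key":"job.status","body": "My message"}

rather than just "My message".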
Running
Now you have it configured it's time to test:
docker run -it --rm -v $(pwd)/config.yaml:/opt/config.yaml:ro area51/node-notify
If all goes well you should not see any errors. If you connect to RabbitMQ's Management console and look under Connections you should then see something like this:
The label "Notify timetable reportengine" here will be different: "timetable" is the database name in the config and "reportengine" the notification channel it's listening on. If you have multiple notify entries then there will be one connection for each.
Once you're happy it's working, press ^C to stop the app and run it again in the background:
docker run -d -v $(pwd)/config.yaml:/opt/config.yaml:ro area51/node-notify
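docker run -d prints the new container's id, which you can use to check the app's output at any time (the id here is a placeholder):

docker logs <container-id>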
Adding a trigger to a database
The final part is a real-life example of a trigger which issues notifications. I could have used a trivial example, but a real one shows far better what you can do. It's a bit long, but worth walking through.
Here we add triggers to two tables in our report engine. This engine manages regular jobs that run at specific intervals and with these triggers we send one of two events to a webpage showing the job status:
- “DELETED” removes the entry from the page when the job is made private.
- All other messages are updates, e.g. the job is running, or it has completed successfully or has failed.
-- Send the current state of a public job to the
-- 'reportengine' channel as a JSON object
CREATE OR REPLACE FUNCTION reportengine.notifystate(_id INTEGER)
RETURNS VOID AS $$
DECLARE
  rec TEXT;
BEGIN
  SELECT INTO rec row_to_json(t)::TEXT
    FROM (
      SELECT n.name,
             m.disabled AS disabled,
             s.status,
             to_char(s.lastrun,  'YYYY-MM-DDThh24:MI:SSZ') AS "lastRun",
             to_char(s.nextrun,  'YYYY-MM-DDThh24:MI:SSZ') AS "nextRun",
             to_char(s.lastpass, 'YYYY-MM-DDThh24:MI:SSZ') AS "lastPass",
             to_char(s.lastfail, 'YYYY-MM-DDThh24:MI:SSZ') AS "lastFail",
             CASE
               WHEN s.status = 'RUNNING' AND s.nextrun IS NOT NULL AND t.showruntime
                 THEN (now()::TIMESTAMP WITHOUT TIME ZONE - s.nextrun)::TEXT
               WHEN s.lastrun IS NULL THEN NULL
               WHEN s.lastfail IS NULL OR s.lastpass > s.lastfail
                 THEN (s.lastpass - s.lastrun)::TEXT
               WHEN s.lastfail IS NOT NULL
                 THEN (s.lastfail - s.lastrun)::TEXT
               ELSE NULL
             END AS duration,
             m.description AS description,
             t.name AS type
        FROM reportengine.name n
             INNER JOIN reportengine.status s ON n.id = s.id
             INNER JOIN reportengine.meta m ON n.id = m.id
             INNER JOIN reportengine.jobtype t ON m.type = t.id
       WHERE n.id = _id
         AND m.public
       LIMIT 1
    ) t;

  IF FOUND THEN
    PERFORM pg_notify('reportengine', rec);
  END IF;
END
$$ LANGUAGE plpgsql;

-- Notify that a job should be deleted. This is usually
-- an actual delete or it's been made private
CREATE OR REPLACE FUNCTION reportengine.notifydelete(_id INTEGER)
RETURNS VOID AS $$
DECLARE
  rec TEXT;
BEGIN
  SELECT INTO rec row_to_json(t)::TEXT
    FROM (
      SELECT n.name,
             'DELETED' AS type
        FROM reportengine.name n
       WHERE n.id = _id
       LIMIT 1
    ) t;

  IF FOUND THEN
    PERFORM pg_notify('reportengine', rec);
  END IF;
END
$$ LANGUAGE plpgsql;

-- When a job status is updated, issue a notification
CREATE OR REPLACE FUNCTION reportengine.notifyupdate()
RETURNS TRIGGER AS $$
BEGIN
  PERFORM reportengine.notifystate(NEW.id::INTEGER);
  RETURN NEW;
END
$$ LANGUAGE plpgsql;

CREATE TRIGGER report_notifystate
  AFTER UPDATE ON reportengine.status
  FOR EACH ROW EXECUTE PROCEDURE reportengine.notifyupdate();

-- Delete if made private or update if public
CREATE OR REPLACE FUNCTION reportengine.notifydisabled()
RETURNS TRIGGER AS $$
BEGIN
  IF NEW.public THEN
    PERFORM reportengine.notifystate(NEW.id::INTEGER);
  ELSE
    PERFORM reportengine.notifydelete(NEW.id::INTEGER);
  END IF;
  RETURN NEW;
END
$$ LANGUAGE plpgsql;

CREATE TRIGGER report_notifydisabled
  AFTER UPDATE ON reportengine.meta
  FOR EACH ROW EXECUTE PROCEDURE reportengine.notifydisabled();
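To check the triggers are working you can prod the status table by hand. The row values here are hypothetical, but any update to reportengine.status will fire report_notifystate and, for a public job, publish its state on the reportengine channel:

-- Hypothetical test: mark job 1 as running; the AFTER UPDATE
-- trigger then issues the pg_notify for us
UPDATE reportengine.status SET status = 'RUNNING' WHERE id = 1;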
config.yaml example in full
databases:
  testDB:
    enabled: true
    host: localhost
    port: 5432
    database: postgres
    user: postgres
    password: postgres
    ssl: false

rabbit:
  testRabbit: amqp://guest:password@localhost

notify:
  - enabled: true
    database: testDB
    name: rabbit
    handlers:
      rabbit:
        instance: testRabbit
        key: job.status
  - enabled: true
    database: testDB
    name: rabbit
    json: true
    handlers:
      rabbit:
        instance: testRabbit
        routingKey: key
        payload: body
Links:
- Sources on GitHub
- Docker image on Docker Hub