Call Us: 972-74-767-94-67

PostgreSQL Real Time Notification

Overview

One of our clients there was a requirement to modify the data streamed into Elasticsearch according to changed network mapping in PostgreSQL.

The customer used Elasticsearch as its big data platform and PostgreSQL to store metadata such as network mapping.

Many Applications which work with databases need to get updates data from the database for their processing. For Example:

  1. Metadata tables which define application behavior : such as timeout parameters , bulk size when pushing / retrieving data from the database
  2. Lookup tables which used for data transformation, filtering and enrichment (ETL)
  3. Displaying / processing real time events which are pushed to the database from external sources like sensors data.

The requirement:

Application clients (Backend server, UI) want to know when there is new data or data is changed in the database and get notified about it in real time.

The Solution:

There are 2 common ways to achieve it:

  1. Having dedicate process which examine the relevant tables in the database every interval of time and in case of change (By Timestamp / ID) it send notification to the application and the application select the relevant data from the changed tables.
  2. Using Listening and Notification processes .In one side client side listen for change notifications on the other side data change processes send notification which the client consume and process accordingly.

PostgreSQL Solution

PostgreSQL has a nice NOTIFY and LISTEN feature which enable to send notification in case data is changed and clients which listen to this messages can act accordingly.

The following example demonstrate how to create C++ program which listen to specific message and upon data change , the data is being send as JSON from the database to the client program with no to run query in the database .

  1. Create the Database Objects:
  1. Create temp table which will be used for real time notifications:

create table temp_table (a integer , b varchar(40),c varchar(40));

  1. Create trigger procedure

CREATE OR REPLACE FUNCTION add_notification_json () RETURNS TRIGGER AS $BODY$

DECLARE

v_row json;

v_notify_id integer;

cmd varchar(4000);

BEGIN

    –Convert Row Data to JSON;

     v_row = row_to_json(new);

    RAISE INFO ‘This is %’ , v_row;

    cmd=’NOTIFY TEMP_TABLE, ”’ || v_row||””;

    execute cmd;

returning notify_id into v_notify_id;

    RETURN new;

END;

$BODY$

language plpgsql;

  1. Create a trigger which call the trigger procedure

create table temp_table (a integer , b varchar(40),c varchar(40));

CREATE TRIGGER temp_table_notify_trg

     AFTER INSERT OR UPDATE  ON temp_table

     FOR EACH ROW

     EXECUTE PROCEDURE add_notification_json();

  1. Create the C++ Program

The C++ database drive for PostgreSQL is libpq

#include <stdio.h>

#include <stdlib.h>

#include <string.h>

#include <errno.h>

#include <sys/time.h>

#include <libpq-fe.h>

/* Define Database Connection Properties */

#define PG_HOST    “10.10.10.10”

#define PG_USER    “postgres”

#define PG_DB      “test”

#define PG_PASS    “postgres”

#define PG_PORT    5432

static void

exit_nicely(PGconn *conn)

{

    PQfinish(conn);

    exit(1);

}

int

main(int argc, char **argv)

{

    char *conninfo;

    PGconn     *conn;

    PGresult   *res;

    PGnotify   *notify;

    int         nnotifies;

   sprintf(conninfo,

            “user=%s password=%s dbname=%s hostaddr=%s port=%d”,

            PG_USER, PG_PASS, PG_DB, PG_HOST, PG_PORT);

    /* Make a connection to the database */

    conn = PQconnectdb(conninfo);

    /* Check to see that the backend connection was successfully made */

    if (PQstatus(conn) != CONNECTION_OK)

    {

        fprintf(stderr, “Connection to database failed: %s”,

                PQerrorMessage(conn));

        exit_nicely(conn);

    }

    /*

     * Issue LISTEN command to enable notifications from the trigger NOTIFY.

     */

    res = PQexec(conn, “LISTEN TEMP_TABLE”);

    if (PQresultStatus(res) != PGRES_COMMAND_OK)

    {

        fprintf(stderr, “LISTEN command failed: %s”, PQerrorMessage(conn));

        PQclear(res);

        exit_nicely(conn);

    }

    /*

     * should PQclear PGresult whenever it is no longer needed to avoid memory

     * leaks

     */

    PQclear(res);

    /* Quit after four notifies are received. */

    nnotifies = 0;

    while (nnotifies < 4)

    {

        /*

         * Sleep until something happens on the connection.  We use select(2)

         * to wait for input, but you could also use poll() or similar

         * facilities.

         */

        int         sock;

        fd_set      input_mask;

        sock = PQsocket(conn);

        if (sock < 0)

            break;              /* shouldn’t happen */

        FD_ZERO(&input_mask);

        FD_SET(sock, &input_mask);

        if (select(sock + 1, &input_mask, NULL, NULL, NULL) < 0)

        {

            fprintf(stderr, “select() failed: %s\n”, strerror(errno));

            exit_nicely(conn);

        }

        /* Now check for input , the extra contain the json data */

        PQconsumeInput(conn);

        while ((notify = PQnotifies(conn)) != NULL)

        {

            fprintf(stderr,

                    “ASYNC NOTIFY of ‘%s’ received from backend ,Message %s , PID %d\n”,

                    notify->relname,notify -> extra, notify->be_pid);

            PQfreemem(notify);

            nnotifies++;

        }

    }

    fprintf(stderr, “Done.\n”);

    /* close the connection to the database and cleanup */

    PQfinish(conn);

    return 0;

}

  1. Testing

insert into temp_table values (10,’alon’,’eldi’);

select * from temp_table

Written By Alon Eldi – CEO , Big Data & Cloud Expert in SeaData

SeaData is specialized in Elasticsearch and PostgreSQL

For Database Consulting and Support please Contact:

support@seadata.co.il

972-54-4080111

We will be happy to assist in any time.

Call Now Button Skip to content