PostgreSQL Real Time Notification

Overview

There was a requirement requested by one of our clients to modify the data streamed into Elasticsearch according to changed network mapping in PostgreSQL. The customer used Elasticsearch as its big data platform and PostgreSQL to store metadata such as network mapping. Many applications which work with databases need to get updated data from the database for their processing.

For Example:

  • Metadata tables which define application behavior: such as timeout parameters, bulk size when pushing / retrieving data from the database
  • Lookup tables which are used for data transformation, filtering and enrichment (ETL)
  • Displaying / processing real time events which are pushed to the database from external sources like sensors data

The Requirement

Application clients (Backend server, UI) want to know when there is new data or data is changed in the database and be notified in real time.

The Solution

There are 2 common ways to achieve it:

  1. Having dedicated processes which examine the relevant tables in the database; every interval of time and each change (by timestamp / ID) sends notification to the application and the application selects the relevant data from the changed tables.
  2. Using Listening and Notification processes. The client side listens for change notifications, on the other side data change processes send notifications which the client can consume and process accordingly.

PostgreSQL Solution

PostgreSQL has a nice NOTIFY and LISTEN feature which sends notifications in the case in which data is changed, as a result clients listening to these messages can act accordingly.

 

The following example demonstrates how to create C++ program which listens to specific messages and upon data change, the data is being sent as JSON from the database to the client program with “no” to run queries in the database.

  1. Create the Database Objects:
  2. Create temp table which will be used for real time notifications:
  3. Create table temp_table (a integer , b varchar(40),c varchar(40));
  4. 4. Create trigger procedure:

 

CREATE OR REPLACE FUNCTION add_notification_json () RETURNS TRIGGER AS $BODY$

DECLARE

v_row json;

v_notify_id integer;

cmd varchar(4000);

BEGIN

            –Convert Row Data to JSON;

            v_row = row_to_json(new);

            RAISE INFO ‘This is %’ , v_row;

            cmd=’NOTIFY TEMP_TABLE, ”’ || v_row||””;

            execute cmd;

returning notify_id into v_notify_id;

            RETURN new;

END;

$BODY$

language plpgsql;

 

  1. Create a trigger which call the trigger procedure

create table temp_table (a integer , b varchar(40),c varchar(40));

CREATE TRIGGER temp_table_notify_trg

            AFTER INSERT OR UPDATE  ON temp_table

            FOR EACH ROW

            EXECUTE PROCEDURE add_notification_json();

 

  1. Create the C++ Program

The C++ database drive for PostgreSQL is libpq

#include <stdio.h>

#include <stdlib.h>

#include <string.h>

#include <errno.h>

#include <sys/time.h>

#include <libpq-fe.h>

/* Define Database Connection Properties */

#define PG_HOST      “10.10.10.10”

#define PG_USER       “postgres”

#define PG_DB           “test”

#define PG_PASS       “postgres”

#define PG_PORT      5432

static void

exit_nicely(PGconn *conn)

{

            PQfinish(conn);

            exit(1);

}

int

main(int argc, char **argv)

{

            char *conninfo;

            PGconn           *conn;

            PGresult   *res;

            PGnotify   *notify;

            int        nnotifies;

   sprintf(conninfo,

            “user=%s password=%s dbname=%s hostaddr=%s port=%d”,

            PG_USER, PG_PASS, PG_DB, PG_HOST, PG_PORT);

            /* Make a connection to the database */

            conn = PQconnectdb(conninfo);

            /* Check to see that the backend connection was successfully made */

            if (PQstatus(conn) != CONNECTION_OK)

            {

            fprintf(stderr, “Connection to database failed: %s”,

                        PQerrorMessage(conn));

            exit_nicely(conn);

            }

            /*

            * Issue LISTEN command to enable notifications from the trigger NOTIFY.

            */

            res = PQexec(conn, “LISTEN TEMP_TABLE”);

            if (PQresultStatus(res) != PGRES_COMMAND_OK)

            {

            fprintf(stderr, “LISTEN command failed: %s”, PQerrorMessage(conn));

            PQclear(res);

            exit_nicely(conn);

            }

            /*

            * should PQclear PGresult whenever it is no longer needed to avoid memory

            * leaks

            */

            PQclear(res);

            /* Quit after four notifies are received. */

            nnotifies = 0;

            while (nnotifies < 4)

            {

            /*

            * Sleep until something happens on the connection.  We use select(2)

            * to wait for input, but you could also use poll() or similar

            * facilities.

            */

            int        sock;

            fd_set              input_mask;

            sock = PQsocket(conn);

            if (sock < 0)

            break;              /* shouldn’t happen */

            FD_ZERO(&input_mask);

            FD_SET(sock, &input_mask);

            if (select(sock + 1, &input_mask, NULL, NULL, NULL) < 0)

            {

            fprintf(stderr, “select() failed: %s\n”, strerror(errno));

            exit_nicely(conn);

            }

            /* Now check for input , the extra contain the json data */

            PQconsumeInput(conn);

            while ((notify = PQnotifies(conn)) != NULL)

            {

            fprintf(stderr,

                        “ASYNC NOTIFY of ‘%s’ received from backend ,Message %s , PID %d\n”,

                        notify->relname,notify -> extra, notify->be_pid);

            PQfreemem(notify);

            nnotifies++;

            }

            }

            fprintf(stderr, “Done.\n”);

            /* close the connection to the database and cleanup */

            PQfinish(conn);

            return 0;

}

  1. Testing

insert into temp_table values (10,’alon’,’eldi’);

select * from temp_table

 

Written By Alon Eldi – CEO , Big Data & Cloud Expert in SeaData

SeaData is specialized in Elasticsearch and PostgreSQL

For Database Consulting and Support please contact:

support@seadata.co.il

972-54-408-0111

We will be happy to assist at any time.

Call Now Button
התחל שיחה
פנו אלינו בוואצאפ!
היי, אנחנו זמינים עבורך בוואצאפ!
Skip to content
oldversion.com
playstation 2 emulator android