On 6/10/16 2:08 PM, Peter Eisentraut wrote:
On 6/6/16 9:45 PM, Peter Eisentraut wrote:
Attached is a patch that illustrates how this could be fixed.  There might
be similar issues elsewhere.  The notification propagation in particular
could be affected.

Tracing the code, NotificationResponse messages are converted to the
client encoding during transmission from the worker to the leader and
then passed on to the client as raw bytes, so this should appear to work at
the moment.  But it will break if we make a change like I suggested,
namely changing the client encoding in the worker process to be the
server encoding, because then nothing will transcode it before it
reaches the client anymore.  So this will need a well-considered
solution in concert with the error/notice issue.

Then again, it's not clear to me under what circumstances a parallel
worker could or should be sending a NotificationResponse.

Modulo that last point, here is a patch that shows how I think this could work, in combination with the patch I posted previously that sets the "client encoding" in the parallel worker to the server encoding.

This patch disassembles the NotificationResponse message in the leader while the client encoding is temporarily set to the server encoding, so that the strings are read without conversion, and then sends it off to the real frontend using the real client encoding.
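
To make the "disassemble" step concrete, here is a minimal standalone sketch of reading the three fields out of a NotificationResponse body: a 4-byte big-endian PID followed by two NUL-terminated strings, in the same order the patch reads them. It is not backend code. NotifyFields, read_cstring() and parse_notify_body() are hypothetical names invented for this illustration, and the sketch does no encoding conversion at all.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical holder for the fields of a NotificationResponse body. */
typedef struct
{
	int32_t		pid;
	const char *channel;		/* points into the caller's buffer */
	const char *payload;		/* points into the caller's buffer */
} NotifyFields;

/*
 * Return the NUL-terminated string starting at *pos and advance *pos past
 * its terminator.  Returns NULL if no terminator is found before len.
 */
static const char *
read_cstring(const unsigned char *buf, size_t len, size_t *pos)
{
	const unsigned char *start = buf + *pos;
	const unsigned char *nul;

	if (*pos >= len)
		return NULL;
	nul = memchr(start, '\0', len - *pos);
	if (nul == NULL)
		return NULL;
	*pos += (size_t) (nul - start) + 1;
	return (const char *) start;
}

/*
 * Split a message body into pid, channel and payload.
 * Returns 0 on success, -1 if the body is truncated or has trailing bytes.
 */
static int
parse_notify_body(const unsigned char *buf, size_t len, NotifyFields *out)
{
	size_t		pos = 4;

	if (len < 4)
		return -1;

	/* The PID is sent in network byte order. */
	out->pid = (int32_t) (((uint32_t) buf[0] << 24) |
						  ((uint32_t) buf[1] << 16) |
						  ((uint32_t) buf[2] << 8) |
						  (uint32_t) buf[3]);

	out->channel = read_cstring(buf, len, &pos);
	if (out->channel == NULL)
		return -1;

	out->payload = read_cstring(buf, len, &pos);
	if (out->payload == NULL)
		return -1;

	/* The whole body must have been consumed. */
	return (pos == len) ? 0 : -1;
}
```

Once the fields are separate C strings, the sender is free to re-encode each one for the real client, which is the point of not forwarding the raw bytes.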

Doing it this way also takes care of a few special cases that NotifyMyFrontEnd() handles, such as a client with protocol version 2 that doesn't expect a payload in the message.
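
As a rough illustration of the protocol-version special case, the sketch below builds a notification body and leaves out the payload string for a protocol 2 client. This is a standalone approximation, not the code in async.c. build_notify_body() and its parameters are hypothetical, and the "major version >= 3" test is my assumption about how the distinction is drawn.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/*
 * Write a notification body (pid, channel, optional payload) into out.
 * Returns the number of bytes written, or 0 if cap is too small.
 * Assumption: only clients with protocol major version >= 3 get a payload.
 */
static size_t
build_notify_body(unsigned char *out, size_t cap, int proto_major,
				  int32_t pid, const char *channel, const char *payload)
{
	size_t		chan_len = strlen(channel) + 1;	/* include the NUL */
	size_t		pay_len = (proto_major >= 3) ? strlen(payload) + 1 : 0;
	size_t		need = 4 + chan_len + pay_len;
	uint32_t	upid = (uint32_t) pid;

	if (need > cap)
		return 0;

	/* PID in network byte order. */
	out[0] = (unsigned char) (upid >> 24);
	out[1] = (unsigned char) (upid >> 16);
	out[2] = (unsigned char) (upid >> 8);
	out[3] = (unsigned char) upid;

	memcpy(out + 4, channel, chan_len);
	if (pay_len > 0)
		memcpy(out + 4 + chan_len, payload, pay_len);

	return need;
}
```

Forwarding the worker's bytes unchanged would skip this kind of per-client decision, whereas going through a single sending function keeps it in one place.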

--
Peter Eisentraut              http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index ab5ef25..0bb93b4 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -810,7 +810,22 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
                case 'A':                               /* NotifyResponse */
                        {
                                /* Propagate NotifyResponse. */
-                               pq_putmessage(msg->data[0], &msg->data[1], msg->len - 1);
+                               int             save_client_encoding;
+                               int32   pid;
+                               const char *channel;
+                               const char *payload;
+
+                               save_client_encoding = pg_get_client_encoding();
+                               SetClientEncoding(GetDatabaseEncoding());
+
+                               pid = pq_getmsgint(msg, 4);
+                               channel = pq_getmsgstring(msg);
+                               payload = pq_getmsgstring(msg);
+                               pq_endmessage(msg);
+
+                               SetClientEncoding(save_client_encoding);
+
+                               NotifyMyFrontEnd(channel, payload, pid);
                                break;
                        }
 
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index c39ac3a..716f1c3 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -390,9 +390,6 @@ static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
                                                         char *page_buffer);
 static void asyncQueueAdvanceTail(void);
 static void ProcessIncomingNotify(void);
-static void NotifyMyFrontEnd(const char *channel,
-                                const char *payload,
-                                int32 srcPid);
 static bool AsyncExistsPendingNotify(const char *channel, const char *payload);
 static void ClearPendingActionsAndNotifies(void);
 
@@ -2076,7 +2073,7 @@ ProcessIncomingNotify(void)
 /*
  * Send NOTIFY message to my front end.
  */
-static void
+void
 NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
 {
        if (whereToSendOutput == DestRemote)
diff --git a/src/include/commands/async.h b/src/include/commands/async.h
index b4c13fa..95559df 100644
--- a/src/include/commands/async.h
+++ b/src/include/commands/async.h
@@ -28,6 +28,10 @@ extern volatile sig_atomic_t notifyInterruptPending;
 extern Size AsyncShmemSize(void);
 extern void AsyncShmemInit(void);
 
+extern void NotifyMyFrontEnd(const char *channel,
+                                                        const char *payload,
+                                                        int32 srcPid);
+
 /* notify-related SQL statements */
 extern void Async_Notify(const char *channel, const char *payload);
 extern void Async_Listen(const char *channel);
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)