On Mon, Jun 27, 2016 at 12:53 PM, Robert Haas <robertmh...@gmail.com> wrote:
> On Mon, Jun 13, 2016 at 10:27 PM, Peter Eisentraut
> <peter.eisentr...@2ndquadrant.com> wrote:
>> Modulo that last point, here is a patch that shows how I think this could
>> work, in combination with the patch I posted previously that sets the
>> "client encoding" in the parallel worker to the server encoding.
>>
>> This patch disassembles the NotificationResponse message with a temporary
>> client encoding, and then sends it off to the real frontend using the real
>> client encoding.
>>
>> Doing it this way also takes care of a few special cases that
>> NotifyMyFrontEnd() handles, such as a client with protocol version 2 that
>> doesn't expect a payload in the message.
>
> How does this address the concern raised in the last sentence of
> https://www.postgresql.org/message-id/CA+TgmoaAAEXmjVMB4nM=znf_5b9jhuim6q3afro4spt23ti...@mail.gmail.com
> ?  It seems that if an error occurs between the two SetClientEncoding
> calls, the change will persist for the rest of the session, resulting
> in chaos.

Please find attached a patch for a proposed alternative approach.
This does the following:

1. When the client_encoding GUC is changed in the worker,
SetClientEncoding() is not called.  Thus, GetClientEncoding() in the
worker always returns the database encoding, regardless of how the
client encoding is set.  This is better than your approach of calling
SetClientEncoding() during worker startup, I think, because the worker
could call a parallel-safe function with SET clause, and one of the
GUCs temporarily set could be client_encoding.  That would be stupid,
but somebody could do it.

2. A new function pq_getmsgrawstring() is added.  This is like
pq_getmsgstring() but it does no encoding conversion.
pq_parse_errornotice() is changed to use pq_getmsgrawstring() instead
of pq_getmsgstring().  Because of (1), when the leader receives an
ErrorResponse or NoticeResponse from the worker, it will not have been
subject to encoding conversion; because of this item, the leader will
not try to convert it either when initially parsing it.  So the extra
encoding round-trip is avoided.

3. The changes for NotificationResponse which you proposed are included
here, but with the modification that pq_getmsgrawstring() is used.
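
To illustrate the "raw" read in (2) and (3), here is a minimal standalone
sketch of pulling null-terminated strings out of a message buffer with no
encoding conversion.  MsgBuf and msgbuf_get_raw_string() are hypothetical
stand-ins invented for this example, not the real StringInfo or
pq_getmsgrawstring(); the sketch returns NULL where the patch raises an
ERROR.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for StringInfo; not the real PostgreSQL type. */
typedef struct MsgBuf
{
	const char *data;		/* message bytes, followed by one extra NUL */
	int			len;		/* number of message bytes, excluding that NUL */
	int			cursor;		/* offset of the next unread byte */
} MsgBuf;

/*
 * Return a pointer directly into the buffer at the cursor and advance past
 * the string's terminator.  No conversion is done.  Returns NULL if the
 * terminator is not inside the message (the patch reports an error there).
 */
static const char *
msgbuf_get_raw_string(MsgBuf *msg)
{
	const char *str = &msg->data[msg->cursor];
	size_t		slen = strlen(str);	/* safe: buffer has a trailing NUL */

	if ((size_t) msg->cursor + slen >= (size_t) msg->len)
		return NULL;
	msg->cursor += (int) slen + 1;
	return str;
}

/* Usage: read a channel name and a payload back to back. */
static void
msgbuf_demo(void)
{
	MsgBuf		m = {"chan\0payload\0", 13, 0};

	assert(strcmp(msgbuf_get_raw_string(&m), "chan") == 0);
	assert(strcmp(msgbuf_get_raw_string(&m), "payload") == 0);
	assert(msgbuf_get_raw_string(&m) == NULL);	/* nothing left to read */
}
```

The returned pointers are only valid as long as the buffer itself is, which
is the price of skipping the copy that a converting read would make.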

This seems to fix your original test case for me, and hopefully all of
the related cases also.  Review is appreciated.  The main thing about
this is that it doesn't rely on being able to make temporary changes
to global variables, thus hopefully making it robust in the face of
non-local transfer of control.
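
For anyone who wants the hazard spelled out, here is a small standalone
sketch of why a set/restore pair around a global is fragile when the code
in between can jump out.  Everything in it (client_encoding as a plain int,
fail(), the relay_* functions, run()) is hypothetical and only imitates the
shape of the problem with setjmp/longjmp; it is not backend code.

```c
#include <setjmp.h>

/* Hypothetical global: 1 stands for the real client encoding. */
static int	client_encoding = 1;
static jmp_buf error_ctx;

/* Imitates an error that unwinds to an outer handler. */
static void
fail(void)
{
	longjmp(error_ctx, 1);
}

/* Fragile: temporarily change the global, do work, then restore it. */
static void
relay_with_temp_global(void)
{
	int			saved = client_encoding;

	client_encoding = 0;		/* temporary value */
	fail();						/* control leaves here ... */
	client_encoding = saved;	/* ... so this restore never runs */
}

/* Robust: the work does not depend on changing the global at all. */
static void
relay_without_global(void)
{
	fail();
}

/* Run fn under an error handler and report the global's final value. */
static int
run(void (*fn) (void))
{
	client_encoding = 1;
	if (setjmp(error_ctx) == 0)
		fn();
	return client_encoding;
}
```

With this sketch, run(relay_with_temp_global) leaves the global stuck at the
temporary value 0, while run(relay_without_global) leaves it at 1.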

(Official status update: I'll commit this on Thursday unless anyone
reports a problem with it before then.)

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 088700e..3996897 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -810,7 +810,17 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
 		case 'A':				/* NotifyResponse */
 			{
 				/* Propagate NotifyResponse. */
-				pq_putmessage(msg->data[0], &msg->data[1], msg->len - 1);
+				int32		pid;
+				const char *channel;
+				const char *payload;
+
+				pid = pq_getmsgint(msg, 4);
+				channel = pq_getmsgrawstring(msg);
+				payload = pq_getmsgrawstring(msg);
+				pq_endmessage(msg);
+
+				NotifyMyFrontEnd(channel, payload, pid);
+
 				break;
 			}
 
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index c39ac3a..716f1c3 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -390,9 +390,6 @@ static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
 							 char *page_buffer);
 static void asyncQueueAdvanceTail(void);
 static void ProcessIncomingNotify(void);
-static void NotifyMyFrontEnd(const char *channel,
-				 const char *payload,
-				 int32 srcPid);
 static bool AsyncExistsPendingNotify(const char *channel, const char *payload);
 static void ClearPendingActionsAndNotifies(void);
 
@@ -2076,7 +2073,7 @@ ProcessIncomingNotify(void)
 /*
  * Send NOTIFY message to my front end.
  */
-static void
+void
 NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
 {
 	if (whereToSendOutput == DestRemote)
diff --git a/src/backend/commands/variable.c b/src/backend/commands/variable.c
index 962d75d..7180be6 100644
--- a/src/backend/commands/variable.c
+++ b/src/backend/commands/variable.c
@@ -755,6 +755,15 @@ assign_client_encoding(const char *newval, void *extra)
 {
 	int			encoding = *((int *) extra);
 
+	/*
+	 * Even though the value of the client_encoding GUC might change within a
+	 * parallel worker, we don't really change the client encoding in that
+	 * case.  For a parallel worker, the client is the leader process, which
+	 * always expects the database encoding from us.
+	 */
+	if (IsParallelWorker())
+		return;
+
 	/* We do not expect an error if PrepareClientEncoding succeeded */
 	if (SetClientEncoding(encoding) < 0)
 		elog(LOG, "SetClientEncoding(%d) failed", encoding);
diff --git a/src/backend/libpq/pqformat.c b/src/backend/libpq/pqformat.c
index 4ddea82..b5d9d64 100644
--- a/src/backend/libpq/pqformat.c
+++ b/src/backend/libpq/pqformat.c
@@ -65,6 +65,7 @@
  *		pq_copymsgbytes - copy raw data from a message buffer
  *		pq_getmsgtext	- get a counted text string (with conversion)
  *		pq_getmsgstring - get a null-terminated text string (with conversion)
+ *		pq_getmsgrawstring - get a null-terminated text string - NO conversion
  *		pq_getmsgend	- verify message fully consumed
  */
 
@@ -640,6 +641,35 @@ pq_getmsgstring(StringInfo msg)
 }
 
 /* --------------------------------
+ *		pq_getmsgrawstring - get a null-terminated text string - NO conversion
+ *
+ *		Returns a pointer directly into the message buffer.
+ * --------------------------------
+ */
+const char *
+pq_getmsgrawstring(StringInfo msg)
+{
+	char	   *str;
+	int			slen;
+
+	str = &msg->data[msg->cursor];
+
+	/*
+	 * It's safe to use strlen() here because a StringInfo is guaranteed to
+	 * have a trailing null byte.  But check we found a null inside the
+	 * message.
+	 */
+	slen = strlen(str);
+	if (msg->cursor + slen >= msg->len)
+		ereport(ERROR,
+				(errcode(ERRCODE_PROTOCOL_VIOLATION),
+				 errmsg("invalid string in message")));
+	msg->cursor += slen + 1;
+
+	return str;
+}
+
+/* --------------------------------
  *		pq_getmsgend	- verify message fully consumed
  * --------------------------------
  */
diff --git a/src/backend/libpq/pqmq.c b/src/backend/libpq/pqmq.c
index 3225c1f..0dcdee0 100644
--- a/src/backend/libpq/pqmq.c
+++ b/src/backend/libpq/pqmq.c
@@ -232,7 +232,7 @@ pq_parse_errornotice(StringInfo msg, ErrorData *edata)
 			pq_getmsgend(msg);
 			break;
 		}
-		value = pq_getmsgstring(msg);
+		value = pq_getmsgrawstring(msg);
 
 		switch (code)
 		{
diff --git a/src/include/commands/async.h b/src/include/commands/async.h
index b4c13fa..95559df 100644
--- a/src/include/commands/async.h
+++ b/src/include/commands/async.h
@@ -28,6 +28,10 @@ extern volatile sig_atomic_t notifyInterruptPending;
 extern Size AsyncShmemSize(void);
 extern void AsyncShmemInit(void);
 
+extern void NotifyMyFrontEnd(const char *channel,
+							 const char *payload,
+							 int32 srcPid);
+
 /* notify-related SQL statements */
 extern void Async_Notify(const char *channel, const char *payload);
 extern void Async_Listen(const char *channel);
diff --git a/src/include/libpq/pqformat.h b/src/include/libpq/pqformat.h
index 65ebf37..3c0d4b2 100644
--- a/src/include/libpq/pqformat.h
+++ b/src/include/libpq/pqformat.h
@@ -44,6 +44,7 @@ extern const char *pq_getmsgbytes(StringInfo msg, int datalen);
 extern void pq_copymsgbytes(StringInfo msg, char *buf, int datalen);
 extern char *pq_getmsgtext(StringInfo msg, int rawbytes, int *nbytes);
 extern const char *pq_getmsgstring(StringInfo msg);
+extern const char *pq_getmsgrawstring(StringInfo msg);
 extern void pq_getmsgend(StringInfo msg);
 
 #endif   /* PQFORMAT_H */