On 18.01.2018 18:02, Tomas Vondra wrote:
Hi Konstantin,

On 01/18/2018 03:48 PM, Konstantin Knizhnik wrote:
On 17.01.2018 19:09, Konstantin Knizhnik wrote:
Hi hackers,

...
I haven't looked at the code yet, but after reading your message I have
a simple question - how is this going to work with SSL? If you're only
passing a file descriptor, that does not seem to be sufficient for the
backends to do crypto (that requires the SSL stuff from Port).

Maybe I'm missing something and it already works, though ...


regards

Oops, I missed this aspect of SSL. Thank you.
A new version of the patch, which correctly maintains the session context, is attached. Now each session has its own memory context, which should be used instead of TopMemoryContext.
SSL connections work now.

--
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company

diff --git a/src/backend/commands/prepare.c b/src/backend/commands/prepare.c
index b945b15..a73d584 100644
--- a/src/backend/commands/prepare.c
+++ b/src/backend/commands/prepare.c
@@ -813,3 +813,35 @@ build_regtype_array(Oid *param_types, int num_params)
 	result = construct_array(tmp_ary, num_params, REGTYPEOID, 4, true, 'i');
 	return PointerGetDatum(result);
 }
+
+/*
+ * DropSessionPreparedStatements
+ *		Drop every prepared statement owned by the pooled session "sessionId".
+ *
+ * A pooled session's statement names are stored as "<sessionId>.<name>", so
+ * ownership is decided by that prefix plus the '.' separator; requiring the
+ * separator keeps session "1" from matching statements of session "12".
+ */
+void
+DropSessionPreparedStatements(char const *sessionId)
+{
+	HASH_SEQ_STATUS status;
+	PreparedStatement *stmt;
+	size_t		prefixLen = strlen(sessionId);
+
+	if (prepared_queries == NULL)
+		return;					/* nothing cached */
+
+	hash_seq_init(&status, prepared_queries);
+	for (stmt = hash_seq_search(&status); stmt != NULL;
+		 stmt = hash_seq_search(&status))
+	{
+		if (strncmp(stmt->stmt_name, sessionId, prefixLen) != 0 ||
+			stmt->stmt_name[prefixLen] != '.')
+			continue;
+
+		/* Release the plancache entry before removing the hash entry */
+		DropCachedPlan(stmt->plansource);
+		hash_search(prepared_queries, stmt->stmt_name, HASH_REMOVE, NULL);
+	}
+}
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index a4f6d4d..5b07a88 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -1029,6 +1029,18 @@ pq_peekbyte(void)
 }
 
 /* --------------------------------
+ *		pq_available_bytes	- number of received bytes not yet consumed
+ *
+ *	 Returns PqRecvLength - PqRecvPointer; never reads from the connection.
+ * --------------------------------
+ */
+int
+pq_available_bytes(void)
+{
+	return PqRecvLength - PqRecvPointer;
+}
+
+/* --------------------------------
  *		pq_getbyte_if_available - get a single byte from connection,
  *			if available
  *
diff --git a/src/backend/port/Makefile b/src/backend/port/Makefile
index aba1e92..56ec998 100644
--- a/src/backend/port/Makefile
+++ b/src/backend/port/Makefile
@@ -21,7 +21,7 @@ subdir = src/backend/port
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = atomics.o dynloader.o pg_sema.o pg_shmem.o $(TAS)
+OBJS = atomics.o dynloader.o pg_sema.o pg_shmem.o send_sock.o $(TAS)
 
 ifeq ($(PORTNAME), win32)
 SUBDIRS += win32
diff --git a/src/backend/port/send_sock.c b/src/backend/port/send_sock.c
new file mode 100644
index 0000000..7b36923
--- /dev/null
+++ b/src/backend/port/send_sock.c
@@ -0,0 +1,127 @@
+/*-------------------------------------------------------------------------
+ *
+ * send_sock.c
+ *	  Send socket descriptor to another process
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/port/send_sock.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <errno.h>
+#include <fcntl.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/wait.h>
+#include <time.h>
+#include <unistd.h>
+
+/*
+ * Ancillary-data buffer sized for exactly one descriptor.  Wrapping it in a
+ * union with struct cmsghdr gives it the alignment that CMSG_FIRSTHDR() and
+ * CMSG_DATA() assume; a bare char array carries no such guarantee.
+ */
+typedef union
+{
+	char		buf[CMSG_SPACE(sizeof(pgsocket))];
+	struct cmsghdr align;
+} SockCmsgBuf;
+
+/*
+ * Send socket descriptor "sock" to backend process through Unix socket "chan".
+ *
+ * Returns 0 on success, -1 on failure with errno set by sendmsg().  The
+ * caller keeps its own copy of "sock" and is responsible for closing it.
+ * Interrupted sends (EINTR) are retried.
+ */
+int
+pg_send_sock(pgsocket chan, pgsocket sock)
+{
+	struct msghdr msg;
+	struct iovec io;
+	struct cmsghdr *cmsg;
+	SockCmsgBuf control;
+	ssize_t		rc;
+
+	memset(&msg, 0, sizeof(msg));
+	memset(&control, 0, sizeof(control));
+
+	/* On Mac OS X, the struct iovec is needed, even if it points to minimal data */
+	io.iov_base = "";
+	io.iov_len = 1;
+
+	msg.msg_iov = &io;
+	msg.msg_iovlen = 1;
+	msg.msg_control = control.buf;
+	msg.msg_controllen = sizeof(control.buf);
+
+	cmsg = CMSG_FIRSTHDR(&msg);
+	cmsg->cmsg_level = SOL_SOCKET;
+	cmsg->cmsg_type = SCM_RIGHTS;
+	cmsg->cmsg_len = CMSG_LEN(sizeof(sock));
+	memcpy(CMSG_DATA(cmsg), &sock, sizeof(sock));
+	msg.msg_controllen = cmsg->cmsg_len;
+
+	do
+	{
+		rc = sendmsg(chan, &msg, 0);
+	} while (rc < 0 && errno == EINTR);
+
+	return (rc < 0) ? -1 : 0;
+}
+
+/*
+ * Receive socket descriptor from postmaster process through Unix socket "chan".
+ *
+ * Returns the received descriptor, or PGINVALID_SOCKET on failure.  If the
+ * datagram carries no usable SCM_RIGHTS message, errno is set to EPROTO
+ * instead of dereferencing a missing control header.  EINTR is retried.
+ */
+pgsocket
+pg_recv_sock(pgsocket chan)
+{
+	struct msghdr msg;
+	struct iovec io;
+	struct cmsghdr *cmsg;
+	SockCmsgBuf control;
+	char		payload[256];
+	pgsocket	sock;
+	ssize_t		rc;
+
+	memset(&msg, 0, sizeof(msg));
+	memset(&control, 0, sizeof(control));
+
+	io.iov_base = payload;
+	io.iov_len = sizeof(payload);
+	msg.msg_iov = &io;
+	msg.msg_iovlen = 1;
+	msg.msg_control = control.buf;
+	msg.msg_controllen = sizeof(control.buf);
+
+	do
+	{
+		rc = recvmsg(chan, &msg, 0);
+	} while (rc < 0 && errno == EINTR);
+
+	if (rc < 0)
+		return PGINVALID_SOCKET;
+
+	cmsg = CMSG_FIRSTHDR(&msg);
+	if (cmsg == NULL ||
+		cmsg->cmsg_level != SOL_SOCKET ||
+		cmsg->cmsg_type != SCM_RIGHTS ||
+		cmsg->cmsg_len < CMSG_LEN(sizeof(sock)))
+	{
+		errno = EPROTO;
+		return PGINVALID_SOCKET;
+	}
+
+	memcpy(&sock, CMSG_DATA(cmsg), sizeof(sock));
+	return sock;
+}
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index f3ddf82..4586b57 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -169,6 +169,7 @@ typedef struct bkend
 	pid_t		pid;			/* process id of backend */
 	int32		cancel_key;		/* cancel key for cancels for this backend */
 	int			child_slot;		/* PMChildSlot for this backend, if any */
+	pgsocket    session_send_sock;  /* Write end of socket pipe to this backend used to send session socket descriptor to the backend process */
 
 	/*
 	 * Flavor of backend or auxiliary process.  Note that BACKEND_TYPE_WALSND
@@ -182,6 +183,15 @@ typedef struct bkend
 } Backend;
 
 static dlist_head BackendList = DLIST_STATIC_INIT(BackendList);
+/*
+ * Pointer in backend list used to implement round-robin distribution of sessions through backends.
+ * This variable either NULL, either points to the normal backend.
+ */
+static Backend*   BackendListClockPtr;
+/*
+ * Number of active normal backends
+ */
+static int        nNormalBackends;
 
 #ifdef EXEC_BACKEND
 static Backend *ShmemBackendArray;
@@ -412,7 +422,6 @@ static void BackendRun(Port *port) pg_attribute_noreturn();
 static void ExitPostmaster(int status) pg_attribute_noreturn();
 static int	ServerLoop(void);
 static int	BackendStartup(Port *port);
-static int	ProcessStartupPacket(Port *port, bool SSLdone);
 static void SendNegotiateProtocolVersion(List *unrecognized_protocol_options);
 static void processCancelRequest(Port *port, void *pkt);
 static int	initMasks(fd_set *rmask);
@@ -568,6 +577,40 @@ HANDLE		PostmasterHandle;
 #endif
 
 /*
+ * Move BackendListClockPtr to the next normal backend, wrapping around at
+ * the end of BackendList.  Called when a new session is dispatched (to
+ * implement round-robin) and before the backend it points to is unlinked.
+ *
+ * BackendList is a circular dlist: its last element links back to the list
+ * head rather than to NULL, and the head is not embedded in a Backend, so
+ * wrap-around must go through dlist_has_next()/dlist_head_node().
+ *
+ * If no other normal backend exists, the pointer is left unchanged.
+ */
+static void
+AdvanceBackendListClockPtr(void)
+{
+	Backend    *start = BackendListClockPtr;
+	Backend    *b = start;
+
+	if (start == NULL)
+		return;
+
+	do
+	{
+		dlist_node *node;
+
+		if (dlist_has_next(&BackendList, &b->elem))
+			node = dlist_next_node(&BackendList, &b->elem);
+		else
+			node = dlist_head_node(&BackendList);
+		b = dlist_container(Backend, elem, node);
+	} while (b != start && b->bkend_type != BACKEND_TYPE_NORMAL);
+
+	BackendListClockPtr = b;
+}
+
+/*
  * Postmaster main entry point
  */
 void
@@ -1944,8 +1969,8 @@ initMasks(fd_set *rmask)
  * send anything to the client, which would typically be appropriate
  * if we detect a communications failure.)
  */
-static int
-ProcessStartupPacket(Port *port, bool SSLdone)
+int
+ProcessStartupPacket(Port *port, bool SSLdone, MemoryContext memctx)
 {
 	int32		len;
 	void	   *buf;
@@ -2043,7 +2068,7 @@ retry1:
 #endif
 		/* regular startup packet, cancel, etc packet should follow... */
 		/* but not another SSL negotiation request */
-		return ProcessStartupPacket(port, true);
+		return ProcessStartupPacket(port, true, memctx);
 	}
 
 	/* Could add additional special packet types here */
@@ -2073,7 +2098,7 @@ retry1:
 	 * not worry about leaking this storage on failure, since we aren't in the
 	 * postmaster process anymore.
 	 */
-	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+	oldcontext = MemoryContextSwitchTo(memctx);
 
 	if (PG_PROTOCOL_MAJOR(proto) >= 3)
 	{
@@ -2449,7 +2474,7 @@ ConnCreate(int serverFd)
 		ConnFree(port);
 		return NULL;
 	}
-
+	SessionPoolSock = PGINVALID_SOCKET;
 	/*
 	 * Allocate GSSAPI specific state struct
 	 */
@@ -3236,6 +3261,38 @@ CleanupBackgroundWorker(int pid,
 }
 
 /*
+ * Unlink backend from the backend list and free its memory.
+ *
+ * Entries created by BackendStartup() are counted in nNormalBackends and own
+ * a session-passing socket.  Such an entry may since have been relabeled
+ * BACKEND_TYPE_WALSND, so both types are handled here; session_send_sock is
+ * only assigned by BackendStartup(), so other entry types must not touch it.
+ * If BackendListClockPtr points at the victim it is advanced, or reset to
+ * NULL when no other normal backend remains, so that it never dangles.
+ */
+static void
+UnlinkBackend(Backend *bp)
+{
+	if (bp->bkend_type == BACKEND_TYPE_NORMAL ||
+		bp->bkend_type == BACKEND_TYPE_WALSND)
+	{
+		if (bp == BackendListClockPtr)
+		{
+			AdvanceBackendListClockPtr();
+			/* still pointing here means there was no other candidate */
+			if (bp == BackendListClockPtr)
+				BackendListClockPtr = NULL;
+		}
+		if (bp->session_send_sock != PGINVALID_SOCKET)
+			close(bp->session_send_sock);
+		elog(DEBUG2, "Cleanup backend %d", bp->pid);
+		nNormalBackends -= 1;
+	}
+	dlist_delete(&bp->elem);
+	free(bp);
+}
+
+/*
  * CleanupBackend -- cleanup after terminated backend.
  *
  * Remove all local state associated with backend.
@@ -3312,8 +3355,7 @@ CleanupBackend(int pid,
 				 */
 				BackgroundWorkerStopNotifications(bp->pid);
 			}
-			dlist_delete(iter.cur);
-			free(bp);
+			UnlinkBackend(bp);
 			break;
 		}
 	}
@@ -3415,8 +3457,7 @@ HandleChildCrash(int pid, int exitstatus, const char *procname)
 				ShmemBackendArrayRemove(bp);
 #endif
 			}
-			dlist_delete(iter.cur);
-			free(bp);
+			UnlinkBackend(bp);
 			/* Keep looping so we can signal remaining backends */
 		}
 		else
@@ -4017,6 +4058,36 @@ BackendStartup(Port *port)
 {
 	Backend    *bn;				/* for backend cleanup */
 	pid_t		pid;
+	int			session_pipe[2];
+
+	/*
+	 * With session pooling enabled and the pool already full, do not fork:
+	 * hand the client socket to an existing backend, chosen round-robin.
+	 *
+	 * Nothing here may raise FATAL or ERROR: this runs in the postmaster, and
+	 * FATAL would shut down the whole server.  If no eligible backend is
+	 * known, fall through and start a new backend instead of dereferencing a
+	 * NULL BackendListClockPtr.
+	 */
+	if (SessionPoolSize != 0 && nNormalBackends >= SessionPoolSize &&
+		BackendListClockPtr != NULL &&
+		BackendListClockPtr->session_send_sock != PGINVALID_SOCKET)
+	{
+		Backend    *target = BackendListClockPtr;
+
+		elog(DEBUG2, "Start new session for socket %d at backend %d total %d",
+			 port->sock, target->pid, nNormalBackends);
+		AdvanceBackendListClockPtr();	/* round-robin */
+		if (pg_send_sock(target->session_send_sock, port->sock) < 0)
+		{
+			ereport(LOG,
+					(errcode_for_socket_access(),
+					 errmsg("could not pass session socket to backend %d: %m",
+							(int) target->pid)));
+			return STATUS_ERROR;
+		}
+		return STATUS_OK;
+	}
 
 	/*
 	 * Create backend data structure.  Better before the fork() so we can
@@ -4030,7 +4084,6 @@ BackendStartup(Port *port)
 				 errmsg("out of memory")));
 		return STATUS_ERROR;
 	}
-
 	/*
 	 * Compute the cancel key that will be assigned to this backend. The
 	 * backend will have its own copy in the forked-off process' value of
@@ -4063,12 +4116,38 @@ BackendStartup(Port *port)
 	/* Hasn't asked to be notified about any bgworkers yet */
 	bn->bgworker_notify = false;
 
+	/*
+	 * In pooling mode each backend gets a datagram socket pair; the postmaster
+	 * keeps one end to pass client sockets to the backend later.  A failure
+	 * here must not be FATAL because we are still in the postmaster: undo the
+	 * per-connection setup done above and reject this connection instead.
+	 */
+	if (SessionPoolSize != 0 &&
+		socketpair(AF_UNIX, SOCK_DGRAM, 0, session_pipe) < 0)
+	{
+		int			save_errno = errno;
+
+		if (!bn->dead_end)
+			(void) ReleasePostmasterChildSlot(bn->child_slot);
+		free(bn);
+		errno = save_errno;
+		ereport(LOG,
+				(errcode_for_socket_access(),
+				 errmsg_internal("could not create socket pair for launching sessions: %m")));
+		return STATUS_ERROR;
+	}
+
 #ifdef EXEC_BACKEND
 	pid = backend_forkexec(port);
 #else							/* !EXEC_BACKEND */
 	pid = fork_process();
 	if (pid == 0)				/* child */
 	{
+		if (SessionPoolSize != 0)
+		{
+			SessionPoolSock = session_pipe[0];
+			close(session_pipe[1]);
+		}
 		free(bn);
 
 		/* Detangle from postmaster */
@@ -4110,9 +4174,19 @@ BackendStartup(Port *port)
 	 * of backends.
 	 */
 	bn->pid = pid;
+	if (SessionPoolSize != 0)
+	{
+		bn->session_send_sock = session_pipe[1];
+		close(session_pipe[0]);
+	}
+	else
+		bn->session_send_sock = PGINVALID_SOCKET;
 	bn->bkend_type = BACKEND_TYPE_NORMAL;	/* Can change later to WALSND */
 	dlist_push_head(&BackendList, &bn->elem);
-
+	if (BackendListClockPtr == NULL)
+		BackendListClockPtr = bn;
+	nNormalBackends += 1;
+	elog(DEBUG2, "Start backend %d total %d", pid, nNormalBackends);
 #ifdef EXEC_BACKEND
 	if (!bn->dead_end)
 		ShmemBackendArrayAdd(bn);
@@ -4299,7 +4373,7 @@ BackendInitialize(Port *port)
 	 * Receive the startup packet (which might turn out to be a cancel request
 	 * packet).
 	 */
-	status = ProcessStartupPacket(port, false);
+	status = ProcessStartupPacket(port, false, TopMemoryContext);
 
 	/*
 	 * Stop here if it was bad or a cancel packet.  ProcessStartupPacket
diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index e6706f7..9c42fab 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -76,6 +76,7 @@ struct WaitEventSet
 {
 	int			nevents;		/* number of registered events */
 	int			nevents_space;	/* maximum number of events in this set */
+	int         free_events;    /* L1-list of free events linked by "pos" and terminated by -1*/
 
 	/*
 	 * Array, of nevents_space length, storing the definition of events this
@@ -129,7 +130,7 @@ static void drainSelfPipe(void);
 #if defined(WAIT_USE_EPOLL)
 static void WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action);
 #elif defined(WAIT_USE_POLL)
-static void WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event);
+static void WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event, bool remove);
 #elif defined(WAIT_USE_WIN32)
 static void WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event);
 #endif
@@ -562,6 +563,7 @@ CreateWaitEventSet(MemoryContext context, int nevents)
 
 	set->latch = NULL;
 	set->nevents_space = nevents;
+	set->free_events = -1;
 
 #if defined(WAIT_USE_EPOLL)
 #ifdef EPOLL_CLOEXEC
@@ -667,6 +669,7 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
 				  void *user_data)
 {
 	WaitEvent  *event;
+	int free_event;
 
 	/* not enough space */
 	Assert(set->nevents < set->nevents_space);
@@ -690,8 +693,19 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
 	if (fd == PGINVALID_SOCKET && (events & WL_SOCKET_MASK))
 		elog(ERROR, "cannot wait on socket event without a socket");
 
-	event = &set->events[set->nevents];
-	event->pos = set->nevents++;
+	free_event = set->free_events;
+	if (free_event >= 0)
+	{
+		event = &set->events[free_event];
+		set->free_events = event->pos;
+		event->pos = free_event;
+	}
+	else
+	{
+		event = &set->events[set->nevents];
+		event->pos = set->nevents;
+	}
+	set->nevents += 1;
 	event->fd = fd;
 	event->events = events;
 	event->user_data = user_data;
@@ -718,7 +732,7 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
 #if defined(WAIT_USE_EPOLL)
 	WaitEventAdjustEpoll(set, event, EPOLL_CTL_ADD);
 #elif defined(WAIT_USE_POLL)
-	WaitEventAdjustPoll(set, event);
+	WaitEventAdjustPoll(set, event, false);
 #elif defined(WAIT_USE_WIN32)
 	WaitEventAdjustWin32(set, event);
 #endif
@@ -727,6 +741,48 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
 }
 
 /*
+ * Remove the event registered for socket "fd" from the set, if present.
+ *
+ * With WAIT_USE_EPOLL, removed slots go onto set->free_events with
+ * fd = PGINVALID_SOCKET, so occupied slots need not lie in [0, nevents).
+ * Every slot ever handed out lies below nevents plus the length of the free
+ * list, so that whole prefix is scanned.  With WAIT_USE_POLL the arrays are
+ * kept dense and the free list stays empty, so the bound is just nevents.
+ *
+ * Removal is not implemented for WAIT_USE_WIN32.
+ */
+void
+DeleteWaitEventFromSet(WaitEventSet *set, pgsocket fd)
+{
+	int			limit = set->nevents;
+	int			i;
+
+	if (fd == PGINVALID_SOCKET)
+		elog(ERROR, "cannot remove wait event without a socket");
+
+	/* count free-list slots to find the high-water mark of used slots */
+	for (i = set->free_events; i >= 0; i = set->events[i].pos)
+		limit++;
+
+	for (i = 0; i < limit; i++)
+	{
+		WaitEvent  *event = &set->events[i];
+
+		if (event->fd == fd)
+		{
+#if defined(WAIT_USE_EPOLL)
+			WaitEventAdjustEpoll(set, event, EPOLL_CTL_DEL);
+#elif defined(WAIT_USE_POLL)
+			WaitEventAdjustPoll(set, event, true);
+#else
+			elog(ERROR, "removing events from a WaitEventSet is not supported on this platform");
+#endif
+			break;
+		}
+	}
+}
+
+/*
  * Change the event mask and, in the WL_LATCH_SET case, the latch associated
  * with the WaitEvent.
  *
@@ -774,7 +809,7 @@ ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
 #if defined(WAIT_USE_EPOLL)
 	WaitEventAdjustEpoll(set, event, EPOLL_CTL_MOD);
 #elif defined(WAIT_USE_POLL)
-	WaitEventAdjustPoll(set, event);
+	WaitEventAdjustPoll(set, event, false);
 #elif defined(WAIT_USE_WIN32)
 	WaitEventAdjustWin32(set, event);
 #endif
@@ -827,14 +862,41 @@ WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action)
 		ereport(ERROR,
 				(errcode_for_socket_access(),
 				 errmsg("epoll_ctl() failed: %m")));
+
+	if (action == EPOLL_CTL_DEL)
+	{
+		/* mark the slot free and push it on free_events (linked via pos) */
+		int pos = event->pos;
+		event->fd = PGINVALID_SOCKET;
+		set->nevents -= 1;
+		event->pos = set->free_events;
+		set->free_events = pos;
+	}
 }
 #endif
 
 #if defined(WAIT_USE_POLL)
 static void
-WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event)
+WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event, bool remove)
 {
-	struct pollfd *pollfd = &set->pollfds[event->pos];
+	int			pos = event->pos;
+	struct pollfd *pollfd = &set->pollfds[pos];
+
+	if (remove)
+	{
+		/*
+		 * Keep pollfds[] and events[] dense: move the last entry into the
+		 * vacated slot and fix up its recorded position.  If the moved entry
+		 * is the latch event, latch_pos has to follow it.
+		 */
+		set->nevents -= 1;
+		*pollfd = set->pollfds[set->nevents];
+		set->events[pos] = set->events[set->nevents];
+		set->events[pos].pos = pos;
+		if (set->events[pos].events == WL_LATCH_SET)
+			set->latch_pos = pos;
+		return;
+	}
 
 	pollfd->revents = 0;
 	pollfd->fd = event->fd;
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index ddc3ec8..f8abfd0 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -75,9 +75,17 @@
 #include "utils/snapmgr.h"
 #include "utils/timeout.h"
 #include "utils/timestamp.h"
+#include "utils/builtins.h"
 #include "mb/pg_wchar.h"
 
 
+typedef struct SessionContext
+{
+	MemoryContext memory;
+	Port* port;
+	char* id;
+} SessionContext;
+
 /* ----------------
  *		global variables
  * ----------------
@@ -98,6 +106,8 @@ int			max_stack_depth = 100;
 /* wait N seconds to allow attach from a debugger */
 int			PostAuthDelay = 0;
 
+/* Local socket for redirecting sessions to the backends */ 
+pgsocket    SessionPoolSock = PGINVALID_SOCKET;
 
 
 /* ----------------
@@ -169,6 +179,11 @@ static ProcSignalReason RecoveryConflictReason;
 static MemoryContext row_description_context = NULL;
 static StringInfoData row_description_buf;
 
+static WaitEventSet*   SessionPool;
+static int64           SessionCount;
+static SessionContext* CurrentSession;
+static Port*           BackendPort;
+
 /* ----------------------------------------------------------------
  *		decls for routines only used in this file
  * ----------------------------------------------------------------
@@ -194,6 +209,36 @@ static void log_disconnections(int code, Datum arg);
 static void enable_statement_timeout(void);
 static void disable_statement_timeout(void);
 
+/*
+ * Generate a session ID unique within this backend.
+ *
+ * The result is a decimal string pstrdup'd into CurrentMemoryContext; the
+ * callers switch to the session's memory context first, so the ID lives and
+ * dies with the session.
+ */
+static char *
+CreateSessionId(void)
+{
+	char		buf[64];
+
+	pg_lltoa(++SessionCount, buf);
+	return pstrdup(buf);
+}
+
+/*
+ * Release a pooled session: its memory context (which holds the session's
+ * ID and, for sessions accepted through the pool socket, its Port) and the
+ * malloc'd SessionContext itself.  The caller must not use "session"
+ * afterwards.
+ */
+static void
+DeleteSession(SessionContext *session)
+{
+	elog(DEBUG1, "Delete session %p, id=%s,  memory context=%p",
+		 session, session->id, session->memory);
+	MemoryContextDelete(session->memory);
+	free(session);
+}
 
 /* ----------------------------------------------------------------
  *		routines to obtain user input
@@ -1232,6 +1263,12 @@ exec_parse_message(const char *query_string,	/* string to execute */
 	bool		save_log_statement_stats = log_statement_stats;
 	char		msec_str[32];
 
+	if (CurrentSession && stmt_name[0] != '\0')
+	{
+		/* Make names of prepared statements unique for session in case of using internal session pool */
+		stmt_name = psprintf("%s.%s", CurrentSession->id, stmt_name);
+	}
+
 	/*
 	 * Report query to various monitoring facilities.
 	 */
@@ -1503,6 +1540,12 @@ exec_bind_message(StringInfo input_message)
 	portal_name = pq_getmsgstring(input_message);
 	stmt_name = pq_getmsgstring(input_message);
 
+	if (CurrentSession && stmt_name[0] != '\0')
+	{
+		/* Make names of prepared statements unique for session in case of using internal session pool */
+		stmt_name = psprintf("%s.%s", CurrentSession->id, stmt_name);
+	}
+
 	ereport(DEBUG2,
 			(errmsg("bind %s to %s",
 					*portal_name ? portal_name : "<unnamed>",
@@ -2325,6 +2368,12 @@ exec_describe_statement_message(const char *stmt_name)
 	CachedPlanSource *psrc;
 	int			i;
 
+	if (CurrentSession && stmt_name[0] != '\0')
+	{
+		/* Make names of prepared statements unique for session in case of using internal session pool */
+		stmt_name = psprintf("%s.%s", CurrentSession->id, stmt_name);
+	}
+
 	/*
 	 * Start up a transaction command. (Note that this will normally change
 	 * current memory context.) Nothing happens if we are already in one.
@@ -3603,7 +3652,6 @@ process_postgres_switches(int argc, char *argv[], GucContext ctx,
 #endif
 }
 
-
 /* ----------------------------------------------------------------
  * PostgresMain
  *	   postgres main loop -- all backends, interactive or otherwise start here
@@ -3654,6 +3702,30 @@ PostgresMain(int argc, char *argv[],
 							progname)));
 	}
 
+	/*
+	 * With session pooling, the backend's first connection becomes its first
+	 * session.  Its Port is also remembered as BackendPort, which serves as
+	 * the template for sessions handed over later by the postmaster.
+	 */
+	if (SessionPoolSize != 0)
+	{
+		MemoryContext oldcontext;
+
+		CurrentSession = (SessionContext*)malloc(sizeof(SessionContext));
+		if (CurrentSession == NULL)
+			ereport(FATAL,
+					(errcode(ERRCODE_OUT_OF_MEMORY),
+					 errmsg("out of memory")));
+		CurrentSession->memory = AllocSetContextCreate(TopMemoryContext,
+													   "SessionMemoryContext",
+													   ALLOCSET_DEFAULT_SIZES);
+		oldcontext = MemoryContextSwitchTo(CurrentSession->memory);
+		CurrentSession->id = CreateSessionId();
+		CurrentSession->port = MyProcPort;
+		BackendPort = MyProcPort;
+		MemoryContextSwitchTo(oldcontext);
+	}
+
 	/* Acquire configuration parameters, unless inherited from postmaster */
 	if (!IsUnderPostmaster)
 	{
@@ -3783,7 +3846,7 @@ PostgresMain(int argc, char *argv[],
 	 * ... else we'd need to copy the Port data first.  Also, subsidiary data
 	 * such as the username isn't lost either; see ProcessStartupPacket().
 	 */
-	if (PostmasterContext)
+	if (PostmasterContext && SessionPoolSize == 0)
 	{
 		MemoryContextDelete(PostmasterContext);
 		PostmasterContext = NULL;
@@ -4069,6 +4132,155 @@ PostgresMain(int argc, char *argv[],
 
 			ReadyForQuery(whereToSendOutput);
 			send_ready_for_query = false;
+
+			if (SessionPoolSock != PGINVALID_SOCKET && !IsTransactionState() && pq_available_bytes() == 0)
+			{
+				WaitEvent ready_client;
+				if (SessionPool == NULL)
+				{
+					SessionPool = CreateWaitEventSet(TopMemoryContext, MaxSessions);
+					AddWaitEventToSet(SessionPool, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, CurrentSession);
+					AddWaitEventToSet(SessionPool, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, CurrentSession);
+					AddWaitEventToSet(SessionPool, WL_SOCKET_READABLE, SessionPoolSock, NULL, CurrentSession);
+					AddWaitEventToSet(SessionPool, WL_SOCKET_READABLE, MyProcPort->sock, NULL, CurrentSession);
+				}
+			  ChooseSession:
+				DoingCommandRead = true;
+				if (WaitEventSetWait(SessionPool, -1, &ready_client, 1, PG_WAIT_CLIENT) != 1)
+				{
+					/* TODO: do some error recovery here */
+					elog(FATAL, "Failed to poll client sessions");
+				}
+				CHECK_FOR_INTERRUPTS();
+				DoingCommandRead = false;
+
+				if (ready_client.events & WL_POSTMASTER_DEATH)
+					ereport(FATAL,
+							(errcode(ERRCODE_ADMIN_SHUTDOWN),
+							 errmsg("terminating connection due to unexpected postmaster exit")));
+
+				if (ready_client.events & WL_LATCH_SET)
+				{
+					ResetLatch(MyLatch);
+					ProcessClientReadInterrupt(true);
+					goto ChooseSession;
+				}
+
+				if (ready_client.fd == SessionPoolSock)
+				{
+					int		 status;
+					SessionContext* session;
+					StringInfoData buf;
+					Port*    port;
+					Port*    save_port = MyProcPort;
+					pgsocket sock;
+					MemoryContext oldcontext;
+
+					sock = pg_recv_sock(SessionPoolSock);
+					if (sock < 0)
+						elog(FATAL, "Failed to receive session socket: %m");
+
+					session = (SessionContext*)malloc(sizeof(SessionContext));
+					if (session == NULL)
+					{
+						/* reject only this client; other sessions keep running */
+						close(sock);
+						ereport(LOG,
+								(errcode(ERRCODE_OUT_OF_MEMORY),
+								 errmsg("out of memory")));
+						goto ChooseSession;
+					}
+					session->memory = AllocSetContextCreate(TopMemoryContext,
+															"SessionMemoryContext",
+															ALLOCSET_DEFAULT_SIZES);
+					oldcontext = MemoryContextSwitchTo(session->memory);
+					port = palloc(sizeof(Port));
+					memcpy(port, BackendPort, sizeof(Port));
+
+					/*
+					 * The copy still describes the backend's first connection.
+					 * Point it at the new socket and drop that connection's SSL
+					 * state; otherwise I/O for the new client would be routed
+					 * through the first client's SSL state.  ProcessStartupPacket()
+					 * negotiates SSL again if the new client requests it.
+					 */
+					port->sock = sock;
+					port->ssl_in_use = false;
+					port->peer_cn = NULL;
+					session->port = port;
+					session->id = CreateSessionId();
+
+					/*
+					 * Receive the startup packet (which might turn out to be a cancel request
+					 * packet).
+					 *
+					 * NOTE(review): ProcessStartupPacket() and the later
+					 * PerformAuthentication() may still raise FATAL for a bad
+					 * client, ending every session in this backend -- confirm.
+					 */
+					MyProcPort = port;
+					status = ProcessStartupPacket(port, false, session->memory);
+					MemoryContextSwitchTo(oldcontext);
+
+					/*
+					 * This backend is bound to the database and role of its first
+					 * connection (BackendPort), so a new session must match it.
+					 * MyProcPort cannot be the reference: it is "port" itself by
+					 * now, which made the comparison vacuous.  A mismatch rejects
+					 * only this session; FATAL would kill every other one too.
+					 */
+					if (status == STATUS_OK &&
+						(strcmp(port->database_name, BackendPort->database_name) != 0 ||
+						 strcmp(port->user_name, BackendPort->user_name) != 0))
+					{
+						elog(LOG, "Failed to open session (dbname=%s user=%s) in backend %d (dbname=%s user=%s)",
+							 port->database_name, port->user_name,
+							 MyProcPid, BackendPort->database_name, BackendPort->user_name);
+						status = STATUS_ERROR;
+					}
+
+					if (status == STATUS_OK)
+					{
+						elog(DEBUG2, "Start new session %d in backend %d for database %s user %s",
+							 sock, MyProcPid, port->database_name, port->user_name);
+						CurrentSession = session;
+						AddWaitEventToSet(SessionPool, WL_SOCKET_READABLE, sock, NULL, session);
+
+						SetCurrentStatementStartTimestamp();
+						StartTransactionCommand();
+						PerformAuthentication(MyProcPort);
+						CommitTransactionCommand();
+
+						BeginReportingGUCOptions();
+						/*
+						 * Send this backend's cancellation info to the frontend.
+						 */
+						pq_beginmessage(&buf, 'K');
+						pq_sendint32(&buf, (int32) MyProcPid);
+						pq_sendint32(&buf, (int32) MyCancelKey);
+						pq_endmessage(&buf);
+
+						/* Need not flush since ReadyForQuery will do it. */
+						send_ready_for_query = true;
+						continue;
+					}
+					else
+					{
+						/* MyProcPort points into memory that DeleteSession() frees */
+						MyProcPort = save_port;
+						DeleteSession(session);
+						elog(LOG, "Session startup failed");
+						close(sock);
+						goto ChooseSession;
+					}
+				}
+				else
+				{
+					elog(DEBUG2, "Switch to session %d in backend %d", ready_client.fd, MyProcPid);
+					CurrentSession = (SessionContext*)ready_client.user_data;
+					MyProcPort = CurrentSession->port;
+				}
+			}
 		}
 
 		/*
@@ -4350,6 +4527,37 @@ PostgresMain(int argc, char *argv[],
 				 * it will fail to be called during other backend-shutdown
 				 * scenarios.
 				 */
+				if (SessionPool)
+				{
+					DeleteWaitEventFromSet(SessionPool, MyProcPort->sock);
+					elog(DEBUG1, "Close session %d in backend %d", MyProcPort->sock, MyProcPid);
+
+					pq_getmsgend(&input_message);
+					if (pq_is_reading_msg())
+						pq_endmsgread();
+
+					close(MyProcPort->sock);
+					MyProcPort->sock = PGINVALID_SOCKET;
+
+					/*
+					 * Never leave MyProcPort NULL while waiting for the next
+					 * session: interrupt processing and error reporting may
+					 * dereference it.  BackendPort was created before any
+					 * session memory context, so DeleteSession() below cannot
+					 * free it.
+					 */
+					MyProcPort = BackendPort;
+
+					if (CurrentSession)
+					{
+						DropSessionPreparedStatements(CurrentSession->id);
+						DeleteSession(CurrentSession);
+						CurrentSession = NULL;
+					}
+					whereToSendOutput = DestRemote;
+					goto ChooseSession;
+				}
+				elog(DEBUG1, "Terminate backend %d", MyProcPid);
 				proc_exit(0);
 
 			case 'd':			/* copy data */
diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c
index 54fa4a3..b2f43a8 100644
--- a/src/backend/utils/init/globals.c
+++ b/src/backend/utils/init/globals.c
@@ -120,7 +120,9 @@ int			maintenance_work_mem = 16384;
  * register background workers.
  */
 int			NBuffers = 1000;
+int			SessionPoolSize = 0;
 int			MaxConnections = 90;
+int			MaxSessions = 1000;
 int			max_worker_processes = 8;
 int			max_parallel_workers = 8;
 int			MaxBackends = 0;
diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c
index f9b3309..571c80f 100644
--- a/src/backend/utils/init/postinit.c
+++ b/src/backend/utils/init/postinit.c
@@ -65,7 +65,7 @@
 
 static HeapTuple GetDatabaseTuple(const char *dbname);
 static HeapTuple GetDatabaseTupleByOid(Oid dboid);
-static void PerformAuthentication(Port *port);
+void PerformAuthentication(Port *port);
 static void CheckMyDatabase(const char *name, bool am_superuser);
 static void InitCommunication(void);
 static void ShutdownPostgres(int code, Datum arg);
@@ -180,7 +180,7 @@ GetDatabaseTupleByOid(Oid dboid)
  *
  * returns: nothing.  Will not return at all if there's any failure.
  */
-static void
+void
 PerformAuthentication(Port *port)
 {
 	/* This should be set already, but let's make sure */
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 72f6be3..02373a3 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -1871,6 +1871,26 @@ static struct config_int ConfigureNamesInt[] =
 	},
 
 	{
+		{"max_sessions", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
+			gettext_noop("Sets the maximum number of client sessions."),
+			NULL
+		},
+		&MaxSessions,
+		1000, 1, INT_MAX,
+		NULL, NULL, NULL
+	},
+
+	{
+		{"session_pool_size", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
+			gettext_noop("Sets the number of backends serving client sessions."),
+			gettext_noop("Zero disables session pooling.")
+		},
+		&SessionPoolSize,
+		0, 0, INT_MAX,
+		NULL, NULL, NULL
+	},
+
+	{
 		{"superuser_reserved_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
 			gettext_noop("Sets the number of connection slots reserved for superusers."),
 			NULL
diff --git a/src/include/commands/prepare.h b/src/include/commands/prepare.h
index ffec029..cb5f8d4 100644
--- a/src/include/commands/prepare.h
+++ b/src/include/commands/prepare.h
@@ -56,5 +56,6 @@ extern TupleDesc FetchPreparedStatementResultDesc(PreparedStatement *stmt);
 extern List *FetchPreparedStatementTargetList(PreparedStatement *stmt);
 
 extern void DropAllPreparedStatements(void);
+extern void DropSessionPreparedStatements(char const* sessionId);
 
 #endif							/* PREPARE_H */
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index 2e7725d..9169b21 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -71,6 +71,7 @@ extern int	pq_getbyte(void);
 extern int	pq_peekbyte(void);
 extern int	pq_getbyte_if_available(unsigned char *c);
 extern int	pq_putbytes(const char *s, size_t len);
+extern int  pq_available_bytes(void);
 
 /*
  * prototypes for functions in be-secure.c
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index 54ee273..a9f9228 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -157,6 +157,8 @@ extern PGDLLIMPORT char *DataDir;
 extern PGDLLIMPORT int NBuffers;
 extern PGDLLIMPORT int MaxBackends;
 extern PGDLLIMPORT int MaxConnections;
+extern PGDLLIMPORT int MaxSessions;
+extern PGDLLIMPORT int SessionPoolSize;
 extern PGDLLIMPORT int max_worker_processes;
 extern int	max_parallel_workers;
 
@@ -420,6 +422,7 @@ extern void InitializeMaxBackends(void);
 extern void InitPostgres(const char *in_dbname, Oid dboid, const char *username,
 			 Oid useroid, char *out_dbname);
 extern void BaseInit(void);
+extern void PerformAuthentication(struct Port *port);
 
 /* in utils/init/miscinit.c */
 extern bool IgnoreSystemIndexes;
diff --git a/src/include/port.h b/src/include/port.h
index 3e528fa..c14a20d 100644
--- a/src/include/port.h
+++ b/src/include/port.h
@@ -41,6 +41,10 @@ typedef SOCKET pgsocket;
 extern bool pg_set_noblock(pgsocket sock);
 extern bool pg_set_block(pgsocket sock);
 
+/* send/receive socket descriptor */
+extern int pg_send_sock(pgsocket chan, pgsocket sock);
+extern pgsocket pg_recv_sock(pgsocket chan);
+
 /* Portable path handling for Unix/Win32 (in path.c) */
 
 extern bool has_drive_prefix(const char *filename);
diff --git a/src/include/postmaster/postmaster.h b/src/include/postmaster/postmaster.h
index 1877eef..c9527c9 100644
--- a/src/include/postmaster/postmaster.h
+++ b/src/include/postmaster/postmaster.h
@@ -62,6 +62,9 @@ extern Size ShmemBackendArraySize(void);
 extern void ShmemBackendArrayAllocation(void);
 #endif
 
+struct Port;
+extern int	ProcessStartupPacket(struct Port *port, bool SSLdone, MemoryContext memctx);
+
 /*
  * Note: MAX_BACKENDS is limited to 2^18-1 because that's the width reserved
  * for buffer references in buf_internals.h.  This limitation could be lifted
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
index a4bcb48..10f30d1 100644
--- a/src/include/storage/latch.h
+++ b/src/include/storage/latch.h
@@ -176,6 +176,8 @@ extern int WaitLatch(volatile Latch *latch, int wakeEvents, long timeout,
 extern int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents,
 				  pgsocket sock, long timeout, uint32 wait_event_info);
 
+extern void DeleteWaitEventFromSet(WaitEventSet *set, pgsocket fd);
+
 /*
  * Unix implementation uses SIGUSR1 for inter-process signaling.
  * Win32 doesn't need this.
diff --git a/src/include/tcop/tcopprot.h b/src/include/tcop/tcopprot.h
index 63b4e48..191eeaa 100644
--- a/src/include/tcop/tcopprot.h
+++ b/src/include/tcop/tcopprot.h
@@ -34,6 +34,7 @@ extern CommandDest whereToSendOutput;
 extern PGDLLIMPORT const char *debug_query_string;
 extern int	max_stack_depth;
 extern int	PostAuthDelay;
+extern pgsocket SessionPoolSock;
 
 /* GUC-configurable parameters */
 

Reply via email to