On 17.01.2018 19:09, Konstantin Knizhnik wrote:
Hi hackers,

My recent experiments with a pthread version of Postgres show that although pthreads offer some performance advantages over processes for a large number of connections, they still cannot eliminate the need for connection pooling. Even a large number of inactive connections causes significant degradation of Postgres performance.

So we need connection pooling. Most enterprise systems working with Postgres use pgbouncer or similar tools.
But pgbouncer has the following drawbacks:
1. It is an extra entity which complicates system installation and administration.
2. Pgbouncer itself can be a bottleneck and a point of failure. For example, with SSL enabled, pgbouncer's single-threaded model becomes the limiting factor when many clients try to re-establish connections simultaneously. This is why some companies build hierarchies of pgbouncers.
3. Using a pool_mode other than "session" makes it impossible to use prepared statements and session variables. The lack of prepared statements can by itself slow down simple queries by up to a factor of two.

So I thought about built-in connection pooling for Postgres. Ideally it should be integrated with pthreads, because in that case scheduling of sessions can be done more flexibly and easily.
But I decided to start with a patch to vanilla Postgres.

The idea is the following:
1. We start some number of normal backends, which form a backend pool for serving client sessions.
2. When the number of connections exceeds the number of backends, then instead of spawning a new backend we choose one of the existing backends and redirect the connection to it. There is a more or less portable way in Unix to pass socket descriptors between processes using Unix sockets, see for example https://stackoverflow.com/questions/28003921/sending-file-descriptor-by-linux-socket/ (this is one of the places where a pthreads Postgres would win); a sketch of the mechanism follows this list. So a session is bound to a backend. Backends are chosen using a round-robin policy, which should guarantee a more or less uniform distribution of sessions among backends as long as the number of sessions is much larger than the number of backends. Certainly, skews in client application access patterns can violate this assumption.
3. Rescheduling is done at the transaction level, so it is enough to have one procarray entry per backend to correctly handle locks. Transaction-level pooling also eliminates the problem of false deadlocks (caused by lack of free executors in the pool) and minimizes the changes needed in the Postgres core to maintain correct session context: there is no need to suspend/resume transaction state, static variables, and so on.
4. In the main Postgres query loop in PostgresMain we determine the moment when the backend is not in a transaction state, perform a select over the sockets of all active sessions, and choose one of them.
5. When a client disconnects, we close the session but do not terminate the backend.
6. To support prepared statements, we append the session identifier to the name of the statement, so prepared statements of different sessions do not interfere with each other. Since a session is bound to a backend, it is possible to use prepared statements.
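For readers who do not want to dig through the whole patch, here is a small, self-contained demo of the descriptor-passing step (point 2). It is only an illustration of the SCM_RIGHTS pattern over a socketpair created before fork, with hypothetical helper names send_fd/recv_fd; the actual implementation in the patch is pg_send_sock()/pg_recv_sock() in src/backend/port/send_sock.c.

/*
 * Demo (not patch code): parent plays the postmaster role and hands a
 * descriptor to a forked child (the "backend") via SCM_RIGHTS.
 * Error handling is intentionally minimal.
 */
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <sys/wait.h>
#include <unistd.h>

static int
send_fd(int chan, int fd)		/* analogous to pg_send_sock() in the patch */
{
	struct msghdr msg = {0};
	struct iovec io;
	char data = 'S';
	char cbuf[CMSG_SPACE(sizeof(int))];
	struct cmsghdr *cmsg;

	memset(cbuf, 0, sizeof(cbuf));
	io.iov_base = &data;		/* at least one byte of real payload is required */
	io.iov_len = 1;
	msg.msg_iov = &io;
	msg.msg_iovlen = 1;
	msg.msg_control = cbuf;
	msg.msg_controllen = sizeof(cbuf);

	cmsg = CMSG_FIRSTHDR(&msg);
	cmsg->cmsg_level = SOL_SOCKET;
	cmsg->cmsg_type = SCM_RIGHTS;
	cmsg->cmsg_len = CMSG_LEN(sizeof(int));
	memcpy(CMSG_DATA(cmsg), &fd, sizeof(int));

	return sendmsg(chan, &msg, 0) < 0 ? -1 : 0;
}

static int
recv_fd(int chan)				/* analogous to pg_recv_sock() in the patch */
{
	struct msghdr msg = {0};
	struct iovec io;
	char data;
	char cbuf[CMSG_SPACE(sizeof(int))];
	struct cmsghdr *cmsg;
	int fd = -1;

	io.iov_base = &data;
	io.iov_len = 1;
	msg.msg_iov = &io;
	msg.msg_iovlen = 1;
	msg.msg_control = cbuf;
	msg.msg_controllen = sizeof(cbuf);

	if (recvmsg(chan, &msg, 0) < 0)
		return -1;
	cmsg = CMSG_FIRSTHDR(&msg);
	if (cmsg && cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS)
		memcpy(&fd, CMSG_DATA(cmsg), sizeof(int));
	return fd;
}

int
main(void)
{
	int chan[2];

	/* same kind of per-backend session pipe as created in BackendStartup() */
	socketpair(AF_UNIX, SOCK_DGRAM, 0, chan);
	if (fork() == 0)
	{
		/* "backend": receive the descriptor and use it */
		int fd = recv_fd(chan[0]);
		printf("child received fd %d\n", fd);
		return 0;
	}
	/* "postmaster": pass our stdout descriptor just as an example */
	send_fd(chan[1], 1);
	wait(NULL);
	return 0;
}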

This is the minimal plan for embedded session pooling that I decided to implement as a prototype.

Several things are not addressed now:

1. Temporary tables. In principle they can be handled in the same way as prepared statements: by concatenating the session identifier to the name of the table. But that requires adjusting references to the table in all queries, which is much more complicated than in the case of prepared statements.
2. Session-level GUCs. In principle it is not difficult to remember the GUCs modified by a session and save/restore them on session switch (a possible sketch follows this list); it is just not implemented yet.
3. Support of multiple users/databases/... This is the most critical drawback. Right now my prototype implementation assumes that all clients are connected to the same database under the same user with the same connection options, and this is a challenge on which I would like to know the opinion of the community. The name of the database and user are retrieved from the client connection by the ProcessStartupPacket function. In vanilla Postgres this function is executed by the spawned backend, so I do not know which database a client is going to access before calling this function and reading data from the client's socket. Now I just pick some backend and assign the connection to it; it can happen that this backend is working with a different database/user, in which case I currently just return an error. Certainly it is possible to call ProcessStartupPacket in the postmaster and then select a proper backend working with the specified database/user, but I am afraid the postmaster could become a bottleneck in this case, especially with SSL enabled. Also, a large number of databases/users can significantly reduce the efficiency of pooling if each backend is responsible for only one database/user combination. Maybe a backend should be bound only to the database and the concrete role set on session switch, but that may require flushing backend caches, which devalues the idea of embedded session pooling. This problem can be solved easily with a multithreaded Postgres, where a session can simply be reassigned to another thread.
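To make point 2 above more concrete, here is a hypothetical sketch (none of this is in the attached patch) of how modified GUCs could be remembered and re-applied on session switch using the existing GetConfigOption()/SetConfigOption() API. The SessionGuc structure and the function names are illustrative only.

#include "postgres.h"
#include "utils/guc.h"

/* Illustrative per-session list of GUC settings */
typedef struct SessionGuc
{
	struct SessionGuc *next;
	char	   *name;
	char	   *value;			/* value as of the last switch-out */
} SessionGuc;

/* Remember the current value of one GUC before switching the session out. */
static SessionGuc *
save_session_guc(SessionGuc *list, const char *name)
{
	SessionGuc *g = malloc(sizeof(SessionGuc));		/* error handling omitted */

	g->name = strdup(name);
	g->value = strdup(GetConfigOption(name, false, false));
	g->next = list;
	return g;
}

/* Re-apply the remembered values when the session is scheduled again. */
static void
restore_session_gucs(SessionGuc *list)
{
	for (; list != NULL; list = list->next)
		SetConfigOption(list->name, list->value, PGC_USERSET, PGC_S_SESSION);
}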

Now the results shown by my prototype. I used pgbench with scale factor 100 in read-only mode (the -S option). The precise pgbench command is "pgbench -S -c N -M prepared -T 100 -P 1 -n". The results in the table below are in kTPS:

Connections   Vanilla Postgres   Postgres with session pool size=10
10            186                181
100           118                224
1000          59                 191



As you can see, instead of performance degrading with an increasing number of connections, Postgres with a session pool shows stable performance. Moreover, for vanilla Postgres the best results on my system are obtained with 10 connections, while Postgres with a session pool shows better performance with 100 connections and the same number of spawned backends.

My patch to Postgres is attached to this mail.
To switch on session pooling, set session_pool_size to some non-zero value. Another GUC variable which I have added is "max_sessions", which specifies the maximal number of sessions handled by one backend. So the total number of handled client connections is session_pool_size * max_sessions.
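For example (the values are purely illustrative, not recommended defaults), a postgresql.conf fragment enabling the pool could look like this:

# 10 pooled backends, each multiplexing up to 100 client sessions,
# i.e. at most 10 * 100 = 1000 client connections handled by the pool
session_pool_size = 10
max_sessions = 100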

Certainly it is just a prototype, far from practical use.
In addition to the challenges mentioned above, there are also some other issues which should be considered:

1. A long-lived transaction in a client application blocks all other sessions in the same backend and so can suspend the work of Postgres. So Uber-style programming, where a database transaction is started when the car door opens and finished at the end of the trip, is completely incompatible with this approach.
2. Fatal errors cause disconnection not only of the one client that caused the problem, but of the whole bunch of client sessions scheduled to that backend.
3. It is possible to use PL APIs such as plpython, but session-level variables must not be used.
4. There may be memory leaks caused by allocations made with malloc or in the top memory context, which are expected to be freed on backend exit. Such memory is not deallocated on session close, so a large number of handled sessions can cause memory overflow.
5. Some applications that handle multiple connections inside a single thread and multiplex them at the statement level (rather than at the transaction level) may not work correctly. It seems to be quite an exotic use case, but pgbench actually behaves this way! This is why an attempt to start pgbench with multi-statement transactions (-N) will fail if the number of threads (-j) is smaller than the number of connections (-c).
6. The approach of passing socket descriptors between processes is implemented only for Unix and tested only on Linux, although it is expected to also work on macOS and other Unix dialects. Windows is not supported now.

I will be glad to receive feedback and suggestions concerning the prospects of embedded connection pooling.

--
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company

Attached please find a new version of the patch with a few fixes.
And more results on a NUMA system with 144 cores and 3 TB of RAM.

Read-only pgbench (-S):


#Connections  Vanilla Postgres (kTPS)  Session pool size 256 (kTPS)
1k            1300                     1505
10k           633                      1519
100k          -                        1425




Read-write contention test: access to a small number of records with 1% updates.

#Clients\TPS    Vanilla Postgres        Session pool size 256
100     557232  573319
200     520395  551670
300     511423  533773
400     468562  523091
500     442268  514056
600     401860  526704
700     363912  530317
800     325148  512238
900     301310  512844
1000    278829  554516


So, as you can see, there is no degradation of performance with an increasing number of
connections when session pooling is used.

--
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company

diff --git a/src/backend/commands/prepare.c b/src/backend/commands/prepare.c
index b945b15..a73d584 100644
--- a/src/backend/commands/prepare.c
+++ b/src/backend/commands/prepare.c
@@ -813,3 +813,30 @@ build_regtype_array(Oid *param_types, int num_params)
 	result = construct_array(tmp_ary, num_params, REGTYPEOID, 4, true, 'i');
 	return PointerGetDatum(result);
 }
+
+
+void
+DropSessionPreparedStatements(char const* sessionId)
+{
+	HASH_SEQ_STATUS seq;
+	PreparedStatement *entry;
+	size_t idLen = strlen(sessionId);
+
+	/* nothing cached */
+	if (!prepared_queries)
+		return;
+
+	/* walk over cache */
+	hash_seq_init(&seq, prepared_queries);
+	while ((entry = hash_seq_search(&seq)) != NULL)
+	{
+		if (strncmp(entry->stmt_name, sessionId, idLen) == 0 && entry->stmt_name[idLen] == '.')
+		{
+			/* Release the plancache entry */
+			DropCachedPlan(entry->plansource);
+
+			/* Now we can remove the hash table entry */
+			hash_search(prepared_queries, entry->stmt_name, HASH_REMOVE, NULL);
+		}
+	}
+}
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index a4f6d4d..5b07a88 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -1029,6 +1029,18 @@ pq_peekbyte(void)
 }
 
 /* --------------------------------
+ *		pq_available_bytes	- report number of unread bytes in receive buffer
+ *
+ *	 Returns how many bytes have been read from the connection but not yet consumed.
+ * --------------------------------
+ */
+int
+pq_available_bytes(void)
+{
+	return PqRecvLength - PqRecvPointer;
+}
+
+/* --------------------------------
  *		pq_getbyte_if_available - get a single byte from connection,
  *			if available
  *
diff --git a/src/backend/port/Makefile b/src/backend/port/Makefile
index aba1e92..56ec998 100644
--- a/src/backend/port/Makefile
+++ b/src/backend/port/Makefile
@@ -21,7 +21,7 @@ subdir = src/backend/port
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = atomics.o dynloader.o pg_sema.o pg_shmem.o $(TAS)
+OBJS = atomics.o dynloader.o pg_sema.o pg_shmem.o send_sock.o $(TAS)
 
 ifeq ($(PORTNAME), win32)
 SUBDIRS += win32
diff --git a/src/backend/port/send_sock.c b/src/backend/port/send_sock.c
new file mode 100644
index 0000000..7b36923
--- /dev/null
+++ b/src/backend/port/send_sock.c
@@ -0,0 +1,89 @@
+/*-------------------------------------------------------------------------
+ *
+ * send_sock.c
+ *	  Send socket descriptor to another process
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/port/send_sock.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <fcntl.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/wait.h>
+#include <time.h>
+#include <unistd.h>
+
+/*
+ * Send socket descriptor "sock" to backend process through Unix socket "chan"
+ */
+int pg_send_sock(pgsocket chan, pgsocket sock)
+{
+    struct msghdr msg = { 0 };
+	struct iovec io;
+	struct cmsghdr * cmsg;
+    char buf[CMSG_SPACE(sizeof(sock))];
+    memset(buf, '\0', sizeof(buf));
+
+    /* On Mac OS X, the struct iovec is needed, even if it points to minimal data */
+    io.iov_base = "";
+	io.iov_len = 1;
+
+    msg.msg_iov = &io;
+    msg.msg_iovlen = 1;
+    msg.msg_control = buf;
+    msg.msg_controllen = sizeof(buf);
+
+    cmsg = CMSG_FIRSTHDR(&msg);
+    cmsg->cmsg_level = SOL_SOCKET;
+    cmsg->cmsg_type = SCM_RIGHTS;
+    cmsg->cmsg_len = CMSG_LEN(sizeof(sock));
+
+    memcpy(CMSG_DATA(cmsg), &sock, sizeof(sock));
+    msg.msg_controllen = cmsg->cmsg_len;
+
+    if (sendmsg(chan, &msg, 0) < 0)
+	{
+		return -1;
+	}
+	return 0;
+}
+
+
+/*
+ * Receive socket descriptor from postmaster process through Unix socket "chan"
+ */
+pgsocket pg_recv_sock(pgsocket chan)
+{
+    struct msghdr msg = {0};
+    char c_buffer[256];
+    char m_buffer[256];
+    struct iovec io;
+	struct cmsghdr * cmsg;
+	pgsocket sock;
+
+    io.iov_base = m_buffer;
+	io.iov_len = sizeof(m_buffer);
+    msg.msg_iov = &io;
+    msg.msg_iovlen = 1;
+
+    msg.msg_control = c_buffer;
+    msg.msg_controllen = sizeof(c_buffer);
+
+    if (recvmsg(chan, &msg, 0) < 0)
+	{
+		return -1;
+	}
+
+    cmsg = CMSG_FIRSTHDR(&msg);
+    memcpy(&sock, CMSG_DATA(cmsg), sizeof(sock));
+
+    return sock;
+}
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index f3ddf82..710e22c 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -169,6 +169,7 @@ typedef struct bkend
 	pid_t		pid;			/* process id of backend */
 	int32		cancel_key;		/* cancel key for cancels for this backend */
 	int			child_slot;		/* PMChildSlot for this backend, if any */
+	pgsocket    session_send_sock;  /* Write end of socket pipe to this backend used to send session socket descriptor to the backend process */
 
 	/*
 	 * Flavor of backend or auxiliary process.  Note that BACKEND_TYPE_WALSND
@@ -182,6 +183,15 @@ typedef struct bkend
 } Backend;
 
 static dlist_head BackendList = DLIST_STATIC_INIT(BackendList);
+/*
+ * Pointer in backend list used to implement round-robin distribution of sessions through backends.
+ * This variable either NULL, either points to the normal backend.
+ */
+static Backend*   BackendListClockPtr;
+/*
+ * Number of active normal backends
+ */
+static int        nNormalBackends;
 
 #ifdef EXEC_BACKEND
 static Backend *ShmemBackendArray;
@@ -412,7 +422,6 @@ static void BackendRun(Port *port) pg_attribute_noreturn();
 static void ExitPostmaster(int status) pg_attribute_noreturn();
 static int	ServerLoop(void);
 static int	BackendStartup(Port *port);
-static int	ProcessStartupPacket(Port *port, bool SSLdone);
 static void SendNegotiateProtocolVersion(List *unrecognized_protocol_options);
 static void processCancelRequest(Port *port, void *pkt);
 static int	initMasks(fd_set *rmask);
@@ -568,6 +577,22 @@ HANDLE		PostmasterHandle;
 #endif
 
 /*
+ * Move current backend pointer to the next normal backend.
+ * This function is called either when new session is started to implement round-robin policy, either when backend pointer by BackendListClockPtr is terminated
+ */
+static void AdvanceBackendListClockPtr(void)
+{
+	Backend* b = BackendListClockPtr;
+	do {
+		dlist_node* node = &b->elem;
+		node = node->next ? node->next : BackendList.head.next;
+		b = dlist_container(Backend, elem, node);
+	} while (b->bkend_type != BACKEND_TYPE_NORMAL && b != BackendListClockPtr);
+
+	BackendListClockPtr = b;
+}
+
+/*
  * Postmaster main entry point
  */
 void
@@ -1944,8 +1969,8 @@ initMasks(fd_set *rmask)
  * send anything to the client, which would typically be appropriate
  * if we detect a communications failure.)
  */
-static int
-ProcessStartupPacket(Port *port, bool SSLdone)
+int
+ProcessStartupPacket(Port *port, bool SSLdone, MemoryContext memctx)
 {
 	int32		len;
 	void	   *buf;
@@ -2043,7 +2068,7 @@ retry1:
 #endif
 		/* regular startup packet, cancel, etc packet should follow... */
 		/* but not another SSL negotiation request */
-		return ProcessStartupPacket(port, true);
+		return ProcessStartupPacket(port, true, memctx);
 	}
 
 	/* Could add additional special packet types here */
@@ -2073,7 +2098,7 @@ retry1:
 	 * not worry about leaking this storage on failure, since we aren't in the
 	 * postmaster process anymore.
 	 */
-	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+	oldcontext = MemoryContextSwitchTo(memctx);
 
 	if (PG_PROTOCOL_MAJOR(proto) >= 3)
 	{
@@ -2449,7 +2474,7 @@ ConnCreate(int serverFd)
 		ConnFree(port);
 		return NULL;
 	}
-
+	port->session_recv_sock = PGINVALID_SOCKET;
 	/*
 	 * Allocate GSSAPI specific state struct
 	 */
@@ -3236,6 +3261,24 @@ CleanupBackgroundWorker(int pid,
 }
 
 /*
+ * Unlink backend from backend's list and free memory
+ */
+static void UnlinkBackend(Backend* bp)
+{
+	if (bp->bkend_type == BACKEND_TYPE_NORMAL)
+	{
+		if (bp == BackendListClockPtr)
+			AdvanceBackendListClockPtr();
+		if (bp->session_send_sock != PGINVALID_SOCKET)
+			close(bp->session_send_sock);
+		elog(DEBUG2, "Cleanup backend %d", bp->pid);
+		nNormalBackends -= 1;
+	}
+	dlist_delete(&bp->elem);
+	free(bp);
+}
+
+/*
  * CleanupBackend -- cleanup after terminated backend.
  *
  * Remove all local state associated with backend.
@@ -3312,8 +3355,7 @@ CleanupBackend(int pid,
 				 */
 				BackgroundWorkerStopNotifications(bp->pid);
 			}
-			dlist_delete(iter.cur);
-			free(bp);
+			UnlinkBackend(bp);
 			break;
 		}
 	}
@@ -3415,8 +3457,7 @@ HandleChildCrash(int pid, int exitstatus, const char *procname)
 				ShmemBackendArrayRemove(bp);
 #endif
 			}
-			dlist_delete(iter.cur);
-			free(bp);
+			UnlinkBackend(bp);
 			/* Keep looping so we can signal remaining backends */
 		}
 		else
@@ -4017,6 +4058,19 @@ BackendStartup(Port *port)
 {
 	Backend    *bn;				/* for backend cleanup */
 	pid_t		pid;
+	int         session_pipe[2];
+
+	if (SessionPoolSize != 0 && nNormalBackends >= SessionPoolSize)
+	{
+		/* Instead of spawning new backend open new session at one of the existed backends. */
+		Assert(BackendListClockPtr && BackendListClockPtr->session_send_sock != PGINVALID_SOCKET);
+		elog(DEBUG2, "Start new session for socket %d at backend %d total %d", port->sock, BackendListClockPtr->pid, nNormalBackends);
+		if (pg_send_sock(BackendListClockPtr->session_send_sock, port->sock) < 0)
+			elog(FATAL, "Failed to send session socket: %m");
+		AdvanceBackendListClockPtr(); /* round-robin */
+		return STATUS_OK;
+	}
+
 
 	/*
 	 * Create backend data structure.  Better before the fork() so we can
@@ -4030,7 +4084,6 @@ BackendStartup(Port *port)
 				 errmsg("out of memory")));
 		return STATUS_ERROR;
 	}
-
 	/*
 	 * Compute the cancel key that will be assigned to this backend. The
 	 * backend will have its own copy in the forked-off process' value of
@@ -4063,12 +4116,23 @@ BackendStartup(Port *port)
 	/* Hasn't asked to be notified about any bgworkers yet */
 	bn->bgworker_notify = false;
 
+	if (SessionPoolSize != 0)
+		if (socketpair(AF_UNIX, SOCK_DGRAM, 0, session_pipe) < 0)
+			ereport(FATAL,
+					(errcode_for_file_access(),
+					 errmsg_internal("could not create socket pair for launching sessions: %m")));
+
 #ifdef EXEC_BACKEND
 	pid = backend_forkexec(port);
 #else							/* !EXEC_BACKEND */
 	pid = fork_process();
 	if (pid == 0)				/* child */
 	{
+		if (SessionPoolSize != 0)
+		{
+			port->session_recv_sock = session_pipe[0];
+			close(session_pipe[1]);
+		}
 		free(bn);
 
 		/* Detangle from postmaster */
@@ -4110,9 +4174,19 @@ BackendStartup(Port *port)
 	 * of backends.
 	 */
 	bn->pid = pid;
+	if (SessionPoolSize != 0)
+	{
+		bn->session_send_sock = session_pipe[1];
+		close(session_pipe[0]);
+	}
+	else
+		bn->session_send_sock = PGINVALID_SOCKET;
 	bn->bkend_type = BACKEND_TYPE_NORMAL;	/* Can change later to WALSND */
 	dlist_push_head(&BackendList, &bn->elem);
-
+	if (BackendListClockPtr == NULL)
+		BackendListClockPtr = bn;
+	nNormalBackends += 1;
+	elog(DEBUG2, "Start backend %d total %d", pid, nNormalBackends);
 #ifdef EXEC_BACKEND
 	if (!bn->dead_end)
 		ShmemBackendArrayAdd(bn);
@@ -4299,7 +4373,7 @@ BackendInitialize(Port *port)
 	 * Receive the startup packet (which might turn out to be a cancel request
 	 * packet).
 	 */
-	status = ProcessStartupPacket(port, false);
+	status = ProcessStartupPacket(port, false, TopMemoryContext);
 
 	/*
 	 * Stop here if it was bad or a cancel packet.  ProcessStartupPacket
diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index e6706f7..9c42fab 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -76,6 +76,7 @@ struct WaitEventSet
 {
 	int			nevents;		/* number of registered events */
 	int			nevents_space;	/* maximum number of events in this set */
+	int         free_events;    /* L1-list of free events linked by "pos" and terminated by -1*/
 
 	/*
 	 * Array, of nevents_space length, storing the definition of events this
@@ -129,7 +130,7 @@ static void drainSelfPipe(void);
 #if defined(WAIT_USE_EPOLL)
 static void WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action);
 #elif defined(WAIT_USE_POLL)
-static void WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event);
+static void WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event, bool remove);
 #elif defined(WAIT_USE_WIN32)
 static void WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event);
 #endif
@@ -562,6 +563,7 @@ CreateWaitEventSet(MemoryContext context, int nevents)
 
 	set->latch = NULL;
 	set->nevents_space = nevents;
+	set->free_events = -1;
 
 #if defined(WAIT_USE_EPOLL)
 #ifdef EPOLL_CLOEXEC
@@ -667,6 +669,7 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
 				  void *user_data)
 {
 	WaitEvent  *event;
+	int free_event;
 
 	/* not enough space */
 	Assert(set->nevents < set->nevents_space);
@@ -690,8 +693,19 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
 	if (fd == PGINVALID_SOCKET && (events & WL_SOCKET_MASK))
 		elog(ERROR, "cannot wait on socket event without a socket");
 
-	event = &set->events[set->nevents];
-	event->pos = set->nevents++;
+	free_event = set->free_events;
+	if (free_event >= 0)
+	{
+		event = &set->events[free_event];
+		set->free_events = event->pos;
+		event->pos = free_event;
+	}
+	else
+	{
+		event = &set->events[set->nevents];
+		event->pos = set->nevents;
+	}
+	set->nevents += 1;
 	event->fd = fd;
 	event->events = events;
 	event->user_data = user_data;
@@ -718,7 +732,7 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
 #if defined(WAIT_USE_EPOLL)
 	WaitEventAdjustEpoll(set, event, EPOLL_CTL_ADD);
 #elif defined(WAIT_USE_POLL)
-	WaitEventAdjustPoll(set, event);
+	WaitEventAdjustPoll(set, event, false);
 #elif defined(WAIT_USE_WIN32)
 	WaitEventAdjustWin32(set, event);
 #endif
@@ -727,6 +741,27 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
 }
 
 /*
+ * Remove event with specified socket descriptor
+ */
+void DeleteWaitEventFromSet(WaitEventSet *set, pgsocket fd)
+{
+	int i, n = set->nevents;
+	for (i = 0; i < n; i++)
+	{
+		WaitEvent  *event = &set->events[i];
+		if (event->fd == fd)
+		{
+#if defined(WAIT_USE_EPOLL)
+			WaitEventAdjustEpoll(set, event, EPOLL_CTL_DEL);
+#elif defined(WAIT_USE_POLL)
+			WaitEventAdjustPoll(set, event, true);
+#endif
+			break;
+		}
+	}
+}
+
+/*
  * Change the event mask and, in the WL_LATCH_SET case, the latch associated
  * with the WaitEvent.
  *
@@ -774,7 +809,7 @@ ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
 #if defined(WAIT_USE_EPOLL)
 	WaitEventAdjustEpoll(set, event, EPOLL_CTL_MOD);
 #elif defined(WAIT_USE_POLL)
-	WaitEventAdjustPoll(set, event);
+	WaitEventAdjustPoll(set, event, false);
 #elif defined(WAIT_USE_WIN32)
 	WaitEventAdjustWin32(set, event);
 #endif
@@ -827,14 +862,33 @@ WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action)
 		ereport(ERROR,
 				(errcode_for_socket_access(),
 				 errmsg("epoll_ctl() failed: %m")));
+
+	if (action == EPOLL_CTL_DEL)
+	{
+		int pos = event->pos;
+		event->fd = PGINVALID_SOCKET;
+		set->nevents -= 1;
+		event->pos = set->free_events;
+		set->free_events = pos;
+	}
 }
 #endif
 
 #if defined(WAIT_USE_POLL)
 static void
-WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event)
+WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event, bool remove)
 {
-	struct pollfd *pollfd = &set->pollfds[event->pos];
+	int pos = event->pos;
+	struct pollfd *pollfd = &set->pollfds[pos];
+
+	if (remove)
+	{
+		set->nevents -= 1;
+		*pollfd = set->pollfds[set->nevents];
+		set->events[pos] = set->events[set->nevents];
+		event->pos = pos;
+		return;
+	}
 
 	pollfd->revents = 0;
 	pollfd->fd = event->fd;
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index ddc3ec8..ffc1494 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -75,6 +75,7 @@
 #include "utils/snapmgr.h"
 #include "utils/timeout.h"
 #include "utils/timestamp.h"
+#include "utils/builtins.h"
 #include "mb/pg_wchar.h"
 
 
@@ -169,6 +170,10 @@ static ProcSignalReason RecoveryConflictReason;
 static MemoryContext row_description_context = NULL;
 static StringInfoData row_description_buf;
 
+static WaitEventSet* SessionPool;
+static int64         SessionCount;
+static char*         CurrentSessionId;
+
 /* ----------------------------------------------------------------
  *		decls for routines only used in this file
  * ----------------------------------------------------------------
@@ -194,6 +199,15 @@ static void log_disconnections(int code, Datum arg);
 static void enable_statement_timeout(void);
 static void disable_statement_timeout(void);
 
+/*
+ * Generate session ID unique within this backend
+ */
+static char* CreateSessionId(void)
+{
+	char buf[64];
+	pg_lltoa(++SessionCount, buf);
+	return strdup(buf);
+}
 
 /* ----------------------------------------------------------------
  *		routines to obtain user input
@@ -473,7 +487,7 @@ SocketBackend(StringInfo inBuf)
 			 * fatal because we have probably lost message boundary sync, and
 			 * there's no good way to recover.
 			 */
-			ereport(FATAL,
+		    ereport(FATAL,
 					(errcode(ERRCODE_PROTOCOL_VIOLATION),
 					 errmsg("invalid frontend message type %d", qtype)));
 			break;
@@ -1232,6 +1246,12 @@ exec_parse_message(const char *query_string,	/* string to execute */
 	bool		save_log_statement_stats = log_statement_stats;
 	char		msec_str[32];
 
+	if (CurrentSessionId && stmt_name[0] != '\0')
+	{
+		/* Make names of prepared statements unique for session in case of using internal session pool */
+		stmt_name = psprintf("%s.%s", CurrentSessionId, stmt_name);
+	}
+
 	/*
 	 * Report query to various monitoring facilities.
 	 */
@@ -1503,6 +1523,12 @@ exec_bind_message(StringInfo input_message)
 	portal_name = pq_getmsgstring(input_message);
 	stmt_name = pq_getmsgstring(input_message);
 
+	if (CurrentSessionId && stmt_name[0] != '\0')
+	{
+		/* Make names of prepared statements unique for session in case of using internal session pool */
+		stmt_name = psprintf("%s.%s", CurrentSessionId, stmt_name);
+	}
+
 	ereport(DEBUG2,
 			(errmsg("bind %s to %s",
 					*portal_name ? portal_name : "<unnamed>",
@@ -2325,6 +2351,12 @@ exec_describe_statement_message(const char *stmt_name)
 	CachedPlanSource *psrc;
 	int			i;
 
+	if (CurrentSessionId && stmt_name[0] != '\0')
+	{
+		/* Make names of prepared statements unique for session in case of using internal session pool */
+		stmt_name = psprintf("%s.%s", CurrentSessionId, stmt_name);
+	}
+
 	/*
 	 * Start up a transaction command. (Note that this will normally change
 	 * current memory context.) Nothing happens if we are already in one.
@@ -3603,7 +3635,6 @@ process_postgres_switches(int argc, char *argv[], GucContext ctx,
 #endif
 }
 
-
 /* ----------------------------------------------------------------
  * PostgresMain
  *	   postgres main loop -- all backends, interactive or otherwise start here
@@ -3654,6 +3685,10 @@ PostgresMain(int argc, char *argv[],
 							progname)));
 	}
 
+	/* Assign session ID if use session pooling */
+	if (SessionPoolSize != 0)
+		CurrentSessionId = CreateSessionId();
+
 	/* Acquire configuration parameters, unless inherited from postmaster */
 	if (!IsUnderPostmaster)
 	{
@@ -3783,7 +3818,7 @@ PostgresMain(int argc, char *argv[],
 	 * ... else we'd need to copy the Port data first.  Also, subsidiary data
 	 * such as the username isn't lost either; see ProcessStartupPacket().
 	 */
-	if (PostmasterContext)
+	if (PostmasterContext && SessionPoolSize == 0)
 	{
 		MemoryContextDelete(PostmasterContext);
 		PostmasterContext = NULL;
@@ -4069,6 +4104,102 @@ PostgresMain(int argc, char *argv[],
 
 			ReadyForQuery(whereToSendOutput);
 			send_ready_for_query = false;
+
+			if (MyProcPort && MyProcPort->session_recv_sock != PGINVALID_SOCKET && !IsTransactionState() && pq_available_bytes() == 0)
+			{
+				WaitEvent ready_client;
+				whereToSendOutput = DestRemote;
+				if (SessionPool == NULL)
+				{
+					SessionPool = CreateWaitEventSet(TopMemoryContext, MaxSessions);
+					AddWaitEventToSet(SessionPool, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, NULL);
+					AddWaitEventToSet(SessionPool, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);
+					AddWaitEventToSet(SessionPool, WL_SOCKET_READABLE, MyProcPort->session_recv_sock, NULL, NULL);
+					AddWaitEventToSet(SessionPool, WL_SOCKET_READABLE, MyProcPort->sock, NULL, NULL);
+				}
+			  Retry:
+				DoingCommandRead = true;
+				if (WaitEventSetWait(SessionPool, -1, &ready_client, 1, PG_WAIT_CLIENT) != 1)
+				{
+					/* TODO: do some error recovery here */
+					elog(FATAL, "Failed to poll client sessions");
+				}
+				CHECK_FOR_INTERRUPTS();
+				DoingCommandRead = false;
+
+				if (ready_client.events & WL_POSTMASTER_DEATH)
+					ereport(FATAL,
+							(errcode(ERRCODE_ADMIN_SHUTDOWN),
+							 errmsg("terminating connection due to unexpected postmaster exit")));
+
+				if (ready_client.events & WL_LATCH_SET)
+				{
+					ResetLatch(MyLatch);
+					ProcessClientReadInterrupt(true);
+					goto Retry;
+				}
+
+				if (ready_client.fd == MyProcPort->session_recv_sock)
+				{
+					int		 status;
+					Port     port;
+					Port*    myPort;
+					StringInfoData buf;
+					pgsocket sock = pg_recv_sock(MyProcPort->session_recv_sock);
+					if (sock < 0)
+						elog(FATAL, "Failed to receive session socket: %m");
+
+					/*
+					 * Receive the startup packet (which might turn out to be a cancel request
+					 * packet).
+					 */
+					port.sock = sock;
+					myPort = MyProcPort;
+					MyProcPort = &port;
+					status = ProcessStartupPacket(&port, false, MessageContext);
+					MyProcPort = myPort;
+					if (strcmp(port.database_name, MyProcPort->database_name) ||
+						strcmp(port.user_name, MyProcPort->user_name))
+					{
+						elog(FATAL, "Failed to open session (dbname=%s user=%s) in backend %d (dbname=%s user=%s)",
+							 port.database_name, port.user_name,
+							 MyProcPid, MyProcPort->database_name, MyProcPort->user_name);
+					}
+					else if (status == STATUS_OK)
+					{
+						elog(DEBUG2, "Start new session %d in backend %d for database %s user %s",
+							 sock, MyProcPid, port.database_name, port.user_name);
+						CurrentSessionId = CreateSessionId();
+						AddWaitEventToSet(SessionPool, WL_SOCKET_READABLE, sock, NULL, CurrentSessionId);
+						MyProcPort->sock = sock;
+						send_ready_for_query = true;
+
+						SetCurrentStatementStartTimestamp();
+						StartTransactionCommand();
+						PerformAuthentication(MyProcPort);
+						CommitTransactionCommand();
+
+						/*
+						 * Send this backend's cancellation info to the frontend.
+						 */
+						pq_beginmessage(&buf, 'K');
+						pq_sendint32(&buf, (int32) MyProcPid);
+						pq_sendint32(&buf, (int32) MyCancelKey);
+						pq_endmessage(&buf);
+						/* Need not flush since ReadyForQuery will do it. */
+						continue;
+					}
+					elog(LOG, "Session startup failed");
+					close(sock);
+					goto Retry;
+				}
+				else
+				{
+					elog(DEBUG2, "Switch to session %d in backend %d", ready_client.fd, MyProcPid);
+					MyProcPort->sock = ready_client.fd;
+					CurrentSessionId = (char*)ready_client.user_data;
+				}
+			}
 		}
 
 		/*
@@ -4350,13 +4481,36 @@ PostgresMain(int argc, char *argv[],
 				 * it will fail to be called during other backend-shutdown
 				 * scenarios.
 				 */
+				if (SessionPool)
+				{
+					DeleteWaitEventFromSet(SessionPool, MyProcPort->sock);
+					elog(DEBUG1, "Close session %d in backend %d", MyProcPort->sock, MyProcPid);
+
+					pq_getmsgend(&input_message);
+					if (pq_is_reading_msg())
+						pq_endmsgread();
+
+					if (CurrentSessionId)
+					{
+						DropSessionPreparedStatements(CurrentSessionId);
+						free(CurrentSessionId);
+						CurrentSessionId = NULL;
+					}
+
+					close(MyProcPort->sock);
+					MyProcPort->sock = PGINVALID_SOCKET;
+
+					send_ready_for_query = true;
+					break;
+				}
+				elog(DEBUG1, "Terminate backend %d", MyProcPid);
 				proc_exit(0);
 
 			case 'd':			/* copy data */
 			case 'c':			/* copy done */
 			case 'f':			/* copy fail */
 
-				/*
+				/*!
 				 * Accept but ignore these messages, per protocol spec; we
 				 * probably got here because a COPY failed, and the frontend
 				 * is still sending data.
diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c
index 54fa4a3..b2f43a8 100644
--- a/src/backend/utils/init/globals.c
+++ b/src/backend/utils/init/globals.c
@@ -120,7 +120,9 @@ int			maintenance_work_mem = 16384;
  * register background workers.
  */
 int			NBuffers = 1000;
+int			SessionPoolSize = 0;
 int			MaxConnections = 90;
+int			MaxSessions = 1000;
 int			max_worker_processes = 8;
 int			max_parallel_workers = 8;
 int			MaxBackends = 0;
diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c
index f9b3309..571c80f 100644
--- a/src/backend/utils/init/postinit.c
+++ b/src/backend/utils/init/postinit.c
@@ -65,7 +65,7 @@
 
 static HeapTuple GetDatabaseTuple(const char *dbname);
 static HeapTuple GetDatabaseTupleByOid(Oid dboid);
-static void PerformAuthentication(Port *port);
+void PerformAuthentication(Port *port);
 static void CheckMyDatabase(const char *name, bool am_superuser);
 static void InitCommunication(void);
 static void ShutdownPostgres(int code, Datum arg);
@@ -180,7 +180,7 @@ GetDatabaseTupleByOid(Oid dboid)
  *
  * returns: nothing.  Will not return at all if there's any failure.
  */
-static void
+void
 PerformAuthentication(Port *port)
 {
 	/* This should be set already, but let's make sure */
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 72f6be3..02373a3 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -1871,6 +1871,26 @@ static struct config_int ConfigureNamesInt[] =
 	},
 
 	{
+		{"max_sessions", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
+			gettext_noop("Sets the maximum number of client session."),
+			NULL
+		},
+		&MaxSessions,
+		1000, 1, INT_MAX,
+		NULL, NULL, NULL
+	},
+
+	{
+		{"session_pool_size", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
+			gettext_noop("Sets number of backends serving client sessions."),
+			NULL
+		},
+		&SessionPoolSize,
+		0, 0, INT_MAX,
+		NULL, NULL, NULL
+	},
+
+	{
 		{"superuser_reserved_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
 			gettext_noop("Sets the number of connection slots reserved for superusers."),
 			NULL
diff --git a/src/include/commands/prepare.h b/src/include/commands/prepare.h
index ffec029..cb5f8d4 100644
--- a/src/include/commands/prepare.h
+++ b/src/include/commands/prepare.h
@@ -56,5 +56,6 @@ extern TupleDesc FetchPreparedStatementResultDesc(PreparedStatement *stmt);
 extern List *FetchPreparedStatementTargetList(PreparedStatement *stmt);
 
 extern void DropAllPreparedStatements(void);
+extern void DropSessionPreparedStatements(char const* sessionId);
 
 #endif							/* PREPARE_H */
diff --git a/src/include/libpq/libpq-be.h b/src/include/libpq/libpq-be.h
index 49cb263..f31f89b 100644
--- a/src/include/libpq/libpq-be.h
+++ b/src/include/libpq/libpq-be.h
@@ -127,7 +127,8 @@ typedef struct Port
 	int			remote_hostname_errcode;	/* see above */
 	char	   *remote_port;	/* text rep of remote port */
 	CAC_state	canAcceptConnections;	/* postmaster connection status */
-
+	pgsocket    session_recv_sock;   /* socket for receiving descriptor of new session sockets */
+	
 	/*
 	 * Information that needs to be saved from the startup packet and passed
 	 * into backend execution.  "char *" fields are NULL if not set.
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index 2e7725d..9169b21 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -71,6 +71,7 @@ extern int	pq_getbyte(void);
 extern int	pq_peekbyte(void);
 extern int	pq_getbyte_if_available(unsigned char *c);
 extern int	pq_putbytes(const char *s, size_t len);
+extern int  pq_available_bytes(void);
 
 /*
  * prototypes for functions in be-secure.c
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index 54ee273..a9f9228 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -157,6 +157,8 @@ extern PGDLLIMPORT char *DataDir;
 extern PGDLLIMPORT int NBuffers;
 extern PGDLLIMPORT int MaxBackends;
 extern PGDLLIMPORT int MaxConnections;
+extern PGDLLIMPORT int MaxSessions;
+extern PGDLLIMPORT int SessionPoolSize;
 extern PGDLLIMPORT int max_worker_processes;
 extern int	max_parallel_workers;
 
@@ -420,6 +422,7 @@ extern void InitializeMaxBackends(void);
 extern void InitPostgres(const char *in_dbname, Oid dboid, const char *username,
 			 Oid useroid, char *out_dbname);
 extern void BaseInit(void);
+extern void PerformAuthentication(struct Port *port);
 
 /* in utils/init/miscinit.c */
 extern bool IgnoreSystemIndexes;
diff --git a/src/include/port.h b/src/include/port.h
index 3e528fa..c14a20d 100644
--- a/src/include/port.h
+++ b/src/include/port.h
@@ -41,6 +41,10 @@ typedef SOCKET pgsocket;
 extern bool pg_set_noblock(pgsocket sock);
 extern bool pg_set_block(pgsocket sock);
 
+/* send/receive socket descriptor */
+extern int pg_send_sock(pgsocket chan, pgsocket sock);
+extern pgsocket pg_recv_sock(pgsocket chan);
+
 /* Portable path handling for Unix/Win32 (in path.c) */
 
 extern bool has_drive_prefix(const char *filename);
diff --git a/src/include/postmaster/postmaster.h b/src/include/postmaster/postmaster.h
index 1877eef..c9527c9 100644
--- a/src/include/postmaster/postmaster.h
+++ b/src/include/postmaster/postmaster.h
@@ -62,6 +62,9 @@ extern Size ShmemBackendArraySize(void);
 extern void ShmemBackendArrayAllocation(void);
 #endif
 
+struct Port;
+extern int	ProcessStartupPacket(struct Port *port, bool SSLdone, MemoryContext memctx);
+
 /*
  * Note: MAX_BACKENDS is limited to 2^18-1 because that's the width reserved
  * for buffer references in buf_internals.h.  This limitation could be lifted
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
index a4bcb48..10f30d1 100644
--- a/src/include/storage/latch.h
+++ b/src/include/storage/latch.h
@@ -176,6 +176,8 @@ extern int WaitLatch(volatile Latch *latch, int wakeEvents, long timeout,
 extern int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents,
 				  pgsocket sock, long timeout, uint32 wait_event_info);
 
+extern void DeleteWaitEventFromSet(WaitEventSet *set, pgsocket fd);
+
 /*
  * Unix implementation uses SIGUSR1 for inter-process signaling.
  * Win32 doesn't need this.
