Attached please find a new version of the built-in connection pooling patch,
which now supports temporary tables and session GUCs.
Win32 support has also been added.
--
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
diff --git a/src/backend/catalog/namespace.c b/src/backend/catalog/namespace.c
index 93c4bbf..dfc072c 100644
--- a/src/backend/catalog/namespace.c
+++ b/src/backend/catalog/namespace.c
@@ -194,6 +194,7 @@ char *namespace_search_path = NULL;
/* Local functions */
static void recomputeNamespacePath(void);
static void InitTempTableNamespace(void);
+static Oid GetTempTableNamespace(void);
static void RemoveTempRelations(Oid tempNamespaceId);
static void RemoveTempRelationsCallback(int code, Datum arg);
static void NamespaceCallback(Datum arg, int cacheid, uint32 hashvalue);
@@ -441,9 +442,7 @@ RangeVarGetCreationNamespace(const RangeVar *newRelation)
if (strcmp(newRelation->schemaname, "pg_temp") == 0)
{
/* Initialize temp namespace if first time through */
- if (!OidIsValid(myTempNamespace))
- InitTempTableNamespace();
- return myTempNamespace;
+ return GetTempTableNamespace();
}
/* use exact schema given */
namespaceId = get_namespace_oid(newRelation->schemaname, false);
@@ -452,9 +451,7 @@ RangeVarGetCreationNamespace(const RangeVar *newRelation)
else if (newRelation->relpersistence == RELPERSISTENCE_TEMP)
{
/* Initialize temp namespace if first time through */
- if (!OidIsValid(myTempNamespace))
- InitTempTableNamespace();
- return myTempNamespace;
+ return GetTempTableNamespace();
}
else
{
@@ -463,8 +460,7 @@ RangeVarGetCreationNamespace(const RangeVar *newRelation)
if (activeTempCreationPending)
{
/* Need to initialize temp namespace */
- InitTempTableNamespace();
- return myTempNamespace;
+ return GetTempTableNamespace();
}
namespaceId = activeCreationNamespace;
if (!OidIsValid(namespaceId))
@@ -2902,9 +2898,7 @@ LookupCreationNamespace(const char *nspname)
if (strcmp(nspname, "pg_temp") == 0)
{
/* Initialize temp namespace if first time through */
- if (!OidIsValid(myTempNamespace))
- InitTempTableNamespace();
- return myTempNamespace;
+ return GetTempTableNamespace();
}
namespaceId = get_namespace_oid(nspname, false);
@@ -2967,9 +2961,7 @@ QualifiedNameGetCreationNamespace(List *names, char **objname_p)
if (strcmp(schemaname, "pg_temp") == 0)
{
/* Initialize temp namespace if first time through */
- if (!OidIsValid(myTempNamespace))
- InitTempTableNamespace();
- return myTempNamespace;
+ return GetTempTableNamespace();
}
/* use exact schema given */
namespaceId = get_namespace_oid(schemaname, false);
@@ -2982,8 +2974,7 @@ QualifiedNameGetCreationNamespace(List *names, char **objname_p)
if (activeTempCreationPending)
{
/* Need to initialize temp namespace */
- InitTempTableNamespace();
- return myTempNamespace;
+ return GetTempTableNamespace();
}
namespaceId = activeCreationNamespace;
if (!OidIsValid(namespaceId))
@@ -3250,8 +3241,11 @@ void
SetTempNamespaceState(Oid tempNamespaceId, Oid tempToastNamespaceId)
{
/* Worker should not have created its own namespaces ... */
- Assert(myTempNamespace == InvalidOid);
- Assert(myTempToastNamespace == InvalidOid);
+ if (!ActiveSession)
+ {
+ Assert(myTempNamespace == InvalidOid);
+ Assert(myTempToastNamespace == InvalidOid);
+ }
Assert(myTempNamespaceSubID == InvalidSubTransactionId);
/* Assign same namespace OIDs that leader has */
@@ -3771,6 +3765,22 @@ recomputeNamespacePath(void)
list_free(oidlist);
}
+static Oid
+GetTempTableNamespace(void)
+{
+	/*
+	 * Return this backend's temp namespace, creating it on first use.  With
+	 * session pooling, whether it already exists is tracked per session
+	 * (ActiveSession->tempNamespace), because several client sessions are
+	 * multiplexed onto this backend; otherwise myTempNamespace is checked.
+	 */
+	Oid			existing = ActiveSession ? ActiveSession->tempNamespace
+										 : myTempNamespace;
+
+	if (!OidIsValid(existing))
+		InitTempTableNamespace();
+	return myTempNamespace;
+}
+
/*
* InitTempTableNamespace
* Initialize temp table namespace on first use in a particular backend
@@ -3782,8 +3792,6 @@ InitTempTableNamespace(void)
Oid namespaceId;
Oid toastspaceId;
- Assert(!OidIsValid(myTempNamespace));
-
/*
* First, do permission check to see if we are authorized to make temp
* tables. We use a nonstandard error message here since "databasename:
@@ -3822,7 +3830,10 @@ InitTempTableNamespace(void)
(errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
errmsg("cannot create temporary tables during a parallel operation")));
- snprintf(namespaceName, sizeof(namespaceName), "pg_temp_%d", MyBackendId);
+ if (ActiveSession)
+ snprintf(namespaceName, sizeof(namespaceName), "pg_temp_%d_%s", MyBackendId, ActiveSession->id);
+ else
+ snprintf(namespaceName, sizeof(namespaceName), "pg_temp_%d", MyBackendId);
namespaceId = get_namespace_oid(namespaceName, true);
if (!OidIsValid(namespaceId))
@@ -3854,8 +3865,10 @@ InitTempTableNamespace(void)
* it. (We assume there is no need to clean it out if it does exist, since
* dropping a parent table should make its toast table go away.)
*/
- snprintf(namespaceName, sizeof(namespaceName), "pg_toast_temp_%d",
- MyBackendId);
+ if (ActiveSession)
+ snprintf(namespaceName, sizeof(namespaceName), "pg_toast_temp_%d_%s", MyBackendId, ActiveSession->id);
+ else
+ snprintf(namespaceName, sizeof(namespaceName), "pg_toast_temp_%d", MyBackendId);
toastspaceId = get_namespace_oid(namespaceName, true);
if (!OidIsValid(toastspaceId))
@@ -3873,7 +3886,11 @@ InitTempTableNamespace(void)
*/
myTempNamespace = namespaceId;
myTempToastNamespace = toastspaceId;
-
+ if (ActiveSession)
+ {
+ ActiveSession->tempNamespace = namespaceId;
+ ActiveSession->tempToastNamespace = toastspaceId;
+ }
/* It should not be done already. */
AssertState(myTempNamespaceSubID == InvalidSubTransactionId);
myTempNamespaceSubID = GetCurrentSubTransactionId();
diff --git a/src/backend/catalog/storage.c b/src/backend/catalog/storage.c
index cff49ba..b728ab1 100644
--- a/src/backend/catalog/storage.c
+++ b/src/backend/catalog/storage.c
@@ -25,6 +25,7 @@
#include "access/xloginsert.h"
#include "access/xlogutils.h"
#include "catalog/catalog.h"
+#include "catalog/namespace.h"
#include "catalog/storage.h"
#include "catalog/storage_xlog.h"
#include "storage/freespace.h"
diff --git a/src/backend/commands/prepare.c b/src/backend/commands/prepare.c
index b945b15..8e8a737 100644
--- a/src/backend/commands/prepare.c
+++ b/src/backend/commands/prepare.c
@@ -813,3 +813,32 @@ build_regtype_array(Oid *param_types, int num_params)
result = construct_array(tmp_ary, num_params, REGTYPEOID, 4, true, 'i');
return PointerGetDatum(result);
}
+
+/*
+ * Remove every prepared statement belonging to session "sessionId".
+ */
+void
+DropSessionPreparedStatements(char const* sessionId)
+{
+	size_t		prefix_len = strlen(sessionId);
+	HASH_SEQ_STATUS status;
+	PreparedStatement *stmt;
+
+	/* Nothing has been prepared in this backend yet */
+	if (prepared_queries == NULL)
+		return;
+
+	/* Session statements are named "<sessionId>.<client name>" */
+	hash_seq_init(&status, prepared_queries);
+	for (stmt = hash_seq_search(&status); stmt != NULL; stmt = hash_seq_search(&status))
+	{
+		if (strncmp(stmt->stmt_name, sessionId, prefix_len) != 0 ||
+			stmt->stmt_name[prefix_len] != '.')
+			continue;
+
+		/* Free the plan cache entry first ... */
+		DropCachedPlan(stmt->plansource);
+		/* ... then drop the hash table entry that referenced it */
+		hash_search(prepared_queries, stmt->stmt_name, HASH_REMOVE, NULL);
+	}
+}
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index a4f6d4d..7f40edb 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -1029,6 +1029,17 @@ pq_peekbyte(void)
}
/* --------------------------------
+ * pq_available_bytes - number of buffered bytes not yet consumed.
+ *
+ * Reports how much already-received data is waiting to be read, without
+ * performing any socket I/O.
+ * --------------------------------
+ */
+int
+pq_available_bytes(void)
+{
+	int			unread = PqRecvLength - PqRecvPointer;
+
+	return unread;
+}
+
+/* --------------------------------
* pq_getbyte_if_available - get a single byte from connection,
* if available
*
diff --git a/src/backend/port/Makefile b/src/backend/port/Makefile
index aba1e92..56ec998 100644
--- a/src/backend/port/Makefile
+++ b/src/backend/port/Makefile
@@ -21,7 +21,7 @@ subdir = src/backend/port
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
-OBJS = atomics.o dynloader.o pg_sema.o pg_shmem.o $(TAS)
+OBJS = atomics.o dynloader.o pg_sema.o pg_shmem.o send_sock.o $(TAS)
ifeq ($(PORTNAME), win32)
SUBDIRS += win32
diff --git a/src/backend/port/send_sock.c b/src/backend/port/send_sock.c
new file mode 100644
index 0000000..fa4b4a9
--- /dev/null
+++ b/src/backend/port/send_sock.c
@@ -0,0 +1,210 @@
+/*-------------------------------------------------------------------------
+ *
+ * send_sock.c
+ *	  Send socket descriptor to another process
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/port/send_sock.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <errno.h>
+#include <fcntl.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+#ifndef WIN32
+#include <sys/socket.h>
+#include <sys/wait.h>
+#include <unistd.h>
+#endif
+
+#ifdef WIN32
+/*
+ * Read exactly "len" bytes from "chan".  On Windows the channel may be a
+ * stream socket (pgwin32_socketpair() emulates socketpair() with a loopback
+ * TCP connection), so one recv() can return only part of a record.  Returns
+ * "len" on success, otherwise the failing recv() result (0 on EOF, <0 on
+ * error).
+ */
+static int
+recv_all(pgsocket chan, char *buf, int len)
+{
+	int			done = 0;
+
+	while (done < len)
+	{
+		int			rc = recv(chan, buf + done, len - done, 0);
+
+		if (rc <= 0)
+			return rc;
+		done += rc;
+	}
+	return done;
+}
+#endif
+
+/*
+ * Send socket descriptor "sock" to backend process "pid" through the local
+ * socket "chan".
+ *
+ * On Unix the descriptor is passed as SCM_RIGHTS ancillary data ("pid" is
+ * unused); -1 is returned on failure with errno left set by sendmsg().  On
+ * Windows the socket is duplicated for "pid" with WSADuplicateSocket() and
+ * the resulting protocol info is written to "chan"; failures there are
+ * reported with ereport(FATAL).  Returns 0 on success.
+ */
+int pg_send_sock(pgsocket chan, pgsocket sock, pid_t pid)
+{
+#ifdef WIN32
+	WSAPROTOCOL_INFO info;
+	int			rc;
+
+	if (WSADuplicateSocket(sock, pid, &info) != 0)
+	{
+		ereport(FATAL,
+				(errmsg("could not duplicate socket %d for use in backend: error code %d",
+						(int) sock, WSAGetLastError())));
+		return -1;
+	}
+	/* The protocol info goes to the backend over "chan", not to the client */
+	rc = send(chan, (char *) &info, sizeof(info), 0);
+	if (rc != (int) sizeof(info))
+	{
+		ereport(FATAL,
+				(errmsg("Failed to send inheritable socket: rc=%d, error code %d",
+						rc, WSAGetLastError())));
+		return -1;
+	}
+	return 0;
+#else
+	struct msghdr msg;
+	struct iovec io;
+	struct cmsghdr *cmsg;
+	/* the union makes the control buffer suitably aligned for cmsghdr */
+	union
+	{
+		char		buf[CMSG_SPACE(sizeof(pgsocket))];
+		struct cmsghdr align;
+	}			control;
+	char		dummy = '\0';
+
+	memset(&msg, 0, sizeof(msg));
+	memset(&control, 0, sizeof(control));
+
+	/* On Mac OS X at least one byte of ordinary data must be sent too */
+	io.iov_base = &dummy;
+	io.iov_len = 1;
+	msg.msg_iov = &io;
+	msg.msg_iovlen = 1;
+	msg.msg_control = control.buf;
+	msg.msg_controllen = sizeof(control.buf);
+
+	cmsg = CMSG_FIRSTHDR(&msg);
+	cmsg->cmsg_level = SOL_SOCKET;
+	cmsg->cmsg_type = SCM_RIGHTS;
+	cmsg->cmsg_len = CMSG_LEN(sizeof(sock));
+	memcpy(CMSG_DATA(cmsg), &sock, sizeof(sock));
+
+	for (;;)
+	{
+		if (sendmsg(chan, &msg, 0) >= 0)
+			return 0;
+		/* retry if a signal interrupted the call, otherwise fail */
+		if (errno != EINTR)
+			return -1;
+	}
+#endif
+}
+
+
+/*
+ * Receive a socket descriptor sent by pg_send_sock() through local socket
+ * "chan".
+ *
+ * On Unix returns the received descriptor, or -1 if recvmsg() fails or the
+ * message carries no descriptor (for instance because the sender closed
+ * the channel).  On Windows failures are reported with ereport(FATAL).
+ */
+pgsocket pg_recv_sock(pgsocket chan)
+{
+#ifdef WIN32
+	WSAPROTOCOL_INFO info;
+	SOCKET		s;
+	int			rc = recv_all(chan, (char *) &info, (int) sizeof(info));
+
+	if (rc != (int) sizeof(info))
+	{
+		ereport(FATAL,
+				(errmsg("Failed to receive inheritable socket: rc=%d, error code %d",
+						rc, WSAGetLastError())));
+	}
+	s = WSASocket(FROM_PROTOCOL_INFO,
+				  FROM_PROTOCOL_INFO,
+				  FROM_PROTOCOL_INFO,
+				  &info,
+				  0,
+				  0);
+	if (s == INVALID_SOCKET)
+	{
+		ereport(FATAL,
+				(errmsg("could not create inherited socket: error code %d\n",
+						WSAGetLastError())));
+	}
+
+	/*
+	 * Do not closesocket() the sender's original handle here: this process
+	 * never inherited it, so that handle value is meaningless in this
+	 * process and could coincide with one of our own sockets.  Presumably
+	 * the sending process closes its own copy.
+	 */
+	return s;
+#else
+	struct msghdr msg;
+	struct iovec io;
+	struct cmsghdr *cmsg;
+	char		m_buffer[256];
+	union
+	{
+		char		buf[256];
+		struct cmsghdr align;
+	}			control;
+	pgsocket	sock;
+
+	memset(&msg, 0, sizeof(msg));
+	io.iov_base = m_buffer;
+	io.iov_len = sizeof(m_buffer);
+	msg.msg_iov = &io;
+	msg.msg_iovlen = 1;
+
+	for (;;)
+	{
+		msg.msg_control = control.buf;
+		msg.msg_controllen = sizeof(control.buf);
+		if (recvmsg(chan, &msg, 0) >= 0)
+			break;
+		if (errno != EINTR)
+			return -1;
+	}
+
+	/*
+	 * A zero-length read (sender closed the channel) or a message without
+	 * SCM_RIGHTS data carries no descriptor, and CMSG_FIRSTHDR() may then
+	 * return NULL.
+	 */
+	cmsg = CMSG_FIRSTHDR(&msg);
+	if (cmsg == NULL ||
+		cmsg->cmsg_level != SOL_SOCKET ||
+		cmsg->cmsg_type != SCM_RIGHTS ||
+		cmsg->cmsg_len < CMSG_LEN(sizeof(sock)))
+		return -1;
+
+	memcpy(&sock, CMSG_DATA(cmsg), sizeof(sock));
+	return sock;
+#endif
+}
diff --git a/src/backend/port/win32/socket.c b/src/backend/port/win32/socket.c
index f4356fe..7fd901f 100644
--- a/src/backend/port/win32/socket.c
+++ b/src/backend/port/win32/socket.c
@@ -726,3 +726,97 @@ pgwin32_socket_strerror(int err)
}
return wserrbuf;
}
+
+/*
+ * Emulate socketpair() on Windows with a connected pair of loopback TCP
+ * sockets.
+ *
+ * "domain", "type" and "protocol" are ignored: the result is always an
+ * AF_INET SOCK_STREAM connection, so callers must not rely on datagram
+ * message boundaries.  On success both ends are stored in socks[] and 0 is
+ * returned.  On failure socks[] is set to INVALID_SOCKET and SOCKET_ERROR
+ * is returned, with WSAGetLastError() describing the error.
+ */
+int pgwin32_socketpair(int domain, int type, int protocol, SOCKET socks[2])
+{
+	struct sockaddr_in addr;
+	struct sockaddr_in local;
+	struct sockaddr_in peer;
+	socklen_t	addrlen;
+	SOCKET		listener;
+	int			exclusive = 1;
+	int			err;
+
+	socks[0] = socks[1] = INVALID_SOCKET;
+
+	listener = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
+	if (listener == INVALID_SOCKET)
+		return SOCKET_ERROR;
+
+	/*
+	 * SO_EXCLUSIVEADDRUSE (rather than SO_REUSEADDR) keeps other sockets from
+	 * forcibly binding the same port and intercepting the connection.
+	 */
+	if (setsockopt(listener, SOL_SOCKET, SO_EXCLUSIVEADDRUSE,
+				   (char *) &exclusive, (socklen_t) sizeof(exclusive)) == SOCKET_ERROR)
+		goto fail;
+
+	/* Bind to an ephemeral loopback port */
+	memset(&addr, 0, sizeof(addr));
+	addr.sin_family = AF_INET;
+	addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+	addr.sin_port = 0;
+	if (bind(listener, (struct sockaddr *) &addr, sizeof(addr)) == SOCKET_ERROR)
+		goto fail;
+
+	/* Learn which port was assigned, so socks[0] can connect to it */
+	addrlen = sizeof(addr);
+	if (getsockname(listener, (struct sockaddr *) &addr, &addrlen) == SOCKET_ERROR)
+		goto fail;
+	addr.sin_family = AF_INET;
+	addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+
+	if (listen(listener, 1) == SOCKET_ERROR)
+		goto fail;
+
+	socks[0] = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, 0);
+	if (socks[0] == INVALID_SOCKET)
+		goto fail;
+	if (connect(socks[0], (struct sockaddr *) &addr, sizeof(addr)) == SOCKET_ERROR)
+		goto fail;
+
+	socks[1] = accept(listener, NULL, NULL);
+	if (socks[1] == INVALID_SOCKET)
+		goto fail;
+
+	/*
+	 * Any local process can connect to the listening port, so verify that
+	 * the accepted connection really originates from socks[0].
+	 */
+	addrlen = sizeof(local);
+	if (getsockname(socks[0], (struct sockaddr *) &local, &addrlen) == SOCKET_ERROR)
+		goto fail;
+	addrlen = sizeof(peer);
+	if (getpeername(socks[1], (struct sockaddr *) &peer, &addrlen) == SOCKET_ERROR)
+		goto fail;
+	if (local.sin_addr.s_addr != peer.sin_addr.s_addr ||
+		local.sin_port != peer.sin_port)
+	{
+		WSASetLastError(WSAECONNABORTED);
+		goto fail;
+	}
+
+	closesocket(listener);
+	return 0;
+
+fail:
+	err = WSAGetLastError();
+	closesocket(listener);
+	if (socks[0] != INVALID_SOCKET)
+		closesocket(socks[0]);
+	if (socks[1] != INVALID_SOCKET)
+		closesocket(socks[1]);
+	WSASetLastError(err);
+	socks[0] = socks[1] = INVALID_SOCKET;
+	return SOCKET_ERROR;
+}
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index f3ddf82..473a1d4 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -169,6 +169,7 @@ typedef struct bkend
pid_t pid; /* process id of backend */
int32 cancel_key; /* cancel key for cancels for this backend */
int child_slot; /* PMChildSlot for this backend, if any */
+ pgsocket session_send_sock; /* Write end of socket pipe to this backend used to send session socket descriptor to the backend process */
/*
* Flavor of backend or auxiliary process. Note that BACKEND_TYPE_WALSND
@@ -182,6 +183,15 @@ typedef struct bkend
} Backend;
static dlist_head BackendList = DLIST_STATIC_INIT(BackendList);
+/*
+ * Pointer in backend list used to implement round-robin distribution of sessions through backends.
+ * This variable either NULL, either points to the normal backend.
+ */
+static Backend* BackendListClockPtr;
+/*
+ * Number of active normal backends
+ */
+static int nNormalBackends;
#ifdef EXEC_BACKEND
static Backend *ShmemBackendArray;
@@ -412,7 +422,6 @@ static void BackendRun(Port *port) pg_attribute_noreturn();
static void ExitPostmaster(int status) pg_attribute_noreturn();
static int ServerLoop(void);
static int BackendStartup(Port *port);
-static int ProcessStartupPacket(Port *port, bool SSLdone);
static void SendNegotiateProtocolVersion(List *unrecognized_protocol_options);
static void processCancelRequest(Port *port, void *pkt);
static int initMasks(fd_set *rmask);
@@ -485,6 +494,7 @@ typedef struct
{
Port port;
InheritableSocket portsocket;
+ InheritableSocket sessionsocket;
char DataDir[MAXPGPATH];
pgsocket ListenSocket[MAXLISTEN];
int32 MyCancelKey;
@@ -568,6 +578,22 @@ HANDLE PostmasterHandle;
#endif
/*
+ * Move BackendListClockPtr to the next normal backend in BackendList,
+ * wrapping around at the end of the list.  Used to distribute new sessions
+ * round-robin and to move the pointer off a backend that is being removed.
+ * If no other normal backend exists, the pointer is left where it was; a
+ * NULL pointer is left alone.
+ */
+static void AdvanceBackendListClockPtr(void)
+{
+	Backend    *start = BackendListClockPtr;
+	Backend    *b = start;
+	dlist_node *node;
+
+	if (start == NULL)
+		return;
+
+	do
+	{
+		/*
+		 * BackendList is a circular dlist whose last element links back to
+		 * the list head rather than to NULL, and the head is not a Backend:
+		 * wrap around to the first element explicitly.
+		 */
+		node = &b->elem;
+		if (dlist_has_next(&BackendList, node))
+			node = dlist_next_node(&BackendList, node);
+		else
+			node = dlist_head_node(&BackendList);
+		b = dlist_container(Backend, elem, node);
+	} while (b->bkend_type != BACKEND_TYPE_NORMAL && b != start);
+
+	BackendListClockPtr = b;
+}
+
+/*
* Postmaster main entry point
*/
void
@@ -1944,8 +1970,8 @@ initMasks(fd_set *rmask)
* send anything to the client, which would typically be appropriate
* if we detect a communications failure.)
*/
-static int
-ProcessStartupPacket(Port *port, bool SSLdone)
+int
+ProcessStartupPacket(Port *port, bool SSLdone, MemoryContext memctx)
{
int32 len;
void *buf;
@@ -2043,7 +2069,7 @@ retry1:
#endif
/* regular startup packet, cancel, etc packet should follow... */
/* but not another SSL negotiation request */
- return ProcessStartupPacket(port, true);
+ return ProcessStartupPacket(port, true, memctx);
}
/* Could add additional special packet types here */
@@ -2073,7 +2099,7 @@ retry1:
* not worry about leaking this storage on failure, since we aren't in the
* postmaster process anymore.
*/
- oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+ oldcontext = MemoryContextSwitchTo(memctx);
if (PG_PROTOCOL_MAJOR(proto) >= 3)
{
@@ -2449,7 +2475,7 @@ ConnCreate(int serverFd)
ConnFree(port);
return NULL;
}
-
+ SessionPoolSock = PGINVALID_SOCKET;
/*
* Allocate GSSAPI specific state struct
*/
@@ -3236,6 +3262,24 @@ CleanupBackgroundWorker(int pid,
}
/*
+ * Unlink backend "bp" from BackendList and free it.
+ *
+ * Entries created by BackendStartup() start as BACKEND_TYPE_NORMAL but may
+ * later be relabeled BACKEND_TYPE_WALSND; for both, the session-pool
+ * bookkeeping is undone here: the round-robin pointer is moved off "bp"
+ * (and reset to NULL if no other normal backend is left, so it never
+ * dangles), the session-passing socket is closed and nNormalBackends is
+ * decremented.
+ */
+static void UnlinkBackend(Backend* bp)
+{
+	if (bp->bkend_type == BACKEND_TYPE_NORMAL ||
+		bp->bkend_type == BACKEND_TYPE_WALSND)
+	{
+		if (bp == BackendListClockPtr)
+		{
+			AdvanceBackendListClockPtr();
+			/* "bp" was the only candidate: don't leave a dangling pointer */
+			if (BackendListClockPtr == bp)
+				BackendListClockPtr = NULL;
+		}
+		/* closesocket() rather than close(), so this also works on Win32 */
+		if (bp->session_send_sock != PGINVALID_SOCKET)
+			closesocket(bp->session_send_sock);
+		elog(DEBUG2, "Cleanup backend %d", bp->pid);
+		nNormalBackends -= 1;
+	}
+	dlist_delete(&bp->elem);
+	free(bp);
+}
+
+/*
* CleanupBackend -- cleanup after terminated backend.
*
* Remove all local state associated with backend.
@@ -3312,8 +3356,7 @@ CleanupBackend(int pid,
*/
BackgroundWorkerStopNotifications(bp->pid);
}
- dlist_delete(iter.cur);
- free(bp);
+ UnlinkBackend(bp);
break;
}
}
@@ -3415,8 +3458,7 @@ HandleChildCrash(int pid, int exitstatus, const char *procname)
ShmemBackendArrayRemove(bp);
#endif
}
- dlist_delete(iter.cur);
- free(bp);
+ UnlinkBackend(bp);
/* Keep looping so we can signal remaining backends */
}
else
@@ -4017,6 +4059,20 @@ BackendStartup(Port *port)
{
Backend *bn; /* for backend cleanup */
pid_t pid;
+ int session_pipe[2];
+
+ if (SessionPoolSize != 0 && nNormalBackends >= SessionPoolSize)
+ {
+ /* In case of session pooling instead of spawning new backend open new session at one of the existed backends. */
+ Assert(BackendListClockPtr && BackendListClockPtr->session_send_sock != PGINVALID_SOCKET);
+ elog(DEBUG2, "Start new session for socket %d at backend %d total %d", port->sock, BackendListClockPtr->pid, nNormalBackends);
+ /* Send connection socket to the backend pointed by BackendListClockPtr */
+ if (pg_send_sock(BackendListClockPtr->session_send_sock, port->sock, BackendListClockPtr->pid) < 0)
+ elog(FATAL, "Failed to send session socket: %m");
+ AdvanceBackendListClockPtr(); /* round-robin backends */
+ return STATUS_OK;
+ }
+
/*
* Create backend data structure. Better before the fork() so we can
@@ -4030,7 +4086,6 @@ BackendStartup(Port *port)
errmsg("out of memory")));
return STATUS_ERROR;
}
-
/*
* Compute the cancel key that will be assigned to this backend. The
* backend will have its own copy in the forked-off process' value of
@@ -4063,12 +4118,28 @@ BackendStartup(Port *port)
/* Hasn't asked to be notified about any bgworkers yet */
bn->bgworker_notify = false;
+ /* Create socket pair for sending session sockets to the backend */
+ if (SessionPoolSize != 0)
+ {
+ if (socketpair(AF_UNIX, SOCK_DGRAM, 0, session_pipe) < 0)
+ ereport(FATAL,
+ (errcode_for_file_access(),
+ errmsg_internal("could not create socket pair for launching sessions: %m")));
+#ifdef WIN32
+ SessionPoolSock = session_pipe[0];
+#endif
+ }
#ifdef EXEC_BACKEND
pid = backend_forkexec(port);
#else /* !EXEC_BACKEND */
pid = fork_process();
if (pid == 0) /* child */
{
+ if (SessionPoolSize != 0)
+ {
+ SessionPoolSock = session_pipe[0]; /* Use this socket for receiving client session socket descriptor */
+ close(session_pipe[1]); /* Close unused end of the pipe */
+ }
free(bn);
/* Detangle from postmaster */
@@ -4110,9 +4181,19 @@ BackendStartup(Port *port)
* of backends.
*/
bn->pid = pid;
+ if (SessionPoolSize != 0)
+ {
+ bn->session_send_sock = session_pipe[1]; /* Use this socket for sending client session socket descriptor */
+ close(session_pipe[0]); /* Close unused end of the pipe */
+ }
+ else
+ bn->session_send_sock = PGINVALID_SOCKET;
bn->bkend_type = BACKEND_TYPE_NORMAL; /* Can change later to WALSND */
dlist_push_head(&BackendList, &bn->elem);
-
+ if (BackendListClockPtr == NULL)
+ BackendListClockPtr = bn;
+ nNormalBackends += 1;
+ elog(DEBUG2, "Start backend %d total %d", pid, nNormalBackends);
#ifdef EXEC_BACKEND
if (!bn->dead_end)
ShmemBackendArrayAdd(bn);
@@ -4299,7 +4380,7 @@ BackendInitialize(Port *port)
* Receive the startup packet (which might turn out to be a cancel request
* packet).
*/
- status = ProcessStartupPacket(port, false);
+ status = ProcessStartupPacket(port, false, TopMemoryContext);
/*
* Stop here if it was bad or a cancel packet. ProcessStartupPacket
@@ -6033,6 +6114,9 @@ save_backend_variables(BackendParameters *param, Port *port,
if (!write_inheritable_socket(¶m->portsocket, port->sock, childPid))
return false;
+ if (!write_inheritable_socket(¶m->sessionsocket, SessionPoolSock, childPid))
+ return false;
+
strlcpy(param->DataDir, DataDir, MAXPGPATH);
memcpy(¶m->ListenSocket, &ListenSocket, sizeof(ListenSocket));
@@ -6265,6 +6349,7 @@ restore_backend_variables(BackendParameters *param, Port *port)
{
memcpy(port, ¶m->port, sizeof(Port));
read_inheritable_socket(&port->sock, ¶m->portsocket);
+ read_inheritable_socket(&SessionPoolSock, ¶m->sessionsocket);
SetDataDir(param->DataDir);
diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index e6706f7..13d26fc 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -76,6 +76,7 @@ struct WaitEventSet
{
int nevents; /* number of registered events */
int nevents_space; /* maximum number of events in this set */
+ int free_events; /* L1-list of free events linked by "pos" and terminated by -1*/
/*
* Array, of nevents_space length, storing the definition of events this
@@ -129,9 +130,9 @@ static void drainSelfPipe(void);
#if defined(WAIT_USE_EPOLL)
static void WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action);
#elif defined(WAIT_USE_POLL)
-static void WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event);
+static void WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event, bool remove);
#elif defined(WAIT_USE_WIN32)
-static void WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event);
+static void WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event, bool remove);
#endif
static inline int WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
@@ -562,6 +563,7 @@ CreateWaitEventSet(MemoryContext context, int nevents)
set->latch = NULL;
set->nevents_space = nevents;
+ set->free_events = -1;
#if defined(WAIT_USE_EPOLL)
#ifdef EPOLL_CLOEXEC
@@ -667,6 +669,7 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
void *user_data)
{
WaitEvent *event;
+ int free_event;
/* not enough space */
Assert(set->nevents < set->nevents_space);
@@ -690,8 +693,19 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
if (fd == PGINVALID_SOCKET && (events & WL_SOCKET_MASK))
elog(ERROR, "cannot wait on socket event without a socket");
- event = &set->events[set->nevents];
- event->pos = set->nevents++;
+ free_event = set->free_events;
+ if (free_event >= 0)
+ {
+ event = &set->events[free_event];
+ set->free_events = event->pos;
+ event->pos = free_event;
+ }
+ else
+ {
+ event = &set->events[set->nevents];
+ event->pos = set->nevents;
+ }
+ set->nevents += 1;
event->fd = fd;
event->events = events;
event->user_data = user_data;
@@ -718,15 +732,38 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
#if defined(WAIT_USE_EPOLL)
WaitEventAdjustEpoll(set, event, EPOLL_CTL_ADD);
#elif defined(WAIT_USE_POLL)
- WaitEventAdjustPoll(set, event);
+ WaitEventAdjustPoll(set, event, false);
#elif defined(WAIT_USE_WIN32)
- WaitEventAdjustWin32(set, event);
+ WaitEventAdjustWin32(set, event, false);
#endif
return event->pos;
}
/*
+ * Remove the event registered for socket "fd" from "set".
+ *
+ * Does nothing if no event uses "fd", or if "fd" is PGINVALID_SOCKET (the
+ * value left in freed slots and used for non-socket events on Windows, so
+ * never a meaningful key).  With epoll, removed slots are not compacted but
+ * chained onto set->free_events for reuse by AddWaitEventToSet(), so live
+ * events may sit at positions >= nevents; the scan therefore covers the
+ * nevents live slots plus every free slot.  The poll/win32 removal paths
+ * compact the array and never populate the free list.
+ */
+void DeleteWaitEventFromSet(WaitEventSet *set, pgsocket fd)
+{
+	int			i;
+	int			used = set->nevents;
+
+	if (fd == PGINVALID_SOCKET)
+		return;
+
+	/* Slots in use so far = live events + holes waiting on the free list */
+	for (i = set->free_events; i >= 0; i = set->events[i].pos)
+		used++;
+
+	for (i = 0; i < used; i++)
+	{
+		WaitEvent  *event = &set->events[i];
+
+		if (event->fd != fd)
+			continue;
+#if defined(WAIT_USE_EPOLL)
+		WaitEventAdjustEpoll(set, event, EPOLL_CTL_DEL);
+#elif defined(WAIT_USE_POLL)
+		WaitEventAdjustPoll(set, event, true);
+#elif defined(WAIT_USE_WIN32)
+		WaitEventAdjustWin32(set, event, true);
+#endif
+		break;
+	}
+}
+
+/*
* Change the event mask and, in the WL_LATCH_SET case, the latch associated
* with the WaitEvent.
*
@@ -774,9 +811,9 @@ ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
#if defined(WAIT_USE_EPOLL)
WaitEventAdjustEpoll(set, event, EPOLL_CTL_MOD);
#elif defined(WAIT_USE_POLL)
- WaitEventAdjustPoll(set, event);
+ WaitEventAdjustPoll(set, event, false);
#elif defined(WAIT_USE_WIN32)
- WaitEventAdjustWin32(set, event);
+ WaitEventAdjustWin32(set, event, false);
#endif
}
@@ -827,14 +864,33 @@ WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action)
ereport(ERROR,
(errcode_for_socket_access(),
errmsg("epoll_ctl() failed: %m")));
+
+ if (action == EPOLL_CTL_DEL)
+ {
+ int pos = event->pos;
+ event->fd = PGINVALID_SOCKET;
+ set->nevents -= 1;
+ event->pos = set->free_events;
+ set->free_events = pos;
+ }
}
#endif
#if defined(WAIT_USE_POLL)
static void
-WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event)
+WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event, bool remove)
{
- struct pollfd *pollfd = &set->pollfds[event->pos];
+ int pos = event->pos;
+ struct pollfd *pollfd = &set->pollfds[pos];
+
+ if (remove)
+ {
+ set->nevents -= 1;
+ *pollfd = set->pollfds[set->nevents];
+ set->events[pos] = set->events[set->nevents];
+ event->pos = pos;
+ return;
+ }
pollfd->revents = 0;
pollfd->fd = event->fd;
@@ -865,9 +921,24 @@ WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event)
#if defined(WAIT_USE_WIN32)
static void
-WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event)
+WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event, bool remove)
{
- HANDLE *handle = &set->handles[event->pos + 1];
+ int pos = event->pos;
+ HANDLE *handle = &set->handles[pos + 1];
+
+ if (remove)
+ {
+ Assert(event->fd != PGINVALID_SOCKET);
+
+ if (*handle != WSA_INVALID_EVENT)
+ WSACloseEvent(*handle);
+
+ set->nevents -= 1;
+ set->events[pos] = set->events[set->nevents];
+ *handle = set->events[set->nevents + 1];
+ event->pos = pos;
+ return;
+ }
if (event->events == WL_LATCH_SET)
{
@@ -880,7 +951,7 @@ WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event)
}
else
{
- int flags = FD_CLOSE; /* always check for errors/EOF */
+ int flags = FD_CLOSE; /* always check for errors/EOF */
if (event->events & WL_SOCKET_READABLE)
flags |= FD_READ;
@@ -1296,7 +1367,7 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
{
if (cur_event->reset)
{
- WaitEventAdjustWin32(set, cur_event);
+ WaitEventAdjustWin32(set, cur_event, false);
cur_event->reset = false;
}
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index ddc3ec8..bf704d5 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -40,6 +40,7 @@
#include "access/printtup.h"
#include "access/xact.h"
#include "catalog/pg_type.h"
+#include "catalog/namespace.h"
#include "commands/async.h"
#include "commands/prepare.h"
#include "libpq/libpq.h"
@@ -75,9 +76,9 @@
#include "utils/snapmgr.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
+#include "utils/builtins.h"
#include "mb/pg_wchar.h"
-
/* ----------------
* global variables
* ----------------
@@ -98,6 +99,10 @@ int max_stack_depth = 100;
/* wait N seconds to allow attach from a debugger */
int PostAuthDelay = 0;
+/* Local socket for redirecting sessions to the backends */
+pgsocket SessionPoolSock = PGINVALID_SOCKET;
+/* Pointer to the active session */
+SessionContext* ActiveSession;
/* ----------------
@@ -169,6 +174,12 @@ static ProcSignalReason RecoveryConflictReason;
static MemoryContext row_description_context = NULL;
static StringInfoData row_description_buf;
+static WaitEventSet* SessionPool; /* Set of all sessions sockets */
+static int64 SessionCount; /* Number of sessions */
+static Port* BackendPort; /* Reference to the original port of this backend created when this backend was launched.
+ * Session using this port may be already terminated, but since it is allocated in TopMemoryContext,
+ * its content is still valid and is used as template for ports of new sessions */
+
/* ----------------------------------------------------------------
* decls for routines only used in this file
* ----------------------------------------------------------------
@@ -194,6 +205,27 @@ static void log_disconnections(int code, Datum arg);
static void enable_statement_timeout(void);
static void disable_statement_timeout(void);
+/*
+ * Build a new session identifier: the next value of the backend-local
+ * SessionCount counter rendered in decimal, returned as a pstrdup'd copy.
+ * Identifiers are therefore unique only within this backend.
+ */
+static char* CreateSessionId(void)
+{
+	char		digits[64];
+
+	SessionCount++;
+	pg_lltoa(SessionCount, digits);
+	return pstrdup(digits);
+}
+
+/*
+ * Free all memory associated with session and delete session object itself
+ *
+ * Steps, in order: log the session, restore and then release its GUC state
+ * via RestoreSessionGUCs()/ReleaseSessionGUCs() (defined elsewhere; their
+ * exact semantics are not visible here), delete session->memory, and
+ * finally free() the SessionContext struct itself.
+ *
+ * NOTE(review): the struct is released with free(), so it must come from
+ * malloc/calloc (PostgresMain calloc()s the initial one) -- confirm for
+ * sessions created elsewhere.  This function does not itself call
+ * DropSessionPreparedStatements(); presumably the caller does -- confirm.
+ */
+static void DeleteSession(SessionContext* session)
+{
+ /* Log first: session->id may live in session->memory (it does for the
+  * session created in PostgresMain) and is freed below. */
+ elog(DEBUG1, "Delete session %p, id=%s, memory context=%p", session, session->id, session->memory);
+ RestoreSessionGUCs(session);
+ ReleaseSessionGUCs(session);
+ /* Frees everything allocated in the session's context */
+ MemoryContextDelete(session->memory);
+ free(session);
+}
/* ----------------------------------------------------------------
* routines to obtain user input
@@ -1232,6 +1264,12 @@ exec_parse_message(const char *query_string, /* string to execute */
bool save_log_statement_stats = log_statement_stats;
char msec_str[32];
+ if (ActiveSession && stmt_name[0] != '\0')
+ {
+ /* Make names of prepared statements unique for session in case of using internal session pool */
+ stmt_name = psprintf("%s.%s", ActiveSession->id, stmt_name);
+ }
+
/*
* Report query to various monitoring facilities.
*/
@@ -1503,6 +1541,12 @@ exec_bind_message(StringInfo input_message)
portal_name = pq_getmsgstring(input_message);
stmt_name = pq_getmsgstring(input_message);
+ if (ActiveSession && stmt_name[0] != '\0')
+ {
+ /* Make names of prepared statements unique for session in case of using internal session pool */
+ stmt_name = psprintf("%s.%s", ActiveSession->id, stmt_name);
+ }
+
ereport(DEBUG2,
(errmsg("bind %s to %s",
*portal_name ? portal_name : "<unnamed>",
@@ -2325,6 +2369,12 @@ exec_describe_statement_message(const char *stmt_name)
CachedPlanSource *psrc;
int i;
+ if (ActiveSession && stmt_name[0] != '\0')
+ {
+ /* Make names of prepared statements unique for session in case of using internal session pool */
+ stmt_name = psprintf("%s.%s", ActiveSession->id, stmt_name);
+ }
+
/*
* Start up a transaction command. (Note that this will normally change
* current memory context.) Nothing happens if we are already in one.
@@ -3603,7 +3653,6 @@ process_postgres_switches(int argc, char *argv[], GucContext ctx,
#endif
}
-
/* ----------------------------------------------------------------
* PostgresMain
* postgres main loop -- all backends, interactive or otherwise start here
@@ -3654,6 +3703,21 @@ PostgresMain(int argc, char *argv[],
progname)));
}
+ /* Assign session for this backend in case of session pooling */
+ if (SessionPoolSize != 0)
+ {
+ MemoryContext oldcontext;
+ ActiveSession = (SessionContext*)calloc(1, sizeof(SessionContext));
+ ActiveSession->memory = AllocSetContextCreate(TopMemoryContext,
+ "SessionMemoryContext",
+ ALLOCSET_DEFAULT_SIZES);
+ oldcontext = MemoryContextSwitchTo(ActiveSession->memory);
+ ActiveSession->id = CreateSessionId();
+ ActiveSession->port = MyProcPort;
+ BackendPort = MyProcPort;
+ MemoryContextSwitchTo(oldcontext);
+ }
+
/* Acquire configuration parameters, unless inherited from postmaster */
if (!IsUnderPostmaster)
{
@@ -3783,7 +3847,7 @@ PostgresMain(int argc, char *argv[],
* ... else we'd need to copy the Port data first. Also, subsidiary data
* such as the username isn't lost either; see ProcessStartupPacket().
*/
- if (PostmasterContext)
+ if (PostmasterContext && SessionPoolSize == 0)
{
MemoryContextDelete(PostmasterContext);
PostmasterContext = NULL;
@@ -4069,6 +4133,170 @@ PostgresMain(int argc, char *argv[],
ReadyForQuery(whereToSendOutput);
send_ready_for_query = false;
+
+ /*
+ * Here we perform multiplexing of client sessions if session pooling is enabled.
+ * As far as we perform transaction level pooling, rescheduling is done only when we are not in transaction.
+ */
+ if (SessionPoolSock != PGINVALID_SOCKET && !IsTransactionState() && pq_available_bytes() == 0)
+ {
+ WaitEvent ready_client;
+ if (SessionPool == NULL)
+ {
+ /*
+ * Construct wait event set if not constructed yet. Besides one event
+ * per client session (up to MaxSessions) it holds the three events
+ * for postmaster death, the latch and new sessions added below.
+ */
+ SessionPool = CreateWaitEventSet(TopMemoryContext, MaxSessions + 3);
+ /* Add event to detect postmaster death */
+ AddWaitEventToSet(SessionPool, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, ActiveSession);
+ /* Add event for backends latch */
+ AddWaitEventToSet(SessionPool, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, ActiveSession);
+ /* Add event for accepting new sessions */
+ AddWaitEventToSet(SessionPool, WL_SOCKET_READABLE, SessionPoolSock, NULL, ActiveSession);
+ /* Add event for current session */
+ AddWaitEventToSet(SessionPool, WL_SOCKET_READABLE, MyProcPort->sock, NULL, ActiveSession);
+ }
+ ChooseSession:
+ DoingCommandRead = true;
+ /* Select which client session is ready to send new query */
+ if (WaitEventSetWait(SessionPool, -1, &ready_client, 1, PG_WAIT_CLIENT) != 1)
+ {
+ /* TODO: do some error recovery here */
+ elog(FATAL, "Failed to poll client sessions");
+ }
+ CHECK_FOR_INTERRUPTS();
+ DoingCommandRead = false;
+
+ if (ready_client.events & WL_POSTMASTER_DEATH)
+ ereport(FATAL,
+ (errcode(ERRCODE_ADMIN_SHUTDOWN),
+ errmsg("terminating connection due to unexpected postmaster exit")));
+
+ if (ready_client.events & WL_LATCH_SET)
+ {
+ ResetLatch(MyLatch);
+ ProcessClientReadInterrupt(true);
+ goto ChooseSession;
+ }
+
+ if (ready_client.fd == SessionPoolSock)
+ {
+ /* Here we handle case of attaching new session */
+ int status;
+ SessionContext* session;
+ StringInfoData buf;
+ Port* port;
+ pgsocket sock;
+ MemoryContext oldcontext;
+
+ sock = pg_recv_sock(SessionPoolSock);
+ if (sock == PGINVALID_SOCKET)
+ elog(FATAL, "Failed to receive session socket: %m");
+
+ session = (SessionContext*)calloc(1, sizeof(SessionContext));
+ if (session == NULL)
+ {
+ /* reject only this client: FATAL would drop every session of the backend */
+ elog(LOG, "Failed to allocate new session: out of memory");
+ closesocket(sock);
+ goto ChooseSession;
+ }
+ session->memory = AllocSetContextCreate(TopMemoryContext,
+ "SessionMemoryContext",
+ ALLOCSET_DEFAULT_SIZES);
+ oldcontext = MemoryContextSwitchTo(session->memory);
+ port = palloc(sizeof(Port));
+ memcpy(port, BackendPort, sizeof(Port));
+
+ /*
+ * Receive the startup packet (which might turn out to be a cancel request
+ * packet).
+ */
+ port->sock = sock;
+ session->port = port;
+ session->id = CreateSessionId();
+
+ MyProcPort = port;
+ status = ProcessStartupPacket(port, false, session->memory);
+ MemoryContextSwitchTo(oldcontext);
+
+ /*
+ * TODO: Currently we assume that all sessions are accessing the same database under the same user.
+ * Reject the session if it is not true. Compare with BackendPort, the port this backend was started
+ * for, since MyProcPort already is the new port; check only after a successful startup, as the names
+ * may not be set otherwise. FATAL is not used: it would terminate every session of this backend.
+ */
+ if (status == STATUS_OK &&
+ (strcmp(port->database_name, BackendPort->database_name) != 0 ||
+ strcmp(port->user_name, BackendPort->user_name) != 0))
+ {
+ elog(LOG, "Failed to open session (dbname=%s user=%s) in backend %d (dbname=%s user=%s)",
+ port->database_name, port->user_name,
+ MyProcPid, BackendPort->database_name, BackendPort->user_name);
+ status = STATUS_ERROR;
+ }
+
+ if (status == STATUS_OK)
+ {
+ elog(DEBUG2, "Start new session %d in backend %d for database %s user %s",
+ (int) sock, MyProcPid, port->database_name, port->user_name);
+ RestoreSessionGUCs(ActiveSession);
+ ActiveSession = session;
+ /* the new session has no temporary namespaces: do not inherit the previous session's ones */
+ SetTempNamespaceState(ActiveSession->tempNamespace, ActiveSession->tempToastNamespace);
+ AddWaitEventToSet(SessionPool, WL_SOCKET_READABLE, sock, NULL, session);
+
+ SetCurrentStatementStartTimestamp();
+ StartTransactionCommand();
+ PerformAuthentication(MyProcPort);
+ CommitTransactionCommand();
+
+ /*
+ * Send GUC options to the client
+ */
+ BeginReportingGUCOptions();
+
+ /*
+ * Send this backend's cancellation info to the frontend.
+ */
+ pq_beginmessage(&buf, 'K');
+ pq_sendint32(&buf, (int32) MyProcPid);
+ pq_sendint32(&buf, (int32) MyCancelKey);
+ pq_endmessage(&buf);
+
+ /* Need not flush since ReadyForQuery will do it. */
+ send_ready_for_query = true;
+ continue;
+ }
+ else
+ {
+ /* Error while processing of startup packet (or session rejected).
+ * Reject this session and return back to listening sockets
+ */
+ /* port was allocated in session->memory: point MyProcPort back to the active session first */
+ MyProcPort = ActiveSession ? ActiveSession->port : NULL;
+ DeleteSession(session);
+ elog(LOG, "Session startup failed");
+ closesocket(sock);
+ goto ChooseSession;
+ }
+ }
+ else
+ {
+ SessionContext* newSession = (SessionContext*)ready_client.user_data;
+ if (ActiveSession != newSession)
+ {
+ elog(DEBUG2, "Switch to session %d in backend %d", (int) ready_client.fd, MyProcPid);
+ RestoreSessionGUCs(ActiveSession);
+ ActiveSession = newSession;
+ RestoreSessionGUCs(ActiveSession);
+ MyProcPort = ActiveSession->port;
+ SetTempNamespaceState(ActiveSession->tempNamespace, ActiveSession->tempToastNamespace);
+ }
+ }
+ }
}
/*
@@ -4350,6 +4558,45 @@ PostgresMain(int argc, char *argv[],
* it will fail to be called during other backend-shutdown
* scenarios.
*/
+
+ if (SessionPool)
+ {
+ /*
+ * In case of session pooling close the session, but do not terminate the backend
+ * even if there are no more sessions in this backend.
+ * The reason for keeping backend alive is to prevent redundant process launches if
+ * some client repeatedly opens/closes connections to the database.
+ * Maximal number of launched backends in case of connection pooling is intended to be
+ * optimal for this system and workload, so there are no reasons to try to reduce this number
+ * when there are no active sessions.
+ */
+ DeleteWaitEventFromSet(SessionPool, MyProcPort->sock);
+ elog(DEBUG1, "Close session %d in backend %d", (int) MyProcPort->sock, MyProcPid);
+
+ pq_getmsgend(&input_message);
+ if (pq_is_reading_msg())
+ pq_endmsgread();
+
+ closesocket(MyProcPort->sock);
+ MyProcPort->sock = PGINVALID_SOCKET;
+ MyProcPort = NULL;
+
+ if (ActiveSession)
+ {
+ /*
+ * NOTE(review): GUC values set by this session are still in effect here;
+ * unless DeleteSession() swaps them out (RestoreSessionGUCs()), they leak
+ * into the sessions served next -- confirm.
+ */
+ DropSessionPreparedStatements(ActiveSession->id);
+ DeleteSession(ActiveSession);
+ ActiveSession = NULL;
+ }
+ whereToSendOutput = DestRemote;
+ /* Need to perform rescheduling to some other session or accept new session */
+ goto ChooseSession;
+ }
+ elog(DEBUG1, "Terminate backend %d", MyProcPid);
proc_exit(0);
case 'd': /* copy data */
diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c
index 54fa4a3..b2f43a8 100644
--- a/src/backend/utils/init/globals.c
+++ b/src/backend/utils/init/globals.c
@@ -120,7 +120,9 @@ int maintenance_work_mem = 16384;
* register background workers.
*/
int NBuffers = 1000;
+int SessionPoolSize = 0; /* session_pool_size: backends serving pooled client sessions; 0 disables session pooling */
int MaxConnections = 90;
+int MaxSessions = 1000; /* max_sessions: client sessions one backend can serve when session pooling is on */
int max_worker_processes = 8;
int max_parallel_workers = 8;
int MaxBackends = 0;
diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c
index f9b3309..571c80f 100644
--- a/src/backend/utils/init/postinit.c
+++ b/src/backend/utils/init/postinit.c
@@ -65,7 +65,6 @@
static HeapTuple GetDatabaseTuple(const char *dbname);
static HeapTuple GetDatabaseTupleByOid(Oid dboid);
-static void PerformAuthentication(Port *port);
static void CheckMyDatabase(const char *name, bool am_superuser);
static void InitCommunication(void);
static void ShutdownPostgres(int code, Datum arg);
@@ -180,7 +179,7 @@ GetDatabaseTupleByOid(Oid dboid)
*
* returns: nothing. Will not return at all if there's any failure.
*/
-static void
+void
PerformAuthentication(Port *port)
{
/* This should be set already, but let's make sure */
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 72f6be3..58258b4 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -1871,6 +1871,29 @@ static struct config_int ConfigureNamesInt[] =
},
{
+ {"max_sessions", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
+ gettext_noop("Sets the maximum number of client sessions per backend."),
+ gettext_noop("Maximal number of client sessions which can be handled by one backend if session pooling is switched on. "
+ "So maximal number of client connections is session_pool_size*max_sessions.")
+ },
+ &MaxSessions,
+ 1000, 1, INT_MAX,
+ NULL, NULL, NULL
+ },
+
+ {
+ {"session_pool_size", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
+ gettext_noop("Sets number of backends serving client sessions."),
+ gettext_noop("If non-zero then session pooling will be used: "
+ "client connections will be redirected to one of the backends and maximal number of backends is determined by this parameter. "
+ "Launched backends are never terminated even in case of no active sessions.")
+ },
+ &SessionPoolSize,
+ 0, 0, INT_MAX,
+ NULL, NULL, NULL
+ },
+
+ {
{"superuser_reserved_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
gettext_noop("Sets the number of connection slots reserved for superusers."),
NULL
@@ -5104,6 +5127,116 @@ NewGUCNestLevel(void)
}
/*
+ * Exchange the values saved for a session's GUCs with the values currently
+ * in effect.
+ *
+ * For each SessionGUC of the session, the saved value and its "extra" data
+ * are installed into the GUC variable (the variable's assign hook, if any,
+ * is called with them) and the value that was in effect is saved in the
+ * SessionGUC in their place. Calling this twice for the same session thus
+ * leaves the GUC variables as they were. A NULL session is ignored.
+ */
+void
+RestoreSessionGUCs(SessionContext *session)
+{
+ SessionGUC *entry;
+
+ if (session == NULL)
+ return;
+
+ for (entry = session->gucs; entry != NULL; entry = entry->next)
+ {
+ struct config_generic *gconf = entry->var;
+ void *previous_extra = gconf->extra;
+
+ gconf->extra = entry->val.extra;
+
+ switch (gconf->vartype)
+ {
+ case PGC_BOOL:
+ {
+ struct config_bool *conf = (struct config_bool *) gconf;
+ bool previous = *conf->variable;
+
+ *conf->variable = entry->val.val.boolval;
+ if (conf->assign_hook != NULL)
+ conf->assign_hook(entry->val.val.boolval, entry->val.extra);
+ entry->val.val.boolval = previous;
+ }
+ break;
+ case PGC_INT:
+ {
+ struct config_int *conf = (struct config_int *) gconf;
+ int previous = *conf->variable;
+
+ *conf->variable = entry->val.val.intval;
+ if (conf->assign_hook != NULL)
+ conf->assign_hook(entry->val.val.intval, entry->val.extra);
+ entry->val.val.intval = previous;
+ }
+ break;
+ case PGC_REAL:
+ {
+ struct config_real *conf = (struct config_real *) gconf;
+ double previous = *conf->variable;
+
+ *conf->variable = entry->val.val.realval;
+ if (conf->assign_hook != NULL)
+ conf->assign_hook(entry->val.val.realval, entry->val.extra);
+ entry->val.val.realval = previous;
+ }
+ break;
+ case PGC_STRING:
+ {
+ struct config_string *conf = (struct config_string *) gconf;
+ char *previous = *conf->variable;
+
+ *conf->variable = entry->val.val.stringval;
+ if (conf->assign_hook != NULL)
+ conf->assign_hook(entry->val.val.stringval, entry->val.extra);
+ entry->val.val.stringval = previous;
+ }
+ break;
+ case PGC_ENUM:
+ {
+ struct config_enum *conf = (struct config_enum *) gconf;
+ int previous = *conf->variable;
+
+ *conf->variable = entry->val.val.enumval;
+ if (conf->assign_hook != NULL)
+ conf->assign_hook(entry->val.val.enumval, entry->val.extra);
+ entry->val.val.enumval = previous;
+ }
+ break;
+ }
+
+ /* the extra data that was in effect is kept in the entry, too */
+ entry->val.extra = previous_extra;
+ }
+}
+
+/*
+ * Release the values saved in a session's SessionGUC list: the "extra" data
+ * and, for string GUCs, the saved string are reset to NULL through
+ * set_extra_field() and set_string_field(). The list nodes themselves are
+ * not touched.
+ */
+void
+ReleaseSessionGUCs(SessionContext *session)
+{
+ SessionGUC *entry;
+
+ for (entry = session->gucs; entry != NULL; entry = entry->next)
+ {
+ if (entry->val.extra != NULL)
+ set_extra_field(entry->var, &entry->val.extra, NULL);
+ if (entry->var->vartype == PGC_STRING)
+ set_string_field((struct config_string *) entry->var,
+ &entry->val.val.stringval, NULL);
+ }
+}
+
+/*
* Do GUC processing at transaction or subtransaction commit or abort, or
* when exiting a function that has proconfig settings, or when undoing a
* transient assignment to some GUC variables. (The name is thus a bit of
@@ -5172,7 +5284,59 @@ AtEOXact_GUC(bool isCommit, int nestLevel)
else if (stack->state == GUC_SET)
{
/* we keep the current active value */
- discard_stack_value(gconf, &stack->prior);
+ SessionGUC *sg = NULL;
+
+ /*
+ * With session pooling, remember the value this GUC had before the
+ * session first changed it, so that RestoreSessionGUCs() can give it
+ * back to the other sessions served by this backend.
+ */
+ if (ActiveSession)
+ {
+ for (sg = ActiveSession->gucs; sg != NULL; sg = sg->next)
+ {
+ if (sg->var == gconf)
+ break;
+ }
+ }
+
+ if (ActiveSession == NULL || sg != NULL)
+ {
+ /*
+ * Without session pooling the prior value is simply discarded. If the
+ * session already has an entry, it holds the value preceding the
+ * session's first change; prior is just the session's own earlier
+ * setting and must not replace it.
+ */
+ discard_stack_value(gconf, &stack->prior);
+ }
+ else
+ {
+ sg = MemoryContextAllocZero(ActiveSession->memory,
+ sizeof(SessionGUC));
+ sg->var = gconf;
+ sg->next = ActiveSession->gucs;
+ ActiveSession->gucs = sg;
+ switch (gconf->vartype)
+ {
+ case PGC_BOOL:
+ sg->val.val.boolval = stack->prior.val.boolval;
+ break;
+ case PGC_INT:
+ sg->val.val.intval = stack->prior.val.intval;
+ break;
+ case PGC_REAL:
+ sg->val.val.realval = stack->prior.val.realval;
+ break;
+ case PGC_STRING:
+ sg->val.val.stringval = stack->prior.val.stringval;
+ break;
+ case PGC_ENUM:
+ sg->val.val.enumval = stack->prior.val.enumval;
+ break;
+ }
+ sg->val.extra = stack->prior.extra;
+ }
}
else /* must be GUC_LOCAL */
restorePrior = true;
diff --git a/src/include/commands/prepare.h b/src/include/commands/prepare.h
index ffec029..cb5f8d4 100644
--- a/src/include/commands/prepare.h
+++ b/src/include/commands/prepare.h
@@ -56,5 +56,6 @@ extern TupleDesc FetchPreparedStatementResultDesc(PreparedStatement *stmt);
extern List *FetchPreparedStatementTargetList(PreparedStatement *stmt);
extern void DropAllPreparedStatements(void);
+extern void DropSessionPreparedStatements(const char *sessionId); /* presumably drops the statements named "<sessionId>.<name>" */
#endif /* PREPARE_H */
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index 2e7725d..9169b21 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -71,6 +71,7 @@ extern int pq_getbyte(void);
extern int pq_peekbyte(void);
extern int pq_getbyte_if_available(unsigned char *c);
extern int pq_putbytes(const char *s, size_t len);
+extern int pq_available_bytes(void); /* presumably: received client input not consumed yet; session pooling reschedules only when it is 0 */
/*
* prototypes for functions in be-secure.c
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index 54ee273..a9f9228 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -157,6 +157,8 @@ extern PGDLLIMPORT char *DataDir;
extern PGDLLIMPORT int NBuffers;
extern PGDLLIMPORT int MaxBackends;
extern PGDLLIMPORT int MaxConnections;
+extern PGDLLIMPORT int MaxSessions;
+extern PGDLLIMPORT int SessionPoolSize;
extern PGDLLIMPORT int max_worker_processes;
extern int max_parallel_workers;
@@ -420,6 +422,9 @@ extern void InitializeMaxBackends(void);
extern void InitPostgres(const char *in_dbname, Oid dboid, const char *username,
Oid useroid, char *out_dbname);
extern void BaseInit(void);
+/* declare struct Port at file scope; otherwise it would only have prototype scope below */
+struct Port;
+extern void PerformAuthentication(struct Port *port);
/* in utils/init/miscinit.c */
extern bool IgnoreSystemIndexes;
diff --git a/src/include/port.h b/src/include/port.h
index 3e528fa..8a0ac98 100644
--- a/src/include/port.h
+++ b/src/include/port.h
@@ -41,6 +41,10 @@ typedef SOCKET pgsocket;
extern bool pg_set_noblock(pgsocket sock);
extern bool pg_set_block(pgsocket sock);
+/* Send/receive a socket descriptor over the channel socket "chan"; session pooling receives new client sockets this way */
+extern int pg_send_sock(pgsocket chan, pgsocket sock, pid_t pid); /* pid: presumably the receiving process (e.g. for Win32 socket duplication) -- confirm */
+extern pgsocket pg_recv_sock(pgsocket chan); /* returns the received socket; presumably PGINVALID_SOCKET on failure -- confirm */
+
/* Portable path handling for Unix/Win32 (in path.c) */
extern bool has_drive_prefix(const char *filename);
diff --git a/src/include/port/win32_port.h b/src/include/port/win32_port.h
index d31c28f..e667434 100644
--- a/src/include/port/win32_port.h
+++ b/src/include/port/win32_port.h
@@ -447,6 +447,7 @@ extern int pgkill(int pid, int sig);
#define select(n, r, w, e, timeout) pgwin32_select(n, r, w, e, timeout)
#define recv(s, buf, len, flags) pgwin32_recv(s, buf, len, flags)
#define send(s, buf, len, flags) pgwin32_send(s, buf, len, flags)
+#define socketpair(af, type, protocol, socks) pgwin32_socketpair(af, type, protocol, socks)
SOCKET pgwin32_socket(int af, int type, int protocol);
int pgwin32_bind(SOCKET s, struct sockaddr *addr, int addrlen);
@@ -456,6 +457,7 @@ int pgwin32_connect(SOCKET s, const struct sockaddr *name, int namelen);
int pgwin32_select(int nfds, fd_set *readfs, fd_set *writefds, fd_set *exceptfds, const struct timeval *timeout);
int pgwin32_recv(SOCKET s, char *buf, int len, int flags);
int pgwin32_send(SOCKET s, const void *buf, int len, int flags);
+int pgwin32_socketpair(int domain, int type, int protocol, SOCKET socks[2]); /* Win32 replacement for socketpair(); see the socketpair #define above */
const char *pgwin32_socket_strerror(int err);
int pgwin32_waitforsinglesocket(SOCKET s, int what, int timeout);
diff --git a/src/include/postmaster/postmaster.h b/src/include/postmaster/postmaster.h
index 1877eef..c9527c9 100644
--- a/src/include/postmaster/postmaster.h
+++ b/src/include/postmaster/postmaster.h
@@ -62,6 +62,9 @@ extern Size ShmemBackendArraySize(void);
extern void ShmemBackendArrayAllocation(void);
#endif
+struct Port;
+extern int ProcessStartupPacket(struct Port *port, bool SSLdone, MemoryContext memctx); /* memctx: presumably where the startup-packet data is allocated; the session pool passes the new session's context -- confirm */
+
/*
* Note: MAX_BACKENDS is limited to 2^18-1 because that's the width reserved
* for buffer references in buf_internals.h. This limitation could be lifted
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
index a4bcb48..10f30d1 100644
--- a/src/include/storage/latch.h
+++ b/src/include/storage/latch.h
@@ -176,6 +176,8 @@ extern int WaitLatch(volatile Latch *latch, int wakeEvents, long timeout,
extern int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents,
pgsocket sock, long timeout, uint32 wait_event_info);
+extern void DeleteWaitEventFromSet(WaitEventSet *set, pgsocket fd); /* presumably removes the event registered for socket fd; used when a pooled session is closed */
+
/*
* Unix implementation uses SIGUSR1 for inter-process signaling.
* Win32 doesn't need this.
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 5c19a61..11eded3 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -21,6 +21,7 @@
#include "storage/lock.h"
#include "storage/pg_sema.h"
#include "storage/proclist_types.h"
+#include "utils/guc_tables.h"
/*
* Each backend advertises up to PGPROC_MAX_CACHED_SUBXIDS TransactionIds
@@ -274,6 +274,34 @@ extern PGDLLIMPORT PROC_HDR *ProcGlobal;
extern PGPROC *PreparedXactProcs;
+/*
+ * A GUC changed by a client session under session pooling. "val" holds the
+ * value that is not in effect at the moment: while the session is active, the
+ * value to give back to the other sessions; otherwise the session's own
+ * setting. RestoreSessionGUCs() swaps it with the current value.
+ */
+typedef struct SessionGUC
+{
+ struct SessionGUC *next; /* next GUC changed by the same session */
+ config_var_value val; /* saved value and its "extra" data */
+ struct config_generic *var; /* the GUC variable */
+} SessionGUC;
+
+/*
+ * Information associated with client session
+ */
+typedef struct SessionContext
+{
+ MemoryContext memory; /* memory context used for global session data (replacement of TopMemoryContext) */
+ struct Port *port; /* connection port */
+ char *id; /* session identifier used to construct unique prepared statement names */
+ Oid tempNamespace; /* temporary namespace */
+ Oid tempToastNamespace; /* temporary toast namespace */
+ SessionGUC *gucs; /* list of GUCs changed by this session */
+} SessionContext;
+
+extern PGDLLIMPORT SessionContext *ActiveSession;
+
/* Accessor for PGPROC given a pgprocno. */
#define GetPGProcByNumber(n) (&ProcGlobal->allProcs[(n)])
diff --git a/src/include/tcop/tcopprot.h b/src/include/tcop/tcopprot.h
index 63b4e48..191eeaa 100644
--- a/src/include/tcop/tcopprot.h
+++ b/src/include/tcop/tcopprot.h
@@ -34,6 +34,7 @@ extern CommandDest whereToSendOutput;
extern PGDLLIMPORT const char *debug_query_string;
extern int max_stack_depth;
extern int PostAuthDelay;
+extern pgsocket SessionPoolSock; /* socket from which pg_recv_sock() receives new client sessions; session pooling is active only when it is not PGINVALID_SOCKET */
/* GUC-configurable parameters */
diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h
index 77daa5a..86e89e8 100644
--- a/src/include/utils/guc.h
+++ b/src/include/utils/guc.h
@@ -394,6 +394,11 @@ extern Size EstimateGUCStateSpace(void);
extern void SerializeGUCState(Size maxsize, char *start_address);
extern void RestoreGUCState(void *gucstate);
+/* Session pooling support functions */
+struct SessionContext;
+extern void RestoreSessionGUCs(struct SessionContext *session);
+extern void ReleaseSessionGUCs(struct SessionContext *session);
+
/* Support for messages reported from GUC check hooks */
extern PGDLLIMPORT char *GUC_check_errmsg_string;