A new version of the built-in connection pool is attached to this mail.
Now the client's startup packet is received by one of the listener workers, so
the postmaster knows the database/user name of the received connection and is
able to route it to the proper connection pool. SSL is not supported yet.
I have also added a general mechanism for moving static variables into the
session context. The file include/storage/sessionvars.h contains the list of
such variables, which are saved to the session context when a session is
rescheduled.
--
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
diff --git a/contrib/test_decoding/sql/messages.sql b/contrib/test_decoding/sql/messages.sql
index cf3f773..14c4163 100644
--- a/contrib/test_decoding/sql/messages.sql
+++ b/contrib/test_decoding/sql/messages.sql
@@ -23,6 +23,8 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'for
-- test db filtering
\set prevdb :DBNAME
+show session_pool_size;
+show session_pool_ports;
\c template1
SELECT 'otherdb1' FROM pg_logical_emit_message(false, 'test', 'otherdb1');
diff --git a/src/backend/catalog/namespace.c b/src/backend/catalog/namespace.c
index 5d13e6a..5a93c7e 100644
--- a/src/backend/catalog/namespace.c
+++ b/src/backend/catalog/namespace.c
@@ -178,7 +178,6 @@ static List *overrideStack = NIL;
* committed its creation, depending on whether myTempNamespace is valid.
*/
static Oid myTempNamespace = InvalidOid;
-
static Oid myTempToastNamespace = InvalidOid;
static SubTransactionId myTempNamespaceSubID = InvalidSubTransactionId;
@@ -193,6 +192,7 @@ char *namespace_search_path = NULL;
/* Local functions */
static void recomputeNamespacePath(void);
static void InitTempTableNamespace(void);
+static Oid GetTempTableNamespace(void);
static void RemoveTempRelations(Oid tempNamespaceId);
static void RemoveTempRelationsCallback(int code, Datum arg);
static void NamespaceCallback(Datum arg, int cacheid, uint32 hashvalue);
@@ -460,9 +460,7 @@ RangeVarGetCreationNamespace(const RangeVar *newRelation)
if (strcmp(newRelation->schemaname, "pg_temp") == 0)
{
/* Initialize temp namespace if first time through */
- if (!OidIsValid(myTempNamespace))
- InitTempTableNamespace();
- return myTempNamespace;
+ return GetTempTableNamespace();
}
/* use exact schema given */
namespaceId = get_namespace_oid(newRelation->schemaname, false);
@@ -471,9 +469,7 @@ RangeVarGetCreationNamespace(const RangeVar *newRelation)
else if (newRelation->relpersistence == RELPERSISTENCE_TEMP)
{
/* Initialize temp namespace if first time through */
- if (!OidIsValid(myTempNamespace))
- InitTempTableNamespace();
- return myTempNamespace;
+ return GetTempTableNamespace();
}
else
{
@@ -482,8 +478,7 @@ RangeVarGetCreationNamespace(const RangeVar *newRelation)
if (activeTempCreationPending)
{
/* Need to initialize temp namespace */
- InitTempTableNamespace();
- return myTempNamespace;
+ return GetTempTableNamespace();
}
namespaceId = activeCreationNamespace;
if (!OidIsValid(namespaceId))
@@ -2921,9 +2916,7 @@ LookupCreationNamespace(const char *nspname)
if (strcmp(nspname, "pg_temp") == 0)
{
/* Initialize temp namespace if first time through */
- if (!OidIsValid(myTempNamespace))
- InitTempTableNamespace();
- return myTempNamespace;
+ return GetTempTableNamespace();
}
namespaceId = get_namespace_oid(nspname, false);
@@ -2986,9 +2979,7 @@ QualifiedNameGetCreationNamespace(List *names, char **objname_p)
if (strcmp(schemaname, "pg_temp") == 0)
{
/* Initialize temp namespace if first time through */
- if (!OidIsValid(myTempNamespace))
- InitTempTableNamespace();
- return myTempNamespace;
+ return GetTempTableNamespace();
}
/* use exact schema given */
namespaceId = get_namespace_oid(schemaname, false);
@@ -3001,8 +2992,7 @@ QualifiedNameGetCreationNamespace(List *names, char **objname_p)
if (activeTempCreationPending)
{
/* Need to initialize temp namespace */
- InitTempTableNamespace();
- return myTempNamespace;
+ return GetTempTableNamespace();
}
namespaceId = activeCreationNamespace;
if (!OidIsValid(namespaceId))
@@ -3254,16 +3244,28 @@ int
GetTempNamespaceBackendId(Oid namespaceId)
{
int result;
- char *nspname;
+ char *nspname,
+ *addlevel;
/* See if the namespace name starts with "pg_temp_" or "pg_toast_temp_" */
nspname = get_namespace_name(namespaceId);
if (!nspname)
return InvalidBackendId; /* no such namespace? */
if (strncmp(nspname, "pg_temp_", 8) == 0)
- result = atoi(nspname + 8);
+ {
+ /* check for session id */
+ if ((addlevel = strstr(nspname + 8, "_")) != NULL)
+ result = atoi(addlevel + 1);
+ else
+ result = atoi(nspname + 8);
+ }
else if (strncmp(nspname, "pg_toast_temp_", 14) == 0)
- result = atoi(nspname + 14);
+ {
+ if ((addlevel = strstr(nspname + 14, "_")) != NULL)
+ result = atoi(addlevel + 1);
+ else
+ result = atoi(nspname + 14);
+ }
else
result = InvalidBackendId;
pfree(nspname);
@@ -3309,8 +3311,11 @@ void
SetTempNamespaceState(Oid tempNamespaceId, Oid tempToastNamespaceId)
{
/* Worker should not have created its own namespaces ... */
- Assert(myTempNamespace == InvalidOid);
- Assert(myTempToastNamespace == InvalidOid);
+ if (!ActiveSession)
+ {
+ Assert(myTempNamespace == InvalidOid);
+ Assert(myTempToastNamespace == InvalidOid);
+ }
Assert(myTempNamespaceSubID == InvalidSubTransactionId);
/* Assign same namespace OIDs that leader has */
@@ -3830,6 +3835,24 @@ recomputeNamespacePath(void)
list_free(oidlist);
}
+static Oid
+GetTempTableNamespace(void)
+{
+ if (ActiveSession)
+ {
+ if (!OidIsValid(ActiveSession->tempNamespace))
+ InitTempTableNamespace();
+ else
+ myTempNamespace = ActiveSession->tempNamespace;
+ }
+ else
+ {
+ if (!OidIsValid(myTempNamespace))
+ InitTempTableNamespace();
+ }
+ return myTempNamespace;
+}
+
/*
* InitTempTableNamespace
* Initialize temp table namespace on first use in a particular backend
@@ -3841,8 +3864,6 @@ InitTempTableNamespace(void)
Oid namespaceId;
Oid toastspaceId;
- Assert(!OidIsValid(myTempNamespace));
-
/*
* First, do permission check to see if we are authorized to make temp
* tables. We use a nonstandard error message here since "databasename:
@@ -3881,7 +3902,12 @@ InitTempTableNamespace(void)
(errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
errmsg("cannot create temporary tables during a parallel operation")));
- snprintf(namespaceName, sizeof(namespaceName), "pg_temp_%d", MyBackendId);
+ if (ActiveSession)
+ snprintf(namespaceName, sizeof(namespaceName), "pg_temp_%d_%u",
+ ActiveSession->id, MyBackendId);
+ else
+ snprintf(namespaceName, sizeof(namespaceName), "pg_temp_%d",
+ MyBackendId);
namespaceId = get_namespace_oid(namespaceName, true);
if (!OidIsValid(namespaceId))
@@ -3913,8 +3939,12 @@ InitTempTableNamespace(void)
* it. (We assume there is no need to clean it out if it does exist, since
* dropping a parent table should make its toast table go away.)
*/
- snprintf(namespaceName, sizeof(namespaceName), "pg_toast_temp_%d",
- MyBackendId);
+ if (ActiveSession)
+ snprintf(namespaceName, sizeof(namespaceName), "pg_toast_temp_%d_%u",
+ ActiveSession->id, MyBackendId);
+ else
+ snprintf(namespaceName, sizeof(namespaceName), "pg_toast_temp_%u",
+ MyBackendId);
toastspaceId = get_namespace_oid(namespaceName, true);
if (!OidIsValid(toastspaceId))
@@ -3945,6 +3975,11 @@ InitTempTableNamespace(void)
*/
MyProc->tempNamespaceId = namespaceId;
+ if (ActiveSession)
+ {
+ ActiveSession->tempNamespace = namespaceId;
+ ActiveSession->tempToastNamespace = toastspaceId;
+ }
/* It should not be done already. */
AssertState(myTempNamespaceSubID == InvalidSubTransactionId);
myTempNamespaceSubID = GetCurrentSubTransactionId();
@@ -3974,6 +4009,11 @@ AtEOXact_Namespace(bool isCommit, bool parallel)
{
myTempNamespace = InvalidOid;
myTempToastNamespace = InvalidOid;
+ if (ActiveSession)
+ {
+ ActiveSession->tempNamespace = InvalidOid;
+ ActiveSession->tempToastNamespace = InvalidOid;
+ }
baseSearchPathValid = false; /* need to rebuild list */
/*
@@ -4121,13 +4161,16 @@ RemoveTempRelations(Oid tempNamespaceId)
static void
RemoveTempRelationsCallback(int code, Datum arg)
{
- if (OidIsValid(myTempNamespace)) /* should always be true */
+ Oid tempNamespace = ActiveSession ?
+ ActiveSession->tempNamespace : myTempNamespace;
+
+ if (OidIsValid(tempNamespace)) /* should always be true */
{
/* Need to ensure we have a usable transaction. */
AbortOutOfAnyTransaction();
StartTransactionCommand();
- RemoveTempRelations(myTempNamespace);
+ RemoveTempRelations(tempNamespace);
CommitTransactionCommand();
}
@@ -4137,10 +4180,19 @@ RemoveTempRelationsCallback(int code, Datum arg)
* Remove all temp tables from the temporary namespace.
*/
void
-ResetTempTableNamespace(void)
+ResetTempTableNamespace(Oid npc)
{
- if (OidIsValid(myTempNamespace))
- RemoveTempRelations(myTempNamespace);
+ if (OidIsValid(npc))
+ {
+ AbortOutOfAnyTransaction();
+ StartTransactionCommand();
+ RemoveTempRelations(npc);
+ CommitTransactionCommand();
+ }
+ else
+ /* global */
+ if (OidIsValid(myTempNamespace))
+ RemoveTempRelations(myTempNamespace);
}
diff --git a/src/backend/catalog/pg_db_role_setting.c b/src/backend/catalog/pg_db_role_setting.c
index e123691..23ff527 100644
--- a/src/backend/catalog/pg_db_role_setting.c
+++ b/src/backend/catalog/pg_db_role_setting.c
@@ -16,6 +16,7 @@
#include "catalog/indexing.h"
#include "catalog/objectaccess.h"
#include "catalog/pg_db_role_setting.h"
+#include "storage/proc.h"
#include "utils/fmgroids.h"
#include "utils/rel.h"
#include "utils/tqual.h"
diff --git a/src/backend/catalog/storage.c b/src/backend/catalog/storage.c
index 5df4382..f57a950 100644
--- a/src/backend/catalog/storage.c
+++ b/src/backend/catalog/storage.c
@@ -24,6 +24,7 @@
#include "access/xlog.h"
#include "access/xloginsert.h"
#include "access/xlogutils.h"
+#include "catalog/namespace.h"
#include "catalog/storage.h"
#include "catalog/storage_xlog.h"
#include "storage/freespace.h"
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 9bc67ce..3c90f8d 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -2447,7 +2447,7 @@ CopyFrom(CopyState cstate)
* registers the snapshot it uses.
*/
InvalidateCatalogSnapshot();
- if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals())
+ if (!ThereAreNoPriorRegisteredSnapshots() || (SessionPoolSize == 0 && !ThereAreNoReadyPortals()))
ereport(ERROR,
(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
errmsg("cannot perform FREEZE because of prior transaction activity")));
diff --git a/src/backend/commands/discard.c b/src/backend/commands/discard.c
index 01a999c..363a52a 100644
--- a/src/backend/commands/discard.c
+++ b/src/backend/commands/discard.c
@@ -45,7 +45,7 @@ DiscardCommand(DiscardStmt *stmt, bool isTopLevel)
break;
case DISCARD_TEMP:
- ResetTempTableNamespace();
+ ResetTempTableNamespace(InvalidOid);
break;
default:
@@ -73,6 +73,6 @@ DiscardAll(bool isTopLevel)
Async_UnlistenAll();
LockReleaseAll(USER_LOCKMETHOD, true);
ResetPlanCache();
- ResetTempTableNamespace();
+ ResetTempTableNamespace(InvalidOid);
ResetSequenceCaches();
}
diff --git a/src/backend/commands/prepare.c b/src/backend/commands/prepare.c
index b945b15..1696500 100644
--- a/src/backend/commands/prepare.c
+++ b/src/backend/commands/prepare.c
@@ -30,9 +30,11 @@
#include "parser/parse_expr.h"
#include "parser/parse_type.h"
#include "rewrite/rewriteHandler.h"
+#include "storage/proc.h"
#include "tcop/pquery.h"
#include "tcop/utility.h"
#include "utils/builtins.h"
+#include "utils/memutils.h"
#include "utils/snapmgr.h"
#include "utils/timestamp.h"
@@ -43,9 +45,7 @@
* The keys for this hash table are the arguments to PREPARE and EXECUTE
* (statement names); the entries are PreparedStatement structs.
*/
-static HTAB *prepared_queries = NULL;
-
-static void InitQueryHashTable(void);
+static HTAB *InitQueryHashTable(MemoryContext mcxt);
static ParamListInfo EvaluateParams(PreparedStatement *pstmt, List *params,
const char *queryString, EState *estate);
static Datum build_regtype_array(Oid *param_types, int num_params);
@@ -427,20 +427,43 @@ EvaluateParams(PreparedStatement *pstmt, List *params,
/*
* Initialize query hash table upon first use.
*/
-static void
-InitQueryHashTable(void)
+static HTAB *
+InitQueryHashTable(MemoryContext mcxt)
{
- HASHCTL hash_ctl;
+ HTAB *res;
+ MemoryContext old_mcxt;
+ HASHCTL hash_ctl;
MemSet(&hash_ctl, 0, sizeof(hash_ctl));
hash_ctl.keysize = NAMEDATALEN;
hash_ctl.entrysize = sizeof(PreparedStatement);
+ hash_ctl.hcxt = mcxt;
+
+ old_mcxt = MemoryContextSwitchTo(mcxt);
+ res = hash_create("Prepared Queries", 32, &hash_ctl, HASH_ELEM | HASH_CONTEXT);
+ MemoryContextSwitchTo(old_mcxt);
- prepared_queries = hash_create("Prepared Queries",
- 32,
- &hash_ctl,
- HASH_ELEM);
+ return res;
+}
+
+static HTAB *
+get_prepared_queries_htab(bool init)
+{
+ static HTAB *prepared_queries = NULL;
+
+ if (ActiveSession)
+ {
+ if (init && !ActiveSession->prepared_queries)
+ ActiveSession->prepared_queries = InitQueryHashTable(ActiveSession->memory);
+ return ActiveSession->prepared_queries;
+ }
+
+ /* Initialize the global hash table, if necessary */
+ if (init && !prepared_queries)
+ prepared_queries = InitQueryHashTable(TopMemoryContext);
+
+ return prepared_queries;
}
/*
@@ -458,12 +481,9 @@ StorePreparedStatement(const char *stmt_name,
TimestampTz cur_ts = GetCurrentStatementStartTimestamp();
bool found;
- /* Initialize the hash table, if necessary */
- if (!prepared_queries)
- InitQueryHashTable();
/* Add entry to hash table */
- entry = (PreparedStatement *) hash_search(prepared_queries,
+ entry = (PreparedStatement *) hash_search(get_prepared_queries_htab(true),
stmt_name,
HASH_ENTER,
&found);
@@ -495,13 +515,14 @@ PreparedStatement *
FetchPreparedStatement(const char *stmt_name, bool throwError)
{
PreparedStatement *entry;
+ HTAB *queries = get_prepared_queries_htab(false);
/*
* If the hash table hasn't been initialized, it can't be storing
* anything, therefore it couldn't possibly store our plan.
*/
- if (prepared_queries)
- entry = (PreparedStatement *) hash_search(prepared_queries,
+ if (queries)
+ entry = (PreparedStatement *) hash_search(queries,
stmt_name,
HASH_FIND,
NULL);
@@ -579,7 +600,11 @@ DeallocateQuery(DeallocateStmt *stmt)
void
DropPreparedStatement(const char *stmt_name, bool showError)
{
- PreparedStatement *entry;
+ PreparedStatement *entry;
+ HTAB *queries = get_prepared_queries_htab(false);
+
+ if (!queries)
+ return;
/* Find the query's hash table entry; raise error if wanted */
entry = FetchPreparedStatement(stmt_name, showError);
@@ -590,7 +615,7 @@ DropPreparedStatement(const char *stmt_name, bool showError)
DropCachedPlan(entry->plansource);
/* Now we can remove the hash table entry */
- hash_search(prepared_queries, entry->stmt_name, HASH_REMOVE, NULL);
+ hash_search(queries, entry->stmt_name, HASH_REMOVE, NULL);
}
}
@@ -602,20 +627,21 @@ DropAllPreparedStatements(void)
{
HASH_SEQ_STATUS seq;
PreparedStatement *entry;
+ HTAB *queries = get_prepared_queries_htab(false);
/* nothing cached */
- if (!prepared_queries)
+ if (!queries)
return;
/* walk over cache */
- hash_seq_init(&seq, prepared_queries);
+ hash_seq_init(&seq, queries);
while ((entry = hash_seq_search(&seq)) != NULL)
{
/* Release the plancache entry */
DropCachedPlan(entry->plansource);
/* Now we can remove the hash table entry */
- hash_search(prepared_queries, entry->stmt_name, HASH_REMOVE, NULL);
+ hash_search(queries, entry->stmt_name, HASH_REMOVE, NULL);
}
}
@@ -710,10 +736,11 @@ Datum
pg_prepared_statement(PG_FUNCTION_ARGS)
{
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
- TupleDesc tupdesc;
+ TupleDesc tupdesc;
Tuplestorestate *tupstore;
- MemoryContext per_query_ctx;
- MemoryContext oldcontext;
+ MemoryContext per_query_ctx;
+ MemoryContext oldcontext;
+ HTAB *queries;
/* check to see if caller supports us returning a tuplestore */
if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
@@ -757,13 +784,13 @@ pg_prepared_statement(PG_FUNCTION_ARGS)
/* generate junk in short-term context */
MemoryContextSwitchTo(oldcontext);
- /* hash table might be uninitialized */
- if (prepared_queries)
+ queries = get_prepared_queries_htab(false);
+ if (queries)
{
HASH_SEQ_STATUS hash_seq;
PreparedStatement *prep_stmt;
- hash_seq_init(&hash_seq, prepared_queries);
+ hash_seq_init(&hash_seq, queries);
while ((prep_stmt = hash_seq_search(&hash_seq)) != NULL)
{
Datum values[5];
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index 89122d4..7843d9d 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -90,8 +90,6 @@ static HTAB *seqhashtab = NULL; /* hash table for SeqTable items */
* last_used_seq is updated by nextval() to point to the last used
* sequence.
*/
-static SeqTableData *last_used_seq = NULL;
-
static void fill_seq_with_data(Relation rel, HeapTuple tuple);
static Relation lock_and_open_sequence(SeqTable seq);
static void create_seq_hashtable(void);
diff --git a/src/backend/libpq/be-secure.c b/src/backend/libpq/be-secure.c
index d349d7c..3afacee 100644
--- a/src/backend/libpq/be-secure.c
+++ b/src/backend/libpq/be-secure.c
@@ -144,6 +144,7 @@ secure_read(Port *port, void *ptr, size_t len)
{
ssize_t n;
int waitfor;
+ WaitEventSet *waitset = pq_get_current_waitset();
retry:
#ifdef USE_SSL
@@ -166,9 +167,9 @@ retry:
Assert(waitfor);
- ModifyWaitEvent(FeBeWaitSet, 0, waitfor, NULL);
+ ModifyWaitEvent(waitset, 0, waitfor, NULL);
- WaitEventSetWait(FeBeWaitSet, -1 /* no timeout */ , &event, 1,
+ WaitEventSetWait(waitset, -1 /* no timeout */ , &event, 1,
WAIT_EVENT_CLIENT_READ);
/*
@@ -247,6 +248,7 @@ secure_write(Port *port, void *ptr, size_t len)
{
ssize_t n;
int waitfor;
+ WaitEventSet *waitset = pq_get_current_waitset();
retry:
waitfor = 0;
@@ -268,9 +270,9 @@ retry:
Assert(waitfor);
- ModifyWaitEvent(FeBeWaitSet, 0, waitfor, NULL);
+ ModifyWaitEvent(waitset, 0, waitfor, NULL);
- WaitEventSetWait(FeBeWaitSet, -1 /* no timeout */ , &event, 1,
+ WaitEventSetWait(waitset, -1 /* no timeout */ , &event, 1,
WAIT_EVENT_CLIENT_WRITE);
/* See comments in secure_read. */
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index a4f6d4d..51d4f0b 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -13,7 +13,7 @@
* copy is aborted by an ereport(ERROR), we need to close out the copy so that
* the frontend gets back into sync. Therefore, these routines have to be
* aware of COPY OUT state. (New COPY-OUT is message-based and does *not*
- * set the DoingCopyOut flag.)
+ * set the is_doing_copyout flag.)
*
* NOTE: generally, it's a bad idea to emit outgoing messages directly with
* pq_putbytes(), especially if the message would require multiple calls
@@ -87,12 +87,14 @@
#ifdef _MSC_VER /* mstcpip.h is missing on mingw */
#include <mstcpip.h>
#endif
+#include <execinfo.h>
#include "common/ip.h"
#include "libpq/libpq.h"
#include "miscadmin.h"
#include "port/pg_bswap.h"
#include "storage/ipc.h"
+#include "storage/proc.h"
#include "utils/guc.h"
#include "utils/memutils.h"
@@ -134,23 +136,6 @@ static List *sock_paths = NIL;
#define PQ_SEND_BUFFER_SIZE 8192
#define PQ_RECV_BUFFER_SIZE 8192
-static char *PqSendBuffer;
-static int PqSendBufferSize; /* Size send buffer */
-static int PqSendPointer; /* Next index to store a byte in PqSendBuffer */
-static int PqSendStart; /* Next index to send a byte in PqSendBuffer */
-
-static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE];
-static int PqRecvPointer; /* Next index to read a byte from PqRecvBuffer */
-static int PqRecvLength; /* End of data available in PqRecvBuffer */
-
-/*
- * Message status
- */
-static bool PqCommBusy; /* busy sending data to the client */
-static bool PqCommReadingMsg; /* in the middle of reading a message */
-static bool DoingCopyOut; /* in old-protocol COPY OUT processing */
-
-
/* Internal functions */
static void socket_comm_reset(void);
static void socket_close(int code, Datum arg);
@@ -181,28 +166,55 @@ static PQcommMethods PqCommSocketMethods = {
socket_endcopyout
};
-PQcommMethods *PqCommMethods = &PqCommSocketMethods;
+/* These variables used to be global */
+struct PQcommState {
+ Port *port;
+ MemoryContext mcxt;
-WaitEventSet *FeBeWaitSet;
+ /* Message status */
+ bool is_busy; /* busy sending data to the client */
+ bool is_reading; /* in the middle of reading a message */
+ bool is_doing_copyout; /* in old-protocol COPY OUT processing */
+ char *send_buf;
+ int send_bufsize; /* Size send buffer */
+ int send_offset; /* Next index to store a byte in send_buf */
+ int send_start; /* Next index to send a byte in send_buf */
-/* --------------------------------
- * pq_init - initialize libpq at backend startup
- * --------------------------------
+ char recv_buf[PQ_RECV_BUFFER_SIZE];
+ int recv_offset; /* Next index to read a byte from pqstate->recv_buf */
+ int recv_len; /* End of data available in pqstate->recv_buf */
+
+ /* Wait events set */
+ WaitEventSet *wait_events;
+};
+
+static struct PQcommState *pqstate = NULL;
+PQcommMethods *PqCommMethods = &PqCommSocketMethods;
+
+/*
+ * Create common wait event for a backend
*/
-void
-pq_init(void)
+WaitEventSet *
+pq_create_backend_event_set(MemoryContext mcxt, Port *port,
+ bool onlySock)
{
- /* initialize state variables */
- PqSendBufferSize = PQ_SEND_BUFFER_SIZE;
- PqSendBuffer = MemoryContextAlloc(TopMemoryContext, PqSendBufferSize);
- PqSendPointer = PqSendStart = PqRecvPointer = PqRecvLength = 0;
- PqCommBusy = false;
- PqCommReadingMsg = false;
- DoingCopyOut = false;
+ WaitEventSet *result;
+ int nevents = onlySock ? 1 : 3;
+
+ result = CreateWaitEventSet(mcxt, nevents);
+
+ AddWaitEventToSet(result, WL_SOCKET_WRITEABLE, port->sock,
+ NULL, NULL);
+
+ if (!onlySock)
+ {
+ AddWaitEventToSet(result, WL_LATCH_SET, -1, MyLatch, NULL);
+ AddWaitEventToSet(result, WL_POSTMASTER_DEATH, -1, NULL, NULL);
- /* set up process-exit hook to close the socket */
- on_proc_exit(socket_close, 0);
+ /* set up process-exit hook to close the socket */
+ on_proc_exit(socket_close, 0);
+ }
/*
* In backends (as soon as forked) we operate the underlying socket in
@@ -215,16 +227,65 @@ pq_init(void)
* infinite recursion.
*/
#ifndef WIN32
- if (!pg_set_noblock(MyProcPort->sock))
+ if (!pg_set_noblock(port->sock))
ereport(COMMERROR,
(errmsg("could not set socket to nonblocking mode: %m")));
#endif
- FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, 3);
- AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE, MyProcPort->sock,
- NULL, NULL);
- AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, -1, MyLatch, NULL);
- AddWaitEventToSet(FeBeWaitSet, WL_POSTMASTER_DEATH, -1, NULL, NULL);
+ return result;
+}
+
+/* --------------------------------
+ * pq_init - initialize libpq at backend startup
+ * --------------------------------
+ */
+void *
+pq_init(MemoryContext mcxt)
+{
+ struct PQcommState *state =
+ MemoryContextAllocZero(mcxt, sizeof(struct PQcommState));
+
+ /* initialize state variables */
+ state->mcxt = mcxt;
+
+ state->send_bufsize = PQ_SEND_BUFFER_SIZE;
+ state->send_buf = MemoryContextAlloc(mcxt, state->send_bufsize);
+ state->send_offset = state->send_start = state->recv_offset = state->recv_len = 0;
+ state->is_busy = false;
+ state->is_reading = false;
+ state->is_doing_copyout = false;
+
+ state->wait_events = NULL;
+ return (void *) state;
+}
+
+void
+pq_set_current_state(void *state, Port *port, WaitEventSet *set)
+{
+ pqstate = (struct PQcommState *) state;
+
+ if (pqstate)
+ {
+ pq_reset();
+ pqstate->port = port;
+ pqstate->wait_events = set;
+ }
+}
+
+WaitEventSet *
+pq_get_current_waitset(void)
+{
+ return pqstate ? pqstate->wait_events : NULL;
+}
+
+void
+pq_reset(void)
+{
+ pqstate->send_offset = pqstate->send_start = 0;
+ pqstate->recv_offset = pqstate->recv_len = 0;
+ pqstate->is_busy = false;
+ pqstate->is_reading = false;
+ pqstate->is_doing_copyout = false;
}
/* --------------------------------
@@ -239,7 +300,7 @@ static void
socket_comm_reset(void)
{
/* Do not throw away pending data, but do reset the busy flag */
- PqCommBusy = false;
+ pqstate->is_busy = false;
/* We can abort any old-style COPY OUT, too */
pq_endcopyout(true);
}
@@ -255,8 +316,8 @@ socket_comm_reset(void)
static void
socket_close(int code, Datum arg)
{
- /* Nothing to do in a standalone backend, where MyProcPort is NULL. */
- if (MyProcPort != NULL)
+ /* Nothing to do in a standalone backend, where pqstate->port is NULL. */
+ if (pqstate->port != NULL)
{
#if defined(ENABLE_GSS) || defined(ENABLE_SSPI)
#ifdef ENABLE_GSS
@@ -267,11 +328,11 @@ socket_close(int code, Datum arg)
* BackendInitialize(), because pg_GSS_recvauth() makes first use of
* "ctx" and "cred".
*/
- if (MyProcPort->gss->ctx != GSS_C_NO_CONTEXT)
- gss_delete_sec_context(&min_s, &MyProcPort->gss->ctx, NULL);
+ if (pqstate->port->gss->ctx != GSS_C_NO_CONTEXT)
+ gss_delete_sec_context(&min_s, &pqstate->port->gss->ctx, NULL);
- if (MyProcPort->gss->cred != GSS_C_NO_CREDENTIAL)
- gss_release_cred(&min_s, &MyProcPort->gss->cred);
+ if (pqstate->port->gss->cred != GSS_C_NO_CREDENTIAL)
+ gss_release_cred(&min_s, &pqstate->port->gss->cred);
#endif /* ENABLE_GSS */
/*
@@ -279,14 +340,14 @@ socket_close(int code, Datum arg)
* postmaster child free this, doing so is safe when interrupting
* BackendInitialize().
*/
- free(MyProcPort->gss);
+ free(pqstate->port->gss);
#endif /* ENABLE_GSS || ENABLE_SSPI */
/*
* Cleanly shut down SSL layer. Nowhere else does a postmaster child
* call this, so this is safe when interrupting BackendInitialize().
*/
- secure_close(MyProcPort);
+ secure_close(pqstate->port);
/*
* Formerly we did an explicit close() here, but it seems better to
@@ -298,7 +359,7 @@ socket_close(int code, Datum arg)
* We do set sock to PGINVALID_SOCKET to prevent any further I/O,
* though.
*/
- MyProcPort->sock = PGINVALID_SOCKET;
+ pqstate->port->sock = PGINVALID_SOCKET;
}
}
@@ -921,12 +982,12 @@ RemoveSocketFiles(void)
static void
socket_set_nonblocking(bool nonblocking)
{
- if (MyProcPort == NULL)
+ if (pqstate->port == NULL)
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
errmsg("there is no client connection")));
- MyProcPort->noblock = nonblocking;
+ pqstate->port->noblock = nonblocking;
}
/* --------------------------------
@@ -938,30 +999,30 @@ socket_set_nonblocking(bool nonblocking)
static int
pq_recvbuf(void)
{
- if (PqRecvPointer > 0)
+ if (pqstate->recv_offset > 0)
{
- if (PqRecvLength > PqRecvPointer)
+ if (pqstate->recv_len > pqstate->recv_offset)
{
/* still some unread data, left-justify it in the buffer */
- memmove(PqRecvBuffer, PqRecvBuffer + PqRecvPointer,
- PqRecvLength - PqRecvPointer);
- PqRecvLength -= PqRecvPointer;
- PqRecvPointer = 0;
+ memmove(pqstate->recv_buf, pqstate->recv_buf + pqstate->recv_offset,
+ pqstate->recv_len - pqstate->recv_offset);
+ pqstate->recv_len -= pqstate->recv_offset;
+ pqstate->recv_offset = 0;
}
else
- PqRecvLength = PqRecvPointer = 0;
+ pqstate->recv_len = pqstate->recv_offset = 0;
}
/* Ensure that we're in blocking mode */
socket_set_nonblocking(false);
- /* Can fill buffer from PqRecvLength and upwards */
+ /* Can fill buffer from pqstate->recv_len and upwards */
for (;;)
{
int r;
- r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
- PQ_RECV_BUFFER_SIZE - PqRecvLength);
+ r = secure_read(pqstate->port, pqstate->recv_buf + pqstate->recv_len,
+ PQ_RECV_BUFFER_SIZE - pqstate->recv_len);
if (r < 0)
{
@@ -987,7 +1048,7 @@ pq_recvbuf(void)
return EOF;
}
/* r contains number of bytes read, so just incr length */
- PqRecvLength += r;
+ pqstate->recv_len += r;
return 0;
}
}
@@ -999,14 +1060,14 @@ pq_recvbuf(void)
int
pq_getbyte(void)
{
- Assert(PqCommReadingMsg);
+ Assert(pqstate->is_reading);
- while (PqRecvPointer >= PqRecvLength)
+ while (pqstate->recv_offset >= pqstate->recv_len)
{
if (pq_recvbuf()) /* If nothing in buffer, then recv some */
return EOF; /* Failed to recv data */
}
- return (unsigned char) PqRecvBuffer[PqRecvPointer++];
+ return (unsigned char) pqstate->recv_buf[pqstate->recv_offset++];
}
/* --------------------------------
@@ -1018,14 +1079,25 @@ pq_getbyte(void)
int
pq_peekbyte(void)
{
- Assert(PqCommReadingMsg);
+ Assert(pqstate->is_reading);
- while (PqRecvPointer >= PqRecvLength)
+ while (pqstate->recv_offset >= pqstate->recv_len)
{
if (pq_recvbuf()) /* If nothing in buffer, then recv some */
return EOF; /* Failed to recv data */
}
- return (unsigned char) PqRecvBuffer[PqRecvPointer];
+ return (unsigned char) pqstate->recv_buf[pqstate->recv_offset];
+}
+
+/* --------------------------------
+ * pq_available_bytes - get number of buffered bytes available for reading.
+ *
+ * --------------------------------
+ */
+int
+pq_available_bytes(void)
+{
+ return pqstate->recv_len - pqstate->recv_offset;
}
/* --------------------------------
@@ -1041,18 +1113,18 @@ pq_getbyte_if_available(unsigned char *c)
{
int r;
- Assert(PqCommReadingMsg);
+ Assert(pqstate->is_reading);
- if (PqRecvPointer < PqRecvLength)
+ if (pqstate->recv_offset < pqstate->recv_len)
{
- *c = PqRecvBuffer[PqRecvPointer++];
+ *c = pqstate->recv_buf[pqstate->recv_offset++];
return 1;
}
/* Put the socket into non-blocking mode */
socket_set_nonblocking(true);
- r = secure_read(MyProcPort, c, 1);
+ r = secure_read(pqstate->port, c, 1);
if (r < 0)
{
/*
@@ -1095,20 +1167,20 @@ pq_getbytes(char *s, size_t len)
{
size_t amount;
- Assert(PqCommReadingMsg);
+ Assert(pqstate->is_reading);
while (len > 0)
{
- while (PqRecvPointer >= PqRecvLength)
+ while (pqstate->recv_offset >= pqstate->recv_len)
{
if (pq_recvbuf()) /* If nothing in buffer, then recv some */
return EOF; /* Failed to recv data */
}
- amount = PqRecvLength - PqRecvPointer;
+ amount = pqstate->recv_len - pqstate->recv_offset;
if (amount > len)
amount = len;
- memcpy(s, PqRecvBuffer + PqRecvPointer, amount);
- PqRecvPointer += amount;
+ memcpy(s, pqstate->recv_buf + pqstate->recv_offset, amount);
+ pqstate->recv_offset += amount;
s += amount;
len -= amount;
}
@@ -1129,19 +1201,19 @@ pq_discardbytes(size_t len)
{
size_t amount;
- Assert(PqCommReadingMsg);
+ Assert(pqstate->is_reading);
while (len > 0)
{
- while (PqRecvPointer >= PqRecvLength)
+ while (pqstate->recv_offset >= pqstate->recv_len)
{
if (pq_recvbuf()) /* If nothing in buffer, then recv some */
return EOF; /* Failed to recv data */
}
- amount = PqRecvLength - PqRecvPointer;
+ amount = pqstate->recv_len - pqstate->recv_offset;
if (amount > len)
amount = len;
- PqRecvPointer += amount;
+ pqstate->recv_offset += amount;
len -= amount;
}
return 0;
@@ -1167,35 +1239,35 @@ pq_getstring(StringInfo s)
{
int i;
- Assert(PqCommReadingMsg);
+ Assert(pqstate->is_reading);
resetStringInfo(s);
/* Read until we get the terminating '\0' */
for (;;)
{
- while (PqRecvPointer >= PqRecvLength)
+ while (pqstate->recv_offset >= pqstate->recv_len)
{
if (pq_recvbuf()) /* If nothing in buffer, then recv some */
return EOF; /* Failed to recv data */
}
- for (i = PqRecvPointer; i < PqRecvLength; i++)
+ for (i = pqstate->recv_offset; i < pqstate->recv_len; i++)
{
- if (PqRecvBuffer[i] == '\0')
+ if (pqstate->recv_buf[i] == '\0')
{
/* include the '\0' in the copy */
- appendBinaryStringInfo(s, PqRecvBuffer + PqRecvPointer,
- i - PqRecvPointer + 1);
- PqRecvPointer = i + 1; /* advance past \0 */
+ appendBinaryStringInfo(s, pqstate->recv_buf + pqstate->recv_offset,
+ i - pqstate->recv_offset + 1);
+ pqstate->recv_offset = i + 1; /* advance past \0 */
return 0;
}
}
/* If we're here we haven't got the \0 in the buffer yet. */
- appendBinaryStringInfo(s, PqRecvBuffer + PqRecvPointer,
- PqRecvLength - PqRecvPointer);
- PqRecvPointer = PqRecvLength;
+ appendBinaryStringInfo(s, pqstate->recv_buf + pqstate->recv_offset,
+ pqstate->recv_len - pqstate->recv_offset);
+ pqstate->recv_offset = pqstate->recv_len;
}
}
@@ -1213,12 +1285,12 @@ pq_startmsgread(void)
* There shouldn't be a read active already, but let's check just to be
* sure.
*/
- if (PqCommReadingMsg)
+ if (pqstate->is_reading)
ereport(FATAL,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("terminating connection because protocol synchronization was lost")));
- PqCommReadingMsg = true;
+ pqstate->is_reading = true;
}
@@ -1233,9 +1305,9 @@ pq_startmsgread(void)
void
pq_endmsgread(void)
{
- Assert(PqCommReadingMsg);
+ Assert(pqstate->is_reading);
- PqCommReadingMsg = false;
+ pqstate->is_reading = false;
}
/* --------------------------------
@@ -1249,7 +1321,7 @@ pq_endmsgread(void)
bool
pq_is_reading_msg(void)
{
- return PqCommReadingMsg;
+ return pqstate->is_reading;
}
/* --------------------------------
@@ -1273,7 +1345,7 @@ pq_getmessage(StringInfo s, int maxlen)
{
int32 len;
- Assert(PqCommReadingMsg);
+ Assert(pqstate->is_reading);
resetStringInfo(s);
@@ -1318,7 +1390,7 @@ pq_getmessage(StringInfo s, int maxlen)
errmsg("incomplete message from client")));
/* we discarded the rest of the message so we're back in sync. */
- PqCommReadingMsg = false;
+ pqstate->is_reading = false;
PG_RE_THROW();
}
PG_END_TRY();
@@ -1337,7 +1409,7 @@ pq_getmessage(StringInfo s, int maxlen)
}
/* finished reading the message. */
- PqCommReadingMsg = false;
+ pqstate->is_reading = false;
return 0;
}
@@ -1355,13 +1427,13 @@ pq_putbytes(const char *s, size_t len)
int res;
/* Should only be called by old-style COPY OUT */
- Assert(DoingCopyOut);
+ Assert(pqstate->is_doing_copyout);
/* No-op if reentrant call */
- if (PqCommBusy)
+ if (pqstate->is_busy)
return 0;
- PqCommBusy = true;
+ pqstate->is_busy = true;
res = internal_putbytes(s, len);
- PqCommBusy = false;
+ pqstate->is_busy = false;
return res;
}
@@ -1373,23 +1445,24 @@ internal_putbytes(const char *s, size_t len)
while (len > 0)
{
/* If buffer is full, then flush it out */
- if (PqSendPointer >= PqSendBufferSize)
+ if (pqstate->send_offset >= pqstate->send_bufsize)
{
socket_set_nonblocking(false);
if (internal_flush())
return EOF;
}
- amount = PqSendBufferSize - PqSendPointer;
+ amount = pqstate->send_bufsize - pqstate->send_offset;
if (amount > len)
amount = len;
- memcpy(PqSendBuffer + PqSendPointer, s, amount);
- PqSendPointer += amount;
+ memcpy(pqstate->send_buf + pqstate->send_offset, s, amount);
+ pqstate->send_offset += amount;
s += amount;
len -= amount;
}
return 0;
}
+
/* --------------------------------
* socket_flush - flush pending output
*
@@ -1401,13 +1474,17 @@ socket_flush(void)
{
int res;
+ if (pqstate->port->sock == PGINVALID_SOCKET)
+ return 0;
+
/* No-op if reentrant call */
- if (PqCommBusy)
+ if (pqstate->is_busy)
return 0;
- PqCommBusy = true;
+
+ pqstate->is_busy = true;
socket_set_nonblocking(false);
res = internal_flush();
- PqCommBusy = false;
+ pqstate->is_busy = false;
return res;
}
@@ -1423,14 +1500,14 @@ internal_flush(void)
{
static int last_reported_send_errno = 0;
- char *bufptr = PqSendBuffer + PqSendStart;
- char *bufend = PqSendBuffer + PqSendPointer;
+ char *bufptr = pqstate->send_buf + pqstate->send_start;
+ char *bufend = pqstate->send_buf + pqstate->send_offset;
while (bufptr < bufend)
{
int r;
- r = secure_write(MyProcPort, bufptr, bufend - bufptr);
+ r = secure_write(pqstate->port, bufptr, bufend - bufptr);
if (r <= 0)
{
@@ -1470,7 +1547,7 @@ internal_flush(void)
* flag that'll cause the next CHECK_FOR_INTERRUPTS to terminate
* the connection.
*/
- PqSendStart = PqSendPointer = 0;
+ pqstate->send_start = pqstate->send_offset = 0;
ClientConnectionLost = 1;
InterruptPending = 1;
return EOF;
@@ -1478,10 +1555,10 @@ internal_flush(void)
last_reported_send_errno = 0; /* reset after any successful send */
bufptr += r;
- PqSendStart += r;
+ pqstate->send_start += r;
}
- PqSendStart = PqSendPointer = 0;
+ pqstate->send_start = pqstate->send_offset = 0;
return 0;
}
@@ -1496,20 +1573,23 @@ socket_flush_if_writable(void)
{
int res;
+ if (pqstate->port->sock == PGINVALID_SOCKET)
+ return 0;
+
/* Quick exit if nothing to do */
- if (PqSendPointer == PqSendStart)
+ if (pqstate->send_offset == pqstate->send_start)
return 0;
/* No-op if reentrant call */
- if (PqCommBusy)
+ if (pqstate->is_busy)
return 0;
/* Temporarily put the socket into non-blocking mode */
socket_set_nonblocking(true);
- PqCommBusy = true;
+ pqstate->is_busy = true;
res = internal_flush();
- PqCommBusy = false;
+ pqstate->is_busy = false;
return res;
}
@@ -1520,7 +1600,7 @@ socket_flush_if_writable(void)
static bool
socket_is_send_pending(void)
{
- return (PqSendStart < PqSendPointer);
+ return (pqstate->send_start < pqstate->send_offset);
}
/* --------------------------------
@@ -1559,9 +1639,9 @@ socket_is_send_pending(void)
static int
socket_putmessage(char msgtype, const char *s, size_t len)
{
- if (DoingCopyOut || PqCommBusy)
+ if (pqstate->is_doing_copyout || pqstate->is_busy)
return 0;
- PqCommBusy = true;
+ pqstate->is_busy = true;
if (msgtype)
if (internal_putbytes(&msgtype, 1))
goto fail;
@@ -1575,11 +1655,11 @@ socket_putmessage(char msgtype, const char *s, size_t len)
}
if (internal_putbytes(s, len))
goto fail;
- PqCommBusy = false;
+ pqstate->is_busy = false;
return 0;
fail:
- PqCommBusy = false;
+ pqstate->is_busy = false;
return EOF;
}
@@ -1599,11 +1679,11 @@ socket_putmessage_noblock(char msgtype, const char *s, size_t len)
* Ensure we have enough space in the output buffer for the message header
* as well as the message itself.
*/
- required = PqSendPointer + 1 + 4 + len;
- if (required > PqSendBufferSize)
+ required = pqstate->send_offset + 1 + 4 + len;
+ if (required > pqstate->send_bufsize)
{
- PqSendBuffer = repalloc(PqSendBuffer, required);
- PqSendBufferSize = required;
+ pqstate->send_buf = repalloc(pqstate->send_buf, required);
+ pqstate->send_bufsize = required;
}
res = pq_putmessage(msgtype, s, len);
Assert(res == 0); /* should not fail when the message fits in
@@ -1619,7 +1699,7 @@ socket_putmessage_noblock(char msgtype, const char *s, size_t len)
static void
socket_startcopyout(void)
{
- DoingCopyOut = true;
+ pqstate->is_doing_copyout = true;
}
/* --------------------------------
@@ -1635,12 +1715,12 @@ socket_startcopyout(void)
static void
socket_endcopyout(bool errorAbort)
{
- if (!DoingCopyOut)
+ if (!pqstate->is_doing_copyout)
return;
if (errorAbort)
pq_putbytes("\n\n\\.\n", 5);
/* in non-error case, copy.c will have emitted the terminator line */
- DoingCopyOut = false;
+ pqstate->is_doing_copyout = false;
}
/*
diff --git a/src/backend/port/Makefile b/src/backend/port/Makefile
index aba1e92..56ec998 100644
--- a/src/backend/port/Makefile
+++ b/src/backend/port/Makefile
@@ -21,7 +21,7 @@ subdir = src/backend/port
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
-OBJS = atomics.o dynloader.o pg_sema.o pg_shmem.o $(TAS)
+OBJS = atomics.o dynloader.o pg_sema.o pg_shmem.o send_sock.o $(TAS)
ifeq ($(PORTNAME), win32)
SUBDIRS += win32
diff --git a/src/backend/port/send_sock.c b/src/backend/port/send_sock.c
new file mode 100644
index 0000000..b69cc78
--- /dev/null
+++ b/src/backend/port/send_sock.c
@@ -0,0 +1,158 @@
+/*-------------------------------------------------------------------------
+ *
+ * send_sock.c
+ * Send socket descriptor to another process
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/port/send_sock.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <fcntl.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/wait.h>
+#include <time.h>
+#include <unistd.h>
+
+#ifdef WIN32
+/*
+ * What pg_send_sock() writes to the channel on Windows and pg_recv_sock()
+ * reads back: the sender's socket handle (on which the receiver calls
+ * closesocket() once it has its own copy) and the WSAPROTOCOL_INFO filled in
+ * by WSADuplicateSocket(), from which the receiver creates that copy.
+ */
+typedef struct
+{
+	SOCKET		origsocket;		/* handle as known to the sending process */
+	WSAPROTOCOL_INFO wsainfo;	/* duplication info for the receiver */
+} InheritableSocket;
+#endif
+
+/*
+ * Send socket descriptor "sock" to another process through the connected
+ * socket "chan".
+ *
+ * On Unix the descriptor travels as SCM_RIGHTS ancillary data attached to a
+ * single dummy byte.  On Windows the socket is duplicated for process "pid"
+ * with WSADuplicateSocket() and the resulting protocol info is written to
+ * "chan" ("pid" is only used on Windows).
+ *
+ * Returns 0 on success and -1 on failure (on Unix errno describes the
+ * failure).  On Windows failures are reported with ereport(FATAL).
+ */
+int
+pg_send_sock(pgsocket chan, pgsocket sock, pid_t pid)
+{
+#ifdef WIN32
+	InheritableSocket dst;
+	int			rc;
+
+	dst.origsocket = sock;
+	if (WSADuplicateSocket(sock, pid, &dst.wsainfo) != 0)
+	{
+		ereport(FATAL,
+				(errmsg("could not duplicate socket %d for use in backend: error code %d",
+						(int) sock, WSAGetLastError())));
+		return -1;
+	}
+	/* send() returns int; keep it signed so errors are not turned into huge values */
+	rc = send(chan, (const char *) &dst, sizeof(dst), 0);
+	if (rc != (int) sizeof(dst))
+	{
+		ereport(FATAL,
+				(errmsg("Failed to send inheritable socket: rc=%d, error code %d",
+						rc, WSAGetLastError())));
+		return -1;
+	}
+	return 0;
+#else
+	struct msghdr msg;
+	struct iovec io;
+	struct cmsghdr *cmsg;
+	union
+	{
+		struct cmsghdr align;	/* forces alignment suitable for cmsghdr */
+		char		buf[CMSG_SPACE(sizeof(pgsocket))];
+	}			control;
+	char		dummy = '\0';
+
+	memset(&msg, 0, sizeof(msg));
+	memset(&control, 0, sizeof(control));
+
+	/*
+	 * At least one byte of regular data must accompany the ancillary data
+	 * (Mac OS X requires a non-empty iovec).
+	 */
+	io.iov_base = &dummy;
+	io.iov_len = 1;
+
+	msg.msg_iov = &io;
+	msg.msg_iovlen = 1;
+	msg.msg_control = control.buf;
+	msg.msg_controllen = sizeof(control.buf);
+
+	cmsg = CMSG_FIRSTHDR(&msg);
+	if (cmsg == NULL)
+		return -1;
+
+	cmsg->cmsg_level = SOL_SOCKET;
+	cmsg->cmsg_type = SCM_RIGHTS;
+	cmsg->cmsg_len = CMSG_LEN(sizeof(sock));
+	memcpy(CMSG_DATA(cmsg), &sock, sizeof(sock));
+	msg.msg_controllen = cmsg->cmsg_len;
+
+	/* A signal arriving before anything was sent is not a real failure */
+	while (sendmsg(chan, &msg, 0) < 0)
+	{
+		if (errno != EINTR)
+			return -1;
+	}
+
+	return 0;
+#endif
+}
+
+
+/*
+ * Receive a socket descriptor sent by pg_send_sock() through "chan".
+ *
+ * Returns the received socket, or PGINVALID_SOCKET on failure.  On Unix the
+ * socket is switched to non-blocking mode.  A Unix failure is one of these:
+ * recvmsg() failed for a reason other than EINTR (errno is then set), the
+ * peer closed the channel, or no SCM_RIGHTS descriptor was attached.  On
+ * Windows failures are reported with ereport(FATAL).
+ */
+pgsocket
+pg_recv_sock(pgsocket chan)
+{
+#ifdef WIN32
+	InheritableSocket src;
+	SOCKET		s;
+	int			rc = recv(chan, (char *) &src, sizeof(src), 0);
+
+	if (rc != (int) sizeof(src))
+	{
+		ereport(FATAL,
+				(errmsg("Failed to receive inheritable socket: rc=%d, error code %d",
+						rc, WSAGetLastError())));
+	}
+	s = WSASocket(FROM_PROTOCOL_INFO,
+				  FROM_PROTOCOL_INFO,
+				  FROM_PROTOCOL_INFO,
+				  &src.wsainfo,
+				  0,
+				  0);
+	if (s == INVALID_SOCKET)
+	{
+		ereport(FATAL,
+				(errmsg("could not create inherited socket: error code %d",
+						WSAGetLastError())));
+	}
+
+	/*
+	 * To make sure we don't get two references to the same socket, close
+	 * the original one. (This would happen when inheritance actually
+	 * works..
+	 */
+	closesocket(src.origsocket);
+	return s;
+#else
+	struct msghdr msg;
+	struct iovec io;
+	struct cmsghdr *cmsg;
+	union
+	{
+		struct cmsghdr align;	/* forces alignment suitable for cmsghdr */
+		char		buf[CMSG_SPACE(sizeof(pgsocket))];
+	}			control;
+	char		dummy;
+	ssize_t		nread;
+	pgsocket	sock;
+
+	memset(&msg, 0, sizeof(msg));
+
+	/*
+	 * pg_send_sock() attaches the descriptor to exactly one data byte; read
+	 * only that byte so nothing that follows on the channel is consumed.
+	 */
+	io.iov_base = &dummy;
+	io.iov_len = 1;
+	msg.msg_iov = &io;
+	msg.msg_iovlen = 1;
+	msg.msg_control = control.buf;
+	msg.msg_controllen = sizeof(control.buf);
+
+	do
+	{
+		nread = recvmsg(chan, &msg, 0);
+	} while (nread < 0 && errno == EINTR);
+
+	/* error, or EOF because the other end was closed */
+	if (nread <= 0)
+		return PGINVALID_SOCKET;
+
+	/* Accept only a descriptor-passing control message */
+	cmsg = CMSG_FIRSTHDR(&msg);
+	if (cmsg == NULL ||
+		cmsg->cmsg_level != SOL_SOCKET ||
+		cmsg->cmsg_type != SCM_RIGHTS ||
+		cmsg->cmsg_len < CMSG_LEN(sizeof(sock)))
+		return PGINVALID_SOCKET;
+
+	memcpy(&sock, CMSG_DATA(cmsg), sizeof(sock));
+
+	pg_set_noblock(sock);
+
+	return sock;
+#endif
+}
diff --git a/src/backend/port/win32/socket.c b/src/backend/port/win32/socket.c
index f4356fe..7fd901f 100644
--- a/src/backend/port/win32/socket.c
+++ b/src/backend/port/win32/socket.c
@@ -726,3 +726,65 @@ pgwin32_socket_strerror(int err)
}
return wserrbuf;
}
+
+/*
+ * Emulate socketpair() on Windows.
+ *
+ * A temporary listener is bound to an ephemeral port on the IPv4 loopback
+ * interface.  socks[0] connects to it and socks[1] is the accepted end.  The
+ * "domain", "type" and "protocol" arguments are ignored: the result is
+ * always a pair of connected AF_INET stream sockets.
+ *
+ * Returns 0 on success.  On failure returns SOCKET_ERROR, sets both elements
+ * of socks to INVALID_SOCKET, and leaves the WSA error code of the failing
+ * step in place.
+ */
+int
+pgwin32_socketpair(int domain, int type, int protocol, SOCKET socks[2])
+{
+	struct sockaddr_in listen_addr;
+	struct sockaddr_in local_addr;
+	struct sockaddr_in peer_addr;
+	int			addrlen;
+	SOCKET		listener;
+	int			e;
+
+	socks[0] = socks[1] = INVALID_SOCKET;
+
+	listener = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
+	if (listener == INVALID_SOCKET)
+		return SOCKET_ERROR;
+
+	/*
+	 * No SO_REUSEADDR here.  We bind to port 0, so there is nothing to
+	 * reuse.  On Windows that option would also let another socket bind the
+	 * same port and intercept the connection.
+	 */
+	memset(&listen_addr, 0, sizeof(listen_addr));
+	listen_addr.sin_family = AF_INET;
+	listen_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+	listen_addr.sin_port = 0;
+	if (bind(listener, (struct sockaddr *) &listen_addr,
+			 sizeof(listen_addr)) == SOCKET_ERROR)
+		goto fail;
+
+	/* Find out which port the system picked */
+	addrlen = sizeof(listen_addr);
+	if (getsockname(listener, (struct sockaddr *) &listen_addr,
+					&addrlen) == SOCKET_ERROR)
+		goto fail;
+	listen_addr.sin_family = AF_INET;
+	listen_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+
+	if (listen(listener, 1) == SOCKET_ERROR)
+		goto fail;
+
+	socks[0] = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, 0);
+	if (socks[0] == INVALID_SOCKET)
+		goto fail;
+	if (connect(socks[0], (struct sockaddr *) &listen_addr,
+				sizeof(listen_addr)) == SOCKET_ERROR)
+		goto fail;
+
+	socks[1] = accept(listener, NULL, NULL);
+	if (socks[1] == INVALID_SOCKET)
+		goto fail;
+
+	/*
+	 * Any local process can connect to the listener.  Make sure the
+	 * connection we accepted really is the one socks[0] made.
+	 */
+	addrlen = sizeof(local_addr);
+	if (getsockname(socks[0], (struct sockaddr *) &local_addr,
+					&addrlen) == SOCKET_ERROR)
+		goto fail;
+	addrlen = sizeof(peer_addr);
+	if (getpeername(socks[1], (struct sockaddr *) &peer_addr,
+					&addrlen) == SOCKET_ERROR)
+		goto fail;
+	if (local_addr.sin_port != peer_addr.sin_port ||
+		local_addr.sin_addr.s_addr != peer_addr.sin_addr.s_addr)
+	{
+		WSASetLastError(WSAECONNABORTED);
+		goto fail;
+	}
+
+	closesocket(listener);
+	return 0;
+
+fail:
+	e = WSAGetLastError();
+	closesocket(listener);
+	if (socks[0] != INVALID_SOCKET)
+		closesocket(socks[0]);
+	if (socks[1] != INVALID_SOCKET)
+		closesocket(socks[1]);
+	WSASetLastError(e);
+	socks[0] = socks[1] = INVALID_SOCKET;
+	return SOCKET_ERROR;
+}
diff --git a/src/backend/postmaster/Makefile b/src/backend/postmaster/Makefile
index 71c2321..b0bd173 100644
--- a/src/backend/postmaster/Makefile
+++ b/src/backend/postmaster/Makefile
@@ -13,6 +13,7 @@ top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
OBJS = autovacuum.o bgworker.o bgwriter.o checkpointer.o fork_process.o \
- pgarch.o pgstat.o postmaster.o startup.o syslogger.o walwriter.o
+ pgarch.o pgstat.o postmaster.o startup.o syslogger.o walwriter.o \
+ connpool.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c
index d2b695e..15b9eb5 100644
--- a/src/backend/postmaster/bgworker.c
+++ b/src/backend/postmaster/bgworker.c
@@ -21,6 +21,7 @@
#include "port/atomics.h"
#include "postmaster/bgworker_internals.h"
#include "postmaster/postmaster.h"
+#include "postmaster/connpool.h"
#include "replication/logicallauncher.h"
#include "replication/logicalworker.h"
#include "storage/dsm.h"
@@ -129,7 +130,10 @@ static const struct
},
{
"ApplyWorkerMain", ApplyWorkerMain
- }
+ },
+ {
+ "StartupPacketReaderMain", StartupPacketReaderMain
+ }
};
/* Private functions. */
diff --git a/src/backend/postmaster/connpool.c b/src/backend/postmaster/connpool.c
new file mode 100644
index 0000000..e2d041a
--- /dev/null
+++ b/src/backend/postmaster/connpool.c
@@ -0,0 +1,269 @@
+/*-------------------------------------------------------------------------
+ * connpool.c
+ * PostgreSQL connection pool workers.
+ *
+ * Copyright (c) 2018, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/postmaster/connpool.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <signal.h>
+#include <unistd.h>
+
+#include "lib/stringinfo.h"
+#include "libpq/libpq.h"
+#include "libpq/pqformat.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "postmaster/bgworker.h"
+#include "postmaster/connpool.h"
+#include "postmaster/postmaster.h"
+#include "storage/proc.h"
+#include "utils/memutils.h"
+#include "utils/resowner.h"
+#include "tcop/tcopprot.h"
+
+/* GUC: how many connection pool workers RegisterConnPoolWorkers() starts */
+int			NumConnPoolWorkers = 2;
+
+/* Shared-memory array of worker slots, set up by ConnectionPoolWorkersInit() */
+ConnPoolWorker *ConnPoolWorkers;
+
+/* Set by handle_sigterm(); makes StartupPacketReaderMain() leave its loop */
+static volatile sig_atomic_t shutdown_requested = false;
+static void handle_sigterm(SIGNAL_ARGS);
+
+/* This worker's libpq communication state, obtained from pq_init() */
+static void *pqstate;
+
+/*
+ * SIGTERM handler: flag the worker loop for shutdown and wake it via the
+ * process latch.  errno is saved and restored so the interrupted code does
+ * not see it change.
+ */
+static void
+handle_sigterm(SIGNAL_ARGS)
+{
+	int			saved_errno = errno;
+
+	shutdown_requested = true;
+	SetLatch(&MyProc->procLatch);
+
+	errno = saved_errno;
+}
+
+/*
+ * Amount of shared memory needed for the ConnPoolWorker array.
+ * mul_size() raises an error on overflow instead of silently wrapping.
+ */
+Size
+ConnPoolShmemSize(void)
+{
+	return MAXALIGN(mul_size(sizeof(ConnPoolWorker), NumConnPoolWorkers));
+}
+
+/*
+ * Create (or attach to) the shared ConnPoolWorker array.
+ *
+ * When the array is newly created, it is zeroed and each worker slot gets a
+ * connected AF_UNIX socket pair.  pipes[0] is used by the postmaster and
+ * pipes[1] by the worker.
+ */
+void
+ConnectionPoolWorkersInit(void)
+{
+	int			i;
+	bool		found;
+	Size		size = ConnPoolShmemSize();
+
+	ConnPoolWorkers = ShmemInitStruct("connection pool workers",
+									  size, &found);
+	if (found)
+		return;
+
+	MemSet(ConnPoolWorkers, 0, size);
+	for (i = 0; i < NumConnPoolWorkers; i++)
+	{
+		ConnPoolWorker *worker = &ConnPoolWorkers[i];
+
+		/* report the OS error so the failure is diagnosable */
+		if (socketpair(AF_UNIX, SOCK_STREAM, 0, worker->pipes) < 0)
+			elog(FATAL, "could not create socket pair for connection pool: %m");
+	}
+}
+
+/*
+ * Register NumConnPoolWorkers background workers running
+ * StartupPacketReaderMain(), which read client startup packets.  Nothing is
+ * registered when session pooling is off (SessionPoolSize == 0).
+ */
+void
+RegisterConnPoolWorkers(void)
+{
+	BackgroundWorker bgw;
+	int			worker_no;
+
+	/* no need to start workers */
+	if (SessionPoolSize == 0)
+		return;
+
+	for (worker_no = 0; worker_no < NumConnPoolWorkers; worker_no++)
+	{
+		memset(&bgw, 0, sizeof(bgw));
+		snprintf(bgw.bgw_name, BGW_MAXLEN,
+				 "connection pool worker %d", worker_no + 1);
+		snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
+		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "StartupPacketReaderMain");
+		bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
+		bgw.bgw_start_time = BgWorkerStart_PostmasterStart;
+		bgw.bgw_restart_time = 3;	/* seconds before restarting a crashed worker */
+		bgw.bgw_notify_pid = 0;
+		bgw.bgw_main_arg = Int32GetDatum(worker_no);	/* slot in ConnPoolWorkers */
+
+		RegisterBackgroundWorker(&bgw);
+	}
+
+	elog(LOG, "Connection pool have been started");
+}
+
+/*
+ * Release the per-connection resources held in "port" (if not NULL).  Output
+ * is redirected to DestNone, and pq_set_current_state() is called with
+ * NULL port and wait set.
+ *
+ * Released fields are reset to invalid values.  That way a second call for
+ * the same port (for example from the error handler after the normal cleanup
+ * path) does not close the socket or free the wait event set again.
+ * "worker" is currently unused.
+ */
+static void
+resetWorkerState(ConnPoolWorker *worker, Port *port)
+{
+	/* Cleanup */
+	whereToSendOutput = DestNone;
+
+	if (port != NULL)
+	{
+		if (port->sock != PGINVALID_SOCKET)
+		{
+			closesocket(port->sock);
+			port->sock = PGINVALID_SOCKET;
+		}
+		if (port->pqcomm_waitset != NULL)
+		{
+			FreeWaitEventSet(port->pqcomm_waitset);
+			port->pqcomm_waitset = NULL;
+		}
+	}
+	pq_set_current_state(pqstate, NULL, NULL);
+}
+
+/*
+ * Write all "len" bytes of "data" to "chan" (the worker's end of its socket
+ * pair, read by the postmaster).  Retries after EINTR and short writes, so a
+ * signal cannot leave a half-written message behind.  Throws ERROR on any
+ * other failure.
+ */
+static void
+send_to_postmaster(pgsocket chan, const char *data, size_t len)
+{
+	while (len > 0)
+	{
+		ssize_t		rc = send(chan, data, len, 0);
+
+		if (rc < 0)
+		{
+			if (errno == EINTR)
+				continue;
+			elog(ERROR, "could not send data to postmaster: %m");
+		}
+		data += rc;
+		len -= rc;
+	}
+}
+
+/*
+ * Main function of a connection pool worker; "arg" is the worker's index in
+ * ConnPoolWorkers.
+ *
+ * The worker sleeps on its latch until its state becomes CPW_NEW_SOCKET.  It
+ * then does the following:
+ *   - receives the client socket over its socket pair;
+ *   - reads the startup packet with ProcessStartupPacket();
+ *   - on success, sends the postmaster a length word followed by a 'P'
+ *     message.  The message holds the protocol version, the database and
+ *     user names, the GUC options and the command-line options.
+ * Finally the worker closes its copy of the client socket.  Errors are
+ * logged, and the worker becomes CPW_FREE again.
+ */
+void
+StartupPacketReaderMain(Datum arg)
+{
+	sigjmp_buf	local_sigjmp_buf;
+	ConnPoolWorker *worker = &ConnPoolWorkers[DatumGetInt32(arg)];
+	MemoryContext mcxt;
+	int			status;
+
+	/*
+	 * "port" is assigned after sigsetjmp() and read by the error handler, so
+	 * it must be volatile for its value to be reliable after siglongjmp().
+	 */
+	Port	   *volatile port = NULL;
+
+	pqsignal(SIGTERM, handle_sigterm);
+	BackgroundWorkerUnblockSignals();
+
+	mcxt = AllocSetContextCreate(TopMemoryContext,
+								 "temporary context",
+								 ALLOCSET_DEFAULT_SIZES);
+	pqstate = pq_init(TopMemoryContext);
+
+	/*
+	 * Publish the latch before the pid.  The postmaster takes a nonzero pid
+	 * as "worker is ready" and then calls SetLatch(worker->latch).
+	 */
+	worker->latch = MyLatch;
+	pg_write_barrier();
+	worker->pid = MyProcPid;
+	Assert(MyLatch == &MyProc->procLatch);
+
+	MemoryContextSwitchTo(mcxt);
+
+	/* If an exception is encountered, processing resumes here */
+	if (sigsetjmp(local_sigjmp_buf, 1) != 0)
+	{
+		/* Since not using PG_TRY, must reset error stack by hand */
+		error_context_stack = NULL;
+
+		/* Prevent interrupts while cleaning up */
+		HOLD_INTERRUPTS();
+
+		/* Report the error to the server log and to the client */
+		EmitErrorReport();
+
+		/*
+		 * Now return to normal top-level context and clear ErrorContext for
+		 * next time.
+		 */
+		MemoryContextSwitchTo(mcxt);
+		FlushErrorState();
+
+		/*
+		 * Drop the client connection and everything allocated for it.  Also
+		 * forget "port", so no later error can reach freed memory.
+		 */
+		resetWorkerState(worker, port);
+		port = NULL;
+		MemoryContextReset(mcxt);
+
+		/* Ready for new sockets */
+		worker->state = CPW_FREE;
+
+		/* Now we can allow interrupts again */
+		RESUME_INTERRUPTS();
+	}
+
+	/* We can now handle ereport(ERROR) */
+	PG_exception_stack = &local_sigjmp_buf;
+
+	while (!shutdown_requested)
+	{
+		ListCell   *lc;
+		int			rc;
+		StringInfoData buf;
+
+		rc = WaitLatch(&MyProc->procLatch,
+					   WL_LATCH_SET | WL_POSTMASTER_DEATH,
+					   0, PG_WAIT_EXTENSION);
+
+		if (rc & WL_POSTMASTER_DEATH)
+			break;
+
+		ResetLatch(&MyProc->procLatch);
+
+		if (shutdown_requested)
+			break;
+
+		if (worker->state != CPW_NEW_SOCKET)
+			/* we woke up for other reason */
+			continue;
+
+		/* Set up temporary pq state for startup packet */
+		port = palloc0(sizeof(Port));
+		port->sock = PGINVALID_SOCKET;
+
+		/*
+		 * Receive the client socket.  Retry only after a signal interruption;
+		 * any other failure would otherwise be retried forever.
+		 */
+		while (port->sock == PGINVALID_SOCKET)
+		{
+			errno = 0;
+			port->sock = pg_recv_sock(worker->pipes[1]);
+			if (port->sock == PGINVALID_SOCKET && errno != EINTR)
+			{
+				if (errno != 0)
+					elog(ERROR, "could not receive client socket from postmaster: %m");
+				else
+					elog(ERROR, "could not receive client socket from postmaster");
+			}
+		}
+
+		/* init pqcomm */
+		port->pqcomm_waitset = pq_create_backend_event_set(mcxt, port, true);
+		port->canAcceptConnections = worker->cac_state;
+		pq_set_current_state(pqstate, port, port->pqcomm_waitset);
+		whereToSendOutput = DestRemote;
+
+		/* TODO: deal with timeouts */
+		status = ProcessStartupPacket(port, false, mcxt, ERROR);
+		if (status != STATUS_OK)
+		{
+			/*
+			 * ProcessStartupPacket() may give up (e.g. on EOF) after
+			 * pq_startmsgread() without ending the read.  pqstate is reused
+			 * for the next client, so don't leave it marked as reading.
+			 */
+			if (pq_is_reading_msg())
+				pq_endmsgread();
+			worker->state = CPW_FREE;
+			goto cleanup;
+		}
+
+		/* Serialize a port into stringinfo */
+		pq_beginmessage(&buf, 'P');
+		pq_sendint(&buf, port->proto, 4);
+		pq_sendstring(&buf, port->database_name);
+		pq_sendstring(&buf, port->user_name);
+		pq_sendint(&buf, list_length(port->guc_options), 4);
+
+		foreach(lc, port->guc_options)
+		{
+			char	   *str = (char *) lfirst(lc);
+
+			pq_sendstring(&buf, str);
+		}
+
+		if (port->cmdline_options)
+		{
+			pq_sendint(&buf, 1, 4);
+			pq_sendstring(&buf, port->cmdline_options);
+		}
+		else
+			pq_sendint(&buf, 0, 4);
+
+		worker->state = CPW_PROCESSED;
+
+		/* length word first, then the message body */
+		send_to_postmaster(worker->pipes[1], (const char *) &buf.len,
+						   sizeof(buf.len));
+		send_to_postmaster(worker->pipes[1], buf.data, buf.len);
+		pfree(buf.data);
+		buf.data = NULL;
+
+cleanup:
+		resetWorkerState(worker, port);
+		port = NULL;
+		MemoryContextReset(mcxt);
+	}
+
+	resetWorkerState(worker, NULL);
+}
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 8a5b2b3..8bdc988 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -868,7 +868,8 @@ pgstat_report_stat(bool force)
PgStat_TableEntry *this_ent;
/* Shouldn't have any pending transaction-dependent counts */
- Assert(entry->trans == NULL);
+ if (entry->trans != NULL)
+ continue;
/*
* Ignore entries that didn't accumulate any actual counts, such
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index a4b53b3..56fef63 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -76,6 +76,7 @@
#include <sys/param.h>
#include <netdb.h>
#include <limits.h>
+#include <pthread.h>
#ifdef HAVE_SYS_SELECT_H
#include <sys/select.h>
@@ -114,6 +115,7 @@
#include "postmaster/pgarch.h"
#include "postmaster/postmaster.h"
#include "postmaster/syslogger.h"
+#include "postmaster/connpool.h"
#include "replication/logicallauncher.h"
#include "replication/walsender.h"
#include "storage/fd.h"
@@ -170,6 +172,7 @@ typedef struct bkend
pid_t pid; /* process id of backend */
int32 cancel_key; /* cancel key for cancels for this backend */
int child_slot; /* PMChildSlot for this backend, if any */
+ pgsocket session_send_sock; /* Write end of socket pipe to this backend used to send session socket descriptor to the backend process */
/*
* Flavor of backend or auxiliary process. Note that BACKEND_TYPE_WALSND
@@ -178,8 +181,11 @@ typedef struct bkend
*/
int bkend_type;
bool dead_end; /* is it going to send an error and quit? */
- bool bgworker_notify; /* gets bgworker start/stop notifications */
+ bool bgworker_notify;/* gets bgworker start/stop notifications */
dlist_node elem; /* list link in BackendList */
+ int session_pool_id;/* identifier of backends session pool */
+ int worker_id; /* identifier of worker within session pool */
+ void *pool; /* pool of backends */
} Backend;
static dlist_head BackendList = DLIST_STATIC_INIT(BackendList);
@@ -190,7 +196,27 @@ static Backend *ShmemBackendArray;
BackgroundWorker *MyBgworkerEntry = NULL;
+/*
+ * Hash key of a session pool: there is one pool per (database, user) pair.
+ * Both names are zero-padded to NAMEDATALEN, so the key can be hashed and
+ * compared as a binary blob (HASH_BLOBS).
+ */
+struct DatabasePoolKey
+{
+	char		database[NAMEDATALEN];
+	char		username[NAMEDATALEN];
+};
+
+/*
+ * Entry of PostmasterSessionPool.pools.  The key must remain the first
+ * field, because hash table entries begin with their key.
+ */
+typedef struct DatabasePool
+{
+	struct DatabasePoolKey key;
+	Backend   **workers;		/* pool backends */
+	int			n_workers;		/* number of worker backends launched in
+								 * this pool so far */
+	int			rr_index;		/* index of the current backend, used to
+								 * hand out sessions round-robin */
+} DatabasePool;
+
+/* Postmaster-local registry of session pools */
+static struct
+{
+	HTAB	   *pools;			/* DatabasePool entries by DatabasePoolKey */
+}			PostmasterSessionPool;
/* The socket number we are listening for connections on */
int PostPortNumber;
@@ -214,7 +240,7 @@ int ReservedBackends;
/* The socket(s) we're listening to. */
#define MAXLISTEN 64
-static pgsocket ListenSocket[MAXLISTEN];
+static pgsocket ListenSocket[MAXLISTEN + MAX_CONNPOOL_WORKERS];
/*
* Set by the -o option
@@ -393,15 +419,19 @@ static void unlink_external_pid_file(int status, Datum arg);
static void getInstallationPaths(const char *argv0);
static void checkControlFile(void);
static Port *ConnCreate(int serverFd);
+static Port *PoolConnCreate(pgsocket poolFd, int workerId);
static void ConnFree(Port *port);
+static void ConnDispatch(Port *port);
static void reset_shared(int port);
static void SIGHUP_handler(SIGNAL_ARGS);
+static CAC_state canAcceptConnections(void);
static void pmdie(SIGNAL_ARGS);
static void reaper(SIGNAL_ARGS);
static void sigusr1_handler(SIGNAL_ARGS);
static void startup_die(SIGNAL_ARGS);
static void dummy_handler(SIGNAL_ARGS);
static void StartupPacketTimeoutHandler(void);
+static int BackendStartup(DatabasePool *pool, Port *port);
static void CleanupBackend(int pid, int exitstatus);
static bool CleanupBackgroundWorker(int pid, int exitstatus);
static void HandleChildCrash(int pid, int exitstatus, const char *procname);
@@ -412,13 +442,10 @@ static void BackendInitialize(Port *port);
static void BackendRun(Port *port) pg_attribute_noreturn();
static void ExitPostmaster(int status) pg_attribute_noreturn();
static int ServerLoop(void);
-static int BackendStartup(Port *port);
-static int ProcessStartupPacket(Port *port, bool SSLdone);
static void SendNegotiateProtocolVersion(List *unrecognized_protocol_options);
static void processCancelRequest(Port *port, void *pkt);
static int initMasks(fd_set *rmask);
static void report_fork_failure_to_client(Port *port, int errnum);
-static CAC_state canAcceptConnections(void);
static bool RandomCancelKey(int32 *cancel_key);
static void signal_child(pid_t pid, int signal);
static bool SignalSomeChildren(int signal, int targets);
@@ -486,6 +513,7 @@ typedef struct
{
Port port;
InheritableSocket portsocket;
+ InheritableSocket sessionsocket;
char DataDir[MAXPGPATH];
pgsocket ListenSocket[MAXLISTEN];
int32 MyCancelKey;
@@ -988,6 +1016,11 @@ PostmasterMain(int argc, char *argv[])
ApplyLauncherRegister();
/*
+ * Register connnection pool workers
+ */
+ RegisterConnPoolWorkers();
+
+ /*
* process any libraries that should be preloaded at postmaster start
*/
process_shared_preload_libraries();
@@ -1613,6 +1646,177 @@ DetermineSleepTime(struct timeval *timeout)
}
}
+/*
+ * Is "dbname" listed in the DedicatedDatabases setting, a comma-separated
+ * identifier list?  Connections to such databases get a dedicated backend
+ * instead of a session pool (see ConnDispatch()).
+ *
+ * This runs in the postmaster, which has no error handler there, so
+ * elog(ERROR) would be promoted to FATAL and stop the server.  A malformed
+ * list is therefore only logged and treated as naming no database.
+ */
+static bool
+IsDedicatedDatabase(char const *dbname)
+{
+	List	   *namelist = NIL;
+	ListCell   *l;
+	char	   *databases;
+	bool		found = false;
+
+	if (DedicatedDatabases == NULL || DedicatedDatabases[0] == '\0')
+		return false;
+
+	/* SplitIdentifierString() modifies its input, so work on a copy */
+	databases = pstrdup(DedicatedDatabases);
+
+	if (!SplitIdentifierString(databases, ',', &namelist))
+	{
+		elog(LOG, "invalid list syntax");
+		list_free(namelist);
+		pfree(databases);
+		return false;
+	}
+
+	foreach(l, namelist)
+	{
+		char	   *curname = (char *) lfirst(l);
+
+		if (strcmp(curname, dbname) == 0)
+		{
+			found = true;
+			break;
+		}
+	}
+	list_free(namelist);
+	pfree(databases);
+
+	return found;
+}
+
+/*
+ * Hand the newly accepted client connection "port" to a free connection pool
+ * worker, which will read its startup packet.
+ *
+ * While scanning, workers in state CPW_PROCESSED have their results
+ * collected (PoolConnCreate) and dispatched (ConnDispatch).  If no worker is
+ * free, the postmaster sleeps 1ms and rescans, so this call blocks until a
+ * worker becomes available.  If the socket cannot be passed, the postmaster
+ * exits.
+ */
+static void
+SendPortToConnectionPool(Port *port)
+{
+	/* By default is not dedicated */
+	IsDedicatedBackend = false;
+
+	for (;;)
+	{
+		int			i;
+
+		for (i = 0; i < NumConnPoolWorkers; i++)
+		{
+			ConnPoolWorker *worker = &ConnPoolWorkers[i];
+
+			/*
+			 * Skip workers that have not finished starting.  A worker stores
+			 * its pid and its latch as separate steps, so a nonzero pid alone
+			 * does not guarantee worker->latch is set yet.
+			 */
+			if (worker->pid == 0 || worker->latch == NULL)
+				continue;
+
+			if (worker->state == CPW_PROCESSED)
+			{
+				Port	   *conn = PoolConnCreate(worker->pipes[0], i);
+
+				if (conn)
+					ConnDispatch(conn);
+			}
+			if (worker->state == CPW_FREE)
+			{
+				worker->port = port;
+				worker->state = CPW_NEW_SOCKET;
+				worker->cac_state = canAcceptConnections();
+
+				if (pg_send_sock(worker->pipes[0], port->sock, worker->pid) < 0)
+				{
+					elog(LOG, "could not send socket to connection pool: %m");
+					ExitPostmaster(1);
+				}
+				SetLatch(worker->latch);
+				return;
+			}
+		}
+
+		/* no worker is free right now: wait a little and rescan */
+		pg_usleep(1000L);
+	}
+}
+
+/*
+ * Route a client whose startup packet has already been read by a connection
+ * pool worker.  Afterwards, release this process's socket and Port.
+ *
+ * Databases listed in DedicatedDatabases get their own backend
+ * (BackendStartup with no pool).  Every other connection goes to the session
+ * pool for its (database, user) pair, which is created on first use.
+ */
+static void
+ConnDispatch(Port *port)
+{
+	bool		found;
+	DatabasePool *pool;
+	struct DatabasePoolKey key;
+
+	Assert(port->sock != PGINVALID_SOCKET);
+
+	/*
+	 * Set IsDedicatedBackend on every path.  This function is also called
+	 * directly from ServerLoop, so a value left by an earlier dedicated
+	 * connection must not carry over into a pooled one.
+	 */
+	IsDedicatedBackend = IsDedicatedDatabase(port->database_name);
+	if (IsDedicatedBackend)
+	{
+		BackendStartup(NULL, port);
+		goto cleanup;
+	}
+
+#ifdef USE_SSL
+	if (port->ssl_in_use)
+	{
+		/*
+		 * We don't (yet) support SSL connections with connection pool,
+		 * since we need to move whole SSL context to already working
+		 * backend. This task needs more investigation.
+		 *
+		 * Only log and drop the client.  elog(ERROR) here in the postmaster
+		 * would be promoted to FATAL and stop the whole server.
+		 */
+		elog(LOG, "connection pool does not support SSL connections");
+		goto cleanup;
+	}
+#endif
+	MemSet(key.database, 0, NAMEDATALEN);
+	MemSet(key.username, 0, NAMEDATALEN);
+
+	strlcpy(key.database, port->database_name, NAMEDATALEN);
+	strlcpy(key.username, port->user_name, NAMEDATALEN);
+
+	pool = hash_search(PostmasterSessionPool.pools, &key, HASH_ENTER, &found);
+	if (!found)
+	{
+		pool->key = key;
+		pool->workers = NULL;
+		pool->n_workers = 0;
+		pool->rr_index = 0;
+	}
+
+	BackendStartup(pool, port);
+
+cleanup:
+	/*
+	 * We no longer need the open socket or port structure
+	 * in this process
+	 */
+	StreamClose(port->sock);
+	ConnFree(port);
+}
+
+/*
+ * Prepare the postmaster for session pooling:
+ *   - create the hash table of session pools, keyed by (database, user);
+ *   - add each worker's postmaster-side socket (pipes[0]) to "rmask" and to
+ *     ListenSocket[MAXLISTEN + i], so ServerLoop polls them together with
+ *     the client listen sockets.
+ *
+ * Returns one more than the largest of "numSockets" and the added
+ * descriptors, for use as the select() nfds argument.  If the caller already
+ * passes an nfds value, this can overshoot by one, which select() tolerates.
+ */
+static int
+InitConnPoolState(fd_set *rmask, int numSockets)
+{
+	int			i;
+	HASHCTL		ctl;
+
+	/* ListenSocket[] has room for only MAX_CONNPOOL_WORKERS worker sockets */
+	if (NumConnPoolWorkers > MAX_CONNPOOL_WORKERS)
+		elog(FATAL, "too many connection pool workers: %d (maximum is %d)",
+			 NumConnPoolWorkers, MAX_CONNPOOL_WORKERS);
+
+	/*
+	 * create hashtable of session pools, keyed by database and user name
+	 */
+	MemSet(&ctl, 0, sizeof(ctl));
+	ctl.keysize = sizeof(struct DatabasePoolKey);
+	ctl.entrysize = sizeof(DatabasePool);
+	ctl.hcxt = PostmasterContext;
+	PostmasterSessionPool.pools = hash_create("Pool by database and user", 100,
+											  &ctl, HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+
+	for (i = 0; i < NumConnPoolWorkers; i++)
+	{
+		ConnPoolWorker *worker = &ConnPoolWorkers[i];
+
+		worker->port = NULL;
+
+		/*
+		 * we use same pselect(3) call for connection pool workers and
+		 * clients
+		 */
+		ListenSocket[MAXLISTEN + i] = worker->pipes[0];
+		FD_SET(worker->pipes[0], rmask);
+		if (worker->pipes[0] > numSockets)
+			numSockets = worker->pipes[0];
+	}
+
+	return numSockets + 1;
+}
+
/*
* Main idle loop of postmaster
*
@@ -1630,6 +1834,9 @@ ServerLoop(void)
nSockets = initMasks(&readmask);
+ if (SessionPoolSize > 0)
+ nSockets = InitConnPoolState(&readmask, nSockets);
+
for (;;)
{
fd_set rmask;
@@ -1690,27 +1897,43 @@ ServerLoop(void)
*/
if (selres > 0)
{
+ Port *port;
int i;
+ /* Check for client connections */
for (i = 0; i < MAXLISTEN; i++)
{
if (ListenSocket[i] == PGINVALID_SOCKET)
break;
if (FD_ISSET(ListenSocket[i], &rmask))
{
- Port *port;
-
port = ConnCreate(ListenSocket[i]);
if (port)
{
- BackendStartup(port);
-
- /*
- * We no longer need the open socket or port structure
- * in this process
- */
- StreamClose(port->sock);
- ConnFree(port);
+ if (SessionPoolSize == 0)
+ {
+ IsDedicatedBackend = true;
+ BackendStartup(NULL, port);
+ StreamClose(port->sock);
+ ConnFree(port);
+ }
+ else
+ SendPortToConnectionPool(port);
+ }
+ }
+ }
+
+ /* Check for some data from connections pool */
+ if (SessionPoolSize > 0)
+ {
+ for (i = 0; i < NumConnPoolWorkers; i++)
+ {
+ if (FD_ISSET(ListenSocket[MAXLISTEN + i], &rmask))
+ {
+ port = PoolConnCreate(ListenSocket[MAXLISTEN + i], i);
+ if (port)
+ ConnDispatch(port);
+
}
}
}
@@ -1893,13 +2116,15 @@ initMasks(fd_set *rmask)
* send anything to the client, which would typically be appropriate
* if we detect a communications failure.)
*/
-static int
-ProcessStartupPacket(Port *port, bool SSLdone)
+int
+ProcessStartupPacket(Port *port, bool SSLdone, MemoryContext memctx,
+ int errlevel)
{
int32 len;
void *buf;
ProtocolVersion proto;
- MemoryContext oldcontext;
+ MemoryContext oldcontext = MemoryContextSwitchTo(memctx);
+ int result;
pq_startmsgread();
if (pq_getbytes((char *) &len, 4) == EOF)
@@ -1992,7 +2217,7 @@ retry1:
#endif
/* regular startup packet, cancel, etc packet should follow... */
/* but not another SSL negotiation request */
- return ProcessStartupPacket(port, true);
+ return ProcessStartupPacket(port, true, memctx, errlevel);
}
/* Could add additional special packet types here */
@@ -2006,13 +2231,16 @@ retry1:
/* Check that the major protocol version is in range. */
if (PG_PROTOCOL_MAJOR(proto) < PG_PROTOCOL_MAJOR(PG_PROTOCOL_EARLIEST) ||
PG_PROTOCOL_MAJOR(proto) > PG_PROTOCOL_MAJOR(PG_PROTOCOL_LATEST))
- ereport(FATAL,
+ {
+ ereport(errlevel,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("unsupported frontend protocol %u.%u: server supports %u.0 to %u.%u",
PG_PROTOCOL_MAJOR(proto), PG_PROTOCOL_MINOR(proto),
PG_PROTOCOL_MAJOR(PG_PROTOCOL_EARLIEST),
PG_PROTOCOL_MAJOR(PG_PROTOCOL_LATEST),
PG_PROTOCOL_MINOR(PG_PROTOCOL_LATEST))));
+ return STATUS_ERROR;
+ }
/*
* Now fetch parameters out of startup packet and save them into the Port
@@ -2022,7 +2250,7 @@ retry1:
* not worry about leaking this storage on failure, since we aren't in the
* postmaster process anymore.
*/
- oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+ oldcontext = MemoryContextSwitchTo(memctx);
if (PG_PROTOCOL_MAJOR(proto) >= 3)
{
@@ -2070,12 +2298,15 @@ retry1:
am_db_walsender = true;
}
else if (!parse_bool(valptr, &am_walsender))
- ereport(FATAL,
+ {
+ ereport(errlevel,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid value for parameter \"%s\": \"%s\"",
"replication",
valptr),
errhint("Valid values are: \"false\", 0, \"true\", 1, \"database\".")));
+ return STATUS_ERROR;
+ }
}
else if (strncmp(nameptr, "_pq_.", 5) == 0)
{
@@ -2103,9 +2334,12 @@ retry1:
* given packet length, complain.
*/
if (offset != len - 1)
- ereport(FATAL,
+ {
+ ereport(errlevel,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("invalid startup packet layout: expected terminator as last byte")));
+ return STATUS_ERROR;
+ }
/*
* If the client requested a newer protocol version or if the client
@@ -2141,9 +2375,12 @@ retry1:
/* Check a user name was given. */
if (port->user_name == NULL || port->user_name[0] == '\0')
- ereport(FATAL,
+ {
+ ereport(errlevel,
(errcode(ERRCODE_INVALID_AUTHORIZATION_SPECIFICATION),
errmsg("no PostgreSQL user name specified in startup packet")));
+ return STATUS_ERROR;
+ }
/* The database defaults to the user name. */
if (port->database_name == NULL || port->database_name[0] == '\0')
@@ -2197,27 +2434,32 @@ retry1:
* now instead of wasting cycles on an authentication exchange. (This also
* allows a pg_ping utility to be written.)
*/
+ result = STATUS_OK;
switch (port->canAcceptConnections)
{
case CAC_STARTUP:
- ereport(FATAL,
+ ereport(errlevel,
(errcode(ERRCODE_CANNOT_CONNECT_NOW),
errmsg("the database system is starting up")));
+ result = STATUS_ERROR;
break;
case CAC_SHUTDOWN:
- ereport(FATAL,
+ ereport(errlevel,
(errcode(ERRCODE_CANNOT_CONNECT_NOW),
errmsg("the database system is shutting down")));
+ result = STATUS_ERROR;
break;
case CAC_RECOVERY:
- ereport(FATAL,
+ ereport(errlevel,
(errcode(ERRCODE_CANNOT_CONNECT_NOW),
errmsg("the database system is in recovery mode")));
+ result = STATUS_ERROR;
break;
case CAC_TOOMANY:
- ereport(FATAL,
+ ereport(errlevel,
(errcode(ERRCODE_TOO_MANY_CONNECTIONS),
errmsg("sorry, too many clients already")));
+ result = STATUS_ERROR;
break;
case CAC_WAITBACKUP:
/* OK for now, will check in InitPostgres */
@@ -2226,7 +2468,7 @@ retry1:
break;
}
- return STATUS_OK;
+ return result;
}
/*
@@ -2322,7 +2564,7 @@ processCancelRequest(Port *port, void *pkt)
/*
* canAcceptConnections --- check to see if database state allows connections.
*/
-static CAC_state
+CAC_state
canAcceptConnections(void)
{
CAC_state result = CAC_OK;
@@ -2398,7 +2640,7 @@ ConnCreate(int serverFd)
ConnFree(port);
return NULL;
}
-
+ SessionPoolSock = PGINVALID_SOCKET;
/*
* Allocate GSSAPI specific state struct
*/
@@ -2418,6 +2660,66 @@ ConnCreate(int serverFd)
return port;
}
+/* Upper bound on the serialized connection data accepted from a pool worker */
+#define CONN_BUF_SIZE 8192
+
+/*
+ * PoolConnCreate -- build the Port for a connection whose startup packet has
+ * already been processed by connection pool worker "workerId".
+ *
+ * poolFd: descriptor to read the serialized connection data from (presumably
+ *         the pipe/socket shared with that worker -- confirm at caller).
+ * workerId: index into ConnPoolWorkers.
+ *
+ * The data is a native-int length header followed by that many bytes holding,
+ * in order: protocol version (int32), database name, user name, an int32
+ * count of GUC option strings and the strings themselves, and an int32 flag
+ * followed by the command-line options string when the flag is positive.
+ *
+ * Returns the worker's Port filled in from that data, or NULL if the worker
+ * is not in CPW_PROCESSED state or the data could not be read.  On a read
+ * failure the client socket is closed and the Port freed.  Once the worker
+ * is found in CPW_PROCESSED state its slot is released unconditionally.
+ */
+static Port *
+PoolConnCreate(pgsocket poolFd, int workerId)
+{
+	char		recv_buf[CONN_BUF_SIZE + 1];	/* +1 for the terminator */
+	int			recv_len = 0;
+	int			rc;
+	int			offs;
+	int			n_gucs;
+	int			i;
+	StringInfoData buf;
+	ConnPoolWorker *worker = &ConnPoolWorkers[workerId];
+	Port	   *port = worker->port;
+
+	if (worker->state != CPW_PROCESSED)
+		return NULL;
+
+	/* In any case we should free the worker */
+	worker->port = NULL;
+	worker->state = CPW_FREE;
+
+	/* Read the length header, retrying if interrupted by a signal. */
+	do
+		rc = read(poolFd, &recv_len, sizeof(recv_len));
+	while (rc < 0 && errno == EINTR);
+
+	/*
+	 * Reject short reads and lengths that cannot fit in the buffer.  An empty
+	 * payload is invalid too: at least the protocol version must be present.
+	 */
+	if (rc != (int) sizeof(recv_len) || recv_len <= 0 || recv_len > CONN_BUF_SIZE)
+		goto io_error;
+
+	/*
+	 * Read exactly recv_len bytes.  Requesting more than what remains of this
+	 * message could swallow the beginning of the next one from poolFd.
+	 */
+	for (offs = 0; offs < recv_len; offs += rc)
+	{
+		do
+			rc = read(poolFd, recv_buf + offs, recv_len - offs);
+		while (rc < 0 && errno == EINTR);
+
+		if (rc <= 0)
+			goto io_error;
+	}
+
+	/*
+	 * StringInfo readers expect data[len] == '\0'; without it
+	 * pq_getmsgstring() could strlen() past the bytes actually received.
+	 */
+	recv_buf[recv_len] = '\0';
+
+	buf.data = recv_buf;
+	buf.len = recv_len;
+	buf.maxlen = CONN_BUF_SIZE + 1;
+	buf.cursor = 0;
+
+	port->proto = pq_getmsgint(&buf, 4);
+	port->database_name = MemoryContextStrdup(TopMemoryContext, pq_getmsgstring(&buf));
+	port->user_name = MemoryContextStrdup(TopMemoryContext, pq_getmsgstring(&buf));
+	port->guc_options = NIL;
+
+	/* GUC options: a count followed by that many strings */
+	n_gucs = pq_getmsgint(&buf, 4);
+	for (i = 0; i < n_gucs; i++)
+	{
+		char	   *val = MemoryContextStrdup(TopMemoryContext, pq_getmsgstring(&buf));
+
+		port->guc_options = lappend(port->guc_options, val);
+	}
+
+	/* Command-line options string, present only when the flag is positive */
+	if (pq_getmsgint(&buf, 4) > 0)
+		port->cmdline_options = MemoryContextStrdup(TopMemoryContext, pq_getmsgstring(&buf));
+
+	return port;
+
+io_error:
+	StreamClose(port->sock);
+	ConnFree(port);
+	return NULL;
+}
/*
* ConnFree -- free a local connection data structure
@@ -2430,6 +2732,12 @@ ConnFree(Port *conn)
#endif
if (conn->gss)
free(conn->gss);
+ if (conn->database_name)
+ pfree(conn->database_name);
+ if (conn->user_name)
+ pfree(conn->user_name);
+ if (conn->cmdline_options)
+ pfree(conn->cmdline_options);
free(conn);
}
@@ -3185,6 +3493,44 @@ CleanupBackgroundWorker(int pid,
}
/*
+ * UnlinkPooledBackend -- detach a backend from its session pool.
+ *
+ * Removes "bp" from pool->workers, moving the last worker into its slot so
+ * the array stays dense.  It also keeps the round-robin index in range and
+ * closes the socket used to hand client sockets to the backend.  This does
+ * NOT remove the backend from BackendList or free it.
+ *
+ * Backends without a pool, non-normal backends and backends whose send socket
+ * is already closed are ignored.  This makes repeated calls harmless, e.g. after
+ * a failed send followed by the backend's exit.
+ */
+static void
+UnlinkPooledBackend(Backend *bp)
+{
+	DatabasePool *pool = bp->pool;
+
+	if (!pool ||
+		bp->bkend_type != BACKEND_TYPE_NORMAL ||
+		bp->session_send_sock == PGINVALID_SOCKET)
+		return;
+
+	Assert(pool->n_workers > bp->worker_id &&
+		   pool->workers[bp->worker_id] == bp);
+
+	if (--pool->n_workers != 0)
+	{
+		/* Fill the vacated slot with the last worker */
+		pool->workers[bp->worker_id] = pool->workers[pool->n_workers];
+		pool->workers[bp->worker_id]->worker_id = bp->worker_id;
+		pool->rr_index %= pool->n_workers;
+	}
+
+	/*
+	 * The slot just past the live workers held either the moved worker or bp
+	 * itself; clear it so no pointer to a soon-to-be-freed Backend remains.
+	 */
+	pool->workers[pool->n_workers] = NULL;
+
+	closesocket(bp->session_send_sock);
+	bp->session_send_sock = PGINVALID_SOCKET;
+
+	elog(DEBUG2, "Cleanup backend %d", bp->pid);
+}
+
+/*
+ * DeleteBackend -- detach "bp" from its session pool (if any), unlink it from
+ * BackendList and free it.  "bp" must not be used after this returns.
+ */
+static void
+DeleteBackend(Backend *bp)
+{
+	UnlinkPooledBackend(bp);
+
+	dlist_delete(&bp->elem);
+	free(bp);
+}
+
+/*
* CleanupBackend -- cleanup after terminated backend.
*
* Remove all local state associated with backend.
@@ -3261,8 +3607,7 @@ CleanupBackend(int pid,
*/
BackgroundWorkerStopNotifications(bp->pid);
}
- dlist_delete(iter.cur);
- free(bp);
+ DeleteBackend(bp);
break;
}
}
@@ -3364,8 +3709,7 @@ HandleChildCrash(int pid, int exitstatus, const char *procname)
ShmemBackendArrayRemove(bp);
#endif
}
- dlist_delete(iter.cur);
- free(bp);
+ DeleteBackend(bp);
/* Keep looping so we can signal remaining backends */
}
else
@@ -3962,16 +4306,42 @@ TerminateChildren(int signal)
* Note: if you change this code, also consider StartAutovacuumWorker.
*/
static int
-BackendStartup(Port *port)
+BackendStartup(DatabasePool *pool, Port *port)
{
Backend *bn; /* for backend cleanup */
pid_t pid;
+ pgsocket session_pipe[2];
+
+ /*
+ * In case of session pooling instead of spawning new backend open
+ * new session at one of the existed backends.
+ */
+ while (pool && pool->n_workers >= SessionPoolSize)
+ {
+ Backend *worker = pool->workers[pool->rr_index];
+ pool->rr_index = (pool->rr_index + 1) % pool->n_workers; /* round-robin */
+
+ /* Send connection socket to the worker backend */
+ if (pg_send_sock(worker->session_send_sock, port->sock, worker->pid) < 0)
+ {
+ elog(LOG, "Failed to send session socket %d: %m",
+ worker->session_send_sock);
+ UnlinkPooledBackend(worker);
+ continue;
+ }
+
+ elog(DEBUG2, "Start new session for socket %d at backend %d",
+ port->sock, worker->pid);
+
+ /* TODO: serialize the port and send it through socket */
+ return STATUS_OK;
+ }
/*
* Create backend data structure. Better before the fork() so we can
* handle failure cleanly.
*/
- bn = (Backend *) malloc(sizeof(Backend));
+ bn = (Backend *) calloc(1, sizeof(Backend));
if (!bn)
{
ereport(LOG,
@@ -4012,12 +4382,30 @@ BackendStartup(Port *port)
/* Hasn't asked to be notified about any bgworkers yet */
bn->bgworker_notify = false;
+ /* Create socket pair for sending session sockets to the backend */
+ if (!IsDedicatedBackend)
+ {
+ if (socketpair(AF_UNIX, SOCK_STREAM, 0, session_pipe) < 0)
+ ereport(FATAL,
+ (errcode_for_file_access(),
+ errmsg_internal("could not create socket pair for launching sessions: %m")));
+#ifdef WIN32
+ SessionPoolSock = session_pipe[0];
+#endif
+ }
#ifdef EXEC_BACKEND
pid = backend_forkexec(port);
#else /* !EXEC_BACKEND */
pid = fork_process();
if (pid == 0) /* child */
{
+ whereToSendOutput = DestNone;
+
+ if (!IsDedicatedBackend)
+ {
+ SessionPoolSock = session_pipe[0]; /* Use this socket for receiving client session socket descriptor */
+ close(session_pipe[1]); /* Close unused end of the pipe */
+ }
free(bn);
/* Detangle from postmaster */
@@ -4026,11 +4414,14 @@ BackendStartup(Port *port)
/* Close the postmaster's sockets */
ClosePostmasterPorts(false);
- /* Perform additional initialization and collect startup packet */
+ /* Perform additional initialization */
BackendInitialize(port);
/* And run the backend */
BackendRun(port);
+
+ /* Unreachable */
+ Assert(false);
}
#endif /* EXEC_BACKEND */
@@ -4041,6 +4432,7 @@ BackendStartup(Port *port)
if (!bn->dead_end)
(void) ReleasePostmasterChildSlot(bn->child_slot);
+
free(bn);
errno = save_errno;
ereport(LOG,
@@ -4059,9 +4451,27 @@ BackendStartup(Port *port)
* of backends.
*/
bn->pid = pid;
+ bn->session_send_sock = PGINVALID_SOCKET;
bn->bkend_type = BACKEND_TYPE_NORMAL; /* Can change later to WALSND */
+ bn->pool = pool;
dlist_push_head(&BackendList, &bn->elem);
+ if (!IsDedicatedBackend)
+ {
+ /* Use this socket for sending client session socket descriptor */
+ bn->session_send_sock = session_pipe[1];
+
+ /* Close unused end of the pipe */
+ closesocket(session_pipe[0]);
+
+ if (pool->workers == NULL)
+ pool->workers = (Backend **) calloc(sizeof(Backend *), SessionPoolSize);
+
+ bn->worker_id = pool->n_workers++;
+ pool->workers[bn->worker_id] = bn;
+
+ elog(DEBUG1, "Start %d-th worker with pid %d", pool->n_workers, pid);
+ }
#ifdef EXEC_BACKEND
if (!bn->dead_end)
ShmemBackendArrayAdd(bn);
@@ -4122,6 +4532,7 @@ BackendInitialize(Port *port)
/* Save port etc. for ps status */
MyProcPort = port;
+ FrontendProtocol = port->proto;
/*
* PreAuthDelay is a debugging aid for investigating problems in the
@@ -4148,7 +4559,10 @@ BackendInitialize(Port *port)
* Initialize libpq and enable reporting of ereport errors to the client.
* Must do this now because authentication uses libpq to send messages.
*/
- pq_init(); /* initialize libpq to talk to client */
+ port->pqcomm_state = pq_init(TopMemoryContext); /* initialize libpq to talk to client */
+ port->pqcomm_waitset = pq_create_backend_event_set(TopMemoryContext, port, false);
+ pq_set_current_state(port->pqcomm_state, port, port->pqcomm_waitset);
+
whereToSendOutput = DestRemote; /* now safe to ereport to client */
/*
@@ -4227,35 +4641,46 @@ BackendInitialize(Port *port)
port->remote_hostname = strdup(remote_host);
/*
- * Ready to begin client interaction. We will give up and exit(1) after a
- * time delay, so that a broken client can't hog a connection
- * indefinitely. PreAuthDelay and any DNS interactions above don't count
- * against the time limit.
- *
- * Note: AuthenticationTimeout is applied here while waiting for the
- * startup packet, and then again in InitPostgres for the duration of any
- * authentication operations. So a hostile client could tie up the
- * process for nearly twice AuthenticationTimeout before we kick him off.
- *
- * Note: because PostgresMain will call InitializeTimeouts again, the
- * registration of STARTUP_PACKET_TIMEOUT will be lost. This is okay
- * since we never use it again after this function.
+ * Read startup backend only if we don't use session pool
*/
- RegisterTimeout(STARTUP_PACKET_TIMEOUT, StartupPacketTimeoutHandler);
- enable_timeout_after(STARTUP_PACKET_TIMEOUT, AuthenticationTimeout * 1000);
+ if (IsDedicatedBackend && !port->proto)
+ {
+ /*
+ * Ready to begin client interaction. We will give up and exit(1) after a
+ * time delay, so that a broken client can't hog a connection
+ * indefinitely. PreAuthDelay and any DNS interactions above don't count
+ * against the time limit.
+ *
+ * Note: AuthenticationTimeout is applied here while waiting for the
+ * startup packet, and then again in InitPostgres for the duration of any
+ * authentication operations. So a hostile client could tie up the
+ * process for nearly twice AuthenticationTimeout before we kick him off.
+ *
+ * Note: because PostgresMain will call InitializeTimeouts again, the
+ * registration of STARTUP_PACKET_TIMEOUT will be lost. This is okay
+ * since we never use it again after this function.
+ */
+ RegisterTimeout(STARTUP_PACKET_TIMEOUT, StartupPacketTimeoutHandler);
+ enable_timeout_after(STARTUP_PACKET_TIMEOUT, AuthenticationTimeout * 1000);
- /*
- * Receive the startup packet (which might turn out to be a cancel request
- * packet).
- */
- status = ProcessStartupPacket(port, false);
+ /*
+ * Receive the startup packet (which might turn out to be a cancel request
+ * packet).
+ */
+ status = ProcessStartupPacket(port, false, TopMemoryContext, FATAL);
- /*
- * Stop here if it was bad or a cancel packet. ProcessStartupPacket
- * already did any appropriate error reporting.
- */
- if (status != STATUS_OK)
- proc_exit(0);
+ /*
+ * Stop here if it was bad or a cancel packet. ProcessStartupPacket
+ * already did any appropriate error reporting.
+ */
+ if (status != STATUS_OK)
+ proc_exit(0);
+
+ /*
+ * Disable the timeout
+ */
+ disable_timeout(STARTUP_PACKET_TIMEOUT, false);
+ }
/*
* Now that we have the user and database name, we can set the process
@@ -4277,9 +4702,8 @@ BackendInitialize(Port *port)
update_process_title ? "authentication" : "");
/*
- * Disable the timeout, and prevent SIGTERM/SIGQUIT again.
+ * Prevent SIGTERM/SIGQUIT again.
*/
- disable_timeout(STARTUP_PACKET_TIMEOUT, false);
PG_SETMASK(&BlockSig);
}
@@ -5990,6 +6414,9 @@ save_backend_variables(BackendParameters *param, Port *port,
if (!write_inheritable_socket(¶m->portsocket, port->sock, childPid))
return false;
+ if (!write_inheritable_socket(¶m->sessionsocket, SessionPoolSock, childPid))
+ return false;
+
strlcpy(param->DataDir, DataDir, MAXPGPATH);
memcpy(¶m->ListenSocket, &ListenSocket, sizeof(ListenSocket));
@@ -6222,6 +6649,7 @@ restore_backend_variables(BackendParameters *param, Port *port)
{
memcpy(port, ¶m->port, sizeof(Port));
read_inheritable_socket(&port->sock, ¶m->portsocket);
+ read_inheritable_socket(&SessionPoolSock, ¶m->sessionsocket);
SetDataDir(param->DataDir);
diff --git a/src/backend/storage/ipc/ipc.c b/src/backend/storage/ipc/ipc.c
index a85a1c6..9802ca0 100644
--- a/src/backend/storage/ipc/ipc.c
+++ b/src/backend/storage/ipc/ipc.c
@@ -413,3 +413,12 @@ on_exit_reset(void)
on_proc_exit_index = 0;
reset_on_dsm_detach();
}
+
+/*
+ * on_shmem_exit_reset -- forget every callback registered with
+ * before_shmem_exit, on_shmem_exit and on_proc_exit by zeroing their
+ * counters, then call reset_on_dsm_detach().
+ *
+ * NOTE(review): despite the name, this also discards on_proc_exit callbacks;
+ * confirm that callers discarding per-session state really want that.
+ */
+void
+on_shmem_exit_reset(void)
+{
+	before_shmem_exit_index = on_shmem_exit_index = on_proc_exit_index = 0;
+	reset_on_dsm_detach();
+}
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 0c86a58..10e4613 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -28,6 +28,7 @@
#include "postmaster/bgworker_internals.h"
#include "postmaster/bgwriter.h"
#include "postmaster/postmaster.h"
+#include "postmaster/connpool.h"
#include "replication/logicallauncher.h"
#include "replication/slot.h"
#include "replication/walreceiver.h"
@@ -150,6 +151,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
size = add_size(size, SyncScanShmemSize());
size = add_size(size, AsyncShmemSize());
size = add_size(size, BackendRandomShmemSize());
+ size = add_size(size, ConnPoolShmemSize());
#ifdef EXEC_BACKEND
size = add_size(size, ShmemBackendArraySize());
#endif
@@ -271,6 +273,11 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
AsyncShmemInit();
BackendRandomShmemInit();
+ /*
+ * Set up connection pool workers
+ */
+ ConnectionPoolWorkersInit();
+
#ifdef EXEC_BACKEND
/*
diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index f6dda9c..605f054 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -76,6 +76,7 @@ struct WaitEventSet
{
int nevents; /* number of registered events */
int nevents_space; /* maximum number of events in this set */
+ int free_events; /* L1-list of free events linked by "pos" and terminated by -1*/
/*
* Array, of nevents_space length, storing the definition of events this
@@ -129,9 +130,9 @@ static void drainSelfPipe(void);
#if defined(WAIT_USE_EPOLL)
static void WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action);
#elif defined(WAIT_USE_POLL)
-static void WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event);
+static void WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event, bool remove);
#elif defined(WAIT_USE_WIN32)
-static void WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event);
+static void WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event, bool remove);
#endif
static inline int WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
@@ -562,6 +563,7 @@ CreateWaitEventSet(MemoryContext context, int nevents)
set->latch = NULL;
set->nevents_space = nevents;
+ set->free_events = -1;
#if defined(WAIT_USE_EPOLL)
#ifdef EPOLL_CLOEXEC
@@ -667,9 +669,11 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
void *user_data)
{
WaitEvent *event;
+ int free_event;
/* not enough space */
- Assert(set->nevents < set->nevents_space);
+ if (set->nevents == set->nevents_space)
+ return -1;
if (latch)
{
@@ -690,8 +694,19 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
if (fd == PGINVALID_SOCKET && (events & WL_SOCKET_MASK))
elog(ERROR, "cannot wait on socket event without a socket");
- event = &set->events[set->nevents];
- event->pos = set->nevents++;
+ free_event = set->free_events;
+ if (free_event >= 0)
+ {
+ event = &set->events[free_event];
+ set->free_events = event->pos;
+ event->pos = free_event;
+ }
+ else
+ {
+ event = &set->events[set->nevents];
+ event->pos = set->nevents;
+ }
+ set->nevents += 1;
event->fd = fd;
event->events = events;
event->user_data = user_data;
@@ -718,15 +733,38 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
#if defined(WAIT_USE_EPOLL)
WaitEventAdjustEpoll(set, event, EPOLL_CTL_ADD);
#elif defined(WAIT_USE_POLL)
- WaitEventAdjustPoll(set, event);
+ WaitEventAdjustPoll(set, event, false);
#elif defined(WAIT_USE_WIN32)
- WaitEventAdjustWin32(set, event);
+ WaitEventAdjustWin32(set, event, false);
#endif
return event->pos;
}
/*
+ * DeleteWaitEventFromSet -- unregister the event that watches socket "fd".
+ *
+ * Does nothing if no registered event uses "fd".  PGINVALID_SOCKET is
+ * ignored: latch and postmaster-death events are registered with it, and
+ * epoll free slots are reset to it, so it does not identify a socket.
+ *
+ * With epoll, removed slots stay where they are and are chained on
+ * set->free_events, so live events may sit at indexes >= set->nevents.  The
+ * scan therefore covers every slot handed out so far (nevents plus the length
+ * of the free list).  Scanning only the first nevents slots would miss events
+ * past a hole.  With poll and Win32 the free list stays empty and removal
+ * compacts the arrays, so the bound is simply nevents.
+ */
+void
+DeleteWaitEventFromSet(WaitEventSet *set, pgsocket fd)
+{
+	int			nslots = set->nevents;
+	int			i;
+
+	if (fd == PGINVALID_SOCKET)
+		return;
+
+	/* Free-listed slots are linked through their "pos" field, ending at -1. */
+	for (i = set->free_events; i >= 0; i = set->events[i].pos)
+		nslots++;
+
+	for (i = 0; i < nslots; i++)
+	{
+		WaitEvent  *event = &set->events[i];
+
+		if (event->fd != fd)
+			continue;
+
+#if defined(WAIT_USE_EPOLL)
+		WaitEventAdjustEpoll(set, event, EPOLL_CTL_DEL);
+#elif defined(WAIT_USE_POLL)
+		WaitEventAdjustPoll(set, event, true);
+#elif defined(WAIT_USE_WIN32)
+		WaitEventAdjustWin32(set, event, true);
+#endif
+		break;
+	}
+}
+
+/*
* Change the event mask and, in the WL_LATCH_SET case, the latch associated
* with the WaitEvent.
*
@@ -774,9 +812,9 @@ ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
#if defined(WAIT_USE_EPOLL)
WaitEventAdjustEpoll(set, event, EPOLL_CTL_MOD);
#elif defined(WAIT_USE_POLL)
- WaitEventAdjustPoll(set, event);
+ WaitEventAdjustPoll(set, event, false);
#elif defined(WAIT_USE_WIN32)
- WaitEventAdjustWin32(set, event);
+ WaitEventAdjustWin32(set, event, false);
#endif
}
@@ -822,19 +860,38 @@ WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action)
* requiring that, and actually it makes the code simpler...
*/
rc = epoll_ctl(set->epoll_fd, action, event->fd, &epoll_ev);
-
+ Assert(rc >= 0);
if (rc < 0)
ereport(ERROR,
(errcode_for_socket_access(),
errmsg("epoll_ctl() failed: %m")));
+
+ if (action == EPOLL_CTL_DEL)
+ {
+ int pos = event->pos;
+ event->fd = PGINVALID_SOCKET;
+ set->nevents -= 1;
+ event->pos = set->free_events;
+ set->free_events = pos;
+ }
}
#endif
#if defined(WAIT_USE_POLL)
static void
-WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event)
+WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event, bool remove)
{
- struct pollfd *pollfd = &set->pollfds[event->pos];
+ int pos = event->pos;
+ struct pollfd *pollfd = &set->pollfds[pos];
+
+ if (remove)
+ {
+ set->nevents -= 1;
+ *pollfd = set->pollfds[set->nevents];
+ set->events[pos] = set->events[set->nevents];
+ event->pos = pos;
+ return;
+ }
pollfd->revents = 0;
pollfd->fd = event->fd;
@@ -865,9 +922,25 @@ WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event)
#if defined(WAIT_USE_WIN32)
static void
-WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event)
+WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event, bool remove)
{
- HANDLE *handle = &set->handles[event->pos + 1];
+ int pos = event->pos;
+ HANDLE *handle = &set->handles[pos + 1];
+
+ if (remove)
+ {
+ Assert(event->fd != PGINVALID_SOCKET);
+
+ if (*handle != WSA_INVALID_EVENT)
+ WSACloseEvent(*handle);
+
+ set->nevents -= 1;
+ set->events[pos] = set->events[set->nevents];
+ *handle = set->handles[set->nevents + 1];
+ set->handles[set->nevents + 1] = WSA_INVALID_EVENT;
+ event->pos = pos;
+ return;
+ }
if (event->events == WL_LATCH_SET)
{
@@ -880,7 +953,7 @@ WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event)
}
else
{
- int flags = FD_CLOSE; /* always check for errors/EOF */
+ int flags = FD_CLOSE; /* always check for errors/EOF */
if (event->events & WL_SOCKET_READABLE)
flags |= FD_READ;
@@ -897,8 +970,8 @@ WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event)
WSAGetLastError());
}
if (WSAEventSelect(event->fd, *handle, flags) != 0)
- elog(ERROR, "failed to set up event for socket: error code %u",
- WSAGetLastError());
+ elog(ERROR, "failed to set up event for socket %p: error code %u",
+ event->fd, WSAGetLastError());
Assert(event->fd != PGINVALID_SOCKET);
}
@@ -1296,7 +1369,7 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
{
if (cur_event->reset)
{
- WaitEventAdjustWin32(set, cur_event);
+ WaitEventAdjustWin32(set, cur_event, false);
cur_event->reset = false;
}
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 6f9aaa5..5356000 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -597,6 +597,15 @@ InitAuxiliaryProcess(void)
}
/*
+ * CreateSessionId -- hand out the next session identifier.
+ *
+ * Bumps SessionPool->sessionCount and returns the new value.  SessionPool is
+ * allocated per backend, so identifiers are unique only within this backend.
+ * SessionPool must already be initialized.
+ */
+uint32
+CreateSessionId(void)
+{
+	SessionPool->sessionCount += 1;
+	return SessionPool->sessionCount;
+}
+
+/*
* Record the PID and PGPROC structures for the Startup process, for use in
* ProcSendSignal(). See comments there for further explanation.
*/
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 7a9ada2..42017e4 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -40,6 +40,7 @@
#include "access/printtup.h"
#include "access/xact.h"
#include "catalog/pg_type.h"
+#include "catalog/namespace.h"
#include "commands/async.h"
#include "commands/prepare.h"
#include "executor/spi.h"
@@ -77,9 +78,12 @@
#include "utils/snapmgr.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
+#include "utils/builtins.h"
+#include "utils/varlena.h"
+#include "utils/inval.h"
+#include "utils/catcache.h"
#include "mb/pg_wchar.h"
-
/* ----------------
* global variables
* ----------------
@@ -100,6 +104,41 @@ int max_stack_depth = 100;
/* wait N seconds to allow attach from a debugger */
int PostAuthDelay = 0;
+/* Local socket for redirecting sessions to the backends */
+pgsocket	SessionPoolSock = PGINVALID_SOCKET;
+
+/* Pointer to pool of sessions */
+BackendSessionPool *SessionPool = NULL;
+
+/* Pointer to the active session */
+SessionContext *ActiveSession;
+/* Session-variable values captured once and used to initialize new sessions */
+SessionContext DefaultContext;
+bool		IsDedicatedBackend = false;
+
+/*
+ * storage/sessionvars.h is an X-macro list, expected to consist of
+ * SessionVariable(type, name, init) entries.  It is included several times
+ * below, each time with a different SessionVariable definition.  Every
+ * definition is preceded by #undef, because redefining a macro with a
+ * different replacement list is ill-formed unless the header undefines it
+ * itself.
+ */
+#undef SessionVariable
+#define SessionVariable(type,name,init) type name = init;
+#include "storage/sessionvars.h"
+
+/* Copy the current value of every session variable into "session" (if any). */
+static void
+SaveSessionVariables(SessionContext *session)
+{
+	if (session != NULL)
+	{
+#undef SessionVariable
+#define SessionVariable(type,name,init) session->name = name;
+#include "storage/sessionvars.h"
+	}
+}
+
+/* Set every session variable from the values saved in "session". */
+static void
+LoadSessionVariables(SessionContext *session)
+{
+#undef SessionVariable
+#define SessionVariable(type,name,init) name = session->name;
+#include "storage/sessionvars.h"
+}
+
+/* Fill "session" with the values captured in DefaultContext. */
+static void
+InitializeSessionVariables(SessionContext *session)
+{
+#undef SessionVariable
+#define SessionVariable(type,name,init) session->name = DefaultContext.name;
+#include "storage/sessionvars.h"
+}
+
/* ----------------
@@ -171,6 +210,8 @@ static ProcSignalReason RecoveryConflictReason;
static MemoryContext row_description_context = NULL;
static StringInfoData row_description_buf;
+/*
+ * Set when an idle-in-transaction timeout cancels the transaction of a
+ * pooled session (instead of terminating the backend); the error-recovery
+ * path in PostgresMain aborts the transaction block and clears it.
+ */
+static bool IdleInTransactionSessionError = false;
+
/* ----------------------------------------------------------------
* decls for routines only used in this file
* ----------------------------------------------------------------
@@ -196,6 +237,8 @@ static void log_disconnections(int code, Datum arg);
static void enable_statement_timeout(void);
static void disable_statement_timeout(void);
+static void DeleteSession(SessionContext *session);
+static void ResetCurrentSession(void);
/* ----------------------------------------------------------------
* routines to obtain user input
@@ -1234,10 +1277,6 @@ exec_parse_message(const char *query_string, /* string to execute */
bool save_log_statement_stats = log_statement_stats;
char msec_str[32];
- /*
- * Report query to various monitoring facilities.
- */
- debug_query_string = query_string;
pgstat_report_activity(STATE_RUNNING, query_string);
@@ -2930,9 +2969,29 @@ ProcessInterrupts(void)
LockErrorCleanup();
/* don't send to client, we already know the connection to be dead. */
whereToSendOutput = DestNone;
- ereport(FATAL,
- (errcode(ERRCODE_CONNECTION_FAILURE),
- errmsg("connection to client lost")));
+
+ if (ActiveSession)
+ {
+ Port *port = ActiveSession->port;
+ DeleteWaitEventFromSet(SessionPool->waitEvents, port->sock);
+
+ elog(LOG, "Lost connection on session %d in backend %d", MyProcPort->sock, MyProcPid);
+
+ closesocket(port->sock);
+ port->sock = PGINVALID_SOCKET;
+
+ MyProcPort = NULL;
+
+ StartTransactionCommand();
+ UserAbortTransactionBlock();
+ CommitTransactionCommand();
+
+ ResetCurrentSession();
+ }
+ else
+ ereport(FATAL,
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("connection to client lost")));
}
/*
@@ -3043,9 +3102,20 @@ ProcessInterrupts(void)
{
/* Has the timeout setting changed since last we looked? */
if (IdleInTransactionSessionTimeout > 0)
- ereport(FATAL,
- (errcode(ERRCODE_IDLE_IN_TRANSACTION_SESSION_TIMEOUT),
- errmsg("terminating connection due to idle-in-transaction timeout")));
+ {
+ if (ActiveSession)
+ {
+ IdleInTransactionSessionTimeoutPending = false;
+ IdleInTransactionSessionError = true;
+ ereport(ERROR,
+ (errcode(ERRCODE_IDLE_IN_TRANSACTION_SESSION_TIMEOUT),
+ errmsg("canceling current transaction due to idle-in-transaction timeout")));
+ }
+ else
+ ereport(FATAL,
+ (errcode(ERRCODE_IDLE_IN_TRANSACTION_SESSION_TIMEOUT),
+ errmsg("terminating connection due to idle-in-transaction timeout")));
+ }
else
IdleInTransactionSessionTimeoutPending = false;
@@ -3605,6 +3675,97 @@ process_postgres_switches(int argc, char *argv[], GucContext ctx,
#endif
}
+/* Values stored in SessionContext.magic for live and deleted sessions */
+#define ACTIVE_SESSION_MAGIC 0xDEFA1234U
+#define REMOVED_SESSION_MAGIC 0xDEADDEEDU
+
+/*
+ * CreateSession -- allocate a zero-initialized SessionContext in
+ * SessionPool->mcxt.
+ *
+ * The new session gets its own "SessionMemoryContext" (child of the pool
+ * context), a portal hash table living in that context, the next id from
+ * CreateSessionId() and ACTIVE_SESSION_MAGIC.  port and eventSet are left
+ * NULL for the caller to fill in.
+ */
+static SessionContext *
+CreateSession(void)
+{
+	MemoryContext pool_cxt = SessionPool->mcxt;
+	SessionContext *sess;
+
+	sess = (SessionContext *) MemoryContextAllocZero(pool_cxt, sizeof(SessionContext));
+	sess->memory = AllocSetContextCreate(pool_cxt, "SessionMemoryContext",
+										 ALLOCSET_DEFAULT_SIZES);
+	sess->prepared_queries = NULL;
+	sess->id = CreateSessionId();
+	sess->portals = CreatePortalsHashTable(sess->memory);
+	sess->magic = ACTIVE_SESSION_MAGIC;
+
+	return sess;
+}
+
+/*
+ * SwitchToSession -- make "session" the active session of this backend.
+ *
+ * Saves the outgoing session's session variables and calls
+ * RestoreSessionGUCs() for it.  It then points ActiveSession, MyProcPort,
+ * the temp-namespace state and the libpq state at the new session and
+ * re-enables output to the client (DestRemote).  Finally it calls
+ * RestoreSessionGUCs() and LoadSessionVariables() for the new session.
+ * Returns without doing anything if "session" is already active or its
+ * magic is not ACTIVE_SESSION_MAGIC.
+ */
+static void
+SwitchToSession(SessionContext *session)
+{
+ /* epoll may return events even for an already closed session if its socket is still open.
+ * From epoll documentation:
+ * Q6 Will closing a file descriptor cause it to be removed from all epoll sets automatically?
+ *
+ * A6 Yes, but be aware of the following point. A file descriptor is a reference to an open file description (see
+ * open(2)). Whenever a descriptor is duplicated via dup(2), dup2(2), fcntl(2) F_DUPFD, or fork(2), a new file
+ * descriptor referring to the same open file description is created. An open file description continues to
+ * exist until all file descriptors referring to it have been closed. A file descriptor is removed from an
+ * epoll set only after all the file descriptors referring to the underlying open file description have been
+ * closed (or before if the descriptor is explicitly removed using epoll_ctl(2) EPOLL_CTL_DEL). This means
+ * that even after a file descriptor that is part of an epoll set has been closed, events may be reported for
+ * that file descriptor if other file descriptors referring to the same underlying file description remain
+ * open.
+ *
+ * Using this check for valid magic field we try to ignore such events.
+ *
+ * NOTE(review): DeleteSession pfree()s the SessionContext, so for a removed
+ * session this reads freed memory; it only works while that memory has not
+ * been reused.
+ */
+ if (ActiveSession == session || session->magic != ACTIVE_SESSION_MAGIC)
+ return;
+
+ /* Save the outgoing session's state; SaveSessionVariables ignores NULL */
+ SaveSessionVariables(ActiveSession);
+ /* NOTE(review): presumably stashes the outgoing session's GUCs; confirm RestoreSessionGUCs accepts NULL when no session is active */
+ RestoreSessionGUCs(ActiveSession);
+ ActiveSession = session;
+
+ MyProcPort = ActiveSession->port;
+ SetTempNamespaceState(ActiveSession->tempNamespace,
+ ActiveSession->tempToastNamespace);
+ pq_set_current_state(session->port->pqcomm_state, session->port,
+ session->eventSet);
+ whereToSendOutput = DestRemote;
+
+ /* Install the incoming session's GUCs and session variables */
+ RestoreSessionGUCs(ActiveSession);
+ LoadSessionVariables(ActiveSession);
+}
+
+/*
+ * ResetCurrentSession -- tear down the active session, if there is one.
+ *
+ * Client output is switched off before anything else.  Then the session is
+ * deleted, the libpq state and temp-namespace state are cleared, and
+ * ActiveSession is set to NULL.
+ */
+static void
+ResetCurrentSession(void)
+{
+	if (ActiveSession != NULL)
+	{
+		/* stop sending ereport output to the departing client */
+		whereToSendOutput = DestNone;
+		DeleteSession(ActiveSession);
+		pq_set_current_state(NULL, NULL, NULL);
+		SetTempNamespaceState(InvalidOid, InvalidOid);
+		ActiveSession = NULL;
+	}
+}
+
+/*
+ * DeleteSession -- release everything owned by "session" and free it.
+ *
+ * Resets the session's temporary namespace (if it has one) and calls
+ * DropAllPreparedStatements().  It then frees the session's wait event set,
+ * calls RestoreSessionGUCs()/ReleaseSessionGUCs(), deletes its memory context
+ * and pfree()s the SessionContext.  Afterwards it resets the exit-callback
+ * lists and reports pending statistics.
+ *
+ * NOTE(review): DropAllPreparedStatements() takes no session argument, so this
+ * presumably only drops the right statements when "session" is the active one
+ * (the visible caller, ResetCurrentSession, passes ActiveSession) -- confirm.
+ */
+static void
+DeleteSession(SessionContext *session)
+{
+ elog(DEBUG1, "Delete session %p, id=%u, memory context=%p",
+ session, session->id, session->memory);
+
+ if (OidIsValid(session->tempNamespace))
+ ResetTempTableNamespace(session->tempNamespace);
+
+ DropAllPreparedStatements();
+ FreeWaitEventSet(session->eventSet);
+ RestoreSessionGUCs(session);
+ ReleaseSessionGUCs(session);
+ MemoryContextDelete(session->memory);
+ /* Mark dead for SwitchToSession's magic check (best effort: freed right below) */
+ session->magic = REMOVED_SESSION_MAGIC;
+ pfree(session);
+
+ /* NOTE(review): on_shmem_exit_reset() also clears on_proc_exit callbacks the backend itself registered, not only per-session ones -- confirm intended */
+ on_shmem_exit_reset();
+ pgstat_report_stat(true);
+}
/* ----------------------------------------------------------------
* PostgresMain
@@ -3656,6 +3817,33 @@ PostgresMain(int argc, char *argv[],
progname)));
}
+ /* Serve all conections to "postgres" database by dedicated backends */
+ if (IsDedicatedBackend)
+ {
+ SessionPoolSize = 0;
+ closesocket(SessionPoolSock);
+ SessionPoolSock = PGINVALID_SOCKET;
+ }
+
+ if (IsUnderPostmaster && !IsDedicatedBackend)
+ {
+ elog(DEBUG1, "Session pooling is active on %s database", dbname);
+
+ /* Initialize sessions pool for this backend */
+ Assert(SessionPool == NULL);
+ SessionPool = (BackendSessionPool *) MemoryContextAllocZero(
+ TopMemoryContext, sizeof(BackendSessionPool));
+ SessionPool->mcxt = AllocSetContextCreate(TopMemoryContext,
+ "SessionPoolContext", ALLOCSET_DEFAULT_SIZES);
+
+ /* Save the original backend port here */
+ SessionPool->backendPort = MyProcPort;
+
+ ActiveSession = CreateSession();
+ ActiveSession->port = MyProcPort;
+ ActiveSession->eventSet = pq_get_current_waitset();
+ }
+
/* Acquire configuration parameters, unless inherited from postmaster */
if (!IsUnderPostmaster)
{
@@ -3784,7 +3972,7 @@ PostgresMain(int argc, char *argv[],
* ... else we'd need to copy the Port data first. Also, subsidiary data
* such as the username isn't lost either; see ProcessStartupPacket().
*/
- if (PostmasterContext)
+ if (PostmasterContext && SessionPoolSize == 0)
{
MemoryContextDelete(PostmasterContext);
PostmasterContext = NULL;
@@ -3922,7 +4110,8 @@ PostgresMain(int argc, char *argv[],
pq_comm_reset();
/* Report the error to the client and/or server log */
- EmitErrorReport();
+ if (MyProcPort)
+ EmitErrorReport();
/*
* Make sure debug_query_string gets reset before we possibly clobber
@@ -3982,13 +4171,26 @@ PostgresMain(int argc, char *argv[],
* messages from the client, so there isn't much we can do with the
* connection anymore.
*/
- if (pq_is_reading_msg())
+ if (pq_is_reading_msg() && !ActiveSession)
ereport(FATAL,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("terminating connection because protocol synchronization was lost")));
/* Now we can allow interrupts again */
RESUME_INTERRUPTS();
+
+ if (ActiveSession)
+ {
+ if (IdleInTransactionSessionError || (IsAbortedTransactionBlockState() && pq_is_reading_msg()))
+ {
+ StartTransactionCommand();
+ UserAbortTransactionBlock();
+ CommitTransactionCommand();
+ IdleInTransactionSessionError = false;
+ }
+ if (pq_is_reading_msg())
+ goto CloseSession;
+ }
}
/* We can now handle ereport(ERROR) */
@@ -3997,10 +4199,30 @@ PostgresMain(int argc, char *argv[],
if (!ignore_till_sync)
send_ready_for_query = true; /* initially, or after error */
+
+ /* Initialize wait event set if we're using sessions pool */
+ if (SessionPool && SessionPool->waitEvents == NULL)
+ {
+ /* Construct wait event set if not constructed yet */
+ SessionPool->waitEvents = CreateWaitEventSet(SessionPool->mcxt, MaxSessions + 3);
+ /* Add event to detect postmaster death */
+ AddWaitEventToSet(SessionPool->waitEvents, WL_POSTMASTER_DEATH,
+ PGINVALID_SOCKET, NULL, ActiveSession);
+ /* Add event for backends latch */
+ AddWaitEventToSet(SessionPool->waitEvents, WL_LATCH_SET,
+ PGINVALID_SOCKET, MyLatch, ActiveSession);
+ /* Add event for accepting new sessions */
+ AddWaitEventToSet(SessionPool->waitEvents, WL_SOCKET_READABLE,
+ SessionPoolSock, NULL, ActiveSession);
+ /* Add event for current session */
+ AddWaitEventToSet(SessionPool->waitEvents, WL_SOCKET_READABLE,
+ ActiveSession->port->sock, NULL, ActiveSession);
+ SaveSessionVariables(&DefaultContext);
+ }
+
/*
* Non-error queries loop here.
*/
-
for (;;)
{
/*
@@ -4076,6 +4298,130 @@ PostgresMain(int argc, char *argv[],
ReadyForQuery(whereToSendOutput);
send_ready_for_query = false;
+
+ /*
+ * Here we perform multiplexing of client sessions if session pooling is enabled.
+ * As far as we perform transaction level pooling,
+ * rescheduling is done only when we are not in transaction.
+ */
+ if (SessionPoolSock != PGINVALID_SOCKET
+ && !IsTransactionState()
+ && !IsAbortedTransactionBlockState()
+ && pq_available_bytes() == 0)
+ {
+ WaitEvent ready_client;
+
+ChooseSession:
+ DoingCommandRead = true;
+ /* Select which client session is ready to send new query */
+ if (WaitEventSetWait(SessionPool->waitEvents, -1,
+ &ready_client, 1, PG_WAIT_CLIENT) != 1)
+ {
+ /* TODO: do some error recovery here */
+ elog(FATAL, "Failed to poll client sessions");
+ }
+ CHECK_FOR_INTERRUPTS();
+ DoingCommandRead = false;
+
+ if (ready_client.events & WL_POSTMASTER_DEATH)
+ ereport(FATAL,
+ (errcode(ERRCODE_ADMIN_SHUTDOWN),
+ errmsg("terminating connection due to unexpected postmaster exit")));
+
+ if (ready_client.events & WL_LATCH_SET)
+ {
+ ResetLatch(MyLatch);
+ ProcessClientReadInterrupt(true);
+ goto ChooseSession;
+ }
+
+ if (ready_client.fd == SessionPoolSock)
+ {
+ /* Here we handle case of attaching new session */
+ SessionContext* session;
+ StringInfoData buf;
+ Port* port;
+ pgsocket sock;
+ MemoryContext oldcontext;
+
+ sock = pg_recv_sock(SessionPoolSock);
+ if (sock == PGINVALID_SOCKET)
+ elog(ERROR, "Failed to receive session socket: %m");
+
+ session = CreateSession();
+
+ /* Initialize port and wait event set for this session */
+ oldcontext = MemoryContextSwitchTo(session->memory);
+ MyProcPort = port = palloc(sizeof(Port));
+ memcpy(port, SessionPool->backendPort, sizeof(Port));
+
+ /*
+ * Receive the startup packet (which might turn out to be
+ * a cancel request packet).
+ */
+ port->sock = sock;
+ port->pqcomm_state = pq_init(session->memory);
+
+ session->port = port;
+ session->eventSet =
+ pq_create_backend_event_set(session->memory, port, false);
+ pq_set_current_state(session->port->pqcomm_state,
+ port,
+ session->eventSet);
+ whereToSendOutput = DestRemote;
+
+ MemoryContextSwitchTo(oldcontext);
+
+ if (AddWaitEventToSet(SessionPool->waitEvents, WL_SOCKET_READABLE,
+ sock, NULL, session) < 0)
+ {
+ elog(WARNING, "Too much pooled sessions: %d", MaxSessions);
+ DeleteSession(session);
+ ActiveSession = NULL;
+ closesocket(sock);
+ goto ChooseSession;
+ }
+
+ elog(DEBUG1, "Start new session %d in backend %d "
+ "for database %s user %s", (int)sock, MyProcPid,
+ port->database_name, port->user_name);
+
+ SaveSessionVariables(ActiveSession);
+ RestoreSessionGUCs(ActiveSession);
+ ActiveSession = session;
+ InitializeSessionVariables(session);
+ LoadSessionVariables(session);
+ SetCurrentStatementStartTimestamp();
+ StartTransactionCommand();
+ PerformAuthentication(MyProcPort);
+ process_settings(MyDatabaseId, GetSessionUserId());
+ CommitTransactionCommand();
+ SetTempNamespaceState(InvalidOid, InvalidOid);
+
+ /*
+ * Send GUC options to the client
+ */
+ BeginReportingGUCOptions();
+
+ /*
+ * Send this backend's cancellation info to the frontend.
+ */
+ pq_beginmessage(&buf, 'K');
+ pq_sendint(&buf, (int32) MyProcPid, 4);
+ pq_sendint(&buf, (int32) MyCancelKey, 4);
+ pq_endmessage(&buf);
+
+ /* Need not flush since ReadyForQuery will do it. */
+ send_ready_for_query = true;
+
+ continue;
+ }
+ else
+ {
+ SessionContext* session = (SessionContext *) ready_client.user_data;
+ SwitchToSession(session);
+ }
+ }
}
/*
@@ -4118,6 +4464,8 @@ PostgresMain(int argc, char *argv[],
*/
if (ConfigReloadPending)
{
+ if (ActiveSession && RestartPoolerOnReload)
+ proc_exit(0);
ConfigReloadPending = false;
ProcessConfigFile(PGC_SIGHUP);
}
@@ -4355,6 +4703,46 @@ PostgresMain(int argc, char *argv[],
* it will fail to be called during other backend-shutdown
* scenarios.
*/
+
+ if (SessionPool)
+ {
+CloseSession:
+ /* In case of session pooling close the session, but do not terminate the backend
+ * even if there are not more sessions in this backend.
+ * The reason for keeping backend alive is to prevent redundant process launches if
+ * some client repeatedly open/close connection to the database.
+ * Maximal number of launched backends in case of connection pooling is intended to be
+ * optimal for this system and workload, so there are no reasons to try to reduce this number
+ * when there are no active sessions.
+ */
+ if (MyProcPort)
+ {
+ elog(DEBUG1, "Closing session %d in backend %d", MyProcPort->sock, MyProcPid);
+
+ DeleteWaitEventFromSet(SessionPool->waitEvents, MyProcPort->sock);
+
+ pq_getmsgend(&input_message);
+ if (pq_is_reading_msg())
+ pq_endmsgread();
+
+ closesocket(MyProcPort->sock);
+ MyProcPort->sock = PGINVALID_SOCKET;
+ MyProcPort = NULL;
+ }
+
+ if (ActiveSession)
+ {
+ StartTransactionCommand();
+ UserAbortTransactionBlock();
+ CommitTransactionCommand();
+
+ ResetCurrentSession();
+ }
+
+ /* Need to perform rescheduling to some other session or accept new session */
+ goto ChooseSession;
+ }
+ elog(DEBUG1, "Terminate backend %d", MyProcPid);
proc_exit(0);
case 'd': /* copy data */
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index e95e347..6726195 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -875,6 +875,17 @@ pg_backend_pid(PG_FUNCTION_ARGS)
PG_RETURN_INT32(MyProcPid);
}
+/*
+ * pg_session_id
+ *		Return a text identifier for the current client session.
+ *
+ * When a pooled session is active the result is "<backend pid>.<session id>";
+ * otherwise it is just the backend pid.  The result is text, so the pg_proc
+ * entry for this function must declare prorettype 'text'.
+ */
+Datum
+pg_session_id(PG_FUNCTION_ARGS)
+{
+	char	   *id_str;
+
+	if (ActiveSession == NULL)
+		id_str = psprintf("%d", MyProcPid);
+	else
+		id_str = psprintf("%d.%u", MyProcPid, ActiveSession->id);
+
+	PG_RETURN_TEXT_P(CStringGetTextDatum(id_str));
+}
Datum
pg_stat_get_backend_pid(PG_FUNCTION_ARGS)
diff --git a/src/backend/utils/cache/plancache.c b/src/backend/utils/cache/plancache.c
index 7271b58..6b0cb54 100644
--- a/src/backend/utils/cache/plancache.c
+++ b/src/backend/utils/cache/plancache.c
@@ -61,6 +61,7 @@
#include "parser/analyze.h"
#include "parser/parsetree.h"
#include "storage/lmgr.h"
+#include "storage/proc.h"
#include "tcop/pquery.h"
#include "tcop/utility.h"
#include "utils/inval.h"
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index 6125421..7ce5671 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -78,6 +78,7 @@
#include "rewrite/rewriteDefine.h"
#include "rewrite/rowsecurity.h"
#include "storage/lmgr.h"
+#include "storage/proc.h"
#include "storage/smgr.h"
#include "utils/array.h"
#include "utils/builtins.h"
@@ -1943,6 +1944,13 @@ RelationIdGetRelation(Oid relationId)
Assert(rd->rd_isvalid ||
(rd->rd_isnailed && !criticalRelcachesBuilt));
}
+ /*
+ * In case of session pooling, relation descriptor can be constructed by some other session,
+ * so we need to recheck rd_islocaltemp value
+ */
+ if (ActiveSession && RELATION_IS_OTHER_TEMP(rd) && isTempOrTempToastNamespace(rd->rd_rel->relnamespace))
+ rd->rd_islocaltemp = true;
+
return rd;
}
diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c
index f7d6617..0617f4b 100644
--- a/src/backend/utils/init/globals.c
+++ b/src/backend/utils/init/globals.c
@@ -128,7 +128,9 @@ int max_parallel_maintenance_workers = 2;
* register background workers.
*/
int NBuffers = 1000;
+int SessionPoolSize = 0;
int MaxConnections = 90;
+int MaxSessions = 1000;
int max_worker_processes = 8;
int max_parallel_workers = 8;
int MaxBackends = 0;
@@ -147,3 +149,6 @@ int VacuumCostBalance = 0; /* working state for vacuum */
bool VacuumCostActive = false;
double vacuum_cleanup_index_scale_factor;
+
+bool RestartPoolerOnReload = false;
+char *DedicatedDatabases;
diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c
index 865119d..715429a 100644
--- a/src/backend/utils/init/miscinit.c
+++ b/src/backend/utils/init/miscinit.c
@@ -250,19 +250,6 @@ ChangeToDataDir(void)
* convenient way to do it.
* ----------------------------------------------------------------
*/
-static Oid AuthenticatedUserId = InvalidOid;
-static Oid SessionUserId = InvalidOid;
-static Oid OuterUserId = InvalidOid;
-static Oid CurrentUserId = InvalidOid;
-
-/* We also have to remember the superuser state of some of these levels */
-static bool AuthenticatedUserIsSuperuser = false;
-static bool SessionUserIsSuperuser = false;
-
-static int SecurityRestrictionContext = 0;
-
-/* We also remember if a SET ROLE is currently active */
-static bool SetRoleIsActive = false;
/*
* Initialize the basic environment for a postmaster child
@@ -345,13 +332,15 @@ InitStandaloneProcess(const char *argv0)
void
SwitchToSharedLatch(void)
{
+ WaitEventSet *waitset;
Assert(MyLatch == &LocalLatchData);
Assert(MyProc != NULL);
MyLatch = &MyProc->procLatch;
- if (FeBeWaitSet)
- ModifyWaitEvent(FeBeWaitSet, 1, WL_LATCH_SET, MyLatch);
+ waitset = pq_get_current_waitset();
+ if (waitset)
+ ModifyWaitEvent(waitset, 1, WL_LATCH_SET, MyLatch);
/*
* Set the shared latch as the local one might have been set. This
@@ -364,13 +353,15 @@ SwitchToSharedLatch(void)
void
SwitchBackToLocalLatch(void)
{
+ WaitEventSet *waitset;
Assert(MyLatch != &LocalLatchData);
Assert(MyProc != NULL && MyLatch == &MyProc->procLatch);
MyLatch = &LocalLatchData;
- if (FeBeWaitSet)
- ModifyWaitEvent(FeBeWaitSet, 1, WL_LATCH_SET, MyLatch);
+ waitset = pq_get_current_waitset();
+ if (waitset)
+ ModifyWaitEvent(waitset, 1, WL_LATCH_SET, MyLatch);
SetLatch(MyLatch);
}
@@ -434,6 +425,8 @@ SetSessionUserId(Oid userid, bool is_superuser)
/* We force the effective user IDs to match, too */
OuterUserId = userid;
CurrentUserId = userid;
+
+ SysCacheInvalidate(AUTHOID, 0);
}
/*
diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c
index 5ef6315..f1d6834 100644
--- a/src/backend/utils/init/postinit.c
+++ b/src/backend/utils/init/postinit.c
@@ -62,10 +62,8 @@
#include "utils/timeout.h"
#include "utils/tqual.h"
-
static HeapTuple GetDatabaseTuple(const char *dbname);
static HeapTuple GetDatabaseTupleByOid(Oid dboid);
-static void PerformAuthentication(Port *port);
static void CheckMyDatabase(const char *name, bool am_superuser, bool override_allow_connections);
static void InitCommunication(void);
static void ShutdownPostgres(int code, Datum arg);
@@ -74,7 +72,6 @@ static void LockTimeoutHandler(void);
static void IdleInTransactionSessionTimeoutHandler(void);
static bool ThereIsAtLeastOneRole(void);
static void process_startup_options(Port *port, bool am_superuser);
-static void process_settings(Oid databaseid, Oid roleid);
/*** InitPostgres support ***/
@@ -180,7 +177,7 @@ GetDatabaseTupleByOid(Oid dboid)
*
* returns: nothing. Will not return at all if there's any failure.
*/
-static void
+void
PerformAuthentication(Port *port)
{
/* This should be set already, but let's make sure */
@@ -1126,7 +1123,7 @@ process_startup_options(Port *port, bool am_superuser)
* We try specific settings for the database/role combination, as well as
* general for this database and for this user.
*/
-static void
+void
process_settings(Oid databaseid, Oid roleid)
{
Relation relsetting;
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 0625eff..f435356 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -59,6 +59,7 @@
#include "postmaster/autovacuum.h"
#include "postmaster/bgworker_internals.h"
#include "postmaster/bgwriter.h"
+#include "postmaster/connpool.h"
#include "postmaster/postmaster.h"
#include "postmaster/syslogger.h"
#include "postmaster/walwriter.h"
@@ -587,6 +588,8 @@ const char *const config_group_names[] =
gettext_noop("Connections and Authentication / Authentication"),
/* CONN_AUTH_SSL */
gettext_noop("Connections and Authentication / SSL"),
+ /* CONN_POOLING */
+ gettext_noop("Connections and Authentication / Connection Pooling"),
/* RESOURCES */
gettext_noop("Resource Usage"),
/* RESOURCES_MEM */
@@ -1192,6 +1195,16 @@ static struct config_bool ConfigureNamesBool[] =
},
{
+ {"restart_pooler_on_reload", PGC_SIGHUP, CONN_POOLING,
+ gettext_noop("Restart session pool workers on pg_reload_conf()."),
+ NULL,
+ },
+ &RestartPoolerOnReload,
+ false,
+ NULL, NULL, NULL
+ },
+
+ {
{"log_duration", PGC_SUSET, LOGGING_WHAT,
gettext_noop("Logs the duration of each completed SQL statement."),
NULL
@@ -1998,8 +2011,41 @@ static struct config_int ConfigureNamesInt[] =
check_maxconnections, NULL, NULL
},
+	{
+		{"max_sessions", PGC_POSTMASTER, CONN_POOLING,
+			gettext_noop("Sets the maximum number of client sessions."),
+			gettext_noop("Maximal number of client sessions which can be handled by one backend if session pooling is switched on. "
+						 "So maximal number of client connections is session_pool_size*max_sessions.")
+		},
+		&MaxSessions,
+		1000, 1, INT_MAX - 3,	/* wait event set is sized MaxSessions + 3 */
+		NULL, NULL, NULL
+	},
+
{
- /* see max_connections and max_wal_senders */
+		{"session_pool_size", PGC_POSTMASTER, CONN_POOLING,
+			gettext_noop("Sets the number of backends serving client sessions."),
+			gettext_noop("If non-zero then session pooling will be used: "
+						 "client connections will be redirected to one of the backends and maximal number of backends is determined by this parameter. "
+						 "Launched backends are never terminated, even when there are no active sessions.")
+		},
+		&SessionPoolSize,
+		10, 0, INT_MAX,
+		NULL, NULL, NULL
+	},
+
+ {
+ {"connection_pool_workers", PGC_POSTMASTER, CONN_POOLING,
+ gettext_noop("Number of connection pool workers"),
+ NULL,
+ },
+ &NumConnPoolWorkers,
+ 2, 0, MAX_CONNPOOL_WORKERS,
+ NULL, NULL, NULL
+ },
+
+ {
+ /* see max_connections and max_wal_senders */
{"superuser_reserved_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
gettext_noop("Sets the number of connection slots reserved for superusers."),
NULL
@@ -3340,9 +3386,9 @@ static struct config_string ConfigureNamesString[] =
{
{"temp_tablespaces", PGC_USERSET, CLIENT_CONN_STATEMENT,
- gettext_noop("Sets the tablespace(s) to use for temporary tables and sort files."),
- NULL,
- GUC_LIST_INPUT | GUC_LIST_QUOTE
+ gettext_noop("Sets the tablespace(s) to use for temporary tables and sort files."),
+ NULL,
+ GUC_LIST_INPUT | GUC_LIST_QUOTE
},
&temp_tablespaces,
"",
@@ -3350,6 +3396,16 @@ static struct config_string ConfigureNamesString[] =
},
{
+ {"dedicated_databases", PGC_USERSET, CONN_POOLING,
+ gettext_noop("Set of databases for which session pooling is disabled."),
+ NULL,
+ GUC_LIST_INPUT | GUC_LIST_QUOTE
+ },
+ &DedicatedDatabases,
+ "template0, template1, postgres"
+ },
+
+ {
{"dynamic_library_path", PGC_SUSET, CLIENT_CONN_OTHER,
gettext_noop("Sets the path for dynamically loadable modules."),
gettext_noop("If a dynamically loadable module needs to be opened and "
@@ -5346,6 +5402,164 @@ NewGUCNestLevel(void)
}
/*
+ * SaveSessionGUCs
+ *		Record the value a GUC had before the active session first changed it.
+ *
+ * Called from AtEOXact_GUC when a SET is committed while a pooled session is
+ * active; prior_val is the value that SET replaced.  Only the first prior
+ * value per variable is kept: it is the value RestoreSessionGUCs swaps back
+ * in when the backend moves away from this session.  New entries are
+ * appended, so the list stays in the order the variables were first changed.
+ *
+ * When the variable is already recorded, prior_val is an intermediate value
+ * this session set earlier and this code does not store it anywhere, so it
+ * is handed to discard_stack_value exactly as AtEOXact_GUC does when session
+ * pooling is not in use.
+ */
+static void
+SaveSessionGUCs(SessionContext *session,
+				struct config_generic *gconf,
+				config_var_value *prior_val)
+{
+	SessionGUC **link;
+	SessionGUC *sg;
+
+	/* Walk to the end of the list, stopping early if gconf is recorded */
+	for (link = &session->gucs; *link != NULL; link = &(*link)->next)
+	{
+		if ((*link)->var == gconf)
+		{
+			/* Keep the first saved value; release the superseded one */
+			discard_stack_value(gconf, prior_val);
+			return;
+		}
+	}
+
+	sg = MemoryContextAllocZero(session->memory, sizeof(SessionGUC));
+	sg->var = gconf;
+	sg->saved.extra = prior_val->extra;
+
+	switch (gconf->vartype)
+	{
+		case PGC_BOOL:
+			sg->saved.val.boolval = prior_val->val.boolval;
+			break;
+		case PGC_INT:
+			sg->saved.val.intval = prior_val->val.intval;
+			break;
+		case PGC_REAL:
+			sg->saved.val.realval = prior_val->val.realval;
+			break;
+		case PGC_STRING:
+			sg->saved.val.stringval = prior_val->val.stringval;
+			break;
+		case PGC_ENUM:
+			sg->saved.val.enumval = prior_val->val.enumval;
+			break;
+	}
+
+	/* Append at the tail found above, preserving first-changed order */
+	*link = sg;
+}
+
+/*
+ * RestoreSessionGUCs
+ *		Exchange this session's recorded GUC values with the values in effect.
+ *
+ * For every entry in session->gucs, the variable's current value and extra
+ * are swapped with the ones stored in sg->saved, and the variable's
+ * assign_hook (if set) is called with the value being installed.  Because
+ * each value is exchanged rather than copied, a second call for the same
+ * session puts back what the first call replaced.
+ *
+ * reporting_enabled is cleared for the duration of the swap and restored
+ * afterwards (presumably so the swap is not reported to the client as
+ * parameter changes -- confirm).  A NULL session is a no-op.
+ *
+ * NOTE(review): if an assign_hook raises an error, reporting_enabled is not
+ * restored.
+ */
+void
+RestoreSessionGUCs(SessionContext* session)
+{
+ SessionGUC *sg;
+ bool save_reporting_enabled;
+
+ if (session == NULL)
+ return;
+
+ save_reporting_enabled = reporting_enabled;
+ reporting_enabled = false;
+
+ for (sg = session->gucs; sg != NULL; sg = sg->next)
+ {
+ void *saved_extra = sg->saved.extra;
+ void *old_extra = sg->var->extra;
+
+ /* restore extra */
+ /* (install the saved extra; keep the displaced one in sg->saved) */
+ sg->var->extra = saved_extra;
+ sg->saved.extra = old_extra;
+
+ /* restore actual values */
+ /*
+ * Each case installs the saved value, runs the assign_hook with the
+ * newly installed value and extra, then stores the displaced value in
+ * sg->saved so a later call can swap it back.
+ */
+ switch (sg->var->vartype)
+ {
+ case PGC_BOOL:
+ {
+ struct config_bool *conf = (struct config_bool *)sg->var;
+ bool oldval = *conf->variable;
+ *conf->variable = sg->saved.val.boolval;
+ if (conf->assign_hook)
+ conf->assign_hook(sg->saved.val.boolval, saved_extra);
+
+ sg->saved.val.boolval = oldval;
+ break;
+ }
+ case PGC_INT:
+ {
+ struct config_int *conf = (struct config_int*) sg->var;
+ int oldval = *conf->variable;
+ *conf->variable = sg->saved.val.intval;
+ if (conf->assign_hook)
+ conf->assign_hook(sg->saved.val.intval, saved_extra);
+ sg->saved.val.intval = oldval;
+ break;
+ }
+ case PGC_REAL:
+ {
+ struct config_real *conf = (struct config_real*) sg->var;
+ double oldval = *conf->variable;
+ *conf->variable = sg->saved.val.realval;
+ if (conf->assign_hook)
+ conf->assign_hook(sg->saved.val.realval, saved_extra);
+ sg->saved.val.realval = oldval;
+ break;
+ }
+ case PGC_STRING:
+ {
+ struct config_string *conf = (struct config_string*) sg->var;
+ char* oldval = *conf->variable;
+ *conf->variable = sg->saved.val.stringval;
+ if (conf->assign_hook)
+ conf->assign_hook(sg->saved.val.stringval, saved_extra);
+ sg->saved.val.stringval = oldval;
+ break;
+ }
+ case PGC_ENUM:
+ {
+ struct config_enum *conf = (struct config_enum*) sg->var;
+ int oldval = *conf->variable;
+ *conf->variable = sg->saved.val.enumval;
+ if (conf->assign_hook)
+ conf->assign_hook(sg->saved.val.enumval, saved_extra);
+ sg->saved.val.enumval = oldval;
+ break;
+ }
+ }
+ }
+ reporting_enabled = save_reporting_enabled;
+}
+
+/*
+ * ReleaseSessionGUCs
+ *		Release the values held in a session's saved-GUC list.
+ *
+ * For each recorded variable, the saved extra (if any) is released with
+ * set_extra_field and, for string variables, the saved string with
+ * set_string_field.  The SessionGUC nodes themselves were allocated in
+ * session->memory and are not freed here.  The list head is then reset so a
+ * later SaveSessionGUCs/RestoreSessionGUCs on this session cannot reach the
+ * released values.
+ *
+ * NOTE(review): RestoreSessionGUCs exchanges sg->saved with the live values,
+ * so what is released here depends on whether the session is currently
+ * swapped in; callers presumably call this only for a swapped-out session --
+ * confirm.
+ */
+void
+ReleaseSessionGUCs(SessionContext* session)
+{
+	SessionGUC *sg;
+
+	for (sg = session->gucs; sg != NULL; sg = sg->next)
+	{
+		if (sg->saved.extra)
+			set_extra_field(sg->var, &sg->saved.extra, NULL);
+
+		if (sg->var->vartype == PGC_STRING)
+		{
+			struct config_string *conf = (struct config_string *) sg->var;
+
+			set_string_field(conf, &sg->saved.val.stringval, NULL);
+		}
+	}
+
+	/* The saved values are gone; don't leave the list pointing at them */
+	session->gucs = NULL;
+}
+
+/*
* Do GUC processing at transaction or subtransaction commit or abort, or
* when exiting a function that has proconfig settings, or when undoing a
* transient assignment to some GUC variables. (The name is thus a bit of
@@ -5413,8 +5627,10 @@ AtEOXact_GUC(bool isCommit, int nestLevel)
restoreMasked = true;
else if (stack->state == GUC_SET)
{
- /* we keep the current active value */
- discard_stack_value(gconf, &stack->prior);
+ if (ActiveSession)
+ SaveSessionGUCs(ActiveSession, gconf, &stack->prior);
+ else
+ discard_stack_value(gconf, &stack->prior);
}
else /* must be GUC_LOCAL */
restorePrior = true;
@@ -5440,8 +5656,8 @@ AtEOXact_GUC(bool isCommit, int nestLevel)
case GUC_SET:
/* next level always becomes SET */
- discard_stack_value(gconf, &stack->prior);
- if (prev->state == GUC_SET_LOCAL)
+ discard_stack_value(gconf, &stack->prior);
+ if (prev->state == GUC_SET_LOCAL)
discard_stack_value(gconf, &prev->masked);
prev->state = GUC_SET;
break;
diff --git a/src/backend/utils/misc/superuser.c b/src/backend/utils/misc/superuser.c
index fbe83c9..1ebc379 100644
--- a/src/backend/utils/misc/superuser.c
+++ b/src/backend/utils/misc/superuser.c
@@ -24,6 +24,7 @@
#include "catalog/pg_authid.h"
#include "utils/inval.h"
#include "utils/syscache.h"
+#include "storage/proc.h"
#include "miscadmin.h"
@@ -33,8 +34,6 @@
* the status of the last requested roleid. The cache can be flushed
* at need by watching for cache update events on pg_authid.
*/
-static Oid last_roleid = InvalidOid; /* InvalidOid == cache not valid */
-static bool last_roleid_is_super = false;
static bool roleid_callback_registered = false;
static void RoleidCallback(Datum arg, int cacheid, uint32 hashvalue);
diff --git a/src/backend/utils/mmgr/portalmem.c b/src/backend/utils/mmgr/portalmem.c
index 04ea32f..a8c27a3 100644
--- a/src/backend/utils/mmgr/portalmem.c
+++ b/src/backend/utils/mmgr/portalmem.c
@@ -23,6 +23,7 @@
#include "commands/portalcmds.h"
#include "miscadmin.h"
#include "storage/ipc.h"
+#include "storage/proc.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/snapmgr.h"
@@ -53,11 +54,14 @@ typedef struct portalhashent
static HTAB *PortalHashTable = NULL;
+#define CurrentPortalHashTable() \
+ (ActiveSession ? ActiveSession->portals : PortalHashTable)
+
#define PortalHashTableLookup(NAME, PORTAL) \
do { \
PortalHashEnt *hentry; \
\
- hentry = (PortalHashEnt *) hash_search(PortalHashTable, \
+ hentry = (PortalHashEnt *) hash_search(CurrentPortalHashTable(), \
(NAME), HASH_FIND, NULL); \
if (hentry) \
PORTAL = hentry->portal; \
@@ -69,7 +73,7 @@ do { \
do { \
PortalHashEnt *hentry; bool found; \
\
- hentry = (PortalHashEnt *) hash_search(PortalHashTable, \
+ hentry = (PortalHashEnt *) hash_search(CurrentPortalHashTable(), \
(NAME), HASH_ENTER, &found); \
if (found) \
elog(ERROR, "duplicate portal name"); \
@@ -82,7 +86,7 @@ do { \
do { \
PortalHashEnt *hentry; \
\
- hentry = (PortalHashEnt *) hash_search(PortalHashTable, \
+ hentry = (PortalHashEnt *) hash_search(CurrentPortalHashTable(), \
PORTAL->name, HASH_REMOVE, NULL); \
if (hentry == NULL) \
elog(WARNING, "trying to delete portal name that does not exist"); \
@@ -90,12 +94,33 @@ do { \
static MemoryContext TopPortalContext = NULL;
-
/* ----------------------------------------------------------------
* public portal interface functions
* ----------------------------------------------------------------
*/
+/*
+ * CreatePortalsHashTable
+ *		Build an empty hash table mapping portal names to portals.
+ *
+ * When mcxt is non-NULL the table is created in that memory context;
+ * otherwise hash_create's default context is used.  PORTALS_PER_USER is
+ * only the initial size estimate.
+ */
+HTAB *
+CreatePortalsHashTable(MemoryContext mcxt)
+{
+	HASHCTL		hctl;
+	int			hflags = (mcxt != NULL) ? (HASH_ELEM | HASH_CONTEXT) : HASH_ELEM;
+
+	hctl.keysize = MAX_PORTALNAME_LEN;
+	hctl.entrysize = sizeof(PortalHashEnt);
+	if (mcxt != NULL)
+		hctl.hcxt = mcxt;
+
+	return hash_create("Portal hash", PORTALS_PER_USER, &hctl, hflags);
+}
+
/*
* EnablePortalManager
* Enables the portal management module at backend startup.
@@ -103,23 +128,13 @@ static MemoryContext TopPortalContext = NULL;
void
EnablePortalManager(void)
{
- HASHCTL ctl;
-
Assert(TopPortalContext == NULL);
TopPortalContext = AllocSetContextCreate(TopMemoryContext,
- "TopPortalContext",
- ALLOCSET_DEFAULT_SIZES);
-
- ctl.keysize = MAX_PORTALNAME_LEN;
- ctl.entrysize = sizeof(PortalHashEnt);
+ "TopPortalContext",
+ ALLOCSET_DEFAULT_SIZES);
- /*
- * use PORTALS_PER_USER as a guess of how many hash table entries to
- * create, initially
- */
- PortalHashTable = hash_create("Portal hash", PORTALS_PER_USER,
- &ctl, HASH_ELEM);
+ PortalHashTable = CreatePortalsHashTable(NULL);
}
/*
@@ -602,11 +617,14 @@ PortalHashTableDeleteAll(void)
{
HASH_SEQ_STATUS status;
PortalHashEnt *hentry;
+ HTAB *htab;
- if (PortalHashTable == NULL)
+ htab = CurrentPortalHashTable();
+
+ if (htab == NULL)
return;
- hash_seq_init(&status, PortalHashTable);
+ hash_seq_init(&status, htab);
while ((hentry = hash_seq_search(&status)) != NULL)
{
Portal portal = hentry->portal;
@@ -619,7 +637,7 @@ PortalHashTableDeleteAll(void)
/* Restart the iteration in case that led to other drops */
hash_seq_term(&status);
- hash_seq_init(&status, PortalHashTable);
+ hash_seq_init(&status, htab);
}
}
@@ -672,8 +690,10 @@ PreCommit_Portals(bool isPrepare)
bool result = false;
HASH_SEQ_STATUS status;
PortalHashEnt *hentry;
+ HTAB *htab;
- hash_seq_init(&status, PortalHashTable);
+ htab = CurrentPortalHashTable();
+ hash_seq_init(&status, htab);
while ((hentry = (PortalHashEnt *) hash_seq_search(&status)) != NULL)
{
@@ -746,7 +766,7 @@ PreCommit_Portals(bool isPrepare)
* caused a drop of the next portal in the hash chain.
*/
hash_seq_term(&status);
- hash_seq_init(&status, PortalHashTable);
+ hash_seq_init(&status, htab);
}
return result;
@@ -763,8 +783,11 @@ AtAbort_Portals(void)
{
HASH_SEQ_STATUS status;
PortalHashEnt *hentry;
+ HTAB *htab;
+
+ htab = CurrentPortalHashTable();
- hash_seq_init(&status, PortalHashTable);
+ hash_seq_init(&status, htab);
while ((hentry = (PortalHashEnt *) hash_seq_search(&status)) != NULL)
{
@@ -840,8 +863,11 @@ AtCleanup_Portals(void)
{
HASH_SEQ_STATUS status;
PortalHashEnt *hentry;
+ HTAB *htab;
- hash_seq_init(&status, PortalHashTable);
+ htab = CurrentPortalHashTable();
+
+ hash_seq_init(&status, htab);
while ((hentry = (PortalHashEnt *) hash_seq_search(&status)) != NULL)
{
@@ -899,8 +925,10 @@ PortalErrorCleanup(void)
{
HASH_SEQ_STATUS status;
PortalHashEnt *hentry;
+ HTAB *htab;
- hash_seq_init(&status, PortalHashTable);
+ htab = CurrentPortalHashTable();
+ hash_seq_init(&status, htab);
while ((hentry = (PortalHashEnt *) hash_seq_search(&status)) != NULL)
{
@@ -927,8 +955,9 @@ AtSubCommit_Portals(SubTransactionId mySubid,
{
HASH_SEQ_STATUS status;
PortalHashEnt *hentry;
+ HTAB *htab = CurrentPortalHashTable();
- hash_seq_init(&status, PortalHashTable);
+ hash_seq_init(&status, htab);
while ((hentry = (PortalHashEnt *) hash_seq_search(&status)) != NULL)
{
@@ -962,8 +991,11 @@ AtSubAbort_Portals(SubTransactionId mySubid,
{
HASH_SEQ_STATUS status;
PortalHashEnt *hentry;
+ HTAB *htab;
+
+ htab = CurrentPortalHashTable();
- hash_seq_init(&status, PortalHashTable);
+ hash_seq_init(&status, htab);
while ((hentry = (PortalHashEnt *) hash_seq_search(&status)) != NULL)
{
@@ -1072,8 +1104,9 @@ AtSubCleanup_Portals(SubTransactionId mySubid)
{
HASH_SEQ_STATUS status;
PortalHashEnt *hentry;
+ HTAB *htab = CurrentPortalHashTable();
- hash_seq_init(&status, PortalHashTable);
+ hash_seq_init(&status, htab);
while ((hentry = (PortalHashEnt *) hash_seq_search(&status)) != NULL)
{
@@ -1161,7 +1194,7 @@ pg_cursor(PG_FUNCTION_ARGS)
/* generate junk in short-term context */
MemoryContextSwitchTo(oldcontext);
- hash_seq_init(&hash_seq, PortalHashTable);
+ hash_seq_init(&hash_seq, CurrentPortalHashTable());
while ((hentry = hash_seq_search(&hash_seq)) != NULL)
{
Portal portal = hentry->portal;
@@ -1200,7 +1233,7 @@ ThereAreNoReadyPortals(void)
HASH_SEQ_STATUS status;
PortalHashEnt *hentry;
- hash_seq_init(&status, PortalHashTable);
+ hash_seq_init(&status, CurrentPortalHashTable());
while ((hentry = (PortalHashEnt *) hash_seq_search(&status)) != NULL)
{
@@ -1229,8 +1262,11 @@ HoldPinnedPortals(void)
{
HASH_SEQ_STATUS status;
PortalHashEnt *hentry;
+ HTAB *htab;
+
+ htab = CurrentPortalHashTable();
- hash_seq_init(&status, PortalHashTable);
+ hash_seq_init(&status, htab);
while ((hentry = (PortalHashEnt *) hash_seq_search(&status)) != NULL)
{
diff --git a/src/include/catalog/namespace.h b/src/include/catalog/namespace.h
index 0e20237..ddcc3c8 100644
--- a/src/include/catalog/namespace.h
+++ b/src/include/catalog/namespace.h
@@ -144,7 +144,9 @@ extern void GetTempNamespaceState(Oid *tempNamespaceId,
Oid *tempToastNamespaceId);
extern void SetTempNamespaceState(Oid tempNamespaceId,
Oid tempToastNamespaceId);
-extern void ResetTempTableNamespace(void);
+
+struct SessionContext;
+extern void ResetTempTableNamespace(Oid npc);
extern OverrideSearchPath *GetOverrideSearchPath(MemoryContext context);
extern OverrideSearchPath *CopyOverrideSearchPath(OverrideSearchPath *path);
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index a146510..62fb7a4 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5202,6 +5202,9 @@
{ oid => '2026', descr => 'statistics: current backend PID',
proname => 'pg_backend_pid', provolatile => 's', proparallel => 'r',
prorettype => 'int4', proargtypes => '', prosrc => 'pg_backend_pid' },
+{ oid => '3436', descr => 'statistics: current session ID',
+ proname => 'pg_session_id', provolatile => 's', proparallel => 'r',
+ prorettype => 'text', proargtypes => '', prosrc => 'pg_session_id' },
{ oid => '1937', descr => 'statistics: PID of backend',
proname => 'pg_stat_get_backend_pid', provolatile => 's', proparallel => 'r',
prorettype => 'int4', proargtypes => 'int4',
diff --git a/src/include/commands/prepare.h b/src/include/commands/prepare.h
index ffec029..fdf1854 100644
--- a/src/include/commands/prepare.h
+++ b/src/include/commands/prepare.h
@@ -56,5 +56,6 @@ extern TupleDesc FetchPreparedStatementResultDesc(PreparedStatement *stmt);
extern List *FetchPreparedStatementTargetList(PreparedStatement *stmt);
extern void DropAllPreparedStatements(void);
+extern void DropSessionPreparedStatements(uint32 sessionId);
#endif /* PREPARE_H */
diff --git a/src/include/libpq/libpq-be.h b/src/include/libpq/libpq-be.h
index ef5528c..bb6d359 100644
--- a/src/include/libpq/libpq-be.h
+++ b/src/include/libpq/libpq-be.h
@@ -66,6 +66,7 @@ typedef struct
#include "datatype/timestamp.h"
#include "libpq/hba.h"
#include "libpq/pqcomm.h"
+#include "storage/latch.h"
typedef enum CAC_state
@@ -139,6 +140,12 @@ typedef struct Port
List *guc_options;
/*
+ * libpq communication state
+ */
+ void *pqcomm_state;
+ WaitEventSet *pqcomm_waitset;
+
+ /*
* Information that needs to be held during the authentication cycle.
*/
HbaLine *hba;
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index 36baf6b..10ba28b 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -60,7 +60,12 @@ extern int StreamConnection(pgsocket server_fd, Port *port);
extern void StreamClose(pgsocket sock);
extern void TouchSocketFiles(void);
extern void RemoveSocketFiles(void);
-extern void pq_init(void);
+extern void *pq_init(MemoryContext mcxt);
+extern void pq_reset(void);
+extern void pq_set_current_state(void *state, Port *port, WaitEventSet *set);
+extern WaitEventSet *pq_get_current_waitset(void);
+extern WaitEventSet *pq_create_backend_event_set(MemoryContext mcxt,
+ Port *port, bool onlySock);
extern int pq_getbytes(char *s, size_t len);
extern int pq_getstring(StringInfo s);
extern void pq_startmsgread(void);
@@ -71,6 +76,7 @@ extern int pq_getbyte(void);
extern int pq_peekbyte(void);
extern int pq_getbyte_if_available(unsigned char *c);
extern int pq_putbytes(const char *s, size_t len);
+extern int pq_available_bytes(void);
/*
* prototypes for functions in be-secure.c
@@ -96,8 +102,6 @@ extern ssize_t secure_raw_write(Port *port, const void *ptr, size_t len);
extern bool ssl_loaded_verify_locations;
-extern WaitEventSet *FeBeWaitSet;
-
/* GUCs */
extern char *SSLCipherSuites;
extern char *SSLECDHCurve;
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index e167ee8..9652340 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -26,6 +26,7 @@
#include <signal.h>
#include "pgtime.h" /* for pg_time_t */
+#include "utils/palloc.h" /* for MemoryContext */
#define InvalidPid (-1)
@@ -150,6 +151,9 @@ extern PGDLLIMPORT bool IsUnderPostmaster;
extern PGDLLIMPORT bool IsBackgroundWorker;
extern PGDLLIMPORT bool IsBinaryUpgrade;
+extern PGDLLIMPORT bool RestartPoolerOnReload;
+extern PGDLLIMPORT char *DedicatedDatabases;
+
extern PGDLLIMPORT bool ExitOnAnyError;
extern PGDLLIMPORT char *DataDir;
@@ -158,10 +162,14 @@ extern PGDLLIMPORT int data_directory_mode;
extern PGDLLIMPORT int NBuffers;
extern PGDLLIMPORT int MaxBackends;
extern PGDLLIMPORT int MaxConnections;
+extern PGDLLIMPORT int MaxSessions;
+extern PGDLLIMPORT int SessionPoolSize;
+extern PGDLLIMPORT int SessionPoolPorts;
extern PGDLLIMPORT int max_worker_processes;
extern PGDLLIMPORT int max_parallel_workers;
extern PGDLLIMPORT int MyProcPid;
+extern PGDLLIMPORT uint32 MySessionId;
extern PGDLLIMPORT pg_time_t MyStartTime;
extern PGDLLIMPORT struct Port *MyProcPort;
extern PGDLLIMPORT struct Latch *MyLatch;
@@ -335,6 +343,9 @@ extern void SwitchBackToLocalLatch(void);
extern bool superuser(void); /* current user is superuser */
extern bool superuser_arg(Oid roleid); /* given user is superuser */
+/* in utils/init/postinit.c */
+extern void process_settings(Oid databaseid, Oid roleid);
+
/*****************************************************************************
* pmod.h -- *
@@ -425,6 +436,7 @@ extern void InitializeMaxBackends(void);
extern void InitPostgres(const char *in_dbname, Oid dboid, const char *username,
Oid useroid, char *out_dbname, bool override_allow_connections);
extern void BaseInit(void);
+extern void PerformAuthentication(struct Port *port);
/* in utils/init/miscinit.c */
extern bool IgnoreSystemIndexes;
@@ -445,6 +457,9 @@ extern void process_session_preload_libraries(void);
extern void pg_bindtextdomain(const char *domain);
extern bool has_rolreplication(Oid roleid);
+extern void *GetLocalUserIdStateCopy(MemoryContext mcxt);
+extern void SetCurrentUserIdState(void *userId);
+
/* in access/transam/xlog.c */
extern bool BackupInProgress(void);
extern void CancelBackup(void);
diff --git a/src/include/port.h b/src/include/port.h
index 74a9dc4..ac53f3c 100644
--- a/src/include/port.h
+++ b/src/include/port.h
@@ -41,6 +41,10 @@ typedef SOCKET pgsocket;
extern bool pg_set_noblock(pgsocket sock);
extern bool pg_set_block(pgsocket sock);
+/* send/receive a socket descriptor through the channel socket "chan" */
+extern int pg_send_sock(pgsocket chan, pgsocket sock, pid_t pid);
+extern pgsocket pg_recv_sock(pgsocket chan);
+
/* Portable path handling for Unix/Win32 (in path.c) */
extern bool has_drive_prefix(const char *filename);
diff --git a/src/include/port/win32_port.h b/src/include/port/win32_port.h
index b398cd3..01971bc 100644
--- a/src/include/port/win32_port.h
+++ b/src/include/port/win32_port.h
@@ -447,6 +447,7 @@ extern int pgkill(int pid, int sig);
#define select(n, r, w, e, timeout) pgwin32_select(n, r, w, e, timeout)
#define recv(s, buf, len, flags) pgwin32_recv(s, buf, len, flags)
#define send(s, buf, len, flags) pgwin32_send(s, buf, len, flags)
+#define socketpair(af, type, protocol, socks) pgwin32_socketpair(af, type, protocol, socks)
SOCKET pgwin32_socket(int af, int type, int protocol);
int pgwin32_bind(SOCKET s, struct sockaddr *addr, int addrlen);
@@ -456,6 +457,7 @@ int pgwin32_connect(SOCKET s, const struct sockaddr *name, int namelen);
int pgwin32_select(int nfds, fd_set *readfs, fd_set *writefds, fd_set *exceptfds, const struct timeval *timeout);
int pgwin32_recv(SOCKET s, char *buf, int len, int flags);
int pgwin32_send(SOCKET s, const void *buf, int len, int flags);
+int pgwin32_socketpair(int domain, int type, int protocol, SOCKET socks[2]); /* target of the socketpair() macro above */
const char *pgwin32_socket_strerror(int err);
int pgwin32_waitforsinglesocket(SOCKET s, int what, int timeout);
diff --git a/src/include/postmaster/connpool.h b/src/include/postmaster/connpool.h
new file mode 100644
index 0000000..45aa37c
--- /dev/null
+++ b/src/include/postmaster/connpool.h
@@ -0,0 +1,65 @@
+/*-------------------------------------------------------------------------
+ *
+ * connpool.h
+ *	  Shared-memory state of the connection pool workers, which read client
+ *	  startup packets on behalf of the postmaster.
+ *
+ * src/include/postmaster/connpool.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef CONN_POOL_H
+#define CONN_POOL_H
+
+#include "port.h"
+#include "libpq/libpq-be.h"
+
+#define MAX_CONNPOOL_WORKERS 100
+
+typedef enum
+{
+ CPW_FREE,
+ CPW_NEW_SOCKET,
+ CPW_PROCESSED
+} ConnPoolWorkerState;
+
+typedef struct ConnPoolWorker
+{
+ Port *port; /* port in the pool */
+ int pipes[2]; /* 0 for sending, 1 for receiving */
+
+ /*
+ * The communication procedure.
+ *
+ * postmaster:
+ * ) find a worker with state == CPW_FREE
+ * ) assign client socket
+ * ) add pipe to wait set (if it's not there)
+ * ) wake up the worker
+ * ) process data from the worker until state == CPW_PROCESSED
+ * ) set state to CPW_FREE
+ * ) fork or send socket and the data to backend
+ *
+ * bgworker:
+ * ) wakes up
+ * ) checks the state
+ * ) if state is CPW_NEW_SOCKET, reads data from the client socket and
+ *   sends the data through the pipe to postmaster
+ * ) sets state to CPW_PROCESSED
+ */
+ volatile ConnPoolWorkerState state;
+ volatile CAC_state cac_state; /* typedef from libpq/libpq-be.h */
+ pid_t pid;
+ Latch *latch;
+} ConnPoolWorker;
+
+extern Size ConnPoolShmemSize(void);
+extern void ConnectionPoolWorkersInit(void);
+extern void RegisterConnPoolWorkers(void);
+extern void StartupPacketReaderMain(Datum arg);
+
+/* global variables */
+extern int NumConnPoolWorkers;
+extern ConnPoolWorker *ConnPoolWorkers;
+
+#endif /* CONN_POOL_H */
diff --git a/src/include/postmaster/postmaster.h b/src/include/postmaster/postmaster.h
index 1877eef..1f16836 100644
--- a/src/include/postmaster/postmaster.h
+++ b/src/include/postmaster/postmaster.h
@@ -62,6 +62,10 @@ extern Size ShmemBackendArraySize(void);
extern void ShmemBackendArrayAllocation(void);
#endif
+struct Port; /* incomplete type: only pointers to it are used here */
+extern int ProcessStartupPacket(struct Port *port, bool SSLdone,
+ MemoryContext memctx, int errlevel);
+
/*
* Note: MAX_BACKENDS is limited to 2^18-1 because that's the width reserved
* for buffer references in buf_internals.h. This limitation could be lifted
diff --git a/src/include/storage/ipc.h b/src/include/storage/ipc.h
index 6a05a89..9cddaf9 100644
--- a/src/include/storage/ipc.h
+++ b/src/include/storage/ipc.h
@@ -72,6 +72,7 @@ extern void on_shmem_exit(pg_on_exit_callback function, Datum arg);
extern void before_shmem_exit(pg_on_exit_callback function, Datum arg);
extern void cancel_before_shmem_exit(pg_on_exit_callback function, Datum arg);
extern void on_exit_reset(void);
+extern void on_shmem_exit_reset(void);
/* ipci.c */
extern PGDLLIMPORT shmem_startup_hook_type shmem_startup_hook;
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
index fd8735b..c7dd708 100644
--- a/src/include/storage/latch.h
+++ b/src/include/storage/latch.h
@@ -176,6 +176,8 @@ extern int WaitLatch(volatile Latch *latch, int wakeEvents, long timeout,
extern int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents,
pgsocket sock, long timeout, uint32 wait_event_info);
+extern void DeleteWaitEventFromSet(WaitEventSet *set, pgsocket fd);
+
/*
* Unix implementation uses SIGUSR1 for inter-process signaling.
* Win32 doesn't need this.
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index cb613c8..f3c1079 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -21,6 +21,7 @@
#include "storage/lock.h"
#include "storage/pg_sema.h"
#include "storage/proclist_types.h"
+#include "utils/guc_tables.h"
/*
* Each backend advertises up to PGPROC_MAX_CACHED_SUBXIDS TransactionIds
@@ -276,6 +277,57 @@ extern PGDLLIMPORT PROC_HDR *ProcGlobal;
extern PGPROC *PreparedXactProcs;
+typedef struct SessionGUC
+{
+ struct SessionGUC *next;
+ config_var_value saved;
+ struct config_generic *var;
+} SessionGUC;
+
+/*
+ * Information associated with a client session.
+ */
+typedef struct SessionContext
+{
+ uint32 magic; /* Magic to validate content of session object */
+ uint32 id; /* session identifier, unique across many backends */
+ /* Memory context used for global session data (instead of TopMemoryContext) */
+ MemoryContext memory;
+ struct Port *port; /* connection port */
+ Oid tempNamespace; /* temporary namespace */
+ Oid tempToastNamespace; /* temporary toast namespace */
+ SessionGUC *gucs; /* session local GUCs */
+ WaitEventSet *eventSet; /* Wait set for the session */
+ HTAB *prepared_queries; /* Session prepared queries */
+ HTAB *portals; /* Session portals */
+ void *userId; /* Current role state */
+ #define SessionVariable(type,name,init) type name;
+ #include "storage/sessionvars.h"
+} SessionContext;
+
+#define SessionVariable(type,name,init) extern type name;
+#include "storage/sessionvars.h"
+
+/* Pool of client sessions handled by this backend */
+typedef struct BackendSessionPool
+{
+ MemoryContext mcxt;
+
+ WaitEventSet *waitEvents; /* Set of all sessions sockets */
+ uint32 sessionCount; /* Number of sessions */
+
+ /*
+ * Reference to the original port of this backend created when this backend
+ * was launched. Session using this port may be already terminated,
+ * but since it is allocated in TopMemoryContext, its content is still
+ * valid and is used as template for ports of new sessions
+ */
+ struct Port *backendPort;
+} BackendSessionPool;
+
+extern PGDLLIMPORT SessionContext *ActiveSession;
+extern PGDLLIMPORT BackendSessionPool *SessionPool;
+
/* Accessor for PGPROC given a pgprocno. */
#define GetPGProcByNumber(n) (&ProcGlobal->allProcs[(n)])
@@ -295,7 +347,7 @@ extern int StatementTimeout;
extern int LockTimeout;
extern int IdleInTransactionSessionTimeout;
extern bool log_lock_waits;
-
+extern bool IsDedicatedBackend;
/*
* Function Prototypes
@@ -321,6 +373,7 @@ extern void ProcLockWakeup(LockMethod lockMethodTable, LOCK *lock);
extern void CheckDeadLockAlert(void);
extern bool IsWaitingForLock(void);
extern void LockErrorCleanup(void);
+extern uint32 CreateSessionId(void);
extern void ProcWaitForSignal(uint32 wait_event_info);
extern void ProcSendSignal(int pid);
diff --git a/src/include/storage/sessionvars.h b/src/include/storage/sessionvars.h
new file mode 100644
index 0000000..690c56f
--- /dev/null
+++ b/src/include/storage/sessionvars.h
@@ -0,0 +1,31 @@
+/*-------------------------------------------------------------------------
+ *
+ * sessionvars.h
+ *	  List of backend-static variables that are saved in the session context
+ *	  when a session is rescheduled.
+ *
+ * This is an "X macro" list and intentionally has no include guard.  The
+ * includer must define SessionVariable(type, name, init) first; the macro
+ * is #undef'd at the end so the file can be included again with a
+ * different definition (see SessionContext in storage/proc.h).
+ *
+ * src/include/storage/sessionvars.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef SessionVariable
+#error "SessionVariable(type, name, init) must be defined before including storage/sessionvars.h"
+#endif
+
+SessionVariable(Oid, AuthenticatedUserId, InvalidOid)
+SessionVariable(Oid, SessionUserId, InvalidOid)
+SessionVariable(Oid, OuterUserId, InvalidOid)
+SessionVariable(Oid, CurrentUserId, InvalidOid)
+SessionVariable(bool, AuthenticatedUserIsSuperuser, false)
+SessionVariable(bool, SessionUserIsSuperuser, false)
+SessionVariable(int, SecurityRestrictionContext, 0)
+SessionVariable(bool, SetRoleIsActive, false)
+SessionVariable(Oid, last_roleid, InvalidOid)
+SessionVariable(bool, last_roleid_is_super, false)
+SessionVariable(struct SeqTableData *, last_used_seq, NULL)
+#undef SessionVariable
diff --git a/src/include/tcop/tcopprot.h b/src/include/tcop/tcopprot.h
index 63b4e48..51d130c 100644
--- a/src/include/tcop/tcopprot.h
+++ b/src/include/tcop/tcopprot.h
@@ -31,9 +31,10 @@
#define STACK_DEPTH_SLOP (512 * 1024L)
extern CommandDest whereToSendOutput;
extern PGDLLIMPORT const char *debug_query_string;
extern int max_stack_depth;
extern int PostAuthDelay;
+extern pgsocket SessionPoolSock;
/* GUC-configurable parameters */
diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h
index f462eab..338f0ec 100644
--- a/src/include/utils/guc.h
+++ b/src/include/utils/guc.h
@@ -395,6 +395,11 @@ extern Size EstimateGUCStateSpace(void);
extern void SerializeGUCState(Size maxsize, char *start_address);
extern void RestoreGUCState(void *gucstate);
+/* Session pooling support functions */
+struct SessionContext;
+extern void RestoreSessionGUCs(struct SessionContext *session);
+extern void ReleaseSessionGUCs(struct SessionContext *session);
+
/* Support for messages reported from GUC check hooks */
extern PGDLLIMPORT char *GUC_check_errmsg_string;
diff --git a/src/include/utils/guc_tables.h b/src/include/utils/guc_tables.h
index 668d9ef..e3f2e5a 100644
--- a/src/include/utils/guc_tables.h
+++ b/src/include/utils/guc_tables.h
@@ -58,6 +58,7 @@ enum config_group
CONN_AUTH_SETTINGS,
CONN_AUTH_AUTH,
CONN_AUTH_SSL,
+ CONN_POOLING, /* NOTE(review): needs a matching config_group_names[] entry in guc.c -- confirm */
RESOURCES,
RESOURCES_MEM,
RESOURCES_DISK,
diff --git a/src/include/utils/portal.h b/src/include/utils/portal.h
index e4929b9..69ac10d 100644
--- a/src/include/utils/portal.h
+++ b/src/include/utils/portal.h
@@ -202,6 +202,7 @@ typedef struct PortalData
/* Prototypes for functions in utils/mmgr/portalmem.c */
+extern HTAB *CreatePortalsHashTable(MemoryContext mcxt);
extern void EnablePortalManager(void);
extern bool PreCommit_Portals(bool isPrepare);
extern void AtAbort_Portals(void);