So this is what I came up with on the plane. I generalized the loading
functionality into load_library_function, which can either load
known postgres functions or call load_external_function.
I am not quite sure if fmgr.c is the best place to put it, but I didn't want
to include stuff from the executor in bgworker.c. I was thinking about
putting it in dfmgr.c (as that's where load_external_function is), but again
it seemed weird to include the executor there.
As with the previous patch, 9.6 will need quite a different patch as we need
to keep compatibility there, but since I am traveling I think it's
better to share what I have so far.
--
Petr Jelinek http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
diff --git a/src/backend/access/transam/README.parallel b/src/backend/access/transam/README.parallel
index db9ac3d..b360887 100644
--- a/src/backend/access/transam/README.parallel
+++ b/src/backend/access/transam/README.parallel
@@ -198,7 +198,7 @@ pattern looks like this:
EnterParallelMode(); /* prohibit unsafe state changes */
- pcxt = CreateParallelContext(entrypoint, nworkers);
+ pcxt = CreateParallelContext("library_name", "function_name", nworkers);
/* Allow space for application-specific data here. */
shm_toc_estimate_chunk(&pcxt->estimator, size);
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index b3d3853..326d4f9 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -61,7 +61,7 @@
#define PARALLEL_KEY_TRANSACTION_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0006)
#define PARALLEL_KEY_ACTIVE_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0007)
#define PARALLEL_KEY_TRANSACTION_STATE UINT64CONST(0xFFFFFFFFFFFF0008)
-#define PARALLEL_KEY_EXTENSION_TRAMPOLINE UINT64CONST(0xFFFFFFFFFFFF0009)
+#define PARALLEL_KEY_ENTRYPOINT UINT64CONST(0xFFFFFFFFFFFF0009)
/* Fixed-size parallel state. */
typedef struct FixedParallelState
@@ -77,9 +77,6 @@ typedef struct FixedParallelState
pid_t parallel_master_pid;
BackendId parallel_master_backend_id;
- /* Entrypoint for parallel workers. */
- parallel_worker_main_type entrypoint;
-
/* Mutex protects remaining fields. */
slock_t mutex;
@@ -109,7 +106,6 @@ static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list);
/* Private functions. */
static void HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg);
-static void ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc);
static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
@@ -119,7 +115,8 @@ static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
* destroyed before exiting the current subtransaction.
*/
ParallelContext *
-CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers)
+CreateParallelContext(const char *library_name, const char *function_name,
+ int nworkers)
{
MemoryContext oldcontext;
ParallelContext *pcxt;
@@ -152,7 +149,8 @@ CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers)
pcxt = palloc0(sizeof(ParallelContext));
pcxt->subid = GetCurrentSubTransactionId();
pcxt->nworkers = nworkers;
- pcxt->entrypoint = entrypoint;
+ pcxt->library_name = pstrdup(library_name);
+ pcxt->function_name = pstrdup(function_name);
pcxt->error_context_stack = error_context_stack;
shm_toc_initialize_estimator(&pcxt->estimator);
dlist_push_head(&pcxt_list, &pcxt->node);
@@ -164,33 +162,6 @@ CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers)
}
/*
- * Establish a new parallel context that calls a function provided by an
- * extension. This works around the fact that the library might get mapped
- * at a different address in each backend.
- */
-ParallelContext *
-CreateParallelContextForExternalFunction(char *library_name,
- char *function_name,
- int nworkers)
-{
- MemoryContext oldcontext;
- ParallelContext *pcxt;
-
- /* We might be running in a very short-lived memory context. */
- oldcontext = MemoryContextSwitchTo(TopTransactionContext);
-
- /* Create the context. */
- pcxt = CreateParallelContext(ParallelExtensionTrampoline, nworkers);
- pcxt->library_name = pstrdup(library_name);
- pcxt->function_name = pstrdup(function_name);
-
- /* Restore previous memory context. */
- MemoryContextSwitchTo(oldcontext);
-
- return pcxt;
-}
-
-/*
* Establish the dynamic shared memory segment for a parallel context and
* copy state and other bookkeeping information that will be needed by
* parallel workers into it.
@@ -249,15 +220,10 @@ InitializeParallelDSM(ParallelContext *pcxt)
pcxt->nworkers));
shm_toc_estimate_keys(&pcxt->estimator, 1);
- /* Estimate how much we'll need for extension entrypoint info. */
- if (pcxt->library_name != NULL)
- {
- Assert(pcxt->entrypoint == ParallelExtensionTrampoline);
- Assert(pcxt->function_name != NULL);
- shm_toc_estimate_chunk(&pcxt->estimator, strlen(pcxt->library_name)
- + strlen(pcxt->function_name) + 2);
- shm_toc_estimate_keys(&pcxt->estimator, 1);
- }
+ /* Estimate how much we'll need for the entrypoint info. */
+ shm_toc_estimate_chunk(&pcxt->estimator, strlen(pcxt->library_name)
+ + strlen(pcxt->function_name) + 2);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
}
/*
@@ -297,7 +263,6 @@ InitializeParallelDSM(ParallelContext *pcxt)
fps->parallel_master_pgproc = MyProc;
fps->parallel_master_pid = MyProcPid;
fps->parallel_master_backend_id = MyBackendId;
- fps->entrypoint = pcxt->entrypoint;
SpinLockInit(&fps->mutex);
fps->last_xlog_end = 0;
shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps);
@@ -312,6 +277,8 @@ InitializeParallelDSM(ParallelContext *pcxt)
char *asnapspace;
char *tstatespace;
char *error_queue_space;
+ char *entrypointstate;
+ Size lnamelen;
/* Serialize shared libraries we have loaded. */
libraryspace = shm_toc_allocate(pcxt->toc, library_len);
@@ -368,19 +335,18 @@ InitializeParallelDSM(ParallelContext *pcxt)
}
shm_toc_insert(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, error_queue_space);
- /* Serialize extension entrypoint information. */
- if (pcxt->library_name != NULL)
- {
- Size lnamelen = strlen(pcxt->library_name);
- char *extensionstate;
-
- extensionstate = shm_toc_allocate(pcxt->toc, lnamelen
- + strlen(pcxt->function_name) + 2);
- strcpy(extensionstate, pcxt->library_name);
- strcpy(extensionstate + lnamelen + 1, pcxt->function_name);
- shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXTENSION_TRAMPOLINE,
- extensionstate);
- }
+ /*
+ * Serialize entrypoint information. It's unsafe to pass function
+ * pointers across processes, as a function's address may differ in
+ * the new process in EXEC_BACKEND builds, so we always pass the
+ * library and function name instead.
+ */
+ lnamelen = strlen(pcxt->library_name);
+ entrypointstate = shm_toc_allocate(pcxt->toc, lnamelen
+ + strlen(pcxt->function_name) + 2);
+ strcpy(entrypointstate, pcxt->library_name);
+ strcpy(entrypointstate + lnamelen + 1, pcxt->function_name);
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_ENTRYPOINT, entrypointstate);
}
/* Restore previous memory context. */
@@ -946,7 +912,11 @@ ParallelWorkerMain(Datum main_arg)
char *tsnapspace;
char *asnapspace;
char *tstatespace;
+ char *entrypointstate;
+ char *library_name;
+ char *function_name;
StringInfoData msgbuf;
+ parallel_worker_main_type entrypt;
/* Set flag to indicate that we're initializing a parallel worker. */
InitializingParallelWorker = true;
@@ -1077,6 +1047,15 @@ ParallelWorkerMain(Datum main_arg)
Assert(asnapspace != NULL);
PushActiveSnapshot(RestoreSnapshot(asnapspace));
+ /* Load the entry point. */
+ entrypointstate = shm_toc_lookup(toc, PARALLEL_KEY_ENTRYPOINT);
+ Assert(entrypointstate != NULL);
+ library_name = entrypointstate;
+ function_name = entrypointstate + strlen(library_name) + 1;
+
+ entrypt = (parallel_worker_main_type)
+ load_library_function(library_name, function_name);
+
/*
* We've changed which tuples we can see, and must therefore invalidate
* system caches.
@@ -1102,11 +1081,8 @@ ParallelWorkerMain(Datum main_arg)
/*
* Time to do the real work: invoke the caller-supplied code.
- *
- * If you get a crash at this line, see the comments for
- * ParallelExtensionTrampoline.
*/
- fps->entrypoint(seg, toc);
+ entrypt(seg, toc);
/* Must exit parallel mode to pop active snapshot. */
ExitParallelMode();
@@ -1122,33 +1098,6 @@ ParallelWorkerMain(Datum main_arg)
}
/*
- * It's unsafe for the entrypoint invoked by ParallelWorkerMain to be a
- * function living in a dynamically loaded module, because the module might
- * not be loaded in every process, or might be loaded but not at the same
- * address. To work around that problem, CreateParallelContextForExtension()
- * arranges to call this function rather than calling the extension-provided
- * function directly; and this function then looks up the real entrypoint and
- * calls it.
- */
-static void
-ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc)
-{
- char *extensionstate;
- char *library_name;
- char *function_name;
- parallel_worker_main_type entrypt;
-
- extensionstate = shm_toc_lookup(toc, PARALLEL_KEY_EXTENSION_TRAMPOLINE);
- Assert(extensionstate != NULL);
- library_name = extensionstate;
- function_name = extensionstate + strlen(library_name) + 1;
-
- entrypt = (parallel_worker_main_type)
- load_external_function(library_name, function_name, true, NULL);
- entrypt(seg, toc);
-}
-
-/*
* Update shared memory with the ending location of the last WAL record we
* wrote, if it's greater than the value already stored there.
*/
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 469a32c..49ea1e0 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -112,8 +112,7 @@ static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
SharedExecutorInstrumentation *instrumentation);
-/* Helper functions that run in the parallel worker. */
-static void ParallelQueryMain(dsm_segment *seg, shm_toc *toc);
+/* Helper function that runs in the parallel worker. */
static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc);
/*
@@ -393,7 +392,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
pstmt_data = ExecSerializePlan(planstate->plan, estate);
/* Create a parallel context. */
- pcxt = CreateParallelContext(ParallelQueryMain, nworkers);
+ pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers);
pei->pcxt = pcxt;
/*
@@ -814,7 +813,7 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
* to do this are also stored in the dsm_segment and can be accessed through
* the shm_toc.
*/
-static void
+void
ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
{
BufferUsage *buffer_usage;
diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c
index 0823317..0a9ca98 100644
--- a/src/backend/postmaster/bgworker.c
+++ b/src/backend/postmaster/bgworker.c
@@ -15,14 +15,11 @@
#include <unistd.h>
#include "libpq/pqsignal.h"
-#include "access/parallel.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "port/atomics.h"
#include "postmaster/bgworker_internals.h"
#include "postmaster/postmaster.h"
-#include "replication/logicallauncher.h"
-#include "replication/logicalworker.h"
#include "storage/dsm.h"
#include "storage/ipc.h"
#include "storage/latch.h"
@@ -111,27 +108,6 @@ struct BackgroundWorkerHandle
static BackgroundWorkerArray *BackgroundWorkerData;
/*
- * List of internal background workers. These are used for mapping the
- * function name to actual function when building with EXEC_BACKEND and also
- * to allow these to be loaded outside of shared_preload_libraries.
- */
-typedef struct InternalBGWorkerMain
-{
- char *bgw_function_name;
- bgworker_main_type bgw_main;
-} InternalBGWorkerMain;
-
-static const InternalBGWorkerMain InternalBGWorkers[] = {
- {"ParallelWorkerMain", ParallelWorkerMain},
- {"ApplyLauncherMain", ApplyLauncherMain},
- {"ApplyWorkerMain", ApplyWorkerMain},
- /* Dummy entry marking end of the array. */
- {NULL, NULL}
-};
-
-static bgworker_main_type GetInternalBgWorkerMain(BackgroundWorker *worker);
-
-/*
* Calculate shared memory needed.
*/
Size
@@ -776,18 +752,14 @@ StartBackgroundWorker(void)
}
/*
- * For internal workers set the entry point to known function address.
- * Otherwise use the entry point specified by library name (which will
- * be loaded, if necessary) and a function name (which will be looked up
- * in the named library).
+ * Load the function. For internal workers the function will be loaded
+ * according to known function map (see fmgr.c). Otherwise use the entry
+ * point specified by library name (which will be loaded, if necessary)
+ * and a function name (which will be looked up in the named library).
*/
- entrypt = GetInternalBgWorkerMain(worker);
-
- if (entrypt == NULL)
- entrypt = (bgworker_main_type)
- load_external_function(worker->bgw_library_name,
- worker->bgw_function_name,
- true, NULL);
+ entrypt = (bgworker_main_type)
+ load_library_function(worker->bgw_library_name,
+ worker->bgw_function_name);
/*
* Note that in normal processes, we would call InitPostgres here. For a
@@ -806,10 +778,11 @@ StartBackgroundWorker(void)
}
/*
- * Register a new background worker while processing shared_preload_libraries.
+ * Register a new static background worker.
*
- * This can only be called in the _PG_init function of a module library
- * that's loaded by shared_preload_libraries; otherwise it has no effect.
+ * This can only be called directly from postmaster or in the _PG_init
+ * function of a module library that's loaded by shared_preload_libraries;
+ * otherwise it will have no effect.
*/
void
RegisterBackgroundWorker(BackgroundWorker *worker)
@@ -822,7 +795,7 @@ RegisterBackgroundWorker(BackgroundWorker *worker)
(errmsg("registering background worker \"%s\"", worker->bgw_name)));
if (!process_shared_preload_libraries_in_progress &&
- GetInternalBgWorkerMain(worker) == NULL)
+ strcmp(worker->bgw_library_name, "postgres") != 0)
{
if (!IsUnderPostmaster)
ereport(LOG,
@@ -1152,28 +1125,3 @@ TerminateBackgroundWorker(BackgroundWorkerHandle *handle)
if (signal_postmaster)
SendPostmasterSignal(PMSIGNAL_BACKGROUND_WORKER_CHANGE);
}
-
-/*
- * Search the known internal worker array and return its main function
- * pointer if found.
- *
- * Returns NULL if not known internal worker.
- */
-static bgworker_main_type
-GetInternalBgWorkerMain(BackgroundWorker *worker)
-{
- int i;
-
- /* Internal workers always have to use postgres as library name. */
- if (strncmp(worker->bgw_library_name, "postgres", BGW_MAXLEN) != 0)
- return NULL;
-
- for (i = 0; InternalBGWorkers[i].bgw_function_name; i++)
- {
- if (strncmp(InternalBGWorkers[i].bgw_function_name,
- worker->bgw_function_name, BGW_MAXLEN) == 0)
- return InternalBGWorkers[i].bgw_main;
- }
-
- return NULL;
-}
diff --git a/src/backend/utils/fmgr/fmgr.c b/src/backend/utils/fmgr/fmgr.c
index 68d2110..c23db6a 100644
--- a/src/backend/utils/fmgr/fmgr.c
+++ b/src/backend/utils/fmgr/fmgr.c
@@ -15,14 +15,18 @@
#include "postgres.h"
+#include "access/parallel.h"
#include "access/tuptoaster.h"
#include "catalog/pg_language.h"
#include "catalog/pg_proc.h"
+#include "executor/execParallel.h"
#include "executor/functions.h"
#include "lib/stringinfo.h"
#include "miscadmin.h"
#include "nodes/nodeFuncs.h"
#include "pgstat.h"
+#include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/fmgrtab.h"
@@ -61,6 +65,25 @@ static void record_C_func(HeapTuple procedureTuple,
PGFunction user_fn, const Pg_finfo_record *inforec);
static Datum fmgr_security_definer(PG_FUNCTION_ARGS);
+/*
+ * These are used for mapping the function name to actual function when
+ * building with EXEC_BACKEND and also to allow bgworkers using these outside
+ * of shared_preload_libraries.
+ */
+typedef struct KnownFunctionPointer
+{
+ char *fn_name;
+ void *fn_addr;
+} KnownFunctionPointer;
+
+static const KnownFunctionPointer known_functions_map[] = {
+ {"ParallelWorkerMain", ParallelWorkerMain},
+ {"ParallelQueryMain", ParallelQueryMain},
+ {"ApplyLauncherMain", ApplyLauncherMain},
+ {"ApplyWorkerMain", ApplyWorkerMain},
+ /* Dummy entry marking end of the array. */
+ {NULL, NULL}
+};
/*
* Lookup routines for builtin-function table. We can search by either Oid
@@ -691,6 +714,36 @@ fmgr_security_definer(PG_FUNCTION_ARGS)
return result;
}
+/*
+ * This is similar to load_external_function (which it may call), but it
+ * also tries to match the function name against the internal known
+ * functions map when asked to load a function from postgres itself.
+ */
+void *
+load_library_function(char *libraryname, char *funcname)
+{
+ /*
+ * If the function is to be loaded from postgres itself, search the known
+ * functions map.
+ */
+ if (strcmp(libraryname, "postgres") == 0)
+ {
+ int i;
+
+ for (i = 0; known_functions_map[i].fn_name; i++)
+ {
+ if (strcmp(known_functions_map[i].fn_name,
+ funcname) == 0)
+ return known_functions_map[i].fn_addr;
+ }
+
+ /* Reaching this point means the caller named an unknown function. */
+ elog(ERROR, "internal function \"%s\" not found", funcname);
+ }
+
+ /* Otherwise load the function from the named external library. */
+ return load_external_function(libraryname, funcname, true, NULL);
+}
/*-------------------------------------------------------------------------
* Support routines for callers of fmgr-compatible functions
diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h
index 5065a38..b869727 100644
--- a/src/include/access/parallel.h
+++ b/src/include/access/parallel.h
@@ -52,8 +52,7 @@ extern bool InitializingParallelWorker;
#define IsParallelWorker() (ParallelWorkerNumber >= 0)
-extern ParallelContext *CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers);
-extern ParallelContext *CreateParallelContextForExternalFunction(char *library_name, char *function_name, int nworkers);
+extern ParallelContext *CreateParallelContext(const char *library_name, const char *function_name, int nworkers);
extern void InitializeParallelDSM(ParallelContext *pcxt);
extern void ReinitializeParallelDSM(ParallelContext *pcxt);
extern void LaunchParallelWorkers(ParallelContext *pcxt);
diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h
index 8bc4270..0b7ca59 100644
--- a/src/include/executor/execParallel.h
+++ b/src/include/executor/execParallel.h
@@ -38,4 +38,6 @@ extern void ExecParallelFinish(ParallelExecutorInfo *pei);
extern void ExecParallelCleanup(ParallelExecutorInfo *pei);
extern void ExecParallelReinitialize(ParallelExecutorInfo *pei);
+extern void ParallelQueryMain(dsm_segment *seg, shm_toc *toc);
+
#endif /* EXECPARALLEL_H */
diff --git a/src/include/fmgr.h b/src/include/fmgr.h
index 0c695e2..2e41edf 100644
--- a/src/include/fmgr.h
+++ b/src/include/fmgr.h
@@ -667,6 +667,7 @@ extern bool get_fn_expr_arg_stable(FmgrInfo *flinfo, int argnum);
extern bool get_call_expr_arg_stable(fmNodePtr expr, int argnum);
extern bool get_fn_expr_variadic(FmgrInfo *flinfo);
extern bool CheckFunctionValidatorAccess(Oid validatorOid, Oid functionOid);
+extern void *load_library_function(char *libraryname, char *funcname);
/*
* Routines in dfmgr.c
--
Sent via pgsql-hackers mailing list ([email protected])
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers