On Thu, Jan 26, 2017 at 6:04 PM, vinayak <pokale_vinayak...@lab.ntt.co.jp> wrote:
> Hi Sawada-san,
>
> On 2017/01/26 16:51, Masahiko Sawada wrote:
>
> Thank you for reviewing!
>
> I think this is a bug of pg_fdw_resolver contrib module. I had
> forgotten to change the SQL executed by pg_fdw_resolver process.
> Attached latest version 002 patch.
>
> As previous version patch conflicts to current HEAD, attached updated
> version patches. Also I fixed some bugs in pg_fdw_xact_resolver and
> added some documentations.
> Please review it.
>
> Thank you for updating the patches.
>
> I have applied patches on Postgres HEAD.
> I have created the postgres_fdw extension in PostgreSQL and then I got
> segmentation fault.
> Details:
> =# 2017-01-26 17:52:56.156 JST [3411] LOG: worker process: foreign
> transaction resolver launcher (PID 3418) was terminated by signal 11:
> Segmentation fault
> 2017-01-26 17:52:56.156 JST [3411] LOG: terminating any other active server
> processes
> 2017-01-26 17:52:56.156 JST [3425] WARNING: terminating connection because
> of crash of another server process
> 2017-01-26 17:52:56.156 JST [3425] DETAIL: The postmaster has commanded
> this server process to roll back the current transaction and exit, because
> another server process exited abnormally and possibly corrupted shared
> memory.
> 2017-01-26 17:52:56.156 JST [3425] HINT: In a moment you should be able to
> reconnect to the database and repeat your command.
>
> Is this a bug?
>
Thank you for testing!

Sorry, I attached the wrong version of the pg_fdw_xact_resolver patch.
Please use the attached patch.

Regards,

--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
diff --git a/contrib/pg_fdw_xact_resolver/Makefile b/contrib/pg_fdw_xact_resolver/Makefile
new file mode 100644
index 0000000..f8924f0
--- /dev/null
+++ b/contrib/pg_fdw_xact_resolver/Makefile
@@ -0,0 +1,15 @@
+# contrib/pg_fdw_xact_resolver/Makefile
+
+MODULES = pg_fdw_xact_resolver
+PGFILEDESC = "pg_fdw_xact_resolver - foreign transaction resolver daemon"
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = contrib/pg_fdw_xact_resolver
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/contrib/pg_fdw_xact_resolver/pg_fdw_xact_resolver.c b/contrib/pg_fdw_xact_resolver/pg_fdw_xact_resolver.c
new file mode 100644
index 0000000..c57de0a
--- /dev/null
+++ b/contrib/pg_fdw_xact_resolver/pg_fdw_xact_resolver.c
@@ -0,0 +1,453 @@
+/* -------------------------------------------------------------------------
+ *
+ * pg_fdw_xact_resolver.c
+ *
+ * Contrib module that launches a foreign transaction resolver to resolve
+ * unresolved transactions prepared on foreign servers.
+ *
+ * The extension starts the foreign transaction resolver launcher process as a
+ * background worker. The launcher then launches a separate background worker
+ * process to resolve foreign transactions in each database. The worker
+ * process simply connects to the specified database and calls the
+ * pg_fdw_xact_resolve() function, which tries to resolve the transactions.
+ * The launcher process launches at most one worker at a time.
+ *
+ * Copyright (C) 2017, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		contrib/pg_fdw_xact_resolver/pg_fdw_xact_resolver.c
+ *
+ * -------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+/* These are always necessary for a bgworker */
+#include "miscadmin.h"
+#include "postmaster/bgworker.h"
+#include "storage/ipc.h"
+#include "storage/latch.h"
+#include "storage/lwlock.h"
+#include "storage/proc.h"
+#include "storage/shmem.h"
+
+/* these headers are used by this particular worker's code */
+#include "access/heapam.h"
+#include "access/htup_details.h"
+#include "access/xact.h"
+#include "access/fdw_xact.h"
+#include "catalog/pg_database.h"
+#include "executor/spi.h"
+#include "fmgr.h"
+#include "lib/stringinfo.h"
+#include "pgstat.h"
+#include "utils/builtins.h"
+#include "utils/snapmgr.h"
+#include "utils/timestamp.h"
+#include "tcop/utility.h"
+
+PG_MODULE_MAGIC;
+
+void		_PG_init(void);
+
+/*
+ * Flags set by interrupt handlers of foreign transaction resolver for later
+ * service in the main loop.
+ */
+static volatile sig_atomic_t got_sighup = false;
+static volatile sig_atomic_t got_sigterm = false;
+static volatile sig_atomic_t got_sigquit = false;
+static volatile sig_atomic_t got_sigusr1 = false;
+
+static void FDWXactResolver_worker_main(Datum dbid_datum);
+static void FDWXactResolverMain(Datum main_arg);
+static List *get_database_list(void);
+
+/* GUC variable */
+static int	fx_resolver_naptime;
+
+/*
+ * Signal handler for SIGTERM
+ *		Set a flag to let the main loop terminate, and set our latch to wake
+ *		it up.
+ */
+static void
+FDWXactResolver_SIGTERM(SIGNAL_ARGS)
+{
+	int			save_errno = errno;
+
+	got_sigterm = true;
+	SetLatch(MyLatch);
+
+	errno = save_errno;
+}
+
+/*
+ * Signal handler for SIGQUIT
+ *		Set a flag to let the main loop terminate, and set our latch to wake
+ *		it up.
+ */
+static void
+FDWXactResolver_SIGQUIT(SIGNAL_ARGS)
+{
+	int			save_errno = errno;
+
+	got_sigquit = true;
+	SetLatch(MyLatch);
+
+	errno = save_errno;
+}
+/*
+ * Signal handler for SIGHUP
+ *		Set a flag to tell the main loop to reread the config file, and set
+ *		our latch to wake it up.
+ */
+static void
+FDWXactResolver_SIGHUP(SIGNAL_ARGS)
+{
+	int			save_errno = errno;
+
+	got_sighup = true;
+	SetLatch(MyLatch);
+
+	errno = save_errno;
+}
+
+static void
+FDWXactResolver_SIGUSR1(SIGNAL_ARGS)
+{
+	int			save_errno = errno;
+
+	got_sigusr1 = true;
+	SetLatch(MyLatch);
+
+	errno = save_errno;
+}
+
+/*
+ * Entrypoint of this module.
+ *
+ * Launches the foreign transaction resolver daemon.
+ */
+void
+_PG_init(void)
+{
+	BackgroundWorker worker;
+
+	if (!process_shared_preload_libraries_in_progress)
+		return;
+
+	DefineCustomIntVariable("pg_fdw_xact_resolver.naptime",
+							"Time to sleep between pg_fdw_xact_resolver runs.",
+							NULL,
+							&fx_resolver_naptime,
+							60,
+							1,
+							INT_MAX,
+							PGC_SIGHUP,
+							GUC_UNIT_S,
+							NULL, NULL, NULL);
+
+	/* set up common data for all our workers */
+	/*
+	 * Unless a background worker sets BGWORKER_BACKEND_DATABASE_CONNECTION,
+	 * it is not added to BackendList, and hence notifications to this backend
+	 * are not enabled. So set that flag even though the backend itself
+	 * doesn't need a database connection.
+	 */
+	worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
+	worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
+	worker.bgw_restart_time = 5;
+	snprintf(worker.bgw_name, BGW_MAXLEN, "foreign transaction resolver launcher");
+	worker.bgw_main = FDWXactResolverMain;
+	worker.bgw_main_arg = (Datum) 0;	/* Craft some dummy arg. */
+	worker.bgw_notify_pid = 0;
+
+	RegisterBackgroundWorker(&worker);
+}
+
+void
+FDWXactResolverMain(Datum main_arg)
+{
+	/* For launching background worker */
+	BackgroundWorker worker;
+	BackgroundWorkerHandle *handle = NULL;
+	pid_t		pid;
+	List	   *dbid_list = NIL;
+	TimestampTz launched_time = GetCurrentTimestamp();
+	TimestampTz next_launch_time = TimestampTzPlusMilliseconds(launched_time, fx_resolver_naptime * 1000L);
+
+	ereport(LOG,
+			(errmsg("pg_fdw_xact_resolver launcher started")));
+
+	/* Properly accept or ignore signals the postmaster might send us */
+	pqsignal(SIGHUP, FDWXactResolver_SIGHUP);	/* set flag to read config
+												 * file */
+	pqsignal(SIGTERM, FDWXactResolver_SIGTERM);	/* request shutdown */
+	pqsignal(SIGQUIT, FDWXactResolver_SIGQUIT);	/* hard crash time */
+	pqsignal(SIGUSR1, FDWXactResolver_SIGUSR1);
+
+	/* Unblock signals */
+	BackgroundWorkerUnblockSignals();
+
+	/* Initialize connection */
+	BackgroundWorkerInitializeConnection(NULL, NULL);
+
+	/*
+	 * Main loop: do this until the SIGTERM handler tells us to terminate
+	 */
+	while (!got_sigterm)
+	{
+		int			rc;
+		int			naptime_msec;
+		TimestampTz current_time = GetCurrentTimestamp();
+
+		/* Determine sleep time (TimestampTz differences are in microseconds) */
+		naptime_msec = (fx_resolver_naptime * 1000L) - (current_time - launched_time) / 1000;
+
+		if (naptime_msec < 0)
+			naptime_msec = 0;
+
+		/*
+		 * Background workers mustn't call usleep() or any direct equivalent:
+		 * instead, they may wait on their process latch, which sleeps as
+		 * necessary, but is awakened if postmaster dies.  That way the
+		 * background process goes away immediately in an emergency.
+		 */
+		rc = WaitLatch(MyLatch,
+					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+					   naptime_msec,
+					   WAIT_EVENT_PG_SLEEP);
+		ResetLatch(MyLatch);
+
+		/* emergency bailout if postmaster has died */
+		if (rc & WL_POSTMASTER_DEATH)
+			proc_exit(1);
+
+		/*
+		 * Postmaster wants to stop this process. Exit with non-zero code, so
+		 * that the postmaster starts this process again. The worker processes
+		 * will receive the signal and end themselves. This process will restart
+		 * them if necessary.
+		 */
+		if (got_sigquit)
+			proc_exit(2);
+
+		/* In case of a SIGHUP, just reload the configuration */
+		if (got_sighup)
+		{
+			got_sighup = false;
+			ProcessConfigFile(PGC_SIGHUP);
+		}
+
+		if (got_sigusr1)
+		{
+			got_sigusr1 = false;
+
+			/* If we had started a worker, check whether it has completed */
+			if (handle)
+			{
+				BgwHandleStatus status;
+
+				status = GetBackgroundWorkerPid(handle, &pid);
+				if (status == BGWH_STOPPED)
+					handle = NULL;
+			}
+		}
+
+		current_time = GetCurrentTimestamp();
+
+		/*
+		 * If no background worker is running and the next launch time has
+		 * passed, start one if there are unresolved foreign transactions.
+		 */
+		if (!handle &&
+			TimestampDifferenceExceeds(next_launch_time, current_time, 0))
+		{
+			Oid			dbid;
+
+			/* Get the database list if empty */
+			if (!dbid_list)
+				dbid_list = get_database_list();
+
+			/* Launch a worker if dbid_list has a database */
+			if (dbid_list)
+			{
+				/* Work on the first dbid, and remove it from the list */
+				dbid = linitial_oid(dbid_list);
+				dbid_list = list_delete_oid(dbid_list, dbid);
+
+				Assert(OidIsValid(dbid));
+
+				/* Start the foreign transaction resolver */
+				worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
+					BGWORKER_BACKEND_DATABASE_CONNECTION;
+				worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
+
+				/* We will start another worker if needed */
+				worker.bgw_restart_time = BGW_NEVER_RESTART;
+				worker.bgw_main = FDWXactResolver_worker_main;
+				snprintf(worker.bgw_name, BGW_MAXLEN, "foreign transaction resolver (dbid %u)", dbid);
+				worker.bgw_main_arg = ObjectIdGetDatum(dbid);
+
+				/* set bgw_notify_pid so that we can wait for it to finish */
+				worker.bgw_notify_pid = MyProcPid;
+
+				RegisterDynamicBackgroundWorker(&worker, &handle);
+			}
+
+			/* Set next launch time */
+			launched_time = current_time;
+			next_launch_time = TimestampTzPlusMilliseconds(launched_time,
+												fx_resolver_naptime * 1000L);
+		}
+	}
+
+	/* Time to exit */
+	ereport(LOG,
+			(errmsg("foreign transaction resolver shutting down")));
+
+	proc_exit(0);				/* done */
+}
+
+/* FDWXactWorker_SIGTERM
+ * Terminates the foreign transaction resolver worker process */
+static void
+FDWXactWorker_SIGTERM(SIGNAL_ARGS)
+{
+	/* Just terminate the current process */
+	proc_exit(1);
+}
+
+/* Per database foreign transaction resolver */
+static void
+FDWXactResolver_worker_main(Datum dbid_datum)
+{
+	char	   *command = "SELECT * FROM pg_fdw_xact_resolve() WHERE status = 'resolved'";
+	Oid			dbid = DatumGetObjectId(dbid_datum);
+	int			ret;
+
+	/*
+	 * This background worker does not loop infinitely, so we need handlers
+	 * only for SIGTERM and SIGQUIT, on which the process should just exit.
+	 */
+	pqsignal(SIGTERM, FDWXactWorker_SIGTERM);
+	pqsignal(SIGQUIT, FDWXactWorker_SIGTERM);
+
+	/* Unblock signals */
+	BackgroundWorkerUnblockSignals();
+
+	/*
+	 * Run this background worker in superuser mode, so that all the foreign
+	 * server and user information is accessible.
+	 */
+	BackgroundWorkerInitializeConnectionByOid(dbid, InvalidOid);
+
+	/*
+	 * Start a transaction in which we can call the resolver function.
+	 * Note that each StartTransactionCommand() call should be preceded by a
+	 * SetCurrentStatementStartTimestamp() call, which sets both the time
+	 * for the statement we're about to run, and also the transaction
+	 * start time.  Also, each other query sent to SPI should probably be
+	 * preceded by SetCurrentStatementStartTimestamp(), so that statement
+	 * start time is always up to date.
+	 *
+	 * The SPI_connect() call lets us run queries through the SPI manager,
+	 * and the PushActiveSnapshot() call creates an "active" snapshot
+	 * which is necessary for queries to have MVCC data to work on.
+	 *
+	 * The pgstat_report_activity() call makes our activity visible
+	 * through the pgstat views.
+	 */
+	SetCurrentStatementStartTimestamp();
+	StartTransactionCommand();
+	SPI_connect();
+	PushActiveSnapshot(GetTransactionSnapshot());
+	pgstat_report_activity(STATE_RUNNING, command);
+
+	/* Run the resolver function */
+	ret = SPI_execute(command, false, 0);
+
+	if (ret < 0)
+		elog(LOG, "error running pg_fdw_xact_resolve() within database %u",
+			 dbid);
+
+	if (SPI_processed > 0)
+		ereport(LOG,
+				(errmsg("resolved " UINT64_FORMAT " foreign transactions", SPI_processed)));
+
+	/*
+	 * And finish our transaction.
+	 */
+	SPI_finish();
+	PopActiveSnapshot();
+	CommitTransactionCommand();
+	pgstat_report_activity(STATE_IDLE, NULL);
+
+	/* Done; exit now */
+	proc_exit(0);
+}
+
+/* Get database list */
+static List *
+get_database_list(void)
+{
+	List	   *dblist = NIL;
+	ListCell   *cell;
+	ListCell   *next;
+	ListCell   *prev = NULL;
+	HeapScanDesc scan;
+	HeapTuple	tup;
+	Relation	rel;
+	MemoryContext resultcxt;
+
+	/* This is the context that we will allocate our output data in */
+	resultcxt = CurrentMemoryContext;
+
+	SetCurrentStatementStartTimestamp();
+	StartTransactionCommand();
+	(void) GetTransactionSnapshot();
+
+	rel = heap_open(DatabaseRelationId, AccessShareLock);
+	scan = heap_beginscan_catalog(rel, 0, NULL);
+
+	while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
+	{
+		MemoryContext oldcxt;
+
+		/*
+		 * Allocate our results in the caller's context, not the
+		 * transaction's. We do this inside the loop, and restore the original
+		 * context at the end, so that leaky things like heap_getnext() are
+		 * not called in a potentially long-lived context.
+		 */
+		oldcxt = MemoryContextSwitchTo(resultcxt);
+		dblist = lappend_oid(dblist, HeapTupleGetOid(tup));
+		MemoryContextSwitchTo(oldcxt);
+	}
+
+	heap_endscan(scan);
+	heap_close(rel, AccessShareLock);
+
+	CommitTransactionCommand();
+
+	/*
+	 * Check whether each database has a foreign transaction entry, and
+	 * remove the database from the list if it has none.
+	 */
+	for (cell = list_head(dblist); cell != NULL; cell = next)
+	{
+		Oid			dbid = lfirst_oid(cell);
+		bool		exists;
+
+		next = lnext(cell);
+
+		exists = fdw_xact_exists(InvalidTransactionId, dbid, InvalidOid, InvalidOid);
+
+		if (!exists)
+			dblist = list_delete_cell(dblist, cell, prev);
+		else
+			prev = cell;
+	}
+
+	return dblist;
+}
diff --git a/doc/src/sgml/contrib.sgml b/doc/src/sgml/contrib.sgml
index c8708ec..e2048ee 100644
--- a/doc/src/sgml/contrib.sgml
+++ b/doc/src/sgml/contrib.sgml
@@ -127,6 +127,7 @@ CREATE EXTENSION <replaceable>module_name</> FROM unpackaged;
 &passwordcheck;
 &pgbuffercache;
 &pgcrypto;
+&pg-fdw-xact-resolver;
 &pgfreespacemap;
 &pgprewarm;
 &pgrowlocks;
diff --git a/doc/src/sgml/filelist.sgml b/doc/src/sgml/filelist.sgml
index 2624c62..79d076c 100644
--- a/doc/src/sgml/filelist.sgml
+++ b/doc/src/sgml/filelist.sgml
@@ -133,6 +133,7 @@
 <!ENTITY passwordcheck SYSTEM "passwordcheck.sgml">
 <!ENTITY pgbuffercache SYSTEM "pgbuffercache.sgml">
 <!ENTITY pgcrypto SYSTEM "pgcrypto.sgml">
+<!ENTITY pg-fdw-xact-resolver SYSTEM "pg-fdw-xact-resolver.sgml">
 <!ENTITY pgfreespacemap SYSTEM "pgfreespacemap.sgml">
 <!ENTITY pgprewarm SYSTEM "pgprewarm.sgml">
 <!ENTITY pgrowlocks SYSTEM "pgrowlocks.sgml">
diff --git a/doc/src/sgml/pg-fdw-xact-resolver.sgml b/doc/src/sgml/pg-fdw-xact-resolver.sgml
new file mode 100644
index 0000000..b47073c
--- /dev/null
+++ b/doc/src/sgml/pg-fdw-xact-resolver.sgml
@@ -0,0 +1,60 @@
+<!-- doc/src/sgml/pg-fdw-xact-resolver.sgml -->
+
+<sect1 id="pg-fdw-xact-resolver" xreflabel="pg_fdw_xact_resolver">
+ <title>pg_fdw_xact_resolver</title>
+
+ <indexterm zone="pg-fdw-xact-resolver">
+  <primary>pg_fdw_xact_resolver</primary>
+ </indexterm>
+
+ <para>
+  The <filename>pg_fdw_xact_resolver</> module launches a foreign transaction
+  resolver process to resolve unresolved transactions prepared on foreign
+  servers.
+ </para>
+
+ <para>
+  A transaction involving multiple foreign servers uses the two-phase commit
+  protocol when it commits. Any crash or connection failure after the
+  transaction is prepared but before it commits leaves the prepared
+  transaction in an unresolved state. To resolve such a dangling transaction,
+  we need to call <function>pg_fdw_xact_resolve</function>.
+ </para>
+
+ <para>
+  The foreign transaction resolver launcher process launches a separate
+  background worker process to resolve dangling foreign transactions in each
+  database. The worker process simply connects to the database as needed and
+  calls <function>pg_fdw_xact_resolve</function>. The launcher process
+  launches at most one worker at a time.
+ </para>
+
+ <sect2>
+  <title>Configuration Parameters</title>
+
+  <variablelist>
+   <varlistentry>
+    <term>
+     <varname>pg_fdw_xact_resolver.naptime</varname> (<type>integer</type>)
+    </term>
+
+    <listitem>
+     <para>
+      Specifies the minimum delay between foreign transaction resolver runs
+      on any given database. The delay is measured in seconds, and the
+      default is one minute (1min). This parameter can only be set in
+      the <filename>postgresql.conf</filename> file or on the server
+      command line.
+     </para>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+ </sect2>
+
+ <sect2>
+  <title>Author</title>
+  <para>
+   Ashutosh Bapat <email>ashutosh.ba...@enterprisedb.com</email>, Masahiko Sawada
+   <email>sawada.m...@gmail.com</email>
+  </para>
+ </sect2>
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers