
The Postgres Professional cluster team wants to propose a new version of the eXtensible Transaction Manager (XTM) API.
Previous discussion concerning this patch can be found here:


The API patch itself is fairly small, but we think it would be strange to provide just the API without examples of its usage.

We have implemented several distributed transaction managers based on this API: pg_dtm (based on snapshot sharing) and pg_tsdtm (CSN based on local system time). Based on these two DTM implementations we have developed various "cluster" implementations: multimaster+pg_dtm, multimaster+pg_tsdtm, pg_shard+pg_dtm, pg_shard+pg_tsdtm, postgres_fdw+pg_dtm, postgres_fdw+pg_tsdtm,... Multimaster is based on logical replication and is something like BDR, but synchronous: it provides consistency across the cluster.

But we want to make this patch as small as possible.
So we decided to include in it only pg_tsdtm and a patch to postgres_fdw that allows it to be used with pg_tsdtm. pg_tsdtm is simpler than pg_dtm because the latter includes an arbiter with the RAFT protocol (a centralized service)
and sockhub for efficient multiplexing of backend connections.
Also, in theory, pg_tsdtm provides better scalability, because it is decentralized.

The architecture of DTM and tsDTM, as well as benchmark results, are available on the wiki page:


Please note that pg_tsdtm is just a reference implementation of a DTM using this XTM API. The primary idea of this patch is to add the XTM API to the PostgreSQL code, making it possible to implement custom transaction managers as Postgres extensions. So please review first of all the XTM API itself, and not pg_tsdtm, which is just an example of its usage.

The complete PostgreSQL branch with all our changes can be found here:


-- Konstantin Knizhnik Postgres Professional: http://www.postgrespro.com The Russian Postgres Company
diff --git a/contrib/pg_tsdtm/Makefile b/contrib/pg_tsdtm/Makefile
new file mode 100644
index 0000000..e70dffc
--- /dev/null
+++ b/contrib/pg_tsdtm/Makefile
@@ -0,0 +1,20 @@
+MODULE_big = pg_tsdtm
+OBJS = pg_tsdtm.o
+EXTENSION = pg_tsdtm
+DATA = pg_tsdtm--1.0.sql
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+subdir = contrib/pg_tsdtm
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+	env DESTDIR='$(abs_top_builddir)'/tmp_install make install
+	$(prove_check)
diff --git a/contrib/pg_tsdtm/dtm_recovery/dtm_recovery.cpp b/contrib/pg_tsdtm/dtm_recovery/dtm_recovery.cpp
new file mode 100644
index 0000000..38285be
--- /dev/null
+++ b/contrib/pg_tsdtm/dtm_recovery/dtm_recovery.cpp
@@ -0,0 +1,129 @@
+#include <iostream>
+#include <string>
+#include <vector>
+#include <set>
+#include <pqxx/connection>
+#include <pqxx/transaction>
+#include <pqxx/nontransaction>
+using namespace std;
+using namespace pqxx;
+// Resolve prepared transactions left over on the nodes of a pg_tsdtm cluster.
+// Every "-c STR" option names one node. The tool collects the GIDs of all
+// prepared transactions, checks which of them are recorded in
+// pg_committed_xacts on some node, and then commits those and rolls back the
+// others on every node where they are still prepared.
+// Returns 0 on success and 1 on a usage error.
+int main (int argc, char* argv[])
+{
+    if (argc == 1){
+        cout << "Use -h to show usage options\n";
+        return 1;
+    }
+    vector<string> connections;
+    set<string> prepared_xacts;
+    set<string> committed_xacts;
+    bool verbose = false;
+    for (int i = 1; i < argc; i++) {
+        if (argv[i][0] == '-') {
+            switch (argv[i][1]) {
+              case 'C':
+              case 'c':
+                // The option needs a value: without this check argv[++i]
+                // would be the terminating null pointer of argv.
+                if (i + 1 >= argc) {
+                    break;
+                }
+                connections.push_back(string(argv[++i]));
+                continue;
+              case 'v':
+                verbose = true;
+                continue;
+            }
+        }
+        // Anything else (including -h and a -c without value) prints the usage
+        cout << "Perform recovery of pg_tsdtm cluster.\n"
+                "Usage: dtm_recovery {options}\n"
+                "Options:\n"
+                "\t-c STR\tdatabase connection string\n"
+                "\t-v\tverbose mode: print extra information while processing\n";
+        return 1;
+    }
+    // Phase 1: collect the GIDs of all prepared transactions in the cluster
+    if (verbose) {
+        cout << "Collecting information about prepared transactions...\n";
+    }
+    for (vector<string>::iterator ic = connections.begin(); ic != connections.end(); ++ic)
+    {
+        if (verbose) {
+            cout << "Connecting to " << *ic << "...\n";
+        }
+        connection con(*ic);
+        work txn(con);
+        result r = txn.exec("select gid from pg_prepared_xacts");
+        for (result::const_iterator it = r.begin(); it != r.end(); ++it)
+        {
+            string gid = it.at("gid").as(string());
+            prepared_xacts.insert(gid);
+        }
+        txn.commit();
+    }
+    // Phase 2: find out which of them are recorded as committed on some node
+    if (verbose) {
+        cout << "Prepared transactions: ";
+        for (set<string>::iterator it = prepared_xacts.begin(); it != prepared_xacts.end(); ++it)
+        {
+            cout << *it << ", ";
+        }
+        cout << "\nChecking which of them are committed...\n";
+    }
+    for (vector<string>::iterator ic = connections.begin(); ic != connections.end(); ++ic)
+    {
+        if (verbose) {
+            cout << "Connecting to " << *ic << "...\n";
+        }
+        connection con(*ic);
+        work txn(con);
+        con.prepare("commit-check", "select * from pg_committed_xacts where gid=$1");
+        for (set<string>::iterator it = prepared_xacts.begin(); it != prepared_xacts.end(); ++it)
+        {
+            string gid = *it;
+            result r = txn.prepared("commit-check")(gid).exec();
+            if (!r.empty()) {
+                committed_xacts.insert(gid);
+            }
+        }
+        txn.commit();
+    }
+    // Phase 3: finish the transactions which are still prepared on each node
+    if (verbose) {
+        cout << "Committed transactions: ";
+        for (set<string>::iterator it = committed_xacts.begin(); it != committed_xacts.end(); ++it)
+        {
+            cout << *it << ", ";
+        }
+        cout << "\nCommitting them at all nodes...\n";
+    }
+    for (vector<string>::iterator ic = connections.begin(); ic != connections.end(); ++ic)
+    {
+        if (verbose) {
+            cout << "Connecting to " << *ic << "...\n";
+        }
+        connection con(*ic);
+        // COMMIT PREPARED / ROLLBACK PREPARED cannot run inside a transaction
+        // block and cannot take bind parameters, so run them through a
+        // nontransaction with the GID passed as a quoted literal.
+        nontransaction txn(con);
+        con.prepare("commit-check", "select * from pg_committed_xacts where gid=$1");
+        result r = txn.exec("select gid from pg_prepared_xacts");
+        for (result::const_iterator it = r.begin(); it != r.end(); ++it)
+        {
+            string gid = it.at("gid").as(string());
+            result rc = txn.prepared("commit-check")(gid).exec();
+            if (rc.empty()) {
+                if (committed_xacts.find(gid) != committed_xacts.end()) {
+                    if (verbose) {
+                        cout << "Commit transaction " << gid << "\n";
+                    }
+                    txn.exec("commit prepared " + txn.quote(gid));
+                } else {
+                    if (verbose) {
+                        cout << "Rollback transaction " << gid << "\n";
+                    }
+                    txn.exec("rollback prepared " + txn.quote(gid));
+                }
+            }
+        }
+        txn.commit();
+    }
+    if (verbose) {
+        cout << "Recovery completed\n";
+    }
+    return 0;
+}
diff --git a/contrib/pg_tsdtm/dtm_recovery/makefile b/contrib/pg_tsdtm/dtm_recovery/makefile
new file mode 100644
index 0000000..4d12c0b
--- /dev/null
+++ b/contrib/pg_tsdtm/dtm_recovery/makefile
@@ -0,0 +1,10 @@
+CXXFLAGS=-g -Wall -O0 -pthread
+all: dtm_recovery
+dtm_recovery: dtm_recovery.cpp
+	$(CXX) $(CXXFLAGS) -o dtm_recovery dtm_recovery.cpp -lpqxx
+	rm -f dtm_recovery
diff --git a/contrib/pg_tsdtm/pg_tsdtm--1.0.sql b/contrib/pg_tsdtm/pg_tsdtm--1.0.sql
new file mode 100644
index 0000000..dcd81ac
--- /dev/null
+++ b/contrib/pg_tsdtm/pg_tsdtm--1.0.sql
@@ -0,0 +1,26 @@
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+-- (the extension is named pg_tsdtm, see EXTENSION in the Makefile and pg_tsdtm.control)
+\echo Use "CREATE EXTENSION pg_tsdtm" to load this file. \quit
+CREATE FUNCTION dtm_extend(gtid cstring default null) RETURNS bigint
+AS 'MODULE_PATHNAME','dtm_extend'
+CREATE FUNCTION dtm_access(snapshot bigint, gtid cstring default null) RETURNS bigint
+AS 'MODULE_PATHNAME','dtm_access'
+CREATE FUNCTION dtm_begin_prepare(gtid cstring) RETURNS void
+AS 'MODULE_PATHNAME','dtm_begin_prepare'
+CREATE FUNCTION dtm_prepare(gtid cstring, csn bigint) RETURNS bigint
+AS 'MODULE_PATHNAME','dtm_prepare'
+CREATE FUNCTION dtm_end_prepare(gtid cstring, csn bigint) RETURNS void
+AS 'MODULE_PATHNAME','dtm_end_prepare'
+CREATE FUNCTION dtm_get_csn(xid integer) RETURNS bigint
+AS 'MODULE_PATHNAME','dtm_get_csn'
diff --git a/contrib/pg_tsdtm/pg_tsdtm.c b/contrib/pg_tsdtm/pg_tsdtm.c
new file mode 100644
index 0000000..6dabe76
--- /dev/null
+++ b/contrib/pg_tsdtm/pg_tsdtm.c
@@ -0,0 +1,1021 @@
+ * pg_tsdtm.c
+ *
+ * Pluggable distributed transaction manager
+ *
+ */
+#include "postgres.h"
+#include <unistd.h>
+#include <sys/time.h>
+#include <time.h>
+#include "fmgr.h"
+#include "miscadmin.h"
+#include "storage/s_lock.h"
+#include "storage/spin.h"
+#include "storage/lmgr.h"
+#include "storage/shmem.h"
+#include "storage/ipc.h"
+#include "access/xlogdefs.h"
+#include "access/xact.h"
+#include "access/xtm.h"
+#include "access/transam.h"
+#include "access/subtrans.h"
+#include "access/xlog.h"
+#include "access/clog.h"
+#include "access/twophase.h"
+#include "catalog/pg_type.h"
+#include "executor/spi.h"
+#include "utils/builtins.h"
+#include "utils/hsearch.h"
+#include "utils/tqual.h"
+#include <utils/guc.h>
+#include "pg_tsdtm.h"
+#define DTM_HASH_INIT_SIZE	1000000
+#define INVALID_CID    0
+#define MIN_WAIT_TIMEOUT 1000
+#define MAX_WAIT_TIMEOUT 100000
+#define MAX_GTID_SIZE  16
+#define USEC 1000000
+typedef uint64 timestamp_t;
+/* Distributed transaction state kept in shared memory (entry of the xid2status hash table) */
+typedef struct DtmTransStatus
+	TransactionId xid;			/* hash key: XID of the transaction */
+	XidStatus	status;			/* transaction outcome; compared with TRANSACTION_STATUS_ABORTED by the visibility check */
+	int			nSubxids;		/* number of subtransaction entries following this entry in the list */
+	cid_t		cid;			/* CSN */
+	struct DtmTransStatus *next;/* pointer to next element in finished
+								 * transaction list */
+}	DtmTransStatus;
+/* State of DTM node (single instance allocated with ShmemInitStruct) */
+typedef struct
+	cid_t		cid;			/* last assigned CSN; used to provide unique
+								 * ascending CSNs */
+	TransactionId oldest_xid;	/* XID of oldest transaction visible by any
+								 * active transaction (local or global) */
+	long		time_shift;		/* correction to system time, in microseconds */
+	volatile slock_t lock;		/* spinlock to protect access to hash table  */
+	DtmTransStatus *trans_list_head;	/* singly-linked list of finished transactions
+										 * present in xid2status hash table.
+										 * This list is used to perform
+										 * cleanup of too old transactions */
+	DtmTransStatus **trans_list_tail;	/* address of the "next" link where a new element is appended */
+}	DtmNodeState;
+/* Structure used to map global transaction identifier to XID (entry of the gtid2xid hash table) */
+typedef struct
+	char		gtid[MAX_GTID_SIZE];	/* hash key: global transaction identifier (C string) */
+	TransactionId xid;			/* local XID of the transaction */
+	TransactionId *subxids;		/* malloc'ed copy of committed subtransaction XIDs, or 0.
+								 * NOTE(review): this is a process-local pointer stored in a
+								 * shared hash table - confirm it is only used by the backend
+								 * which allocated it */
+	int			nSubxids;		/* number of elements in subxids */
+}	DtmTransId;
+#define DTM_TRACE(x)
+/* #define DTM_TRACE(x) fprintf x */
+static shmem_startup_hook_type prev_shmem_startup_hook;
+static HTAB *xid2status;
+static HTAB *gtid2xid;
+static DtmNodeState *local;
+static DtmCurrentTrans dtm_tx;
+static uint64 totalSleepInterrupts;
+static int	DtmVacuumDelay;
+static bool DtmRecordCommits;
+static Snapshot DtmGetSnapshot(Snapshot snapshot);
+static TransactionId DtmGetOldestXmin(Relation rel, bool ignoreVacuum);
+static bool DtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot);
+static TransactionId DtmAdjustOldestXid(TransactionId xid);
+static bool DtmDetectGlobalDeadLock(PGPROC *proc);
+static cid_t DtmGetCsn(TransactionId xid);
+static void DtmAddSubtransactions(DtmTransStatus * ts, TransactionId *subxids, int nSubxids);
+static char const *DtmGetName(void);
+static TransactionManager DtmTM = {
+	PgTransactionIdGetStatus,
+	PgTransactionIdSetTreeStatus,
+	DtmGetSnapshot,
+	PgGetNewTransactionId,
+	DtmGetOldestXmin,
+	PgTransactionIdIsInProgress,
+	PgGetGlobalTransactionId,
+	DtmXidInMVCCSnapshot,
+	DtmDetectGlobalDeadLock,
+	DtmGetName
+void		_PG_init(void);
+void		_PG_fini(void);
+static void dtm_shmem_startup(void);
+static Size dtm_memsize(void);
+static void dtm_xact_callback(XactEvent event, void *arg);
+static timestamp_t dtm_get_current_time();
+static void dtm_sleep(timestamp_t interval);
+static cid_t dtm_get_cid();
+static cid_t dtm_sync(cid_t cid);
+ *	Time manipulation functions
+ */
+/* Get current time with microsecond resolution */
+static timestamp_t
+	struct timeval tv;
+	gettimeofday(&tv, NULL);
+	return (timestamp_t) tv.tv_sec * USEC + tv.tv_usec + local->time_shift;
+/*
+ * Sleep for the specified amount of time (in microseconds).
+ * When the sleep is interrupted by a signal it is resumed for the remaining
+ * time; every interruption is counted in totalSleepInterrupts.
+ */
+static void
+dtm_sleep(timestamp_t interval)
+{
+	struct timespec ts;
+	struct timespec rem;
+	/* tv_nsec has to stay below one second, so split off the whole seconds */
+	ts.tv_sec = interval / USEC;
+	ts.tv_nsec = (interval % USEC) * 1000;
+	while (nanosleep(&ts, &rem) < 0)
+	{
+		Assert(errno == EINTR);
+		/* rem is filled in only for EINTR: do not retry with garbage */
+		if (errno != EINTR)
+			break;
+		totalSleepInterrupts += 1;
+		ts = rem;
+	}
+}
+/* Get unique ascending CSN.
+ * This function is called inside critical section
+ */
+static cid_t
+	cid_t		cid = dtm_get_current_time();
+	if (cid <= local->cid)
+	{
+		cid = ++local->cid;
+	}
+	else
+	{
+		local->cid = cid;
+	}
+	return cid;
+/*
+ * Adjust system time: increase the local time correction until a newly
+ * generated local CSN is not less than global_cid.
+ * Returns that local CSN.
+ */
+static cid_t
+dtm_sync(cid_t global_cid)
+{
+	cid_t		fresh_cid;
+	for (;;)
+	{
+		fresh_cid = dtm_get_cid();
+		if (fresh_cid >= global_cid)
+			break;
+		/* local clock is behind: shift it forward by the observed gap */
+		local->time_shift += global_cid - fresh_cid;
+	}
+	return fresh_cid;
+}
+	DTM_TRACE((stderr, "DTM_PG_init \n"));
+	/*
+	 * In order to create our shared memory area, we have to be loaded via
+	 * shared_preload_libraries.  If not, fall out without hooking into any of
+	 * the main system.  (We don't throw error here because it seems useful to
+	 * allow the pg_tsdtm functions to be created even when the
+	 * module isn't active.  The functions must protect themselves against
+	 * being called then, however.)
+	 */
+	if (!process_shared_preload_libraries_in_progress)
+		return;
+	RequestAddinShmemSpace(dtm_memsize());
+	DefineCustomIntVariable(
+							"dtm.vacuum_delay",
+					"Minimal age of records which can be vacuumed (seconds)",
+							NULL,
+							&DtmVacuumDelay,
+							10,
+							1,
+							INT_MAX,
+							PGC_BACKEND,
+							0,
+							NULL,
+							NULL,
+							NULL
+		);
+	DefineCustomBoolVariable(
+							 "dtm.record_commits",
+							 "Store information about committed global transactions in pg_committed_xacts table",
+							 NULL,
+							 &DtmRecordCommits,
+							 false,
+							 PGC_BACKEND,
+							 0,
+							 NULL,
+							 NULL,
+							 NULL
+		);
+	/*
+	 * Install hooks.
+	 */
+	prev_shmem_startup_hook = shmem_startup_hook;
+	shmem_startup_hook = dtm_shmem_startup;
+ * Module unload callback
+ */
+	/* Uninstall hooks. */
+	shmem_startup_hook = prev_shmem_startup_hook;
+ * Estimate shared memory space needed.
+ */
+static Size
+	Size		size;
+	size = MAXALIGN(sizeof(DtmNodeState));
+	size = add_size(size, (sizeof(DtmTransId) + sizeof(DtmTransStatus) + HASH_PER_ELEM_OVERHEAD * 2) * DTM_HASH_INIT_SIZE);
+	return size;
+/*
+ * shmem_startup hook: run the previously installed hook, if any,
+ * then initialize the DTM state by calling DtmInitialize().
+ */
+static void
+	if (prev_shmem_startup_hook)
+	{
+		prev_shmem_startup_hook();
+	}
+	DtmInitialize();
+static GlobalTransactionId
+	return GetLockedGlobalTransactionId();
+static void
+dtm_xact_callback(XactEvent event, void *arg)
+	DTM_TRACE((stderr, "Backend %d dtm_xact_callback %d\n", getpid(), event));
+	switch (event)
+	{
+			DtmLocalBegin(&dtm_tx);
+			break;
+			DtmLocalAbort(&dtm_tx);
+			DtmLocalEnd(&dtm_tx);
+			break;
+			DtmLocalCommit(&dtm_tx);
+			DtmLocalEnd(&dtm_tx);
+			break;
+			DtmLocalAbortPrepared(&dtm_tx, dtm_get_global_trans_id());
+			break;
+			DtmLocalCommitPrepared(&dtm_tx, dtm_get_global_trans_id());
+			break;
+			DtmLocalSavePreparedState(dtm_get_global_trans_id());
+			DtmLocalEnd(&dtm_tx);
+			break;
+		default:
+			break;
+	}
+ *	***************************************************************************
+ */
+	GlobalTransactionId gtid = PG_GETARG_CSTRING(0);
+	cid_t		cid = DtmLocalExtend(&dtm_tx, gtid);
+	DTM_TRACE((stderr, "Backend %d extends transaction %u(%s) to global with cid=%lu\n", getpid(), dtm_tx.xid, gtid, cid));
+	PG_RETURN_INT64(cid);
+	cid_t		cid = PG_GETARG_INT64(0);
+	GlobalTransactionId gtid = PG_GETARG_CSTRING(1);
+	DTM_TRACE((stderr, "Backend %d joins transaction %u(%s) with cid=%lu\n", getpid(), dtm_tx.xid, gtid, cid));
+	cid = DtmLocalAccess(&dtm_tx, gtid, cid);
+	PG_RETURN_INT64(cid);
+	GlobalTransactionId gtid = PG_GETARG_CSTRING(0);
+	DtmLocalBeginPrepare(gtid);
+	DTM_TRACE((stderr, "Backend %d begins prepare of transaction %s\n", getpid(), gtid));
+	GlobalTransactionId gtid = PG_GETARG_CSTRING(0);
+	cid_t		cid = PG_GETARG_INT64(1);
+	cid = DtmLocalPrepare(gtid, cid);
+	DTM_TRACE((stderr, "Backend %d prepares transaction %s with cid=%lu\n", getpid(), gtid, cid));
+	PG_RETURN_INT64(cid);
+	GlobalTransactionId gtid = PG_GETARG_CSTRING(0);
+	cid_t		cid = PG_GETARG_INT64(1);
+	DTM_TRACE((stderr, "Backend %d ends prepare of transactions %s with cid=%lu\n", getpid(), gtid, cid));
+	DtmLocalEndPrepare(gtid, cid);
+	TransactionId xid = PG_GETARG_INT32(0);
+	cid_t		csn = DtmGetCsn(xid);
+	PG_RETURN_INT64(csn);
+ *	***************************************************************************
+ */
+/* Hash function for XID keys: the transaction id is used as its own hash value */
+static uint32
+dtm_xid_hash_fn(const void *key, Size keysize)
+{
+	const TransactionId *xid_key = (const TransactionId *) key;
+	return (uint32) *xid_key;
+}
+/* Compare two XID keys: the result is zero when both keys hold the same transaction id */
+static int
+dtm_xid_match_fn(const void *key1, const void *key2, Size keysize)
+{
+	TransactionId lhs = *(TransactionId *) key1;
+	TransactionId rhs = *(TransactionId *) key2;
+	return lhs - rhs;
+}
+/*
+ * Hash function for global transaction identifiers (C strings).
+ * Only the first keysize-1 characters are hashed, because this is the most
+ * that a fixed-size key of keysize bytes can hold; bytes are treated as
+ * unsigned so that the result does not depend on the signedness of char.
+ */
+static uint32
+dtm_gtid_hash_fn(const void *key, Size keysize)
+{
+	const unsigned char *id = (const unsigned char *) key;
+	uint32		h = 0;
+	Size		i;
+	for (i = 0; i + 1 < keysize && id[i] != 0; i++)
+	{
+		h = h * 31 + id[i];
+	}
+	return h;
+}
+/*
+ * Copy a global transaction identifier into the key area of a hash entry.
+ * The key area is keysize bytes long, so the copy is bounded (and always
+ * NUL-terminated) instead of writing past the entry for a too long identifier.
+ * Returns dest.
+ */
+static void *
+dtm_gtid_keycopy_fn(void *dest, const void *src, Size keysize)
+{
+	strlcpy((char *) dest, (GlobalTransactionId) src, keysize);
+	return dest;
+}
+/*
+ * Compare two global transaction identifiers; returns zero when they match.
+ * At most keysize-1 characters are compared, which is all that a fixed-size
+ * key of keysize bytes can store.
+ */
+static int
+dtm_gtid_match_fn(const void *key1, const void *key2, Size keysize)
+{
+	return strncmp((GlobalTransactionId) key1, (GlobalTransactionId) key2, keysize - 1);
+}
+static char const *
+	return "pg_tsdtm";
+/* Append ts at the end of the list of finished transactions */
+static void
+DtmTransactionListAppend(DtmTransStatus * ts)
+{
+	DtmTransStatus **tail_link = local->trans_list_tail;
+	ts->next = NULL;
+	*tail_link = ts;
+	/* the new element's link is now the place where the next one is appended */
+	local->trans_list_tail = &ts->next;
+}
+/* Insert ts into the list of finished transactions right behind the element "after" */
+static void
+DtmTransactionListInsertAfter(DtmTransStatus * after, DtmTransStatus * ts)
+{
+	/* when "after" is the last element, the tail has to move to the new element */
+	bool		after_was_last = (local->trans_list_tail == &after->next);
+	ts->next = after->next;
+	after->next = ts;
+	if (after_was_last)
+		local->trans_list_tail = &ts->next;
+}
+/*
+ * There can be different oldest XIDs at different cluster nodes.
+ * Since we do not have a centralized arbiter, we have to rely on DtmVacuumDelay.
+ * This function takes the XID computed by PostgreSQL and, if this XID is present
+ * in the xid2status table, walks the list of finished transactions looking for
+ * the last one whose CSN is older than the CSN of that XID by more than
+ * DtmVacuumDelay seconds. Older entries are removed from xid2status on the way.
+ * If no such XID can be located, then the previously observed oldest XID is returned.
+ * An invalid XID is returned unchanged.
+ */
+static TransactionId
+DtmAdjustOldestXid(TransactionId xid)
+{
+	if (TransactionIdIsValid(xid))
+	{
+		DtmTransStatus *ts,
+				   *prev = NULL;
+		/* widen before multiplying: int * int overflows for delays above ~2147 seconds */
+		timestamp_t delay = (timestamp_t) DtmVacuumDelay * USEC;
+		SpinLockAcquire(&local->lock);
+		ts = (DtmTransStatus *) hash_search(xid2status, &xid, HASH_FIND, NULL);
+		if (ts != NULL)
+		{
+			/* clamp at zero so that the unsigned subtraction cannot wrap around */
+			timestamp_t cutoff_time = (ts->cid > delay) ? ts->cid - delay : 0;
+			for (ts = local->trans_list_head; ts != NULL && ts->cid < cutoff_time; prev = ts, ts = ts->next)
+			{
+				/* keep the most recent "too old" entry: it becomes the new list head */
+				if (prev != NULL)
+					hash_search(xid2status, &prev->xid, HASH_REMOVE, NULL);
+			}
+		}
+		if (prev != NULL)
+		{
+			local->trans_list_head = prev;
+			local->oldest_xid = xid = prev->xid;
+		}
+		else
+		{
+			xid = local->oldest_xid;
+		}
+		SpinLockRelease(&local->lock);
+	}
+	return xid;
+}
+/*
+ * Take a snapshot using the standard PostgreSQL implementation and replace
+ * RecentGlobalXmin and RecentGlobalDataXmin with the adjusted oldest XID.
+ */
+static Snapshot
+DtmGetSnapshot(Snapshot snapshot)
+{
+	TransactionId adjusted_xmin;
+	snapshot = PgGetSnapshotData(snapshot);
+	adjusted_xmin = DtmAdjustOldestXid(RecentGlobalDataXmin);
+	RecentGlobalXmin = adjusted_xmin;
+	RecentGlobalDataXmin = adjusted_xmin;
+	return snapshot;
+}
+/* Get the oldest xmin from the standard PostgreSQL implementation and adjust it with DtmAdjustOldestXid() */
+static TransactionId
+DtmGetOldestXmin(Relation rel, bool ignoreVacuum)
+{
+	return DtmAdjustOldestXid(PgGetOldestXmin(rel, ignoreVacuum));
+}
+/*
+ * Check tuple visibility based on the CSN of the current transaction.
+ * If there is no information about the transaction with this XID, then use the standard PostgreSQL visibility rules.
+ */
+DtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
+	timestamp_t delay = MIN_WAIT_TIMEOUT;
+	Assert(xid != InvalidTransactionId);
+	SpinLockAcquire(&local->lock);
+	while (true)
+	{
+		DtmTransStatus *ts = (DtmTransStatus *) hash_search(xid2status, &xid, HASH_FIND, NULL);
+		if (ts != NULL)
+		{
+			if (ts->cid > dtm_tx.snapshot)
+			{
+				DTM_TRACE((stderr, "%d: tuple with xid=%d(csn=%lld) is invisibile in snapshot %lld\n",
+						   getpid(), xid, ts->cid, dtm_tx.snapshot));
+				SpinLockRelease(&local->lock);
+				return true;
+			}
+			{
+				DTM_TRACE((stderr, "%d: wait for in-doubt transaction %u in snapshot %lu\n", getpid(), xid, dtm_tx.snapshot));
+				SpinLockRelease(&local->lock);
+				dtm_sleep(delay);
+				if (delay * 2 <= MAX_WAIT_TIMEOUT)
+					delay *= 2;
+				SpinLockAcquire(&local->lock);
+			}
+			else
+			{
+				bool		invisible = ts->status == TRANSACTION_STATUS_ABORTED;
+				DTM_TRACE((stderr, "%d: tuple with xid=%d(csn= %lld) is %s in snapshot %lld\n",
+						   getpid(), xid, ts->cid, invisible ? "rollbacked" : "committed", dtm_tx.snapshot));
+				SpinLockRelease(&local->lock);
+				return invisible;
+			}
+		}
+		else
+		{
+			DTM_TRACE((stderr, "%d: visibility check is skept for transaction %u in snapshot %lu\n", getpid(), xid, dtm_tx.snapshot));
+			break;
+		}
+	}
+	SpinLockRelease(&local->lock);
+	return PgXidInMVCCSnapshot(xid, snapshot);
+	bool		found;
+	static HASHCTL info;
+	info.keysize = sizeof(TransactionId);
+	info.entrysize = sizeof(DtmTransStatus);
+	info.hash = dtm_xid_hash_fn;
+	info.match = dtm_xid_match_fn;
+	xid2status = ShmemInitHash("xid2status",
+							   &info,
+	info.keysize = MAX_GTID_SIZE;
+	info.entrysize = sizeof(DtmTransId);
+	info.hash = dtm_gtid_hash_fn;
+	info.match = dtm_gtid_match_fn;
+	info.keycopy = dtm_gtid_keycopy_fn;
+	gtid2xid = ShmemInitHash("gtid2xid",
+							 &info,
+	TM = &DtmTM;
+	LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
+	local = (DtmNodeState *) ShmemInitStruct("dtm", sizeof(DtmNodeState), &found);
+	if (!found)
+	{
+		local->time_shift = 0;
+		local->oldest_xid = FirstNormalTransactionId;
+		local->cid = dtm_get_current_time();
+		local->trans_list_head = NULL;
+		local->trans_list_tail = &local->trans_list_head;
+		SpinLockInit(&local->lock);
+		RegisterXactCallback(dtm_xact_callback, NULL);
+	}
+	LWLockRelease(AddinShmemInitLock);
+/*
+ * Start transaction at local node.
+ * Associate local snapshot (current time) with this transaction.
+ * Does nothing if x already holds a valid XID.
+ */
+void
+DtmLocalBegin(DtmCurrentTrans * x)
+{
+	TransactionId xid;
+	if (TransactionIdIsValid(x->xid))
+		return;
+	/*
+	 * Obtain the XID before taking the spinlock: GetCurrentTransactionId() is
+	 * not a trivial operation and may raise an error, which must not happen
+	 * while a spinlock is held. Only dtm_get_cid() needs the lock.
+	 */
+	xid = GetCurrentTransactionId();
+	Assert(TransactionIdIsValid(xid));
+	SpinLockAcquire(&local->lock);
+	x->xid = xid;
+	x->cid = INVALID_CID;
+	x->is_global = false;
+	x->is_prepared = false;
+	x->snapshot = dtm_get_cid();
+	SpinLockRelease(&local->lock);
+	DTM_TRACE((stderr, "DtmLocalBegin: transaction %u uses local snapshot %lu\n", x->xid, x->snapshot));
+}
+/*
+ * Transaction is going to be distributed.
+ * If gtid is given, it is registered in gtid2xid together with the XID of x.
+ * Returns snapshot of current transaction.
+ */
+cid_t
+DtmLocalExtend(DtmCurrentTrans * x, GlobalTransactionId gtid)
+{
+	if (gtid != NULL)
+	{
+		DtmTransId *entry;
+		SpinLockAcquire(&local->lock);
+		entry = (DtmTransId *) hash_search(gtid2xid, gtid, HASH_ENTER, NULL);
+		entry->xid = x->xid;
+		entry->nSubxids = 0;
+		entry->subxids = 0;
+		SpinLockRelease(&local->lock);
+	}
+	x->is_global = true;
+	return x->snapshot;
+}
+/*
+ * This function is executed on all nodes joining a distributed transaction.
+ * global_cid is the snapshot taken from the node which initiated this transaction.
+ * The local clock is synchronized with global_cid and global_cid becomes the
+ * snapshot of x. Raises an error if global_cid is older than the current local
+ * CSN by more than DtmVacuumDelay seconds. Returns global_cid.
+ */
+cid_t
+DtmLocalAccess(DtmCurrentTrans * x, GlobalTransactionId gtid, cid_t global_cid)
+{
+	cid_t		local_cid;
+	/* widen before multiplying: int * int overflows for delays above ~2147 seconds */
+	cid_t		max_age = (cid_t) DtmVacuumDelay * USEC;
+	SpinLockAcquire(&local->lock);
+	{
+		if (gtid != NULL)
+		{
+			DtmTransId *id = (DtmTransId *) hash_search(gtid2xid, gtid, HASH_ENTER, NULL);
+			id->xid = x->xid;
+			id->nSubxids = 0;
+			id->subxids = 0;
+		}
+		local_cid = dtm_sync(global_cid);
+		x->snapshot = global_cid;
+		x->is_global = true;
+	}
+	SpinLockRelease(&local->lock);
+	/* written as an addition so that the unsigned comparison cannot wrap around */
+	if (global_cid + max_age < local_cid)
+	{
+		elog(ERROR, "Too old snapshot: requested " UINT64_FORMAT ", current " UINT64_FORMAT, global_cid, local_cid);
+	}
+	return global_cid;
+}
+/*
+ * Set transaction status to in-doubt. Now all transactions accessing tuples updated by this transaction have to
+ * wait until it is either committed or aborted.
+ * The transaction and its saved subtransactions are entered into xid2status
+ * with a freshly generated CSN.
+ * NOTE(review): no assignment to ts->status is visible in this function - confirm
+ * where the in-doubt status is actually stored.
+ */
+void
+DtmLocalBeginPrepare(GlobalTransactionId gtid)
+{
+	DtmTransId *id;
+	DtmTransStatus *ts;
+	SpinLockAcquire(&local->lock);
+	id = (DtmTransId *) hash_search(gtid2xid, gtid, HASH_FIND, NULL);
+	Assert(id != NULL);
+	Assert(TransactionIdIsValid(id->xid));
+	ts = (DtmTransStatus *) hash_search(xid2status, &id->xid, HASH_ENTER, NULL);
+	ts->cid = dtm_get_cid();
+	ts->nSubxids = id->nSubxids;
+	DtmTransactionListAppend(ts);
+	DtmAddSubtransactions(ts, id->subxids, id->nSubxids);
+	SpinLockRelease(&local->lock);
+}
+/*
+ * Choose maximal CSN among all nodes.
+ * This function returns maximum of passed (global) and local (current time) CSNs.
+ */
+cid_t
+DtmLocalPrepare(GlobalTransactionId gtid, cid_t global_cid)
+{
+	cid_t		local_cid;
+	SpinLockAcquire(&local->lock);
+	local_cid = dtm_get_cid();
+	SpinLockRelease(&local->lock);
+	return (local_cid > global_cid) ? local_cid : global_cid;
+}
+/*
+ * Assign the received maximal CSN to the transaction identified by gtid and
+ * to its subtransactions, and adjust system time according to this CSN.
+ * If dtm.record_commits is enabled, the gtid is also inserted into the
+ * pg_committed_xacts table; an error is raised if this insert fails.
+ */
+void
+DtmLocalEndPrepare(GlobalTransactionId gtid, cid_t cid)
+{
+	SpinLockAcquire(&local->lock);
+	{
+		DtmTransStatus *ts;
+		DtmTransId *id;
+		int			i;
+		id = (DtmTransId *) hash_search(gtid2xid, gtid, HASH_FIND, NULL);
+		Assert(id != NULL);
+		ts = (DtmTransStatus *) hash_search(xid2status, &id->xid, HASH_FIND, NULL);
+		Assert(ts != NULL);
+		ts->cid = cid;
+		/* subtransaction entries directly follow their parent in the list */
+		for (i = 0; i < ts->nSubxids; i++)
+		{
+			ts = ts->next;
+			ts->cid = cid;
+		}
+		dtm_sync(cid);
+		DTM_TRACE((stderr, "Prepare transaction %u(%s) with CSN %lu\n", id->xid, gtid, cid));
+	}
+	SpinLockRelease(&local->lock);
+	/*
+	 * Record commit in pg_committed_xacts table to make it possible to
+	 * perform recovery in case of crash of some of cluster nodes
+	 */
+	if (DtmRecordCommits)
+	{
+		Oid			argtypes[1] = {TEXTOID};
+		Datum		values[1];
+		int			rc;
+		/*
+		 * Pass the gtid as a query parameter: it comes from the caller, has no
+		 * length limit and may contain quotes, so it must not be pasted into
+		 * the statement text. The statement modifies data, hence
+		 * read_only = false.
+		 */
+		values[0] = CStringGetTextDatum(gtid);
+		SPI_connect();
+		rc = SPI_execute_with_args("insert into pg_committed_xacts values ($1)",
+								   1, argtypes, values, NULL, false, 0);
+		SPI_finish();
+		if (rc != SPI_OK_INSERT)
+		{
+			elog(ERROR, "Failed to insert GTID %s in table pg_committed_xacts", gtid);
+		}
+	}
+}
+/*
+ * Start local commit of a prepared global transaction: remove gtid from
+ * gtid2xid and mark x as a prepared global transaction with the XID which
+ * was registered for this gtid.
+ * NOTE(review): the subxids array is released with free() - confirm that it
+ * was allocated by this same backend.
+ */
+void
+DtmLocalCommitPrepared(DtmCurrentTrans * x, GlobalTransactionId gtid)
+{
+	DtmTransId *removed;
+	Assert(gtid != NULL);
+	SpinLockAcquire(&local->lock);
+	removed = (DtmTransId *) hash_search(gtid2xid, gtid, HASH_REMOVE, NULL);
+	Assert(removed != NULL);
+	x->xid = removed->xid;
+	x->is_prepared = true;
+	x->is_global = true;
+	free(removed->subxids);
+	DTM_TRACE((stderr, "Global transaction %u(%s) is precommitted\n", x->xid, gtid));
+	SpinLockRelease(&local->lock);
+}
+/*
+ * Set transaction status to committed.
+ * For a prepared transaction the xid2status entries already exist and keep
+ * the CSN assigned during prepare. Otherwise a new entry with a fresh CSN is
+ * created for the transaction and for each of its committed subtransactions.
+ * The CSN is stored in x->cid. Does nothing if x holds no valid XID.
+ */
+void
+DtmLocalCommit(DtmCurrentTrans * x)
+{
+	SpinLockAcquire(&local->lock);
+	if (TransactionIdIsValid(x->xid))
+	{
+		bool		found;
+		DtmTransStatus *ts;
+		ts = (DtmTransStatus *) hash_search(xid2status, &x->xid, HASH_ENTER, &found);
+		/*
+		 * Record the outcome. It is set before subtransaction entries are
+		 * created, because DtmAddSubtransactions() copies it from the parent.
+		 */
+		ts->status = TRANSACTION_STATUS_COMMITTED;
+		if (x->is_prepared)
+		{
+			int			i;
+			DtmTransStatus *sts = ts;
+			Assert(found);
+			Assert(x->is_global);
+			/* subtransaction entries directly follow their parent in the list */
+			for (i = 0; i < ts->nSubxids; i++)
+			{
+				sts = sts->next;
+				Assert(sts->cid == ts->cid);
+				sts->status = TRANSACTION_STATUS_COMMITTED;
+			}
+		}
+		else
+		{
+			TransactionId *subxids;
+			Assert(!found);
+			ts->cid = dtm_get_cid();
+			DtmTransactionListAppend(ts);
+			ts->nSubxids = xactGetCommittedChildren(&subxids);
+			DtmAddSubtransactions(ts, subxids, ts->nSubxids);
+		}
+		x->cid = ts->cid;
+		DTM_TRACE((stderr, "Local transaction %u is committed at %lu\n", x->xid, x->cid));
+	}
+	SpinLockRelease(&local->lock);
+}
+/*
+ * Start local abort of a prepared global transaction: remove gtid from
+ * gtid2xid and mark x as a prepared global transaction with the XID which
+ * was registered for this gtid.
+ * NOTE(review): the subxids array is released with free() - confirm that it
+ * was allocated by this same backend.
+ */
+void
+DtmLocalAbortPrepared(DtmCurrentTrans * x, GlobalTransactionId gtid)
+{
+	DtmTransId *removed;
+	Assert(gtid != NULL);
+	SpinLockAcquire(&local->lock);
+	removed = (DtmTransId *) hash_search(gtid2xid, gtid, HASH_REMOVE, NULL);
+	Assert(removed != NULL);
+	x->xid = removed->xid;
+	x->is_prepared = true;
+	x->is_global = true;
+	free(removed->subxids);
+	DTM_TRACE((stderr, "Global transaction %u(%s) is preaborted\n", x->xid, gtid));
+	SpinLockRelease(&local->lock);
+}
+/*
+ * Set transaction status to aborted.
+ * For a prepared transaction the xid2status entries already exist; otherwise
+ * a new entry with a fresh CSN is created. The CSN is stored in x->cid.
+ */
+void
+DtmLocalAbort(DtmCurrentTrans * x)
+{
+	SpinLockAcquire(&local->lock);
+	{
+		bool		found;
+		DtmTransStatus *ts;
+		Assert(TransactionIdIsValid(x->xid));
+		ts = (DtmTransStatus *) hash_search(xid2status, &x->xid, HASH_ENTER, &found);
+		/* record the outcome: the visibility check tests for this status */
+		ts->status = TRANSACTION_STATUS_ABORTED;
+		if (x->is_prepared)
+		{
+			int			i;
+			DtmTransStatus *sts = ts;
+			Assert(found);
+			Assert(x->is_global);
+			/* subtransaction entries directly follow their parent in the list */
+			for (i = 0; i < ts->nSubxids; i++)
+			{
+				sts = sts->next;
+				sts->status = TRANSACTION_STATUS_ABORTED;
+			}
+		}
+		else
+		{
+			Assert(!found);
+			ts->cid = dtm_get_cid();
+			ts->nSubxids = 0;
+			DtmTransactionListAppend(ts);
+		}
+		x->cid = ts->cid;
+		DTM_TRACE((stderr, "Local transaction %u is aborted at %lu\n", x->xid, x->cid));
+	}
+	SpinLockRelease(&local->lock);
+}
+/*
+ * Cleanup dtm_tx structure: forget XID and CSN and clear both flags
+ */
+void
+DtmLocalEnd(DtmCurrentTrans * x)
+{
+	x->xid = InvalidTransactionId;
+	x->cid = INVALID_CID;
+	x->is_prepared = false;
+	x->is_global = false;
+}
+/*
+ * Now only timestamp based deadlock detection is supported for pg_tsdtm:
+ * a warning is logged and a deadlock is always reported.
+ * Please adjust "deadlock_timeout" parameter in postgresql.conf to avoid false
+ * deadlock detection.
+ */
+static bool
+DtmDetectGlobalDeadLock(PGPROC *proc)
+{
+	elog(WARNING, "Global deadlock?");
+	return true;
+}
+/* Return the CSN stored in xid2status for the given XID, or 0 if there is no entry */
+static cid_t
+DtmGetCsn(TransactionId xid)
+{
+	DtmTransStatus *ts;
+	cid_t		csn;
+	SpinLockAcquire(&local->lock);
+	ts = (DtmTransStatus *) hash_search(xid2status, &xid, HASH_FIND, NULL);
+	csn = (ts != NULL) ? ts->cid : 0;
+	SpinLockRelease(&local->lock);
+	return csn;
+}
+/*
+ * Save state of prepared transaction: attach a copy of the committed
+ * subtransaction XIDs of the current transaction to the gtid2xid entry of gtid.
+ * Nothing is done if gtid is NULL, if there are no subtransactions or if gtid
+ * is not registered. If the copy cannot be allocated, a warning is logged and
+ * the entry is left unchanged.
+ * NOTE(review): the copy is process-local memory referenced from a shared
+ * hash table - confirm that only this backend dereferences and frees it.
+ */
+void
+DtmLocalSavePreparedState(GlobalTransactionId gtid)
+{
+	TransactionId *subxids;
+	TransactionId *copy;
+	DtmTransId *id;
+	int			nSubxids;
+	if (gtid == NULL)
+		return;
+	nSubxids = xactGetCommittedChildren(&subxids);
+	if (nSubxids == 0)
+		return;
+	/* allocate and copy before taking the spinlock to keep the locked section short */
+	copy = (TransactionId *) malloc(nSubxids * sizeof(TransactionId));
+	if (copy == NULL)
+	{
+		elog(WARNING, "Out of memory: subtransactions of global transaction %s are not saved", gtid);
+		return;
+	}
+	memcpy(copy, subxids, nSubxids * sizeof(TransactionId));
+	SpinLockAcquire(&local->lock);
+	id = (DtmTransId *) hash_search(gtid2xid, gtid, HASH_FIND, NULL);
+	if (id != NULL)
+	{
+		id->subxids = copy;
+		id->nSubxids = nSubxids;
+		copy = NULL;			/* ownership passed to the hash entry */
+	}
+	SpinLockRelease(&local->lock);
+	/* gtid is not registered: nobody took ownership of the copy */
+	if (copy != NULL)
+		free(copy);
+}
+/*
+ * Add subtransactions to finished transactions list.
+ * Every subtransaction gets its own xid2status entry which is linked directly
+ * behind the parent entry ts and receives a copy of the parent's CSN and status.
+ */
+static void
+DtmAddSubtransactions(DtmTransStatus * ts, TransactionId *subxids, int nSubxids)
+{
+	int			idx = 0;
+	while (idx < nSubxids)
+	{
+		TransactionId *subxid = &subxids[idx++];
+		bool		found;
+		DtmTransStatus *sts;
+		Assert(TransactionIdIsValid(*subxid));
+		sts = (DtmTransStatus *) hash_search(xid2status, subxid, HASH_ENTER, &found);
+		Assert(!found);
+		sts->cid = ts->cid;
+		sts->status = ts->status;
+		sts->nSubxids = 0;
+		DtmTransactionListInsertAfter(ts, sts);
+	}
+}
diff --git a/contrib/pg_tsdtm/pg_tsdtm.control b/contrib/pg_tsdtm/pg_tsdtm.control
new file mode 100644
index 0000000..f9b8215
--- /dev/null
+++ b/contrib/pg_tsdtm/pg_tsdtm.control
@@ -0,0 +1,4 @@
+comment = 'Pluggable distributed transaction manager'
+default_version = '1.0'
+module_pathname = '$libdir/pg_tsdtm'
+relocatable = true
\ No newline at end of file
diff --git a/contrib/pg_tsdtm/pg_tsdtm.h b/contrib/pg_tsdtm/pg_tsdtm.h
new file mode 100644
index 0000000..467038a
--- /dev/null
+++ b/contrib/pg_tsdtm/pg_tsdtm.h
@@ -0,0 +1,57 @@
+#ifndef DTM_BACKEND_H
+#define DTM_BACKEND_H
+typedef int nodeid_t;
+typedef uint64 cid_t;
+typedef struct
+	TransactionId xid;
+	bool		is_global;
+	bool		is_prepared;
+	cid_t		cid;
+	cid_t		snapshot;
+}	DtmCurrentTrans;
+typedef char const *GlobalTransactionId;
+/* Initialize DTM extension */
+void		DtmInitialize(void);
+/* Invoked at start of any local or global transaction */
+void		DtmLocalBegin(DtmCurrentTrans * x);
+/* Extend local transaction to global by assigning upper bound CSN which is returned to coordinator */
+cid_t		DtmLocalExtend(DtmCurrentTrans * x, GlobalTransactionId gtid);
+/* Function called at first access to any datanode except first one involved in distributed transaction */
+cid_t		DtmLocalAccess(DtmCurrentTrans * x, GlobalTransactionId gtid, cid_t snapshot);
+/* Mark transaction as in-doubt */
+void		DtmLocalBeginPrepare(GlobalTransactionId gtid);
+/* Choose CSN for global transaction */
+cid_t		DtmLocalPrepare(GlobalTransactionId gtid, cid_t cid);
+/* Assign CSN to global transaction */
+void		DtmLocalEndPrepare(GlobalTransactionId gtid, cid_t cid);
+/* Do local commit of global transaction */
+void		DtmLocalCommitPrepared(DtmCurrentTrans * x, GlobalTransactionId gtid);
+/* Do local abort of global transaction */
+void		DtmLocalAbortPrepared(DtmCurrentTrans * x, GlobalTransactionId gtid);
+/* Do local commit of global transaction */
+void		DtmLocalCommit(DtmCurrentTrans * x);
+/* Do local abort of global transaction */
+void		DtmLocalAbort(DtmCurrentTrans * x);
+/* Invoked at the end of any local or global transaction: free transaction state */
+void		DtmLocalEnd(DtmCurrentTrans * x);
+/* Save global prepared transaction state */
+void		DtmLocalSavePreparedState(GlobalTransactionId gtid);
diff --git a/contrib/pg_tsdtm/t/001_distributed_transactions.pl b/contrib/pg_tsdtm/t/001_distributed_transactions.pl
new file mode 100644
index 0000000..b34895b
--- /dev/null
+++ b/contrib/pg_tsdtm/t/001_distributed_transactions.pl
@@ -0,0 +1,133 @@
+# Test of proper transaction isolation.
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 1;
+use DBI();
+use DBD::Pg();
+# Prepare and execute $sql on $dbi with @keys as bind values, log the call,
+# and return what fetchrow_array yields in scalar context.
+# NOTE(review): fetchrow_array is called in scalar context; this is only
+# well-defined for single-column queries -- confirm all callers select one column.
+sub query_row
+	my ($dbi, $sql, @keys) = @_;
+	my $sth = $dbi->prepare($sql) || die;
+	$sth->execute(@keys) || die;
+	my $ret = $sth->fetchrow_array;
+	print "query_row('$sql') -> $ret \n";
+	return $ret;
+# Log $sql, run it on $dbi with do(), die if do() returns a false value,
+# and return do()'s result.
+sub query_exec
+	my ($dbi, $sql) = @_;
+	print "query_exec('$sql')\n";
+	my $rv = $dbi->do($sql) || die;
+	return $rv;
+# Run $sql through psql against this node via command_ok, using $comment as
+# the test name.
+sub PostgresNode::psql_ok {
+	my ($self, $sql, $comment) = @_;
+	$self->command_ok(['psql', '-A', '-t', '--no-psqlrc',
+		'-d', $self->connstr, '-c', $sql], $comment);
+# Run $sql through psql against this node and expect the command to FAIL;
+# $comment is used as the test name.
+sub PostgresNode::psql_fails {
+	my ($self, $sql, $comment) = @_;
+	# command_fails (not command_ok): the test must pass only when psql exits non-zero.
+	$self->command_fails(['psql', '-A', '-t', '--no-psqlrc',
+		'-d', $self->connstr, '-c', $sql], $comment);
+# Setup nodes
+# Setup first node
+my $node1 = get_new_node("node1");
+$node1->append_conf('postgresql.conf', qq(
+max_prepared_transactions = 10
+shared_preload_libraries = 'pg_tsdtm'
+# Setup second node
+my $node2 = get_new_node("node2");
+$node2->append_conf('postgresql.conf', qq(
+max_prepared_transactions = 10
+shared_preload_libraries = 'pg_tsdtm'
+$node1->psql('postgres', "create extension pg_tsdtm;");
+$node1->psql('postgres', "create table t(u int primary key, v int)");
+$node1->psql('postgres', "insert into t (select generate_series(0, 9), 0)");
+$node2->psql('postgres', "create extension pg_tsdtm;");
+$node2->psql('postgres', "create table t(u int primary key, v int)");
+$node2->psql('postgres', "insert into t (select generate_series(0, 9), 0)");
+# we need two connections to each node (run two simultaneous global tx)
+my $conn11 = DBI->connect('DBI:Pg:' . $node1->connstr('postgres'));
+my $conn21 = DBI->connect('DBI:Pg:' . $node2->connstr('postgres'));
+my $conn12 = DBI->connect('DBI:Pg:' . $node1->connstr('postgres'));
+my $conn22 = DBI->connect('DBI:Pg:' . $node2->connstr('postgres'));
+# Read sum(v) from table t over two connections inside one pair of
+# transactions: $c1 calls dtm_extend(), its result is handed to dtm_access()
+# on $c2, both sums are read, both transactions are committed, and the
+# combined total is printed and returned.
+sub count_total
+	my ($c1, $c2) = @_;
+	query_exec($c1, "begin");
+	query_exec($c2, "begin");
+	my $snapshot = query_row($c1, "select dtm_extend()");
+	query_row($c2, "select dtm_access($snapshot)");
+	my $sum1 = query_row($c1, "select sum(v) from t");
+	my $sum2 = query_row($c2, "select sum(v) from t");
+	query_exec($c1, "commit");
+	query_exec($c2, "commit");
+	my $tot = $sum1 + $sum2;
+	print "total = $tot\n";
+	return $tot;
+# Sanity check on dirty reads
+my $gtid1 = "gtx1";
+# start global tx
+query_exec($conn11, "begin transaction");
+query_exec($conn21, "begin transaction");
+my $snapshot = query_row($conn11, "select dtm_extend('$gtid1')");
+query_exec($conn21, "select dtm_access($snapshot, '$gtid1')");
+# transfer some amount of integers to different node
+query_exec($conn11, "update t set v = v - 10 where u=1");
+my $intermediate_total = count_total($conn12, $conn22);
+query_exec($conn21, "update t set v = v + 10 where u=2");
+# commit our global tx
+query_exec($conn11, "prepare transaction '$gtid1'");
+query_exec($conn21, "prepare transaction '$gtid1'");
+query_exec($conn11, "select dtm_begin_prepare('$gtid1')");
+query_exec($conn21, "select dtm_begin_prepare('$gtid1')");
+my $csn = query_row($conn11, "select dtm_prepare('$gtid1', 0)");
+query_exec($conn21, "select dtm_prepare('$gtid1', $csn)");
+query_exec($conn11, "select dtm_end_prepare('$gtid1', $csn)");
+query_exec($conn21, "select dtm_end_prepare('$gtid1', $csn)");
+query_exec($conn11, "commit prepared '$gtid1'");
+query_exec($conn21, "commit prepared '$gtid1'");
+is($intermediate_total, 0, "Check for absence of dirty reads");
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 3df86d1..1f36ffb 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -15,6 +15,8 @@
 #include "postgres_fdw.h"
 #include "access/xact.h"
+#include "access/xtm.h"
+#include "access/transam.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
 #include "utils/hsearch.h"
@@ -61,11 +63,17 @@ static unsigned int prep_stmt_number = 0;
 /* tracks whether any work is needed in callback functions */
 static bool xact_got_connection = false;
+typedef long long csn_t;
+static csn_t currentGlobalTransactionId = 0;
+static int	currentLocalTransactionId = 0;
 /* prototypes of private functions */
 static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
 static void check_conn_params(const char **keywords, const char **values);
 static void configure_remote_session(PGconn *conn);
 static void do_sql_command(PGconn *conn, const char *sql);
+static void do_sql_send_command(PGconn *conn, const char *sql);
+static void do_sql_wait_command(PGconn *conn, const char *sql);
 static void begin_remote_xact(ConnCacheEntry *entry);
 static void pgfdw_xact_callback(XactEvent event, void *arg);
 static void pgfdw_subxact_callback(SubXactEvent event,
@@ -357,6 +365,32 @@ do_sql_command(PGconn *conn, const char *sql)
+/*
+ * Dispatch `sql` on `conn` without waiting for its result; the result is
+ * expected to be collected later (see do_sql_wait_command).
+ * If the command cannot be sent, a WARNING is logged and an ERROR is raised.
+ */
+static void
+do_sql_send_command(PGconn *conn, const char *sql)
+	/*
+	 * PQsendQuery() returns 1 on success and 0 on failure; it is not an
+	 * ExecStatusType, so it must not be compared with PGRES_COMMAND_OK.
+	 */
+	if (!PQsendQuery(conn, sql))
+	{
+		PGresult   *res = PQgetResult(conn);
+		elog(WARNING, "Failed to send command %s", sql);
+		/* clear = true: pgfdw_report_error releases res itself before raising ERROR. */
+		pgfdw_report_error(ERROR, res, conn, true, sql);
+	}
+/*
+ * Collect every pending result on `conn` until PQgetResult() returns NULL.
+ * Any result whose status is not PGRES_COMMAND_OK is reported through
+ * pgfdw_report_error() at ERROR level, with `sql` as the statement text.
+ */
+static void
+do_sql_wait_command(PGconn *conn, const char *sql)
+	PGresult   *res;
+	while ((res = PQgetResult(conn)) != NULL)
+	{
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+			pgfdw_report_error(ERROR, res, conn, true, sql);
+		PQclear(res);
+	}
  * Start remote transaction or subtransaction, if needed.
@@ -375,17 +409,58 @@ begin_remote_xact(ConnCacheEntry *entry)
 	/* Start main transaction if we haven't yet */
 	if (entry->xact_depth <= 0)
+		TransactionId gxid = GetTransactionManager()->GetGlobalTransactionId();
 		const char *sql;
 		elog(DEBUG3, "starting remote transaction on connection %p",
+		if (TransactionIdIsValid(gxid))
+		{
+			char		stmt[64];
+			PGresult   *res;
+			/* TransactionId is unsigned: format it with %u so large XIDs do not print as negative. */
+			snprintf(stmt, sizeof(stmt), "select public.dtm_join_transaction(%u)", gxid);
+			/* NOTE(review): the result status is not checked before being discarded -- confirm this is deliberate. */
+			res = PQexec(entry->conn, stmt);
+			PQclear(res);
+		}
 		if (IsolationIsSerializable())
 		do_sql_command(entry->conn, sql);
 		entry->xact_depth = 1;
+		if (UseTsDtmTransactions)
+		{
+			char	   *dtm_sql;
+			PGresult   *res;
+			if (!currentGlobalTransactionId)
+			{
+				/*
+				 * First connection used in this transaction: start a new
+				 * global transaction named '<pid>.<counter>' and remember the
+				 * value returned by dtm_extend().
+				 */
+				char	   *resp;
+				dtm_sql = psprintf("SELECT public.dtm_extend('%d.%d')",
+								   MyProcPid, ++currentLocalTransactionId);
+				res = PQexec(entry->conn, dtm_sql);
+				/* Report failures against the DTM statement, not the START TRANSACTION text. */
+				if (PQresultStatus(res) != PGRES_TUPLES_OK)
+				{
+					pgfdw_report_error(ERROR, res, entry->conn, true, dtm_sql);
+				}
+				resp = PQgetvalue(res, 0, 0);
+				if (resp == NULL || (*resp) == '\0' || sscanf(resp, "%lld", &currentGlobalTransactionId) != 1)
+				{
+					pgfdw_report_error(ERROR, res, entry->conn, true, dtm_sql);
+				}
+				PQclear(res);
+			}
+			else
+			{
+				/*
+				 * Later connections join the existing global transaction.
+				 * csn_t is a signed long long, so it is printed with %lld.
+				 */
+				dtm_sql = psprintf("SELECT public.dtm_access(%lld, '%d.%d')", currentGlobalTransactionId, MyProcPid, currentLocalTransactionId);
+				res = PQexec(entry->conn, dtm_sql);
+				if (PQresultStatus(res) != PGRES_TUPLES_OK)
+				{
+					pgfdw_report_error(ERROR, res, entry->conn, true, dtm_sql);
+				}
+				PQclear(res);
+			}
+		}
@@ -511,6 +586,78 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
+typedef bool (*DtmCommandResultHandler) (PGresult *result, void *arg);
+/*
+ * Run `sql` on every cached connection that has an open remote transaction
+ * (xact_depth > 0).  The statement is first sent to all such connections,
+ * then one result is collected from each.
+ *
+ * A result is accepted when its status equals expectedStatus and, if a
+ * handler is supplied, handler(result, arg) returns true.  Returns allOk.
+ *
+ * NOTE(review): failures are reported with pgfdw_report_error(ERROR, ...);
+ * if that call does not return, `allOk = false` is never reached and callers
+ * never see a false return value -- confirm the intended error level.
+ */
+static bool
+RunDtmStatement(char const * sql, unsigned expectedStatus, DtmCommandResultHandler handler, void *arg)
+	ConnCacheEntry *entry;
+	bool		allOk = true;
+	/* Pass 1: send the statement everywhere without waiting for results. */
+	hash_seq_init(&scan, ConnectionHash);
+	while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+	{
+		if (entry->xact_depth > 0)
+		{
+			do_sql_send_command(entry->conn, sql);
+		}
+	}
+	/* Pass 2: collect and validate one result per connection. */
+	hash_seq_init(&scan, ConnectionHash);
+	while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+	{
+		if (entry->xact_depth > 0)
+		{
+			PGresult   *result = PQgetResult(entry->conn);
+			if (PQresultStatus(result) != expectedStatus || (handler && !handler(result, arg)))
+			{
+				elog(WARNING, "Failed command %s: status=%d, expected status=%d", sql, PQresultStatus(result), expectedStatus);
+				pgfdw_report_error(ERROR, result, entry->conn, true, sql);
+				allOk = false;
+			}
+			PQclear(result);
+			/* NOTE(review): the return value is discarded; a non-NULL result here would not be freed. */
+			PQgetResult(entry->conn);	/* consume NULL result */
+		}
+	}
+	return allOk;
+/* Run `sql` on all open connections, expecting PGRES_COMMAND_OK from each. */
+static bool
+RunDtmCommand(char const * sql)
+	return RunDtmStatement(sql, PGRES_COMMAND_OK, NULL, NULL);
+/* Run `sql` on all open connections, expecting PGRES_TUPLES_OK from each; the rows are ignored. */
+static bool
+RunDtmFunction(char const * sql)
+	return RunDtmStatement(sql, PGRES_TUPLES_OK, NULL, NULL);
+/*
+ * Result handler for RunDtmStatement: parse the first column of the first
+ * row as a csn_t and raise *arg (a csn_t) to that value if it is larger.
+ * Returns false when the value is missing, empty or cannot be parsed.
+ */
+static bool
+DtmMaxCSN(PGresult *result, void *arg)
+	char	   *resp = PQgetvalue(result, 0, 0);
+	csn_t	   *maxCSN = (csn_t *) arg;
+	csn_t		csn = 0;
+	if (resp == NULL || (*resp) == '\0' || sscanf(resp, "%lld", &csn) != 1)
+	{
+		return false;
+	}
+	else
+	{
+		/* Keep the running maximum across all connections. */
+		if (*maxCSN < csn)
+		{
+			*maxCSN = csn;
+		}
+		return true;
+	}
  * pgfdw_xact_callback --- cleanup at main-transaction end.
@@ -524,6 +671,40 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 	if (!xact_got_connection)
+	if (currentGlobalTransactionId != 0)
+	{
+		switch (event)
+		{
+				{
+					csn_t		maxCSN = 0;
+					if (!RunDtmCommand(psprintf("PREPARE TRANSACTION '%d.%d'",
+									MyProcPid, currentLocalTransactionId)) ||
+						!RunDtmFunction(psprintf("SELECT public.dtm_begin_prepare('%d.%d')",
+									MyProcPid, currentLocalTransactionId)) ||
+						!RunDtmStatement(psprintf("SELECT public.dtm_prepare('%d.%d',0)",
+												  MyProcPid, currentLocalTransactionId), PGRES_TUPLES_OK, DtmMaxCSN, &maxCSN) ||
+						!RunDtmFunction(psprintf("SELECT public.dtm_end_prepare('%d.%d',%lld)",
+							MyProcPid, currentLocalTransactionId, maxCSN)) ||
+						!RunDtmCommand(psprintf("COMMIT PREPARED '%d.%d'",
+									  MyProcPid, currentLocalTransactionId)))
+					{
+						RunDtmCommand(psprintf("ROLLBACK PREPARED '%d.%d'",
+									  MyProcPid, currentLocalTransactionId));
+						ereport(ERROR,
+								 errmsg("transaction was aborted at one of the shards")));
+						break;
+					}
+					return;
+				}
+			default:
+				break;
+		}
+	}
 	 * Scan all connection cache entries to find open remote transactions, and
 	 * close them.
@@ -540,15 +721,40 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 		/* If it has an open remote transaction, try to close it */
 		if (entry->xact_depth > 0)
-			elog(DEBUG3, "closing remote transaction on connection %p",
-				 entry->conn);
+			elog(DEBUG3, "closing remote transaction on connection %p event %d",
+				 entry->conn, event);
 			switch (event)
 					/* Commit all remote transactions during pre-commit */
-					do_sql_command(entry->conn, "COMMIT TRANSACTION");
+					do_sql_send_command(entry->conn, "COMMIT TRANSACTION");
+					continue;
+					/*
+					 * We disallow remote transactions that modified anything,
+					 * since it's not very reasonable to hold them open until
+					 * the prepared transaction is committed.  For the moment,
+					 * throw error unconditionally; later we might allow
+					 * read-only cases.  Note that the error will cause us to
+					 * come right back here with event == XACT_EVENT_ABORT, so
+					 * we'll clean up the connection state at that point.
+					 */
+					ereport(ERROR,
+							 errmsg("cannot prepare a transaction that modified remote tables")));
+					break;
+					if (!currentGlobalTransactionId)
+					{
+						do_sql_wait_command(entry->conn, "COMMIT TRANSACTION");
+					}
 					 * If there were any errors in subtransactions, and we
@@ -573,27 +779,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 					entry->have_prep_stmt = false;
 					entry->have_error = false;
-					/*
-					 * We disallow remote transactions that modified anything,
-					 * since it's not very reasonable to hold them open until
-					 * the prepared transaction is committed.  For the moment,
-					 * throw error unconditionally; later we might allow
-					 * read-only cases.  Note that the error will cause us to
-					 * come right back here with event == XACT_EVENT_ABORT, so
-					 * we'll clean up the connection state at that point.
-					 */
-					ereport(ERROR,
-							 errmsg("cannot prepare a transaction that modified remote tables")));
-					break;
-					/* Pre-commit should have closed the open transaction */
-					elog(ERROR, "missed cleaning up connection during pre-commit");
-					break;
 					/* Assume we might have lost track of prepared statements */
@@ -617,6 +803,11 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 						entry->have_error = false;
+					break;
@@ -630,21 +821,26 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 		if (PQstatus(entry->conn) != CONNECTION_OK ||
 			PQtransactionStatus(entry->conn) != PQTRANS_IDLE)
-			elog(DEBUG3, "discarding connection %p", entry->conn);
+			elog(WARNING, "discarding connection %p, conn status=%d, trans status=%d", entry->conn, PQstatus(entry->conn), PQtransactionStatus(entry->conn));
 			entry->conn = NULL;
+	{
+		/*
+		 * Regardless of the event type, we can now mark ourselves as out of
+		 * the transaction.  (Note: if we are here during PRE_COMMIT or
+		 * PRE_PREPARE, this saves a useless scan of the hashtable during
+		 */
+		xact_got_connection = false;
-	/*
-	 * Regardless of the event type, we can now mark ourselves as out of the
-	 * transaction.  (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
-	 * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
-	 */
-	xact_got_connection = false;
+		/* Also reset cursor numbering for next transaction */
+		cursor_number = 0;
-	/* Also reset cursor numbering for next transaction */
-	cursor_number = 0;
+		currentGlobalTransactionId = 0;
+	}
diff --git a/contrib/postgres_fdw/postgres_fdw--1.0.sql b/contrib/postgres_fdw/postgres_fdw--1.0.sql
index a0f0fc1..0ce8f0e 100644
--- a/contrib/postgres_fdw/postgres_fdw--1.0.sql
+++ b/contrib/postgres_fdw/postgres_fdw--1.0.sql
@@ -16,3 +16,8 @@ LANGUAGE C STRICT;
   HANDLER postgres_fdw_handler
   VALIDATOR postgres_fdw_validator;
+CREATE FUNCTION postgres_fdw_exec(relid oid, sql cstring)
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index d5a2af9..08b28b6 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -193,10 +193,14 @@ typedef struct
 	List	   *already_used;	/* expressions already dealt with */
 } ec_member_foreign_arg;
+bool		UseTsDtmTransactions;
+void		_PG_init(void);
  * SQL functions
  * FDW callback routines
@@ -3214,3 +3218,29 @@ find_em_expr_for_rel(EquivalenceClass *ec, RelOptInfo *rel)
 	/* We didn't find any suitable equivalence class expression */
 	return NULL;
+	Oid			relid = PG_GETARG_OID(0);
+	char const *sql = PG_GETARG_CSTRING(1);
+	Oid			userid = GetUserId();
+	ForeignTable *table = GetForeignTable(relid);
+	ForeignServer *server = GetForeignServer(table->serverid);
+	UserMapping *user = GetUserMapping(userid, server->serverid);
+	PGconn	   *conn = GetConnection(server, user, false);
+	PGresult   *res = PQexec(conn, sql);
+	PQclear(res);
+	ReleaseConnection(conn);
+	DefineCustomBoolVariable("postgres_fdw.use_tsdtm",
+							 "Use timestamp base distributed transaction manager for FDW connections", NULL,
+						  &UseTsDtmTransactions, false, PGC_USERSET, 0, NULL,
+							 NULL, NULL);
diff --git a/contrib/postgres_fdw/tests/dtmbench.cpp b/contrib/postgres_fdw/tests/dtmbench.cpp
new file mode 100644
index 0000000..c8e7d72
--- /dev/null
+++ b/contrib/postgres_fdw/tests/dtmbench.cpp
@@ -0,0 +1,275 @@
+#include <time.h>
+#include <stdio.h>
+#include <stdarg.h>
+#include <stdlib.h>
+#include <inttypes.h>
+#include <sys/time.h>
+#include <pthread.h>
+#include <string>
+#include <vector>
+#include <pqxx/connection>
+#include <pqxx/transaction>
+#include <pqxx/nontransaction>
+#include <pqxx/pipeline>
+using namespace std;
+using namespace pqxx;
+typedef void* (*thread_proc_t)(void*);
+typedef uint32_t xid_t;
+// Per-thread bookkeeping for the benchmark: the pthread handle, the
+// counters accumulated by the thread procedure, and the thread's index.
+struct thread
+    pthread_t t;
+    size_t transactions;
+    size_t updates;
+    size_t selects;
+    size_t aborts;
+    int id;
+    // Reset all counters, record the index and launch `proc` with `this` as its argument.
+    void start(int tid, thread_proc_t proc) {
+        id = tid;
+        updates = 0;
+        selects = 0;
+        aborts = 0;
+        transactions = 0;
+        pthread_create(&t, NULL, proc, this);
+    }
+    // Block until the thread procedure has returned.
+    void wait() {
+        pthread_join(t, NULL);
+    }
+// Benchmark settings; the constructor installs the defaults that main()
+// may override from the command line.
+struct config
+    int nReaders;
+    int nWriters;
+    int nIterations;
+    int nAccounts;
+    int updatePercent;
+	int nShards;
+    string connection;
+    // Defaults: 1 shard, 1 reader, 10 writers, 1000 iterations, 10000 accounts, 100% updates.
+    config() {
+		nShards = 1;
+        nReaders = 1;
+        nWriters = 10;
+        nIterations = 1000;
+        nAccounts = 10000;
+        updatePercent = 100;
+    }
+config cfg;
+bool running;
+#define USEC 1000000
+// Current wall-clock time from gettimeofday(), expressed in microseconds
+// (seconds * USEC + microseconds) and returned as a time_t.
+static time_t getCurrentTime()
+    struct timeval tv;
+    gettimeofday(&tv, NULL);
+    return (time_t)tv.tv_sec*USEC + tv.tv_usec;
+// Format a printf-style SQL statement and execute it in `txn`, discarding the result.
+void exec(transaction_base& txn, char const* sql, ...)
+    va_list args;
+    va_start(args, sql);
+    char buf[1024];
+    // Bounded formatting: an over-long statement is truncated instead of overrunning buf.
+    vsnprintf(buf, sizeof(buf), sql, args);
+    va_end(args);
+    txn.exec(buf);
+// Format a printf-style SQL query, execute it in `txn`, and return the first
+// column of the first row converted to T.
+template<class T>
+T execQuery( transaction_base& txn, char const* sql, ...)
+    va_list args;
+    va_start(args, sql);
+    char buf[1024];
+    // Bounded formatting: an over-long statement is truncated instead of overrunning buf.
+    vsnprintf(buf, sizeof(buf), sql, args);
+    va_end(args);
+    result r = txn.exec(buf);
+    return r[0][0].as(T());
+// Reader thread: while `running` is set, repeatedly read sum(v) from t in
+// its own transaction, print the total whenever it changes, and count the
+// transaction and the select in the thread's statistics.
+void* reader(void* arg)
+    thread& t = *(thread*)arg;
+	connection conn(cfg.connection);
+    int64_t prevSum = 0;
+    while (running) {
+        work txn(conn);
+        result r = txn.exec("select sum(v) from t");
+        int64_t sum = r[0][0].as(int64_t());
+        if (sum != prevSum) {
+            // int64_t is not `long` on every platform; print it as long long.
+            printf("Total=%lld\n", (long long)sum);
+            prevSum = sum;
+        }
+        t.transactions += 1;
+        t.selects += 1;
+        txn.commit();
+    }
+    return NULL;
+// Writer thread: performs cfg.nIterations successful transactions.  Each one
+// either moves one unit between two random accounts (with probability
+// cfg.updatePercent) or reads both accounts and sanity-checks their sum.
+// A transaction that throws is aborted, counted, and retried.
+void* writer(void* arg)
+    thread& t = *(thread*)arg;
+    connection conn(cfg.connection);
+    // Widen before multiplying so the sanity bound cannot overflow int.
+    int64_t const maxSum = (int64_t)cfg.nIterations*cfg.nWriters;
+    for (int i = 0; i < cfg.nIterations; i++)
+    {
+		work txn(conn);
+        int srcAcc = random() % cfg.nAccounts;
+        int dstAcc = random() % cfg.nAccounts;
+        try {
+            if (random() % 100 < cfg.updatePercent) {
+                exec(txn, "update t set v = v - 1 where u=%d", srcAcc);
+                exec(txn, "update t set v = v + 1 where u=%d", dstAcc);
+                t.updates += 2;
+            } else {
+                int64_t sum = execQuery<int64_t>(txn, "select v from t where u=%d", srcAcc)
+                    + execQuery<int64_t>(txn, "select v from t where u=%d", dstAcc);
+                if (sum > maxSum || sum < -maxSum) {
+                    // int64_t is not `long` on every platform; print it as long long.
+                    printf("Wrong sum=%lld\n", (long long)sum);
+                }
+                t.selects += 2;
+            }
+            txn.commit();
+            t.transactions += 1;
+        } catch (pqxx_exception const& x) {
+            txn.abort();
+            t.aborts += 1;
+            // Retry: this iteration did not complete a transaction.
+            i -= 1;
+            continue;
+        }
+    }
+    return NULL;
+// Populate the shard tables t_fdw1..t_fdwN (N = cfg.nShards): each shard gets
+// a contiguous range of ceil(nAccounts / nShards) account ids, a CHECK
+// constraint on that range, and one zero-balance row per id.  Every shard is
+// handled in its own transaction.
+void initializeDatabase()
+    connection conn(cfg.connection);
+	int accountsPerShard = (cfg.nAccounts + cfg.nShards - 1)/cfg.nShards;
+	for (int i = 0; i < cfg.nShards; i++)
+	{
+		work txn(conn);
+		exec(txn, "alter table t_fdw%i add check (u between %d and %d)", i+1, accountsPerShard*i, accountsPerShard*(i+1)-1);
+		exec(txn, "insert into t_fdw%i (select generate_series(%d,%d), %d)", i+1, accountsPerShard*i, accountsPerShard*(i+1)-1, 0);
+		txn.commit();
+	}
+// Entry point: parse options into cfg, then either initialize the shards
+// (-i) or run the benchmark with cfg.nReaders reader threads and
+// cfg.nWriters writer threads and print the aggregated statistics as JSON.
+int main (int argc, char* argv[])
+    bool initialize = false;
+    if (argc == 1){
+        printf("Use -h to show usage options\n");
+        return 1;
+    }
+    for (int i = 1; i < argc; i++) {
+        // Every option takes a value; when it is missing fall through to the
+        // usage text instead of reading past the end of argv.
+        if (argv[i][0] == '-' && i + 1 < argc) {
+            switch (argv[i][1]) {
+            case 'r':
+                cfg.nReaders = atoi(argv[++i]);
+                continue;
+            case 'w':
+                cfg.nWriters = atoi(argv[++i]);
+                continue;
+            case 'a':
+                cfg.nAccounts = atoi(argv[++i]);
+                continue;
+            case 'n':
+                cfg.nIterations = atoi(argv[++i]);
+                continue;
+            case 'p':
+                cfg.updatePercent = atoi(argv[++i]);
+                continue;
+            case 'c':
+                cfg.connection = string(argv[++i]);
+                continue;
+            case 'i':
+                initialize = true;
+				cfg.nShards = atoi(argv[++i]);
+                continue;
+            }
+        }
+        // The defaults shown here must match the config constructor.
+        printf("Options:\n"
+               "\t-r N\tnumber of readers (1)\n"
+               "\t-w N\tnumber of writers (10)\n"
+               "\t-a N\tnumber of accounts (10000)\n"
+               "\t-n N\tnumber of iterations (1000)\n"
+               "\t-p N\tupdate percent (100)\n"
+               "\t-c STR\tdatabase connection string\n"
+               "\t-i N\tinitialize N shards\n");
+        return 1;
+    }
+    if (initialize) {
+        initializeDatabase();
+        printf("%d accounts inserted\n", cfg.nAccounts);
+        return 0;
+    }
+    time_t start = getCurrentTime();
+    running = true;
+    vector<thread> readers(cfg.nReaders);
+    vector<thread> writers(cfg.nWriters);
+    size_t nAborts = 0;
+    size_t nUpdates = 0;
+    size_t nSelects = 0;
+    size_t nTransactions = 0;
+    for (int i = 0; i < cfg.nReaders; i++) {
+        readers[i].start(i, reader);
+    }
+    for (int i = 0; i < cfg.nWriters; i++) {
+        writers[i].start(i, writer);
+    }
+    // Writers run a fixed number of iterations; readers run until `running` is cleared.
+    for (int i = 0; i < cfg.nWriters; i++) {
+        writers[i].wait();
+        nUpdates += writers[i].updates;
+        nSelects += writers[i].selects;
+        nAborts += writers[i].aborts;
+        nTransactions += writers[i].transactions;
+    }
+    running = false;
+    for (int i = 0; i < cfg.nReaders; i++) {
+        readers[i].wait();
+        nSelects += readers[i].selects;
+        // Count the reader's own transactions (indexing writers here was wrong
+        // and out of bounds when there are more readers than writers).
+        nTransactions += readers[i].transactions;
+    }
+    time_t elapsed = getCurrentTime() - start;
+    // size_t counters are printed with %zu; the abort percentage is guarded
+    // against a run that completed no transactions.
+    printf(
+        "{\"tps\":%f, \"transactions\":%zu,"
+        " \"selects\":%zu, \"updates\":%zu, \"aborts\":%zu, \"abort_percent\": %d,"
+        " \"readers\":%d, \"writers\":%d, \"update_percent\":%d, \"accounts\":%d, \"iterations\":%d ,\"shards\":%d}\n",
+        (double)(nTransactions*USEC)/elapsed,
+        nTransactions,
+        nSelects,
+        nUpdates,
+        nAborts,
+        nTransactions != 0 ? (int)(nAborts*100/nTransactions) : 0,
+        cfg.nReaders,
+        cfg.nWriters,
+        cfg.updatePercent,
+        cfg.nAccounts,
+        cfg.nIterations,
+		cfg.nShards);
+    return 0;
diff --git a/contrib/postgres_fdw/tests/makefile b/contrib/postgres_fdw/tests/makefile
new file mode 100644
index 0000000..766d99f
--- /dev/null
+++ b/contrib/postgres_fdw/tests/makefile
@@ -0,0 +1,10 @@
+CXXFLAGS=-g -Wall -O2 -pthread
+all: dtmbench
+dtmbench: dtmbench.cpp
+	$(CXX) $(CXXFLAGS) -o dtmbench dtmbench.cpp -lpqxx
+	rm -f dtmbench
\ No newline at end of file
diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index 94455b2..37523a1 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -15,7 +15,7 @@ include $(top_builddir)/src/Makefile.global
 OBJS = clog.o commit_ts.o multixact.o parallel.o rmgr.o slru.o subtrans.o \
 	timeline.o transam.o twophase.o twophase_rmgr.o varsup.o \
 	xact.o xlog.o xlogarchive.o xlogfuncs.o \
-	xloginsert.o xlogreader.o xlogutils.o
+	xloginsert.o xlogreader.o xlogutils.o xtm.o
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/transam/clog.c b/src/backend/access/transam/clog.c
index 06aff18..ba7b09f 100644
--- a/src/backend/access/transam/clog.c
+++ b/src/backend/access/transam/clog.c
@@ -38,6 +38,7 @@
 #include "access/xlog.h"
 #include "access/xloginsert.h"
 #include "access/xlogutils.h"
+#include "access/xtm.h"
 #include "miscadmin.h"
 #include "pg_trace.h"
@@ -92,6 +93,12 @@ static void TransactionIdSetStatusBit(TransactionId xid, XidStatus status,
 static void set_status_by_pages(int nsubxids, TransactionId *subxids,
 					XidStatus status, XLogRecPtr lsn);
+TransactionIdSetTreeStatus(TransactionId xid, int nsubxids,
+					TransactionId *subxids, XidStatus status, XLogRecPtr lsn)
+	return TM->SetTransactionStatus(xid, nsubxids, subxids, status, lsn);
  * TransactionIdSetTreeStatus
@@ -145,7 +152,7 @@ static void set_status_by_pages(int nsubxids, TransactionId *subxids,
  * cache yet.
-TransactionIdSetTreeStatus(TransactionId xid, int nsubxids,
+PgTransactionIdSetTreeStatus(TransactionId xid, int nsubxids,
 					TransactionId *subxids, XidStatus status, XLogRecPtr lsn)
 	int			pageno = TransactionIdToPage(xid);		/* get page of parent */
@@ -391,6 +398,12 @@ TransactionIdSetStatusBit(TransactionId xid, XidStatus status, XLogRecPtr lsn, i
 TransactionIdGetStatus(TransactionId xid, XLogRecPtr *lsn)
+	return TM->GetTransactionStatus(xid, lsn);
+PgTransactionIdGetStatus(TransactionId xid, XLogRecPtr *lsn)
 	int			pageno = TransactionIdToPage(xid);
 	int			byteno = TransactionIdToByte(xid);
 	int			bshift = TransactionIdToBIndex(xid) * CLOG_BITS_PER_XACT;
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 8a22836..6c2813b 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -1381,15 +1381,21 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 	 * callbacks will release the locks the transaction held.
 	if (isCommit)
+	{
 										hdr->nsubxacts, children,
 										hdr->ncommitrels, commitrels,
 										hdr->ninvalmsgs, invalmsgs,
+	}
+	{
 									   hdr->nsubxacts, children,
 									   hdr->nabortrels, abortrels);
+	}
 	ProcArrayRemove(proc, latestXid);
@@ -2150,3 +2156,12 @@ RecordTransactionAbortPrepared(TransactionId xid,
+ * Return identifier of current global transaction
+ */
+const char*
+	return MyLockedGxact ? MyLockedGxact->gid : NULL;
diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c
index 2f7e645..2f8514d 100644
--- a/src/backend/access/transam/varsup.c
+++ b/src/backend/access/transam/varsup.c
@@ -19,6 +19,7 @@
 #include "access/transam.h"
 #include "access/xact.h"
 #include "access/xlog.h"
+#include "access/xtm.h"
 #include "commands/dbcommands.h"
 #include "miscadmin.h"
 #include "postmaster/autovacuum.h"
@@ -33,6 +34,11 @@
 /* pointer to "variable cache" in shared memory (set up by shmem.c) */
 VariableCache ShmemVariableCache = NULL;
+GetNewTransactionId(bool isSubXact)
+	return TM->GetNewTransactionId(isSubXact);
  * Allocate the next XID for a new transaction or subtransaction.
@@ -45,7 +51,7 @@ VariableCache ShmemVariableCache = NULL;
  * issue a warning about XID wrap.
-GetNewTransactionId(bool isSubXact)
+PgGetNewTransactionId(bool isSubXact)
 	TransactionId xid;
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index b0d5440..c8d58d3 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -301,7 +301,6 @@ static void AtCommit_Memory(void);
 static void AtStart_Cache(void);
 static void AtStart_Memory(void);
 static void AtStart_ResourceOwner(void);
-static void CallXactCallbacks(XactEvent event);
 static void CallSubXactCallbacks(SubXactEvent event,
 					 SubTransactionId mySubid,
 					 SubTransactionId parentSubid);
@@ -1909,6 +1908,7 @@ StartTransaction(void)
 	s->state = TRANS_INPROGRESS;
+	CallXactCallbacks(XACT_EVENT_START);
@@ -3309,7 +3309,7 @@ UnregisterXactCallback(XactCallback callback, void *arg)
-static void
 CallXactCallbacks(XactEvent event)
 	XactCallbackItem *item;
@@ -5607,3 +5607,10 @@ xact_redo(XLogReaderState *record)
 		elog(PANIC, "xact_redo: unknown op code %u", info);
+	CurrentTransactionState->state = TRANS_INPROGRESS;
+	CurrentTransactionState->blockState = TBLOCK_STARTED;
diff --git a/src/backend/access/transam/xtm.c b/src/backend/access/transam/xtm.c
new file mode 100644
index 0000000..7915b1e
--- /dev/null
+++ b/src/backend/access/transam/xtm.c
@@ -0,0 +1,58 @@
+ *
+ * xtm.c
+ *		PostgreSQL implementation of transaction manager protocol
+ *
+ * This module defines the default implementation of the PostgreSQL transaction manager protocol
+ *
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/access/transam/xtm.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+#include "access/transam.h"
+#include "access/xtm.h"
+	return InvalidTransactionId;
+PgDetectGlobalDeadLock(PGPROC *proc)
+	return false;
+char const *
+	return "postgres";
+/*
+ * Default transaction manager: every slot is filled with the built-in Pg*
+ * implementation.
+ * NOTE(review): the initializer is positional, so the order presumably has to
+ * match the field order of TransactionManager in access/xtm.h -- confirm when
+ * adding or reordering callbacks.
+ */
+TransactionManager PgTM = {
+	PgTransactionIdGetStatus,
+	PgTransactionIdSetTreeStatus,
+	PgGetSnapshotData,
+	PgGetNewTransactionId,
+	PgGetOldestXmin,
+	PgTransactionIdIsInProgress,
+	PgGetGlobalTransactionId,
+	PgXidInMVCCSnapshot,
+	PgDetectGlobalDeadLock,
+	PgGetTransactionManagerName
+TransactionManager *TM = &PgTM;
+TransactionManager *
+	return TM;
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 6ded0f0..c2e878c 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -51,6 +51,7 @@
 #include "access/twophase.h"
 #include "access/xact.h"
 #include "access/xlog.h"
+#include "access/xtm.h"
 #include "catalog/catalog.h"
 #include "miscadmin.h"
 #include "storage/proc.h"
@@ -971,6 +972,12 @@ ProcArrayApplyXidAssignment(TransactionId topxid,
+TransactionIdIsInProgress(TransactionId xid)
+	return TM->IsInProgress(xid);
  * TransactionIdIsInProgress -- is given transaction running in some backend
@@ -998,7 +1005,7 @@ ProcArrayApplyXidAssignment(TransactionId topxid,
  * PGXACT again anyway; see GetNewTransactionId).
-TransactionIdIsInProgress(TransactionId xid)
+PgTransactionIdIsInProgress(TransactionId xid)
 	static TransactionId *xids = NULL;
 	int			nxids = 0;
@@ -1259,6 +1266,12 @@ TransactionIdIsActive(TransactionId xid)
+GetOldestXmin(Relation rel, bool ignoreVacuum)
+	return TM->GetOldestXmin(rel, ignoreVacuum);
  * GetOldestXmin -- returns oldest transaction that was running
  *					when any current transaction was started.
@@ -1308,7 +1321,7 @@ TransactionIdIsActive(TransactionId xid)
  * GetOldestXmin() move backwards, with no consequences for data integrity.
-GetOldestXmin(Relation rel, bool ignoreVacuum)
+PgGetOldestXmin(Relation rel, bool ignoreVacuum)
 	ProcArrayStruct *arrayP = procArray;
 	TransactionId result;
@@ -1470,6 +1483,12 @@ GetMaxSnapshotSubxidCount(void)
+GetSnapshotData(Snapshot snapshot)
+	return TM->GetSnapshot(snapshot);
  * GetSnapshotData -- returns information about running transactions.
@@ -1506,7 +1525,7 @@ GetMaxSnapshotSubxidCount(void)
  * not statically allocated (see xip allocation below).
-GetSnapshotData(Snapshot snapshot)
+PgGetSnapshotData(Snapshot snapshot)
 	ProcArrayStruct *arrayP = procArray;
 	TransactionId xmin;
diff --git a/src/backend/storage/lmgr/deadlock.c b/src/backend/storage/lmgr/deadlock.c
index 69f678b..ec8b77e 100644
--- a/src/backend/storage/lmgr/deadlock.c
+++ b/src/backend/storage/lmgr/deadlock.c
@@ -30,6 +30,7 @@
 #include "pgstat.h"
 #include "storage/lmgr.h"
 #include "storage/proc.h"
+#include "access/xtm.h"
 #include "utils/memutils.h"
@@ -272,7 +273,7 @@ DeadLockCheck(PGPROC *proc)
 	else if (blocking_autovacuum_proc != NULL)
-		return DS_NO_DEADLOCK;
+		return TM->DetectGlobalDeadLock(proc) ? DS_DISTRIBUTED_DEADLOCK : DS_NO_DEADLOCK;
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index e3e9599..21bf32b 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -3639,6 +3639,20 @@ GetLockmodeName(LOCKMETHODID lockmethodid, LOCKMODE mode)
 	return LockMethods[lockmethodid]->lockModeNames[mode];
+EnumerateLocks(LockIterator iterator, void* arg)
+	PROCLOCK   *proclock;
+	hash_seq_init(&status, LockMethodProcLockHash);
+	while ((proclock = (PROCLOCK *) hash_seq_search(&status)) != NULL)
+	{
+		iterator(proclock, arg);
+	}
 #ifdef LOCK_DEBUG
  * Dump all locks in the given proc's myProcLocks lists.
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 084be5a..8ded531 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -1395,6 +1395,22 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
 						   "Processes holding the lock: %s. Wait queue: %s.",
 											   lockHoldersNum, lock_holders_sbuf.data, lock_waiters_sbuf.data))));
+			else if (deadlock_state == DS_DISTRIBUTED_DEADLOCK)
+			{
+				/*
+				 * This message is a bit redundant with the error that will be
+				 * reported subsequently, but in some cases the error report
+				 * might not make it to the log (eg, if it's caught by an
+				 * exception handler), and we want to ensure all long-wait
+				 * events get logged.
+				 */
+				ereport(LOG,
+						(errmsg("process %d detected distributed deadlock while waiting for %s on %s after %ld.%03d ms",
+								MyProcPid, modename, buf.data, msecs, usecs),
+						 (errdetail_log_plural("Process holding the lock: %s. Wait queue: %s.",
+						   "Processes holding the lock: %s. Wait queue: %s.",
+											   lockHoldersNum, lock_holders_sbuf.data, lock_waiters_sbuf.data))));
+			}
 			if (myWaitStatus == STATUS_WAITING)
@@ -1419,7 +1435,7 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
 				 * future-proofing, print a message if it looks like someone
 				 * else kicked us off the lock.
-				if (deadlock_state != DS_HARD_DEADLOCK)
+				if (deadlock_state != DS_HARD_DEADLOCK && deadlock_state != DS_DISTRIBUTED_DEADLOCK)
 							(errmsg("process %d failed to acquire %s on %s after %ld.%03d ms",
 								MyProcPid, modename, buf.data, msecs, usecs),
@@ -1636,7 +1652,7 @@ CheckDeadLock(void)
 	/* Run the deadlock check, and set deadlock_state for use by ProcSleep */
 	deadlock_state = DeadLockCheck(MyProc);
-	if (deadlock_state == DS_HARD_DEADLOCK)
+	if (deadlock_state == DS_HARD_DEADLOCK || deadlock_state == DS_DISTRIBUTED_DEADLOCK)
 		 * Oops.  We have a deadlock.
diff --git a/src/backend/utils/time/tqual.c b/src/backend/utils/time/tqual.c
index 465933d..7ca0e06 100644
--- a/src/backend/utils/time/tqual.c
+++ b/src/backend/utils/time/tqual.c
@@ -66,6 +66,7 @@
 #include "access/subtrans.h"
 #include "access/transam.h"
 #include "access/xact.h"
+#include "access/xtm.h"
 #include "access/xlog.h"
 #include "storage/bufmgr.h"
 #include "storage/procarray.h"
@@ -1454,6 +1455,12 @@ HeapTupleIsSurelyDead(HeapTuple htup, TransactionId OldestXmin)
 	return TransactionIdPrecedes(HeapTupleHeaderGetRawXmax(tuple), OldestXmin);
+XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
+	return TM->IsInSnapshot(xid, snapshot);
  * XidInMVCCSnapshot
  *		Is the given XID still-in-progress according to the snapshot?
@@ -1464,8 +1471,8 @@ HeapTupleIsSurelyDead(HeapTuple htup, TransactionId OldestXmin)
  * TransactionIdIsCurrentTransactionId first, except for known-committed
  * XIDs which could not be ours anyway.
-static bool
-XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
+PgXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
 	uint32		i;
diff --git a/src/include/access/clog.h b/src/include/access/clog.h
index 06c069a..3adf6b1 100644
--- a/src/include/access/clog.h
+++ b/src/include/access/clog.h
@@ -27,6 +27,7 @@ typedef int XidStatus;
 extern void TransactionIdSetTreeStatus(TransactionId xid, int nsubxids,
diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h
index b7ce0c6..05d6aef 100644
--- a/src/include/access/twophase.h
+++ b/src/include/access/twophase.h
@@ -56,4 +56,6 @@ extern void CheckPointTwoPhase(XLogRecPtr redo_horizon);
 extern void FinishPreparedTransaction(const char *gid, bool isCommit);
+extern const char *GetLockedGlobalTransactionId(void);
 #endif   /* TWOPHASE_H */
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index ebeb582..4a59047 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -77,6 +77,7 @@ extern bool MyXactAccessedTempRel;
 typedef enum
@@ -84,7 +85,9 @@ typedef enum
 } XactEvent;
 typedef void (*XactCallback) (XactEvent event, void *arg);
@@ -100,6 +103,8 @@ typedef enum
 typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
 									SubTransactionId parentSubid, void *arg);
+void		CallXactCallbacks(XactEvent event);
 /* ----------------
  *		transaction-related XLOG entries
@@ -380,4 +385,6 @@ extern void EnterParallelMode(void);
 extern void ExitParallelMode(void);
 extern bool IsInParallelMode(void);
+extern void MarkAsAborted(void);
 #endif   /* XACT_H */
diff --git a/src/include/access/xtm.h b/src/include/access/xtm.h
new file mode 100644
index 0000000..08fa259
--- /dev/null
+++ b/src/include/access/xtm.h
@@ -0,0 +1,103 @@
+ * xtm.h
+ *
+ * PostgreSQL eXtensible Transaction Manager (XTM) API
+ *
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/xtm.h
+ */
+#ifndef XTM_H
+#define XTM_H
+#include "storage/proc.h"
+#include "access/clog.h"
+#include "utils/snapmgr.h"
+#include "utils/relcache.h"
+typedef struct
+	/*
+	 * Get status of the given transaction (encapsulation of
+	 * TransactionIdGetStatus in clog.c)
+	 */
+	XidStatus	(*GetTransactionStatus) (TransactionId xid, XLogRecPtr *lsn);
+	/*
+	 * Set status of the given transaction and its subtransactions
+	 * (encapsulation of TransactionIdSetTreeStatus in clog.c)
+	 */
+	void		(*SetTransactionStatus) (TransactionId xid, int nsubxids, TransactionId *subxids, XidStatus status, XLogRecPtr lsn);
+	/*
+	 * Get current transaction snapshot (encapsulation of GetSnapshotData in
+	 * procarray.c)
+	 */
+	Snapshot	(*GetSnapshot) (Snapshot snapshot);
+	/*
+	 * Assign new Xid to transaction (encapsulation of GetNewTransactionId in
+	 * varsup.c)
+	 */
+	TransactionId (*GetNewTransactionId) (bool isSubXact);
+	/*
+	 * Get oldest transaction Xid that was running when any current
+	 * transaction was started (encapsulation of GetOldestXmin in procarray.c)
+	 */
+	TransactionId (*GetOldestXmin) (Relation rel, bool ignoreVacuum);
+	/*
+	 * Check if the given transaction is not yet completed (encapsulation of
+	 * TransactionIdIsInProgress in procarray.c)
+	 */
+	bool		(*IsInProgress) (TransactionId xid);
+	/*
+	 * Get global transaction XID: returns XID of current transaction if it is
+	 * global, InvalidTransactionId otherwise
+	 */
+	TransactionId (*GetGlobalTransactionId) (void);
+	/*
+	 * Is the given XID still-in-progress according to the snapshot
+	 * (encapsulation of XidInMVCCSnapshot in tqual.c)
+	 */
+	bool		(*IsInSnapshot) (TransactionId xid, Snapshot snapshot);
+	/* Detect distributed deadlock */
+	bool		(*DetectGlobalDeadLock) (PGPROC *proc);
+	char const *(*GetName) (void);
+}	TransactionManager;
+/* Get pointer to transaction manager: actually returns content of TM variable */
+TransactionManager *GetTransactionManager(void);
+extern TransactionManager *TM;	/* Current transaction manager (can be
+								 * substituted by extensions) */
+extern TransactionManager PgTM; /* Standard PostgreSQL transaction manager */
+/* Standard PostgreSQL functions implementing TM interface */
+extern bool PgXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot);
+extern void PgTransactionIdSetTreeStatus(TransactionId xid, int nsubxids,
+				   TransactionId *subxids, XidStatus status, XLogRecPtr lsn);
+extern XidStatus PgTransactionIdGetStatus(TransactionId xid, XLogRecPtr *lsn);
+extern Snapshot PgGetSnapshotData(Snapshot snapshot);
+extern TransactionId PgGetOldestXmin(Relation rel, bool ignoreVacuum);
+extern bool PgTransactionIdIsInProgress(TransactionId xid);
+extern TransactionId PgGetGlobalTransactionId(void);
+extern TransactionId PgGetNewTransactionId(bool isSubXact);
+extern bool PgDetectGlobalDeadLock(PGPROC *proc);
+extern char const *PgGetTransactionManagerName(void);
diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h
index 6b4e365..0d45894 100644
--- a/src/include/storage/lock.h
+++ b/src/include/storage/lock.h
@@ -454,8 +454,9 @@ typedef enum
 	DS_NO_DEADLOCK,				/* no deadlock detected */
 	DS_SOFT_DEADLOCK,			/* deadlock avoided by queue rearrangement */
 	DS_HARD_DEADLOCK,			/* deadlock, no way out but ERROR */
-	DS_BLOCKED_BY_AUTOVACUUM	/* no deadlock; queue blocked by autovacuum
+	DS_BLOCKED_BY_AUTOVACUUM,	/* no deadlock; queue blocked by autovacuum
 								 * worker */
+	DS_DISTRIBUTED_DEADLOCK		/* distributed deadlock detected by DTM */
 } DeadLockState;
@@ -547,6 +548,10 @@ extern void DumpLocks(PGPROC *proc);
 extern void DumpAllLocks(void);
+typedef void (*LockIterator) (PROCLOCK *lock, void *arg);
+extern void EnumerateLocks(LockIterator iterator, void *arg);
 /* Lock a VXID (used to wait for a transaction to finish) */
 extern void VirtualXactLockTableInsert(VirtualTransactionId vxid);
 extern void VirtualXactLockTableCleanup(void);
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:

Reply via email to