diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 1a1e5b5..f52d2ba 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -15,11 +15,15 @@
 #include "postgres_fdw.h"
 
 #include "access/xact.h"
+#include "access/xtm.h"
+#include "access/transam.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
 #include "utils/hsearch.h"
 #include "utils/memutils.h"
 
+#undef DEBUG3
+#define DEBUG3 WARNING
 
 /*
  * Connection cache hash table entry
@@ -68,6 +72,8 @@ static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
 static void check_conn_params(const char **keywords, const char **values);
 static void configure_remote_session(PGconn *conn);
 static void do_sql_command(PGconn *conn, const char *sql);
+static void do_sql_send_command(PGconn *conn, const char *sql);
+static void do_sql_wait_command(PGconn *conn, const char *sql);
 static void begin_remote_xact(ConnCacheEntry *entry);
 static void pgfdw_xact_callback(XactEvent event, void *arg);
 static void pgfdw_subxact_callback(SubXactEvent event,
@@ -358,6 +364,27 @@ do_sql_command(PGconn *conn, const char *sql)
 	PQclear(res);
 }
 
+static void
+do_sql_send_command(PGconn *conn, const char *sql)
+{
+	if (PQsendQuery(conn, sql) != PGRES_COMMAND_OK) {
+		PGresult *res = PQgetResult(conn);
+		pgfdw_report_error(ERROR, res, conn, true, sql);
+		PQclear(res);
+	}
+}
+
+static void
+do_sql_wait_command(PGconn *conn, const char *sql)
+{
+	PGresult *res;
+	while ((res = PQgetResult(conn)) != NULL) {
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+			pgfdw_report_error(ERROR, res, conn, true, sql);
+		PQclear(res);
+	}
+}
+
 /*
  * Start remote transaction or subtransaction, if needed.
  *
@@ -376,11 +403,21 @@ begin_remote_xact(ConnCacheEntry *entry)
 	/* Start main transaction if we haven't yet */
 	if (entry->xact_depth <= 0)
 	{
+		TransactionId gxid = GetTransactionManager()->GetGlobalTransactionId();
 		const char *sql;
 
 		elog(DEBUG3, "starting remote transaction on connection %p",
 			 entry->conn);
 
+		if (TransactionIdIsValid(gxid)) {
+			char stmt[64];
+			PGresult *res;
+
+			snprintf(stmt, sizeof(stmt), "select public.dtm_join_transaction(%d)", gxid);
+			res = PQexec(entry->conn, stmt);
+			PQclear(res);
+		}
+
 		if (IsolationIsSerializable())
 			sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
 		else
@@ -541,16 +578,36 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 		/* If it has an open remote transaction, try to close it */
 		if (entry->xact_depth > 0)
 		{
-			elog(DEBUG3, "closing remote transaction on connection %p",
-				 entry->conn);
+			elog(DEBUG3, "closing remote transaction on connection %p event %d",
+				 entry->conn, event);
 
 			switch (event)
 			{
 				case XACT_EVENT_PARALLEL_PRE_COMMIT:
 				case XACT_EVENT_PRE_COMMIT:
 					/* Commit all remote transactions during pre-commit */
-					do_sql_command(entry->conn, "COMMIT TRANSACTION");
+					do_sql_send_command(entry->conn, "COMMIT TRANSACTION");
+					continue;
 
+				case XACT_EVENT_PRE_PREPARE:
+					/*
+					 * We disallow remote transactions that modified anything,
+					 * since it's not very reasonable to hold them open until
+					 * the prepared transaction is committed.  For the moment,
+					 * throw error unconditionally; later we might allow
+					 * read-only cases.  Note that the error will cause us to
+					 * come right back here with event == XACT_EVENT_ABORT, so
+					 * we'll clean up the connection state at that point.
+					 */
+					ereport(ERROR,
+							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+							 errmsg("cannot prepare a transaction that modified remote tables")));
+					break;
+
+				case XACT_EVENT_PARALLEL_COMMIT:
+				case XACT_EVENT_COMMIT:
+				case XACT_EVENT_PREPARE:
+					do_sql_wait_command(entry->conn, "COMMIT TRANSACTION");
 					/*
 					 * If there were any errors in subtransactions, and we
 					 * made prepared statements, do a DEALLOCATE ALL to make
@@ -574,27 +631,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 					entry->have_prep_stmt = false;
 					entry->have_error = false;
 					break;
-				case XACT_EVENT_PRE_PREPARE:
 
-					/*
-					 * We disallow remote transactions that modified anything,
-					 * since it's not very reasonable to hold them open until
-					 * the prepared transaction is committed.  For the moment,
-					 * throw error unconditionally; later we might allow
-					 * read-only cases.  Note that the error will cause us to
-					 * come right back here with event == XACT_EVENT_ABORT, so
-					 * we'll clean up the connection state at that point.
-					 */
-					ereport(ERROR,
-							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-							 errmsg("cannot prepare a transaction that modified remote tables")));
-					break;
-				case XACT_EVENT_PARALLEL_COMMIT:
-				case XACT_EVENT_COMMIT:
-				case XACT_EVENT_PREPARE:
-					/* Pre-commit should have closed the open transaction */
-					elog(ERROR, "missed cleaning up connection during pre-commit");
-					break;
 				case XACT_EVENT_PARALLEL_ABORT:
 				case XACT_EVENT_ABORT:
 					/* Assume we might have lost track of prepared statements */
@@ -618,6 +655,12 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 						entry->have_error = false;
 					}
 					break;
+
+				case XACT_EVENT_START:
+				case XACT_EVENT_ABORT_PREPARED:
+				case XACT_EVENT_COMMIT_PREPARED:
+					break;
+
 			}
 		}
 
@@ -631,21 +674,23 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 		if (PQstatus(entry->conn) != CONNECTION_OK ||
 			PQtransactionStatus(entry->conn) != PQTRANS_IDLE)
 		{
-			elog(DEBUG3, "discarding connection %p", entry->conn);
+			elog(DEBUG3, "discarding connection %p, conn status=%d, trans status=%d", entry->conn, PQstatus(entry->conn), PQtransactionStatus(entry->conn));
 			PQfinish(entry->conn);
 			entry->conn = NULL;
 		}
 	}
 
-	/*
-	 * Regardless of the event type, we can now mark ourselves as out of the
-	 * transaction.  (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
-	 * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
-	 */
-	xact_got_connection = false;
+	if (event != XACT_EVENT_PARALLEL_PRE_COMMIT && event != XACT_EVENT_PRE_COMMIT) { 
+		/*
+		 * Regardless of the event type, we can now mark ourselves as out of the
+		 * transaction.  (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
+		 * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
+		 */
+		xact_got_connection = false;
 
-	/* Also reset cursor numbering for next transaction */
-	cursor_number = 0;
+		/* Also reset cursor numbering for next transaction */
+		cursor_number = 0;
+	}
 }
 
 /*
diff --git a/contrib/postgres_fdw/postgres_fdw--1.0.sql b/contrib/postgres_fdw/postgres_fdw--1.0.sql
index a0f0fc1..0ce8f0e 100644
--- a/contrib/postgres_fdw/postgres_fdw--1.0.sql
+++ b/contrib/postgres_fdw/postgres_fdw--1.0.sql
@@ -16,3 +16,8 @@ LANGUAGE C STRICT;
 CREATE FOREIGN DATA WRAPPER postgres_fdw
   HANDLER postgres_fdw_handler
   VALIDATOR postgres_fdw_validator;
+
+CREATE FUNCTION postgres_fdw_exec(relid oid, sql cstring)
+RETURNS void
+AS 'MODULE_PATHNAME'
+LANGUAGE C STRICT;
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 1902f1f..1925c61 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -230,6 +230,7 @@ typedef struct
  * SQL functions
  */
 PG_FUNCTION_INFO_V1(postgres_fdw_handler);
+PG_FUNCTION_INFO_V1(postgres_fdw_exec);
 
 /*
  * FDW callback routines
@@ -3002,3 +3003,19 @@ conversion_error_callback(void *arg)
 				   NameStr(tupdesc->attrs[errpos->cur_attno - 1]->attname),
 				   RelationGetRelationName(errpos->rel));
 }
+
+Datum
+postgres_fdw_exec(PG_FUNCTION_ARGS)
+{
+	Oid relid = PG_GETARG_OID(0);
+	char const* sql = PG_GETARG_CSTRING(1);
+	Oid			userid = GetUserId();
+	ForeignTable *table = GetForeignTable(relid);
+	ForeignServer *server = GetForeignServer(table->serverid);
+	UserMapping *user = GetUserMapping(userid, server->serverid);
+	PGconn* conn =  GetConnection(server, user, false);
+	PGresult* res = PQexec(conn, sql);
+	PQclear(res);
+	ReleaseConnection(conn);
+	PG_RETURN_VOID();
+}
