Hi All,

Here are the steps and infrastructure for achieving atomic commits across
multiple foreign servers. I have tried to address most of the concerns
raised in this mail thread before. Let me know, if I have left something.
Attached is a WIP patch implementing the same for postgres_fdw. I have
tried to make it FDW-independent.

A. Steps during transaction processing
------------------------------------------------

1. When an FDW connects to a foreign server and starts a transaction, it
registers that server with a boolean flag indicating whether that server is
capable of participating in a two phase commit. In the patch this is
implemented using function RegisterXactForeignServer(), which raises an
error, thus aborting the transaction, if there is at least one foreign
server incapable of 2PC in a multiserver transaction. This error thrown as
early as possible. If all the foreign servers involved in the transaction
are capable of 2PC, the function just updates the information. As of now,
in the patch the function is in the form of a stub.

Whether a foreign server is capable of 2PC, can be
a. FDW level decision e.g. file_fdw as of now, is incapable of 2PC but it
can build the capabilities which can be used for all the servers using
file_fdw
b. a decision based on server version type etc. thus FDW can decide that by
looking at the server properties for each server
c. a user decision where the FDW can allow a user to specify it in the form
of CREATE/ALTER SERVER option. Implemented in the patch.

For a transaction involving only a single foreign server, the current code
remains unaltered as two phase commit is not needed. Rest of the discussion
pertains to a transaction involving more than one foreign servers.
At the commit or abort time, the FDW receives call backs with the
appropriate events. FDW then takes following actions on each event.

2. On XACT_EVENT_PRE_COMMIT event, the FDW coins one prepared transaction
id per foreign server involved and saves it along with xid, dbid, foreign
server id and user mapping and foreign transaction status = PREPARING
in-memory. The prepared transaction id can be anything represented as byte
string. Same information is flushed to the disk to survive crashes. This is
implemented in the patch as prepare_foreign_xact(). Persistent and
in-memory storages and their usages are discussed later in the mail. FDW
then prepares the transaction on the foreign server. If this step is
successful, the foreign transaction status is changed to PREPARED. If the
step is unsuccessful, the local transaction is aborted and each FDW will
receive XACT_EVENT_ABORT (discussed later). The updates to the foreign
transaction status need not be flushed to the disk, as they can be inferred
from the status of local transaction.

3. If the local transaction is committed, the FDW callback will get
XACT_EVENT_COMMIT event. Foreign transaction status is changed to
COMMITTING. FDW tries to commit the foreign transaction with the prepared
transaction id. If the commit is successful, the foreign transaction entry
is removed. If the commit is unsuccessful because of local/foreign server
crash or network failure, the foreign prepared transaction resolver takes
care of the committing it at later point of time.

4. If the local transaction is aborted, the FDW callback will get
XACT_EVENT_ABORT event. At this point, the FDW may or may not have prepared
a transaction on foreign server as per step 1 above. If it has not prepared
the transaction, it simply aborts the transaction on foreign server; a
server crash or network failure doesn't alter the ultimate result in this
case. If FDW has prepared the foreign transaction, it updates the foreign
transaction status as ABORTING and tries to rollback the prepared
transaction. If the rollback is successful, the foreign transaction entry
is removed. If the rollback is not successful, the foreign prepared
transaction resolver takes care of aborting it at later point of time.

B. Foreign prepared transaction resolver
---------------------------------------------------
In the patch this is implemented as a built-in function pg_fdw_resolve().
Ideally the functionality should be run by a background worker process
frequently.

The resolver looks at each entry and invokes the FDW routine to resolve the
transaction. The FDW routine returns boolean status: true if the prepared
transaction was resolved (committed/aborted), false otherwise.
The resolution is as follows -
1. If foreign transaction status is COMMITTING or ABORTING, commits or
aborts the prepared transaction resp through the FDW routine. If the
transaction is successfully resolved, it removes the foreign transaction
entry.
2. Else, it checks if the local transaction was committed or aborted, it
update the foreign transaction status accordingly and takes the action
according to above step 1.
3. The resolver doesn't touch entries created by in-progress local
transactions.

If server/backend crashes after it has registered the foreign transaction
entry (during step A.1), we will be left with a prepared transaction id,
which was never prepared on the foreign server. Similarly the
server/backend crashes after it has resolved the foreign prepared
transaction but before removing the entry, same situation can arise. FDW
should detect these situations, when foreign server complains about
non-existing prepared transaction ids and consider such foreign
transactions as resolved.

After looking at all the entries the resolver flushes the entries to the
disk, so as to retain the latest status across shutdown and crash.

C. Other methods and infrastructure
------------------------------------------------
1. Method to show the current foreign transaction entries (in progress or
waiting to be resolved). Implemented as function pg_fdw_xact() in the patch.
2. Method to drop foreign transaction entries in case they are resolved by
user/DBA themselves. Not implemented in the patch.
3. Method to prevent altering or dropping foreign server and user mapping
used to prepare the foreign transaction till the later gets resolved. Not
implemented in the patch. While altering or dropping the foreign server or
user mapping, that portion of the code needs to check if there exists an
foreign transaction entry depending upon the foreign server or user mapping
and should error out.
4. The information about the xid needs to be available till it is decided
whether to commit or abort the foreign transaction and that decision is
persisted. That should put some constraint on the xid wraparound or oldest
active transaction. Not implemented in the patch.
5. Method to propagate the foreign transaction information to the slave.

D. Persistent and in-memory storage considerations
--------------------------------------------------------------------
I considered following options for persistent storage
1. in-memory table and file(s) - The foreign transaction entries are saved
and manipulated in shared memory. They are written to file whenever
persistence is necessary e.g. while registering the foreign transaction in
step A.2. Requirements C.1, C.2 need some SQL interface in the form of
built-in functions or SQL commands.

The patch implements the in-memory foreign transaction table as a fixed
size array of foreign transaction entries (similar to prepared transaction
entries in twophase.c). This puts a restriction on number of foreign
prepared transactions that need to be maintained at a time. We need
separate locks to syncronize the access to the shared memory; the patch
uses only a single LW lock. There is restriction on the length of prepared
transaction id (or prepared transaction information saved by FDW to be
general), since everything is being saved in fixed size memory. We may be
able to overcome that restriction by writing this information to separate
files (one file per foreign prepared transaction). We need to take the same
route as 2PC for C.5.

2. New catalog - This method takes out the need to have separate method for
C1, C5 and even C2, also the synchronization will be taken care of by row
locks, there will be no limit on the number of foreign transactions as well
as the size of foreign prepared transaction information. But big problem
with this approach is that, the changes to the catalogs are atomic with the
local transaction. If a foreign prepared transaction can not be aborted
while the local transaction is rolled back, that entry needs to retained.
But since the local transaction is aborting the corresponding catalog entry
would become invisible and thus unavailable to the resolver (alas! we do
not have autonomous transaction support). We may be able to overcome this,
by simulating autonomous transaction through a background worker (which can
also act as a resolver). But the amount of communication and
synchronization, might affect the performance.

A mixed approach where the backend shifts the entries from storage in
approach 1 to catalog, thus lifting the constraints on size is possible,
but is very complicated.

Any other ideas to use catalog table as the persistent storage here? Does
anybody think, catalog table is a viable option?

3. WAL records - Since the algorithm follows "write ahead of action", WAL
seems to be a possible way to persist the foreign transaction entries. But
WAL records can not be used for repeated scan as is required by the foreign
transaction resolver. Also, replaying WALs is controlled by checkpoint, so
not all WALs are replayed. If a checkpoint happens after a foreign prepared
transaction remains resolved, corresponding WALs will never be replayed,
thus causing the foreign prepared transaction to remain unresolved forever
without a clue. So, WALs alone don't seem to be a fit here.

The algorithms rely on the FDWs to take right steps to the large extent,
rather than controlling each step explicitly. It expects the FDWs to take
the right steps for each event and call the right functions to manipulate
foreign transaction entries. It does not ensure the correctness of these
steps, by say examining the foreign transaction entries in response to each
event or by making the callback return the information and manipulate the
entries within the core. I am willing to go the stricter but more intrusive
route if the others also think that way. Otherwise, the current approach is
less intrusive and I am fine with that too.

-- 
Best Wishes,
Ashutosh Bapat
EnterpriseDB Corporation
The Postgres Database Company
diff --git a/contrib/pg_xlogdump/rmgrdesc.c b/contrib/pg_xlogdump/rmgrdesc.c
index 180818d..2fc6d82 100644
--- a/contrib/pg_xlogdump/rmgrdesc.c
+++ b/contrib/pg_xlogdump/rmgrdesc.c
@@ -14,20 +14,21 @@
 #include "access/gin.h"
 #include "access/gist_private.h"
 #include "access/hash.h"
 #include "access/heapam_xlog.h"
 #include "access/multixact.h"
 #include "access/nbtree.h"
 #include "access/rmgr.h"
 #include "access/spgist.h"
 #include "access/xact.h"
 #include "access/xlog_internal.h"
+#include "access/fdw_xact.h"
 #include "catalog/storage_xlog.h"
 #include "commands/dbcommands.h"
 #include "commands/sequence.h"
 #include "commands/tablespace.h"
 #include "rmgrdesc.h"
 #include "storage/standby.h"
 #include "utils/relmapper.h"
 
 #define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup) \
 	{ name, desc, identify},
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 4e02cb2..3653c58 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -8,20 +8,22 @@
  * IDENTIFICATION
  *		  contrib/postgres_fdw/connection.c
  *
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
 
 #include "postgres_fdw.h"
 
 #include "access/xact.h"
+#include "access/fdw_xact.h"
+#include "commands/defrem.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
 #include "utils/hsearch.h"
 #include "utils/memutils.h"
 
 
 /*
  * Connection cache hash table entry
  *
  * The lookup key in this hash table is the foreign server OID plus the user
@@ -42,45 +44,51 @@ typedef struct ConnCacheKey
 } ConnCacheKey;
 
 typedef struct ConnCacheEntry
 {
 	ConnCacheKey key;			/* hash key (must be first) */
 	PGconn	   *conn;			/* connection to foreign server, or NULL */
 	int			xact_depth;		/* 0 = no xact open, 1 = main xact open, 2 =
 								 * one level of subxact open, etc */
 	bool		have_prep_stmt; /* have we prepared any stmts in this xact? */
 	bool		have_error;		/* have any subxacts aborted in this xact? */
+	char		*prep_xact_name;	/* Name of prepared transaction on this connection */
+	int			prep_xact_id;
 } ConnCacheEntry;
 
 /*
  * Connection cache (initialized on first use)
  */
 static HTAB *ConnectionHash = NULL;
 
 /* for assigning cursor numbers and prepared statement numbers */
 static unsigned int cursor_number = 0;
 static unsigned int prep_stmt_number = 0;
 
 /* tracks whether any work is needed in callback functions */
 static bool xact_got_connection = false;
 
 /* prototypes of private functions */
 static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
 static void check_conn_params(const char **keywords, const char **values);
 static void configure_remote_session(PGconn *conn);
 static void do_sql_command(PGconn *conn, const char *sql);
-static void begin_remote_xact(ConnCacheEntry *entry);
+static void begin_remote_xact(ConnCacheEntry *entry, ForeignServer *server);
 static void pgfdw_xact_callback(XactEvent event, void *arg);
 static void pgfdw_subxact_callback(SubXactEvent event,
 					   SubTransactionId mySubid,
 					   SubTransactionId parentSubid,
 					   void *arg);
+static bool is_server_twophase_compliant(ForeignServer *server);
+static void prepare_foreign_xact(ConnCacheEntry *entry, char *prep_xact_name);
+static void resolve_foreign_xact(ConnCacheEntry *entry, int action);
+static char *pgfdw_get_prep_xact_id(ConnCacheEntry *entry);
 
 
 /*
  * Get a PGconn which can be used to execute queries on the remote PostgreSQL
  * server with the user's authorization.  A new connection is established
  * if we don't already have a suitable one, and a transaction is opened at
  * the right subtransaction nesting depth if we didn't do that already.
  *
  * will_prep_stmt must be true if caller intends to create any prepared
  * statements.  Since those don't go away automatically at transaction end
@@ -88,21 +96,21 @@ static void pgfdw_subxact_callback(SubXactEvent event,
  *
  * XXX Note that caching connections theoretically requires a mechanism to
  * detect change of FDW objects to invalidate already established connections.
  * We could manage that by watching for invalidation events on the relevant
  * syscaches.  For the moment, though, it's not clear that this would really
  * be useful and not mere pedantry.  We could not flush any active connections
  * mid-transaction anyway.
  */
 PGconn *
 GetConnection(ForeignServer *server, UserMapping *user,
-			  bool will_prep_stmt)
+			  bool will_prep_stmt, bool start_transaction)
 {
 	bool		found;
 	ConnCacheEntry *entry;
 	ConnCacheKey key;
 
 	/* First time through, initialize connection cache hashtable */
 	if (ConnectionHash == NULL)
 	{
 		HASHCTL		ctl;
 
@@ -116,38 +124,36 @@ GetConnection(ForeignServer *server, UserMapping *user,
 									 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
 
 		/*
 		 * Register some callback functions that manage connection cleanup.
 		 * This should be done just once in each backend.
 		 */
 		RegisterXactCallback(pgfdw_xact_callback, NULL);
 		RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
 	}
 
-	/* Set flag that we did GetConnection during the current transaction */
-	xact_got_connection = true;
-
 	/* Create hash key for the entry.  Assume no pad bytes in key struct */
 	key.serverid = server->serverid;
 	key.userid = user->userid;
 
 	/*
 	 * Find or create cached entry for requested connection.
 	 */
 	entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
 	if (!found)
 	{
 		/* initialize new hashtable entry (key is already filled in) */
 		entry->conn = NULL;
 		entry->xact_depth = 0;
 		entry->have_prep_stmt = false;
 		entry->have_error = false;
+		entry->prep_xact_name = NULL;
 	}
 
 	/*
 	 * We don't check the health of cached connection here, because it would
 	 * require some overhead.  Broken connection will be detected when the
 	 * connection is actually used.
 	 */
 
 	/*
 	 * If cache entry doesn't have a connection, we have to establish a new
@@ -155,26 +161,33 @@ GetConnection(ForeignServer *server, UserMapping *user,
 	 * will be left in a valid empty state.)
 	 */
 	if (entry->conn == NULL)
 	{
 		entry->xact_depth = 0;	/* just to be sure */
 		entry->have_prep_stmt = false;
 		entry->have_error = false;
 		entry->conn = connect_pg_server(server, user);
 		elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\"",
 			 entry->conn, server->servername);
+		entry->prep_xact_name = NULL;
 	}
 
 	/*
 	 * Start a new transaction or subtransaction if needed.
 	 */
-	begin_remote_xact(entry);
+	if (start_transaction)
+	{
+		begin_remote_xact(entry, server);
+		/* Set flag that we did GetConnection during the current transaction */
+		xact_got_connection = true;
+	}
+
 
 	/* Remember if caller will prepare statements */
 	entry->have_prep_stmt |= will_prep_stmt;
 
 	return entry->conn;
 }
 
 /*
  * Connect to remote server using specified server and user mapping properties.
  */
@@ -362,29 +375,38 @@ do_sql_command(PGconn *conn, const char *sql)
  * Start remote transaction or subtransaction, if needed.
  *
  * Note that we always use at least REPEATABLE READ in the remote session.
  * This is so that, if a query initiates multiple scans of the same or
  * different foreign tables, we will get snapshot-consistent results from
  * those scans.  A disadvantage is that we can't provide sane emulation of
  * READ COMMITTED behavior --- it would be nice if we had some other way to
  * control which remote queries share a snapshot.
  */
 static void
-begin_remote_xact(ConnCacheEntry *entry)
+begin_remote_xact(ConnCacheEntry *entry, ForeignServer *server)
 {
 	int			curlevel = GetCurrentTransactionNestLevel();
 
 	/* Start main transaction if we haven't yet */
 	if (entry->xact_depth <= 0)
 	{
 		const char *sql;
 
+		/*
+		 * Register the new foreign server and check whether the two phase
+		 * compliance is needed. The function would throw error, if the
+		 * transaction involves multiple foreign server and the one being
+		 * registered does not support 2PC. 
+		 */
+		RegisterXactForeignServer(entry->key.serverid,
+									is_server_twophase_compliant(server));
+
 		elog(DEBUG3, "starting remote transaction on connection %p",
 			 entry->conn);
 
 		if (IsolationIsSerializable())
 			sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
 		else
 			sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
 		do_sql_command(entry->conn, sql);
 		entry->xact_depth = 1;
 	}
@@ -506,20 +528,50 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
 		if (clear)
 			PQclear(res);
 		PG_RE_THROW();
 	}
 	PG_END_TRY();
 	if (clear)
 		PQclear(res);
 }
 
 /*
+ * pgfdw_get_prep_xact_id
+ * The function crafts prepared transaction identifier. PostgreSQL documentation
+ * mentions two restrictions on the name
+ * 1. String literal, less than 200 bytes long.
+ * 2. Should not be same as any other concurrent prepared transaction id.
+ *
+ * To make the prepared transaction id, we should ideally use something like
+ * UUID, which gives unique ids with high probability, but that may be expensive
+ * here and UUID extension which provides the function to generate UUID is part
+ * of the extension not the core.
+ */
+static char *
+pgfdw_get_prep_xact_id(ConnCacheEntry *entry)
+{
+/* Maximum length of the prepared transaction id, borrowed from twophase.c */
+#define PREP_XACT_ID_MAX_LEN 200
+#define RANDOM_LARGE_MULTIPLIER 1000
+	char	*prep_xact_name;
+	MemoryContext	old_context;
+
+	old_context = MemoryContextSwitchTo(CacheMemoryContext);
+	/* Allocate the memory in the same context as the hash entry */
+	prep_xact_name = (char *)palloc(PREP_XACT_ID_MAX_LEN * sizeof(char));
+	MemoryContextSwitchTo(old_context);
+	snprintf(prep_xact_name, PREP_XACT_ID_MAX_LEN, "%s_%4d_%d_%d",
+								"px", abs(random() * RANDOM_LARGE_MULTIPLIER),
+								entry->key.serverid, entry->key.userid);
+	return prep_xact_name;
+}
+/*
  * pgfdw_xact_callback --- cleanup at main-transaction end.
  */
 static void
 pgfdw_xact_callback(XactEvent event, void *arg)
 {
 	HASH_SEQ_STATUS scan;
 	ConnCacheEntry *entry;
 
 	/* Quick exit if no connections were touched in this transaction. */
 	if (!xact_got_connection)
@@ -540,22 +592,38 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 
 		/* If it has an open remote transaction, try to close it */
 		if (entry->xact_depth > 0)
 		{
 			elog(DEBUG3, "closing remote transaction on connection %p",
 				 entry->conn);
 
 			switch (event)
 			{
 				case XACT_EVENT_PRE_COMMIT:
-					/* Commit all remote transactions during pre-commit */
-					do_sql_command(entry->conn, "COMMIT TRANSACTION");
+					/* 
+					 * If the local server requires two phase commit (because
+					 * there are more than one foreign servers involved in the
+					 * transaction), execute the first phase i.e. prepare the
+					 * transaction on the foreign server. The server will tell
+					 * us whether to commit or rollback that prepared
+					 * transaction.
+					 * Otherwise, commit the transaction. If the COMMIT fails,
+					 * the local transaction will be rolled back.
+					 */
+
+					if (FdwTwoPhaseNeeded())
+						prepare_foreign_xact(entry, pgfdw_get_prep_xact_id(entry));
+					else
+					{
+						do_sql_command(entry->conn, "COMMIT TRANSACTION");
+						entry->xact_depth = 0;
+					}
 
 					/*
 					 * If there were any errors in subtransactions, and we
 					 * made prepared statements, do a DEALLOCATE ALL to make
 					 * sure we get rid of all prepared statements. This is
 					 * annoying and not terribly bulletproof, but it's
 					 * probably not worth trying harder.
 					 *
 					 * DEALLOCATE ALL only exists in 8.3 and later, so this
 					 * constrains how old a server postgres_fdw can
@@ -567,86 +635,107 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 					 */
 					if (entry->have_prep_stmt && entry->have_error)
 					{
 						res = PQexec(entry->conn, "DEALLOCATE ALL");
 						PQclear(res);
 					}
 					entry->have_prep_stmt = false;
 					entry->have_error = false;
 					break;
 				case XACT_EVENT_PRE_PREPARE:
-
 					/*
 					 * We disallow remote transactions that modified anything,
 					 * since it's not very reasonable to hold them open until
 					 * the prepared transaction is committed.  For the moment,
 					 * throw error unconditionally; later we might allow
 					 * read-only cases.  Note that the error will cause us to
 					 * come right back here with event == XACT_EVENT_ABORT, so
 					 * we'll clean up the connection state at that point.
 					 */
 					ereport(ERROR,
 							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 							 errmsg("cannot prepare a transaction that modified remote tables")));
 					break;
 				case XACT_EVENT_COMMIT:
+					/*
+					 * The local transaction has committed. If we prepared a
+					 * transaction on a foreign server, commit it.
+					 */
+					if (entry->prep_xact_name)
+					{
+						resolve_foreign_xact(entry, FDW_XACT_COMMITTING);
+						entry->xact_depth = 0;
+					}
+					else
+						/* Pre-commit should have closed the open transaction */
+						elog(ERROR, "missed cleaning up connection during pre-commit");
+
+					break;
 				case XACT_EVENT_PREPARE:
 					/* Pre-commit should have closed the open transaction */
 					elog(ERROR, "missed cleaning up connection during pre-commit");
 					break;
 				case XACT_EVENT_ABORT:
 					/* Assume we might have lost track of prepared statements */
 					entry->have_error = true;
 					/* If we're aborting, abort all remote transactions too */
-					res = PQexec(entry->conn, "ABORT TRANSACTION");
-					/* Note: can't throw ERROR, it would be infinite loop */
-					if (PQresultStatus(res) != PGRES_COMMAND_OK)
-						pgfdw_report_error(WARNING, res, entry->conn, true,
-										   "ABORT TRANSACTION");
+					if (entry->prep_xact_name)
+					{
+						/*
+						 * We have prepared transaction on the remote server,
+						 * roll it back.
+						 */
+						resolve_foreign_xact(entry, FDW_XACT_ABORTING);
+						entry->xact_depth = 0;
+					}
 					else
 					{
+						res = PQexec(entry->conn, "ABORT TRANSACTION");
+						entry->xact_depth = 0;
+						/* Note: can't throw ERROR, it would be infinite loop */
+						if (PQresultStatus(res) != PGRES_COMMAND_OK)
+							pgfdw_report_error(WARNING, res, entry->conn, true,
+											   "ABORT TRANSACTION");
+					}
+					/* As above, make sure to clear any prepared stmts */
+					if (entry->have_prep_stmt && entry->have_error)
+					{
+						res = PQexec(entry->conn, "DEALLOCATE ALL");
 						PQclear(res);
-						/* As above, make sure to clear any prepared stmts */
-						if (entry->have_prep_stmt && entry->have_error)
-						{
-							res = PQexec(entry->conn, "DEALLOCATE ALL");
-							PQclear(res);
-						}
-						entry->have_prep_stmt = false;
-						entry->have_error = false;
 					}
+					entry->have_prep_stmt = false;
+					entry->have_error = false;
 					break;
 			}
 		}
 
-		/* Reset state to show we're out of a transaction */
-		entry->xact_depth = 0;
-
 		/*
 		 * If the connection isn't in a good idle state, discard it to
 		 * recover. Next GetConnection will open a new connection.
 		 */
 		if (PQstatus(entry->conn) != CONNECTION_OK ||
 			PQtransactionStatus(entry->conn) != PQTRANS_IDLE)
 		{
 			elog(DEBUG3, "discarding connection %p", entry->conn);
 			PQfinish(entry->conn);
 			entry->conn = NULL;
 		}
 	}
 
 	/*
-	 * Regardless of the event type, we can now mark ourselves as out of the
-	 * transaction.  (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
-	 * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
+	 * TODO:
+	 * With 2PC this optimization needs to be revised.
+	 * If we have aborted or committed the transaction, we can now mark ourselves
+	 * as out of the transaction.
 	 */
-	xact_got_connection = false;
+	if (event != XACT_EVENT_PRE_COMMIT)  
+		xact_got_connection = false;
 
 	/* Also reset cursor numbering for next transaction */
 	cursor_number = 0;
 }
 
 /*
  * pgfdw_subxact_callback --- cleanup at subtransaction end.
  */
 static void
 pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
@@ -705,10 +794,109 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
 			if (PQresultStatus(res) != PGRES_COMMAND_OK)
 				pgfdw_report_error(WARNING, res, entry->conn, true, sql);
 			else
 				PQclear(res);
 		}
 
 		/* OK, we're outta that level of subtransaction */
 		entry->xact_depth--;
 	}
 }
+
+/*
+ * is_server_twophase_compliant
+ * Returns true if the foreign server is configured to support 2PC.
+ */
+static bool
+is_server_twophase_compliant(ForeignServer *server)
+{
+	/* 
+	 * TODO:
+	 * Do we need to check whether the server passed in is indeed
+	 * PostgreSQL server? Probably not.
+	 */
+	ListCell		*lc;
+	
+	/* Check the options for two phase compliance */ 
+	foreach(lc, server->options)
+	{
+		DefElem    *d = (DefElem *) lfirst(lc);
+
+		if (strcmp(d->defname, "twophase_compliant") == 0)
+		{
+			return defGetBoolean(d);
+		}
+	}
+	/* By default a server is not 2PC compliant */
+	return false;
+}
+
+/*
+ * prepare_foreign_xact
+ * Prepare the transaction on the foreign server indicated by the entry with
+ * passed in GID.
+ */
+static void
+prepare_foreign_xact(ConnCacheEntry *entry, char *prep_xact_name)
+{
+	StringInfo	command = makeStringInfo();
+	PGresult   *res;
+
+	entry->prep_xact_id = insert_fdw_xact(MyDatabaseId, GetCurrentTransactionId(),
+										entry->key.serverid, entry->key.userid,
+										strlen(prep_xact_name) + 1,
+										prep_xact_name, FDW_XACT_PREPARING);
+	appendStringInfo(command, "PREPARE TRANSACTION '%s'", prep_xact_name);
+	res = PQexec(entry->conn, command->data);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+	{
+		/*
+		 * An error occured, and we didn't prepare the transaction. Delete the
+		 * entry from foreign transaction table. Raise an error, so that the
+		 * local server knows that one of the foreign server has failed to
+		 * prepare the transaction.
+		 */
+		remove_fdw_xact(entry->prep_xact_id);
+		pgfdw_report_error(ERROR, res, entry->conn, true, command->data);
+	}
+
+	PQclear(res);
+	/* Preparation succeeded, remember that we have prepared a transaction */
+	entry->prep_xact_name = prep_xact_name;
+	/* The transaction got prepared, register this fact */
+	update_fdw_xact(entry->prep_xact_id, FDW_XACT_PREPARED);
+}
+
+static void
+resolve_foreign_xact(ConnCacheEntry *entry, int action)
+{
+	StringInfo	command = makeStringInfo();
+	PGresult	*res;
+
+	/* Remember transaction resolution */
+	update_fdw_xact(entry->prep_xact_id, action);
+	if (action == FDW_XACT_COMMITTING)
+		appendStringInfo(command, "COMMIT PREPARED '%s'",
+								entry->prep_xact_name);
+	else if (action == FDW_XACT_ABORTING)
+		appendStringInfo(command, "ROLLBACK PREPARED '%s'",
+								entry->prep_xact_name);
+	else
+		elog(ERROR, "Wrong action %d while resolving foreign transaction", action);
+
+	res = PQexec(entry->conn, command->data);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+	{
+		/*
+		 * The prepared foreign transaction couldn't be resolved. It will be
+		 * resolved later when pg_fdw_resolve() gets called.
+		 */
+	}
+	else
+		/* We succeeded in resolving the transaction, update the information */
+		remove_fdw_xact(entry->prep_xact_id);
+
+	PQclear(res);
+	pfree(entry->prep_xact_name);
+	entry->prep_xact_name = NULL;
+	return;
+}
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 583cce7..50b60ea 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -1,20 +1,21 @@
 -- ===================================================================
 -- create FDW objects
 -- ===================================================================
 CREATE EXTENSION postgres_fdw;
 CREATE SERVER testserver1 FOREIGN DATA WRAPPER postgres_fdw;
 DO $d$
     BEGIN
         EXECUTE $$CREATE SERVER loopback FOREIGN DATA WRAPPER postgres_fdw
             OPTIONS (dbname '$$||current_database()||$$',
-                     port '$$||current_setting('port')||$$'
+                     port '$$||current_setting('port')||$$',
+					 twophase_compliant 'true'
             )$$;
     END;
 $d$;
 CREATE USER MAPPING FOR public SERVER testserver1
 	OPTIONS (user 'value', password 'value');
 CREATE USER MAPPING FOR CURRENT_USER SERVER loopback;
 -- ===================================================================
 -- create objects used through FDW loopback server
 -- ===================================================================
 CREATE TYPE user_enum AS ENUM ('foo', 'bar', 'buz');
@@ -3249,10 +3250,133 @@ ERROR:  type "public.Colors" does not exist
 LINE 4:   "Col" public."Colors" OPTIONS (column_name 'Col')
                 ^
 QUERY:  CREATE FOREIGN TABLE t5 (
   c1 integer OPTIONS (column_name 'c1'),
   c2 text OPTIONS (column_name 'c2') COLLATE pg_catalog."C",
   "Col" public."Colors" OPTIONS (column_name 'Col')
 ) SERVER loopback
 OPTIONS (schema_name 'import_source', table_name 't5');
 CONTEXT:  importing foreign table "t5"
 ROLLBACK;
+-- This will suppress the context of errors, which contains prepared transaction
+-- IDs. Those come out to be different each time.
+\set VERBOSITY terse
+-- Test transactional consistency for multiple server case
+-- create two loopback servers for testing consistency on two connections
+DO $d$
+    BEGIN
+        EXECUTE $$CREATE SERVER loopback1 FOREIGN DATA WRAPPER postgres_fdw
+            OPTIONS (dbname '$$||current_database()||$$',
+                     port '$$||current_setting('port')||$$',
+					 twophase_compliant 'true'
+            )$$;
+    END;
+$d$;
+DO $d$
+    BEGIN
+        EXECUTE $$CREATE SERVER loopback2 FOREIGN DATA WRAPPER postgres_fdw
+            OPTIONS (dbname '$$||current_database()||$$',
+                     port '$$||current_setting('port')||$$',
+					 twophase_compliant 'true'
+            )$$;
+    END;
+$d$;
+CREATE USER MAPPING FOR CURRENT_USER SERVER loopback1;
+CREATE USER MAPPING FOR CURRENT_USER SERVER loopback2;
+-- create a local table to refer to as foreign table. Add a row. The table has
+-- constraints which are deferred till end of transaction. This allows commit
+-- time errors occur by inserting data which violates constraints.
+CREATE TABLE lt(val int UNIQUE DEFERRABLE INITIALLY DEFERRED);
+INSERT INTO lt VALUES (1);
+INSERT INTO lt VALUES (3);
+-- create two foreign tables each on separate server referring to the local table.
+CREATE FOREIGN TABLE ft1_lt (val int) SERVER loopback1 OPTIONS (table_name 'lt');
+CREATE FOREIGN TABLE ft2_lt (val int) SERVER loopback2 OPTIONS (table_name 'lt');
+-- In a transaction insert two rows one each to the two foreign tables. One of
+-- the rows violates the constraint and other not. At the time of commit
+-- constraints on one of the server will rollback transaction on that server.
+BEGIN TRANSACTION;
+	INSERT INTO ft1_lt VALUES (1); -- Violates constraint
+	INSERT INTO ft2_lt VALUES (2);
+COMMIT TRANSACTION;
+ERROR:  duplicate key value violates unique constraint "lt_val_key"
+BEGIN TRANSACTION;
+	INSERT INTO ft1_lt VALUES (4);
+	INSERT INTO ft2_lt VALUES (3); -- Violates constraint
+COMMIT TRANSACTION;
+ERROR:  duplicate key value violates unique constraint "lt_val_key"
+SELECT * FROM lt;
+ val 
+-----
+   1
+   3
+(2 rows)
+
+-- Transaction involving local changes and remote changes, one of them or both
+-- violating the constraints
+BEGIN TRANSACTION;
+	INSERT INTO lt VALUES (1); -- violates constraints
+	INSERT INTO ft1_lt VALUES (2);
+COMMIT TRANSACTION;
+ERROR:  duplicate key value violates unique constraint "lt_val_key"
+SELECT * FROM lt;
+ val 
+-----
+   1
+   3
+(2 rows)
+
+BEGIN TRANSACTION;
+	INSERT INTO lt VALUES (2);
+	INSERT INTO ft1_lt VALUES (1); -- violates constraints
+COMMIT TRANSACTION;
+ERROR:  duplicate key value violates unique constraint "lt_val_key"
+SELECT * FROM lt;
+ val 
+-----
+   1
+   3
+(2 rows)
+
+BEGIN TRANSACTION;
+	INSERT INTO lt VALUES (1); -- violates constraints 
+	INSERT INTO ft1_lt VALUES (3); -- violates constraints
+COMMIT TRANSACTION;
+ERROR:  duplicate key value violates unique constraint "lt_val_key"
+SELECT * FROM lt;
+ val 
+-----
+   1
+   3
+(2 rows)
+
+-- Next test shows local transaction fails if "any" of the remote transactions
+-- fail to commit. But any COMMITted transaction on the remote servers remains
+-- COMMITTED.
+BEGIN TRANSACTION;
+	INSERT INTO lt VALUES (2);
+	INSERT INTO ft1_lt VALUES (4);
+	INSERT INTO ft2_lt VALUES (1); -- violates constraints
+COMMIT TRANSACTION;
+ERROR:  duplicate key value violates unique constraint "lt_val_key"
+SELECT * FROM lt;
+ val 
+-----
+   1
+   3
+(2 rows)
+
+-- What if one of the servers involved in a transaction isn't capable of 2PC?
+-- The transaction in this test doesn't violate any constraints.
+ALTER SERVER loopback2 OPTIONS (SET twophase_compliant 'false'); 
+BEGIN TRANSACTION;
+	INSERT INTO lt VALUES (2);
+	INSERT INTO ft1_lt VALUES (4);
+	INSERT INTO ft2_lt VALUES (5);
+ERROR:  Detected Two Phase Commit incapable foreign servers in a transaction involving multiple foreign servers.
+COMMIT TRANSACTION;
+DROP SERVER loopback1 CASCADE;
+NOTICE:  drop cascades to 2 other objects
+DROP SERVER loopback2 CASCADE;
+NOTICE:  drop cascades to 2 other objects
+DROP TABLE lt;
+\set VERBOSITY default
diff --git a/contrib/postgres_fdw/option.c b/contrib/postgres_fdw/option.c
index 7547ec2..ed956ab 100644
--- a/contrib/postgres_fdw/option.c
+++ b/contrib/postgres_fdw/option.c
@@ -98,21 +98,22 @@ postgres_fdw_validator(PG_FUNCTION_ARGS)
 					(errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
 					 errmsg("invalid option \"%s\"", def->defname),
 					 errhint("Valid options in this context are: %s",
 							 buf.data)));
 		}
 
 		/*
 		 * Validate option value, when we can do so without any context.
 		 */
 		if (strcmp(def->defname, "use_remote_estimate") == 0 ||
-			strcmp(def->defname, "updatable") == 0)
+			strcmp(def->defname, "updatable") == 0 ||
+			strcmp(def->defname, "twophase_compliant") == 0)
 		{
 			/* these accept only boolean values */
 			(void) defGetBoolean(def);
 		}
 		else if (strcmp(def->defname, "fdw_startup_cost") == 0 ||
 				 strcmp(def->defname, "fdw_tuple_cost") == 0)
 		{
 			/* these must have a non-negative numeric value */
 			double		val;
 			char	   *endp;
@@ -146,20 +147,22 @@ InitPgFdwOptions(void)
 		{"column_name", AttributeRelationId, false},
 		/* use_remote_estimate is available on both server and table */
 		{"use_remote_estimate", ForeignServerRelationId, false},
 		{"use_remote_estimate", ForeignTableRelationId, false},
 		/* cost factors */
 		{"fdw_startup_cost", ForeignServerRelationId, false},
 		{"fdw_tuple_cost", ForeignServerRelationId, false},
 		/* updatable is available on both server and table */
 		{"updatable", ForeignServerRelationId, false},
 		{"updatable", ForeignTableRelationId, false},
+		/* 2PC compatibility */
+		{"twophase_compliant", ForeignServerRelationId, false},
 		{NULL, InvalidOid, false}
 	};
 
 	/* Prevent redundant initialization. */
 	if (postgres_fdw_options)
 		return;
 
 	/*
 	 * Get list of valid libpq options.
 	 *
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index d76e739..c2ebeec 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -9,20 +9,21 @@
  *		  contrib/postgres_fdw/postgres_fdw.c
  *
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
 
 #include "postgres_fdw.h"
 
 #include "access/htup_details.h"
 #include "access/sysattr.h"
+#include "access/fdw_xact.h"
 #include "commands/defrem.h"
 #include "commands/explain.h"
 #include "commands/vacuum.h"
 #include "foreign/fdwapi.h"
 #include "funcapi.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
 #include "nodes/nodeFuncs.h"
 #include "optimizer/cost.h"
 #include "optimizer/pathnode.h"
@@ -281,20 +282,22 @@ static void postgresExplainForeignScan(ForeignScanState *node,
 static void postgresExplainForeignModify(ModifyTableState *mtstate,
 							 ResultRelInfo *rinfo,
 							 List *fdw_private,
 							 int subplan_index,
 							 ExplainState *es);
 static bool postgresAnalyzeForeignTable(Relation relation,
 							AcquireSampleRowsFunc *func,
 							BlockNumber *totalpages);
 static List *postgresImportForeignSchema(ImportForeignSchemaStmt *stmt,
 							Oid serverOid);
+static bool postgresResolvePreparedTransaction(Oid serveroid, Oid userid, int prep_xact_len,
+									char *prep_xact_name, int xact_resolution);
 
 /*
  * Helper functions
  */
 static void estimate_path_cost_size(PlannerInfo *root,
 						RelOptInfo *baserel,
 						List *join_conds,
 						double *p_rows, int *p_width,
 						Cost *p_startup_cost, Cost *p_total_cost);
 static void get_remote_estimate(const char *sql,
@@ -361,20 +364,22 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
 	/* Support functions for EXPLAIN */
 	routine->ExplainForeignScan = postgresExplainForeignScan;
 	routine->ExplainForeignModify = postgresExplainForeignModify;
 
 	/* Support functions for ANALYZE */
 	routine->AnalyzeForeignTable = postgresAnalyzeForeignTable;
 
 	/* Support functions for IMPORT FOREIGN SCHEMA */
 	routine->ImportForeignSchema = postgresImportForeignSchema;
 
+	/* Support functions for resolving transactions */
+	routine->ResolvePreparedTransaction = postgresResolvePreparedTransaction;
 	PG_RETURN_POINTER(routine);
 }
 
 /*
  * postgresGetForeignRelSize
  *		Estimate # of rows and width of the result of the scan
  *
  * We should consider the effect of all baserestrictinfo clauses here, but
  * not any join clauses.
  */
@@ -912,21 +917,21 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
 	/* Get info about foreign table. */
 	fsstate->rel = node->ss.ss_currentRelation;
 	table = GetForeignTable(RelationGetRelid(fsstate->rel));
 	server = GetForeignServer(table->serverid);
 	user = GetUserMapping(userid, server->serverid);
 
 	/*
 	 * Get connection to the foreign server.  Connection manager will
 	 * establish new connection if necessary.
 	 */
-	fsstate->conn = GetConnection(server, user, false);
+	fsstate->conn = GetConnection(server, user, false, true);
 
 	/* Assign a unique ID for my cursor */
 	fsstate->cursor_number = GetCursorNumber(fsstate->conn);
 	fsstate->cursor_exists = false;
 
 	/* Get private info created by planner functions. */
 	fsstate->query = strVal(list_nth(fsplan->fdw_private,
 									 FdwScanPrivateSelectSql));
 	fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
 											   FdwScanPrivateRetrievedAttrs);
@@ -1297,21 +1302,21 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
 	 */
 	rte = rt_fetch(resultRelInfo->ri_RangeTableIndex, estate->es_range_table);
 	userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
 
 	/* Get info about foreign table. */
 	table = GetForeignTable(RelationGetRelid(rel));
 	server = GetForeignServer(table->serverid);
 	user = GetUserMapping(userid, server->serverid);
 
 	/* Open connection; report that we'll create a prepared statement. */
-	fmstate->conn = GetConnection(server, user, true);
+	fmstate->conn = GetConnection(server, user, true, true);
 	fmstate->p_name = NULL;		/* prepared statement not made yet */
 
 	/* Deconstruct fdw_private data. */
 	fmstate->query = strVal(list_nth(fdw_private,
 									 FdwModifyPrivateUpdateSql));
 	fmstate->target_attrs = (List *) list_nth(fdw_private,
 											  FdwModifyPrivateTargetAttnums);
 	fmstate->has_returning = intVal(list_nth(fdw_private,
 											 FdwModifyPrivateHasReturning));
 	fmstate->retrieved_attrs = (List *) list_nth(fdw_private,
@@ -1747,21 +1752,21 @@ estimate_path_cost_size(PlannerInfo *root,
 		deparseSelectSql(&sql, root, baserel, fpinfo->attrs_used,
 						 &retrieved_attrs);
 		if (fpinfo->remote_conds)
 			appendWhereClause(&sql, root, baserel, fpinfo->remote_conds,
 							  true, NULL);
 		if (remote_join_conds)
 			appendWhereClause(&sql, root, baserel, remote_join_conds,
 							  (fpinfo->remote_conds == NIL), NULL);
 
 		/* Get the remote estimate */
-		conn = GetConnection(fpinfo->server, fpinfo->user, false);
+		conn = GetConnection(fpinfo->server, fpinfo->user, false, true);
 		get_remote_estimate(sql.data, conn, &rows, &width,
 							&startup_cost, &total_cost);
 		ReleaseConnection(conn);
 
 		retrieved_rows = rows;
 
 		/* Factor in the selectivity of the locally-checked quals */
 		local_sel = clauselist_selectivity(root,
 										   local_join_conds,
 										   baserel->relid,
@@ -2311,21 +2316,21 @@ postgresAnalyzeForeignTable(Relation relation,
 	 * it's probably not worth redefining that API at this point.
 	 */
 
 	/*
 	 * Get the connection to use.  We do the remote access as the table's
 	 * owner, even if the ANALYZE was started by some other user.
 	 */
 	table = GetForeignTable(RelationGetRelid(relation));
 	server = GetForeignServer(table->serverid);
 	user = GetUserMapping(relation->rd_rel->relowner, server->serverid);
-	conn = GetConnection(server, user, false);
+	conn = GetConnection(server, user, false, true);
 
 	/*
 	 * Construct command to get page count for relation.
 	 */
 	initStringInfo(&sql);
 	deparseAnalyzeSizeSql(&sql, relation);
 
 	/* In what follows, do not risk leaking any PGresults. */
 	PG_TRY();
 	{
@@ -2403,21 +2408,21 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
 											ALLOCSET_SMALL_INITSIZE,
 											ALLOCSET_SMALL_MAXSIZE);
 
 	/*
 	 * Get the connection to use.  We do the remote access as the table's
 	 * owner, even if the ANALYZE was started by some other user.
 	 */
 	table = GetForeignTable(RelationGetRelid(relation));
 	server = GetForeignServer(table->serverid);
 	user = GetUserMapping(relation->rd_rel->relowner, server->serverid);
-	conn = GetConnection(server, user, false);
+	conn = GetConnection(server, user, false, true);
 
 	/*
 	 * Construct cursor that retrieves whole rows from remote.
 	 */
 	cursor_number = GetCursorNumber(conn);
 	initStringInfo(&sql);
 	appendStringInfo(&sql, "DECLARE c%u CURSOR FOR ", cursor_number);
 	deparseAnalyzeSql(&sql, relation, &astate.retrieved_attrs);
 
 	/* In what follows, do not risk leaking any PGresults. */
@@ -2605,21 +2610,21 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
 					(errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
 					 errmsg("invalid option \"%s\"", def->defname)));
 	}
 
 	/*
 	 * Get connection to the foreign server.  Connection manager will
 	 * establish new connection if necessary.
 	 */
 	server = GetForeignServer(serverOid);
 	mapping = GetUserMapping(GetUserId(), server->serverid);
-	conn = GetConnection(server, mapping, false);
+	conn = GetConnection(server, mapping, false, true);
 
 	/* Don't attempt to import collation if remote server hasn't got it */
 	if (PQserverVersion(conn) < 90100)
 		import_collate = false;
 
 	/* Create workspace for strings */
 	initStringInfo(&buf);
 
 	/* In what follows, do not risk leaking any PGresults. */
 	PG_TRY();
@@ -2963,10 +2968,62 @@ static void
 conversion_error_callback(void *arg)
 {
 	ConversionLocation *errpos = (ConversionLocation *) arg;
 	TupleDesc	tupdesc = RelationGetDescr(errpos->rel);
 
 	if (errpos->cur_attno > 0 && errpos->cur_attno <= tupdesc->natts)
 		errcontext("column \"%s\" of foreign table \"%s\"",
 				   NameStr(tupdesc->attrs[errpos->cur_attno - 1]->attname),
 				   RelationGetRelationName(errpos->rel));
 }
+
+/*
+ * postgresResolvePreparedTransaction
+ * Resolve (COMMIT/ABORT) a prepared transaction on the foreign server.
+ * Returns true if resolution is successful, false otherwise.
+ */
+extern bool 
+postgresResolvePreparedTransaction(Oid serveroid, Oid userid, int prep_xact_len,
+									char *prep_xact_name, int xact_resolution)
+{
+	ForeignServer	*foreign_server = GetForeignServer(serveroid); 
+	UserMapping		*user_mapping = GetUserMapping(userid, serveroid);
+	PGconn			*conn = GetConnection(foreign_server, user_mapping, false, false);
+	StringInfo		command = makeStringInfo();
+	PGresult		*res;
+
+	/*
+	 * TODO: This connection shouldn't be doing any active transaction on the
+	 * foreign server. How do we ensure that?
+	 */
+	if (xact_resolution == FDW_XACT_COMMITTING)
+		appendStringInfo(command, "COMMIT PREPARED '%s'", prep_xact_name); 
+	else if (xact_resolution == FDW_XACT_ABORTING)
+		appendStringInfo(command, "ROLLBACK PREPARED '%s'", prep_xact_name); 
+
+	res = PQexec(conn, command->data);
+	ReleaseConnection(conn);
+	if (PQresultStatus(res) == PGRES_COMMAND_OK) 
+	{
+		PQclear(res);
+		return true;
+	}
+
+	/*
+	 * TODO: need to work out a macro for error code, rather than hard coded
+	 * value here. We can't use ERRCODE_UNDEFINED_OBJECT directly, it's way different
+	 * than the integer value.
+	 */
+	if (atoi(PQresultErrorField(res, PG_DIAG_SQLSTATE)) == 42704)
+	{
+		/*
+		 * The prepared transaction name couldn't be identified, probably
+		 * resolved.
+		 */
+		PQclear(res);
+		return true;
+	}
+
+	/* For anything else, return failed status */
+	PQclear(res);
+	return false;
+}
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index 950c6f7..f446c90 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -19,21 +19,21 @@
 #include "utils/relcache.h"
 
 #include "libpq-fe.h"
 
 /* in postgres_fdw.c */
 extern int	set_transmission_modes(void);
 extern void reset_transmission_modes(int nestlevel);
 
 /* in connection.c */
 extern PGconn *GetConnection(ForeignServer *server, UserMapping *user,
-			  bool will_prep_stmt);
+			  bool will_prep_stmt, bool start_transaction);
 extern void ReleaseConnection(PGconn *conn);
 extern unsigned int GetCursorNumber(PGconn *conn);
 extern unsigned int GetPrepStmtNumber(PGconn *conn);
 extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
 				   bool clear, const char *sql);
 
 /* in option.c */
 extern int ExtractConnectionOptions(List *defelems,
 						 const char **keywords,
 						 const char **values);
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 83e8fa7..95b940e 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -2,21 +2,22 @@
 -- create FDW objects
 -- ===================================================================
 
 CREATE EXTENSION postgres_fdw;
 
 CREATE SERVER testserver1 FOREIGN DATA WRAPPER postgres_fdw;
 DO $d$
     BEGIN
         EXECUTE $$CREATE SERVER loopback FOREIGN DATA WRAPPER postgres_fdw
             OPTIONS (dbname '$$||current_database()||$$',
-                     port '$$||current_setting('port')||$$'
+                     port '$$||current_setting('port')||$$',
+					 twophase_compliant 'true'
             )$$;
     END;
 $d$;
 
 CREATE USER MAPPING FOR public SERVER testserver1
 	OPTIONS (user 'value', password 'value');
 CREATE USER MAPPING FOR CURRENT_USER SERVER loopback;
 
 -- ===================================================================
 -- create objects used through FDW loopback server
@@ -714,10 +715,106 @@ IMPORT FOREIGN SCHEMA nonesuch FROM SERVER nowhere INTO notthere;
 -- We can fake this by dropping the type locally in our transaction.
 CREATE TYPE "Colors" AS ENUM ('red', 'green', 'blue');
 CREATE TABLE import_source.t5 (c1 int, c2 text collate "C", "Col" "Colors");
 
 CREATE SCHEMA import_dest5;
 BEGIN;
 DROP TYPE "Colors" CASCADE;
 IMPORT FOREIGN SCHEMA import_source LIMIT TO (t5)
   FROM SERVER loopback INTO import_dest5;  -- ERROR
 ROLLBACK;
+
+-- This will suppress the context of errors, which contains prepared transaction
+-- IDs. Those come out to be different each time.
+\set VERBOSITY terse
+-- Test transactional consistency for multiple server case
+-- create two loopback servers for testing consistency on two connections
+DO $d$
+    BEGIN
+        EXECUTE $$CREATE SERVER loopback1 FOREIGN DATA WRAPPER postgres_fdw
+            OPTIONS (dbname '$$||current_database()||$$',
+                     port '$$||current_setting('port')||$$',
+					 twophase_compliant 'true'
+            )$$;
+    END;
+$d$;
+
+DO $d$
+    BEGIN
+        EXECUTE $$CREATE SERVER loopback2 FOREIGN DATA WRAPPER postgres_fdw
+            OPTIONS (dbname '$$||current_database()||$$',
+                     port '$$||current_setting('port')||$$',
+					 twophase_compliant 'true'
+            )$$;
+    END;
+$d$;
+
+CREATE USER MAPPING FOR CURRENT_USER SERVER loopback1;
+CREATE USER MAPPING FOR CURRENT_USER SERVER loopback2;
+
+-- create a local table to refer to as foreign table. Add a row. The table has
+-- constraints which are deferred till end of transaction. This allows commit
+-- time errors occur by inserting data which violates constraints.
+CREATE TABLE lt(val int UNIQUE DEFERRABLE INITIALLY DEFERRED);
+INSERT INTO lt VALUES (1);
+INSERT INTO lt VALUES (3);
+
+-- create two foreign tables each on separate server referring to the local table.
+CREATE FOREIGN TABLE ft1_lt (val int) SERVER loopback1 OPTIONS (table_name 'lt');
+CREATE FOREIGN TABLE ft2_lt (val int) SERVER loopback2 OPTIONS (table_name 'lt');
+
+-- In a transaction insert two rows one each to the two foreign tables. One of
+-- the rows violates the constraint and other not. At the time of commit
+-- constraints on one of the server will rollback transaction on that server.
+BEGIN TRANSACTION;
+	INSERT INTO ft1_lt VALUES (1); -- Violates constraint
+	INSERT INTO ft2_lt VALUES (2);
+COMMIT TRANSACTION;
+
+BEGIN TRANSACTION;
+	INSERT INTO ft1_lt VALUES (4);
+	INSERT INTO ft2_lt VALUES (3); -- Violates constraint
+COMMIT TRANSACTION;
+SELECT * FROM lt;
+
+-- Transaction involving local changes and remote changes, one of them or both
+-- violating the constraints
+BEGIN TRANSACTION;
+	INSERT INTO lt VALUES (1); -- violates constraints
+	INSERT INTO ft1_lt VALUES (2);
+COMMIT TRANSACTION;
+SELECT * FROM lt;
+
+BEGIN TRANSACTION;
+	INSERT INTO lt VALUES (2);
+	INSERT INTO ft1_lt VALUES (1); -- violates constraints
+COMMIT TRANSACTION;
+SELECT * FROM lt;
+
+BEGIN TRANSACTION;
+	INSERT INTO lt VALUES (1); -- violates constraints 
+	INSERT INTO ft1_lt VALUES (3); -- violates constraints
+COMMIT TRANSACTION;
+SELECT * FROM lt;
+-- Next test shows local transaction fails if "any" of the remote transactions
+-- fail to commit. But any COMMITted transaction on the remote servers remains
+-- COMMITTED.
+BEGIN TRANSACTION;
+	INSERT INTO lt VALUES (2);
+	INSERT INTO ft1_lt VALUES (4);
+	INSERT INTO ft2_lt VALUES (1); -- violates constraints
+COMMIT TRANSACTION;
+SELECT * FROM lt;
+
+-- What if one of the servers involved in a transaction isn't capable of 2PC?
+-- The transaction in this test doesn't violate any constraints.
+ALTER SERVER loopback2 OPTIONS (SET twophase_compliant 'false'); 
+BEGIN TRANSACTION;
+	INSERT INTO lt VALUES (2);
+	INSERT INTO ft1_lt VALUES (4);
+	INSERT INTO ft2_lt VALUES (5);
+COMMIT TRANSACTION;
+
+DROP SERVER loopback1 CASCADE;
+DROP SERVER loopback2 CASCADE;
+DROP TABLE lt;
+\set VERBOSITY default
diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index 9d4d5db..b43ca07 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -8,16 +8,17 @@
 #
 #-------------------------------------------------------------------------
 
 subdir = src/backend/access/transam
 top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
 OBJS = clog.o commit_ts.o multixact.o rmgr.o slru.o subtrans.o \
 	timeline.o transam.o twophase.o twophase_rmgr.o varsup.o \
 	xact.o xlog.o xlogarchive.o xlogfuncs.o \
-	xloginsert.o xlogreader.o xlogutils.o
+	xloginsert.o xlogreader.o xlogutils.o \
+	fdw_xact.o
 
 include $(top_srcdir)/src/backend/common.mk
 
 # ensure that version checks in xlog.c get recompiled when catversion.h changes
 xlog.o: xlog.c $(top_srcdir)/src/include/catalog/catversion.h
diff --git a/src/backend/access/transam/fdw_xact.c b/src/backend/access/transam/fdw_xact.c
new file mode 100644
index 0000000..6394f19
--- /dev/null
+++ b/src/backend/access/transam/fdw_xact.c
@@ -0,0 +1,464 @@
+/*-------------------------------------------------------------------------
+ *
+ * fdw_xact.c
+ *		PostgreSQL distributed transaction manager. 
+ *
+ * This module manages the transactions involving foreign servers. 
+ *
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/access/transam/fdw_xact.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include <unistd.h>
+#include "postgres.h"
+
+#include "miscadmin.h"
+#include "funcapi.h"
+
+#include "access/fdw_xact.h"
+#include "access/htup_details.h"
+#include "storage/lock.h"
+#include "storage/fd.h"
+#include "storage/procarray.h"
+#include "utils/builtins.h"
+#include "catalog/pg_type.h"
+#include "foreign/foreign.h"
+#include "foreign/fdwapi.h"
+
+#define FDW_XACT_FILE_NAME	"fdw_xact"
+
+typedef struct
+{
+	bool		read_from_disk;
+	/*
+	 * TODO:
+	 * We should augment this structure with CRC checksum to validate the
+	 * contents of the file.
+	 */
+	FdwXactData fdw_xacts[1];	/* Variable length array */
+} FdwXactGlobalData;
+
+/*
+ * TODO:
+ * This should be turned into a GUC. If we do so, the size also needs to be
+ * recorded in the file so that a change in the GUC value can be noticed.
+ */
+int	max_fdw_xacts = 100;
+
+static FdwXactGlobalData	*FdwXactGlobal;
+
+static int search_free_fdwxact();
+
+/* TODO: we should do something better to reduce lock conflicts */
+/*
+ * Initialization of shared memory
+ */
+extern Size
+FdwXactShmemSize(void)
+{
+	Size		size;
+
+	/* Need the fixed struct, foreign transaction information array */ 
+	size = offsetof(FdwXactGlobalData, fdw_xacts);
+	size = MAXALIGN(size);
+	size = add_size(size, mul_size(max_fdw_xacts,
+								   sizeof(FdwXactData)));
+
+	return size;
+}
+
+extern void
+FdwXactShmemInit(void)
+{
+	bool		found;
+
+	FdwXactGlobal = ShmemInitStruct("Foreign transactions table",
+									FdwXactShmemSize(),
+									&found);
+	if (!IsUnderPostmaster)
+	{
+		FdwXactGlobal->read_from_disk = true;
+		Assert(!found);
+	}
+	else
+	{
+		Assert(FdwXactGlobal);
+		Assert(found);
+	}
+}
+
+/* 
+ * read_fdw_xact_file
+ * Read the information about the prepared foreign transactions from the disk.
+ * The in-memory copy of the file is upto date always except for the first read
+ * from the disk after server boot. If we read the file at the time of
+ * initialising the memory, we can be always sure that the in-memory copy of the
+ * foreign transaction information is the latest one. This won't need any
+ * special status for unread file.
+ */
+static void
+read_fdw_xact_file()
+{
+	/*
+	 * If we haven't read the file containing information about FDW
+	 * transactions, read it.
+	 */
+	LWLockAcquire(FdwXactLock, LW_EXCLUSIVE);
+	if (FdwXactGlobal->read_from_disk)
+	{
+		int	fd = OpenTransientFile(FDW_XACT_FILE_NAME, O_EXCL | PG_BINARY | O_RDONLY, 0);
+		int	read_size;
+
+		if (fd < 0)
+			ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not open FDW transaction state file \"%s\": %m",
+						FDW_XACT_FILE_NAME)));
+
+		/* TODO:
+		 * If we turn the max number of foreign transactions into a GUC, we will
+		 * need to make sure that the changed value has enough space to load the
+		 * file.
+		 */
+		read_size = read(fd, FdwXactGlobal, FdwXactShmemSize());
+		CloseTransientFile(fd);
+		
+		/* If there is nothing in the file, 0 out the memory */
+		if (read_size == 0)
+			memset(FdwXactGlobal, 0, FdwXactShmemSize());
+
+		FdwXactGlobal->read_from_disk = false;
+	}
+	LWLockRelease(FdwXactLock);
+	return;
+}
+
+static void
+flush_fdw_xact_file()
+{
+	int	fd;
+	fd = OpenTransientFile(FDW_XACT_FILE_NAME, O_EXCL | PG_BINARY | O_WRONLY, 0);
+	/*
+	 * TODO: We might come out of this without writing, in case the process is
+	 * interrupted. Take care of this case; check for EINTR error value.
+	 * Do we need to fsync this information or the subsequent close would do
+	 * that?
+	 */
+	if (write(fd, FdwXactGlobal, FdwXactShmemSize()) != FdwXactShmemSize())
+	{
+		CloseTransientFile(fd);
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not write FDW transaction state file: %s", FDW_XACT_FILE_NAME)));
+	}
+	CloseTransientFile(fd);
+	return;
+}
+
+/*
+ * search_free_fdwxact
+ * Search for a free slot in FdwXactGlobal array
+ * The caller is expected to hold the lock on this array.
+ */
+static int 
+search_free_fdwxact()
+{
+	int	ret_id;
+	for (ret_id = 0; ret_id < max_fdw_xacts; ret_id++)
+	{
+		FdwXactData *fdw_xact = &(FdwXactGlobal->fdw_xacts[ret_id]);
+		/* A slot with invalid DBID is considered as free slot */
+		if (!OidIsValid(fdw_xact->dboid))
+			return ret_id;	
+	}
+	/* If we reached here, every slot is filled, throw an error */
+	elog(ERROR, "Limit of foreign prepared transactions exceeded");
+	/* Keep the compiler happy */
+	return -1;
+}
+
+extern int 
+insert_fdw_xact(Oid dboid, TransactionId xid, Oid foreign_server, Oid userid,
+					int fdw_xact_id_len, char *fdw_xact_id, int fdw_xact_status)
+{
+	int	ret_id;
+	FdwXactData	*fdw_xact;
+
+	read_fdw_xact_file();
+	
+	LWLockAcquire(FdwXactLock, LW_EXCLUSIVE);
+	ret_id = search_free_fdwxact();
+
+	fdw_xact = &(FdwXactGlobal->fdw_xacts[ret_id]);
+	fdw_xact->dboid = dboid;
+	fdw_xact->local_xid = xid;
+	fdw_xact->serveroid = foreign_server;
+	fdw_xact->userid = userid;
+	fdw_xact->prep_name_len = fdw_xact_id_len;
+	fdw_xact->fdw_xact_status = fdw_xact_status;
+	memcpy(fdw_xact->prep_xact_info, fdw_xact_id, fdw_xact->prep_name_len);
+	/*
+	 * Write the file to the disk, so that the fact that we performed some
+	 * transaction on this server survives a subsequent crash.
+	 */
+	flush_fdw_xact_file();
+	LWLockRelease(FdwXactLock);
+	return ret_id;
+}
+
+extern int
+update_fdw_xact(int fdw_xact_id, int fdw_xact_status)
+{
+	FdwXactData 	*fdw_xact;
+	read_fdw_xact_file();
+	/* TODO: Validate fdw_xact_id? */
+	/* TODO: Do we need to take a lock here? Probably not. The only process that
+	 * updates the this entry is the one which created it or later one which
+	 * resolves it. Those two can not run in parallel.
+	 */
+	fdw_xact = &(FdwXactGlobal->fdw_xacts[fdw_xact_id]);
+	fdw_xact->fdw_xact_status = fdw_xact_status;
+	/*
+	 * We don't need to flush the file on every update. Status of local
+	 * transaction is enough to infer what should happen to the foreign
+	 * transaction.
+	 */
+	return fdw_xact_id;
+}
+
+extern void 
+remove_fdw_xact(int fdw_xact_id)
+{
+	FdwXactData 	*fdw_xact;
+	/* Read if the file has not already been read */
+	read_fdw_xact_file();
+	/* TODO: Validate fdw_xact_id? */
+	/* The resolver process or the process which created this entry, both can
+	 * try to delete this entry simultaneously or the search can read this entry
+	 * while being deleted and grab it prematurely.
+	 */
+	LWLockAcquire(FdwXactLock, LW_EXCLUSIVE);
+	fdw_xact = &(FdwXactGlobal->fdw_xacts[fdw_xact_id]);
+	fdw_xact->dboid = InvalidOid;
+	LWLockRelease(FdwXactLock);
+}
+
+/*
+ * pg_fdw_xact
+ *		Produce a view with one row per prepared transaction on foreign server.
+ *
+ * This function is here so we don't have to export the
+ * FdwXactGlobalData struct definition.
+ *
+ * TODO:
+ * Like pg_prepared_xact() we should create a working set to take care of
+ * synchronization. 
+ */
+Datum
+pg_fdw_xact(PG_FUNCTION_ARGS)
+{
+	FuncCallContext *funcctx;
+	int				cur;
+
+	read_fdw_xact_file();
+	if (SRF_IS_FIRSTCALL())
+	{
+		TupleDesc	tupdesc;
+		MemoryContext oldcontext;
+
+		/* create a function context for cross-call persistence */
+		funcctx = SRF_FIRSTCALL_INIT();
+
+		/*
+		 * Switch to memory context appropriate for multiple function calls
+		 */
+		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+		/* build tupdesc for result tuples */
+		/* this had better match pg_prepared_xacts view in system_views.sql */
+		tupdesc = CreateTemplateTupleDesc(6, false);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 1, "database oid",
+						   OIDOID, -1, 0);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 2, "local transaction",
+						   XIDOID, -1, 0);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 3, "foreign server oid",
+						   OIDOID, -1, 0);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 4, "user mapping oid",
+						   OIDOID, -1, 0);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 5, "status",
+						   INT8OID, -1, 0);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 6, "info",
+						   TEXTOID, -1, 0);
+
+		funcctx->tuple_desc = BlessTupleDesc(tupdesc);
+
+		/* We always start from the first entry */
+		funcctx->user_fctx = (void *) 0;
+
+		MemoryContextSwitchTo(oldcontext);
+	}
+
+	funcctx = SRF_PERCALL_SETUP();
+	cur = (int) (int *)funcctx->user_fctx;
+
+	/*
+	 * TODO:
+	 * We should really take lock here OR take a snapshot of foreign
+	 * transactions data.
+	 */
+	while (cur < max_fdw_xacts)
+	{
+		FdwXactData	*fdw_xact = &(FdwXactGlobal->fdw_xacts[cur]);
+		Datum		values[6];
+		bool		nulls[6];
+		HeapTuple	tuple;
+		Datum		result;
+
+		/* Skip the empty slots */
+		if (!OidIsValid(fdw_xact->dboid))
+		{
+			cur++;
+			continue;
+		}
+
+		/*
+		 * Form tuple with appropriate data.
+		 */
+		MemSet(values, 0, sizeof(values));
+		MemSet(nulls, 0, sizeof(nulls));
+
+		values[0] = ObjectIdGetDatum(fdw_xact->dboid);
+		values[1] = TransactionIdGetDatum(fdw_xact->local_xid);
+		values[2] = ObjectIdGetDatum(fdw_xact->serveroid);
+		values[3] = ObjectIdGetDatum(fdw_xact->userid);
+		/* TODO: this should be really interpreted by FDW */
+		values[4] = Int8GetDatum(fdw_xact->fdw_xact_status);
+		values[5] = CStringGetTextDatum(fdw_xact->prep_xact_info);
+
+		tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
+		result = HeapTupleGetDatum(tuple);
+		cur++;
+		funcctx->user_fctx = (void *)(cur);
+		SRF_RETURN_NEXT(funcctx, result);
+	}
+
+	SRF_RETURN_DONE(funcctx);
+}
+
+/*
+ * pg_fdw_resolve
+ * a user interface to initiate foreign transaction resolution.
+ */
+Datum
+pg_fdw_resolve(PG_FUNCTION_ARGS)
+{
+	int	cnt_xact;
+	read_fdw_xact_file();
+	/* Scan all the foreign transaction */
+	for (cnt_xact = 0; cnt_xact < max_fdw_xacts; cnt_xact++)
+	{
+		ForeignServer		*foreign_server;
+		ForeignDataWrapper	*fdw;
+		FdwRoutine			*fdw_routine;
+		bool				resolved;
+		FdwXactData			*fdw_xact;
+
+		/*
+		 * TODO: we need to make sure that no one is modifying this entry, while
+		 * we try to resolve it. Following sequence might lead to inconsistent
+		 * entry
+		 * 1. Backend which created this entry marked it as COMMITTING or
+		 * ABORTING
+		 * 2. this function chose to resolve the same entry in some other
+		 * backend (there can be multiple such invocation if we keep it in the
+		 * form of a built-in.
+		 *
+		 * That's a race condition. But possibly there is no hazard except the
+		 * extra messages to the foreign server, as explained below.
+		 * Both of these backends will send corresponding messages to the
+		 * foreign server, one succeeding and other getting an error. Eventual
+		 * result will be that the entry will be removed by one and other will
+		 * leave it untouched. 
+		 */
+		fdw_xact = &(FdwXactGlobal->fdw_xacts[cnt_xact]);
+		
+		/*
+		 * Leave empty slots aside. Leave the entries, whose foreign servers are
+		 * not part of the database where this function was called. We can not
+		 * get information about such foreign servers.
+		 */
+		if (!OidIsValid(fdw_xact->dboid) ||
+				fdw_xact->dboid != MyDatabaseId)
+			continue;
+
+		else if (fdw_xact->fdw_xact_status == FDW_XACT_COMMITTING ||
+				fdw_xact->fdw_xact_status == FDW_XACT_ABORTING)
+		{
+			/*
+			 * We have already decided what to do with the foreing transaction
+			 * nothing to be done.
+			 */
+		}
+		else if (TransactionIdDidCommit(fdw_xact->local_xid))
+		{
+			/* TODO: we should inform the user through warning or error, and let
+			 * him deal when Assert conditions fail.
+			 */
+			Assert(fdw_xact->fdw_xact_status != FDW_XACT_ABORTING);
+			update_fdw_xact(cnt_xact, FDW_XACT_COMMITTING);
+		}
+		else if (TransactionIdDidAbort(fdw_xact->local_xid))
+		{
+			/* TODO: we should inform the user through warning or error, and let
+			 * him deal when Assert conditions fail.
+			 */
+			Assert(fdw_xact->fdw_xact_status != FDW_XACT_COMMITTING);
+			update_fdw_xact(cnt_xact, FDW_XACT_ABORTING);
+		}
+		else if (!TransactionIdIsInProgress(fdw_xact->local_xid))
+		{
+			/*
+			 * The transaction is in progress but not on any of the backends. So
+			 * probably, it crashed before actual abort or commit. So assume it
+			 * to be aborted.
+			 * TODO: In HeapTupleSatisfiesUpdate() any transaction which is not
+			 * TransactionIdIsInProgress() and TransactionIdDidCommit() is
+			 * considered aborted. Can we do the same here?
+			 */
+			/* TODO: we should inform the user through warning or error, and let
+			 * him deal when Assert conditions fail.
+			 */
+			Assert(fdw_xact->fdw_xact_status != FDW_XACT_COMMITTING);
+			update_fdw_xact(cnt_xact, FDW_XACT_ABORTING);
+		}
+		else
+		{
+			/* The local transaction is still in progress */
+			continue;
+		}
+
+		/* Onwards we deal only with resolvable transactions */
+		/* Get the FDW hook to resolve the transaction */
+		foreign_server = GetForeignServer(fdw_xact->serveroid); 
+		fdw = GetForeignDataWrapper(foreign_server->fdwid);
+		fdw_routine = GetFdwRoutine(fdw->fdwhandler);
+		Assert(fdw_routine->ResolvePreparedTransaction);
+		resolved = fdw_routine->ResolvePreparedTransaction(fdw_xact->serveroid,
+												fdw_xact->userid,
+												fdw_xact->prep_name_len,
+												fdw_xact->prep_xact_info,
+												fdw_xact->fdw_xact_status);
+		
+		/* If we succeeded in resolving the transaction, remove the entry */
+		if (resolved)
+			remove_fdw_xact(cnt_xact);
+	}
+	
+	/* Flush the foreign transaction udpates to the disk */
+	LWLockAcquire(FdwXactLock, LW_EXCLUSIVE);
+	flush_fdw_xact_file();
+	LWLockRelease(FdwXactLock);
+}
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 97000ef..76c16d6 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -146,20 +146,24 @@ typedef struct TransactionStateData
 	ResourceOwner curTransactionOwner;	/* my query resources */
 	TransactionId *childXids;	/* subcommitted child XIDs, in XID order */
 	int			nChildXids;		/* # of subcommitted child XIDs */
 	int			maxChildXids;	/* allocated size of childXids[] */
 	Oid			prevUser;		/* previous CurrentUserId setting */
 	int			prevSecContext; /* previous SecurityRestrictionContext */
 	bool		prevXactReadOnly;		/* entry-time xact r/o state */
 	bool		startedInRecovery;		/* did we start in recovery? */
 	bool		didLogXid;		/* has xid been included in WAL record? */
 	struct TransactionStateData *parent;		/* back link to parent */
+	int			num_foreign_servers;	/* number of foreign servers participating in the transaction,
+										   Only valid for top level transaction */
+	int			can_prepare;			/* can all the foreign server involved in
+										   this transaction participate in 2PC */
 } TransactionStateData;
 
 typedef TransactionStateData *TransactionState;
 
 /*
  * CurrentTransactionState always points to the current transaction state
  * block.  It will point to TopTransactionStateData when not in a
  * transaction at all, or when in a top-level transaction.
  */
 static TransactionStateData TopTransactionStateData = {
@@ -175,21 +179,23 @@ static TransactionStateData TopTransactionStateData = {
 	NULL,						/* cur transaction context */
 	NULL,						/* cur transaction resource owner */
 	NULL,						/* subcommitted child Xids */
 	0,							/* # of subcommitted child Xids */
 	0,							/* allocated size of childXids[] */
 	InvalidOid,					/* previous CurrentUserId setting */
 	0,							/* previous SecurityRestrictionContext */
 	false,						/* entry-time xact r/o state */
 	false,						/* startedInRecovery */
 	false,						/* didLogXid */
-	NULL						/* link to parent state block */
+	NULL,						/* link to parent state block */
+	0,							/* number of foreign servers participating in the transaction */
+	true						/* By default all the foreign server are capable of 2PC */
 };
 
 /*
  * unreportedXids holds XIDs of all subtransactions that have not yet been
  * reported in a XLOG_XACT_ASSIGNMENT record.
  */
 static int	nUnreportedXids;
 static TransactionId unreportedXids[PGPROC_MAX_CACHED_SUBXIDS];
 
 static TransactionState CurrentTransactionState = &TopTransactionStateData;
@@ -1807,20 +1813,23 @@ StartTransaction(void)
 	/* SecurityRestrictionContext should never be set outside a transaction */
 	Assert(s->prevSecContext == 0);
 
 	/*
 	 * initialize other subsystems for new transaction
 	 */
 	AtStart_GUC();
 	AtStart_Cache();
 	AfterTriggerBeginXact();
 
+	/* Foreign transaction stuff */
+	s->num_foreign_servers = 0;
+
 	/*
 	 * done with start processing, set current transaction state to "in
 	 * progress"
 	 */
 	s->state = TRANS_INPROGRESS;
 
 	ShowTransactionState("StartTransaction");
 }
 
 
@@ -4971,10 +4980,39 @@ xact_redo(XLogReaderState *record)
 	{
 		xl_xact_assignment *xlrec = (xl_xact_assignment *) XLogRecGetData(record);
 
 		if (standbyState >= STANDBY_INITIALIZED)
 			ProcArrayApplyXidAssignment(xlrec->xtop,
 										xlrec->nsubxacts, xlrec->xsub);
 	}
 	else
 		elog(PANIC, "xact_redo: unknown op code %u", info);
 }
+
+extern void
+RegisterXactForeignServer(Oid serveroid, bool can_prepare)
+{
+	TransactionState	top_xact_state = &TopTransactionStateData;
+	top_xact_state->num_foreign_servers++;
+
+	if (top_xact_state->num_foreign_servers == 1)
+		top_xact_state->can_prepare = can_prepare;
+
+	top_xact_state->can_prepare = top_xact_state->can_prepare && can_prepare;
+
+	/*
+	 * If multiple foreign servers are involved in the transaction and any one
+	 * of them is not capable of 2PC
+	 */
+	if (top_xact_state->num_foreign_servers > 1 &&
+		!top_xact_state->can_prepare)
+		ereport(ERROR,
+				(errcode(ERRCODE_T_R_INTEGRITY_CONSTRAINT_VIOLATION),
+				 errmsg("Detected Two Phase Commit incapable foreign servers in a transaction involving multiple foreign servers.")));
+}
+
+extern bool
+FdwTwoPhaseNeeded()
+{
+	TransactionState	top_xact_state = &TopTransactionStateData;
+	return (top_xact_state->num_foreign_servers > 1);
+}
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 16b9808..9b8893b 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -14,20 +14,21 @@
  */
 #include "postgres.h"
 
 #include "access/clog.h"
 #include "access/commit_ts.h"
 #include "access/heapam.h"
 #include "access/multixact.h"
 #include "access/nbtree.h"
 #include "access/subtrans.h"
 #include "access/twophase.h"
+#include "access/fdw_xact.h"
 #include "commands/async.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
 #include "postmaster/bgworker_internals.h"
 #include "postmaster/bgwriter.h"
 #include "postmaster/postmaster.h"
 #include "replication/slot.h"
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
@@ -130,20 +131,21 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
 		size = add_size(size, PMSignalShmemSize());
 		size = add_size(size, ProcSignalShmemSize());
 		size = add_size(size, CheckpointerShmemSize());
 		size = add_size(size, AutoVacuumShmemSize());
 		size = add_size(size, ReplicationSlotsShmemSize());
 		size = add_size(size, WalSndShmemSize());
 		size = add_size(size, WalRcvShmemSize());
 		size = add_size(size, BTreeShmemSize());
 		size = add_size(size, SyncScanShmemSize());
 		size = add_size(size, AsyncShmemSize());
+		size = add_size(size, FdwXactShmemSize());
 #ifdef EXEC_BACKEND
 		size = add_size(size, ShmemBackendArraySize());
 #endif
 
 		/* freeze the addin request size and include it */
 		addin_request_allowed = false;
 		size = add_size(size, total_addin_request);
 
 		/* might as well round it off to a multiple of a typical page size */
 		size = add_size(size, 8192 - (size % 8192));
@@ -240,20 +242,21 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
 	ReplicationSlotsShmemInit();
 	WalSndShmemInit();
 	WalRcvShmemInit();
 
 	/*
 	 * Set up other modules that need some shared memory space
 	 */
 	BTreeShmemInit();
 	SyncScanShmemInit();
 	AsyncShmemInit();
+	FdwXactShmemInit();
 
 #ifdef EXEC_BACKEND
 
 	/*
 	 * Alloc the win32 shared backend array
 	 */
 	if (!IsUnderPostmaster)
 		ShmemBackendArrayAllocation();
 #endif
 
diff --git a/src/bin/initdb/initdb.c b/src/bin/initdb/initdb.c
index 18614e7..53a6047 100644
--- a/src/bin/initdb/initdb.c
+++ b/src/bin/initdb/initdb.c
@@ -196,21 +196,21 @@ static const char *subdirs[] = {
 	"pg_multixact/members",
 	"pg_multixact/offsets",
 	"base",
 	"base/1",
 	"pg_replslot",
 	"pg_tblspc",
 	"pg_stat",
 	"pg_stat_tmp",
 	"pg_logical",
 	"pg_logical/snapshots",
-	"pg_logical/mappings"
+	"pg_logical/mappings",
 };
 
 
 /* path to 'initdb' binary directory */
 static char bin_path[MAXPGPATH];
 static char backend_exec[MAXPGPATH];
 
 static char **replace_token(char **lines,
 			  const char *token, const char *replacement);
 
@@ -225,20 +225,21 @@ static void pre_sync_fname(char *fname, bool isdir);
 static void fsync_fname(char *fname, bool isdir);
 static FILE *popen_check(const char *command, const char *mode);
 static void exit_nicely(void);
 static char *get_id(void);
 static char *get_encoding_id(char *encoding_name);
 static bool mkdatadir(const char *subdir);
 static void set_input(char **dest, char *filename);
 static void check_input(char *path);
 static void write_version_file(char *extrapath);
 static void set_null_conf(void);
+static void set_fdw_xact_file(void);
 static void test_config_settings(void);
 static void setup_config(void);
 static void bootstrap_template1(void);
 static void setup_auth(void);
 static void get_set_pwd(void);
 static void setup_depend(void);
 static void setup_sysviews(void);
 static void setup_description(void);
 static void setup_collation(void);
 static void setup_conversion(void);
@@ -1088,20 +1089,46 @@ set_null_conf(void)
 	if (fclose(conf_file))
 	{
 		fprintf(stderr, _("%s: could not write file \"%s\": %s\n"),
 				progname, path, strerror(errno));
 		exit_nicely();
 	}
 	free(path);
 }
 
 /*
+ * set up an empty config file so we can check config settings by launching
+ * a test backend
+ */
+static void
+set_fdw_xact_file(void)
+{
+	FILE	   *conf_file;
+	char	   *path;
+
+	path = psprintf("%s/fdw_xact", pg_data);
+	conf_file = fopen(path, PG_BINARY_W);
+	if (conf_file == NULL)
+	{
+		fprintf(stderr, _("%s: could not open file \"%s\" for writing: %s\n"),
+				progname, path, strerror(errno));
+		exit_nicely();
+	}
+	if (fclose(conf_file))
+	{
+		fprintf(stderr, _("%s: could not write file \"%s\": %s\n"),
+				progname, path, strerror(errno));
+		exit_nicely();
+	}
+	free(path);
+}
+/*
  * Determine which dynamic shared memory implementation should be used on
  * this platform.  POSIX shared memory is preferable because the default
  * allocation limits are much higher than the limits for System V on most
  * systems that support both, but the fact that a platform has shm_open
  * doesn't guarantee that that call will succeed when attempted.  So, we
  * attempt to reproduce what the postmaster will do when allocating a POSIX
  * segment in dsm_impl.c; if it doesn't work, we assume it won't work for
  * the postmaster either, and configure the cluster for System V shared
  * memory instead.
  */
@@ -3497,20 +3524,21 @@ initialize_data_directory(void)
 	}
 
 	check_ok();
 
 	/* Top level PG_VERSION is checked by bootstrapper, so make it first */
 	write_version_file(NULL);
 
 	/* Select suitable configuration settings */
 	set_null_conf();
 	test_config_settings();
+	set_fdw_xact_file();
 
 	/* Now create all the text config files */
 	setup_config();
 
 	/* Bootstrap template1 */
 	bootstrap_template1();
 
 	/*
 	 * Make the per-database PG_VERSION for template1 only after init'ing it
 	 */
diff --git a/src/include/access/fdw_xact.h b/src/include/access/fdw_xact.h
new file mode 100644
index 0000000..be92eed
--- /dev/null
+++ b/src/include/access/fdw_xact.h
@@ -0,0 +1,51 @@
+/*
+ * fdw_xact.h 
+ *
+ * PostgreSQL distributed transaction manager
+ *
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/fdw_xact.h
+ */
+#ifndef FDW_XACT_H 
+#define FDW_XACT_H 
+
+#define MAX_PREP_XACT_INFO_LEN	256
+typedef struct
+{
+	Oid		dboid;				/* database oid where to find foreign server and
+								 * user mapping
+								 */
+	TransactionId	local_xid;
+	Oid		serveroid;			/* foreign server where transaction takes place */
+	Oid		userid;				/* user who initiated the foreign transaction */
+	int		fdw_xact_status;	/* The state of the foreign transaction */
+	uint8	prep_name_len;		/* Length of the prepared transaction data */ 
+	/* 
+	 * TODO: Restricting the size of prepared transaction information may not
+	 * suit all FDWs. One possible idea is to create a file for every foreign
+	 * transaction entry to contain the prepared transaction information
+	 * required by FDW. While resolving the transaction, the information from
+	 * the file would be passed to the FDW routine.
+	 */
+	char	prep_xact_info[MAX_PREP_XACT_INFO_LEN];	/* The prepared transaction data starts here */
+} FdwXactData;
+
+typedef enum
+{
+	FDW_XACT_UNKNOWN = 0,
+	FDW_XACT_PREPARING,
+	FDW_XACT_PREPARED,
+	FDW_XACT_COMMITTING,
+	FDW_XACT_ABORTING
+} FdwXactStatus;
+
+extern Size FdwXactShmemSize(void);
+extern void FdwXactShmemInit(void);
+extern int insert_fdw_xact(Oid dboid, TransactionId xid, Oid foreign_server, Oid user_mapping,
+							int fdw_xact_id_len, char *fdw_xact_id, int fdw_xact_status);
+extern int update_fdw_xact(int fdw_xact_id, int fdw_xact_status);
+extern void remove_fdw_xact(int fdw_xact_id);
+
+#endif /* FDW_XACT_H */
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 8205504..2adad46 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -252,12 +252,14 @@ extern bool IsInTransactionChain(bool isTopLevel);
 extern void RegisterXactCallback(XactCallback callback, void *arg);
 extern void UnregisterXactCallback(XactCallback callback, void *arg);
 extern void RegisterSubXactCallback(SubXactCallback callback, void *arg);
 extern void UnregisterSubXactCallback(SubXactCallback callback, void *arg);
 
 extern int	xactGetCommittedChildren(TransactionId **ptr);
 
 extern void xact_redo(XLogReaderState *record);
 extern void xact_desc(StringInfo buf, XLogReaderState *record);
 extern const char *xact_identify(uint8 info);
+extern void RegisterXactForeignServer(Oid serveroid, bool can_prepare);
+extern bool FdwTwoPhaseNeeded();
 
 #endif   /* XACT_H */
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 9edfdb8..a00afdb 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -5135,20 +5135,24 @@ DESCR("fractional rank of hypothetical row");
 DATA(insert OID = 3989 ( percent_rank_final PGNSP PGUID 12 1 0 2276 0 f f f f f f i 2 0 701 "2281 2276" "{2281,2276}" "{i,v}" _null_ _null_ hypothetical_percent_rank_final _null_ _null_ _null_ ));
 DESCR("aggregate final function");
 DATA(insert OID = 3990 ( cume_dist			PGNSP PGUID 12 1 0 2276 0 t f f f f f i 1 0 701 "2276" "{2276}" "{v}" _null_ _null_ aggregate_dummy _null_ _null_ _null_ ));
 DESCR("cumulative distribution of hypothetical row");
 DATA(insert OID = 3991 ( cume_dist_final	PGNSP PGUID 12 1 0 2276 0 f f f f f f i 2 0 701 "2281 2276" "{2281,2276}" "{i,v}" _null_ _null_ hypothetical_cume_dist_final _null_ _null_ _null_ ));
 DESCR("aggregate final function");
 DATA(insert OID = 3992 ( dense_rank			PGNSP PGUID 12 1 0 2276 0 t f f f f f i 1 0 20 "2276" "{2276}" "{v}" _null_ _null_	aggregate_dummy _null_ _null_ _null_ ));
 DESCR("rank of hypothetical row without gaps");
 DATA(insert OID = 3993 ( dense_rank_final	PGNSP PGUID 12 1 0 2276 0 f f f f f f i 2 0 20 "2281 2276" "{2281,2276}" "{i,v}" _null_ _null_	hypothetical_dense_rank_final _null_ _null_ _null_ ));
 DESCR("aggregate final function");
+DATA(insert OID = 4063 (  pg_fdw_xact PGNSP PGUID 12 1 1000 0 0 f f f f t t v 0 0 2249 "" "{26, 28,26,26,20,25}" "{o,o,o,o,o,o}" "{database oid, local transaction,foreign server oid,user mapping oid,status,info}" _null_ pg_fdw_xact _null_ _null_ _null_ ));
+DESCR("view foreign transactions");
+DATA(insert OID = 4071 (  pg_fdw_resolve PGNSP PGUID 12 1 1000 0 0 f f f f f f v 0 0 2278 "" _null_ _null_ _null_  _null_ pg_fdw_resolve _null_ _null_ _null_ ));
+DESCR("resolve foreign transactions");
 
 
 /*
  * Symbolic values for provolatile column: these indicate whether the result
  * of a function is dependent *only* on the values of its explicit arguments,
  * or can change due to outside factors (such as parameter variables or
  * table contents).  NOTE: functions having side-effects, such as setval(),
  * must be labeled volatile to ensure they will not get optimized away,
  * even if the actual return value is not changeable.
  */
diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h
index 1d76841..6211738 100644
--- a/src/include/foreign/fdwapi.h
+++ b/src/include/foreign/fdwapi.h
@@ -95,20 +95,23 @@ typedef int (*AcquireSampleRowsFunc) (Relation relation, int elevel,
 											   HeapTuple *rows, int targrows,
 												  double *totalrows,
 												  double *totaldeadrows);
 
 typedef bool (*AnalyzeForeignTable_function) (Relation relation,
 												 AcquireSampleRowsFunc *func,
 													BlockNumber *totalpages);
 
 typedef List *(*ImportForeignSchema_function) (ImportForeignSchemaStmt *stmt,
 														   Oid serverOid);
+typedef bool (*ResolvePreparedTransaction_function) (Oid serverOid, Oid user_mapping,
+														int prep_info_len,
+														char *prep_info, int resolution);
 
 /*
  * FdwRoutine is the struct returned by a foreign-data wrapper's handler
  * function.  It provides pointers to the callback functions needed by the
  * planner and executor.
  *
  * More function pointers are likely to be added in the future.  Therefore
  * it's recommended that the handler initialize the struct with
  * makeNode(FdwRoutine) so that all fields are set to NULL.  This will
  * ensure that no fields are accidentally left undefined.
@@ -143,20 +146,23 @@ typedef struct FdwRoutine
 
 	/* Support functions for EXPLAIN */
 	ExplainForeignScan_function ExplainForeignScan;
 	ExplainForeignModify_function ExplainForeignModify;
 
 	/* Support functions for ANALYZE */
 	AnalyzeForeignTable_function AnalyzeForeignTable;
 
 	/* Support functions for IMPORT FOREIGN SCHEMA */
 	ImportForeignSchema_function ImportForeignSchema;
+
+	/* Support functions for prepared transaction resolution */
+	ResolvePreparedTransaction_function ResolvePreparedTransaction;
 } FdwRoutine;
 
 
 /* Functions in foreign/foreign.c */
 extern FdwRoutine *GetFdwRoutine(Oid fdwhandler);
 extern FdwRoutine *GetFdwRoutineByRelId(Oid relid);
 extern FdwRoutine *GetFdwRoutineForRelation(Relation relation, bool makecopy);
 extern bool IsImportableForeignTable(const char *tablename,
 						 ImportForeignSchemaStmt *stmt);
 
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index e3c2efc..da056b6 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -127,22 +127,23 @@ extern PGDLLIMPORT LWLockPadded *MainLWLockArray;
 #define SerializablePredicateLockListLock	(&MainLWLockArray[30].lock)
 #define OldSerXidLock				(&MainLWLockArray[31].lock)
 #define SyncRepLock					(&MainLWLockArray[32].lock)
 #define BackgroundWorkerLock		(&MainLWLockArray[33].lock)
 #define DynamicSharedMemoryControlLock		(&MainLWLockArray[34].lock)
 #define AutoFileLock				(&MainLWLockArray[35].lock)
 #define ReplicationSlotAllocationLock	(&MainLWLockArray[36].lock)
 #define ReplicationSlotControlLock		(&MainLWLockArray[37].lock)
 #define CommitTsControlLock			(&MainLWLockArray[38].lock)
 #define CommitTsLock				(&MainLWLockArray[39].lock)
+#define FdwXactLock					(&MainLWLockArray[40].lock)
 
-#define NUM_INDIVIDUAL_LWLOCKS		40
+#define NUM_INDIVIDUAL_LWLOCKS		41
 
 /*
  * It's a bit odd to declare NUM_BUFFER_PARTITIONS and NUM_LOCK_PARTITIONS
  * here, but we need them to figure out offsets within MainLWLockArray, and
  * having this file include lock.h or bufmgr.h would be backwards.
  */
 
 /* Number of partitions of the shared buffer mapping hashtable */
 #define NUM_BUFFER_PARTITIONS  128
 
diff --git a/src/include/utils/builtins.h b/src/include/utils/builtins.h
index bc4517d..4783c2b 100644
--- a/src/include/utils/builtins.h
+++ b/src/include/utils/builtins.h
@@ -1216,11 +1216,14 @@ extern Datum pg_available_extensions(PG_FUNCTION_ARGS);
 extern Datum pg_available_extension_versions(PG_FUNCTION_ARGS);
 extern Datum pg_extension_update_paths(PG_FUNCTION_ARGS);
 extern Datum pg_extension_config_dump(PG_FUNCTION_ARGS);
 
 /* commands/prepare.c */
 extern Datum pg_prepared_statement(PG_FUNCTION_ARGS);
 
 /* utils/mmgr/portalmem.c */
 extern Datum pg_cursor(PG_FUNCTION_ARGS);
 
+/* access/transam/fdw_xact.c */
+extern Datum pg_fdw_xact(PG_FUNCTION_ARGS);
+extern Datum pg_fdw_resolve(PG_FUNCTION_ARGS);
 #endif   /* BUILTINS_H */
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to