On 2015-04-12 22:02:38 +0300, Heikki Linnakangas wrote:
> This needs to be weighed against removing the padding bytes
> altogether.

Hrmpf. Says the person that used a lot of padding, without much
discussion, for the WAL level infrastructure making pg_rewind more
maintainable. And you deemed it perfectly ok to use them up to avoid
*increasing* the WAL size with the *additional data* (which so far
nothing but pg_rewind needs in that way), while it perfectly well could
have been used to shrink the WAL size to less than it now is. And those
changes are *far*, *far* harder to back out/refactor than this one (which
is pretty localized and thus can easily be changed); to the point that I
think it's infeasible to do so...


If you want to shrink the WAL size, send in a patch independently. Not
as a way to block somebody else implementing something.


> I'm surprised there's such a big difference between the "extern" and
> "padding" versions above. At a quick approximation, storing the ID as a
> separate "fragment", along with XLogRecordDataHeaderShort and
> XLogRecordDataHeaderLong, should add one byte of overhead plus the ID
> itself. So that would be 3 extra bytes for 2-byte identifiers, or 5 bytes
> for 4-byte identifiers. Does that mean that the average record length is
> only about 30 bytes?

Yes, nearly. That's xlogdump --stats=record from the above scenario with
replication identifiers used and reusing the padding:

Type                         N      (%)   Record size      (%)    FPI size      (%)    Combined size      (%)
----                         -      ---   -----------      ---    --------      ---    -------------      ---
Transaction/COMMIT       50003 ( 16.89)       2600496 ( 23.38)           0 (  -nan)          2600496 ( 23.38)
CLOG/ZEROPAGE                1 (  0.00)            28 (  0.00)           0 (  -nan)               28 (  0.00)
Standby/RUNNING_XACTS        5 (  0.00)           248 (  0.00)           0 (  -nan)              248 (  0.00)
Heap2/CLEAN              46034 ( 15.55)       1473088 ( 13.24)           0 (  -nan)          1473088 ( 13.24)
Heap2/VISIBLE                2 (  0.00)            56 (  0.00)           0 (  -nan)               56 (  0.00)
Heap/INSERT              49682 ( 16.78)       1341414 ( 12.06)           0 (  -nan)          1341414 ( 12.06)
Heap/HOT_UPDATE         150013 ( 50.67)       5700494 ( 51.24)           0 (  -nan)          5700494 ( 51.24)
Heap/INPLACE                 5 (  0.00)           130 (  0.00)           0 (  -nan)              130 (  0.00)
Heap/INSERT+INIT           318 (  0.11)          8586 (  0.08)           0 (  -nan)             8586 (  0.08)
Btree/VACUUM                 2 (  0.00)            56 (  0.00)           0 (  -nan)               56 (  0.00)
                      --------               --------             --------                  --------
Total                   296065               11124596 [100.00%]          0 [0.00%]          11124596 [100%]

(The FPI percentage display above is arguably borked. Interesting.)

So the average record size is ~37.5 bytes including the increased commit
record size due to the origin information (which is the part that
increases the size for that version that reuses the padding).

This *most definitely* isn't representative of every workload. But it
*is* *a* common type of workload.

Note that --stats will *not* show the size difference in xlog records
when adding data as an additional chunk vs. padding as it uses
XLogRecGetDataLen() to compute the record length... That confused me for
a while.

> That doesn't sound right, 30 bytes is very little.

Well, it's mostly HOT_UPDATEs and INSERTs into non-indexed tables. So
that's not too surprising. Obviously that'd look different with FPIs
enabled.

> Perhaps the size
> of the records created by pgbench happen to cross a 8-byte alignment
> boundary at that point, making a big difference. In another workload,
> there might be no difference at all, due to alignment.

Right.

> Also, you don't need to tag every record type with the replication ID. All
> indexam records can skip it, for starters, since logical decoding doesn't
> care about them. That should remove a fair amount of bloat.

Yes. I mentioned that. It's additional complexity because now the
decision has to be made at each xlog insertion callsite, which makes
refactoring this into a different representation a bit harder. I don't
think it will make that much of a difference in the above workload
(just CLEAN will be smaller); but it clearly might in others.

I've attached a rebased patch that adds the decision about origin logging
to the relevant XLogInsert() callsites for "external" 2 byte identifiers
and removes the pad-reusing version in the interest of moving forward. I
still don't see a point in using 4 byte identifiers atm; given the above
numbers that just seems like a waste for unrealistic use cases (>2^16
nodes). It's just two lines to change if we feel the need in the future.

Working on fixing the issue with WAL logging of deletions and
rearranging docs as Petr suggested. Not sure if the latter will really
look good, but I guess we'll see ;)

Greetings,

Andres Freund

-- 
 Andres Freund                     http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
From 841733fff1394eaafb25272e12cee92d4c94906c Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Thu, 9 Apr 2015 15:01:00 +0200
Subject: [PATCH] Introduce replication identifiers: v1.1

Replication identifiers are used to identify nodes in a replication
setup, identify changes that are created due to replication and to keep
track of replication progress.

Primarily this is useful because solving these in other ways is
possible, but ends up being much less efficient and more complicated. We
don't want to require replication solutions to reimplement logic for
this independently. The infrastructure is intended to be generic enough
to be reusable.

This infrastructure replaces the 'nodeid' infrastructure of commit
timestamps. Apart from there only being 2^16 identifiers, the
infrastructure provided here additionally integrates with logical
replication and is available via SQL. Since the commit timestamp
infrastructure was itself only introduced in 9.5, that's not a problem.

For now the number of nodes whose replication progress can be tracked is
determined by the max_replication_slots GUC. It's not perfect to reuse
that GUC, but there doesn't seem to be sufficient reason to introduce a
separate new one.

Bumps both catversion and wal page magic.

Author: Andres Freund, with contributions from Petr Jelinek and Craig Ringer
Reviewed-By: Robert Haas, Heikki Linnakangas, Steve Singer
Discussion: 20150216002155.gi15...@awork2.anarazel.de,
    20140923182422.ga15...@alap3.anarazel.de,
    20131114172632.ge7...@alap2.anarazel.de
---
 contrib/test_decoding/Makefile                     |    3 +-
 contrib/test_decoding/expected/replident.out       |  127 ++
 contrib/test_decoding/sql/replident.sql            |   58 +
 contrib/test_decoding/test_decoding.c              |   28 +
 doc/src/sgml/catalogs.sgml                         |  124 ++
 doc/src/sgml/filelist.sgml                         |    1 +
 doc/src/sgml/func.sgml                             |  162 ++-
 doc/src/sgml/logicaldecoding.sgml                  |   35 +-
 doc/src/sgml/postgres.sgml                         |    1 +
 doc/src/sgml/replication-identifiers.sgml          |   89 ++
 src/backend/access/heap/heapam.c                   |   19 +
 src/backend/access/rmgrdesc/xactdesc.c             |   24 +-
 src/backend/access/transam/commit_ts.c             |   53 +-
 src/backend/access/transam/xact.c                  |   72 +-
 src/backend/access/transam/xlog.c                  |    8 +
 src/backend/access/transam/xloginsert.c            |   32 +-
 src/backend/access/transam/xlogreader.c            |    6 +
 src/backend/catalog/Makefile                       |    2 +-
 src/backend/catalog/catalog.c                      |    8 +-
 src/backend/catalog/system_views.sql               |    7 +
 src/backend/replication/logical/Makefile           |    3 +-
 src/backend/replication/logical/decode.c           |   48 +-
 src/backend/replication/logical/logical.c          |   33 +
 src/backend/replication/logical/reorderbuffer.c    |    5 +-
 .../replication/logical/replication_identifier.c   | 1296 ++++++++++++++++++++
 src/backend/storage/ipc/ipci.c                     |    3 +
 src/backend/utils/cache/syscache.c                 |   23 +
 src/bin/pg_resetxlog/pg_resetxlog.c                |    3 +
 src/include/access/commit_ts.h                     |   14 +-
 src/include/access/xact.h                          |   11 +
 src/include/access/xlog.h                          |    1 +
 src/include/access/xlog_internal.h                 |    2 +-
 src/include/access/xlogdefs.h                      |    6 +
 src/include/access/xloginsert.h                    |    1 +
 src/include/access/xlogreader.h                    |    3 +
 src/include/access/xlogrecord.h                    |    3 +
 src/include/catalog/catversion.h                   |    2 +-
 src/include/catalog/indexing.h                     |    6 +
 src/include/catalog/pg_proc.h                      |   30 +
 src/include/catalog/pg_replication_identifier.h    |   74 ++
 src/include/replication/logical.h                  |    2 +
 src/include/replication/output_plugin.h            |    8 +
 src/include/replication/reorderbuffer.h            |    8 +-
 src/include/replication/replication_identifier.h   |   62 +
 src/include/storage/lwlock.h                       |    3 +-
 src/include/utils/syscache.h                       |    2 +
 src/test/regress/expected/rules.out                |    5 +
 src/test/regress/expected/sanity_check.out         |    1 +
 48 files changed, 2432 insertions(+), 85 deletions(-)
 create mode 100644 contrib/test_decoding/expected/replident.out
 create mode 100644 contrib/test_decoding/sql/replident.sql
 create mode 100644 doc/src/sgml/replication-identifiers.sgml
 create mode 100644 src/backend/replication/logical/replication_identifier.c
 create mode 100644 src/include/catalog/pg_replication_identifier.h
 create mode 100644 src/include/replication/replication_identifier.h

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 438be44..f8334cc 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -37,7 +37,8 @@ submake-isolation:
 submake-test_decoding:
 	$(MAKE) -C $(top_builddir)/contrib/test_decoding
 
-REGRESSCHECKS=ddl rewrite toast permissions decoding_in_xact decoding_into_rel binary prepared
+REGRESSCHECKS=ddl rewrite toast permissions decoding_in_xact decoding_into_rel \
+	binary prepared replident
 
 regresscheck: all | submake-regress submake-test_decoding
 	$(MKDIR_P) regression_output
diff --git a/contrib/test_decoding/expected/replident.out b/contrib/test_decoding/expected/replident.out
new file mode 100644
index 0000000..f6dc404
--- /dev/null
+++ b/contrib/test_decoding/expected/replident.out
@@ -0,0 +1,127 @@
+-- predictability
+SET synchronous_commit = on;
+CREATE TABLE origin_tbl(id serial primary key, data text);
+CREATE TABLE target_tbl(id serial primary key, data text);
+SELECT pg_replication_identifier_create('test_decoding: regression_slot');
+ pg_replication_identifier_create 
+----------------------------------
+                                1
+(1 row)
+
+-- ensure duplicate creations fail
+SELECT pg_replication_identifier_create('test_decoding: regression_slot');
+ERROR:  duplicate key value violates unique constraint "pg_replication_identifier_riname_index"
+DETAIL:  Key (riname)=(test_decoding: regression_slot) already exists.
+--ensure deletions work (once)
+SELECT pg_replication_identifier_create('test_decoding: temp');
+ pg_replication_identifier_create 
+----------------------------------
+                                2
+(1 row)
+
+SELECT pg_replication_identifier_drop('test_decoding: temp');
+ pg_replication_identifier_drop 
+--------------------------------
+ 
+(1 row)
+
+SELECT pg_replication_identifier_drop('test_decoding: temp');
+ERROR:  cache lookup failed for replication identifier named test_decoding: temp
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+ ?column? 
+----------
+ init
+(1 row)
+
+-- origin tx
+INSERT INTO origin_tbl(data) VALUES ('will be replicated and decoded and decoded again');
+INSERT INTO target_tbl(data)
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+-- as is normal, the insert into target_tbl shows up
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+                                                                                    data                                                                                    
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ BEGIN
+ table public.target_tbl: INSERT: id[integer]:1 data[text]:'BEGIN'
+ table public.target_tbl: INSERT: id[integer]:2 data[text]:'table public.origin_tbl: INSERT: id[integer]:1 data[text]:''will be replicated and decoded and decoded again'''
+ table public.target_tbl: INSERT: id[integer]:3 data[text]:'COMMIT'
+ COMMIT
+(5 rows)
+
+INSERT INTO origin_tbl(data) VALUES ('will be replicated, but not decoded again');
+-- mark session as replaying
+SELECT pg_replication_identifier_setup_replaying_from('test_decoding: regression_slot');
+ pg_replication_identifier_setup_replaying_from 
+------------------------------------------------
+ 
+(1 row)
+
+-- ensure we prevent duplicate setup
+SELECT pg_replication_identifier_setup_replaying_from('test_decoding: regression_slot');
+ERROR:  cannot setup replication origin when one is already setup
+BEGIN;
+-- setup transaction origins
+SELECT pg_replication_identifier_setup_tx_origin('0/ffffffff', '2013-01-01 00:00');
+ pg_replication_identifier_setup_tx_origin 
+-------------------------------------------
+ 
+(1 row)
+
+INSERT INTO target_tbl(data)
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
+COMMIT;
+SELECT pg_replication_identifier_reset_replaying_from();
+ pg_replication_identifier_reset_replaying_from 
+------------------------------------------------
+ 
+(1 row)
+
+SELECT local_id, external_id, remote_lsn, local_lsn <> '0/0' FROM pg_replication_identifier_progress;
+ local_id |          external_id           | remote_lsn | ?column? 
+----------+--------------------------------+------------+----------
+        1 | test_decoding: regression_slot | 0/FFFFFFFF | t
+(1 row)
+
+SELECT pg_replication_identifier_progress('test_decoding: regression_slot', false);
+ pg_replication_identifier_progress 
+------------------------------------
+ 0/FFFFFFFF
+(1 row)
+
+SELECT pg_replication_identifier_progress('test_decoding: regression_slot', true);
+ pg_replication_identifier_progress 
+------------------------------------
+ 0/FFFFFFFF
+(1 row)
+
+-- ensure reset requires previously setup state
+SELECT pg_replication_identifier_reset_replaying_from();
+ERROR:  no replication identifier is set up
+-- and magically the replayed xact will be filtered!
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
+ data 
+------
+(0 rows)
+
+--but new original changes still show up
+INSERT INTO origin_tbl(data) VALUES ('will be replicated');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1',  'only-local', '1');
+                                      data                                      
+--------------------------------------------------------------------------------
+ BEGIN
+ table public.origin_tbl: INSERT: id[integer]:3 data[text]:'will be replicated'
+ COMMIT
+(3 rows)
+
+SELECT pg_drop_replication_slot('regression_slot');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
+SELECT pg_replication_identifier_drop('test_decoding: regression_slot');
+ pg_replication_identifier_drop 
+--------------------------------
+ 
+(1 row)
+
diff --git a/contrib/test_decoding/sql/replident.sql b/contrib/test_decoding/sql/replident.sql
new file mode 100644
index 0000000..d5ba486
--- /dev/null
+++ b/contrib/test_decoding/sql/replident.sql
@@ -0,0 +1,58 @@
+-- predictability
+SET synchronous_commit = on;
+
+CREATE TABLE origin_tbl(id serial primary key, data text);
+CREATE TABLE target_tbl(id serial primary key, data text);
+
+SELECT pg_replication_identifier_create('test_decoding: regression_slot');
+-- ensure duplicate creations fail
+SELECT pg_replication_identifier_create('test_decoding: regression_slot');
+
+--ensure deletions work (once)
+SELECT pg_replication_identifier_create('test_decoding: temp');
+SELECT pg_replication_identifier_drop('test_decoding: temp');
+SELECT pg_replication_identifier_drop('test_decoding: temp');
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+-- origin tx
+INSERT INTO origin_tbl(data) VALUES ('will be replicated and decoded and decoded again');
+INSERT INTO target_tbl(data)
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- as is normal, the insert into target_tbl shows up
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+INSERT INTO origin_tbl(data) VALUES ('will be replicated, but not decoded again');
+
+-- mark session as replaying
+SELECT pg_replication_identifier_setup_replaying_from('test_decoding: regression_slot');
+
+-- ensure we prevent duplicate setup
+SELECT pg_replication_identifier_setup_replaying_from('test_decoding: regression_slot');
+
+BEGIN;
+-- setup transaction origins
+SELECT pg_replication_identifier_setup_tx_origin('0/ffffffff', '2013-01-01 00:00');
+INSERT INTO target_tbl(data)
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
+COMMIT;
+
+SELECT pg_replication_identifier_reset_replaying_from();
+
+SELECT local_id, external_id, remote_lsn, local_lsn <> '0/0' FROM pg_replication_identifier_progress;
+SELECT pg_replication_identifier_progress('test_decoding: regression_slot', false);
+SELECT pg_replication_identifier_progress('test_decoding: regression_slot', true);
+
+-- ensure reset requires previously setup state
+SELECT pg_replication_identifier_reset_replaying_from();
+
+-- and magically the replayed xact will be filtered!
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
+
+--but new original changes still show up
+INSERT INTO origin_tbl(data) VALUES ('will be replicated');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1',  'only-local', '1');
+
+SELECT pg_drop_replication_slot('regression_slot');
+SELECT pg_replication_identifier_drop('test_decoding: regression_slot');
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 963d5df..2ec3001 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -21,6 +21,7 @@
 
 #include "replication/output_plugin.h"
 #include "replication/logical.h"
+#include "replication/replication_identifier.h"
 
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
@@ -43,6 +44,7 @@ typedef struct
 	bool		include_timestamp;
 	bool		skip_empty_xacts;
 	bool		xact_wrote_changes;
+	bool		only_local;
 } TestDecodingData;
 
 static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
@@ -59,6 +61,8 @@ static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
 static void pg_decode_change(LogicalDecodingContext *ctx,
 				 ReorderBufferTXN *txn, Relation rel,
 				 ReorderBufferChange *change);
+static bool pg_decode_filter(LogicalDecodingContext *ctx,
+							 RepNodeId origin_id);
 
 void
 _PG_init(void)
@@ -76,6 +80,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->begin_cb = pg_decode_begin_txn;
 	cb->change_cb = pg_decode_change;
 	cb->commit_cb = pg_decode_commit_txn;
+	cb->filter_by_origin_cb = pg_decode_filter;
 	cb->shutdown_cb = pg_decode_shutdown;
 }
 
@@ -97,6 +102,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 	data->include_xids = true;
 	data->include_timestamp = false;
 	data->skip_empty_xacts = false;
+	data->only_local = false;
 
 	ctx->output_plugin_private = data;
 
@@ -155,6 +161,17 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 				  errmsg("could not parse value \"%s\" for parameter \"%s\"",
 						 strVal(elem->arg), elem->defname)));
 		}
+		else if (strcmp(elem->defname, "only-local") == 0)
+		{
+
+			if (elem->arg == NULL)
+				data->only_local = true;
+			else if (!parse_bool(strVal(elem->arg), &data->only_local))
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				  errmsg("could not parse value \"%s\" for parameter \"%s\"",
+						 strVal(elem->arg), elem->defname)));
+		}
 		else
 		{
 			ereport(ERROR,
@@ -223,6 +240,17 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	OutputPluginWrite(ctx, true);
 }
 
+static bool
+pg_decode_filter(LogicalDecodingContext *ctx,
+				 RepNodeId origin_id)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	if (data->only_local && origin_id != InvalidRepNodeId)
+		return true;
+	return false;
+}
+
 /*
  * Print literal `outputstr' already represented as string of type `typid'
  * into stringbuf `s'.
diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index d0b78f2..f5ee567 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -239,6 +239,16 @@
      </row>
 
      <row>
+      <entry><link linkend="catalog-pg-replication-identifier"><structname>pg_replication_identifier</structname></link></entry>
+      <entry>registered replication identifiers</entry>
+     </row>
+
+     <row>
+      <entry><link linkend="catalog-pg-replication-identifier-progress"><structname>pg_replication_identifier_progress</structname></link></entry>
+      <entry>information about logical replication progress</entry>
+     </row>
+
+     <row>
       <entry><link linkend="catalog-pg-replication-slots"><structname>pg_replication_slots</structname></link></entry>
       <entry>replication slot information</entry>
      </row>
@@ -5323,6 +5333,120 @@
 
  </sect1>
 
+ <sect1 id="catalog-pg-replication-identifier">
+  <title><structname>pg_replication_identifier</structname></title>
+
+  <indexterm zone="catalog-pg-replication-identifier">
+   <primary>pg_replication_identifier</primary>
+  </indexterm>
+
+  <para>
+   The <structname>pg_replication_identifier</structname> catalog
+   contains all replication identifiers created.  For more on
+   replication identifiers
+   see <xref linkend="replication-identifiers">.
+  </para>
+
+  <table>
+
+   <title><structname>pg_replication_identifier</structname> Columns</title>
+
+   <tgroup cols="4">
+    <thead>
+     <row>
+      <entry>Name</entry>
+      <entry>Type</entry>
+      <entry>References</entry>
+      <entry>Description</entry>
+     </row>
+    </thead>
+
+    <tbody>
+     <row>
+      <entry><structfield>riident</structfield></entry>
+      <entry><type>Oid</type></entry>
+      <entry></entry>
+      <entry>A unique, cluster-wide identifier for the replication
+      identifier. Should never leave the system.</entry>
+     </row>
+
+     <row>
+      <entry><structfield>riname</structfield></entry>
+      <entry><type>text</type></entry>
+      <entry></entry>
+      <entry>The external, user-defined name of a replication
+      identifier.</entry>
+     </row>
+    </tbody>
+   </tgroup>
+  </table>
+ </sect1>
+
+ <sect1 id="catalog-pg-replication-identifier-progress">
+  <title><structname>pg_replication_identifier_progress</structname></title>
+
+  <indexterm zone="catalog-pg-replication-identifier-progress">
+   <primary>pg_replication_identifier_progress</primary>
+  </indexterm>
+
+  <para>
+   The <structname>pg_replication_identifier_progress</structname>
+   view contains information about how far replication for a certain
+   replication identifier has progressed.  For more on replication
+   identifiers see <xref linkend="replication-identifiers">.
+  </para>
+
+  <table>
+
+   <title><structname>pg_replication_identifier_progress</structname> Columns</title>
+
+   <tgroup cols="4">
+    <thead>
+     <row>
+      <entry>Name</entry>
+      <entry>Type</entry>
+      <entry>References</entry>
+      <entry>Description</entry>
+     </row>
+    </thead>
+
+    <tbody>
+     <row>
+      <entry><structfield>local_id</structfield></entry>
+      <entry><type>Oid</type></entry>
+      <entry><literal><link linkend="catalog-pg-replication-identifier"><structname>pg_replication_identifier</structname></link>.riident</literal></entry>
+      <entry>internal node identifier</entry>
+     </row>
+
+     <row>
+      <entry><structfield>external_id</structfield></entry>
+      <entry><type>text</type></entry>
+      <entry><literal><link linkend="catalog-pg-replication-identifier"><structname>pg_replication_identifier</structname></link>.riname</literal></entry>
+      <entry>external node identifier</entry>
+     </row>
+
+     <row>
+      <entry><structfield>remote_lsn</structfield></entry>
+      <entry><type>pg_lsn</type></entry>
+      <entry></entry>
+      <entry>The origin node's LSN up to which data has been replicated.</entry>
+     </row>
+
+
+     <row>
+      <entry><structfield>local_lsn</structfield></entry>
+      <entry><type>pg_lsn</type></entry>
+      <entry></entry>
+      <entry>This node's LSN at
+      which <literal>remote_lsn</literal> has been replicated. Used to
+      flush commit records before persisting data to disk when using
+      asynchronous commits.</entry>
+     </row>
+    </tbody>
+   </tgroup>
+  </table>
+ </sect1>
+
  <sect1 id="catalog-pg-replication-slots">
   <title><structname>pg_replication_slots</structname></title>
 
diff --git a/doc/src/sgml/filelist.sgml b/doc/src/sgml/filelist.sgml
index 2d7514c..00cc456 100644
--- a/doc/src/sgml/filelist.sgml
+++ b/doc/src/sgml/filelist.sgml
@@ -95,6 +95,7 @@
 <!ENTITY fdwhandler SYSTEM "fdwhandler.sgml">
 <!ENTITY custom-scan SYSTEM "custom-scan.sgml">
 <!ENTITY logicaldecoding SYSTEM "logicaldecoding.sgml">
+<!ENTITY replication-identifiers SYSTEM "replication-identifiers.sgml">
 <!ENTITY protocol   SYSTEM "protocol.sgml">
 <!ENTITY sources    SYSTEM "sources.sgml">
 <!ENTITY storage    SYSTEM "storage.sgml">
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 5f7bf6a..8cce9a3 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -16876,9 +16876,10 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());
    <para>
     The functions shown in <xref linkend="functions-replication-table"> are
     for controlling and interacting with replication features.
-    See <xref linkend="streaming-replication">
-    and <xref linkend="streaming-replication-slots"> for information about the
-    underlying features.  Use of these functions is restricted to superusers.
+    See <xref linkend="streaming-replication">,
+    <xref linkend="streaming-replication-slots">, and
+    <xref linkend="replication-identifiers"> for information about the
+    underlying features.  Use of these functions is restricted to superusers.
    </para>
 
    <para>
@@ -17035,6 +17036,161 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());
         on future calls.
        </entry>
       </row>
+
+      <row id="replication-identifier-create">
+       <entry>
+        <indexterm>
+         <primary>pg_replication_identifier_create</primary>
+        </indexterm>
+        <literal><function>pg_replication_identifier_create(<parameter>node_name</parameter> <type>text</type>)</function></literal>
+       </entry>
+       <entry>
+        <parameter>internal_id</parameter> <type>oid</type>
+       </entry>
+       <entry>
+        Create a replication identifier based on the passed in
+        external name, and create an internal id for it.
+       </entry>
+      </row>
+
+      <row>
+       <entry>
+        <indexterm>
+         <primary>pg_replication_identifier_get</primary>
+        </indexterm>
+        <literal><function>pg_replication_identifier_get(<parameter>node_name</parameter> <type>text</type>)</function></literal>
+       </entry>
+       <entry>
+        <parameter>internal_id</parameter> <type>oid</type>
+       </entry>
+       <entry>
+        Look up a replication identifier and return its internal id. If
+        no replication identifier is found, an error is thrown.
+       </entry>
+      </row>
+
+      <row id="replication-identifier-setup-replaying-from">
+       <entry>
+        <indexterm>
+         <primary>pg_replication_identifier_setup_replaying_from</primary>
+        </indexterm>
+        <literal><function>pg_replication_identifier_setup_replaying_from(<parameter>node_name</parameter> <type>text</type>)</function></literal>
+       </entry>
+       <entry>
+        void
+       </entry>
+       <entry>
+        Signal that the current session is replaying from the passed
+        in node. This will mark changes and transactions emitted by
+        the session as originating from that node. Normal
+        operation can be resumed using
+        <function>pg_replication_identifier_reset_replaying_from</function>. Can
+        only be used if no previous origin is configured.
+       </entry>
+      </row>
+
+      <row>
+       <entry>
+        <indexterm>
+         <primary>pg_replication_identifier_reset_replaying_from</primary>
+        </indexterm>
+        <literal><function>pg_replication_identifier_reset_replaying_from()</function></literal>
+       </entry>
+       <entry>
+        void
+       </entry>
+       <entry>
+        Tear down the replication identifier configured by
+        <function>pg_replication_identifier_setup_replaying_from</function>.
+       </entry>
+      </row>
+
+      <row id="replication-identifier-setup-tx-origin">
+       <entry>
+        <indexterm>
+         <primary>pg_replication_identifier_setup_tx_origin</primary>
+        </indexterm>
+        <literal><function>pg_replication_identifier_setup_tx_origin(<parameter>origin_lsn</parameter> <type>pg_lsn</type>, <parameter>origin_timestamp</parameter> <type>timestamptz</type>)</function></literal>
+       </entry>
+       <entry>
+        void
+       </entry>
+       <entry>
+        Mark the current transaction as replaying a transaction
+        that has committed at the passed in <acronym>LSN</acronym> and
+        timestamp. Can only be called when a replication origin has
+        previously been configured using
+        <function>pg_replication_identifier_setup_replaying_from</function>.
+       </entry>
+      </row>
+
+      <row>
+       <entry>
+        <indexterm>
+         <primary>pg_replication_identifier_is_replaying</primary>
+        </indexterm>
+        <literal><function>pg_replication_identifier_is_replaying()</function></literal>
+       </entry>
+       <entry>
+        bool
+       </entry>
+       <entry>
+        Has a replication identifier been set up in the current session?
+       </entry>
+      </row>
+
+      <row>
+       <entry>
+        <indexterm>
+         <primary>pg_replication_identifier_advance</primary>
+        </indexterm>
+        <literal><function>pg_replication_identifier_advance(<parameter>node_name</parameter> <type>text</type>, <parameter>pos</parameter> <type>pg_lsn</type>)</function></literal>
+       </entry>
+       <entry>
+        void
+       </entry>
+       <entry>
+        Set replication progress for the passed in node to the passed
+        in position. This is primarily useful for setting up the
+        initial position or a new position after configuration changes
+        and similar. Be aware that careless use of this function can
+        lead to inconsistently replicated data.
+       </entry>
+      </row>
+
+      <row id="replication-identifier-drop">
+       <entry>
+        <indexterm>
+         <primary>pg_replication_identifier_drop</primary>
+        </indexterm>
+        <literal><function>pg_replication_identifier_drop(<parameter>node_name</parameter> <type>text</type>)</function></literal>
+       </entry>
+       <entry>
+        void
+       </entry>
+       <entry>
+        Delete a previously created replication identifier.
+       </entry>
+      </row>
+
+      <row id="replication-identifier-progress">
+       <entry>
+        <indexterm>
+         <primary>pg_replication_identifier_progress</primary>
+        </indexterm>
+        <literal><function>pg_replication_identifier_progress(<parameter>node_name</parameter> <type>text</type>, <parameter>flush</parameter> <type>bool</type>)</function></literal>
+       </entry>
+       <entry>
+        pg_lsn
+       </entry>
+       <entry>
+        Return the replay position for the passed in replication
+        identifier. The parameter <parameter>flush</parameter>
+        determines whether the corresponding local transaction will be
+        guaranteed to have been flushed to disk or not.
+       </entry>
+      </row>
+
      </tbody>
     </tgroup>
    </table>
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 3650567..c84a1769 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -363,6 +363,7 @@ typedef struct OutputPluginCallbacks
     LogicalDecodeBeginCB begin_cb;
     LogicalDecodeChangeCB change_cb;
     LogicalDecodeCommitCB commit_cb;
+    LogicalDecodeFilterByOriginCB filter_by_origin_cb;
     LogicalDecodeShutdownCB shutdown_cb;
 } OutputPluginCallbacks;
 
@@ -370,7 +371,8 @@ typedef void (*LogicalOutputPluginInit)(struct OutputPluginCallbacks *cb);
 </programlisting>
      The <function>begin_cb</function>, <function>change_cb</function>
      and <function>commit_cb</function> callbacks are required,
-     while <function>startup_cb</function>
+     while <function>startup_cb</function>,
+     <function>filter_by_origin_cb</function>
      and <function>shutdown_cb</function> are optional.
     </para>
    </sect2>
@@ -569,6 +571,37 @@ typedef void (*LogicalDecodeChangeCB) (
       </para>
      </note>
     </sect3>
+
+     <sect3 id="logicaldecoding-output-plugin-filter-by-origin">
+     <title>Origin Filter Callback</title>
+
+     <para>
+       The optional <function>filter_by_origin_cb</function> callback
+       is called to determine whether data that has been replayed
+       from <parameter>origin_id</parameter> is of interest to the
+       output plugin.
+<programlisting>
+typedef bool (*LogicalDecodeFilterByOriginCB) (
+    struct LogicalDecodingContext *ctx,
+    RepNodeId origin_id
+);
+</programlisting>
+      The <parameter>ctx</parameter> parameter has the same contents
+      as for the other callbacks. No information but the origin is
+      available. To signal that changes originating on the passed in
+      node are irrelevant, return true, causing them to be filtered
+      away; false otherwise. The other callbacks will not be called
+      for transactions and changes that have been filtered away.
+     </para>
+     <para>
+       This is useful when implementing cascading or multi-directional
+       replication solutions. Filtering by the origin makes it possible to
+       prevent replicating the same changes back and forth in such
+       setups.  While transactions and changes also carry information
+       about the origin, filtering via this callback is noticeably
+       more efficient.
+     </para>
+     </sect3>
    </sect2>
 
    <sect2 id="logicaldecoding-output-plugin-output">
diff --git a/doc/src/sgml/postgres.sgml b/doc/src/sgml/postgres.sgml
index e378d69..5e2eacb 100644
--- a/doc/src/sgml/postgres.sgml
+++ b/doc/src/sgml/postgres.sgml
@@ -220,6 +220,7 @@
   &spi;
   &bgworker;
   &logicaldecoding;
+  &replication-identifiers;
 
  </part>
 
diff --git a/doc/src/sgml/replication-identifiers.sgml b/doc/src/sgml/replication-identifiers.sgml
new file mode 100644
index 0000000..707a4e5
--- /dev/null
+++ b/doc/src/sgml/replication-identifiers.sgml
@@ -0,0 +1,89 @@
+<!-- doc/src/sgml/replication-identifiers.sgml -->
+<chapter id="replication-identifiers">
+ <title>Replication Identifiers</title>
+ <indexterm zone="replication-identifiers">
+  <primary>Replication Identifiers</primary>
+ </indexterm>
+
+ <para>
+  Replication identifiers are intended to make it easier to implement
+  logical replication solutions on top
+  of <xref linkend="logicaldecoding">. They provide a solution to two
+  common problems:
+  <itemizedlist>
+   <listitem><para>How to safely keep track of replication progress</para></listitem>
+   <listitem><para>How to change replication behavior, based on the
+   origin of a row; e.g. to avoid loops in bi-directional replication
+   setups</para></listitem>
+  </itemizedlist>
+ </para>
+
+ <para>
+  Replication identifiers consist of an external name and an
+  internal identifier. The external identifier is free-form. It should
+  be used in a way that makes conflicts between replication
+  identifiers created by different replication solutions unlikely;
+  e.g. by prefixing it with the replication solution's name.  The internal
+  identifier is used only to avoid having to store the long version in
+  situations where space efficiency is important. It should never be
+  shared between systems.
+ </para>
+
+ <para>
+  Replication identifiers can be created using
+  <link linkend="replication-identifier-create"><function>pg_replication_identifier_create()</function></link>;
+  dropped using
+  <link linkend="replication-identifier-drop"><function>pg_replication_identifier_drop()</function></link>;
+  and seen in the
+  <link linkend="catalog-pg-replication-identifier"><structname>pg_replication_identifier</structname></link>
+  catalog.
+ </para>
+
+ <para>
+  When replicating from one system to another (independent of the fact
+  that those two might be in the same cluster, or even same database)
+  one nontrivial part of building a replication solution is to keep
+  track of replication progress. When the applying process or the
+  whole cluster dies, it needs to be able to find out up to where data
+  has successfully been replicated. Naive solutions to this, such as
+  updating a row in a table for every replayed transaction, have
+  problems like bloat.
+ </para>
+
+ <para>
+  Using the replication identifier infrastructure, a session can be
+  marked as replaying from a remote node (using the
+  <link linkend="replication-identifier-setup-replaying-from"><function>pg_replication_identifier_setup_replaying_from()</function></link>
+  function). Additionally, the <acronym>LSN</acronym> and commit
+  timestamp of every source transaction can be configured on a per
+  transaction basis using
+  <link linkend="replication-identifier-setup-tx-origin"><function>pg_replication_identifier_setup_tx_origin()</function></link>.
+  If that's done, replication progress will be persisted in a crash-safe
+  manner. Replication progress for all replication identifiers can be
+  seen in the
+  <link linkend="catalog-pg-replication-identifier-progress">
+   <structname>pg_replication_identifier_progress</structname>
+  </link> view. An individual identifier's progress, e.g. when resuming
+  replication, can be acquired using
+  <link linkend="replication-identifier-progress"><function>pg_replication_identifier_progress()</function></link>.
+ </para>
+
+ <para>
+  In replication topologies more complex than replication from exactly
+  one system to one other, another problem can be that it's hard to
+  avoid replicating replicated rows again. That can lead both to
+  cycles in the replication and inefficiencies. Replication
+  identifiers provide an optional mechanism to recognize and prevent
+  that. When set up using the functions referenced in the previous
+  paragraph, every change and transaction passed to output plugin
+  callbacks (see <xref linkend="logicaldecoding-output-plugin">)
+  generated by the session is tagged with the replication identifier
+  of the generating session.  This makes it possible to treat them differently in
+  the output plugin, e.g. ignoring all but locally originating rows.
+  Additionally, the <link linkend="logicaldecoding-output-plugin-filter-by-origin">
+  <function>filter_by_origin_cb</function></link> callback can be used
+  to filter the logical decoding change stream based on the
+  source. While less flexible, filtering via that callback is
+  considerably more efficient.
+ </para>
+</chapter>
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 457cd70..b504ccd 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -2189,6 +2189,9 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 							(char *) heaptup->t_data + SizeofHeapTupleHeader,
 							heaptup->t_len - SizeofHeapTupleHeader);
 
+		/* filtering by origin on a row level is much more efficient */
+		XLogIncludeOrigin();
+
 		recptr = XLogInsert(RM_HEAP_ID, info);
 
 		PageSetLSN(page, recptr);
@@ -2499,6 +2502,10 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 			XLogRegisterBuffer(0, buffer, REGBUF_STANDARD | bufflags);
 
 			XLogRegisterBufData(0, tupledata, totaldatalen);
+
+			/* filtering by origin on a row level is much more efficient */
+			XLogIncludeOrigin();
+
 			recptr = XLogInsert(RM_HEAP2_ID, info);
 
 			PageSetLSN(page, recptr);
@@ -2920,6 +2927,9 @@ l1:
 							 - SizeofHeapTupleHeader);
 		}
 
+		/* filtering by origin on a row level is much more efficient */
+		XLogIncludeOrigin();
+
 		recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE);
 
 		PageSetLSN(page, recptr);
@@ -4650,6 +4660,8 @@ failed:
 											  tuple->t_data->t_infomask2);
 		XLogRegisterData((char *) &xlrec, SizeOfHeapLock);
 
+		/* we don't decode row locks atm, so no need to log the origin */
+
 		recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_LOCK);
 
 		PageSetLSN(page, recptr);
@@ -5429,6 +5441,8 @@ heap_inplace_update(Relation relation, HeapTuple tuple)
 		XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
 		XLogRegisterBufData(0, (char *) htup + htup->t_hoff, newlen);
 
+		/* inplace updates aren't decoded atm, don't log the origin */
+
 		recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_INPLACE);
 
 		PageSetLSN(page, recptr);
@@ -6787,6 +6801,9 @@ log_heap_update(Relation reln, Buffer oldbuf,
 						 old_key_tuple->t_len - SizeofHeapTupleHeader);
 	}
 
+	/* filtering by origin on a row level is much more efficient */
+	XLogIncludeOrigin();
+
 	recptr = XLogInsert(RM_HEAP_ID, info);
 
 	return recptr;
@@ -6860,6 +6877,8 @@ log_heap_new_cid(Relation relation, HeapTuple tup)
 	XLogBeginInsert();
 	XLogRegisterData((char *) &xlrec, SizeOfHeapNewCid);
 
+	/* will be looked at irrespective of origin */
+
 	recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_NEW_CID);
 
 	return recptr;
diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c
index b036b6d..4df0bce 100644
--- a/src/backend/access/rmgrdesc/xactdesc.c
+++ b/src/backend/access/rmgrdesc/xactdesc.c
@@ -101,6 +101,16 @@ ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *pars
 
 		data += sizeof(xl_xact_twophase);
 	}
+
+	if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
+	{
+		xl_xact_origin *xl_origin = (xl_xact_origin *) data;
+
+		parsed->origin_lsn = xl_origin->origin_lsn;
+		parsed->origin_timestamp = xl_origin->origin_timestamp;
+
+		data += sizeof(xl_xact_origin);
+	}
 }
 
 void
@@ -156,7 +166,7 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed)
 }
 
 static void
-xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec)
+xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec, RepNodeId origin_id)
 {
 	xl_xact_parsed_commit parsed;
 	int			i;
@@ -218,6 +228,15 @@ xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec)
 
 	if (XactCompletionForceSyncCommit(parsed.xinfo))
 		appendStringInfo(buf, "; sync");
+
+	if (parsed.xinfo & XACT_XINFO_HAS_ORIGIN)
+	{
+		appendStringInfo(buf, "; origin: node %u, lsn %X/%X, at %s",
+						 origin_id,
+						 (uint32)(parsed.origin_lsn >> 32),
+						 (uint32)parsed.origin_lsn,
+						 timestamptz_to_str(parsed.origin_timestamp));
+	}
 }
 
 static void
@@ -274,7 +293,8 @@ xact_desc(StringInfo buf, XLogReaderState *record)
 	{
 		xl_xact_commit *xlrec = (xl_xact_commit *) rec;
 
-		xact_desc_commit(buf, XLogRecGetInfo(record), xlrec);
+		xact_desc_commit(buf, XLogRecGetInfo(record), xlrec,
+						 XLogRecGetOrigin(record));
 	}
 	else if (info == XLOG_XACT_ABORT || info == XLOG_XACT_ABORT_PREPARED)
 	{
diff --git a/src/backend/access/transam/commit_ts.c b/src/backend/access/transam/commit_ts.c
index dc23ab2..ffc3466 100644
--- a/src/backend/access/transam/commit_ts.c
+++ b/src/backend/access/transam/commit_ts.c
@@ -49,18 +49,18 @@
  */
 
 /*
- * We need 8+4 bytes per xact.  Note that enlarging this struct might mean
+ * We need 8+2 bytes per xact.  Note that enlarging this struct might mean
  * the largest possible file name is more than 5 chars long; see
  * SlruScanDirectory.
  */
 typedef struct CommitTimestampEntry
 {
 	TimestampTz		time;
-	CommitTsNodeId	nodeid;
+	RepNodeId		nodeid;
 } CommitTimestampEntry;
 
 #define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
-									sizeof(CommitTsNodeId))
+									sizeof(RepNodeId))
 
 #define COMMIT_TS_XACTS_PER_PAGE \
 	(BLCKSZ / SizeOfCommitTimestampEntry)
@@ -93,43 +93,18 @@ CommitTimestampShared	*commitTsShared;
 /* GUC variable */
 bool	track_commit_timestamp;
 
-static CommitTsNodeId default_node_id = InvalidCommitTsNodeId;
-
 static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
 					 TransactionId *subxids, TimestampTz ts,
-					 CommitTsNodeId nodeid, int pageno);
+					 RepNodeId nodeid, int pageno);
 static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
-						  CommitTsNodeId nodeid, int slotno);
+						  RepNodeId nodeid, int slotno);
 static int	ZeroCommitTsPage(int pageno, bool writeXlog);
 static bool CommitTsPagePrecedes(int page1, int page2);
 static void WriteZeroPageXlogRec(int pageno);
 static void WriteTruncateXlogRec(int pageno);
 static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
 						 TransactionId *subxids, TimestampTz timestamp,
-						 CommitTsNodeId nodeid);
-
-
-/*
- * CommitTsSetDefaultNodeId
- *
- * Set default nodeid for current backend.
- */
-void
-CommitTsSetDefaultNodeId(CommitTsNodeId nodeid)
-{
-	default_node_id = nodeid;
-}
-
-/*
- * CommitTsGetDefaultNodeId
- *
- * Set default nodeid for current backend.
- */
-CommitTsNodeId
-CommitTsGetDefaultNodeId(void)
-{
-	return default_node_id;
-}
+						 RepNodeId nodeid);
 
 /*
  * TransactionTreeSetCommitTsData
@@ -156,7 +131,7 @@ CommitTsGetDefaultNodeId(void)
 void
 TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
 							   TransactionId *subxids, TimestampTz timestamp,
-							   CommitTsNodeId nodeid, bool do_xlog)
+							   RepNodeId nodeid, bool do_xlog)
 {
 	int			i;
 	TransactionId headxid;
@@ -234,7 +209,7 @@ TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
 static void
 SetXidCommitTsInPage(TransactionId xid, int nsubxids,
 					 TransactionId *subxids, TimestampTz ts,
-					 CommitTsNodeId nodeid, int pageno)
+					 RepNodeId nodeid, int pageno)
 {
 	int			slotno;
 	int			i;
@@ -259,7 +234,7 @@ SetXidCommitTsInPage(TransactionId xid, int nsubxids,
  */
 static void
 TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
-						 CommitTsNodeId nodeid, int slotno)
+						 RepNodeId nodeid, int slotno)
 {
 	int			entryno = TransactionIdToCTsEntry(xid);
 	CommitTimestampEntry entry;
@@ -282,7 +257,7 @@ TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
  */
 bool
 TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
-							 CommitTsNodeId *nodeid)
+							 RepNodeId *nodeid)
 {
 	int			pageno = TransactionIdToCTsPage(xid);
 	int			entryno = TransactionIdToCTsEntry(xid);
@@ -322,7 +297,7 @@ TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
 		if (ts)
 			*ts = 0;
 		if (nodeid)
-			*nodeid = InvalidCommitTsNodeId;
+			*nodeid = InvalidRepNodeId;
 		return false;
 	}
 
@@ -373,7 +348,7 @@ TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
  * as NULL if not wanted.
  */
 TransactionId
-GetLatestCommitTsData(TimestampTz *ts, CommitTsNodeId *nodeid)
+GetLatestCommitTsData(TimestampTz *ts, RepNodeId *nodeid)
 {
 	TransactionId	xid;
 
@@ -503,7 +478,7 @@ CommitTsShmemInit(void)
 
 		commitTsShared->xidLastCommit = InvalidTransactionId;
 		TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
-		commitTsShared->dataLastCommit.nodeid = InvalidCommitTsNodeId;
+		commitTsShared->dataLastCommit.nodeid = InvalidRepNodeId;
 	}
 	else
 		Assert(found);
@@ -857,7 +832,7 @@ WriteTruncateXlogRec(int pageno)
 static void
 WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
 						 TransactionId *subxids, TimestampTz timestamp,
-						 CommitTsNodeId nodeid)
+						 RepNodeId nodeid)
 {
 	xl_commit_ts_set	record;
 
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 1495bb4..a9c5a73 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -40,8 +40,10 @@
 #include "libpq/pqsignal.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "replication/logical.h"
 #include "replication/walsender.h"
 #include "replication/syncrep.h"
+#include "replication/replication_identifier.h"
 #include "storage/fd.h"
 #include "storage/lmgr.h"
 #include "storage/predicate.h"
@@ -1073,21 +1075,22 @@ RecordTransactionCommit(void)
 							nmsgs, invalMessages,
 							RelcacheInitFileInval, forceSyncCommit,
 							InvalidTransactionId /* plain commit */);
-	}
 
-	/*
-	 * We only need to log the commit timestamp separately if the node
-	 * identifier is a valid value; the commit record above already contains
-	 * the timestamp info otherwise, and will be used to load it.
-	 */
-	if (markXidCommitted)
-	{
-		CommitTsNodeId		node_id;
+		/* record plain commit ts if not replaying remote actions */
+		if (replication_origin_id == InvalidRepNodeId ||
+			replication_origin_id == DoNotReplicateRepNodeId)
+			replication_origin_timestamp = xactStopTimestamp;
+		else
+			AdvanceCachedReplicationIdentifier(replication_origin_lsn,
+											   XactLastRecEnd);
 
-		node_id = CommitTsGetDefaultNodeId();
+		/*
+		 * We don't need to WAL log here, the commit record contains all the
+		 * necessary information and will redo the SET action during replay.
+		 */
 		TransactionTreeSetCommitTsData(xid, nchildren, children,
-									   xactStopTimestamp,
-									   node_id, node_id != InvalidCommitTsNodeId);
+									   replication_origin_timestamp,
+									   replication_origin_id, false);
 	}
 
 	/*
@@ -1176,9 +1179,11 @@ RecordTransactionCommit(void)
 	if (wrote_xlog && markXidCommitted)
 		SyncRepWaitForLSN(XactLastRecEnd);
 
+	/* remember end of last commit record */
+	XactLastCommitEnd = XactLastRecEnd;
+
 	/* Reset XactLastRecEnd until the next transaction writes something */
 	XactLastRecEnd = 0;
-
 cleanup:
 	/* Clean up local data */
 	if (rels)
@@ -4611,6 +4616,7 @@ XactLogCommitRecord(TimestampTz commit_time,
 	xl_xact_relfilenodes xl_relfilenodes;
 	xl_xact_invals		xl_invals;
 	xl_xact_twophase	xl_twophase;
+	xl_xact_origin		xl_origin;
 
 	uint8				info;
 
@@ -4668,6 +4674,15 @@ XactLogCommitRecord(TimestampTz commit_time,
 		xl_twophase.xid = twophase_xid;
 	}
 
+	/* dump transaction origin information */
+	if (replication_origin_id != InvalidRepNodeId)
+	{
+		xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN;
+
+		xl_origin.origin_lsn = replication_origin_lsn;
+		xl_origin.origin_timestamp = replication_origin_timestamp;
+	}
+
 	if (xl_xinfo.xinfo != 0)
 		info |= XLOG_XACT_HAS_INFO;
 
@@ -4709,6 +4724,12 @@ XactLogCommitRecord(TimestampTz commit_time,
 	if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE)
 		XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase));
 
+	if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN)
+		XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin));
+
+	/* we allow filtering by xacts */
+	XLogIncludeOrigin();
+
 	return XLogInsert(RM_XACT_ID, info);
 }
 
@@ -4806,10 +4827,12 @@ XactLogAbortRecord(TimestampTz abort_time,
 static void
 xact_redo_commit(xl_xact_parsed_commit *parsed,
 				 TransactionId xid,
-				 XLogRecPtr lsn)
+				 XLogRecPtr lsn,
+				 RepNodeId origin_id)
 {
 	TransactionId max_xid;
 	int			i;
+	TimestampTz	commit_time;
 
 	max_xid = TransactionIdLatest(xid, parsed->nsubxacts, parsed->subxacts);
 
@@ -4829,9 +4852,16 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
 		LWLockRelease(XidGenLock);
 	}
 
+	Assert(!!(parsed->xinfo & XACT_XINFO_HAS_ORIGIN) == (origin_id != InvalidRepNodeId));
+
+	if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
+		commit_time = parsed->origin_timestamp;
+	else
+		commit_time = parsed->xact_time;
+
 	/* Set the transaction commit timestamp and metadata */
 	TransactionTreeSetCommitTsData(xid, parsed->nsubxacts, parsed->subxacts,
-								   parsed->xact_time, InvalidCommitTsNodeId,
+								   commit_time, origin_id,
 								   false);
 
 	if (standbyState == STANDBY_DISABLED)
@@ -4892,6 +4922,14 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
 		StandbyReleaseLockTree(xid, 0, NULL);
 	}
 
+	if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
+	{
+		/* recover apply progress */
+		AdvanceReplicationIdentifier(origin_id,
+									 parsed->origin_lsn,
+									 lsn);
+	}
+
 	/* Make sure files supposed to be dropped are dropped */
 	if (parsed->nrels > 0)
 	{
@@ -5047,13 +5085,13 @@ xact_redo(XLogReaderState *record)
 		{
 			Assert(!TransactionIdIsValid(parsed.twophase_xid));
 			xact_redo_commit(&parsed, XLogRecGetXid(record),
-							 record->EndRecPtr);
+							 record->EndRecPtr, XLogRecGetOrigin(record));
 		}
 		else
 		{
 			Assert(TransactionIdIsValid(parsed.twophase_xid));
 			xact_redo_commit(&parsed, parsed.twophase_xid,
-							 record->EndRecPtr);
+							 record->EndRecPtr, XLogRecGetOrigin(record));
 			RemoveTwoPhaseFile(parsed.twophase_xid, false);
 		}
 	}
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 2580996..10fab1e 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -44,6 +44,7 @@
 #include "postmaster/startup.h"
 #include "replication/logical.h"
 #include "replication/slot.h"
+#include "replication/replication_identifier.h"
 #include "replication/snapbuild.h"
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
@@ -295,6 +296,7 @@ static TimeLineID curFileTLI;
 static XLogRecPtr ProcLastRecPtr = InvalidXLogRecPtr;
 
 XLogRecPtr	XactLastRecEnd = InvalidXLogRecPtr;
+XLogRecPtr	XactLastCommitEnd = InvalidXLogRecPtr;
 
 /*
  * RedoRecPtr is this backend's local copy of the REDO record pointer
@@ -6212,6 +6214,11 @@ StartupXLOG(void)
 	StartupMultiXact();
 
 	/*
+	 * Recover knowledge about replay progress of known replication partners.
+	 */
+	StartupReplicationIdentifier();
+
+	/*
 	 * Initialize unlogged LSN. On a clean shutdown, it's restored from the
 	 * control file. On recovery, all unlogged relations are blown away, so
 	 * the unlogged LSN counter can be reset too.
@@ -8394,6 +8401,7 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
 	CheckPointSnapBuild();
 	CheckPointLogicalRewriteHeap();
 	CheckPointBuffers(flags);	/* performs all required fsyncs */
+	CheckPointReplicationIdentifier();
 	/* We deliberately delay 2PC checkpointing as long as possible */
 	CheckPointTwoPhase(checkPointRedo);
 }
diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c
index 618f879..cf56124 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -26,6 +26,7 @@
 #include "catalog/pg_control.h"
 #include "common/pg_lzcompress.h"
 #include "miscadmin.h"
+#include "replication/replication_identifier.h"
 #include "storage/bufmgr.h"
 #include "storage/proc.h"
 #include "utils/memutils.h"
@@ -72,6 +73,9 @@ static XLogRecData *mainrdata_head;
 static XLogRecData *mainrdata_last = (XLogRecData *) &mainrdata_head;
 static uint32 mainrdata_len;	/* total # of bytes in chain */
 
+/* Should the in-progress insertion log the origin? */
+static bool include_origin = false;
+
 /*
  * These are used to hold the record header while constructing a record.
  * 'hdr_scratch' is not a plain variable, but is palloc'd at initialization,
@@ -83,10 +87,12 @@ static uint32 mainrdata_len;	/* total # of bytes in chain */
 static XLogRecData hdr_rdt;
 static char *hdr_scratch = NULL;
 
+#define SizeOfXlogOrigin	(sizeof(RepNodeId) + sizeof(char))
+
 #define HEADER_SCRATCH_SIZE \
 	(SizeOfXLogRecord + \
 	 MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \
-	 SizeOfXLogRecordDataHeaderLong)
+	 SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin)
 
 /*
  * An array of XLogRecData structs, to hold registered data.
@@ -193,6 +199,7 @@ XLogResetInsertion(void)
 	max_registered_block_id = 0;
 	mainrdata_len = 0;
 	mainrdata_last = (XLogRecData *) &mainrdata_head;
+	include_origin = false;
 	begininsert_called = false;
 }
 
@@ -375,6 +382,16 @@ XLogRegisterBufData(uint8 block_id, char *data, int len)
 }
 
 /*
+ * Should this record include the replication origin if one is set up?
+ */
+void
+XLogIncludeOrigin(void)
+{
+	Assert(begininsert_called);
+	include_origin = true;
+}
+
+/*
  * Insert an XLOG record having the specified RMID and info bytes, with the
  * body of the record being the data and buffer references registered earlier
  * with XLogRegister* calls.
@@ -678,6 +695,16 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 		scratch += sizeof(BlockNumber);
 	}
 
+#ifndef REPLICATION_IDENTIFIER_REUSE_PADDING
+	/* followed by the record's origin, if any */
+	if (include_origin && replication_origin_id != InvalidRepNodeId)
+	{
+		*(scratch++) = XLR_BLOCK_ID_ORIGIN;
+		memcpy(scratch, &replication_origin_id, sizeof(replication_origin_id));
+		scratch += sizeof(replication_origin_id);
+	}
+#endif
+
 	/* followed by main data, if any */
 	if (mainrdata_len > 0)
 	{
@@ -723,6 +750,9 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 	rechdr->xl_tot_len = total_len;
 	rechdr->xl_info = info;
 	rechdr->xl_rmid = rmid;
+#ifdef REPLICATION_IDENTIFIER_REUSE_PADDING
+	rechdr->xl_origin_id = replication_origin_id;
+#endif
 	rechdr->xl_prev = InvalidXLogRecPtr;
 	rechdr->xl_crc = rdata_crc;
 
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 77be1b8..17880d7 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -21,6 +21,7 @@
 #include "access/xlogreader.h"
 #include "catalog/pg_control.h"
 #include "common/pg_lzcompress.h"
+#include "replication/replication_identifier.h"
 
 static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength);
 
@@ -975,6 +976,7 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
 	ResetDecoder(state);
 
 	state->decoded_record = record;
+	state->record_origin = InvalidRepNodeId;
 
 	ptr = (char *) record;
 	ptr += SizeOfXLogRecord;
@@ -1009,6 +1011,10 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
 			break;				/* by convention, the main data fragment is
 								 * always last */
 		}
+		else if (block_id == XLR_BLOCK_ID_ORIGIN)
+		{
+			COPY_HEADER_FIELD(&state->record_origin, sizeof(RepNodeId));
+		}
 		else if (block_id <= XLR_MAX_BLOCK_ID)
 		{
 			/* XLogRecordBlockHeader */
diff --git a/src/backend/catalog/Makefile b/src/backend/catalog/Makefile
index a403c64..5b04550 100644
--- a/src/backend/catalog/Makefile
+++ b/src/backend/catalog/Makefile
@@ -39,7 +39,7 @@ POSTGRES_BKI_SRCS = $(addprefix $(top_srcdir)/src/include/catalog/,\
 	pg_ts_config.h pg_ts_config_map.h pg_ts_dict.h \
 	pg_ts_parser.h pg_ts_template.h pg_extension.h \
 	pg_foreign_data_wrapper.h pg_foreign_server.h pg_user_mapping.h \
-	pg_foreign_table.h pg_policy.h \
+	pg_foreign_table.h pg_policy.h pg_replication_identifier.h \
 	pg_default_acl.h pg_seclabel.h pg_shseclabel.h pg_collation.h pg_range.h \
 	toasting.h indexing.h \
     )
diff --git a/src/backend/catalog/catalog.c b/src/backend/catalog/catalog.c
index e9d3cdc..00c4393 100644
--- a/src/backend/catalog/catalog.c
+++ b/src/backend/catalog/catalog.c
@@ -32,6 +32,7 @@
 #include "catalog/pg_namespace.h"
 #include "catalog/pg_pltemplate.h"
 #include "catalog/pg_db_role_setting.h"
+#include "catalog/pg_replication_identifier.h"
 #include "catalog/pg_shdepend.h"
 #include "catalog/pg_shdescription.h"
 #include "catalog/pg_shseclabel.h"
@@ -224,7 +225,8 @@ IsSharedRelation(Oid relationId)
 		relationId == SharedDependRelationId ||
 		relationId == SharedSecLabelRelationId ||
 		relationId == TableSpaceRelationId ||
-		relationId == DbRoleSettingRelationId)
+		relationId == DbRoleSettingRelationId ||
+		relationId == ReplicationIdentifierRelationId)
 		return true;
 	/* These are their indexes (see indexing.h) */
 	if (relationId == AuthIdRolnameIndexId ||
@@ -240,7 +242,9 @@ IsSharedRelation(Oid relationId)
 		relationId == SharedSecLabelObjectIndexId ||
 		relationId == TablespaceOidIndexId ||
 		relationId == TablespaceNameIndexId ||
-		relationId == DbRoleSettingDatidRolidIndexId)
+		relationId == DbRoleSettingDatidRolidIndexId ||
+		relationId == ReplicationLocalIdentIndex ||
+		relationId == ReplicationExternalIdentIndex)
 		return true;
 	/* These are their toast tables and toast indexes (see toasting.h) */
 	if (relationId == PgShdescriptionToastTable ||
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index a4fd88f..3ecc16c 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -777,6 +777,13 @@ CREATE VIEW pg_user_mappings AS
 
 REVOKE ALL on pg_user_mapping FROM public;
 
+
+CREATE VIEW pg_replication_identifier_progress AS
+    SELECT *
+    FROM pg_get_replication_identifier_progress();
+
+REVOKE ALL ON pg_replication_identifier_progress FROM public;
+
 --
 -- We have a few function definitions in here, too.
 -- At some point there might be enough to justify breaking them out into
diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
index 310a45c..95bcffb 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -14,6 +14,7 @@ include $(top_builddir)/src/Makefile.global
 
 override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
 
-OBJS = decode.o logical.o logicalfuncs.o reorderbuffer.o snapbuild.o
+OBJS = decode.o logical.o logicalfuncs.o reorderbuffer.o replication_identifier.o \
+	snapbuild.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index eb7293f..5003e59 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -40,6 +40,7 @@
 #include "replication/decode.h"
 #include "replication/logical.h"
 #include "replication/reorderbuffer.h"
+#include "replication/replication_identifier.h"
 #include "replication/snapbuild.h"
 
 #include "storage/standby.h"
@@ -422,6 +423,15 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	}
 }
 
+static inline bool
+FilterByOrigin(LogicalDecodingContext *ctx, RepNodeId origin_id)
+{
+	if (ctx->callbacks.filter_by_origin_cb == NULL)
+		return false;
+
+	return filter_by_origin_cb_wrapper(ctx, origin_id);
+}
+
 /*
  * Consolidated commit record handling between the different form of commit
  * records.
@@ -430,8 +440,17 @@ static void
 DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 			 xl_xact_parsed_commit *parsed, TransactionId xid)
 {
+	XLogRecPtr	origin_lsn = InvalidXLogRecPtr;
+	TimestampTz	commit_time = parsed->xact_time;
+	RepNodeId	origin_id = XLogRecGetOrigin(buf->record);
 	int			i;
 
+	if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
+	{
+		origin_lsn = parsed->origin_lsn;
+		commit_time = parsed->origin_timestamp;
+	}
+
 	/*
 	 * Process invalidation messages, even if we're not interested in the
 	 * transaction's contents, since the various caches need to always be
@@ -452,12 +471,13 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	 * the reorderbuffer to forget the content of the (sub-)transactions
 	 * if not.
 	 *
-	 * There basically two reasons we might not be interested in this
+	 * There can be several reasons we might not be interested in this
 	 * transaction:
 	 * 1) We might not be interested in decoding transactions up to this
 	 *	  LSN. This can happen because we previously decoded it and now just
 	 *	  are restarting or if we haven't assembled a consistent snapshot yet.
 	 * 2) The transaction happened in another database.
+	 * 3) The output plugin is not interested in the origin.
 	 *
 	 * We can't just use ReorderBufferAbort() here, because we need to execute
 	 * the transaction's invalidations.  This currently won't be needed if
@@ -472,7 +492,8 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	 * ---
 	 */
 	if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
-		(parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database))
+		(parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) ||
+		FilterByOrigin(ctx, origin_id))
 	{
 		for (i = 0; i < parsed->nsubxacts; i++)
 		{
@@ -492,7 +513,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 
 	/* replay actions of all transaction + subtransactions in order */
 	ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
-						parsed->xact_time);
+						commit_time, origin_id, origin_lsn);
 }
 
 /*
@@ -537,8 +558,13 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	if (target_node.dbNode != ctx->slot->data.database)
 		return;
 
+	/* output plugin doesn't look for this origin, no need to queue */
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+		return;
+
 	change = ReorderBufferGetChange(ctx->reorder);
 	change->action = REORDER_BUFFER_CHANGE_INSERT;
+	change->origin_id = XLogRecGetOrigin(r);
 	memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
 
 	if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE)
@@ -579,8 +605,13 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	if (target_node.dbNode != ctx->slot->data.database)
 		return;
 
+	/* output plugin doesn't look for this origin, no need to queue */
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+		return;
+
 	change = ReorderBufferGetChange(ctx->reorder);
 	change->action = REORDER_BUFFER_CHANGE_UPDATE;
+	change->origin_id = XLogRecGetOrigin(r);
 	memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
 
 	if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE)
@@ -628,8 +659,13 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	if (target_node.dbNode != ctx->slot->data.database)
 		return;
 
+	/* output plugin doesn't look for this origin, no need to queue */
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+		return;
+
 	change = ReorderBufferGetChange(ctx->reorder);
 	change->action = REORDER_BUFFER_CHANGE_DELETE;
+	change->origin_id = XLogRecGetOrigin(r);
 
 	memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
 
@@ -673,6 +709,10 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	if (rnode.dbNode != ctx->slot->data.database)
 		return;
 
+	/* output plugin doesn't look for this origin, no need to queue */
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+		return;
+
 	tupledata = XLogRecGetBlockData(r, 0, &tuplelen);
 
 	data = tupledata;
@@ -685,6 +725,8 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
 		change = ReorderBufferGetChange(ctx->reorder);
 		change->action = REORDER_BUFFER_CHANGE_INSERT;
+		change->origin_id = XLogRecGetOrigin(r);
+
 		memcpy(&change->data.tp.relnode, &rnode, sizeof(RelFileNode));
 
 		/*
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 774ebbc..b60a1df 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -39,6 +39,7 @@
 #include "replication/decode.h"
 #include "replication/logical.h"
 #include "replication/reorderbuffer.h"
+#include "replication/replication_identifier.h"
 #include "replication/snapbuild.h"
 
 #include "storage/proc.h"
@@ -46,6 +47,10 @@
 
 #include "utils/memutils.h"
 
+RepNodeId	replication_origin_id = InvalidRepNodeId; /* assumed identity */
+XLogRecPtr	replication_origin_lsn;
+TimestampTz	replication_origin_timestamp;
+
 /* data for errcontext callback */
 typedef struct LogicalErrorCallbackState
 {
@@ -720,6 +725,34 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	error_context_stack = errcallback.previous;
 }
 
+bool
+filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepNodeId origin_id)
+{
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+	bool ret;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "filter_by_origin";
+	state.report_location = InvalidXLogRecPtr;
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = false;
+
+	/* do the actual work: call callback */
+	ret = ctx->callbacks.filter_by_origin_cb(ctx, origin_id);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+
+	return ret;
+}
+
 /*
  * Set the required catalog xmin horizon for historic snapshots in the current
  * replication slot.
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index dc85583..e37a736 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1255,7 +1255,8 @@ ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
 void
 ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
-					TimestampTz commit_time)
+					TimestampTz commit_time,
+					RepNodeId origin_id, XLogRecPtr origin_lsn)
 {
 	ReorderBufferTXN *txn;
 	volatile Snapshot snapshot_now;
@@ -1273,6 +1274,8 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 	txn->final_lsn = commit_lsn;
 	txn->end_lsn = end_lsn;
 	txn->commit_time = commit_time;
+	txn->origin_id = origin_id;
+	txn->origin_lsn = origin_lsn;
 
 	/* serialize the last bunch of changes if we need start earlier anyway */
 	if (txn->nentries_mem != txn->nentries)
diff --git a/src/backend/replication/logical/replication_identifier.c b/src/backend/replication/logical/replication_identifier.c
new file mode 100644
index 0000000..ef3f511
--- /dev/null
+++ b/src/backend/replication/logical/replication_identifier.c
@@ -0,0 +1,1296 @@
+/*-------------------------------------------------------------------------
+ *
+ * replication_identifier.c
+ *	  Logical Replication Node Identifier and replication progress persistency
+ *	  support.
+ *
+ * Copyright (c) 2013-2015, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/backend/replication/logical/replication_identifier.c
+ *
+ * NOTES
+ *
+ * This file provides the following:
+ * * Interface functions for naming nodes in a replication setup
+ * * A facility to store and persist replication progress in an efficient
+ *   and durable manner.
+ *
+ * Replication identifiers consist of a descriptive, user-defined external
+ * name and a short, thus space-efficient, internal 2 byte one. This split
+ * exists because replication identifiers have to be stored in WAL and shared
+ * memory, and long descriptors would be inefficient.  For now we only use
+ * 2 bytes for the internal id of a replication identifier as it seems
+ * unlikely that there soon will be more than 65k nodes in one replication
+ * setup; and using only two bytes allows us to be more space efficient.
+ *
+ * Replication progress is tracked in a shared memory table
+ * (ReplicationStates) that's dumped to disk every checkpoint. Entries
+ * ('slots') in this table are identified by the internal id. That's the case
+ * because it allows replication progress to be advanced during crash
+ * recovery. To allow doing so we store the original LSN (from the originating
+ * system) of a transaction in the commit record. That allows us to recover the
+ * precise replayed state after crash recovery, without requiring synchronous
+ * commits. Allowing logical replication to use asynchronous commit is
+ * generally good for performance, but especially important as it allows a
+ * single threaded replay process to keep up with a source that has multiple
+ * backends generating changes concurrently.  For efficiency and simplicity
+ * reasons a backend can set up a replication identifier as its origin (a
+ * "cached replication identifier") that's from then on the source of changes
+ * produced by the backend, until reset again.
+ *
+ * This infrastructure is intended to be used in cooperation with logical
+ * decoding. When replaying from a remote system the configured origin is
+ * provided to output plugins, allowing filtering and such.
+ *
+ *
+ * There are several levels of locking at work:
+ *
+ * * To create and drop replication identifiers an exclusive lock on
+ *   pg_replication_identifier is required for the duration. That allows us to
+ *   safely and conflict free assign new identifiers using a dirty snapshot.
+ *
+ * * When creating an in-memory replication progress slot the
+ *   ReplicationIdentifier LWLock has to be held exclusively; when iterating
+ *   over the replication progress a shared lock has to be held, the same when
+ *   advancing the replication progress of an individual identifier that is
+ *   not set up as the backend's cached replication identifier.
+ *
+ * * When manipulating or looking at the remote_lsn and local_lsn fields of a
+ *   replication progress slot that slot's spinlock has to be held. That's
+ *   primarily because we do not assume 8 byte writes (the LSN) are atomic on
+ *   all our platforms, but it also simplifies memory ordering concerns
+ *   between the remote and local lsn.
+ *
+ * ---------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include <unistd.h>
+#include <sys/stat.h>
+
+#include "funcapi.h"
+#include "miscadmin.h"
+
+#include "access/genam.h"
+#include "access/heapam.h"
+#include "access/htup_details.h"
+#include "access/xact.h"
+
+#include "catalog/indexing.h"
+
+#include "nodes/execnodes.h"
+
+#include "replication/replication_identifier.h"
+#include "replication/logical.h"
+
+#include "storage/fd.h"
+#include "storage/ipc.h"
+#include "storage/lmgr.h"
+#include "storage/copydir.h"
+#include "storage/spin.h"
+
+#include "utils/builtins.h"
+#include "utils/fmgroids.h"
+#include "utils/pg_lsn.h"
+#include "utils/rel.h"
+#include "utils/syscache.h"
+#include "utils/tqual.h"
+
+/*
+ * Replay progress of a single remote node.
+ */
+typedef struct ReplicationState
+{
+	/*
+	 * Local identifier for the remote node.
+	 */
+	RepNodeId	local_identifier;
+
+	/*
+	 * Location of the latest commit from the remote side.
+	 */
+	XLogRecPtr	remote_lsn;
+
+	/*
+	 * Remember the local lsn of the commit record so we can XLogFlush() to it
+	 * during a checkpoint so we know the commit record actually is safe on
+	 * disk.
+	 */
+	XLogRecPtr	local_lsn;
+
+	/*
+	 * PID of the backend that has set up this slot, 0 if none.
+	 */
+	pid_t		acquired_by;
+
+	/*
+	 * Spinlock protecting remote_lsn and local_lsn.
+	 */
+	slock_t		mutex;
+} ReplicationState;
+
+/*
+ * On disk version of ReplicationState.
+ */
+typedef struct ReplicationStateOnDisk
+{
+	RepNodeId	local_identifier;
+	XLogRecPtr	remote_lsn;
+} ReplicationStateOnDisk;
+
+
+/*
+ * Base address into a shared memory array of replication states of size
+ * max_replication_slots.
+ *
+ * XXX: Should we use a separate variable to size this rather than
+ * max_replication_slots?
+ */
+static ReplicationState *ReplicationStates;
+
+/*
+ * Backend-local, cached element from ReplicationStates for use in a backend
+ * replaying remote commits, so we don't have to search ReplicationStates for
+ * the backend's current RepNodeId.
+ */
+static ReplicationState *cached_replication_state = NULL;
+
+/* Magic for on disk files. */
+#define REPLICATION_STATE_MAGIC ((uint32)0x1257DADE)
+
+static void
+CheckReplicationIdentifierPrerequisites(bool check_slots)
+{
+	if (!superuser())
+		ereport(ERROR,
+				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+				 errmsg("only superusers can query or manipulate replication identifiers")));
+
+	if (check_slots && max_replication_slots == 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("cannot query or manipulate replication identifiers when max_replication_slots = 0")));
+
+}
+
+
+/* ---------------------------------------------------------------------------
+ * Functions for working with replication identifiers themselves.
+ * ---------------------------------------------------------------------------
+ */
+
+/*
+ * Check for a persistent replication identifier identified by the replication
+ * identifier's external name.
+ *
+ * Returns InvalidOid if the node isn't known yet.
+ */
+RepNodeId
+GetReplicationIdentifier(char *riname, bool missing_ok)
+{
+	Form_pg_replication_identifier ident;
+	Oid		riident = InvalidOid;
+	HeapTuple tuple;
+	Datum	riname_d;
+
+	riname_d = CStringGetTextDatum(riname);
+
+	tuple = SearchSysCache1(REPLIDREMOTE, riname_d);
+	if (HeapTupleIsValid(tuple))
+	{
+		ident = (Form_pg_replication_identifier) GETSTRUCT(tuple);
+		riident = ident->riident;
+		ReleaseSysCache(tuple);
+	}
+	else if (!missing_ok)
+		elog(ERROR, "cache lookup failed for replication identifier named %s",
+			riname);
+
+	return riident;
+}
+
+/*
+ * Create a persistent replication identifier.
+ *
+ * Needs to be called in a transaction.
+ */
+RepNodeId
+CreateReplicationIdentifier(char *riname)
+{
+	Oid		riident;
+	HeapTuple tuple = NULL;
+	Relation rel;
+	Datum	riname_d;
+	SnapshotData SnapshotDirty;
+	SysScanDesc scan;
+	ScanKeyData key;
+
+	riname_d = CStringGetTextDatum(riname);
+
+	Assert(IsTransactionState());
+
+	/*
+	 * We need the numeric replication identifiers to be 16bit wide, so we
+	 * cannot rely on the normal oid allocation. So we simply scan
+	 * pg_replication_identifier for the first unused id. That's not
+	 * particularly efficient, but this should be a fairly infrequent
+	 * operation - we can easily spend a bit more code on this when it turns
+	 * out it needs to be faster.
+	 *
+	 * We handle concurrency by taking an exclusive lock (allowing reads!)
+	 * over the table for the duration of the search. Because we use a "dirty
+	 * snapshot" we can read rows that other in-progress sessions have
+	 * written, even though they would be invisible with normal snapshots. Due
+	 * to the exclusive lock there's no danger that new rows can appear while
+	 * we're checking.
+	 */
+	InitDirtySnapshot(SnapshotDirty);
+
+	rel = heap_open(ReplicationIdentifierRelationId, ExclusiveLock);
+
+	for (riident = InvalidOid + 1; riident < UINT16_MAX; riident++)
+	{
+		bool		nulls[Natts_pg_replication_identifier];
+		Datum		values[Natts_pg_replication_identifier];
+		bool		collides;
+		CHECK_FOR_INTERRUPTS();
+
+		ScanKeyInit(&key,
+					Anum_pg_replication_riident,
+					BTEqualStrategyNumber, F_OIDEQ,
+					ObjectIdGetDatum(riident));
+
+		scan = systable_beginscan(rel, ReplicationLocalIdentIndex,
+								  true /* indexOK */,
+								  &SnapshotDirty,
+								  1, &key);
+
+		collides = HeapTupleIsValid(systable_getnext(scan));
+
+		systable_endscan(scan);
+
+		if (!collides)
+		{
+			/*
+			 * Ok, found an unused riident, insert the new row and do a CCI,
+			 * so our callers can look it up if they want to.
+			 */
+			memset(&nulls, 0, sizeof(nulls));
+
+			values[Anum_pg_replication_riident - 1] = ObjectIdGetDatum(riident);
+			values[Anum_pg_replication_riname - 1] = riname_d;
+
+			tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
+			simple_heap_insert(rel, tuple);
+			CatalogUpdateIndexes(rel, tuple);
+			CommandCounterIncrement();
+			break;
+		}
+	}
+
+	/* now release lock again */
+	heap_close(rel, ExclusiveLock);
+
+	if (tuple == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+				 errmsg("no free replication id could be found")));
+
+	heap_freetuple(tuple);
+	return riident;
+}
+
+
+/*
+ * Drop a persistent replication identifier.
+ *
+ * Needs to be called in a transaction.
+ */
+void
+DropReplicationIdentifier(RepNodeId riident)
+{
+	HeapTuple tuple = NULL;
+	Relation rel;
+	int			i;
+
+	Assert(IsTransactionState());
+
+	rel = heap_open(ReplicationIdentifierRelationId, ExclusiveLock);
+
+	/* cleanup the slot state info */
+	LWLockAcquire(ReplicationIdentifierLock, LW_EXCLUSIVE);
+
+	for (i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationState *state = &ReplicationStates[i];
+
+		/* found our slot */
+		if (state->local_identifier == riident)
+		{
+			if (state->acquired_by != 0)
+			{
+				elog(ERROR, "cannot drop slot that is setup in backend %d",
+					 state->acquired_by);
+			}
+			/* reset entry */
+			state->local_identifier = InvalidRepNodeId;
+			state->remote_lsn = InvalidXLogRecPtr;
+			state->local_lsn = InvalidXLogRecPtr;
+			break;
+		}
+	}
+	LWLockRelease(ReplicationIdentifierLock);
+
+	tuple = SearchSysCache1(REPLIDIDENT, ObjectIdGetDatum(riident));
+	simple_heap_delete(rel, &tuple->t_self);
+	ReleaseSysCache(tuple);
+
+	CommandCounterIncrement();
+
+	/* now release lock again */
+	heap_close(rel, ExclusiveLock);
+}
+
+
+/*
+ * Lookup pg_replication_identifier via riident and return the external name.
+ *
+ * The external name is palloc'd in the calling context.
+ *
+ * Returns true if the identifier is known, false otherwise.
+ */
+bool
+GetReplicationInfoByIdentifier(RepNodeId riident, bool missing_ok, char **riname)
+{
+	HeapTuple tuple;
+	Form_pg_replication_identifier ric;
+
+	Assert(OidIsValid((Oid) riident));
+	Assert(riident != InvalidRepNodeId);
+	Assert(riident != DoNotReplicateRepNodeId);
+
+	tuple = SearchSysCache1(REPLIDIDENT,
+							ObjectIdGetDatum((Oid) riident));
+
+	if (HeapTupleIsValid(tuple))
+	{
+		ric = (Form_pg_replication_identifier) GETSTRUCT(tuple);
+		*riname = text_to_cstring(&ric->riname);
+		ReleaseSysCache(tuple);
+
+		return true;
+	}
+	else
+	{
+		*riname = NULL;
+
+		if (!missing_ok)
+			elog(ERROR, "cache lookup failed for replication identifier id: %u",
+				 riident);
+
+		return false;
+	}
+}
+
+
+/* ---------------------------------------------------------------------------
+ * Functions for handling replication progress.
+ * ---------------------------------------------------------------------------
+ */
+
+Size
+ReplicationIdentifierShmemSize(void)
+{
+	Size		size = 0;
+
+	/*
+	 * XXX: max_replication_slots is arguably the wrong thing to use here, as
+	 * we keep the replay state of *remote* transactions. But for now it seems
+	 * sufficient to reuse it, lest we introduce a separate guc.
+	 */
+	if (max_replication_slots == 0)
+		return size;
+
+	size = add_size(size,
+					mul_size(max_replication_slots, sizeof(ReplicationState)));
+	return size;
+}
+
+void
+ReplicationIdentifierShmemInit(void)
+{
+	bool		found;
+
+	if (max_replication_slots == 0)
+		return;
+
+	ReplicationStates = (ReplicationState *)
+		ShmemInitStruct("ReplicationIdentifierState",
+						ReplicationIdentifierShmemSize(),
+						&found);
+
+	if (!found)
+	{
+		int i;
+
+		MemSet(ReplicationStates, 0, ReplicationIdentifierShmemSize());
+
+		for (i = 0; i < max_replication_slots; i++)
+			SpinLockInit(&ReplicationStates[i].mutex);
+	}
+}
+
+/* ---------------------------------------------------------------------------
+ * Perform a checkpoint of replication identifier's progress with respect to
+ * the replayed remote_lsn. Make sure that all transactions we refer to in the
+ * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
+ * if the transactions were originally committed asynchronously.
+ *
+ * We store checkpoints in the following format:
+ * +-------+------------------------+------------------+-----+--------+
+ * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
+ * +-------+------------------------+------------------+-----+--------+
+ *
+ * So it's just the magic, followed by the statically sized
+ * ReplicationStateOnDisk structs and a trailing CRC. Note that the maximum
+ * number of ReplicationStates is determined by max_replication_slots.
+ * ---------------------------------------------------------------------------
+ */
+void
+CheckPointReplicationIdentifier(void)
+{
+	const char *tmppath = "pg_logical/replident_checkpoint.tmp";
+	const char *path = "pg_logical/replident_checkpoint";
+	int			tmpfd;
+	int			i;
+	uint32		magic = REPLICATION_STATE_MAGIC;
+	pg_crc32c	crc;
+
+	if (max_replication_slots == 0)
+		return;
+
+	INIT_CRC32C(crc);
+
+	/* make sure no old temp file is remaining */
+	if (unlink(tmppath) < 0 && errno != ENOENT)
+		ereport(PANIC,
+				(errcode_for_file_access(),
+				 errmsg("could not remove file \"%s\": %m",
+						tmppath)));
+
+	/*
+	 * no other backend can perform this at the same time, we're protected by
+	 * CheckpointLock.
+	 */
+	tmpfd = OpenTransientFile((char *) tmppath,
+							  O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
+							  S_IRUSR | S_IWUSR);
+	if (tmpfd < 0)
+		ereport(PANIC,
+				(errcode_for_file_access(),
+				 errmsg("could not create file \"%s\": %m",
+						tmppath)));
+
+	/* write magic */
+	if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
+	{
+		CloseTransientFile(tmpfd);
+		ereport(PANIC,
+				(errcode_for_file_access(),
+				 errmsg("could not write to file \"%s\": %m",
+						tmppath)));
+	}
+	COMP_CRC32C(crc, &magic, sizeof(magic));
+
+	/* prevent concurrent creations/drops */
+	LWLockAcquire(ReplicationIdentifierLock, LW_SHARED);
+
+	/* write actual data */
+	for (i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationStateOnDisk disk_state;
+		ReplicationState *curstate = &ReplicationStates[i];
+		XLogRecPtr local_lsn;
+
+		if (curstate->local_identifier == InvalidRepNodeId)
+			continue;
+
+		disk_state.local_identifier = curstate->local_identifier;
+
+		SpinLockAcquire(&curstate->mutex);
+		disk_state.remote_lsn = curstate->remote_lsn;
+		local_lsn = curstate->local_lsn;
+		SpinLockRelease(&curstate->mutex);
+
+		/* make sure we only write out a commit that's persistent */
+		XLogFlush(local_lsn);
+
+		if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
+			sizeof(disk_state))
+		{
+			CloseTransientFile(tmpfd);
+			ereport(PANIC,
+					(errcode_for_file_access(),
+					 errmsg("could not write to file \"%s\": %m",
+							tmppath)));
+		}
+
+		COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
+	}
+
+	LWLockRelease(ReplicationIdentifierLock);
+
+	/* write out the CRC */
+	FIN_CRC32C(crc);
+	if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
+	{
+		CloseTransientFile(tmpfd);
+		ereport(PANIC,
+				(errcode_for_file_access(),
+				 errmsg("could not write to file \"%s\": %m",
+						tmppath)));
+	}
+
+	/* fsync the temporary file */
+	if (pg_fsync(tmpfd) != 0)
+	{
+		CloseTransientFile(tmpfd);
+		ereport(PANIC,
+				(errcode_for_file_access(),
+				 errmsg("could not fsync file \"%s\": %m",
+						tmppath)));
+	}
+
+	CloseTransientFile(tmpfd);
+
+	/* rename to permanent file, fsync file and directory */
+	if (rename(tmppath, path) != 0)
+	{
+		ereport(PANIC,
+				(errcode_for_file_access(),
+				 errmsg("could not rename file \"%s\" to \"%s\": %m",
+						tmppath, path)));
+	}
+
+	fsync_fname((char *) path, false);
+	fsync_fname("pg_logical", true);
+}
+
+/*
+ * Recover replication replay status from checkpoint data saved earlier by
+ * CheckPointReplicationIdentifier.
+ *
+ * This only needs to be called at startup and *not* during every checkpoint
+ * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
+ * state thereafter can be recovered by looking at commit records.
+ */
+void
+StartupReplicationIdentifier(void)
+{
+	const char *path = "pg_logical/replident_checkpoint";
+	int fd;
+	int readBytes;
+	uint32 magic = REPLICATION_STATE_MAGIC;
+	int last_state = 0;
+	pg_crc32c file_crc;
+	pg_crc32c crc;
+
+	/* don't want to overwrite already existing state */
+#ifdef USE_ASSERT_CHECKING
+	static bool already_started = false;
+	Assert(!already_started);
+	already_started = true;
+#endif
+
+	if (max_replication_slots == 0)
+		return;
+
+	INIT_CRC32C(crc);
+
+	elog(LOG, "starting up replication identifiers");
+
+	fd = OpenTransientFile((char *) path, O_RDONLY | PG_BINARY, 0);
+
+	/*
+	 * might have had max_replication_slots == 0 last run, or we just brought up a
+	 * standby.
+	 */
+	if (fd < 0 && errno == ENOENT)
+		return;
+	else if (fd < 0)
+		ereport(PANIC,
+				(errcode_for_file_access(),
+				 errmsg("could not open file \"%s\": %m",
+						path)));
+
+	/* verify magic, that's written even if nothing was active */
+	readBytes = read(fd, &magic, sizeof(magic));
+	if (readBytes != sizeof(magic))
+		ereport(PANIC,
+				(errmsg("could not read file \"%s\": %m",
+						path)));
+	COMP_CRC32C(crc, &magic, sizeof(magic));
+
+	if (magic != REPLICATION_STATE_MAGIC)
+		ereport(PANIC,
+				(errmsg("replication checkpoint has wrong magic %u instead of %u",
+						magic, REPLICATION_STATE_MAGIC)));
+
+	/* we can skip locking here, no other access is possible */
+
+	/* recover individual states, until there are no more to be found */
+	while (true)
+	{
+		ReplicationStateOnDisk disk_state;
+
+		readBytes = read(fd, &disk_state, sizeof(disk_state));
+
+		/* no further data */
+		if (readBytes == sizeof(crc))
+		{
+			/* not pretty, but simple ... */
+			file_crc = *(pg_crc32c*) &disk_state;
+			break;
+		}
+
+		if (readBytes < 0)
+		{
+			ereport(PANIC,
+					(errcode_for_file_access(),
+					 errmsg("could not read file \"%s\": %m",
+							path)));
+		}
+
+		if (readBytes != sizeof(disk_state))
+		{
+			ereport(PANIC,
+					(errcode_for_file_access(),
+					 errmsg("could not read file \"%s\": read %d of %zu",
+							path, readBytes, sizeof(disk_state))));
+		}
+
+		COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
+
+		if (last_state == max_replication_slots)
+			ereport(PANIC,
+					(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+					 errmsg("no free replication state could be found, increase max_replication_slots")));
+
+		/* copy data to shared memory */
+		ReplicationStates[last_state].local_identifier = disk_state.local_identifier;
+		ReplicationStates[last_state].remote_lsn = disk_state.remote_lsn;
+		last_state++;
+
+		elog(LOG, "recovered replication state of node %u to %X/%X",
+			 disk_state.local_identifier,
+			 (uint32)(disk_state.remote_lsn >> 32),
+			 (uint32)disk_state.remote_lsn);
+	}
+
+	/* now check checksum */
+	FIN_CRC32C(crc);
+	if (file_crc != crc)
+		ereport(PANIC,
+				(errcode(ERRCODE_DATA_CORRUPTED),
+				 errmsg("replication identifier checkpoint has wrong checksum %u, expected %u",
+						crc, file_crc)));
+
+	CloseTransientFile(fd);
+}
+
+/*
+ * Tell the replication identifier machinery that a commit from 'node' that
+ * originated at the LSN remote_commit on the remote node was replayed
+ * successfully and that we don't need to do so again. In combination with
+ * setting up replication_origin_lsn and replication_origin_id that ensures we
+ * won't lose knowledge about that after a crash if the transaction had a
+ * persistent effect (think of asynchronous commits).
+ *
+ * local_commit needs to be a local LSN of the commit so that we can make sure
+ * upon a checkpoint that enough WAL has been persisted to disk.
+ *
+ * Needs to be called with a RowExclusiveLock on pg_replication_identifier,
+ * unless running in recovery.
+ */
+void
+AdvanceReplicationIdentifier(RepNodeId node,
+							 XLogRecPtr remote_commit,
+							 XLogRecPtr local_commit)
+{
+	int i;
+	int free_slot = -1;
+	ReplicationState *replication_state = NULL;
+
+	Assert(node != InvalidRepNodeId);
+
+	/* we don't track DoNotReplicateRepNodeId */
+	if (node == DoNotReplicateRepNodeId)
+		return;
+
+	/*
+	 * XXX: should we restore into a hashtable and dump into shmem only after
+	 * recovery finished?
+	 */
+
+	/* Lock exclusively, as we may have to create a new table entry. */
+	LWLockAcquire(ReplicationIdentifierLock, LW_EXCLUSIVE);
+
+	/*
+	 * Search for either an existing slot for that identifier or a free one we
+	 * can use.
+	 */
+	for (i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationState *curstate = &ReplicationStates[i];
+
+		/* remember where to insert if necessary */
+		if (curstate->local_identifier == InvalidRepNodeId &&
+			free_slot == -1)
+		{
+			free_slot = i;
+			continue;
+		}
+
+		/* not our slot */
+		if (curstate->local_identifier != node)
+			continue;
+
+		if (curstate->acquired_by != 0)
+		{
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_IN_USE),
+					 errmsg("replication identifier %d is already active for pid %d",
+							curstate->local_identifier, curstate->acquired_by)));
+		}
+
+		/* ok, found slot */
+		replication_state = curstate;
+		break;
+	}
+
+	if (replication_state == NULL && free_slot == -1)
+		ereport(ERROR,
+				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+				 errmsg("no free replication state slot could be found for replication identifier %u",
+						node),
+				 errhint("Increase max_replication_slots and try again.")));
+	else if (replication_state == NULL)
+	{
+		/* initialize new slot */
+		replication_state = &ReplicationStates[free_slot];
+		Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
+		Assert(replication_state->local_lsn == InvalidXLogRecPtr);
+		replication_state->local_identifier = node;
+	}
+
+	Assert(replication_state->local_identifier != InvalidRepNodeId);
+
+	/*
+	 * Due to - harmless - race conditions during a checkpoint we could see
+	 * values here that are older than the ones we already have in
+	 * memory. Don't overwrite those.
+	 */
+	SpinLockAcquire(&replication_state->mutex);
+	if (replication_state->remote_lsn < remote_commit)
+		replication_state->remote_lsn = remote_commit;
+	if (replication_state->local_lsn < local_commit)
+		replication_state->local_lsn = local_commit;
+	SpinLockRelease(&replication_state->mutex);
+
+	/*
+	 * Release *after* changing the LSNs: the slot isn't acquired and thus
+	 * could otherwise be dropped at any time.
+	 */
+	LWLockRelease(ReplicationIdentifierLock);
+}
+
+
+XLogRecPtr
+ReplicationIdentifierProgress(RepNodeId node, bool flush)
+{
+	int			i;
+	XLogRecPtr	local_lsn = InvalidXLogRecPtr;
+	XLogRecPtr	remote_lsn = InvalidXLogRecPtr;
+
+	/* prevent slots from being concurrently dropped */
+	LWLockAcquire(ReplicationIdentifierLock, LW_SHARED);
+
+	for (i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationState *state;
+
+		state = &ReplicationStates[i];
+
+		if (state->local_identifier == node)
+		{
+			SpinLockAcquire(&state->mutex);
+			remote_lsn = state->remote_lsn;
+			local_lsn = state->local_lsn;
+			SpinLockRelease(&state->mutex);
+			break;
+		}
+	}
+
+	LWLockRelease(ReplicationIdentifierLock);
+
+	if (flush && local_lsn != InvalidXLogRecPtr)
+		XLogFlush(local_lsn);
+
+	return remote_lsn;
+}
+
+/*
+ * Tear down a (possibly) cached replication identifier during process exit.
+ */
+static void
+ReplicationIdentifierExitCleanup(int code, Datum arg)
+{
+
+	LWLockAcquire(ReplicationIdentifierLock, LW_EXCLUSIVE);
+
+	if (cached_replication_state != NULL &&
+		cached_replication_state->acquired_by == MyProcPid)
+	{
+		cached_replication_state->acquired_by = 0;
+		cached_replication_state = NULL;
+	}
+
+	LWLockRelease(ReplicationIdentifierLock);
+}
+
+/*
+ * Set up a replication identifier in the shared memory struct if it doesn't
+ * already exist and cache access to the specific ReplicationState so the
+ * array doesn't have to be searched when calling
+ * AdvanceCachedReplicationIdentifier().
+ *
+ * Obviously only one such cached identifier can exist per process and the
+ * current cached value can only be set again after the previous value is torn
+ * down with TeardownCachedReplicationIdentifier().
+ */
+void
+SetupCachedReplicationIdentifier(RepNodeId node)
+{
+	static bool registered_cleanup;
+	int		i;
+	int		free_slot = -1;
+
+	if (!registered_cleanup)
+	{
+		on_shmem_exit(ReplicationIdentifierExitCleanup, 0);
+		registered_cleanup = true;
+	}
+
+	Assert(max_replication_slots > 0);
+
+	if (cached_replication_state != NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("cannot set up replication origin when one is already set up")));
+
+	/* Lock exclusively, as we may have to create a new table entry. */
+	LWLockAcquire(ReplicationIdentifierLock, LW_EXCLUSIVE);
+
+	/*
+	 * Search for either an existing slot for that identifier or a free one we
+	 * can use.
+	 */
+	for (i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationState *curstate = &ReplicationStates[i];
+
+		/* remember where to insert if necessary */
+		if (curstate->local_identifier == InvalidRepNodeId &&
+			free_slot == -1)
+		{
+			free_slot = i;
+			continue;
+		}
+
+		/* not our slot */
+		if (curstate->local_identifier != node)
+			continue;
+
+		else if (curstate->acquired_by != 0)
+		{
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_IN_USE),
+					 errmsg("replication identifier %d is already active for pid %d",
+							curstate->local_identifier, curstate->acquired_by)));
+		}
+
+		/* ok, found slot */
+		cached_replication_state = curstate;
+	}
+
+
+	if (cached_replication_state == NULL && free_slot == -1)
+		ereport(ERROR,
+				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+				 errmsg("no free replication state slot could be found for replication identifier %u",
+						node),
+				 errhint("Increase max_replication_slots and try again.")));
+	else if (cached_replication_state == NULL)
+	{
+		/* initialize new slot */
+		cached_replication_state = &ReplicationStates[free_slot];
+		Assert(cached_replication_state->remote_lsn == InvalidXLogRecPtr);
+		Assert(cached_replication_state->local_lsn == InvalidXLogRecPtr);
+		cached_replication_state->local_identifier = node;
+	}
+
+
+	Assert(cached_replication_state->local_identifier != InvalidRepNodeId);
+
+	cached_replication_state->acquired_by = MyProcPid;
+
+	LWLockRelease(ReplicationIdentifierLock);
+}
+
+/*
+ * Make the currently cached replication identifier unavailable so a new one
+ * can be set up with SetupCachedReplicationIdentifier().
+ *
+ * This function may only be called if a previous identifier was set up with
+ * SetupCachedReplicationIdentifier().
+ */
+void
+TeardownCachedReplicationIdentifier(void)
+{
+	Assert(max_replication_slots != 0);
+
+	if (cached_replication_state == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("no replication identifier is set up")));
+
+	LWLockAcquire(ReplicationIdentifierLock, LW_EXCLUSIVE);
+
+	cached_replication_state->acquired_by = 0;
+	cached_replication_state = NULL;
+
+	LWLockRelease(ReplicationIdentifierLock);
+}
+
+/*
+ * Do the same work AdvanceReplicationIdentifier() does, just on a pre-cached
+ * identifier. This is noticeably cheaper if you only ever work on a single
+ * replication identifier.
+ */
+void
+AdvanceCachedReplicationIdentifier(XLogRecPtr remote_commit,
+								   XLogRecPtr local_commit)
+{
+	Assert(cached_replication_state != NULL);
+	Assert(cached_replication_state->local_identifier != InvalidRepNodeId);
+
+	SpinLockAcquire(&cached_replication_state->mutex);
+	if (cached_replication_state->local_lsn < local_commit)
+		cached_replication_state->local_lsn = local_commit;
+	if (cached_replication_state->remote_lsn < remote_commit)
+		cached_replication_state->remote_lsn = remote_commit;
+	SpinLockRelease(&cached_replication_state->mutex);
+}
+
+/*
+ * Ask the machinery about the point up to which we successfully replayed
+ * changes from an already set up and cached replication identifier.
+ */
+XLogRecPtr
+CachedReplicationIdentifierProgress(void)
+{
+	XLogRecPtr remote_lsn;
+
+	Assert(cached_replication_state != NULL);
+
+	SpinLockAcquire(&cached_replication_state->mutex);
+	remote_lsn = cached_replication_state->remote_lsn;
+	SpinLockRelease(&cached_replication_state->mutex);
+
+	return remote_lsn;
+}
+
+
+
+/* ---------------------------------------------------------------------------
+ * SQL functions for working with replication identifiers.
+ *
+ * These mostly should be fairly short wrappers around more generic functions.
+ * ---------------------------------------------------------------------------
+ */
+
+/*
+ * Return the internal replication identifier for the passed in external one.
+ */
+Datum
+pg_replication_identifier_get(PG_FUNCTION_ARGS)
+{
+	char *name;
+	RepNodeId riident;
+
+	CheckReplicationIdentifierPrerequisites(false);
+
+	name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
+	riident = GetReplicationIdentifier(name, true);
+
+	pfree(name);
+
+	if (OidIsValid(riident))
+		PG_RETURN_OID(riident);
+	PG_RETURN_NULL();
+}
+
+/*
+ * Create a replication identifier with the passed in name, and return the
+ * assigned internal identifier.
+ */
+Datum
+pg_replication_identifier_create(PG_FUNCTION_ARGS)
+{
+	char *name;
+	RepNodeId riident;
+
+	CheckReplicationIdentifierPrerequisites(false);
+
+	name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
+	riident = CreateReplicationIdentifier(name);
+
+	pfree(name);
+
+	PG_RETURN_OID(riident);
+}
+
+/*
+ * Set up a cached replication identifier in the current session.
+ */
+Datum
+pg_replication_identifier_setup_replaying_from(PG_FUNCTION_ARGS)
+{
+	char *name;
+	RepNodeId origin;
+
+	CheckReplicationIdentifierPrerequisites(true);
+
+	name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
+	origin = GetReplicationIdentifier(name, false);
+	SetupCachedReplicationIdentifier(origin);
+
+	replication_origin_id = origin;
+
+	pfree(name);
+
+	PG_RETURN_VOID();
+}
+
+Datum
+pg_replication_identifier_is_replaying(PG_FUNCTION_ARGS)
+{
+	CheckReplicationIdentifierPrerequisites(false);
+
+	PG_RETURN_BOOL(replication_origin_id != InvalidRepNodeId);
+}
+
+Datum
+pg_replication_identifier_reset_replaying_from(PG_FUNCTION_ARGS)
+{
+	CheckReplicationIdentifierPrerequisites(true);
+
+	TeardownCachedReplicationIdentifier();
+
+	replication_origin_id = InvalidRepNodeId;
+
+	PG_RETURN_VOID();
+}
+
+Datum
+pg_replication_identifier_setup_tx_origin(PG_FUNCTION_ARGS)
+{
+	XLogRecPtr	location = PG_GETARG_LSN(0);
+
+	CheckReplicationIdentifierPrerequisites(true);
+
+	if (cached_replication_state == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("need to set up the origin id first")));
+
+	replication_origin_lsn = location;
+	replication_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
+
+	PG_RETURN_VOID();
+}
+
+Datum
+pg_replication_identifier_advance(PG_FUNCTION_ARGS)
+{
+	text	   *name = PG_GETARG_TEXT_P(0);
+	XLogRecPtr remote_commit = PG_GETARG_LSN(1);
+	RepNodeId  node;
+
+	CheckReplicationIdentifierPrerequisites(true);
+
+	/* lock to prevent the replication identifier from vanishing */
+	LockRelationOid(ReplicationIdentifierRelationId, RowExclusiveLock);
+
+	node = GetReplicationIdentifier(text_to_cstring(name), false);
+
+	/*
+	 * Can't sensibly pass a local commit to be flushed at checkpoint - this
+	 * xact hasn't committed yet. This is why this function should be used to
+	 * set up the initial replication state, but not for replay.
+	 */
+	AdvanceReplicationIdentifier(node, remote_commit, InvalidXLogRecPtr);
+
+	UnlockRelationOid(ReplicationIdentifierRelationId, RowExclusiveLock);
+
+	PG_RETURN_VOID();
+}
+
+Datum
+pg_replication_identifier_drop(PG_FUNCTION_ARGS)
+{
+	char *name;
+	RepNodeId riident;
+
+	CheckReplicationIdentifierPrerequisites(false);
+
+	name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
+
+	riident = GetReplicationIdentifier(name, false);
+	Assert(OidIsValid(riident));
+
+	DropReplicationIdentifier(riident);
+
+	pfree(name);
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * Return the replication progress for an individual replication identifier.
+ *
+ * If 'flush' is set to true, it is ensured that the returned value corresponds
+ * to a local transaction that has been flushed. This is useful if asynchronous
+ * commits are used when replaying replicated transactions.
+ */
+Datum
+pg_replication_identifier_progress(PG_FUNCTION_ARGS)
+{
+	char	   *name;
+	bool		flush;
+	RepNodeId	riident;
+	XLogRecPtr	remote_lsn = InvalidXLogRecPtr;
+
+	CheckReplicationIdentifierPrerequisites(true);
+
+	name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
+	flush = PG_GETARG_BOOL(1);
+
+	riident = GetReplicationIdentifier(name, false);
+	Assert(OidIsValid(riident));
+
+	remote_lsn = ReplicationIdentifierProgress(riident, flush);
+
+	if (remote_lsn == InvalidXLogRecPtr)
+		PG_RETURN_NULL();
+
+	PG_RETURN_LSN(remote_lsn);
+}
+
+
+Datum
+pg_get_replication_identifier_progress(PG_FUNCTION_ARGS)
+{
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	TupleDesc	tupdesc;
+	Tuplestorestate *tupstore;
+	MemoryContext per_query_ctx;
+	MemoryContext oldcontext;
+	int			i;
+#define REPLICATION_IDENTIFIER_PROGRESS_COLS 4
+
+	/* we want to return 0 rows if max_replication_slots is set to zero */
+	CheckReplicationIdentifierPrerequisites(false);
+
+	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("set-valued function called in context that cannot accept a set")));
+	if (!(rsinfo->allowedModes & SFRM_Materialize))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("materialize mode required, but it is not allowed in this context")));
+	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+		elog(ERROR, "return type must be a row type");
+
+	if (tupdesc->natts != REPLICATION_IDENTIFIER_PROGRESS_COLS)
+		elog(ERROR, "wrong function definition");
+
+	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+	oldcontext = MemoryContextSwitchTo(per_query_ctx);
+
+	tupstore = tuplestore_begin_heap(true, false, work_mem);
+	rsinfo->returnMode = SFRM_Materialize;
+	rsinfo->setResult = tupstore;
+	rsinfo->setDesc = tupdesc;
+
+	MemoryContextSwitchTo(oldcontext);
+
+
+	/* prevent slots from being concurrently dropped */
+	LWLockAcquire(ReplicationIdentifierLock, LW_SHARED);
+
+	/*
+	 * Iterate through all possible ReplicationStates, display if they are
+	 * filled. Each slot is read under its own spinlock, but not atomically
+	 * with the others, so slightly out of date values are a possibility.
+	 */
+	for (i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationState *state;
+		Datum		values[REPLICATION_IDENTIFIER_PROGRESS_COLS];
+		bool		nulls[REPLICATION_IDENTIFIER_PROGRESS_COLS];
+		char	   *riname;
+
+		state = &ReplicationStates[i];
+
+		/* unused slot, nothing to display */
+		if (state->local_identifier == InvalidRepNodeId)
+			continue;
+
+		memset(values, 0, sizeof(values));
+		memset(nulls, 0, sizeof(nulls));
+
+		values[0] = ObjectIdGetDatum(state->local_identifier);
+
+		/*
+		 * We're not preventing the identifier from being dropped
+		 * concurrently, so silently accept that it might be gone.
+		 */
+		if (!GetReplicationInfoByIdentifier(state->local_identifier, true,
+											&riname))
+			continue;
+
+		values[1] = CStringGetTextDatum(riname);
+
+		SpinLockAcquire(&state->mutex);
+
+		values[2] = LSNGetDatum(state->remote_lsn);
+
+		values[3] = LSNGetDatum(state->local_lsn);
+
+		SpinLockRelease(&state->mutex);
+
+		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+	}
+
+	tuplestore_donestoring(tupstore);
+
+	LWLockRelease(ReplicationIdentifierLock);
+
+#undef REPLICATION_IDENTIFIER_PROGRESS_COLS
+
+	return (Datum) 0;
+}
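
Review aside on the slot handling in replication_identifier.c above: the lookup-or-claim loop and the "never move an LSN backwards" rule can be tried outside the backend. What follows is only a minimal sketch under stated assumptions, not the patch's code. The demo_* names and DEMO_MAX_SLOTS are hypothetical stand-ins, the typedefs merely mirror the widths used in the patch, and the LWLock and spinlock that the real functions take are left out entirely.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Stand-ins for the PostgreSQL typedefs; widths follow the patch. */
typedef uint64_t XLogRecPtr;
typedef uint16_t RepNodeId;

#define InvalidXLogRecPtr 0
#define InvalidRepNodeId  0
#define DEMO_MAX_SLOTS    4		/* hypothetical; plays the role of max_replication_slots */

typedef struct DemoReplicationState
{
	RepNodeId	local_identifier;
	XLogRecPtr	remote_lsn;
	XLogRecPtr	local_lsn;
} DemoReplicationState;

DemoReplicationState demo_states[DEMO_MAX_SLOTS];

/* Find the slot for 'node', claiming the first free slot if there is none. */
DemoReplicationState *
demo_find_or_create(RepNodeId node)
{
	int			free_slot = -1;
	int			i;

	assert(node != InvalidRepNodeId);

	for (i = 0; i < DEMO_MAX_SLOTS; i++)
	{
		DemoReplicationState *cur = &demo_states[i];

		if (cur->local_identifier == node)
			return cur;
		/* remember where to insert if necessary */
		if (cur->local_identifier == InvalidRepNodeId && free_slot == -1)
			free_slot = i;
	}

	if (free_slot == -1)
		return NULL;			/* table full; the patch raises an ERROR here */

	demo_states[free_slot].local_identifier = node;
	return &demo_states[free_slot];
}

/* Advance both LSNs, never moving either one backwards. Returns 0 on success. */
int
demo_advance(RepNodeId node, XLogRecPtr remote_commit, XLogRecPtr local_commit)
{
	DemoReplicationState *state = demo_find_or_create(node);

	if (state == NULL)
		return -1;
	if (state->remote_lsn < remote_commit)
		state->remote_lsn = remote_commit;
	if (state->local_lsn < local_commit)
		state->local_lsn = local_commit;
	return 0;
}

/* Remote progress for 'node', or InvalidXLogRecPtr if it has no slot. */
XLogRecPtr
demo_progress(RepNodeId node)
{
	int			i;

	for (i = 0; i < DEMO_MAX_SLOTS; i++)
		if (demo_states[i].local_identifier == node)
			return demo_states[i].remote_lsn;
	return InvalidXLogRecPtr;
}
```

Calling demo_advance() with an older remote_commit after a newer one leaves the stored value untouched, which is the behaviour the checkpoint-race comment in AdvanceReplicationIdentifier() relies on.
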
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 16b9808..e927698 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -31,6 +31,7 @@
 #include "replication/slot.h"
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
+#include "replication/replication_identifier.h"
 #include "storage/bufmgr.h"
 #include "storage/dsm.h"
 #include "storage/ipc.h"
@@ -132,6 +133,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
 		size = add_size(size, CheckpointerShmemSize());
 		size = add_size(size, AutoVacuumShmemSize());
 		size = add_size(size, ReplicationSlotsShmemSize());
+		size = add_size(size, ReplicationIdentifierShmemSize());
 		size = add_size(size, WalSndShmemSize());
 		size = add_size(size, WalRcvShmemSize());
 		size = add_size(size, BTreeShmemSize());
@@ -238,6 +240,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
 	CheckpointerShmemInit();
 	AutoVacuumShmemInit();
 	ReplicationSlotsShmemInit();
+	ReplicationIdentifierShmemInit();
 	WalSndShmemInit();
 	WalRcvShmemInit();
 
diff --git a/src/backend/utils/cache/syscache.c b/src/backend/utils/cache/syscache.c
index bd27168..fdccb95 100644
--- a/src/backend/utils/cache/syscache.c
+++ b/src/backend/utils/cache/syscache.c
@@ -54,6 +54,7 @@
 #include "catalog/pg_shdepend.h"
 #include "catalog/pg_shdescription.h"
 #include "catalog/pg_shseclabel.h"
+#include "catalog/pg_replication_identifier.h"
 #include "catalog/pg_statistic.h"
 #include "catalog/pg_tablespace.h"
 #include "catalog/pg_ts_config.h"
@@ -620,6 +621,28 @@ static const struct cachedesc cacheinfo[] = {
 		},
 		128
 	},
+	{ReplicationIdentifierRelationId,		/* REPLIDIDENT */
+		ReplicationLocalIdentIndex,
+		1,
+		{
+			Anum_pg_replication_riident,
+			0,
+			0,
+			0
+		},
+		16
+	},
+	{ReplicationIdentifierRelationId,		/* REPLIDREMOTE */
+		ReplicationExternalIdentIndex,
+		1,
+		{
+			Anum_pg_replication_riname,
+			0,
+			0,
+			0
+		},
+		16
+	},
 	{RewriteRelationId,			/* RULERELNAME */
 		RewriteRelRulenameIndexId,
 		2,
diff --git a/src/bin/pg_resetxlog/pg_resetxlog.c b/src/bin/pg_resetxlog/pg_resetxlog.c
index a0805d8..c9a7e7a 100644
--- a/src/bin/pg_resetxlog/pg_resetxlog.c
+++ b/src/bin/pg_resetxlog/pg_resetxlog.c
@@ -56,6 +56,8 @@
 #include "common/restricted_token.h"
 #include "storage/large_object.h"
 #include "pg_getopt.h"
+#include "replication/logical.h"
+#include "replication/replication_identifier.h"
 
 
 static ControlFileData ControlFile;		/* pg_control values */
@@ -1091,6 +1093,7 @@ WriteEmptyXLOG(void)
 	record->xl_tot_len = SizeOfXLogRecord + SizeOfXLogRecordDataHeaderShort + sizeof(CheckPoint);
 	record->xl_info = XLOG_CHECKPOINT_SHUTDOWN;
 	record->xl_rmid = RM_XLOG_ID;
+
 	recptr += SizeOfXLogRecord;
 	*(recptr++) = XLR_BLOCK_ID_DATA_SHORT;
 	*(recptr++) = sizeof(CheckPoint);
diff --git a/src/include/access/commit_ts.h b/src/include/access/commit_ts.h
index 93d1217..578513d 100644
--- a/src/include/access/commit_ts.h
+++ b/src/include/access/commit_ts.h
@@ -13,6 +13,7 @@
 
 #include "access/xlog.h"
 #include "datatype/timestamp.h"
+#include "replication/replication_identifier.h"
 #include "utils/guc.h"
 
 
@@ -21,18 +22,13 @@ extern PGDLLIMPORT bool	track_commit_timestamp;
 extern bool check_track_commit_timestamp(bool *newval, void **extra,
 							 GucSource source);
 
-typedef uint32 CommitTsNodeId;
-#define InvalidCommitTsNodeId 0
-
-extern void CommitTsSetDefaultNodeId(CommitTsNodeId nodeid);
-extern CommitTsNodeId CommitTsGetDefaultNodeId(void);
 extern void TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
 							   TransactionId *subxids, TimestampTz timestamp,
-							   CommitTsNodeId nodeid, bool do_xlog);
+							   RepNodeId nodeid, bool do_xlog);
 extern bool TransactionIdGetCommitTsData(TransactionId xid,
-							 TimestampTz *ts, CommitTsNodeId *nodeid);
+							 TimestampTz *ts, RepNodeId *nodeid);
 extern TransactionId GetLatestCommitTsData(TimestampTz *ts,
-					  CommitTsNodeId *nodeid);
+					  RepNodeId *nodeid);
 
 extern Size CommitTsShmemBuffers(void);
 extern Size CommitTsShmemSize(void);
@@ -58,7 +54,7 @@ extern void AdvanceOldestCommitTs(TransactionId oldestXact);
 typedef struct xl_commit_ts_set
 {
 	TimestampTz		timestamp;
-	CommitTsNodeId	nodeid;
+	RepNodeId		nodeid;
 	TransactionId	mainxid;
 	/* subxact Xids follow */
 } xl_commit_ts_set;
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index fdf3ea3..9e78403 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -131,6 +131,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
 #define XACT_XINFO_HAS_RELFILENODES		(1U << 2)
 #define XACT_XINFO_HAS_INVALS			(1U << 3)
 #define XACT_XINFO_HAS_TWOPHASE			(1U << 4)
+#define XACT_XINFO_HAS_ORIGIN			(1U << 5)
 
 /*
  * Also stored in xinfo, these indicating a variety of additional actions that
@@ -217,6 +218,12 @@ typedef struct xl_xact_twophase
 } xl_xact_twophase;
 #define MinSizeOfXactInvals offsetof(xl_xact_invals, msgs)
 
+typedef struct xl_xact_origin
+{
+	XLogRecPtr	origin_lsn;
+	TimestampTz origin_timestamp;
+} xl_xact_origin;
+
 typedef struct xl_xact_commit
 {
 	TimestampTz xact_time;		/* time of commit */
@@ -227,6 +234,7 @@ typedef struct xl_xact_commit
 	/* xl_xact_relfilenodes follows if XINFO_HAS_RELFILENODES */
 	/* xl_xact_invals follows if XINFO_HAS_INVALS */
 	/* xl_xact_twophase follows if XINFO_HAS_TWOPHASE */
+	/* xl_xact_origin follows if XINFO_HAS_ORIGIN */
 } xl_xact_commit;
 #define MinSizeOfXactCommit (offsetof(xl_xact_commit, xact_time) + sizeof(TimestampTz))
 
@@ -267,6 +275,9 @@ typedef struct xl_xact_parsed_commit
 	SharedInvalidationMessage *msgs;
 
 	TransactionId	twophase_xid;	/* only for 2PC */
+
+	XLogRecPtr	origin_lsn;
+	TimestampTz origin_timestamp;
 } xl_xact_parsed_commit;
 
 typedef struct xl_xact_parsed_abort
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 2b1f423..f08b676 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -85,6 +85,7 @@ typedef enum
 } RecoveryTargetType;
 
 extern XLogRecPtr XactLastRecEnd;
+extern PGDLLIMPORT XLogRecPtr XactLastCommitEnd;
 
 extern bool reachedConsistency;
 
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index deca1de..75cf435 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -31,7 +31,7 @@
 /*
  * Each page of XLOG file has a header like this:
  */
-#define XLOG_PAGE_MAGIC 0xD083	/* can be used as WAL version indicator */
+#define XLOG_PAGE_MAGIC 0xD085	/* can be used as WAL version indicator */
 
 typedef struct XLogPageHeaderData
 {
diff --git a/src/include/access/xlogdefs.h b/src/include/access/xlogdefs.h
index 6638c1d..bd8dd70 100644
--- a/src/include/access/xlogdefs.h
+++ b/src/include/access/xlogdefs.h
@@ -45,6 +45,12 @@ typedef uint64 XLogSegNo;
 typedef uint32 TimeLineID;
 
 /*
+ * Denotes the node on which the action causing a WAL record to be logged
+ * originated.
+ */
+typedef uint16 RepNodeId;
+
+/*
  *	Because O_DIRECT bypasses the kernel buffers, and because we never
  *	read those buffers except during crash recovery or if wal_level != minimal,
  *	it is a win to use it in all cases where we sync on each write().  We could
diff --git a/src/include/access/xloginsert.h b/src/include/access/xloginsert.h
index 6864c95..ac60929 100644
--- a/src/include/access/xloginsert.h
+++ b/src/include/access/xloginsert.h
@@ -39,6 +39,7 @@
 
 /* prototypes for public functions in xloginsert.c: */
 extern void XLogBeginInsert(void);
+extern void XLogIncludeOrigin(void);
 extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info);
 extern void XLogEnsureRecordSpace(int nbuffers, int ndatas);
 extern void XLogRegisterData(char *data, int len);
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 609bfe3..efebbf0 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -127,6 +127,8 @@ struct XLogReaderState
 	uint32		main_data_len;	/* main data portion's length */
 	uint32		main_data_bufsz;	/* allocated size of the buffer */
 
+	RepNodeId	record_origin;
+
 	/* information about blocks referenced by the record. */
 	DecodedBkpBlock blocks[XLR_MAX_BLOCK_ID + 1];
 
@@ -186,6 +188,7 @@ extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record,
 #define XLogRecGetInfo(decoder) ((decoder)->decoded_record->xl_info)
 #define XLogRecGetRmid(decoder) ((decoder)->decoded_record->xl_rmid)
 #define XLogRecGetXid(decoder) ((decoder)->decoded_record->xl_xid)
+#define XLogRecGetOrigin(decoder) ((decoder)->record_origin)
 #define XLogRecGetData(decoder) ((decoder)->main_data)
 #define XLogRecGetDataLen(decoder) ((decoder)->main_data_len)
 #define XLogRecHasAnyBlockRefs(decoder) ((decoder)->max_block_id >= 0)
diff --git a/src/include/access/xlogrecord.h b/src/include/access/xlogrecord.h
index b487ae0..bf6fd41 100644
--- a/src/include/access/xlogrecord.h
+++ b/src/include/access/xlogrecord.h
@@ -212,5 +212,8 @@ typedef struct XLogRecordDataHeaderLong
 
 #define XLR_BLOCK_ID_DATA_SHORT		255
 #define XLR_BLOCK_ID_DATA_LONG		254
+#ifndef REPLICATION_IDENTIFIER_REUSE_PADDING
+#define XLR_BLOCK_ID_ORIGIN			253
+#endif
 
 #endif   /* XLOGRECORD_H */
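
Review aside on XLR_BLOCK_ID_ORIGIN above: with the non-padding variant, the origin travels as its own fragment, which matches the "one byte of overhead plus the ID itself" estimate quoted earlier in this mail. The following is a rough sketch of such a fragment, assuming the layout is simply the marker byte followed by the raw RepNodeId. The demo_* helpers are hypothetical, and skipping the fragment for InvalidRepNodeId is an assumption made for illustration, not something this hunk shows.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

typedef uint16_t RepNodeId;		/* same width as in xlogdefs.h in this patch */

#define XLR_BLOCK_ID_ORIGIN 253
#define InvalidRepNodeId    0

/*
 * Append an origin fragment to 'buf' and return the number of bytes written.
 * Assumption: nothing is written when no origin is set.
 */
size_t
demo_put_origin(uint8_t *buf, RepNodeId origin)
{
	if (origin == InvalidRepNodeId)
		return 0;

	buf[0] = XLR_BLOCK_ID_ORIGIN;
	memcpy(buf + 1, &origin, sizeof(origin));
	return 1 + sizeof(origin);
}

/*
 * Parse an origin fragment at 'buf'. Returns the number of bytes consumed,
 * or 0 (leaving *origin invalid) if 'buf' does not start with one.
 */
size_t
demo_get_origin(const uint8_t *buf, size_t len, RepNodeId *origin)
{
	*origin = InvalidRepNodeId;

	if (len < 1 + sizeof(RepNodeId) || buf[0] != XLR_BLOCK_ID_ORIGIN)
		return 0;

	memcpy(origin, buf + 1, sizeof(RepNodeId));
	return 1 + sizeof(RepNodeId);
}
```

For a 2-byte RepNodeId the fragment costs 3 bytes per record that carries an origin, which is the figure the size comparison is about.
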
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index 8ecd5fd..dce7eaf 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -53,6 +53,6 @@
  */
 
 /*							yyyymmddN */
-#define CATALOG_VERSION_NO	201504121
+#define CATALOG_VERSION_NO	201504122
 
 #endif
diff --git a/src/include/catalog/indexing.h b/src/include/catalog/indexing.h
index a680229..405528d 100644
--- a/src/include/catalog/indexing.h
+++ b/src/include/catalog/indexing.h
@@ -305,6 +305,12 @@ DECLARE_UNIQUE_INDEX(pg_policy_oid_index, 3257, on pg_policy using btree(oid oid
 DECLARE_UNIQUE_INDEX(pg_policy_polrelid_polname_index, 3258, on pg_policy using btree(polrelid oid_ops, polname name_ops));
 #define PolicyPolrelidPolnameIndexId				3258
 
+DECLARE_UNIQUE_INDEX(pg_replication_identifier_riiident_index, 6001, on pg_replication_identifier using btree(riident oid_ops));
+#define ReplicationLocalIdentIndex 6001
+
+DECLARE_UNIQUE_INDEX(pg_replication_identifier_riname_index, 6002, on pg_replication_identifier using btree(riname varchar_pattern_ops));
+#define ReplicationExternalIdentIndex 6002
+
 /* last step of initialization script: build the indexes declared above */
 BUILD_INDICES
 
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 8469c82..575ca36 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -5201,6 +5201,36 @@ DESCR("for use by pg_upgrade");
 DATA(insert OID = 3591 ( binary_upgrade_create_empty_extension PGNSP PGUID  12 1 0 0 0 f f f f t f v 7 0 2278 "25 25 16 25 1028 1009 1009" _null_ _null_ _null_ _null_ binary_upgrade_create_empty_extension _null_ _null_ _null_ ));
 DESCR("for use by pg_upgrade");
 
+/* replication_identifier.h */
+DATA(insert OID = 6003 (  pg_replication_identifier_create PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 26 "25" _null_ _null_ _null_ _null_ pg_replication_identifier_create _null_ _null_ _null_ ));
+DESCR("create local replication identifier for the passed external one");
+
+DATA(insert OID = 6004 (  pg_replication_identifier_drop PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 2278 "25" _null_ _null_ _null_ _null_ pg_replication_identifier_drop _null_ _null_ _null_ ));
+DESCR("drop existing replication identifier");
+
+DATA(insert OID = 6005 (  pg_replication_identifier_get PGNSP PGUID 12 1 0 0 0 f f f f t f s 1 0 26 "25" _null_ _null_ _null_ _null_ pg_replication_identifier_get _null_ _null_ _null_ ));
+DESCR("translate the external node identifier to a local one");
+
+DATA(insert OID = 6006 (  pg_replication_identifier_setup_replaying_from PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 2278 "25" _null_ _null_ _null_ _null_ pg_replication_identifier_setup_replaying_from _null_ _null_ _null_ ));
+DESCR("set up the node from which we are currently replaying transactions");
+
+DATA(insert OID = 6007 (  pg_replication_identifier_reset_replaying_from PGNSP PGUID 12 1 0 0 0 f f f f t f v 0 0 2278 "" _null_ _null_ _null_ _null_ pg_replication_identifier_reset_replaying_from _null_ _null_ _null_ ));
+DESCR("tear down configured replication identifier");
+
+DATA(insert OID = 6008 (  pg_replication_identifier_setup_tx_origin PGNSP PGUID 12 1 0 0 0 f f f f t f v 2 0 2278 "3220 1184" _null_ _null_ _null_ _null_ pg_replication_identifier_setup_tx_origin _null_ _null_ _null_ ));
+DESCR("set up transaction timestamp and origin lsn");
+
+DATA(insert OID = 6009 (  pg_replication_identifier_is_replaying PGNSP PGUID 12 1 0 0 0 f f f f t f v 0 0 16 "" _null_ _null_ _null_ _null_ pg_replication_identifier_is_replaying _null_ _null_ _null_ ));
+DESCR("is a replication identifier set up");
+
+DATA(insert OID = 6010 (  pg_replication_identifier_advance PGNSP PGUID 12 1 0 0 0 f f f f t f v 2 0 2278 "25 3220" _null_ _null_ _null_ _null_ pg_replication_identifier_advance _null_ _null_ _null_ ));
+DESCR("advance replication identifier to specific location");
+
+DATA(insert OID = 6011 (  pg_replication_identifier_progress PGNSP PGUID 12 1 0 0 0 f f f f t f v 2 0 3220 "25 16" _null_ _null_ _null_ _null_ pg_replication_identifier_progress _null_ _null_ _null_ ));
+DESCR("get an individual replication identifier's replication progress");
+
+DATA(insert OID = 6012 (  pg_get_replication_identifier_progress PGNSP PGUID 12 1 100 0 0 f f f f f t v 0 0 2249 "" "{26,25,3220,3220}" "{o,o,o,o}" "{local_id, external_id, remote_lsn, local_lsn}" _null_ pg_get_replication_identifier_progress _null_ _null_ _null_ ));
+DESCR("get progress for all replication identifiers");
 
 /*
  * Symbolic values for provolatile column: these indicate whether the result
diff --git a/src/include/catalog/pg_replication_identifier.h b/src/include/catalog/pg_replication_identifier.h
new file mode 100644
index 0000000..d72c839
--- /dev/null
+++ b/src/include/catalog/pg_replication_identifier.h
@@ -0,0 +1,74 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_replication_identifier.h
+ *	  Persistent Replication Node Identifiers
+ *
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/catalog/pg_replication_identifier.h
+ *
+ * NOTES
+ *	  the genbki.pl script reads this file and generates .bki
+ *	  information from the DATA() statements.
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_REPLICATION_IDENTIFIER_H
+#define PG_REPLICATION_IDENTIFIER_H
+
+#include "catalog/genbki.h"
+#include "access/xlogdefs.h"
+
+/* ----------------
+ *		pg_replication_identifier.  cpp turns this into
+ *		typedef struct FormData_pg_replication_identifier
+ * ----------------
+ */
+#define ReplicationIdentifierRelationId 6000
+
+CATALOG(pg_replication_identifier,6000) BKI_SHARED_RELATION BKI_WITHOUT_OIDS
+{
+	/*
+	 * Locally known identifier that gets included in WAL.
+	 *
+	 * This should never leave the system.
+	 *
+	 * Needs to fit into a uint16, so we don't waste too much space in WAL
+	 * records. For this reason we don't use a normal Oid column here, since
+	 * we need to handle allocation of new values manually.
+	 */
+	Oid		riident;
+
+	/*
+	 * Variable-length fields start here, but we allow direct access to
+	 * riname.
+	 */
+
+	/* external, free-format, identifier */
+	text	riname BKI_FORCE_NOT_NULL;
+#ifdef CATALOG_VARLEN		/* further variable-length fields */
+#endif
+} FormData_pg_replication_identifier;
+
+/* ----------------
+ *		Form_pg_replication_identifier corresponds to a pointer to a tuple
+ *		with the format of pg_replication_identifier relation.
+ * ----------------
+ */
+typedef FormData_pg_replication_identifier *Form_pg_replication_identifier;
+
+/* ----------------
+ *		compiler constants for pg_replication_identifier
+ * ----------------
+ */
+#define Natts_pg_replication_identifier		2
+#define Anum_pg_replication_riident			1
+#define Anum_pg_replication_riname			2
+
+/* ----------------
+ *		pg_replication_identifier has no initial contents
+ * ----------------
+ */
+
+#endif   /* PG_REPLICATION_IDENTIFIER_H */
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index cce4394..f78fb8f 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -97,4 +97,6 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
 									  XLogRecPtr restart_lsn);
 extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
 
+extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepNodeId origin_id);
+
 #endif
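
Review aside on filter_by_origin_cb_wrapper() above and the LogicalDecodeFilterByOriginCB type this patch adds to output_plugin.h: a plugin that only wants locally originated changes could hook in roughly as sketched below. This is a standalone mock, not backend code. DemoDecodingContext and the demo_* functions are hypothetical. Two things are assumptions rather than anything the declarations state: that a true return value means "skip this change", and that a plugin without the callback gets nothing filtered.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint16_t RepNodeId;		/* same width as in xlogdefs.h in this patch */

#define InvalidRepNodeId 0

/* Hypothetical stand-in for LogicalDecodingContext plus plugin options. */
struct DemoDecodingContext
{
	bool		only_local;		/* plugin option: drop replayed changes */
};

/* Same shape as LogicalDecodeFilterByOriginCB, on the stand-in context. */
typedef bool (*DemoFilterByOriginCB) (struct DemoDecodingContext *ctx,
									  RepNodeId origin_id);

/* Assumed semantics: returning true means the change is filtered out. */
bool
demo_filter_by_origin(struct DemoDecodingContext *ctx, RepNodeId origin_id)
{
	return ctx->only_local && origin_id != InvalidRepNodeId;
}

/* Mimics a wrapper: a plugin without the callback filters nothing. */
bool
demo_filter_wrapper(DemoFilterByOriginCB cb, struct DemoDecodingContext *ctx,
					RepNodeId origin_id)
{
	if (cb == NULL)
		return false;
	return cb(ctx, origin_id);
}
```

Filtering on the origin alone is what lets a node avoid sending replayed changes back to where they came from without inspecting the change contents.
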
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 0935c1b..26095b1 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -74,6 +74,13 @@ typedef void (*LogicalDecodeCommitCB) (
 												   XLogRecPtr commit_lsn);
 
 /*
+ * Filter changes by origin.
+ */
+typedef bool (*LogicalDecodeFilterByOriginCB) (
+											 struct LogicalDecodingContext *,
+												   RepNodeId origin_id);
+
+/*
  * Called to shutdown an output plugin.
  */
 typedef void (*LogicalDecodeShutdownCB) (
@@ -89,6 +96,7 @@ typedef struct OutputPluginCallbacks
 	LogicalDecodeBeginCB begin_cb;
 	LogicalDecodeChangeCB change_cb;
 	LogicalDecodeCommitCB commit_cb;
+	LogicalDecodeFilterByOriginCB filter_by_origin_cb;
 	LogicalDecodeShutdownCB shutdown_cb;
 } OutputPluginCallbacks;
 
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index f1e0f57..0c13fca 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -68,6 +68,8 @@ typedef struct ReorderBufferChange
 	/* The type of change. */
 	enum ReorderBufferChangeType action;
 
+	RepNodeId origin_id;
+
 	/*
 	 * Context data for the change, which part of the union is valid depends
 	 * on action/action_internal.
@@ -166,6 +168,10 @@ typedef struct ReorderBufferTXN
 	 */
 	XLogRecPtr	restart_decoding_lsn;
 
+	/* origin of the change that caused this transaction */
+	RepNodeId origin_id;
+	XLogRecPtr origin_lsn;
+
 	/*
 	 * Commit time, only known when we read the actual commit record.
 	 */
@@ -339,7 +345,7 @@ void		ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *);
 void		ReorderBufferQueueChange(ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *);
 void ReorderBufferCommit(ReorderBuffer *, TransactionId,
 					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
-					TimestampTz commit_time);
+					TimestampTz commit_time, RepNodeId origin_id, XLogRecPtr origin_lsn);
 void		ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn);
 void ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId,
 						 XLogRecPtr commit_lsn, XLogRecPtr end_lsn);
diff --git a/src/include/replication/replication_identifier.h b/src/include/replication/replication_identifier.h
new file mode 100644
index 0000000..47cc032
--- /dev/null
+++ b/src/include/replication/replication_identifier.h
@@ -0,0 +1,62 @@
+/*-------------------------------------------------------------------------
+ * replication_identifier.h
+ *     Exports from replication/logical/replication_identifier.c
+ *
+ * Copyright (c) 2013-2015, PostgreSQL Global Development Group
+ *
+ * src/include/replication/replication_identifier.h
+ *-------------------------------------------------------------------------
+ */
+#ifndef REPLICATION_IDENTIFIER_H
+#define REPLICATION_IDENTIFIER_H
+
+#include "access/xlogdefs.h"
+#include "catalog/pg_replication_identifier.h"
+#include "replication/logical.h"
+
+#define InvalidRepNodeId 0
+#define DoNotReplicateRepNodeId UINT16_MAX
+
+extern PGDLLIMPORT RepNodeId replication_origin_id;
+extern PGDLLIMPORT XLogRecPtr replication_origin_lsn;
+extern PGDLLIMPORT TimestampTz replication_origin_timestamp;
+
+/* API for querying & manipulating replication identifiers */
+extern RepNodeId GetReplicationIdentifier(char *name, bool missing_ok);
+extern RepNodeId CreateReplicationIdentifier(char *name);
+extern bool GetReplicationInfoByIdentifier(RepNodeId riident, bool missing_ok,
+										   char **riname);
+extern void DropReplicationIdentifier(RepNodeId riident);
+
+/* API for querying & manipulating replication progress */
+extern void AdvanceReplicationIdentifier(RepNodeId node,
+										 XLogRecPtr remote_commit,
+										 XLogRecPtr local_commit);
+extern XLogRecPtr ReplicationIdentifierProgress(RepNodeId node, bool flush);
+extern void AdvanceCachedReplicationIdentifier(XLogRecPtr remote_commit,
+											   XLogRecPtr local_commit);
+extern void SetupCachedReplicationIdentifier(RepNodeId node);
+extern void TeardownCachedReplicationIdentifier(void);
+extern XLogRecPtr CachedReplicationIdentifierProgress(void);
+
+/* crash recovery support */
+extern void CheckPointReplicationIdentifier(void);
+extern void StartupReplicationIdentifier(void);
+
+/* internals */
+extern Size ReplicationIdentifierShmemSize(void);
+extern void ReplicationIdentifierShmemInit(void);
+
+/* SQL callable functions */
+extern Datum pg_replication_identifier_get(PG_FUNCTION_ARGS);
+extern Datum pg_replication_identifier_create(PG_FUNCTION_ARGS);
+extern Datum pg_replication_identifier_drop(PG_FUNCTION_ARGS);
+extern Datum pg_replication_identifier_setup_replaying_from(PG_FUNCTION_ARGS);
+extern Datum pg_replication_identifier_reset_replaying_from(PG_FUNCTION_ARGS);
+extern Datum pg_replication_identifier_is_replaying(PG_FUNCTION_ARGS);
+extern Datum pg_replication_identifier_setup_tx_origin(PG_FUNCTION_ARGS);
+extern Datum pg_replication_identifier_progress(PG_FUNCTION_ARGS);
+extern Datum pg_get_replication_identifier_progress(PG_FUNCTION_ARGS);
+extern Datum pg_replication_identifier_advance(PG_FUNCTION_ARGS);
+
+#endif
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index e3c2efc..919708b 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -134,8 +134,9 @@ extern PGDLLIMPORT LWLockPadded *MainLWLockArray;
 #define ReplicationSlotControlLock		(&MainLWLockArray[37].lock)
 #define CommitTsControlLock			(&MainLWLockArray[38].lock)
 #define CommitTsLock				(&MainLWLockArray[39].lock)
+#define ReplicationIdentifierLock	(&MainLWLockArray[40].lock)
 
-#define NUM_INDIVIDUAL_LWLOCKS		40
+#define NUM_INDIVIDUAL_LWLOCKS		41
 
 /*
  * It's a bit odd to declare NUM_BUFFER_PARTITIONS and NUM_LOCK_PARTITIONS
diff --git a/src/include/utils/syscache.h b/src/include/utils/syscache.h
index ba0b090..d7be45a 100644
--- a/src/include/utils/syscache.h
+++ b/src/include/utils/syscache.h
@@ -77,6 +77,8 @@ enum SysCacheIdentifier
 	RANGETYPE,
 	RELNAMENSP,
 	RELOID,
+	REPLIDIDENT,
+	REPLIDREMOTE,
 	RULERELNAME,
 	STATRELATTINH,
 	TABLESPACEOID,
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 71fa44a..5030f9a 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1390,6 +1390,11 @@ pg_prepared_xacts| SELECT p.transaction,
    FROM ((pg_prepared_xact() p(transaction, gid, prepared, ownerid, dbid)
      LEFT JOIN pg_authid u ON ((p.ownerid = u.oid)))
      LEFT JOIN pg_database d ON ((p.dbid = d.oid)));
+pg_replication_identifier_progress| SELECT pg_get_replication_identifier_progress.local_id,
+    pg_get_replication_identifier_progress.external_id,
+    pg_get_replication_identifier_progress.remote_lsn,
+    pg_get_replication_identifier_progress.local_lsn
+   FROM pg_get_replication_identifier_progress() pg_get_replication_identifier_progress(local_id, external_id, remote_lsn, local_lsn);
 pg_replication_slots| SELECT l.slot_name,
     l.plugin,
     l.slot_type,
diff --git a/src/test/regress/expected/sanity_check.out b/src/test/regress/expected/sanity_check.out
index c7be273..400cba3 100644
--- a/src/test/regress/expected/sanity_check.out
+++ b/src/test/regress/expected/sanity_check.out
@@ -121,6 +121,7 @@ pg_pltemplate|t
 pg_policy|t
 pg_proc|t
 pg_range|t
+pg_replication_identifier|t
 pg_rewrite|t
 pg_seclabel|t
 pg_shdepend|t
-- 
2.4.0.rc2.1.g3d6bc9a
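
For readers skimming the header changes above, here is a rough sketch of how an output plugin might fill in the new filter_by_origin_cb slot. It is not taken from the patch. The typedefs are simplified stand-ins for the real PostgreSQL definitions, RepNodeId is assumed to be a 16-bit integer (suggested by DoNotReplicateRepNodeId being UINT16_MAX), DemoPluginState and demo_filter_by_origin are hypothetical names, and the convention that returning true means "skip this change" is an assumption as well.

```c
#include <stdbool.h>
#include <stdint.h>

/* Simplified stand-ins for the PostgreSQL types (assumptions, not the real headers). */
typedef uint16_t RepNodeId;
#define InvalidRepNodeId 0

typedef struct LogicalDecodingContext
{
	void	   *output_plugin_private;	/* plugin-owned state */
} LogicalDecodingContext;

/* Same shape as the callback typedef added to output_plugin.h. */
typedef bool (*LogicalDecodeFilterByOriginCB) (LogicalDecodingContext *ctx,
											   RepNodeId origin_id);

/* Hypothetical per-plugin state: emit only locally originated changes? */
typedef struct DemoPluginState
{
	bool		only_local;
} DemoPluginState;

/*
 * Hypothetical filter callback. Assumed convention: returning true asks the
 * decoding machinery to skip changes carrying this origin.
 */
static bool
demo_filter_by_origin(LogicalDecodingContext *ctx, RepNodeId origin_id)
{
	DemoPluginState *state = (DemoPluginState *) ctx->output_plugin_private;

	/* A change replayed from another node carries a valid origin id. */
	if (state->only_local && origin_id != InvalidRepNodeId)
		return true;

	return false;
}
```

A plugin would presumably assign such a function to the filter_by_origin_cb member of OutputPluginCallbacks during its initialization, next to begin_cb, change_cb and commit_cb.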
