commit 8dae80ad11cb2ee0382aeb9533029b2e2090ef5a
Author: movead <movead.li@highgo.ca>
Date:   Tue Jul 7 09:41:34 2020 +0800
Subject: add functions to get origin information from commit_ts

---
 doc/src/sgml/func.sgml                                     | 20 ++++++++++++++++++--
 src/backend/access/transam/commit_ts.c                     | 60 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++----
 src/include/catalog/pg_proc.dat                            | 12 +++++++++---
 src/test/modules/commit_ts/expected/commit_timestamp.out   | 78 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 src/test/modules/commit_ts/expected/commit_timestamp_1.out | 63 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 src/test/modules/commit_ts/sql/commit_timestamp.sql        | 30 ++++++++++++++++++++++++++++++
 6 files changed, 254 insertions(+), 9 deletions(-)
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index f065856535..079c697266 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -23397,6 +23397,21 @@ SELECT collation for ('foo' COLLATE "de_DE");
        </para></entry>
       </row>
 
+      <row>
+       <entry role="func_table_entry"><para role="func_signature">
+        <indexterm>
+         <primary>pg_xact_commit_timestamp_origin</primary>
+        </indexterm>
+        <function>pg_xact_commit_timestamp_origin</function> ()
+        <returnvalue>record</returnvalue>
+        ( <parameter>timestamp</parameter> <type>timestamp with time zone</type>,
+         <parameter>Oid</parameter> <type>origin</type>)
+       </para>
+       <para>
+         Returns the commit timestamp and origin of a transaction.
+       </para></entry>
+      </row>
+
       <row>
        <entry role="func_table_entry"><para role="func_signature">
         <indexterm>
@@ -23405,10 +23420,11 @@ SELECT collation for ('foo' COLLATE "de_DE");
         <function>pg_last_committed_xact</function> ()
         <returnvalue>record</returnvalue>
         ( <parameter>xid</parameter> <type>xid</type>,
-        <parameter>timestamp</parameter> <type>timestamp with time zone</type> )
+        <parameter>timestamp</parameter> <type>timestamp with time zone</type>,
+        <parameter>Oid</parameter> <type>origin</type> )
        </para>
        <para>
-        Returns the transaction ID and commit timestamp of the latest
+        Returns the transaction ID, commit timestamp and origin of the latest
         committed transaction.
        </para></entry>
       </row>
diff --git a/src/backend/access/transam/commit_ts.c b/src/backend/access/transam/commit_ts.c
index 9cdb136435..47518b04f9 100644
--- a/src/backend/access/transam/commit_ts.c
+++ b/src/backend/access/transam/commit_ts.c
@@ -421,24 +421,27 @@ Datum
 pg_last_committed_xact(PG_FUNCTION_ARGS)
 {
 	TransactionId xid;
+	RepOriginId	nodeid;
 	TimestampTz ts;
-	Datum		values[2];
-	bool		nulls[2];
+	Datum		values[3];
+	bool		nulls[3];
 	TupleDesc	tupdesc;
 	HeapTuple	htup;
 
 	/* and construct a tuple with our data */
-	xid = GetLatestCommitTsData(&ts, NULL);
+	xid = GetLatestCommitTsData(&ts, &nodeid);
 
 	/*
 	 * Construct a tuple descriptor for the result row.  This must match this
 	 * function's pg_proc entry!
 	 */
-	tupdesc = CreateTemplateTupleDesc(2);
+	tupdesc = CreateTemplateTupleDesc(3);
 	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "xid",
 					   XIDOID, -1, 0);
 	TupleDescInitEntry(tupdesc, (AttrNumber) 2, "timestamp",
 					   TIMESTAMPTZOID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "origin",
+					   OIDOID, -1, 0);
 	tupdesc = BlessTupleDesc(tupdesc);
 
 	if (!TransactionIdIsNormal(xid))
@@ -452,6 +455,9 @@ pg_last_committed_xact(PG_FUNCTION_ARGS)
 
 		values[1] = TimestampTzGetDatum(ts);
 		nulls[1] = false;
+
+		values[2] =ObjectIdGetDatum(nodeid);
+		nulls[2] = false;
 	}
 
 	htup = heap_form_tuple(tupdesc, values, nulls);
@@ -459,6 +465,52 @@ pg_last_committed_xact(PG_FUNCTION_ARGS)
 	PG_RETURN_DATUM(HeapTupleGetDatum(htup));
 }
 
+/*
+ * SQL-callable wrapper to obtain commit timestamp and origin of
+ * a transaction
+ */
+Datum
+pg_xact_commit_timestamp_origin(PG_FUNCTION_ARGS)
+{
+	TransactionId 	xid = PG_GETARG_UINT32(0);
+	RepOriginId		nodeid;
+	TimestampTz 	ts;
+	Datum			values[2];
+	bool			nulls[2];
+	TupleDesc		tupdesc;
+	HeapTuple		htup;
+	bool			found;
+
+	found = TransactionIdGetCommitTsData(xid, &ts, &nodeid);
+
+	/*
+	 * Construct a tuple descriptor for the result row.  This must match this
+	 * function's pg_proc entry!
+	 */
+	tupdesc = CreateTemplateTupleDesc(2);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "timestamp",
+					   TIMESTAMPTZOID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 2, "origin",
+					   OIDOID, -1, 0);
+	tupdesc = BlessTupleDesc(tupdesc);
+
+	if(!found)
+	{
+		nulls[0] = true;
+		nulls[1] = true;
+	}
+	else
+	{
+		values[0] = TimestampTzGetDatum(ts);
+		nulls[0] = false;
+		values[1] = ObjectIdGetDatum(nodeid);
+		nulls[1] = false;
+	}
+
+	htup = heap_form_tuple(tupdesc, values, nulls);
+
+	PG_RETURN_DATUM(HeapTupleGetDatum(htup));
+}
 
 /*
  * Number of shared CommitTS buffers.
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 38295aca48..f9f5bebe0e 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5946,12 +5946,18 @@
   prorettype => 'timestamptz', proargtypes => 'xid',
   prosrc => 'pg_xact_commit_timestamp' },
 
+{ oid => '8000', descr => 'get commit timestamp and origin of a transaction',
+  proname => 'pg_xact_commit_timestamp_origin', provolatile => 'v',
+  prorettype => 'record', proargtypes => 'xid',
+  proallargtypes => '{xid,timestamptz,oid}', proargmodes => '{i,o,o}',
+  proargnames => '{xid,timestamp,origin}', prosrc => 'pg_xact_commit_timestamp_origin' },
+
 { oid => '3583',
-  descr => 'get transaction Id and commit timestamp of latest transaction commit',
+  descr => 'get transaction Id, commit timestamp and origin of latest transaction commit',
   proname => 'pg_last_committed_xact', provolatile => 'v',
   prorettype => 'record', proargtypes => '',
-  proallargtypes => '{xid,timestamptz}', proargmodes => '{o,o}',
-  proargnames => '{xid,timestamp}', prosrc => 'pg_last_committed_xact' },
+  proallargtypes => '{xid,timestamptz,oid}', proargmodes => '{o,o,o}',
+  proargnames => '{xid,timestamp,origin}', prosrc => 'pg_last_committed_xact' },
 
 { oid => '3537', descr => 'get identification of SQL object',
   proname => 'pg_describe_object', provolatile => 's', prorettype => 'text',
diff --git a/src/test/modules/commit_ts/expected/commit_timestamp.out b/src/test/modules/commit_ts/expected/commit_timestamp.out
index 5b7783b58f..afbcf5e055 100644
--- a/src/test/modules/commit_ts/expected/commit_timestamp.out
+++ b/src/test/modules/commit_ts/expected/commit_timestamp.out
@@ -45,3 +45,81 @@ SELECT x.xid::text::bigint > 0, x.timestamp > '-infinity'::timestamptz, x.timest
  t        | t        | t
 (1 row)
 
+-- Test non-normal transaction id.
+SELECT origin FROM  pg_xact_commit_timestamp_origin('0'::xid);
+ERROR:  cannot retrieve commit timestamp for transaction 0
+SELECT origin FROM  pg_xact_commit_timestamp_origin('1'::xid);
+ origin 
+--------
+       
+(1 row)
+
+SELECT origin FROM  pg_xact_commit_timestamp_origin('2'::xid);
+ origin 
+--------
+       
+(1 row)
+
+SELECT pg_replication_origin_create('test_commit_ts: get_origin');
+ pg_replication_origin_create 
+------------------------------
+                            1
+(1 row)
+
+-- Test transaction without origin
+SELECT txid_current() as txid_no_origin \gset
+SELECT x.timestamp > '-infinity'::timestamptz, x.timestamp <= now(), x.origin
+FROM pg_xact_commit_timestamp_origin(:'txid_no_origin') x;
+ ?column? | ?column? | origin 
+----------+----------+--------
+ t        | t        |      0
+(1 row)
+
+SELECT x.xid::text::bigint > 0, x.timestamp > '-infinity'::timestamptz, x.timestamp <= now(), x.origin
+FROM pg_last_committed_xact() x;
+ ?column? | ?column? | ?column? | origin 
+----------+----------+----------+--------
+ t        | t        | t        |      0
+(1 row)
+
+-- Test transaction with origin
+SELECT pg_replication_origin_session_setup('test_commit_ts: get_origin');
+ pg_replication_origin_session_setup 
+-------------------------------------
+ 
+(1 row)
+
+SELECT txid_current() as txid_set_origin \gset
+SELECT x.timestamp > '-infinity'::timestamptz, x.timestamp <= now(), x.origin
+FROM pg_xact_commit_timestamp_origin(:'txid_set_origin') x;
+ ?column? | ?column? | origin 
+----------+----------+--------
+ t        | t        |      1
+(1 row)
+
+SELECT x.xid::text::bigint > 0, x.timestamp > '-infinity'::timestamptz, x.timestamp <= now(), x.origin
+FROM pg_last_committed_xact() x;
+ ?column? | ?column? | ?column? | origin 
+----------+----------+----------+--------
+ t        | t        | t        |      1
+(1 row)
+
+-- Test when it can not find the transaction
+SELECT * FROM pg_xact_commit_timestamp_origin((:'txid_set_origin'::text::int + 10)::text::xid) x;
+ timestamp | origin 
+-----------+--------
+           |       
+(1 row)
+
+SELECT pg_replication_origin_session_reset();
+ pg_replication_origin_session_reset 
+-------------------------------------
+ 
+(1 row)
+
+SELECT pg_replication_origin_drop('test_commit_ts: get_origin');
+ pg_replication_origin_drop 
+----------------------------
+ 
+(1 row)
+
diff --git a/src/test/modules/commit_ts/expected/commit_timestamp_1.out b/src/test/modules/commit_ts/expected/commit_timestamp_1.out
index c10b0abc2b..4305895d8b 100644
--- a/src/test/modules/commit_ts/expected/commit_timestamp_1.out
+++ b/src/test/modules/commit_ts/expected/commit_timestamp_1.out
@@ -37,3 +37,66 @@ SELECT pg_xact_commit_timestamp('2'::xid);
 SELECT x.xid::text::bigint > 0, x.timestamp > '-infinity'::timestamptz, x.timestamp <= now() FROM pg_last_committed_xact() x;
 ERROR:  could not get commit timestamp data
 HINT:  Make sure the configuration parameter "track_commit_timestamp" is set.
+-- Test non-normal transaction id.
+SELECT origin FROM  pg_xact_commit_timestamp_origin('0'::xid);
+ERROR:  cannot retrieve commit timestamp for transaction 0
+SELECT origin FROM  pg_xact_commit_timestamp_origin('1'::xid);
+ origin 
+--------
+       
+(1 row)
+
+SELECT origin FROM  pg_xact_commit_timestamp_origin('2'::xid);
+ origin 
+--------
+       
+(1 row)
+
+SELECT pg_replication_origin_create('test_commit_ts: get_origin');
+ pg_replication_origin_create 
+------------------------------
+                            1
+(1 row)
+
+-- Test transaction without origin
+SELECT txid_current() as txid_no_origin \gset
+SELECT x.timestamp > '-infinity'::timestamptz, x.timestamp <= now(), x.origin
+FROM pg_xact_commit_timestamp_origin(:'txid_no_origin') x;
+ERROR:  could not get commit timestamp data
+HINT:  Make sure the configuration parameter "track_commit_timestamp" is set.
+SELECT x.xid::text::bigint > 0, x.timestamp > '-infinity'::timestamptz, x.timestamp <= now(), x.origin
+FROM pg_last_committed_xact() x;
+ERROR:  could not get commit timestamp data
+HINT:  Make sure the configuration parameter "track_commit_timestamp" is set.
+-- Test transaction with origin
+SELECT pg_replication_origin_session_setup('test_commit_ts: get_origin');
+ pg_replication_origin_session_setup 
+-------------------------------------
+ 
+(1 row)
+
+SELECT txid_current() as txid_set_origin \gset
+SELECT x.timestamp > '-infinity'::timestamptz, x.timestamp <= now(), x.origin
+FROM pg_xact_commit_timestamp_origin(:'txid_set_origin') x;
+ERROR:  could not get commit timestamp data
+HINT:  Make sure the configuration parameter "track_commit_timestamp" is set.
+SELECT x.xid::text::bigint > 0, x.timestamp > '-infinity'::timestamptz, x.timestamp <= now(), x.origin
+FROM pg_last_committed_xact() x;
+ERROR:  could not get commit timestamp data
+HINT:  Make sure the configuration parameter "track_commit_timestamp" is set.
+-- Test when it can not find the transaction
+SELECT * FROM pg_xact_commit_timestamp_origin((:'txid_set_origin'::text::int + 10)::text::xid) x;
+ERROR:  could not get commit timestamp data
+HINT:  Make sure the configuration parameter "track_commit_timestamp" is set.
+SELECT pg_replication_origin_session_reset();
+ pg_replication_origin_session_reset 
+-------------------------------------
+ 
+(1 row)
+
+SELECT pg_replication_origin_drop('test_commit_ts: get_origin');
+ pg_replication_origin_drop 
+----------------------------
+ 
+(1 row)
+
diff --git a/src/test/modules/commit_ts/sql/commit_timestamp.sql b/src/test/modules/commit_ts/sql/commit_timestamp.sql
index 4e041a5347..bec9194a9b 100644
--- a/src/test/modules/commit_ts/sql/commit_timestamp.sql
+++ b/src/test/modules/commit_ts/sql/commit_timestamp.sql
@@ -22,3 +22,33 @@ SELECT pg_xact_commit_timestamp('1'::xid);
 SELECT pg_xact_commit_timestamp('2'::xid);
 
 SELECT x.xid::text::bigint > 0, x.timestamp > '-infinity'::timestamptz, x.timestamp <= now() FROM pg_last_committed_xact() x;
+
+
+-- Test non-normal transaction id.
+SELECT origin FROM  pg_xact_commit_timestamp_origin('0'::xid);
+SELECT origin FROM  pg_xact_commit_timestamp_origin('1'::xid);
+SELECT origin FROM  pg_xact_commit_timestamp_origin('2'::xid);
+SELECT pg_replication_origin_create('test_commit_ts: get_origin');
+
+
+-- Test transaction without origin
+SELECT txid_current() as txid_no_origin \gset
+
+SELECT x.timestamp > '-infinity'::timestamptz, x.timestamp <= now(), x.origin
+FROM pg_xact_commit_timestamp_origin(:'txid_no_origin') x;
+SELECT x.xid::text::bigint > 0, x.timestamp > '-infinity'::timestamptz, x.timestamp <= now(), x.origin
+FROM pg_last_committed_xact() x;
+
+-- Test transaction with origin
+SELECT pg_replication_origin_session_setup('test_commit_ts: get_origin');
+SELECT txid_current() as txid_set_origin \gset
+SELECT x.timestamp > '-infinity'::timestamptz, x.timestamp <= now(), x.origin
+FROM pg_xact_commit_timestamp_origin(:'txid_set_origin') x;
+SELECT x.xid::text::bigint > 0, x.timestamp > '-infinity'::timestamptz, x.timestamp <= now(), x.origin
+FROM pg_last_committed_xact() x;
+
+-- Test when it can not find the transaction
+SELECT * FROM pg_xact_commit_timestamp_origin((:'txid_set_origin'::text::int + 10)::text::xid) x;
+
+SELECT pg_replication_origin_session_reset();
+SELECT pg_replication_origin_drop('test_commit_ts: get_origin');
