Re: fix crash with Python 3.11

2022-06-24 Thread Markus Wanner

On 6/24/22 00:54, Tom Lane wrote:

Does such code exist?  I don't see any other calls in Debian code search,
and I find it hard to believe that anyone would think such a thing is
maintainable.


Such a thing does exist within PGLogical and BDR, yes.

Thanks for your concern about maintainability.  So far, that part has 
not posed any trouble.  Looking at e.g. postgres.c, the sigsetjmp 
handler there didn't change all that much in recent years; much of the 
code there is from around 2004, written by you.


However, that shouldn't be your concern at all.  Postgres refusing to 
start after a minor upgrade probably should be, especially when it's due 
to an API change in a stable branch.


Regards

Markus




Re: fix crash with Python 3.11

2022-06-23 Thread Markus Wanner

On 6/23/22 15:34, Tom Lane wrote:

Under what circumstances would it be OK for outside code to call
SPICleanup?


For the same reasons previous Postgres versions called SPICleanup: from 
a sigsetjmp handler that duplicates most of what Postgres does in such a 
situation.


However, I think that's the wrong question to ask for a stable branch. 
Postgres did export this function in previous versions.  Removing it 
altogether constitutes an API change and makes extensions that link to 
it fail to even load, which is a bad way to fail after a patch version 
upgrade, even if its original use was not sound in the first place.


Of course, my proposed patch is not meant for master, only for stable branches.

Best Regards

Markus




Re: fix crash with Python 3.11

2022-06-23 Thread Markus Wanner


On 6/21/22 18:33, Tom Lane wrote:

My inclination at this point is to not back-patch the second change
12d768e70 ("Don't use static storage for SaveTransactionCharacteristics").
It's not clear that the benefit would be worth even a small risk of
somebody being unhappy about the API break.


Actually, the backport of 2e517818f ("Fix SPI's handling of errors") 
already broke the API for code using SPICleanup, as that function had 
been removed. Granted, it's not documented, but still exported.


I propose to re-introduce a no-op placeholder similar to what we have 
for SPI_start_transaction, somewhat like the attached patch.


Regards

Markus

diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index dd5ef762707..f73c1e79e18 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -422,6 +422,16 @@ SPI_rollback_and_chain(void)
 	_SPI_rollback(true);
 }
 
+/*
+ * SPICleanup is a no-op, kept for backwards compatibility. We rely on
+ * AtEOXact_SPI to cleanup. Extensions should not (need to) fiddle with the
+ * internal SPI state directly.
+ */
+void
+SPICleanup(void)
+{
+}
+
 /*
  * Clean up SPI state at transaction commit or abort.
  */
diff --git a/src/include/executor/spi.h b/src/include/executor/spi.h
index 153eb5c7ad5..1e66a7d2ea0 100644
--- a/src/include/executor/spi.h
+++ b/src/include/executor/spi.h
@@ -205,6 +205,7 @@ extern void SPI_commit_and_chain(void);
 extern void SPI_rollback(void);
 extern void SPI_rollback_and_chain(void);
 
+extern void SPICleanup(void);
 extern void AtEOXact_SPI(bool isCommit);
 extern void AtEOSubXact_SPI(bool isCommit, SubTransactionId mySubid);
 extern bool SPI_inside_nonatomic_context(void);


Re: API stability [was: pgsql: Fix possible recovery trouble if TRUNCATE overlaps a checkpoint.]

2022-04-11 Thread Markus Wanner
On Mon, 2022-04-11 at 15:21 -0400, Robert Haas wrote:
> ... before v13, the commit in question actually
> changed the size of PGXACT, which is really quite bad -- it needs to
> be 12 bytes for performance reasons. And there's no spare bytes
> available, so I think we should follow one of the suggestions that he
> had over in that email thread, and put delayChkptEnd in PGPROC even
> though delayChkpt is in PGXACT.

This makes sense to me.  Kudos to Kyotaro for considering this.

At first read, this sounded like a trade-off between compatibility and
performance for PG 12 and older.  But I realize leaving delayChkpt in
PGXACT and adding just delayChkptEnd to PGPROC is compatible and leaves
PGXACT at a size of 12 bytes.  So this sounds like a good approach to
me.

Best Regards

Markus





Re: API stability [was: pgsql: Fix possible recovery trouble if TRUNCATE overlaps a checkpoint.]

2022-04-08 Thread Markus Wanner
On Fri, 2022-04-08 at 08:47 +0900, Michael Paquier wrote:
> On Thu, Apr 07, 2022 at 11:19:15AM -0400, Robert Haas wrote:
> > Here are patches for master and v14 to do things this way.
> > Comments?
> 
> Thanks for the patches.  They look correct.

+1, looks good to me and addresses my specific original concern.

> For ~14, I'd rather avoid
> the code duplication done by GetVirtualXIDsDelayingChkptEnd() and
> HaveVirtualXIDsDelayingChkpt() that could be avoided with an extra
> bool argument to the existing routine.  The same kind of duplication
> happens with GetVirtualXIDsDelayingChkpt() and
> GetVirtualXIDsDelayingChkptEnd().

I agree with Michael, it would be nice to not duplicate the code, but
use a common underlying method.  A modified patch is attached.

Best Regards

Markus

From: Robert Haas 
Date: Thu, 7 Apr 2022 11:15:07 -0400
Subject: [PATCH] Rethink the delay-checkpoint-end mechanism in the
 back-branches.

The back-patch of commit bbace5697df12398e87ffd9879171c39d27f5b33 had
the unfortunate effect of changing the layout of PGPROC in the
back-branches, which could break extensions. This happened because it
changed the delayChkpt from type bool to type int. So, change it back,
and add a new bool delayChkptEnd field instead. The new field should
fall within what used to be padding space within the struct, and so
hopefully won't cause any extensions to break.

Per report from Markus Wanner and discussion with Tom Lane and others.
---
 backend/access/transam/multixact.c  |   6 +-
 backend/access/transam/twophase.c   |  13 ++--
 backend/access/transam/xact.c   |   6 +-
 backend/access/transam/xlog.c   |  10 +--
 backend/access/transam/xloginsert.c |   2 
 backend/catalog/storage.c   |   6 +-
 backend/storage/buffer/bufmgr.c |   6 +-
 backend/storage/ipc/procarray.c | 100 +++-
 include/storage/proc.h  |  35 +---
 include/storage/procarray.h |   7 +-
 10 files changed, 107 insertions(+), 84 deletions(-)

diff --git a/src/backend/access/transam/multixact.c b/src/backend/access/transam/multixact.c
index 50d8bab9e21..b643564f16a 100644
--- a/src/backend/access/transam/multixact.c
+++ b/src/backend/access/transam/multixact.c
@@ -3075,8 +3075,8 @@ TruncateMultiXact(MultiXactId newOldestMulti, Oid newOldestMultiDB)
 	 * crash/basebackup, even though the state of the data directory would
 	 * require it.
 	 */
-	Assert((MyProc->delayChkpt & DELAY_CHKPT_START) == 0);
-	MyProc->delayChkpt |= DELAY_CHKPT_START;
+	Assert(!MyProc->delayChkpt);
+	MyProc->delayChkpt = true;
 
 	/* WAL log truncation */
 	WriteMTruncateXlogRec(newOldestMultiDB,
@@ -3102,7 +3102,7 @@ TruncateMultiXact(MultiXactId newOldestMulti, Oid newOldestMultiDB)
 	/* Then offsets */
 	PerformOffsetsTruncation(oldestMulti, newOldestMulti);
 
-	MyProc->delayChkpt &= ~DELAY_CHKPT_START;
+	MyProc->delayChkpt = false;
 
 	END_CRIT_SECTION();
 	LWLockRelease(MultiXactTruncationLock);
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index dea3f485f7a..633a6f1747f 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -474,8 +474,9 @@ MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid,
 	}
 	proc->xid = xid;
 	Assert(proc->xmin == InvalidTransactionId);
-	proc->delayChkpt = 0;
+	proc->delayChkpt = false;
 	proc->statusFlags = 0;
+	proc->delayChkptEnd = false;
 	proc->pid = 0;
 	proc->databaseId = databaseid;
 	proc->roleId = owner;
@@ -1165,8 +1166,7 @@ EndPrepare(GlobalTransaction gxact)
 
 	START_CRIT_SECTION();
 
-	Assert((MyProc->delayChkpt & DELAY_CHKPT_START) == 0);
-	MyProc->delayChkpt |= DELAY_CHKPT_START;
+	MyProc->delayChkpt = true;
 
 	XLogBeginInsert();
 	for (record = records.head; record != NULL; record = record->next)
@@ -1209,7 +1209,7 @@ EndPrepare(GlobalTransaction gxact)
 	 * checkpoint starting after this will certainly see the gxact as a
 	 * candidate for fsyncing.
 	 */
-	MyProc->delayChkpt &= ~DELAY_CHKPT_START;
+	MyProc->delayChkpt = false;
 
 	/*
 	 * Remember that we have this GlobalTransaction entry locked for us.  If
@@ -2276,8 +2276,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
 	START_CRIT_SECTION();
 
 	/* See notes in RecordTransactionCommit */
-	Assert((MyProc->delayChkpt & DELAY_CHKPT_START) == 0);
-	MyProc->delayChkpt |= DELAY_CHKPT_START;
+	MyProc->delayChkpt = true;
 
 	/*
 	 * Emit the XLOG commit record. Note that we mark 2PC commits as
@@ -2325,7 +2324,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
 	TransactionIdCommitTree(xid, nchildren, children);
 
 	/* Checkpoint can proceed now */
-	MyProc->delayChkpt &= ~DELAY_CHKPT_START;
+	MyProc->delayChkpt = false;
 
 	END_CRIT_SECTION();
 
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/

API stability [was: pgsql: Fix possible recovery trouble if TRUNCATE overlaps a checkpoint.]

2022-04-05 Thread Markus Wanner



On 24.03.22 20:32, Robert Haas wrote:

Fix possible recovery trouble if TRUNCATE overlaps a checkpoint.


This patch changed the delayChkpt field of struct PGPROC from bool to 
int.  Back-porting this change could be considered an API-breaking 
change for extensions using this field.


I'm not certain about padding behavior of compilers in general (or 
standards requirements around that), but at least on my machine, it 
seems sizeof(PGPROC) did not change, so padding led to subsequent fields 
still having the same offset.


Nonetheless, the meaning of the field itself changed.  And the 
additional assert now also triggers for the following pseudo-code of the 
extension I'm concerned about:


/*
 * Prevent checkpoints being emitted in between additional
 * information in the logical message and the following
 * prepare record.
 */
MyProc->delayChkpt = true;

LogLogicalMessage(...);

/* Note that this will also reset the delayChkpt flag. */
PrepareTransaction(...);


Now, I'm well aware this is not an official API, it just happens to be 
accessible for extensions.  So I guess the underlying question is:  What 
can extension developers expect?  Which parts are okay to change even in 
stable branches and which can be relied upon to remain stable?


And for this specific case: Is it worth reverting this change and 
applying a fully backwards compatible fix, instead?


Regards

Markus Wanner




Re: pgsql: Remove unused wait events.

2021-10-26 Thread Markus Wanner

On 26.10.21 04:20, Amit Kapila wrote:

I agree with the points raised here and will revert this for v14.


Thanks, Amit.  I appreciate the revert.

Note that the removed events were almost at the end of the WaitEventIO 
enum, except for one last entry: WAIT_EVENT_WAL_WRITE.


Just as a data point: Our BDR extension indeed references the wait 
events in question (or at least it used to do so up until that commit).


--
Markus Wanner
EDB: http://www.enterprisedb.com




Re: [PATCH] add concurrent_abort callback for output plugin

2021-03-31 Thread Markus Wanner

On 31.03.21 15:18, Amit Kapila wrote:

On Wed, Mar 31, 2021 at 11:55 AM Markus Wanner

The last sentence there now seems to relate to just the setting of
"concurrent_abort", rather than the whole reason to invoke the
prepare_cb.  And the reference to the "gid" is a bit lost.  Maybe:

 "Thus even in case of a concurrent abort, enough information is
  provided to the output plugin for it to properly deal with the
  ROLLBACK PREPARED once that is decoded."


Okay, Changed the patch accordingly.


That's fine with me.  I didn't necessarily mean to eliminate the 
reference to the concurrent_abort field, but it's more concise that 
way.  Thank you.


Regards

Markus




Re: [PATCH] add concurrent_abort callback for output plugin

2021-03-30 Thread Markus Wanner

On 31.03.21 06:39, Amit Kapila wrote:

I have slightly adjusted the comments, docs, and commit message. What
do you think about the attached?


Thank you both, Amit and Ajin.  This looks good to me.

Only one minor gripe:


+ a prepared transaction with incomplete changes, in which case the
+ concurrent_abort field of the passed
+ ReorderBufferTXN struct is set. This is done so that
+ eventually when the ROLLBACK PREPARED is decoded, there
+ is a corresponding prepared transaction with a matching gid.


The last sentence there now seems to relate to just the setting of 
"concurrent_abort", rather than the whole reason to invoke the 
prepare_cb.  And the reference to the "gid" is a bit lost.  Maybe:


   "Thus even in case of a concurrent abort, enough information is
provided to the output plugin for it to properly deal with the
ROLLBACK PREPARED once that is decoded."

Alternatively, state that the gid is otherwise missing earlier in the 
docs (similar to how the commit message describes it).


Regards

Markus




Re: [PATCH] add concurrent_abort callback for output plugin

2021-03-30 Thread Markus Wanner

On 30.03.21 11:54, Markus Wanner wrote:
I would recommend this more explicit API and communication over hiding 
the concurrent abort in a prepare callback.


I figured we already have the ReorderBufferTXN's concurrent_abort flag, 
thus I agree the prepare_cb is sufficient and revoke this recommendation 
(and the concurrent_abort callback patch).


Regards

Markus




Re: [PATCH] add concurrent_abort callback for output plugin

2021-03-30 Thread Markus Wanner

On 30.03.21 11:12, Ajin Cherian wrote:
I found some documentation that already was talking about concurrent 
aborts and updated that.


Thanks.

I just noticed that as of PG13, concurrent_abort is part of 
ReorderBufferTXN, so it seems the prepare_cb (or stream_prepare_cb) can 
actually figure out that a concurrent abort happened (and that the 
transaction may be incomplete).  That's good and indeed makes an 
additional callback unnecessary.


I recommend mentioning that field in the documentation as well.


diff --git a/doc/src/sgml/logicaldecoding.sgml 
b/doc/src/sgml/logicaldecoding.sgml
index 80eb96d..d2f8d39 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -545,12 +545,14 @@ CREATE TABLE another_catalog_table(data text) WITH 
(user_catalog_table = true);
  executed within that transaction. A transaction that is prepared for
  a two-phase commit using PREPARE TRANSACTION will
  also be decoded if the output plugin callbacks needed for decoding
- them are provided. It is possible that the current transaction which
+ them are provided. It is possible that the current prepared transaction 
which
  is being decoded is aborted concurrently via a ROLLBACK 
PREPARED
  command. In that case, the logical decoding of this transaction will
- be aborted too. We will skip all the changes of such a transaction once
- the abort is detected and abort the transaction when we read WAL for
- ROLLBACK PREPARED.
+ be aborted too. All the changes of such a transaction is skipped once


typo: changes [..] *are* skipped, plural.


+ the abort is detected and the prepare_cb callback is 
invoked.
+ This could result in a prepared transaction with incomplete changes.


... "in which case the concurrent_abort field of the 
passed ReorderBufferTXN struct is set.", as a proposal?



+ This is done so that eventually when the ROLLBACK 
PREPARED
+ is decoded, there is a corresponding prepared transaction with a matching 
gid.
 
 
 


Everything else sounds good to me.

Regards

Markus




Re: [PATCH] add concurrent_abort callback for output plugin

2021-03-30 Thread Markus Wanner

On 30.03.21 11:02, Amit Kapila wrote:

On Tue, Mar 30, 2021 at 12:00 PM Markus Wanner

Yes, this replaces the PREPARE I would do from the concurrent_abort
callback in a direct call to rb->prepare.


Because concurrent_abort()
internally trying to prepare transaction seems a bit ugly and not only
that if we want to go via that route, it needs to distinguish between
rollback to savepoint and rollback cases as well.


Just to clarify: of course, the concurrent_abort callback only sends a 
message to the subscriber, which then (in our current implementation) 
upon reception of the concurrent_abort message opts to prepare the 
transaction.  Different implementations would be possible.


I would recommend this more explicit API and communication over hiding 
the concurrent abort in a prepare callback.


Regards

Markus




Re: [PATCH] Provide more information to filter_prepare

2021-03-30 Thread Markus Wanner

On 30.03.21 10:33, Amit Kapila wrote:

Pushed. In the last version, you have named the patch incorrectly.


Thanks a lot, Amit!

Regards

Markus




Re: [PATCH] add concurrent_abort callback for output plugin

2021-03-30 Thread Markus Wanner

On 30.03.21 09:39, Ajin Cherian wrote:
Where do you suggest this be documented? From an externally visible 
point of view, I dont see much of a surprise.


If you start to think about the option of committing a prepared 
transaction from a different node, the danger becomes immediately 
apparent: a subscriber doesn't even know that the transaction is not 
complete.  How could it possibly know it's futile to COMMIT PREPARED 
it?  I think it's not just surprising, but outright dangerous to pretend 
to have prepared the transaction while potentially missing some of the 
changes.


(Essentially: do not assume the ROLLBACK PREPARED will make it to the 
subscriber.  There's no such guarantee.  The provider may crash, burn, 
and vanish before that happens.)


So I suggest to document this as a caveat for the prepare callback, 
because with this patch that's the callback which may be invoked for an 
incomplete transaction without the output plugin knowing.


Regards

Markus




Re: [PATCH] add concurrent_abort callback for output plugin

2021-03-29 Thread Markus Wanner

Hello Ajin,

On 30.03.21 06:48, Ajin Cherian wrote:
For now, I've created a patch that addresses the problem reported using 
the existing callbacks.


Thanks.


Do have a look if this fixes the problem reported.


Yes, this replaces the PREPARE I would do from the concurrent_abort 
callback with a direct call to rb->prepare.  However, it misses the most 
important part: documentation.  This clearly is surprising behavior for 
a transaction that's not fully decoded and is guaranteed to get aborted.


Regards

Markus




Re: [PATCH] Provide more information to filter_prepare

2021-03-29 Thread Markus Wanner

On 29.03.21 14:00, vignesh C wrote:

Have you intentionally not
written any tests as it will be difficult to predict the xid. I just
wanted to confirm my understanding.


Yeah, that's the reason this is hard to test with a regression test.  It 
might be possible to come up with a TAP test for this, but I doubt 
that's worth it, as it's a pretty trivial addition.


Regards

Markus




Re: [PATCH] Provide more information to filter_prepare

2021-03-29 Thread Markus Wanner

On 29.03.21 13:04, vignesh C wrote:

The above content looks sufficient to me.


Good, thanks.  Based on that, I'm attaching v7 of the patch.

Regards

Markus
From: Markus Wanner 
Date: Tue, 2 Mar 2021 11:33:54 +0100
Subject: [PATCH] Add an xid argument to the filter_prepare callback for output plugins

---
 contrib/test_decoding/test_decoding.c |  4 +++-
 doc/src/sgml/logicaldecoding.sgml | 29 ++-
 src/backend/replication/logical/decode.c  | 17 -
 src/backend/replication/logical/logical.c |  5 ++--
 src/include/replication/logical.h |  3 ++-
 src/include/replication/output_plugin.h   |  1 +
 6 files changed, 38 insertions(+), 21 deletions(-)

diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index ae5f397f351..de1b6926581 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -77,6 +77,7 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
 			  bool transactional, const char *prefix,
 			  Size sz, const char *message);
 static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
+	 TransactionId xid,
 	 const char *gid);
 static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx,
 		ReorderBufferTXN *txn);
@@ -440,7 +441,8 @@ pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
  * substring, then we filter it out.
  */
 static bool
-pg_decode_filter_prepare(LogicalDecodingContext *ctx, const char *gid)
+pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid,
+		 const char *gid)
 {
 	if (strstr(gid, "_nodecode") != NULL)
 		return true;
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 80eb96d609a..da23f89ca32 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -794,20 +794,25 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
COMMIT PREPARED time. To signal that
decoding should be skipped, return true;
false otherwise. When the callback is not
-   defined, false is assumed (i.e. nothing is
-   filtered).
+   defined, false is assumed (i.e. no filtering, all
+   transactions using two-phase commit are decoded in two phases as well).
 
 typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+  TransactionId xid,
   const char *gid);
 
-  The ctx parameter has the same contents as for the
-  other callbacks. The gid is the identifier that later
-  identifies this transaction for COMMIT PREPARED or
-  ROLLBACK PREPARED.
+   The ctx parameter has the same contents as for
+   the other callbacks. The parameters xid
+   and gid provide two different ways to identify
+   the transaction.  The later COMMIT PREPARED or
+   ROLLBACK PREPARED carries both identifiers,
+   providing an output plugin the choice of what to use.
  
  
-  The callback has to provide the same static answer for a given
-  gid every time it is called.
+   The callback may be invoked multiple times per transaction to decode
+   and must provide the same static answer for a given pair of
+   xid and gid every time
+   it is called.
  
  
 
@@ -1219,9 +1224,11 @@ stream_commit_cb(...);  <-- commit of the streamed transaction

 

-Optionally the output plugin can specify a name pattern in the
-filter_prepare_cb and transactions with gid containing
-that name pattern will not be decoded as a two-phase commit transaction.
+Optionally the output plugin can define filtering rules via
+filter_prepare_cb to decode only specific transaction
+in two phases.  This can be achieved by pattern matching on the
+gid or via lookups using the
+xid.

 

diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 5f596135b15..97be4b0f23f 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -80,7 +80,8 @@ static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
 
 /* helper functions for decoding transactions */
-static inline bool FilterPrepare(LogicalDecodingContext *ctx, const char *gid);
+static inline bool FilterPrepare(LogicalDecodingContext *ctx,
+ TransactionId xid, const char *gid);
 static bool DecodeTXNNeedSkip(LogicalDecodingContext *ctx,
 			  XLogRecordBuffer *buf, Oid dbId,
 			  RepOriginId origin_id);
@@ -271,7 +272,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
  * doesn't filter the transaction at prepare time.
  */
 if (info == XLOG_XACT_COMMIT_PREPARED)
-	two_phas

Re: [PATCH] add concurrent_abort callback for output plugin

2021-03-29 Thread Markus Wanner

On 29.03.21 13:02, Ajin Cherian wrote:

Nice catch, Markus.
Interesting suggestion Amit. Let me try and code this.


Thanks, Ajin.  Please consider this concurrent_abort callback as well. 
I think it provides more flexibility for the output plugin and I would 
therefore prefer it over a solution that hides this.  Hiding it makes 
all potential optimizations impossible, as it means the output plugin 
cannot distinguish between a proper PREPARE and a bail-out PREPARE (one 
that does not fully replicate the PREPARE as on the origin node, either, 
which I think is dangerous).


Regards

Markus




Re: [PATCH] Provide more information to filter_prepare

2021-03-29 Thread Markus Wanner

On 29.03.21 12:18, vignesh C wrote:
But in prepare_filter_cb callback, by stating "other systems ..." it is 
not very clear who will change the GID. Are we referring to 
publisher/subscriber decoding?


Thanks for your feedback.  This is not about GIDs at all, but just about 
identifying a transaction.  I'm out of ideas on how else to phrase 
that.  Any suggestions?


Maybe we should not try to give examples and reference other systems, 
but just leave it at:


 The ctx parameter has the same contents as for
 the other callbacks. The parameters xid
 and gid provide two different ways to identify
 the transaction.  The later COMMIT PREPARED or
 ROLLBACK PREPARED carries both identifiers,
 providing an output plugin the choice of what to use.

That is a sufficient explanation, in my opinion.  What do you think?

Regards

Markus




Re: [PATCH] Provide more information to filter_prepare

2021-03-29 Thread Markus Wanner

On 29.03.21 11:53, Amit Kapila wrote:

Okay, but just in the previous sentence ("However, reuse of the same
gid for example by a downstream node using
multiple subscriptions may lead to it not being a unique
identifier."), you have explained how sending a GID identifier can
lead to a non-unique identifier for multiple subscriptions.


Maybe the example of the downstream node is a bad one.  I understand 
that can cause confusion.  Let's leave out that example and focus on 
the output plugin side.  v6 attached.



And then
in the next line, the way you are suggesting to generate GID by use of
XID seems to have the same problem, so that caused confusion for me.


It was not intended as a suggestion for how to generate GIDs at all. 
Hopefully leaving out that bad example will make it less likely to 
appear related to GID generation on the subscriber.


Regards

Markus
From: Markus Wanner 
Date: Tue, 2 Mar 2021 11:33:54 +0100
Subject: [PATCH] Add an xid argument to the filter_prepare callback for output plugins

---
 contrib/test_decoding/test_decoding.c |  4 ++-
 doc/src/sgml/logicaldecoding.sgml | 34 +++
 src/backend/replication/logical/decode.c  | 17 
 src/backend/replication/logical/logical.c |  5 ++--
 src/include/replication/logical.h |  3 +-
 src/include/replication/output_plugin.h   |  1 +
 6 files changed, 43 insertions(+), 21 deletions(-)

diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index ae5f397f351..de1b6926581 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -77,6 +77,7 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
 			  bool transactional, const char *prefix,
 			  Size sz, const char *message);
 static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
+	 TransactionId xid,
 	 const char *gid);
 static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx,
 		ReorderBufferTXN *txn);
@@ -440,7 +441,8 @@ pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
  * substring, then we filter it out.
  */
 static bool
-pg_decode_filter_prepare(LogicalDecodingContext *ctx, const char *gid)
+pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid,
+		 const char *gid)
 {
 	if (strstr(gid, "_nodecode") != NULL)
 		return true;
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 80eb96d609a..57f4165d06b 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -794,20 +794,30 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
COMMIT PREPARED time. To signal that
decoding should be skipped, return true;
false otherwise. When the callback is not
-   defined, false is assumed (i.e. nothing is
-   filtered).
+   defined, false is assumed (i.e. no filtering, all
+   transactions using two-phase commit are decoded in two phases as well).
 
 typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+  TransactionId xid,
   const char *gid);
 
-  The ctx parameter has the same contents as for the
-  other callbacks. The gid is the identifier that later
-  identifies this transaction for COMMIT PREPARED or
-  ROLLBACK PREPARED.
+   The ctx parameter has the same contents as for
+   the other callbacks. The parameters xid
+   and gid provide two different ways to identify
+   the transaction.  For some systems, the gid may
+   be sufficient.  However, reuse of the same gid
+   may lead to it not being a unique identifier.  Therefore, other systems
+   combine the xid with an identifier of the origin
+   node to form a globally unique transaction identifier.  The later
+   COMMIT PREPARED or ROLLBACK
+   PREPARED carries both identifiers, providing an output plugin
+   the choice of what to use.
  
  
-  The callback has to provide the same static answer for a given
-  gid every time it is called.
+   The callback may be invoked multiple times per transaction to decode
+   and must provide the same static answer for a given pair of
+   xid and gid every time
+   it is called.
  
  
 
@@ -1219,9 +1229,11 @@ stream_commit_cb(...);  <-- commit of the streamed transaction

 

-Optionally the output plugin can specify a name pattern in the
-filter_prepare_cb and transactions with gid containing
-that name pattern will not be decoded as a two-phase commit transaction.
+Optionally the output plugin can define filtering rules via
+filter_prepare_cb to decode only specific transaction
+in two phases.  This can be achieved by pattern matching on the
+gid or via lookups using the
+x

Re: [PATCH] add concurrent_abort callback for output plugin

2021-03-29 Thread Markus Wanner

On 29.03.21 11:33, Amit Kapila wrote:

You don't need an additional callback for that if we do what I am
suggesting above.


Ah, are you suggesting a different change, then?  To make two-phase 
transactions always send PREPARE even if concurrently aborted?  In that 
case, sorry, I misunderstood.


I'm perfectly fine with that approach as well (even though it removes 
flexibility compared to the concurrent abort callback, as the comment 
above DecodePrepare indicates, i.e. "not impossible to optimize the 
concurrent abort case").



One is you can try to test it, otherwise, there are comments atop
DecodePrepare() ("Note that we don't skip prepare even if have
detected concurrent abort because it is quite possible that ")
which explains this.


Thanks for this pointer, very helpful.

Regards

Markus




Re: [PATCH] Provide more information to filter_prepare

2021-03-29 Thread Markus Wanner

On 29.03.21 11:13, Amit Kapila wrote:

This might or might not be valid for all logical replication solutions
but in the publisher-subscriber model, it would easily lead to
duplicate identifiers and block the replication. For example, when
there are multiple subscriptions (say - 2) for multiple publications
(again say-2), the two subscriptions are on Node-B and two
publications are on Node-A. Say both publications are for different
tables tab-1 and tab-2. Now, a prepared transaction involving
operation on both tables will generate the same GID.


I think you are misunderstanding.  This is about a globally unique 
identifier for a transaction, which has nothing to do with the GID used 
to prepare a transaction.  This *needs* to be the same for what is 
logically the same transaction.


What GID a downstream subscriber uses when receiving messages from some 
non-Postgres-provided output plugin clearly is out of scope for this 
documentation.  The point is to highlight how the xid can be useful for 
filter_prepare, as it serves transaction identification purposes.


Regards

Markus




Re: [PATCH] Provide more information to filter_prepare

2021-03-29 Thread Markus Wanner

Sorry, git tricked me.  Here's the patch including actual changes.

Regards

Markus
From: Markus Wanner 
Date: Tue, 2 Mar 2021 11:33:54 +0100
Subject: [PATCH] Add an xid argument to the filter_prepare callback for output plugins

---
 contrib/test_decoding/test_decoding.c |  4 ++-
 doc/src/sgml/logicaldecoding.sgml | 35 ---
 src/backend/replication/logical/decode.c  | 17 +++
 src/backend/replication/logical/logical.c |  5 ++--
 src/include/replication/logical.h |  3 +-
 src/include/replication/output_plugin.h   |  1 +
 6 files changed, 44 insertions(+), 21 deletions(-)

diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index ae5f397f351..de1b6926581 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -77,6 +77,7 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
 			  bool transactional, const char *prefix,
 			  Size sz, const char *message);
 static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
+	 TransactionId xid,
 	 const char *gid);
 static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx,
 		ReorderBufferTXN *txn);
@@ -440,7 +441,8 @@ pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
  * substring, then we filter it out.
  */
 static bool
-pg_decode_filter_prepare(LogicalDecodingContext *ctx, const char *gid)
+pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid,
+		 const char *gid)
 {
 	if (strstr(gid, "_nodecode") != NULL)
 		return true;
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 80eb96d609a..84717ae93e5 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -794,20 +794,31 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
COMMIT PREPARED time. To signal that
decoding should be skipped, return true;
false otherwise. When the callback is not
-   defined, false is assumed (i.e. nothing is
-   filtered).
+   defined, false is assumed (i.e. no filtering, all
+   transactions using two-phase commit are decoded in two phases as well).
 
 typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+  TransactionId xid,
   const char *gid);
 
-  The ctx parameter has the same contents as for the
-  other callbacks. The gid is the identifier that later
-  identifies this transaction for COMMIT PREPARED or
-  ROLLBACK PREPARED.
+   The ctx parameter has the same contents as for
+   the other callbacks. The parameters xid
+   and gid provide two different ways to identify
+   the transaction.  For some systems, the gid may
+   be sufficient.  However, reuse of the same gid
+   for example by a downstream node using multiple subscriptions may lead
+   to it not being a unique identifier.  Therefore, other systems combine
+   the xid with an identifier of the origin node to
+   form a globally unique transaction identifier.  The latter
+   COMMIT PREPARED or ROLLBACK
+   PREPARED carries both identifiers, providing an output plugin
+   the choice of what to use.
  
  
-  The callback has to provide the same static answer for a given
-  gid every time it is called.
+   The callback may be invoked multiple times per transaction to decode
+   and must provide the same static answer for a given pair of
+   xid and gid every time
+   it is called.
  
  
 
@@ -1219,9 +1230,11 @@ stream_commit_cb(...);  <-- commit of the streamed transaction

 

-Optionally the output plugin can specify a name pattern in the
-filter_prepare_cb and transactions with gid containing
-that name pattern will not be decoded as a two-phase commit transaction.
+Optionally the output plugin can define filtering rules via
+filter_prepare_cb to decode only specific transactions
+in two phases.  This can be achieved by pattern matching on the
+gid or via lookups using the
+xid.

 

diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 5f596135b15..97be4b0f23f 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -80,7 +80,8 @@ static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
 
 /* helper functions for decoding transactions */
-static inline bool FilterPrepare(LogicalDecodingContext *ctx, const char *gid);
+static inline bool FilterPrepare(LogicalDecodingContext *ctx,
+ TransactionId xid, const char *gid);
 static bool DecodeTXNNeedSkip(LogicalDecodingContext *ctx,
 			  XLo

Re: [PATCH] Provide more information to filter_prepare

2021-03-29 Thread Markus Wanner

On 29.03.21 08:23, Amit Kapila wrote:

On Mon, Mar 29, 2021 at 11:42 AM Amit Kapila  wrote:

Why do you think that this callback can be invoked several times per
transaction? I think it could be called at most two times, once at
prepare time, then at commit or rollback time. So, I think using
'multiple' instead of 'several' times is better.


Thank you for reviewing.

That's fine with me, I just wanted to provide an explanation for why the 
callback needs to be stable.  (I would not want to limit us in the docs 
to guarantee it is called only twice.  'multiple' sounds generic enough, 
I changed it to that word.)



What exactly is the node identifier here? Is it a publisher or
subscriber node id? We might want to be a bit more explicit here?


Good point.  I clarified this to speak of the origin node (given this is 
not necessarily the direct provider when using chained replication).


An updated patch is attached.

Regards

Markus
From: Markus Wanner 
Date: Tue, 2 Mar 2021 11:33:54 +0100
Subject: [PATCH] Add an xid argument to the filter_prepare callback for output
 plugins

---
 contrib/test_decoding/test_decoding.c |  4 ++-
 doc/src/sgml/logicaldecoding.sgml | 34 +++
 src/backend/replication/logical/decode.c  | 17 
 src/backend/replication/logical/logical.c |  5 ++--
 src/include/replication/logical.h |  3 +-
 src/include/replication/output_plugin.h   |  1 +
 6 files changed, 43 insertions(+), 21 deletions(-)

diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index ae5f397f351..de1b6926581 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -77,6 +77,7 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
 			  bool transactional, const char *prefix,
 			  Size sz, const char *message);
 static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
+	 TransactionId xid,
 	 const char *gid);
 static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx,
 		ReorderBufferTXN *txn);
@@ -440,7 +441,8 @@ pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
  * substring, then we filter it out.
  */
 static bool
-pg_decode_filter_prepare(LogicalDecodingContext *ctx, const char *gid)
+pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid,
+		 const char *gid)
 {
 	if (strstr(gid, "_nodecode") != NULL)
 		return true;
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 80eb96d609a..f3ac84aa85a 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -794,20 +794,30 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
COMMIT PREPARED time. To signal that
decoding should be skipped, return true;
false otherwise. When the callback is not
-   defined, false is assumed (i.e. nothing is
-   filtered).
+   defined, false is assumed (i.e. no filtering, all
+   transactions using two-phase commit are decoded in two phases as well).
 
 typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+  TransactionId xid,
   const char *gid);
 
-  The ctx parameter has the same contents as for the
-  other callbacks. The gid is the identifier that later
-  identifies this transaction for COMMIT PREPARED or
-  ROLLBACK PREPARED.
+   The ctx parameter has the same contents as for
+   the other callbacks. The parameters xid
+   and gid provide two different ways to identify
+   the transaction.  For some systems, the gid may
+   be sufficient.  However, reuse of the same gid
+   for example by a downstream node using multiple subscriptions may lead
+   to it not being a unique identifier.  Therefore, other systems combine
+   the xid with a node identifier to form a
+   globally unique transaction identifier.  The latter COMMIT
+   PREPARED or ROLLBACK PREPARED carries both
+   identifiers, providing an output plugin the choice of what to use.
  
  
-  The callback has to provide the same static answer for a given
-  gid every time it is called.
+   The callback may be invoked several times per transaction to decode and
+   must provide the same static answer for a given pair of
+   xid and gid every time
+   it is called.
  
  
 
@@ -1219,9 +1229,11 @@ stream_commit_cb(...);  <-- commit of the streamed transaction

 

-Optionally the output plugin can specify a name pattern in the
-filter_prepare_cb and transactions with gid containing
-that name pattern will not be decoded as a two-phase commit transaction.
+Optionally the output plugin can define filtering rules via
+filter_prepare_cb to dec

Re: [PATCH] add concurrent_abort callback for output plugin

2021-03-29 Thread Markus Wanner

On 27.03.21 07:37, Amit Kapila wrote:

Isn't it better to send prepare from the publisher in such a case so
that subscribers can know about it when rollback prepared arrives?


That's exactly what this callback allows (among other options).  It 
provides a way for the output plugin to react to a transaction aborting 
while it is being decoded.  This would not be possible without this 
additional callback.


Also note that I would like to retain the option to do some basic 
protocol validity checks.  Certain messages only make sense within a 
transaction ('U'pdate, 'C'ommit).  Others are only valid outside of a 
transaction ('B'egin, begin_prepare_cb).  This is only possible if the 
output plugin has a callback for every entry into and exit out of a 
transaction (being decoded).  This used to be the case prior to 2PC 
decoding and this patch re-establishes that.



I think we have already done the same (sent prepare, exactly to handle
the case you have described above) for *streamed* transactions.


Where can I find that?  ISTM streaming transactions have the same issue: 
the output plugin does not (or only implicitly) learn about a concurrent 
abort of the transaction.


Regards

Markus




Re: [PATCH] add concurrent_abort callback for output plugin

2021-03-26 Thread Markus Wanner

On 26.03.21 11:19, Amit Kapila wrote:

No, I am not assuming that. I am just trying to describe you that it
is not necessary that we will be able to detect concurrent abort in
each and every case.


Sure.  Nor am I claiming that would be necessary or that the patch 
changed anything about it.


As it stands, assuming the output plugin basically just forwards the 
events and the subscriber tries to replicate them as is, the following 
would happen on the subscriber for a concurrently aborted two-phase 
transaction:


 * start a transaction (begin_prepare_cb)
 * apply changes for it (change_cb)
 * digress to other, unrelated transactions (leaving unspecified what
   exactly happens to the opened transaction)
 * attempt to rollback a transaction that has not ever been prepared
   (rollback_prepared_cb)

The point of the patch is for the output plugin to get proper 
transaction entry and exit callbacks.  Even in the unfortunate case of a 
concurrent abort.  It offers the output plugin a clean way to learn that 
the decoder stopped decoding for the current transaction and it won't 
possibly see a prepare_cb for it (despite the decoder having passed the 
PREPARE record in WAL).



The other related thing is it may not be a good idea to finish the
transaction


You're speaking subscriber side here.  And yes, I agree, the subscriber 
should not abort the transaction at a concurrent_abort.  I never claimed 
it should.


If you are curious, in our case I made the subscriber PREPARE the 
transaction at its end when receiving a concurrent_abort notification, 
so that the subscriber:


 * can hop out of that started transaction and safely proceed
   to process events for other transactions, and
 * has the transaction in the appropriate state for processing the
   subsequent rollback_prepared_cb, once that gets through

That's probably not ideal in the sense that subscribers do unnecessary 
work.  However, it pretty closely replicates the transaction's state as 
it was on the origin at any given point in time (by LSN).


Regards

Markus




Re: [PATCH] add concurrent_abort callback for output plugin

2021-03-26 Thread Markus Wanner

On 26.03.21 04:28, Amit Kapila wrote:

I think as you have noted that stream_abort or rollback_prepared will
be sent (the remaining changes in-between will be skipped) as we
decode them from WAL


Yes, but as outlined, too late.  Multiple other transactions may get 
decoded until the decoder reaches the ROLLBACK PREPARED.  Thus, 
effectively, the output plugin currently needs to deduce that a 
transaction got aborted concurrently from one out of half a dozen other 
callbacks that may trigger right after that transaction, because it will 
only get closed properly much later.



so it is not clear to me how it causes any delays
as opposed to where we don't detect concurrent abort say because after
that we didn't access catalog table.


You're assuming very little traffic, where the ROLLBACK PREPARED follows 
the PREPARE immediately in WAL.  On a busy system, chances for that to 
happen are rather low.


(I think the same is true for streaming and stream_abort being sent only 
at the time the decoder reaches the ROLLBACK record in WAL.  However, I 
did not try.  Unlike 2PC, where this actually bit me.)


Regards

Markus




Re: [PATCH] add concurrent_abort callback for output plugin

2021-03-25 Thread Markus Wanner

On 25.03.21 21:21, Andres Freund wrote:

... the code added as part of 7259736a6e5b7c75 ...


That's the streaming part, which can be thought of as a more general 
variant of the two-phase decoding in that it allows multiple "flush 
points" (invoking ReorderBufferProcessTXN).  Unlike the PREPARE of a 
two-phase commit, where the reorderbuffer can be sure there's no further 
change to be processed after the PREPARE.  Nor is there any invocation 
of ReorderBufferProcessTXN before that first one at PREPARE time.  With 
that in mind, I'm surprised support for streaming got committed before 
2PC.  It clearly has different use cases, though.


However, I'm sure your inputs on how to improve and clean up the 
implementation will be appreciated.  The single tiny problem this patch 
addresses is the same for 2PC and streaming decoding: the output plugin 
currently has no way to learn about a concurrent abort of a transaction 
still being decoded, at the time this happens.


Both, 2PC and streaming do require the reorderbuffer to forward changes 
(possibly) prior to the transaction's commit.  That's the whole point of 
these two features.  Therefore, I don't think we can get around 
concurrent aborts.



You may have only meant it as a shorthand: But imo output plugins have
absolutely no business "invoking actions downstream".


From my point of view, that's the raison d'être for an output plugin. 
Even if it does so merely by forwarding messages.  But yeah, of course a 
whole bunch of other components and changes are needed to implement the 
kind of global two-phase commit system I tried to describe.


I'm open to suggestions on how to reference that use case.


diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index c291b05a423..a6d044b870b 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2488,6 +2488,12 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 			errdata = NULL;
 			curtxn->concurrent_abort = true;
 
+			/*
+			 * Call the cleanup hook to inform the output plugin that the
+			 * transaction just started had to be aborted.
+			 */
+			rb->concurrent_abort(rb, txn, streaming, commit_lsn);
+
 			/* Reset the TXN so that it is allowed to stream remaining data. */
 			ReorderBufferResetTXN(rb, txn, snapshot_now,
 								  command_id, prev_lsn,


I don't think this would be ok, errors thrown in the callback wouldn't
be handled as they would be in other callbacks.


That's a good point.  Maybe the CATCH block should only set a flag, 
allowing for the callback to be invoked outside of it.


Regards

Markus my-callbacks-do-not-throw-error Wanner




[PATCH] add concurrent_abort callback for output plugin

2021-03-25 Thread Markus Wanner

Hi,

here is another tidbit from our experience with using logical decoding. 
The attached patch adds a callback to notify the output plugin of a 
concurrent abort.  I'll continue to describe the problem in more detail 
and how this additional callback solves it.


Streamed transactions as well as two-phase commit transactions may get 
decoded before they finish.  At the point the begin_cb is invoked and 
first changes are delivered to the output plugin, it is not necessarily 
known whether the transaction will commit or abort.


This leads to the possibility of the transaction getting aborted 
concurrent to logical decoding.  In that case, it is likely for the 
decoder to error on a catalog scan that conflicts with the abort of the 
transaction.  The reorderbuffer sports a PG_CATCH block to cleanup. 
However, it does not currently inform the output plugin.  From its point 
of view, the transaction is left dangling until another one comes along 
or until the final ROLLBACK or ROLLBACK PREPARED record from WAL gets 
decoded.  Therefore, what the output plugin might see in this case is:


* filter_prepare_cb (txn A)
* begin_prepare_cb  (txn A)
* apply_change  (txn A)
* apply_change  (txn A)
* apply_change  (txn A)
* begin_cb  (txn B)

In other words, in this example, only the begin_cb of the following 
transaction implicitly tells the output plugin that txn A could not be 
fully decoded.  And there's no upper time boundary on when that may 
happen.  (It could also be another filter_prepare_cb, if the subsequent 
transaction happens to be a two-phase transaction as well.  Or an 
explicit rollback_prepared_cb or stream_abort if there's no other 
transaction in between.)


An alternative and arguably cleaner approach for streamed transactions 
may be to directly invoke stream_abort.  However, the lsn argument 
passed could not be that of the abort record, as that's not known at the 
point in time of the concurrent abort.  Plus, this seems like a bad fit 
for two-phase commit transactions.


Again, this callback is especially important for output plugins that 
invoke further actions on downstream nodes that delay the COMMIT 
PREPARED of a transaction upstream, e.g. until prepared on other nodes. 
Up until now, the output plugin has no way to learn about a concurrent 
abort of the currently decoded (2PC or streamed) transaction (perhaps 
short of continued polling on the transaction status).


I also think it generally improves the API by allowing the output plugin 
to rely on such a callback, rather than having to implicitly deduce this 
from other callbacks.


Thoughts or comments?  If this is agreed on, I can look into adding 
tests (concurrent aborts are not currently covered, it seems).


Regards

Markus
From: Markus Wanner 
Date: Thu, 11 Feb 2021 13:49:55 +0100
Subject: [PATCH] Add a concurrent_abort callback for the output plugin.

Logical decoding of a prepared or streamed transaction may fail if the
transaction got aborted after invoking the begin_cb (and likely having
sent some changes via change_cb), but before the necessary catalog scans
could be performed.  In this case, decoding the transaction is neither
possible nor necessary (given it got rolled back).

To give the output plugin a chance to cleanup the aborted transaction as
well, introduce a concurrent_abort callback.  It is only ever invoked to
terminate unfinished transactions, not for normal aborts.

Adjust contrib/test_decoding to define a concurrent_abort callback.
---
 contrib/test_decoding/test_decoding.c | 29 
 doc/src/sgml/logicaldecoding.sgml | 43 +-
 src/backend/replication/logical/logical.c | 45 +++
 .../replication/logical/reorderbuffer.c   |  6 +++
 src/include/replication/output_plugin.h   |  9 
 src/include/replication/reorderbuffer.h   |  7 +++
 6 files changed, 138 insertions(+), 1 deletion(-)

diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index ae5f397f351..a5dd80e0957 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -83,6 +83,9 @@ static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx,
 static void pg_decode_prepare_txn(LogicalDecodingContext *ctx,
   ReorderBufferTXN *txn,
   XLogRecPtr prepare_lsn);
+static void pg_decode_concurrent_abort_txn(LogicalDecodingContext *ctx,
+		   ReorderBufferTXN *txn, bool streaming,
+		   XLogRecPtr lsn);
 static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx,
 		  ReorderBufferTXN *txn,
 		  XLogRecPtr commit_lsn);
@@ -137,6 +140,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->change_cb = pg_decode_change;
 	cb->truncate_cb = pg_decode_truncate;
 	cb->commit_cb = pg_decode_commit_txn;
+	cb->concurrent_abort_cb = pg_decode_concurrent_abort_txn;
 	cb->filter_by_origin_cb = 

Re: [PATCH] Provide more information to filter_prepare

2021-03-25 Thread Markus Wanner

On 22.03.21 09:50, Markus Wanner wrote:
thank you for reconsidering this patch.  I updated it to include the 
required adjustments to the documentation.  Please review.


I tweaked the wording in the docs a bit, resulting in a v3 of this patch.

Regards

Markus
From: Markus Wanner 
Date: Tue, 2 Mar 2021 11:33:54 +0100
Subject: [PATCH] Add an xid argument to the filter_prepare callback for output plugins

---
 contrib/test_decoding/test_decoding.c |  4 ++-
 doc/src/sgml/logicaldecoding.sgml | 34 +++
 src/backend/replication/logical/decode.c  | 17 
 src/backend/replication/logical/logical.c |  5 ++--
 src/include/replication/logical.h |  3 +-
 src/include/replication/output_plugin.h   |  1 +
 6 files changed, 43 insertions(+), 21 deletions(-)

diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index ae5f397f351..de1b6926581 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -77,6 +77,7 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
 			  bool transactional, const char *prefix,
 			  Size sz, const char *message);
 static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
+	 TransactionId xid,
 	 const char *gid);
 static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx,
 		ReorderBufferTXN *txn);
@@ -440,7 +441,8 @@ pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
  * substring, then we filter it out.
  */
 static bool
-pg_decode_filter_prepare(LogicalDecodingContext *ctx, const char *gid)
+pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid,
+		 const char *gid)
 {
 	if (strstr(gid, "_nodecode") != NULL)
 		return true;
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 80eb96d609a..f3ac84aa85a 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -794,20 +794,30 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
COMMIT PREPARED time. To signal that
decoding should be skipped, return true;
false otherwise. When the callback is not
-   defined, false is assumed (i.e. nothing is
-   filtered).
+   defined, false is assumed (i.e. no filtering, all
+   transactions using two-phase commit are decoded in two phases as well).
 
 typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+  TransactionId xid,
   const char *gid);
 
-  The ctx parameter has the same contents as for the
-  other callbacks. The gid is the identifier that later
-  identifies this transaction for COMMIT PREPARED or
-  ROLLBACK PREPARED.
+   The ctx parameter has the same contents as for
+   the other callbacks. The parameters xid
+   and gid provide two different ways to identify
+   the transaction.  For some systems, the gid may
+   be sufficient.  However, reuse of the same gid
+   for example by a downstream node using multiple subscriptions may lead
+   to it not being a unique identifier.  Therefore, other systems combine
+   the xid with a node identifier to form a
+   globally unique transaction identifier.  The latter COMMIT
+   PREPARED or ROLLBACK PREPARED carries both
+   identifiers, providing an output plugin the choice of what to use.
  
  
-  The callback has to provide the same static answer for a given
-  gid every time it is called.
+   The callback may be invoked several times per transaction to decode and
+   must provide the same static answer for a given pair of
+   xid and gid every time
+   it is called.
  
  
 
@@ -1219,9 +1229,11 @@ stream_commit_cb(...);  <-- commit of the streamed transaction

 

-Optionally the output plugin can specify a name pattern in the
-filter_prepare_cb and transactions with gid containing
-that name pattern will not be decoded as a two-phase commit transaction.
+Optionally the output plugin can define filtering rules via
+filter_prepare_cb to decode only specific transactions
+in two phases.  This can be achieved by pattern matching on the
+gid or via lookups using the
+xid.

 

diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 5f596135b15..97be4b0f23f 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -80,7 +80,8 @@ static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
 
 /* helper functions for decoding transactions */
-static inline bool FilterPrepare(LogicalDecodingContext *ctx, const char *gid);
+static inline bool 

Re: [PATCH] Provide more information to filter_prepare

2021-03-22 Thread Markus Wanner

Hello Amit,

On 21.03.21 11:53, Amit Kapila wrote:

During a discussion of GID's in the nearby thread [1], it came up that
the replication solutions might want to generate a different GID based
on xid for two-phase transactions, so it seems this patch has a
use-case.


thank you for reconsidering this patch.  I updated it to include the 
required adjustments to the documentation.  Please review.



Markus, feel free to update the docs, you might want to mention about
use-case of XID. Also, feel free to add an open item on PG-14 Open
Items page [2].


Yes, will add.

Regards

Markus
From: Markus Wanner 
Date: Tue, 2 Mar 2021 11:33:54 +0100
Subject: [PATCH] Add an xid argument to the filter_prepare callback for output plugins

---
 contrib/test_decoding/test_decoding.c |  4 ++-
 doc/src/sgml/logicaldecoding.sgml | 33 +++
 src/backend/replication/logical/decode.c  | 17 +++-
 src/backend/replication/logical/logical.c |  5 ++--
 src/include/replication/logical.h |  3 ++-
 src/include/replication/output_plugin.h   |  1 +
 6 files changed, 42 insertions(+), 21 deletions(-)

diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index ae5f397f351..de1b6926581 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -77,6 +77,7 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
 			  bool transactional, const char *prefix,
 			  Size sz, const char *message);
 static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
+	 TransactionId xid,
 	 const char *gid);
 static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx,
 		ReorderBufferTXN *txn);
@@ -440,7 +441,8 @@ pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
  * substring, then we filter it out.
  */
 static bool
-pg_decode_filter_prepare(LogicalDecodingContext *ctx, const char *gid)
+pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid,
+		 const char *gid)
 {
 	if (strstr(gid, "_nodecode") != NULL)
 		return true;
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 80eb96d609a..16a819a1004 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -794,20 +794,29 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
COMMIT PREPARED time. To signal that
decoding should be skipped, return true;
false otherwise. When the callback is not
-   defined, false is assumed (i.e. nothing is
-   filtered).
+   defined, false is assumed (i.e. no filtering, all
+   transactions using two-phase commit are decoded in two phases as well).
 
 typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+  TransactionId xid,
   const char *gid);
 
-  The ctx parameter has the same contents as for the
-  other callbacks. The gid is the identifier that later
-  identifies this transaction for COMMIT PREPARED or
-  ROLLBACK PREPARED.
+   The ctx parameter has the same contents as for
+   the other callbacks. The parameters xid
+   and gid provide two different ways to identify
+   the transaction.  For some systems, the gid may
+   be sufficient.  Note however that it could be used multiple times,
+   losing its uniqueness.  Therefore, other systems combine
+   the xid with a node identifier to form a
+   globally unique transaction identifier.  The latter COMMIT
+   PREPARED or ROLLBACK PREPARED carries both
+   identifiers again, providing an output plugin the choice of what to
+   use.
  
  
-  The callback has to provide the same static answer for a given
-  gid every time it is called.
+   The callback has to provide the same static answer for a given pair
+   of xid and gid every time
+   it is called.
  
  
 
@@ -1219,9 +1228,11 @@ stream_commit_cb(...);  <-- commit of the streamed transaction

 

-Optionally the output plugin can specify a name pattern in the
-filter_prepare_cb and transactions with gid containing
-that name pattern will not be decoded as a two-phase commit transaction.
+Optionally the output plugin can define filtering rules via
+filter_prepare_cb to decode only specific transactions
+in two phases.  This can be achieved by pattern matching on the
+gid or via lookups using the
+xid.

 

diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 5f596135b15..97be4b0f23f 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -80,7 +80,8 @@ static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 static void DecodeXLogTuple(ch

Re: Logical Replication vs. 2PC

2021-03-21 Thread Markus Wanner

On 20.03.21 16:14, Amit Kapila wrote:

Right, but I guess in our case using user-provided GID will conflict
if we use multiple subscriptions on the same node. So, it is better to
generate a unique identifier like we are discussing here, something
like (origin_id of subscription + xid of the publisher). Do you see
any problem with that?


No, quite the opposite: I'm the one advocating the use of xids to 
identify transactions.  See my patch for filter_prepare.


Regards

Markus




Re: Logical Replication vs. 2PC

2021-03-20 Thread Markus Wanner

On 20.03.21 03:17, Amit Kapila wrote:

Are you saying that users might use the same GID which we have
constructed internally (say by combining origin and xid: originid_xid)
and then there will be conflict while replaying such transactions?


No, I was pondering about a user doing (in short sequence):

..
PREPARE TRANSACTION 'foobar';
COMMIT PREPARED 'foobar';

BEGIN;
...
PREPARE TRANSACTION 'foobar';
COMMIT PREPARED 'foobar';


Right and even for one subscription that can lead to blocking
transactions. But isn't it similar to what we get for a primary key
violation while replaying transactions?


Sure, it's a conflict that prevents application.  A primary key conflict 
may be different in that it does not eventually resolve, though.



In that case, we suggest users
remove conflicting rows, so in such cases, we can recommend users to
commit/rollback such prepared xacts?


Right, if you use gids, you could ask the user to always provide unique 
identifiers and not reuse them on any other node.  That's putting the 
burden of coming up with unique identifiers on the user, but that's a 
perfectly fine and reasonable thing to do.  (Lots of other systems out 
there requiring a unique request id or such, which would get confused if 
you issue requests with duplicate ids.)


Regards

Markus




Re: Logical Replication vs. 2PC

2021-03-19 Thread Markus Wanner

On 18.03.21 10:45, Amit Kapila wrote:

While reviewing/testing subscriber-side work for $SUBJECT [1], I
noticed a problem that seems to need a broader discussion, so started
this thread. We can get prepare for the same GID more than once for
the cases where we have defined multiple subscriptions for
publications on the same server and prepared transaction has
operations on tables subscribed to those subscriptions. For such
cases, one of the prepare will be successful and others will fail in
which case the server will send them again. Once the commit prepared
is done for the first one, the next prepare will be successful. Now,
this is not ideal but will work.


That's assuming you're using the same gid on the subscriber, which does 
not apply to all use cases.  It clearly depends on what you try to 
achieve by decoding in two phases, obviously.


We clearly don't have this issue in BDR, because we're using xids 
(together with a node id) to globally identify transactions and 
construct local (per-node) gids that don't clash.


(Things get even more interesting if you take into account that users 
may reuse the same gid for different transactions.  Lag between 
subscriptions could then lead to blocking between different origin 
transactions...)


Regards

Markus




Re: [PATCH] Provide more information to filter_prepare

2021-03-11 Thread Markus Wanner

On 11.03.21 04:58, Amit Kapila wrote:

But this happens when we are decoding prepare, so it is clear that the
transaction is prepared, why any additional check?


An output plugin cannot assume the transaction is still prepared and 
uncommitted at the point in time it gets to decode the prepare. 
Therefore, the transaction may or may not be still in progress. 
However, my point is that the xid is the more generally useful 
identifier than the gid.



What in this can't be done with GID and how XID can achieve it?


It's a convenience.  Of course, an output plugin could look up the xid 
via the gid.  But why force it to do that when the xid would be 
so readily available?  (Especially given that seems rather expensive. 
Or how would an extension look up the xid by gid?)


The initial versions by Nikhil clearly did include it (actually a full 
ReorderBufferTXN, which I think would be even better).  I'm not clear on 
your motivations to restrict the API.  What's clear to me is that the 
more information Postgres exposes to plugins and extensions, the easier 
it becomes to extend Postgres.  (Modulo perhaps API stability 
considerations.  A TransactionId clearly is not a concern in that area. 
 Especially given we expose the entire ReorderBufferTXN struct for 
other callbacks.)


Regards

Markus




Re: [PATCH] Provide more information to filter_prepare

2021-03-10 Thread Markus Wanner

On 10.03.21 11:18, Amit Kapila wrote:

On Tue, Mar 9, 2021 at 2:14 PM Markus Wanner wrote:

currently, only the gid is passed on to the filter_prepare callback.
While we probably should not pass a full ReorderBufferTXN (as we do for
most other output plugin callbacks), a bit more information would be
nice, I think.


How the proposed 'xid' parameter can be useful? What exactly plugins
want to do with it?


The xid is the very basic identifier for transactions in Postgres.  Any 
output plugin that interacts with Postgres in any way slightly more 
interesting than "filter by gid prefix" is very likely to come across a 
TransactionId.


It allows for basics like checking if the transaction to decode still is 
in progress, for example.  Or in a much more complex scenario, decide on 
whether or not to filter based on properties the extension stored during 
processing the transaction.


Regards

Markus




Re: Make stream_prepare an optional callback

2021-03-09 Thread Markus Wanner

On 09.03.21 10:37, Amit Kapila wrote:

AFAICS, the error is removed by the patch as per below change:


Ah, well, that does not seem right, then.  We cannot just silently 
ignore the callback but not skip the prepare, IMO.  That would lead to 
the output plugin missing the PREPARE, but still seeing a COMMIT 
PREPARED for the transaction, potentially missing changes that went out 
with the prepare, no?



oh, right, in that case, it will skip the stream_prepare even though
that is required. I guess in FilterPrepare, we should check if
rbtxn_is_streamed and stream_prepare_cb is not provided, then we
return true.


Except that FilterPrepare doesn't (yet) have access to a 
ReorderBufferTXN struct (see the other thread I just started).


Maybe we need to do a ReorderBufferTXNByXid lookup already prior to (or 
as part of) FilterPrepare, then also skip (rather than silently ignore) 
the prepare if no stream_prepare_cb callback is given (without even 
calling filter_prepare_cb, because the output plugin has already stated 
it cannot handle that by not providing the corresponding callback).


However, I also wonder what's the use case for an output plugin enabling 
streaming and two-phase commit, but not providing a stream_prepare_cb. 
Maybe the original ERROR is the simpler approach?  I.e. making the 
stream_prepare_cb mandatory, if and only if both are enabled (and 
filter_prepare doesn't skip).  (As in the original comment that says: 
"in streaming mode with two-phase commits, stream_prepare_cb is required").


I guess I don't quite understand the initial motivation for the patch. 
It states: "This allows plugins to not allow the enabling of streaming 
and two_phase at the same time in logical replication."   That's beyond 
me ... "allows [..] to not allow"?  Why not, an output plugin can still 
reasonably request both.  And that's a good thing, IMO.  What problem 
does the patch try to solve?


Regards

Markus




Re: Make stream_prepare an optional callback

2021-03-09 Thread Markus Wanner

On 09.03.21 09:39, Amit Kapila wrote:
> It is attached with the initial email.

Oh, sorry, I looked up the initial email, but still didn't see the patch.


I think so. The behavior has to be similar to other optional callbacks
like message_cb, truncate_cb, stream_truncate_cb. Basically, we don't
need to error out if those callbacks are not provided.


Right, but the patch proposes to error out.  I wonder whether that could 
be avoided.



The extension can request two_phase without streaming.


Sure.  I'm worried about the case where both are requested, but 
filter_prepare returns false, i.e. asking for a streamed prepare without 
providing the corresponding callback.


I wonder whether Postgres could deny the stream_prepare right away and 
not even invoke filter_prepare.  And instead just skip it because the 
output plugin did not provide an appropriate callback.


An error is not as nice, but I'm okay with that as well.

Best Regards

Markus




[PATCH] Provide more information to filter_prepare

2021-03-09 Thread Markus Wanner

Hi,

currently, only the gid is passed on to the filter_prepare callback. 
While we probably should not pass a full ReorderBufferTXN (as we do for 
most other output plugin callbacks), a bit more information would be 
nice, I think.


Attached is a patch that adds the xid (still lacking docs changes).  The 
question about stream_prepare being optional made me think about whether 
an output plugin needs to know if changes have been already streamed 
prior to a prepare.  Maybe not?  Any other information you think the 
output plugin might find useful to decide whether or not to skip the 
prepare?


If you are okay with adding just the xid, I'll add docs changes to the 
patch provided.


Regards

Markus
From: Markus Wanner 
Date: Tue, 2 Mar 2021 11:33:54 +0100
Subject: [PATCH] Add an xid argument to the filter_prepare callback for the
 output plugin.

---
 contrib/test_decoding/test_decoding.c |  4 +++-
 src/backend/replication/logical/decode.c  | 17 +++--
 src/backend/replication/logical/logical.c |  5 +++--
 src/include/replication/logical.h |  3 ++-
 src/include/replication/output_plugin.h   |  1 +
 5 files changed, 20 insertions(+), 10 deletions(-)

diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index ae5f397f351..de1b6926581 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -77,6 +77,7 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
 			  bool transactional, const char *prefix,
 			  Size sz, const char *message);
 static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
+	 TransactionId xid,
 	 const char *gid);
 static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx,
 		ReorderBufferTXN *txn);
@@ -440,7 +441,8 @@ pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
  * substring, then we filter it out.
  */
 static bool
-pg_decode_filter_prepare(LogicalDecodingContext *ctx, const char *gid)
+pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid,
+		 const char *gid)
 {
 	if (strstr(gid, "_nodecode") != NULL)
 		return true;
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 5f596135b15..97be4b0f23f 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -80,7 +80,8 @@ static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
 
 /* helper functions for decoding transactions */
-static inline bool FilterPrepare(LogicalDecodingContext *ctx, const char *gid);
+static inline bool FilterPrepare(LogicalDecodingContext *ctx,
+ TransactionId xid, const char *gid);
 static bool DecodeTXNNeedSkip(LogicalDecodingContext *ctx,
 			  XLogRecordBuffer *buf, Oid dbId,
 			  RepOriginId origin_id);
@@ -271,7 +272,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
  * doesn't filter the transaction at prepare time.
  */
 if (info == XLOG_XACT_COMMIT_PREPARED)
-	two_phase = !(FilterPrepare(ctx, parsed.twophase_gid));
+	two_phase = !(FilterPrepare(ctx, xid,
+parsed.twophase_gid));
 
 DecodeCommit(ctx, buf, &parsed, xid, two_phase);
 break;
@@ -298,7 +300,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
  * doesn't filter the transaction at prepare time.
  */
 if (info == XLOG_XACT_ABORT_PREPARED)
-	two_phase = !(FilterPrepare(ctx, parsed.twophase_gid));
+	two_phase = !(FilterPrepare(ctx, xid,
+parsed.twophase_gid));
 
 DecodeAbort(ctx, buf, &parsed, xid, two_phase);
 break;
@@ -355,7 +358,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
  * manner iff output plugin supports two-phase commits and
  * doesn't filter the transaction at prepare time.
  */
-if (FilterPrepare(ctx, parsed.twophase_gid))
+if (FilterPrepare(ctx, parsed.twophase_xid,
+  parsed.twophase_gid))
 {
 	ReorderBufferProcessXid(reorder, parsed.twophase_xid,
 			buf->origptr);
@@ -581,7 +585,8 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
  * this transaction as a regular commit later.
  */
 static inline bool
-FilterPrepare(LogicalDecodingContext *ctx, const char *gid)
+FilterPrepare(LogicalDecodingContext *ctx, TransactionId xid,
+			  const char *gid)
 {
 	/*
 	 * Skip if decoding of two-phase transactions at PREPARE time is not
@@ -599,7 +604,7 @@ FilterPrepare(LogicalDecodingContext *ctx, const char *gid)
 	if (ctx->callbacks.filter_prepare_cb == NULL)
 		return false;
 
-	return filter_prepare_cb_wrapper(ctx, gid);
+	return filter_prepare_cb_wrapper(ctx, xid, gid);
 }
 
 static inline bool
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/log

Re: Make stream_prepare an optional callback

2021-03-09 Thread Markus Wanner

On 09.03.21 07:40, Amit Kapila wrote:

Sounds reasonable to me. I also don't see a reason why we need to make
this a necessary callback. Some plugin authors might just want 2PC
without streaming support.


Sounds okay to me.  Probably means we'll have to check for this callback 
and always skip the prepare for streamed transactions, w/o even 
triggering filter_prepare, right?  (Because the extension requesting not 
to filter it, but not providing the corresponding callback does not make 
sense.)


If you're going to together a patch Ajin, I'm happy to review.

Best Regards

Markus




Re: [HACKERS] logical decoding of two-phase transactions

2021-02-22 Thread Markus Wanner

Hello Amit,

On 04.01.21 09:18, Amit Kapila wrote:

Thanks, I have pushed the 0001* patch after making the above and a few
other cosmetic modifications.


That commit added the following snippet to the top of 
ReorderBufferFinishPrepared:


  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, commit_lsn, false);

  /* unknown transaction, nothing to do */
  if (txn == NULL)
  return;

Passing true for the create argument seems like an oversight.  I think 
this should pass false and not ever (have to) create a ReorderBufferTXN 
entry.


Regards

Markus




Re: repeated decoding of prepared transactions

2021-02-22 Thread Markus Wanner

On 22.02.21 05:22, Andres Freund wrote:

Hi,

On 2021-02-19 15:53:32 +0100, Markus Wanner wrote:

However, more generally speaking, I suspect you are overthinking this. All
of the complexity arises because of the assumption that an output plugin
receiving and confirming a PREPARE may not be able to persist that first
phase of transaction application.  Instead, you are trying to somehow
resurrect the transactional changes and the prepare at COMMIT PREPARED time
and decode it in a deferred way.


The output plugin should never persist anything.


Sure, sorry, I was sloppy in formulation.  I meant the replica or client 
that receives the data from the output plugin.  Given it asked for 
two-phase commits in the output plugin, it clearly is interested in the 
PREPARE.



That's the job of the
client, not the output plugin. The output plugin simply doesn't have the
information to know whether the client received data and successfully
applied data or not.


Exactly.  Therefore, it should not randomly reshuffle or reorder 
PREPAREs until after other COMMITs.



The output plugin doesn't set / influence start_decoding_at (unless you
want to count just ERRORing out).


Yeah, same sloppiness, sorry.


With that line of thinking, the point in time (or in WAL) of the COMMIT
PREPARED does not matter at all to reason about the decoding of the PREPARE
operation.  Instead, there are only exactly two cases to consider:

a) the PREPARE happened before the start_decoding_at LSN and must not be
decoded. (But the effects of the PREPARE must then be included in the
initial synchronization. If that's not supported, the output plugin should
not enable two-phase commit.)


I don't think that can be made work without disproportionate
complexity. Especially not in cases where we start to be CONSISTENT
based on pre-existing on-disk snapshots.


Well, the PREPARE to happen before the start_decoding_at LSN is a case 
the output plugin needs to deal with.  I pointed out why the current way 
of dealing with it clearly is wrong.


What issues do you see with the approach I proposed?

Regards

Markus




Re: repeated decoding of prepared transactions

2021-02-22 Thread Markus Wanner

On 20.02.21 13:15, Amit Kapila wrote:

I think after the patch Ajin proposed decoders won't need any special
checks after receiving the prepared xacts. What additional simplicity
this approach will bring?


The API becomes clearer in that all PREPAREs are always decoded in WAL 
stream order and are not ever deferred (possibly until after the commits 
of many other transactions).  No output plugin will need to check 
against this peculiarity, but can rely on WAL ordering of events.


(And if an output plugin does not want prepares to be individual events, 
it should simply not enable two-phase support.  That seems like 
something the output plugin could even do on a per-transaction basis.)



Do you mean to say that after creating the slot we take an additional
pass over WAL (till the LSN where we found a consistent snapshot) to
collect all prepared transactions and wait for them to get
committed/rollbacked?


No.  A single pass is enough, the decoder won't need any further change 
beyond the code removal in my patch.


I'm proposing for the synchronization logic (in e.g. pgoutput) to defer 
the snapshot taking.  So that there's some time in between creating the 
logical slot (at step 1.) and taking a snapshot (at step 4.).  Another 
CATCHUP phase, if you want.


So that all two-phase commit transactions are delivered via either:

* the transferred snapshot (because their COMMIT PREPARED took place
  before the snapshot was taken in (4)), or

* the decoder stream (because their PREPARE took place after the slot
  was fully created and snapbuilder reached a consistent snapshot)

No transaction can have PREPAREd before (1) but not committed until 
after (4), because we waited for all prepared transactions to commit in 
step (3).



I think the scheme proposed by you is still not fully clear to me but
can you please explain how in the existing proposed patch there is a
danger of showing transactions as committed without the effects of the
PREPAREs being "visible"?


Please see the `twophase_snapshot` isolation test.  The expected output 
there shows the insert from s1 being committed prior to the prepare of 
the transaction in s2.


On a replica applying the stream in that order, a transaction in between 
these two events would see the results from s1 while still being allowed 
to lock the row that s2 is about to update.  Something I'd expect the 
PREPARE to prevent.


That is (IMO) wrong in `master` and Ajin's patch doesn't correct it. 
(While my patch does, so don't look at my patch for this example.)



* Second, it becomes possible to avoid inconsistencies during the
reconciliation window in between steps 5 and 6 by disallowing
concurrent (user) transactions to run until after completion of
step 6.


This second point sounds like a restriction that users might not like.


"It becomes possible" cannot be a restriction.  If a user (or 
replication solution) wants to allow for these inconsistencies, it still 
can.  I want to make sure that solutions which *want* to prevent 
inconsistencies can be implemented.


Your concern applies to step (3), though.  The current approach is 
clearly quicker to restore the backup and start applying transactions. 
That holds until you start reordering the "early" commits until after 
the deferred PREPAREs, in the output plugin or on the replica side, so 
that rows get locked by prepared transactions before other commits 
become visible, thereby preventing inconsistencies...



But we need something in existing logic in WALSender or somewhere to
allow supporting 2PC for subscriptions and from your above
description, it is not clear to me how we can achieve that?


I agree that some more code is required somewhere, outside of the walsender.

Regards

Markus




Re: [PATCH] Present all committed transaction to the output plugin

2021-02-21 Thread Markus Wanner

On 21.02.21 03:04, Andres Freund wrote:

Cost-wise, yes - a 2pc prepare/commit is expensive enough that
comparatively the replay cost is unlikely to be relevant.


Good.  I attached an updated patch eliminating only the filtering for 
empty two-phase transactions.



Behaviourally I'm still not convinced it's useful.


I don't have any further argument than: If you're promising to replicate 
two phases, I expect the first phase to be replicated individually.


A database state with a transaction prepared and identified by 
'woohoo-roll-me-back-if-you-can' is not the same as a state without it. 
 Even if the transaction is empty, or if you're actually going to roll 
it back. And therefore possibly ending up at the very same state without 
any useful effect.


Regards

Markus
From: Markus Wanner 
Date: Sun, 21 Feb 2021 10:43:34 +0100
Subject: [PATCH] Present empty prepares to the output plugin

While clearly not making any substantial changes to the data, a
PREPARE of an empty transaction still carries a gid that acts as a
sentinel for a prepared transaction that may be committed or rolled
back.

Drop the filtering of empty prepared transactions on the Postgres
side.  An output plugin can still filter empty prepares, if it
wishes to do so.
---
 .../replication/logical/reorderbuffer.c   | 45 ---
 1 file changed, 29 insertions(+), 16 deletions(-)

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 5a62ab8bbc1..18195255007 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2016,7 +2016,8 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 	ReorderBufferBuildTupleCidHash(rb, txn);
 
 	/* setup the initial snapshot */
-	SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
+	if (snapshot_now)
+		SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
 
 	/*
 	 * Decoding needs access to syscaches et al., which in turn use
@@ -2289,6 +2290,8 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 	break;
 
 case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
+	Assert(snapshot_now);
+
 	/* get rid of the old */
 	TeardownHistoricSnapshot(false);
 
@@ -2321,6 +2324,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 	break;
 
 case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
+	Assert(snapshot_now);
 	Assert(change->data.command_id != InvalidCommandId);
 
 	if (command_id < change->data.command_id)
@@ -2397,8 +2401,11 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		 * streaming mode.
 		 */
 		if (streaming)
+		{
+			Assert(snapshot_now);
 			ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
-		else if (snapshot_now->copied)
+		}
+		else if (snapshot_now && snapshot_now->copied)
 			ReorderBufferFreeSnap(rb, snapshot_now);
 
 		/* cleanup */
@@ -2520,7 +2527,6 @@ ReorderBufferReplay(ReorderBufferTXN *txn,
 	TimestampTz commit_time,
 	RepOriginId origin_id, XLogRecPtr origin_lsn)
 {
-	Snapshot	snapshot_now;
 	CommandId	command_id = FirstCommandId;
 
 	txn->final_lsn = commit_lsn;
@@ -2547,25 +2553,32 @@ ReorderBufferReplay(ReorderBufferTXN *txn,
 	 * If this transaction has no snapshot, it didn't make any changes to the
 	 * database, so there's nothing to decode.  Note that
 	 * ReorderBufferCommitChild will have transferred any snapshots from
-	 * subtransactions if there were any.
+	 * subtransactions if there were any.  This effectively makes empty
+	 * transactions invisible to the output plugin.
+	 *
+	 * An empty two-phase transaction must not be short-circuited in the
+	 * same way, because it carries a gid which may carry useful
+	 * information, even if it is only a sentinel for an uncommitted empty
+	 * transaction.
 	 */
-	if (txn->base_snapshot == NULL)
+	if (txn->base_snapshot == NULL && !rbtxn_prepared(txn))
 	{
 		Assert(txn->ninvalidations == 0);
-
-		/*
-		 * Removing this txn before a commit might result in the computation
-		 * of an incorrect restart_lsn. See SnapBuildProcessRunningXacts.
-		 */
-		if (!rbtxn_prepared(txn))
-			ReorderBufferCleanupTXN(rb, txn);
+		ReorderBufferCleanupTXN(rb, txn);
 		return;
 	}
 
-	snapshot_now = txn->base_snapshot;
-
-	/* Process and send the changes to output plugin. */
-	ReorderBufferProcessTXN(rb, txn, commit_lsn, snapshot_now,
+	/*
+	 * Process and send the changes to output plugin.
+	 *
+	 * Note that for empty transactions, txn->base_snapshot may well be NULL
+	 * in case of an empty two-phase transaction.  The corresponding
+	 * callbacks will still be invoked, as even an empty transaction carries
+	 * information (LSN increments, the gid in case of a two-phase
+	 * transaction).  This is unlike versions prior to 13 which optimized
+	 * away empty transactions

Re: [PATCH] Present all committed transaction to the output plugin

2021-02-20 Thread Markus Wanner

On 20.02.21 21:08, Andres Freund wrote:

It's not free though


Agreed.  It's an additional call to a callback.  Do you think that's 
acceptable if limited to two-phase transactions only?



I'm wondering the opposite: What's a potential use case for handing
"trivially empty" transactions to the output plugin that's worth
incurring some cost for everyone?


Outlined in my previous mail: prepare the transaction on one node, 
commit it on another one.  The PREPARE of a transaction is an event a 
user may well want to have replicated, without having to worry about 
whether or not the transaction happens to be empty.


[ Imagine: ERROR: transaction cannot be replicated because it's empty.
   HINT: add a dummy UPDATE so that Postgres always has
 something to replicate, whatever else your app does
 or does not do in the transaction. ]

Regards

Markus




Re: [PATCH] Present all committed transaction to the output plugin

2021-02-20 Thread Markus Wanner

On 20.02.21 12:15, Amit Kapila wrote:

What exactly is the use case to send empty transactions with or
without prepared?


I'm not saying that output plugins should *send* empty transactions to 
the replica.  I rather agree that this indeed is not wanted in most cases.


However, that's not what the patch changes.  It just moves the decision 
to the output plugin, giving it more flexibility.  And possibly allowing 
it to still take action.  For example, in case of a distributed 
two-phase commit scenario, where the publisher waits after its local 
PREPARE for replicas to also PREPARE.  If such a prepare doesn't even 
get to the output plugin, that won't work.  Not even thinking of a 
PREPARE on one node followed by a COMMIT PREPARED from a different node. 
 It simply is not the business of the decoder to decide what to do with 
empty transactions.


Plus, given the decoder does not manage to reliably filter all empty 
transactions, an output plugin might want to implement its own 
filtering, anyway (point in case: contrib/test_decoding and its 
'skip_empty_xacts' option - that actually kind of implies it would be 
possible to not skip them - as does the documentation).  So I'm rather 
wondering: what's the use case of filtering some, but not all empty 
transactions (on the decoder side)?


Regards

Markus




Re: repeated decoding of prepared transactions

2021-02-20 Thread Markus Wanner

On 20.02.21 04:38, Amit Kapila wrote:

I see a problem with this assumption. During the initial
synchronization, this transaction won't be visible to snapshot and we
won't copy it. Then later if we won't decode and send it then the
replica will be out of sync. Such a problem won't happen with Ajin's
patch.


You are assuming that the initial snapshot is a) logical and b) dumb.

A physical snapshot very well "sees" prepared transactions and will 
restore them to their prepared state.  But even in the logical case, I 
think it's beneficial to keep the decoder simpler and instead require 
some support for two-phase commit in the initial synchronization logic. 
 For example using the following approach (you will recognize 
similarities to what snapbuild does):


1.) create the slot
2.) start to retrieve changes and queue them
3.) wait for the prepared transactions that were pending at the
point in time of step 1 to complete
4.) take a snapshot (by visibility, w/o requiring to "see" prepared
transactions)
5.) apply the snapshot
6.) replay the queue, filtering commits already visible in the
snapshot

Just as with the solution proposed by Ajin and you, this has the danger 
of showing transactions as committed without the effects of the PREPAREs 
being "visible" (after step 5 but before 6).


However, this approach of solving the problem outside of the walsender 
has two advantages:


* The delay in step 3 can be made visible and dealt with.  As there's
  no upper boundary to that delay, it makes sense to e.g. inform the
  user after 10 minutes and provide a list of two-phase transactions
  still in progress.

* Second, it becomes possible to avoid inconsistencies during the
  reconciliation window in between steps 5 and 6 by disallowing
  concurrent (user) transactions to run until after completion of
  step 6.

Whereas the current implementation hides this in the walsender without 
any way to determine how much a PREPARE had been delayed or when 
consistency has been reached.  (Of course, short of using the very same 
initial snapshotting approach outlined above.  For which the reordering 
logic in the walsender does more harm than good.)


Essentially, I think I'm saying that while I agree that some kind of 
snapshot synchronization logic is needed, it should live in a different 
place.


Regards

Markus




Re: repeated decoding of prepared transactions

2021-02-19 Thread Markus Wanner

Ajin, Amit,

thank you both a lot for thinking this through and even providing a patch.

The changes in expectation for twophase.out match exactly what I 
prepared.  And the switch with pg_logical_slot_get_changes indeed is 
something I had not yet considered, either.


On 19.02.21 03:50, Ajin Cherian wrote:

For this, I am planning to change the semantics such that
two-phase-commit can only be specified while creating the slot using
pg_create_logical_replication_slot()
and not in pg_logical_slot_get_changes, thus preventing
two-phase-commit flag from being toggled between restarts of the
decoder. Let me know if anybody objects to this
change, else I will update that in the next patch.


This sounds like a good plan to me, yes.


However, more generally speaking, I suspect you are overthinking this. 
All of the complexity arises because of the assumption that an output 
plugin receiving and confirming a PREPARE may not be able to persist 
that first phase of transaction application.  Instead, you are trying to 
somehow resurrect the transactional changes and the prepare at COMMIT 
PREPARED time and decode it in a deferred way.


Instead, I'm arguing that a PREPARE is an atomic operation just like a 
transaction's COMMIT.  The decoder should always feed these in the order 
of appearance in the WAL.  For example, if you have PREPARE A, COMMIT B, 
COMMIT PREPARED A in the WAL, the decoder should always output these 
events in exactly that order.  And not ever COMMIT B, PREPARE A, COMMIT 
PREPARED A (which is currently violated in the expectation for 
twophase_snapshot, because the COMMIT for `s1insert` there appears after 
the PREPARE of `s2p` in the WAL, but gets decoded before it).


The patch I'm attaching corrects this expectation in twophase_snapshot, 
adds an explanatory diagram, and eliminates any danger of sending 
PREPAREs at COMMIT PREPARED time.  Thereby preserving the ordering of 
PREPAREs vs COMMITs.


Given that the output plugin supports two-phase commit, I argue there 
must be a good reason for it to set the start_decoding_at LSN to a point 
in time after a PREPARE.  To me that means the output plugin (or its 
downstream replica) has processed the PREPARE (and the downstream 
replica did whatever it needed to do on its side in order to make the 
transaction ready to be committed in a second phase).


(In the weird case of an output plugin that wants to enable two-phase 
commit but does not really support it downstream, it's still possible 
for it to hold back LSN confirmations for prepared-but-still-in-flight 
transactions.  However, I'm having a hard time justifying this use case.)


With that line of thinking, the point in time (or in WAL) of the COMMIT 
PREPARED does not matter at all to reason about the decoding of the 
PREPARE operation.  Instead, there are only exactly two cases to consider:


a) the PREPARE happened before the start_decoding_at LSN and must not be 
decoded. (But the effects of the PREPARE must then be included in the 
initial synchronization. If that's not supported, the output plugin 
should not enable two-phase commit.)


b) the PREPARE happens after the start_decoding_at LSN and must be 
decoded.  (It obviously is not included in the initial synchronization 
or decoded by a previous instance of the decoder process.)


The case where the PREPARE lies before SNAPBUILD_CONSISTENT must always 
be case a) where we must not repeat the PREPARE, anyway.  And in case b) 
where we need a consistent snapshot to decode the PREPARE, existing 
provisions already guarantee that to be possible (or how would this be 
different from a regular single-phase commit?).


Please let me know what you think and whether this approach is feasible 
for you as well.


Regards

Markus
From ed03c463175733072edf8afb8d120a1285a3194f Mon Sep 17 00:00:00 2001
From: Markus Wanner 
Date: Tue, 9 Feb 2021 16:16:13 +0100
Subject: [PATCH] Preserve ordering of PREPAREs vs COMMITs in logical decoding

Decouple decoding of the prepare phase of a two-phase transaction from
the final commit (or rollback) of a two-phase transaction, so that
these are more like atomic operations which preserve the ordering in
WAL.  And so that transactional changes of a PREPARE are not ever
provided to the output plugin unnecessarily.

Correct test expectations to expect no duplication.  Add a variant with
a ROLLBACK PREPARED to twophase_snapshot and illustrate the test case
with an explanatory diagram.
---
 contrib/test_decoding/expected/twophase.out   | 38 -
 .../expected/twophase_snapshot.out| 40 -
 .../expected/twophase_stream.out  | 28 +-
 .../specs/twophase_snapshot.spec  | 56 ---
 doc/src/sgml/logicaldecoding.sgml | 17 --
 .../replication/logical/reorderbuffer.c   | 29 --
 6 files changed, 112 insertions(+), 96 deletions(-)

diff --git a/src/backend/replication/logi

[PATCH] Present all committed transactions to the output plugin

2021-02-19 Thread Markus Wanner

Hi,

attached is a patch that I think cleans up the API between Postgres 
and the logical decoding plugin.  Up until now, not only rolled-back 
transactions but also some committed transactions were filtered out and 
not presented to the output plugin.  While it is documented that aborted 
transactions are not decoded, this second exception has not been documented.


The difference is with committed empty transactions that have a snapshot 
versus those that do not.  I think that's arbitrary and propose to 
remove this distinction, so that all committed transactions are decoded.


In the case of decoding a two-phase transaction, I argue that this is 
even more important, as the gid potentially carries information.


Please consider the attached patch, which drops the mentioned filter. 
It also adjusts tests to show the difference and provides a minor 
clarification to the documentation.


Regards

Markus
From: Markus Wanner 
Date: Fri, 19 Feb 2021 13:34:22 +0100
Subject: Present all committed transactions to the output plugin

Drop the filtering of some empty transactions and more
consistently present all committed transactions to the output
plugin.  Changes behavior for empty transactions that do not even
have a base_snapshot.

While clearly not making any substantial changes to the database,
an empty transaction still increments the local TransactionId
and possibly carries a gid (in the case of two-phase
transactions).

Adjust test_decoding to show the difference between empty
transactions that do not carry a snapshot and those that
do.  Adjust tests to cover decoding of committed empty
transactions with and without snapshots.

Add a minor clarification in the docs.
---
 .../replication/logical/reorderbuffer.c   | 41 ---
 doc/src/sgml/logicaldecoding.sgml | 10 +++--
 contrib/test_decoding/expected/mxact.out  |  2 +-
 .../test_decoding/expected/ondisk_startup.out |  2 +-
 contrib/test_decoding/expected/prepared.out   | 33 +--
 contrib/test_decoding/expected/xact.out   | 39 ++
 contrib/test_decoding/sql/prepared.sql| 23 +--
 contrib/test_decoding/sql/xact.sql| 21 ++
 contrib/test_decoding/test_decoding.c | 12 ++
 9 files changed, 147 insertions(+), 36 deletions(-)

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 5a62ab8bbc1..1e9ed81c3c1 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2016,7 +2016,8 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 	ReorderBufferBuildTupleCidHash(rb, txn);
 
 	/* setup the initial snapshot */
-	SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
+	if (snapshot_now)
+		SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
 
 	/*
 	 * Decoding needs access to syscaches et al., which in turn use
@@ -2289,6 +2290,8 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 	break;
 
 case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
+	Assert(snapshot_now);
+
 	/* get rid of the old */
 	TeardownHistoricSnapshot(false);
 
@@ -2321,6 +2324,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 	break;
 
 case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
+	Assert(snapshot_now);
 	Assert(change->data.command_id != InvalidCommandId);
 
 	if (command_id < change->data.command_id)
@@ -2397,8 +2401,11 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		 * streaming mode.
 		 */
 		if (streaming)
+		{
+			Assert(snapshot_now);
 			ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
-		else if (snapshot_now->copied)
+		}
+		else if (snapshot_now && snapshot_now->copied)
 			ReorderBufferFreeSnap(rb, snapshot_now);
 
 		/* cleanup */
@@ -2520,7 +2527,6 @@ ReorderBufferReplay(ReorderBufferTXN *txn,
 	TimestampTz commit_time,
 	RepOriginId origin_id, XLogRecPtr origin_lsn)
 {
-	Snapshot	snapshot_now;
 	CommandId	command_id = FirstCommandId;
 
 	txn->final_lsn = commit_lsn;
@@ -2544,28 +2550,15 @@ ReorderBufferReplay(ReorderBufferTXN *txn,
 	}
 
 	/*
-	 * If this transaction has no snapshot, it didn't make any changes to the
-	 * database, so there's nothing to decode.  Note that
-	 * ReorderBufferCommitChild will have transferred any snapshots from
-	 * subtransactions if there were any.
+	 * Process and send the changes to output plugin.
+	 *
+	 * Note that for empty transactions, txn->base_snapshot may well be
+	 * NULL. The corresponding callbacks will still be invoked, as even an
+	 * empty transaction carries information (LSN increments, the gid in
+	 * case of a two-phase transaction).  This is unlike versions prior to
+	 * 14 which optimized away transactions without a snapshot.
 	 */
-	if (txn->base_snapshot == NULL)
-	{
-		Assert(txn->ninval

Re: repeated decoding of prepared transactions

2021-02-11 Thread Markus Wanner

Hello Amit,

thanks a lot for your extensive explanation and examples, I appreciate 
this very much.  I'll need to think this through and see how we can make 
this work for us.


Best Regards

Markus




Re: repeated decoding of prepared transactions

2021-02-10 Thread Markus Wanner

On 10.02.21 07:32, Amit Kapila wrote:

On Wed, Feb 10, 2021 at 11:45 AM Ajin Cherian  wrote:

But the other side of the problem is that ,without this, if the
prepared transaction is prior to a consistent snapshot when decoding
starts/restarts, then only the "commit prepared" is sent to downstream
(as seen in the test scenario I shared above), and downstream has to
error away the commit prepared because it does not have the
corresponding prepared transaction.


I think it is not only simple error handling, it is required for
data-consistency. We need to send the transactions whose commits are
encountered after a consistent snapshot is reached.


I'm with Ashutosh here.  If a replica is properly in sync, it knows 
about prepared transactions and all the gids of those.  Sending the 
transactional changes and the prepare again is inconsistent.


The point of a two-phase transaction is to have two phases.  An output 
plugin must have the chance of treating them as independent events. 
Once a PREPARE is confirmed, it must not be sent again.  Even if the 
transaction is still in-progress and its changes are not yet visible on 
the origin node.


Regards

Markus




Re: repeated decoding of prepared transactions

2021-02-08 Thread Markus Wanner

Hello Amit,

thanks for your very quick response.

On 08.02.21 11:13, Amit Kapila wrote:

/*
  * It is possible that this transaction is not decoded at prepare time
  * either because by that time we didn't have a consistent snapshot or it
  * was decoded earlier but we have restarted. We can't distinguish between
  * those two cases so we send the prepare in both the cases and let
  * downstream decide whether to process or skip it. We don't need to
  * decode the xact for aborts if it is not done already.
  */


The way I read the surrounding code, the only case where a 2PC 
transaction does not get decoded at prepare time is when the transaction 
is empty.  Or are you aware of any other situation that might currently 
happen?



(unless the server needs to be restarted due to some reason)


Right, the repetition occurs only after a restart of the walsender in 
between a prepare and a commit prepared record.



That anyway is true without this work as well where restart_lsn can be
advanced on commits. We haven't changed anything in that regard.


I did not mean to blame the patch, but merely try to understand some of 
the design decisions behind it.


And as I just learned, even if we managed to avoid the repetition, a 
restarted walsender still needs to see prepared transactions as 
in-progress in its snapshots.  So we cannot move forward the restart_lsn 
to after a prepare record (until the final commit or rollback is consumed).


Best Regards

Markus




repeated decoding of prepared transactions

2021-02-08 Thread Markus Wanner

Amit, Ajin, hackers,

testing logical decoding for two-phase transactions, I stumbled over 
what I first thought is a bug.  But comments seems to indicate this is 
intended behavior.  Could you please clarify or elaborate on the design 
decision?  Or indicate this indeed is a bug?


What puzzled me is that if a decoder is restarted in between the PREPARE 
and the COMMIT PREPARED, it repeats the entire transaction, despite it 
being already sent and potentially prepared on the receiving side.


In terms of `pg_logical_slot_get_changes` (and roughly from the 
prepare.sql test), this looks as follows:


data

 BEGIN
 table public.test_prepared1: INSERT: id[integer]:1
 PREPARE TRANSACTION 'test_prepared#1'
(3 rows)


This is the first delivery of the transaction.  After a restart, it will 
get all of the changes again, though:



data

 BEGIN
 table public.test_prepared1: INSERT: id[integer]:1
 PREPARE TRANSACTION 'test_prepared#1'
 COMMIT PREPARED 'test_prepared#1'
(4 rows)


I did not expect this, as any receiver that wants 2PC decoded likely 
supports some kind of two-phase commit itself.  And would therefore 
prepare the transaction upon its first reception.  Potentially receiving 
it a second time would require complicated filtering on every prepared 
transaction.


Furthermore, this clearly and unnecessarily holds back the restart LSN, 
meaning even just a single prepared transaction can block advancing the 
restart LSN.  In most cases, these are short-lived.  But on the other 
hand, there may be an arbitrary number of other transactions in between 
a PREPARE and the corresponding COMMIT PREPARED in the WAL.  Not being 
able to advance over a prepared transaction seems like a bad thing in 
such a case.


I fail to see where this repetition would ever be useful.  Is there any 
reason for the current implementation that I'm missing or can this be 
corrected?  Thanks for elaborating.


Regards

Markus




Re: How can we submit code patches that implement our (pending) patents?

2018-07-09 Thread Markus Wanner
David,

On 07/09/2018 02:52 PM, David Fetter wrote:
> Unfortunately, this does not mean anything until courts have upheld
> it.  Were Red Hat to be taken over by people who didn't see things
> this way, it is a long way from clear that such a statement would be
> upheld in every court, which is what would have to happen.

I agree that there certainly exist questionable patent grants. But I'm
equally sure there are well-intended ones as well. For example, I'd
expect patent pools (including the Open Invention Network, cited by the
OP) to hire non-IANAL personnel who know Legalese well enough to set up
valid contracts (between participating companies).

> I am not any kind of attorney, but I would not believe anyone who said
> that they knew for certain how every court on (or off) the planet
> would ever rule on a clause in a...well, is that a contract? A
> license? An aspirational goal without legal force? To my knowledge, no
> court anywhere has had an opportunity to rule on it, so I don't know.

With all my own dislike of the patent system: I think you're setting the
bar unreasonably high. There won't ever exist any such guarantee, even
(or especially) if we avoid patent grants. Or do you expect our existing
code base to be 100% free of any patent violation?

> I really appreciate your forthrightness on this, but I frankly do not
> understand how you can actually make representations that would stand
> the test of time.

In all cases of patent violations by OSS I've heard of, the projects
replaced the portions of code that were (suspected to be) in violation
of the patent and moved on. Or what precedent do you know of that
*didn't* stand the test of time?

I certainly like the (future) patent holder coming forth to offer a
grant a lot better than the one who doesn't (but still holds the
patent). I'm missing the appreciation for that former strategy in this
thread and fear we're setting a precedent for the latter one, instead.

Kind Regards

-- 
Markus Wanner - http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services



Re: How can we submit code patches that implement our (pending) patents?

2018-07-09 Thread Markus Wanner
David,

On 07/07/2018 09:52 PM, David Fetter wrote:
> Any deviation from that process requires an explanation, which has not
> thus far been proffered.

Takayuki-san is *offering* a patent grant. That's something the TPL
clearly doesn't cover.

If they would follow the standard procedure, as you seem to propose,
they'd have every right to sue not only users of derivatives, but even
Postgres users for violation of their patent.

You seem to have experience with different licences and dual licensing.
What would you recommend Takayuki-san to do?

Kind Regards

Markus

-- 
Markus Wanner - http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services