On 4/1/22 17:02, Tomas Vondra wrote:
> 
> 
> On 3/28/22 07:29, Amit Kapila wrote:
>> ...
>>
>> While thinking about this, I think I see a problem with the
>> non-transactional handling of sequences. It seems that we will skip
>> sending non-transactional sequence change if it occurs before the
>> decoding has reached a consistent point but the surrounding commit
>> occurs after a consistent point is reached. In such cases, the
>> corresponding DMLs like inserts will be sent but sequence changes
>> won't be sent. For example (this scenario is based on
>> twophase_snapshot.spec),
>>
>> Initial setup:
>> ==============
>> Create table t1_seq(c1 int);
>> Create Sequence seq1;
>>
>> Test Execution via multiple sessions (this test allows insert in
>> session-2 to happen before we reach a consistent point and commit
>> happens after a consistent point):
>> =======================================================================================================
>>
>> Session-2:
>> Begin;
>> SELECT pg_current_xact_id();
>>
>> Session-1:
>> SELECT 'init' FROM pg_create_logical_replication_slot('test_slot',
>> 'test_decoding', false, true);
>>
>> Session-3:
>> Begin;
>> SELECT pg_current_xact_id();
>>
>> Session-2:
>> Commit;
>> Begin;
>> INSERT INTO t1_seq SELECT nextval('seq1') FROM generate_series(1,100);
>>
>> Session-3:
>> Commit;
>>
>> Session-2:
>> Commit 'foo'
>>
>> Session-1:
>> SELECT data  FROM pg_logical_slot_get_changes('test_slot', NULL, NULL,
>> 'include-xids', 'false', 'skip-empty-xacts', '1');
>>
>>                      data
>> ----------------------------------------------
>>  BEGIN
>>  table public.t1_seq: INSERT: c1[integer]:1
>>  table public.t1_seq: INSERT: c1[integer]:2
>>  table public.t1_seq: INSERT: c1[integer]:3
>>  table public.t1_seq: INSERT: c1[integer]:4
>>  table public.t1_seq: INSERT: c1[integer]:5
>>  table public.t1_seq: INSERT: c1[integer]:6
>>
>>
>> Now, if we normally try to decode such an insert, the result would be
>> something like:
>>                                      data
>> ------------------------------------------------------------------------------
>>  sequence public.seq1: transactional:0 last_value: 33 log_cnt: 0 is_called:1
>>  sequence public.seq1: transactional:0 last_value: 66 log_cnt: 0 is_called:1
>>  sequence public.seq1: transactional:0 last_value: 99 log_cnt: 0 is_called:1
>>  sequence public.seq1: transactional:0 last_value: 132 log_cnt: 0 is_called:1
>>  BEGIN
>>  table public.t1_seq: INSERT: c1[integer]:1
>>  table public.t1_seq: INSERT: c1[integer]:2
>>  table public.t1_seq: INSERT: c1[integer]:3
>>  table public.t1_seq: INSERT: c1[integer]:4
>>  table public.t1_seq: INSERT: c1[integer]:5
>>  table public.t1_seq: INSERT: c1[integer]:6
>>
>> This will create an inconsistent replica as sequence changes won't be
>> replicated.
> 
> Hmm, that's interesting. I wonder if it can actually happen, though.
> Have you been able to reproduce that, somehow?
> 
>> I thought about changing snapshot dealing of
>> non-transactional sequence changes similar to transactional ones but
>> that also won't work because it is only at commit we decide whether we
>> can send the changes.
>>
> I wonder if there's some earlier LSN (similar to the consistent point)
> which might be useful for this.
> 
> Or maybe we should queue even the non-transactional changes, not
> per-transaction but in a global list, and then at each commit either
> discard inspect them (at that point we know the lowest LSN for all
> transactions and the consistent point). Seems complex, though.
> 
>> For the transactional case, as we are considering the create sequence
>> operation as transactional, we would unnecessarily queue them even
>> though that is not required. Basically, they don't need to be
>> considered transactional and we can simply ignore such messages like
>> other DDLs. But for that probably we need to distinguish Alter/Create
>> case which may or may not be straightforward. Now, queuing them is
>> probably harmless unless it causes the transaction to spill/stream.
>>
> 
> I'm not sure I follow. Why would we queue them unnecessarily?
> 
> Also, there's the bug with decoding changes in transactions that create
> the sequence and add it to a publication. I think the agreement was that
> this behavior was incorrect, we should not decode changes until the
> subscription is refreshed. Doesn't that mean can't be any CREATE case,
> just ALTER?
> 

So, I investigated this a bit more, and I wrote a couple test_decoding
isolation tests (patch attached) demonstrating the issue. Actually, I
should say "issues" because it's a bit worse than you described ...

The whole problem is in this chunk of code in sequence_decode():


  /* Skip the change if already processed (per the snapshot). */
  if (transactional &&
      !SnapBuildProcessChange(builder, xid, buf->origptr))
      return;
  else if (!transactional &&
           (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT ||
            SnapBuildXactNeedsSkip(builder, buf->origptr)))
      return;

  /* Queue the increment (or send immediately if not transactional). */
  snapshot = SnapBuildGetOrBuildSnapshot(builder, xid);
  ReorderBufferQueueSequence(ctx->reorder, xid, snapshot, buf->endptr,
                             origin_id, target_node, transactional,
                             xlrec->created, tuplebuf);

With the script you described, the increment is non-transactional, so we
end up in the second branch, return and thus discard the increment.

But it's also possible the change is transactional, which can only
trigger the first branch. But it does not, so we start building the
snapshot. But the first thing SnapBuildGetOrBuildSnapshot does is

   Assert(builder->state == SNAPBUILD_CONSISTENT);

and we're still not in a consistent snapshot, so it just crashes and
burn :-(

The sequences.spec file has two definitions of s2restart step, one empty
(resulting in non-transactional change), one with ALTER SEQUENCE (which
means the change will be transactional).


The really "funny" thing is this is not new code - this is an exact copy
from logicalmsg_decode(), and logical messages have all those issues
too. We may discard some messages, trigger the same Assert, etc. There's
a messages2.spec demonstrating this (s2message step defines whether the
message is transactional or not).

So I guess we need to fix both places, perhaps in a similar way. And one
of those will have to be backpatched (which may make it more complex).


The only option I see is reworking the decoding so that it does not need
the snapshot at all. We'll need to stash the changes just like any other
change, apply them at end of transaction, and the main difference
between transactional and non-transactional case would be what happens
at abort. Transactional changes would be discarded, non-transactional
would be applied anyway.

The challenge is this reorders the sequence changes, so we'll need to
reconcile them somehow. One option would be to simply (1) apply the
change with the highest LSN in the transaction, and then walk all other
in-progress transactions and changes for that sequence with a lower LSN.
Not sure how complex/expensive that would be, though. Another problem is
not all increments are WAL-logged, of course, not sure about that.

Another option might be revisiting the approach proposed by Hannu in
September [1], i.e. tracking sequences touched in a transaction, and
then replicating the current state at that particular moment.



regards


[1]
https://www.postgresql.org/message-id/CAMT0RQQeDR51xs8zTa25YpfKB1B34nS-Q4hhsRPznVsjMB_P1w%40mail.gmail.com

-- 
Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 36929dd97d3..0912a1237ed 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -9,7 +9,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
 	sequence
 ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
 	oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
-	twophase_snapshot slot_creation_error
+	twophase_snapshot slot_creation_error messages2 sequences
 
 REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
 ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
diff --git a/contrib/test_decoding/expected/messages2.out b/contrib/test_decoding/expected/messages2.out
new file mode 100644
index 00000000000..c705693ed55
--- /dev/null
+++ b/contrib/test_decoding/expected/messages2.out
@@ -0,0 +1,78 @@
+Parsed test spec with 3 sessions
+
+starting permutation: s1init s2message s2insert s1start
+step s1init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding', false, true);
+?column?
+--------
+init    
+(1 row)
+
+step s2message: SELECT pg_logical_emit_message(false, 'test', 'msg non-transactional');
+pg_logical_emit_message
+-----------------------
+0/1882B00              
+(1 row)
+
+step s2insert: INSERT INTO t1_message VALUES ('msg');
+step s1start: SELECT data  FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false', 'skip-empty-xacts', '1');
+data                                                                        
+----------------------------------------------------------------------------
+message: transactional: 0 prefix: test, sz: 21 content:msg non-transactional
+BEGIN                                                                       
+table public.t1_message: INSERT: msg[text]:'msg'                            
+COMMIT                                                                      
+(4 rows)
+
+?column?
+--------
+stop    
+(1 row)
+
+
+starting permutation: s2begin s2txid s1init s3begin s3txid s2commit s2begin s2message s2insert s3commit s2commit s1start
+step s2begin: BEGIN;
+step s2txid: SELECT pg_current_xact_id();
+pg_current_xact_id
+------------------
+               728
+(1 row)
+
+step s1init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding', false, true); <waiting ...>
+step s3begin: BEGIN;
+step s3txid: SELECT pg_current_xact_id();
+pg_current_xact_id
+------------------
+               729
+(1 row)
+
+step s2commit: COMMIT;
+step s2begin: BEGIN;
+step s2message: SELECT pg_logical_emit_message(false, 'test', 'msg non-transactional');
+pg_logical_emit_message
+-----------------------
+0/1887998              
+(1 row)
+
+step s2insert: INSERT INTO t1_message VALUES ('msg');
+step s3commit: COMMIT;
+step s1init: <... completed>
+?column?
+--------
+init    
+(1 row)
+
+step s2commit: COMMIT;
+step s1start: SELECT data  FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false', 'skip-empty-xacts', '1');
+data                                            
+------------------------------------------------
+message: transactional: 0 prefix: test, sz: 21 content:msg non-transactional
+BEGIN                                           
+table public.t1_message: INSERT: msg[text]:'msg'
+COMMIT                                          
+(3 rows)
+
+?column?
+--------
+stop    
+(1 row)
+
diff --git a/contrib/test_decoding/expected/sequences.out b/contrib/test_decoding/expected/sequences.out
new file mode 100644
index 00000000000..36f27d9848b
--- /dev/null
+++ b/contrib/test_decoding/expected/sequences.out
@@ -0,0 +1,272 @@
+Parsed test spec with 3 sessions
+
+starting permutation: s1init s2restart s2insert s1start
+step s1init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding', false, true);
+?column?
+--------
+init    
+(1 row)
+
+step s2restart: 
+step s2insert: INSERT INTO t1_seq SELECT nextval('seq1') FROM generate_series(1,100);
+step s1start: SELECT data  FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false', 'skip-empty-xacts', '1');
+data                                                                        
+----------------------------------------------------------------------------
+sequence public.seq1: transactional:0 last_value: 33 log_cnt: 0 is_called:1 
+sequence public.seq1: transactional:0 last_value: 66 log_cnt: 0 is_called:1 
+sequence public.seq1: transactional:0 last_value: 99 log_cnt: 0 is_called:1 
+sequence public.seq1: transactional:0 last_value: 132 log_cnt: 0 is_called:1
+BEGIN                                                                       
+table public.t1_seq: INSERT: c1[integer]:1                                  
+table public.t1_seq: INSERT: c1[integer]:2                                  
+table public.t1_seq: INSERT: c1[integer]:3                                  
+table public.t1_seq: INSERT: c1[integer]:4                                  
+table public.t1_seq: INSERT: c1[integer]:5                                  
+table public.t1_seq: INSERT: c1[integer]:6                                  
+table public.t1_seq: INSERT: c1[integer]:7                                  
+table public.t1_seq: INSERT: c1[integer]:8                                  
+table public.t1_seq: INSERT: c1[integer]:9                                  
+table public.t1_seq: INSERT: c1[integer]:10                                 
+table public.t1_seq: INSERT: c1[integer]:11                                 
+table public.t1_seq: INSERT: c1[integer]:12                                 
+table public.t1_seq: INSERT: c1[integer]:13                                 
+table public.t1_seq: INSERT: c1[integer]:14                                 
+table public.t1_seq: INSERT: c1[integer]:15                                 
+table public.t1_seq: INSERT: c1[integer]:16                                 
+table public.t1_seq: INSERT: c1[integer]:17                                 
+table public.t1_seq: INSERT: c1[integer]:18                                 
+table public.t1_seq: INSERT: c1[integer]:19                                 
+table public.t1_seq: INSERT: c1[integer]:20                                 
+table public.t1_seq: INSERT: c1[integer]:21                                 
+table public.t1_seq: INSERT: c1[integer]:22                                 
+table public.t1_seq: INSERT: c1[integer]:23                                 
+table public.t1_seq: INSERT: c1[integer]:24                                 
+table public.t1_seq: INSERT: c1[integer]:25                                 
+table public.t1_seq: INSERT: c1[integer]:26                                 
+table public.t1_seq: INSERT: c1[integer]:27                                 
+table public.t1_seq: INSERT: c1[integer]:28                                 
+table public.t1_seq: INSERT: c1[integer]:29                                 
+table public.t1_seq: INSERT: c1[integer]:30                                 
+table public.t1_seq: INSERT: c1[integer]:31                                 
+table public.t1_seq: INSERT: c1[integer]:32                                 
+table public.t1_seq: INSERT: c1[integer]:33                                 
+table public.t1_seq: INSERT: c1[integer]:34                                 
+table public.t1_seq: INSERT: c1[integer]:35                                 
+table public.t1_seq: INSERT: c1[integer]:36                                 
+table public.t1_seq: INSERT: c1[integer]:37                                 
+table public.t1_seq: INSERT: c1[integer]:38                                 
+table public.t1_seq: INSERT: c1[integer]:39                                 
+table public.t1_seq: INSERT: c1[integer]:40                                 
+table public.t1_seq: INSERT: c1[integer]:41                                 
+table public.t1_seq: INSERT: c1[integer]:42                                 
+table public.t1_seq: INSERT: c1[integer]:43                                 
+table public.t1_seq: INSERT: c1[integer]:44                                 
+table public.t1_seq: INSERT: c1[integer]:45                                 
+table public.t1_seq: INSERT: c1[integer]:46                                 
+table public.t1_seq: INSERT: c1[integer]:47                                 
+table public.t1_seq: INSERT: c1[integer]:48                                 
+table public.t1_seq: INSERT: c1[integer]:49                                 
+table public.t1_seq: INSERT: c1[integer]:50                                 
+table public.t1_seq: INSERT: c1[integer]:51                                 
+table public.t1_seq: INSERT: c1[integer]:52                                 
+table public.t1_seq: INSERT: c1[integer]:53                                 
+table public.t1_seq: INSERT: c1[integer]:54                                 
+table public.t1_seq: INSERT: c1[integer]:55                                 
+table public.t1_seq: INSERT: c1[integer]:56                                 
+table public.t1_seq: INSERT: c1[integer]:57                                 
+table public.t1_seq: INSERT: c1[integer]:58                                 
+table public.t1_seq: INSERT: c1[integer]:59                                 
+table public.t1_seq: INSERT: c1[integer]:60                                 
+table public.t1_seq: INSERT: c1[integer]:61                                 
+table public.t1_seq: INSERT: c1[integer]:62                                 
+table public.t1_seq: INSERT: c1[integer]:63                                 
+table public.t1_seq: INSERT: c1[integer]:64                                 
+table public.t1_seq: INSERT: c1[integer]:65                                 
+table public.t1_seq: INSERT: c1[integer]:66                                 
+table public.t1_seq: INSERT: c1[integer]:67                                 
+table public.t1_seq: INSERT: c1[integer]:68                                 
+table public.t1_seq: INSERT: c1[integer]:69                                 
+table public.t1_seq: INSERT: c1[integer]:70                                 
+table public.t1_seq: INSERT: c1[integer]:71                                 
+table public.t1_seq: INSERT: c1[integer]:72                                 
+table public.t1_seq: INSERT: c1[integer]:73                                 
+table public.t1_seq: INSERT: c1[integer]:74                                 
+table public.t1_seq: INSERT: c1[integer]:75                                 
+table public.t1_seq: INSERT: c1[integer]:76                                 
+table public.t1_seq: INSERT: c1[integer]:77                                 
+table public.t1_seq: INSERT: c1[integer]:78                                 
+table public.t1_seq: INSERT: c1[integer]:79                                 
+table public.t1_seq: INSERT: c1[integer]:80                                 
+table public.t1_seq: INSERT: c1[integer]:81                                 
+table public.t1_seq: INSERT: c1[integer]:82                                 
+table public.t1_seq: INSERT: c1[integer]:83                                 
+table public.t1_seq: INSERT: c1[integer]:84                                 
+table public.t1_seq: INSERT: c1[integer]:85                                 
+table public.t1_seq: INSERT: c1[integer]:86                                 
+table public.t1_seq: INSERT: c1[integer]:87                                 
+table public.t1_seq: INSERT: c1[integer]:88                                 
+table public.t1_seq: INSERT: c1[integer]:89                                 
+table public.t1_seq: INSERT: c1[integer]:90                                 
+table public.t1_seq: INSERT: c1[integer]:91                                 
+table public.t1_seq: INSERT: c1[integer]:92                                 
+table public.t1_seq: INSERT: c1[integer]:93                                 
+table public.t1_seq: INSERT: c1[integer]:94                                 
+table public.t1_seq: INSERT: c1[integer]:95                                 
+table public.t1_seq: INSERT: c1[integer]:96                                 
+table public.t1_seq: INSERT: c1[integer]:97                                 
+table public.t1_seq: INSERT: c1[integer]:98                                 
+table public.t1_seq: INSERT: c1[integer]:99                                 
+table public.t1_seq: INSERT: c1[integer]:100                                
+COMMIT                                                                      
+(106 rows)
+
+?column?
+--------
+stop    
+(1 row)
+
+
+starting permutation: s2begin s2txid s1init s3begin s3txid s2commit s2begin s2restart s2insert s3commit s2commit s1start
+step s2begin: BEGIN;
+step s2txid: SELECT pg_current_xact_id();
+pg_current_xact_id
+------------------
+               728
+(1 row)
+
+step s1init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding', false, true); <waiting ...>
+step s3begin: BEGIN;
+step s3txid: SELECT pg_current_xact_id();
+pg_current_xact_id
+------------------
+               729
+(1 row)
+
+step s2commit: COMMIT;
+step s2begin: BEGIN;
+step s2restart: 
+step s2insert: INSERT INTO t1_seq SELECT nextval('seq1') FROM generate_series(1,100);
+step s3commit: COMMIT;
+step s1init: <... completed>
+?column?
+--------
+init    
+(1 row)
+
+step s2commit: COMMIT;
+step s1start: SELECT data  FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false', 'skip-empty-xacts', '1');
+data                                        
+--------------------------------------------
+sequence public.seq1: transactional:0 last_value: 33 log_cnt: 0 is_called:1 
+sequence public.seq1: transactional:0 last_value: 66 log_cnt: 0 is_called:1 
+sequence public.seq1: transactional:0 last_value: 99 log_cnt: 0 is_called:1 
+sequence public.seq1: transactional:0 last_value: 132 log_cnt: 0 is_called:1
+BEGIN                                       
+table public.t1_seq: INSERT: c1[integer]:1  
+table public.t1_seq: INSERT: c1[integer]:2  
+table public.t1_seq: INSERT: c1[integer]:3  
+table public.t1_seq: INSERT: c1[integer]:4  
+table public.t1_seq: INSERT: c1[integer]:5  
+table public.t1_seq: INSERT: c1[integer]:6  
+table public.t1_seq: INSERT: c1[integer]:7  
+table public.t1_seq: INSERT: c1[integer]:8  
+table public.t1_seq: INSERT: c1[integer]:9  
+table public.t1_seq: INSERT: c1[integer]:10 
+table public.t1_seq: INSERT: c1[integer]:11 
+table public.t1_seq: INSERT: c1[integer]:12 
+table public.t1_seq: INSERT: c1[integer]:13 
+table public.t1_seq: INSERT: c1[integer]:14 
+table public.t1_seq: INSERT: c1[integer]:15 
+table public.t1_seq: INSERT: c1[integer]:16 
+table public.t1_seq: INSERT: c1[integer]:17 
+table public.t1_seq: INSERT: c1[integer]:18 
+table public.t1_seq: INSERT: c1[integer]:19 
+table public.t1_seq: INSERT: c1[integer]:20 
+table public.t1_seq: INSERT: c1[integer]:21 
+table public.t1_seq: INSERT: c1[integer]:22 
+table public.t1_seq: INSERT: c1[integer]:23 
+table public.t1_seq: INSERT: c1[integer]:24 
+table public.t1_seq: INSERT: c1[integer]:25 
+table public.t1_seq: INSERT: c1[integer]:26 
+table public.t1_seq: INSERT: c1[integer]:27 
+table public.t1_seq: INSERT: c1[integer]:28 
+table public.t1_seq: INSERT: c1[integer]:29 
+table public.t1_seq: INSERT: c1[integer]:30 
+table public.t1_seq: INSERT: c1[integer]:31 
+table public.t1_seq: INSERT: c1[integer]:32 
+table public.t1_seq: INSERT: c1[integer]:33 
+table public.t1_seq: INSERT: c1[integer]:34 
+table public.t1_seq: INSERT: c1[integer]:35 
+table public.t1_seq: INSERT: c1[integer]:36 
+table public.t1_seq: INSERT: c1[integer]:37 
+table public.t1_seq: INSERT: c1[integer]:38 
+table public.t1_seq: INSERT: c1[integer]:39 
+table public.t1_seq: INSERT: c1[integer]:40 
+table public.t1_seq: INSERT: c1[integer]:41 
+table public.t1_seq: INSERT: c1[integer]:42 
+table public.t1_seq: INSERT: c1[integer]:43 
+table public.t1_seq: INSERT: c1[integer]:44 
+table public.t1_seq: INSERT: c1[integer]:45 
+table public.t1_seq: INSERT: c1[integer]:46 
+table public.t1_seq: INSERT: c1[integer]:47 
+table public.t1_seq: INSERT: c1[integer]:48 
+table public.t1_seq: INSERT: c1[integer]:49 
+table public.t1_seq: INSERT: c1[integer]:50 
+table public.t1_seq: INSERT: c1[integer]:51 
+table public.t1_seq: INSERT: c1[integer]:52 
+table public.t1_seq: INSERT: c1[integer]:53 
+table public.t1_seq: INSERT: c1[integer]:54 
+table public.t1_seq: INSERT: c1[integer]:55 
+table public.t1_seq: INSERT: c1[integer]:56 
+table public.t1_seq: INSERT: c1[integer]:57 
+table public.t1_seq: INSERT: c1[integer]:58 
+table public.t1_seq: INSERT: c1[integer]:59 
+table public.t1_seq: INSERT: c1[integer]:60 
+table public.t1_seq: INSERT: c1[integer]:61 
+table public.t1_seq: INSERT: c1[integer]:62 
+table public.t1_seq: INSERT: c1[integer]:63 
+table public.t1_seq: INSERT: c1[integer]:64 
+table public.t1_seq: INSERT: c1[integer]:65 
+table public.t1_seq: INSERT: c1[integer]:66 
+table public.t1_seq: INSERT: c1[integer]:67 
+table public.t1_seq: INSERT: c1[integer]:68 
+table public.t1_seq: INSERT: c1[integer]:69 
+table public.t1_seq: INSERT: c1[integer]:70 
+table public.t1_seq: INSERT: c1[integer]:71 
+table public.t1_seq: INSERT: c1[integer]:72 
+table public.t1_seq: INSERT: c1[integer]:73 
+table public.t1_seq: INSERT: c1[integer]:74 
+table public.t1_seq: INSERT: c1[integer]:75 
+table public.t1_seq: INSERT: c1[integer]:76 
+table public.t1_seq: INSERT: c1[integer]:77 
+table public.t1_seq: INSERT: c1[integer]:78 
+table public.t1_seq: INSERT: c1[integer]:79 
+table public.t1_seq: INSERT: c1[integer]:80 
+table public.t1_seq: INSERT: c1[integer]:81 
+table public.t1_seq: INSERT: c1[integer]:82 
+table public.t1_seq: INSERT: c1[integer]:83 
+table public.t1_seq: INSERT: c1[integer]:84 
+table public.t1_seq: INSERT: c1[integer]:85 
+table public.t1_seq: INSERT: c1[integer]:86 
+table public.t1_seq: INSERT: c1[integer]:87 
+table public.t1_seq: INSERT: c1[integer]:88 
+table public.t1_seq: INSERT: c1[integer]:89 
+table public.t1_seq: INSERT: c1[integer]:90 
+table public.t1_seq: INSERT: c1[integer]:91 
+table public.t1_seq: INSERT: c1[integer]:92 
+table public.t1_seq: INSERT: c1[integer]:93 
+table public.t1_seq: INSERT: c1[integer]:94 
+table public.t1_seq: INSERT: c1[integer]:95 
+table public.t1_seq: INSERT: c1[integer]:96 
+table public.t1_seq: INSERT: c1[integer]:97 
+table public.t1_seq: INSERT: c1[integer]:98 
+table public.t1_seq: INSERT: c1[integer]:99 
+table public.t1_seq: INSERT: c1[integer]:100
+COMMIT                                      
+(102 rows)
+
+?column?
+--------
+stop    
+(1 row)
+
diff --git a/contrib/test_decoding/specs/messages2.spec b/contrib/test_decoding/specs/messages2.spec
new file mode 100644
index 00000000000..1300b781b6c
--- /dev/null
+++ b/contrib/test_decoding/specs/messages2.spec
@@ -0,0 +1,45 @@
+# Test decoding of two-phase transactions during the build of a consistent snapshot.
+setup
+{
+    DROP TABLE IF EXISTS t1_message;
+    CREATE TABLE t1_message(msg text);
+}
+
+teardown
+{
+    DROP TABLE t1_message;
+    SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+}
+
+
+session "s1"
+setup { SET synchronous_commit=on; }
+
+step "s1init" {SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding', false, true);}
+step "s1start" {SELECT data  FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false', 'skip-empty-xacts', '1');}
+
+
+session "s2"
+setup { SET synchronous_commit=on; }
+
+step "s2txid" {SELECT pg_current_xact_id();}
+step "s2begin" {BEGIN;}
+step "s2commit" {COMMIT;}
+step "s2message" {SELECT pg_logical_emit_message(false, 'test', 'msg non-transactional');}
+#step "s2message" {SELECT pg_logical_emit_message(true, 'test', 'msg transactional');}
+step "s2insert" {INSERT INTO t1_message VALUES ('msg');}
+
+session "s3"
+setup { SET synchronous_commit=on; }
+
+step "s3txid" {SELECT pg_current_xact_id();}
+step "s3begin" {BEGIN;}
+step "s3commit" {COMMIT;}
+
+# Simple case, with consistent state before the whole insert, non-transactional
+#
+permutation "s1init" "s2message" "s2insert" "s1start"
+
+# Force building of a consistent snapshot after the INSERT but before COMMIT, non-transactional
+#
+permutation "s2begin" "s2txid" "s1init" "s3begin" "s3txid" "s2commit" "s2begin" "s2message" "s2insert" "s3commit" "s2commit" "s1start"
diff --git a/contrib/test_decoding/specs/sequences.spec b/contrib/test_decoding/specs/sequences.spec
new file mode 100644
index 00000000000..0f2cd06a7d3
--- /dev/null
+++ b/contrib/test_decoding/specs/sequences.spec
@@ -0,0 +1,49 @@
+# Test decoding of two-phase transactions during the build of a consistent snapshot.
+setup
+{
+    DROP TABLE IF EXISTS t1_seq;
+    DROP SEQUENCE IF EXISTS seq1;
+
+    CREATE TABLE t1_seq(c1 INT);
+    CREATE SEQUENCE seq1;
+}
+
+teardown
+{
+    DROP TABLE t1_seq;
+    DROP SEQUENCE seq1;
+    SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+}
+
+
+session "s1"
+setup { SET synchronous_commit=on; }
+
+step "s1init" {SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding', false, true);}
+step "s1start" {SELECT data  FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false', 'skip-empty-xacts', '1');}
+
+
+session "s2"
+setup { SET synchronous_commit=on; }
+
+step "s2txid" {SELECT pg_current_xact_id();}
+step "s2begin" {BEGIN;}
+step "s2commit" {COMMIT;}
+step "s2insert" {INSERT INTO t1_seq SELECT nextval('seq1') FROM generate_series(1,100);}
+#step "s2restart" {ALTER SEQUENCE seq1 RESTART;}
+step "s2restart" {}
+
+session "s3"
+setup { SET synchronous_commit=on; }
+
+step "s3txid" {SELECT pg_current_xact_id();}
+step "s3begin" {BEGIN;}
+step "s3commit" {COMMIT;}
+
+# Simple case, with consistent state before the whole insert
+#
+permutation "s1init" "s2restart" "s2insert" "s1start"
+
+# Force building of a consistent snapshot after the INSERT but before COMMIT
+#
+permutation "s2begin" "s2txid" "s1init" "s3begin" "s3txid" "s2commit" "s2begin" "s2restart" "s2insert" "s3commit" "s2commit" "s1start"

Reply via email to