On 1/11/23 21:12, Andres Freund wrote:
> Hi,
> 
> 
> Heikki, CCed you due to the point about 2c03216d8311 below.
> 
> 
> On 2023-01-10 19:32:12 +0100, Tomas Vondra wrote:
>> 0001 is a fix for the pre-existing issue in logicalmsg_decode,
>> attempting to build a snapshot before getting into a consistent state.
>> AFAICS this only affects assert-enabled builds and is otherwise
>> harmless, because we are not actually using the snapshot (apply gets a
>> valid snapshot from the transaction).
> 
> LGTM.
> 
> 
>> 0002 is a rebased version of the original approach, committed as
>> 0da92dc530 (and then reverted in 2c7ea57e56). This includes the same fix
>> as 0001 (for the sequence messages), the primary reason for the revert.
>>
>> The rebase was not quite straightforward, due to extensive changes in
>> how publications deal with tables/schemas, and so on. So this adopts
>> them, but other than that it behaves just like the original patch.
> 
> This is a huge diff:
>>  72 files changed, 4715 insertions(+), 612 deletions(-)
> 
> It'd be nice to split it to make review easier. Perhaps the sequence decoding
> support could be split from the whole publication rigamarole?
> 
> 
>> This does not include any changes to test_decoding and/or the built-in
>> replication - those will be committed in separate patches.
> 
> Looks like that's not the case anymore?
> 

Ah, right! Now I realize I originally committed this in chunks, but
the revert was a single commit, and I just "reverted the revert" to
create this patch.

I'll definitely split this into smaller patches. This also explains the
obsolete commit message about test_decoding not being included, etc.

> 
>> +/*
>> + * Update the sequence state by modifying the existing sequence data row.
>> + *
>> + * This keeps the same relfilenode, so the behavior is non-transactional.
>> + */
>> +static void
>> +SetSequence_non_transactional(Oid seqrelid, int64 last_value, int64 log_cnt, bool is_called)
>> +{
>> +    SeqTable        elm;
>> +    Relation        seqrel;
>> +    Buffer          buf;
>> +    HeapTupleData seqdatatuple;
>> +    Form_pg_sequence_data seq;
>> +
>> +    /* open and lock sequence */
>> +    init_sequence(seqrelid, &elm, &seqrel);
>> +
>> +    /* lock page' buffer and read tuple */
>> +    seq = read_seq_tuple(seqrel, &buf, &seqdatatuple);
>> +
>> +    /* check the comment above nextval_internal()'s equivalent call. */
>> +    if (RelationNeedsWAL(seqrel))
>> +    {
>> +            GetTopTransactionId();
>> +
>> +            if (XLogLogicalInfoActive())
>> +                    GetCurrentTransactionId();
>> +    }
>> +
>> +    /* ready to change the on-disk (or really, in-buffer) tuple */
>> +    START_CRIT_SECTION();
>> +
>> +    seq->last_value = last_value;
>> +    seq->is_called = is_called;
>> +    seq->log_cnt = log_cnt;
>> +
>> +    MarkBufferDirty(buf);
>> +
>> +    /* XLOG stuff */
>> +    if (RelationNeedsWAL(seqrel))
>> +    {
>> +            xl_seq_rec      xlrec;
>> +            XLogRecPtr      recptr;
>> +            Page            page = BufferGetPage(buf);
>> +
>> +            XLogBeginInsert();
>> +            XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT);
>> +
>> +            xlrec.locator = seqrel->rd_locator;
>> +            xlrec.created = false;
>> +
>> +            XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
>> +            XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len);
>> +
>> +            recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG);
>> +
>> +            PageSetLSN(page, recptr);
>> +    }
>> +
>> +    END_CRIT_SECTION();
>> +
>> +    UnlockReleaseBuffer(buf);
>> +
>> +    /* Clear local cache so that we don't think we have cached numbers */
>> +    /* Note that we do not change the currval() state */
>> +    elm->cached = elm->last;
>> +
>> +    relation_close(seqrel, NoLock);
>> +}
>> +
>> +/*
>> + * Update the sequence state by creating a new relfilenode.
>> + *
>> + * This creates a new relfilenode, to allow transactional behavior.
>> + */
>> +static void
>> +SetSequence_transactional(Oid seq_relid, int64 last_value, int64 log_cnt, bool is_called)
>> +{
>> +    SeqTable        elm;
>> +    Relation        seqrel;
>> +    Buffer          buf;
>> +    HeapTupleData seqdatatuple;
>> +    Form_pg_sequence_data seq;
>> +    HeapTuple       tuple;
>> +
>> +    /* open and lock sequence */
>> +    init_sequence(seq_relid, &elm, &seqrel);
>> +
>> +    /* lock page' buffer and read tuple */
>> +    seq = read_seq_tuple(seqrel, &buf, &seqdatatuple);
>> +
>> +    /* Copy the existing sequence tuple. */
>> +    tuple = heap_copytuple(&seqdatatuple);
>> +
>> +    /* Now we're done with the old page */
>> +    UnlockReleaseBuffer(buf);
>> +
>> +    /*
>> +     * Modify the copied tuple to update the sequence state (similar to what
>> +     * ResetSequence does).
>> +     */
>> +    seq = (Form_pg_sequence_data) GETSTRUCT(tuple);
>> +    seq->last_value = last_value;
>> +    seq->is_called = is_called;
>> +    seq->log_cnt = log_cnt;
>> +
>> +    /*
>> +     * Create a new storage file for the sequence - this is needed for the
>> +     * transactional behavior.
>> +     */
>> +    RelationSetNewRelfilenumber(seqrel, seqrel->rd_rel->relpersistence);
>> +
>> +    /*
>> +     * Ensure sequence's relfrozenxid is at 0, since it won't contain any
>> +     * unfrozen XIDs.  Same with relminmxid, since a sequence will never
>> +     * contain multixacts.
>> +     */
>> +    Assert(seqrel->rd_rel->relfrozenxid == InvalidTransactionId);
>> +    Assert(seqrel->rd_rel->relminmxid == InvalidMultiXactId);
>> +
>> +    /*
>> +     * Insert the modified tuple into the new storage file. This does all the
>> +     * necessary WAL-logging etc.
>> +     */
>> +    fill_seq_with_data(seqrel, tuple);
>> +
>> +    /* Clear local cache so that we don't think we have cached numbers */
>> +    /* Note that we do not change the currval() state */
>> +    elm->cached = elm->last;
>> +
>> +    relation_close(seqrel, NoLock);
>> +}
>> +
>> +/*
>> + * Set a sequence to a specified internal state.
>> + *
>> + * The change is made transactionally, so that on failure of the current
>> + * transaction, the sequence will be restored to its previous state.
>> + * We do that by creating a whole new relfilenode for the sequence; so this
>> + * works much like the rewriting forms of ALTER TABLE.
>> + *
>> + * Caller is assumed to have acquired AccessExclusiveLock on the sequence,
>> + * which must not be released until end of transaction.  Caller is also
>> + * responsible for permissions checking.
>> + */
>> +void
>> +SetSequence(Oid seq_relid, bool transactional, int64 last_value, int64 log_cnt, bool is_called)
>> +{
>> +    if (transactional)
>> +            SetSequence_transactional(seq_relid, last_value, log_cnt, is_called);
>> +    else
>> +            SetSequence_non_transactional(seq_relid, last_value, log_cnt, is_called);
>> +}
> 
> That's a lot of duplication with existing code. There's no explanation why
> SetSequence() as well as do_setval() exists.
> 

Thanks, I'll look into this.

> 
>>  /*
>>   * Initialize a sequence's relation with the specified tuple as content
>>   *
>> @@ -406,8 +560,13 @@ fill_seq_fork_with_data(Relation rel, HeapTuple tuple, ForkNumber forkNum)
>>  
>>      /* check the comment above nextval_internal()'s equivalent call. */
>>      if (RelationNeedsWAL(rel))
>> +    {
>>              GetTopTransactionId();
>>  
>> +            if (XLogLogicalInfoActive())
>> +                    GetCurrentTransactionId();
>> +    }
> 
> Is it actually possible to reach this without an xid already having been
> assigned for the current xact?
> 

I believe it is. That's probably how I found that this change was
needed, actually.

> 
> 
>> @@ -806,10 +966,28 @@ nextval_internal(Oid relid, bool check_permissions)
>>       * It's sufficient to ensure the toplevel transaction has an xid, no need
>>       * to assign xids subxacts, that'll already trigger an appropriate wait.
>>       * (Have to do that here, so we're outside the critical section)
>> +     *
>> +     * We have to ensure we have a proper XID, which will be included in
>> +     * the XLOG record by XLogRecordAssemble. Otherwise the first nextval()
>> +     * in a subxact (without any preceding changes) would get XID 0, and it
>> +     * would then be impossible to decide which top xact it belongs to.
>> +     * It'd also trigger assert in DecodeSequence. We only do that with
>> +     * wal_level=logical, though.
>> +     *
>> +     * XXX This might seem unnecessary, because if there's no XID the xact
>> +     * couldn't have done anything important yet, e.g. it could not have
>> +     * created a sequence. But that's incorrect, because of subxacts. The
>> +     * current subtransaction might not have done anything yet (thus no XID),
>> +     * but an earlier one might have created the sequence.
>>       */
> 
> What about restricting this to the case you're mentioning,
> i.e. subtransactions?
> 

That might work, but I need to think about it a bit.

I don't think it'd save us much, though. I mean, the vast majority of
transactions (and subtransactions) calling nextval() will then do
something else that requires an XID. This just moves the XID assignment
a bit earlier, that's all.
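
To illustrate the subxact case from the comment, here is a minimal
standalone sketch. It is a toy model, not PostgreSQL code: ToyXact,
toy_assign_xid() and toy_record_xid() are made-up names, and I'm assuming
XIDs are assigned lazily, parents first. It only shows that a record
written by a subxact with no XID yet would carry XID 0, unless we force
the assignment when wal_level=logical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Toy stand-ins for the real types (hypothetical, for illustration only). */
typedef uint32_t TransactionId;
#define InvalidTransactionId ((TransactionId) 0)

typedef struct ToyXact
{
    TransactionId   xid;     /* InvalidTransactionId until assigned */
    struct ToyXact *parent;  /* NULL for the top-level transaction */
} ToyXact;

static TransactionId toy_next_xid = 100;

/* Assign an XID lazily; parents get theirs first (assumed ordering). */
static TransactionId
toy_assign_xid(ToyXact *xact)
{
    assert(xact != NULL);
    if (xact->xid == InvalidTransactionId)
    {
        if (xact->parent != NULL)
            toy_assign_xid(xact->parent);
        xact->xid = toy_next_xid++;
    }
    return xact->xid;
}

/*
 * XID that would be stamped into a WAL record written by "current".
 * Without forcing, this is whatever the (sub)xact already has, possibly 0.
 */
static TransactionId
toy_record_xid(ToyXact *current, bool wal_level_logical)
{
    if (wal_level_logical)
        toy_assign_xid(current);
    return current->xid;
}
```

With a top-level xact that already has an XID and a fresh subxact,
toy_record_xid(&sub, false) yields 0, while toy_record_xid(&sub, true)
yields a valid XID that can be mapped back to the top-level xact.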

> 
>> @@ -845,6 +1023,7 @@ nextval_internal(Oid relid, bool check_permissions)
>>              seq->log_cnt = 0;
>>  
>>              xlrec.locator = seqrel->rd_locator;
> 
> I realize this isn't from this patch, but:
> 
> Why do we include the locator in the record? We already have it via
> XLogRegisterBuffer(), no? And afaict we don't even use it, as we read the page
> via XLogInitBufferForRedo() during recovery.
> 
> Kinda looks like an oversight in 2c03216d8311
> 

I don't know, it's what the code did.

> 
> 
> 
>> +/*
>> + * Handle sequence decode
>> + *
>> + * Decoding sequences is a bit tricky, because while most sequence actions
>> + * are non-transactional (not subject to rollback), some need to be handled
>> + * as transactional.
>> + *
>> + * By default, a sequence increment is non-transactional - we must not queue
>> + * it in a transaction as other changes, because the transaction might get
>> + * rolled back and we'd discard the increment. The downstream would not be
>> + * notified about the increment, which is wrong.
>> + *
>> + * On the other hand, the sequence may be created in a transaction. In this
>> + * case we *should* queue the change as other changes in the transaction,
>> + * because we don't want to send the increments for unknown sequence to the
>> + * plugin - it might get confused about which sequence it's related to etc.
>> + */
>> +void
>> +sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
>> +{
> 
>> +    /* extract the WAL record, with "created" flag */
>> +    xlrec = (xl_seq_rec *) XLogRecGetData(r);
>> +
>> +    /* XXX how could we have sequence change without data? */
>> +    if(!datalen || !tupledata)
>> +            return;
> 
> Yea, I think we should error out here instead, something has gone quite wrong
> if this happens.
> 

OK

> 
>> +    tuplebuf = ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
>> +    DecodeSeqTuple(tupledata, datalen, tuplebuf);
>> +
>> +    /*
>> +     * Should we handle the sequence increment as transactional or not?
>> +     *
>> +     * If the sequence was created in a still-running transaction, treat
>> +     * it as transactional and queue the increments. Otherwise it needs
>> +     * to be treated as non-transactional, in which case we send it to
>> +     * the plugin right away.
>> +     */
>> +    transactional = ReorderBufferSequenceIsTransactional(ctx->reorder,
>> +                                                         target_locator,
>> +                                                         xlrec->created);
> 
> Why re-create this information during decoding, when we basically already have
> it available on the primary? I think we already pay the price for that
> tracking, which we e.g. use for doing a non-transactional truncate:
> 
>               /*
>                * Normally, we need a transaction-safe truncation here.  However, if
>                * the table was either created in the current (sub)transaction or has
>                * a new relfilenumber in the current (sub)transaction, then we can
>                * just truncate it in-place, because a rollback would cause the whole
>                * table or the current physical file to be thrown away anyway.
>                */
>               if (rel->rd_createSubid == mySubid ||
>                       rel->rd_newRelfilelocatorSubid == mySubid)
>               {
>                       /* Immediate, non-rollbackable truncation is OK */
>                       heap_truncate_one_rel(rel);
>               }
> 
> Afaict we could do something similar for sequences, except that I think we
> would just check if the sequence was created in the current transaction
> (i.e. any of the fields are set).
> 

Hmm, good point.
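
Just to check I understand the suggestion, a minimal standalone sketch of
the test on the primary might look like this. This is a toy model under
my own assumptions, not the actual relcache code: ToyRelation and
toy_sequence_change_is_transactional() are made-up names, and only the
two field names mirror the ones in the truncate snippet above.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Toy stand-ins (hypothetical); the real fields live in the relcache entry. */
typedef uint32_t SubTransactionId;
#define InvalidSubTransactionId ((SubTransactionId) 0)

typedef struct ToyRelation
{
    /* set if the relation was created in the current transaction */
    SubTransactionId rd_createSubid;
    /* set if the relation got new storage in the current transaction */
    SubTransactionId rd_newRelfilelocatorSubid;
} ToyRelation;

/*
 * Treat the sequence change as transactional if any of the fields is set,
 * i.e. a rollback would throw away the sequence's storage anyway.
 */
static bool
toy_sequence_change_is_transactional(const ToyRelation *rel)
{
    assert(rel != NULL);
    return rel->rd_createSubid != InvalidSubTransactionId ||
           rel->rd_newRelfilelocatorSubid != InvalidSubTransactionId;
}
```

The result could then be recorded in the WAL record, so that decoding
would not have to re-create the information.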

> 
>> +/*
>> + * A transactional sequence increment is queued to be processed upon commit
>> + * and a non-transactional increment gets processed immediately.
>> + *
>> + * A sequence update may be both transactional and non-transactional. When
>> + * created in a running transaction, treat it as transactional and queue
>> + * the change in it. Otherwise treat it as non-transactional, so that we
>> + * don't forget the increment in case of a rollback.
>> + */
>> +void
>> +ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
>> +                           Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id,
>> +                           RelFileLocator rlocator, bool transactional, bool created,
>> +                           ReorderBufferTupleBuf *tuplebuf)
> 
> 
>> +            /*
>> +             * Decoding needs access to syscaches et al., which in turn use
>> +             * heavyweight locks and such. Thus we need to have enough state around to
>> +             * keep track of those.  The easiest way is to simply use a transaction
>> +             * internally.  That also allows us to easily enforce that nothing writes
>> +             * to the database by checking for xid assignments.
>> +             *
>> +             * When we're called via the SQL SRF there's already a transaction
>> +             * started, so start an explicit subtransaction there.
>> +             */
>> +            using_subtxn = IsTransactionOrTransactionBlock();
> 
> This duplicates a lot of the code from ReorderBufferProcessTXN(). But only
> does so partially. It's hard to tell whether some of the differences are
> intentional. Could we de-duplicate that code with ReorderBufferProcessTXN()?
> 
> Maybe something like
> 
> void
> ReorderBufferSetupXactEnv(ReorderBufferXactEnv *, bool process_invals);
> 
> void
> ReorderBufferTeardownXactEnv(ReorderBufferXactEnv *, bool is_error);
> 

Thanks for the suggestion, I'll definitely consider that in the next
version of the patch.


regards

-- 
Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company

