On 20 March 2017 at 17:33, Andres Freund <and...@anarazel.de> wrote:

>> Subject: [PATCH 2/3] Follow timeline switches in logical decoding
>
> FWIW, the title doesn't really seem accurate to me.

Yeah, it's not really at the logical decoding layer at all.

"Teach xlogreader to follow timeline switches" ?

>> Logical slots cannot actually be created on a replica without use of
>> the low-level C slot management APIs so this is mostly foundation work
>> for subsequent changes to enable logical decoding on standbys.
>
> Everytime I read references to anything like this my blood starts to
> boil.  I kind of regret not having plastered RecoveryInProgress() errors
> all over this code.

In fairness, I've been trying for multiple releases to get a "right"
way in. I have no intention of using such hacks, and only ever did so
for testing xlogreader timeline following without full logical
decoding on standby being available.

>> From 8854d44e2227b9d076b0a25a9c8b9df9270b2433 Mon Sep 17 00:00:00 2001
>> From: Craig Ringer <cr...@2ndquadrant.com>
>> Date: Mon, 5 Sep 2016 15:30:53 +0800
>> Subject: [PATCH 3/3] Logical decoding on standby
>>
>> * Make walsender aware of ProcSignal and recovery conflicts, make walsender
>>   exit with recovery conflict on upstream drop database when it has an active
>>   logical slot on that database.
>> * Allow GetOldestXmin to omit catalog_xmin, be called already locked.
>
> "be called already locked"?

To be called with ProcArrayLock already held. But that's actually
outdated due to changes Petr requested earlier, thanks for noticing.

>> * Send catalog_xmin separately in hot_standby_feedback messages.
>> * Store catalog_xmin separately on a physical slot if received in 
>> hot_standby_feedback
>
> What does separate mean?

Currently, hot standby feedback effectively sends min(catalog_xmin,
xmin) to the upstream, which in turn records that value either in the
PGPROC entry or, if there's a slot in use, in the xmin field on the
slot.

So catalog_xmin on the standby gets promoted to xmin on the master's
physical slot. Lots of unnecessary bloat results.

This splits it up, so we can send catalog_xmin separately on the wire,
and store it on the physical replication slot as catalog_xmin, not
xmin.
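
On the wire that just means one more field in the hot standby feedback
message. Roughly, on the walreceiver side (sketch only; epoch handling
omitted, and the exact field layout is whatever the patch settles on):

    /* XLogWalRcvSendHSFeedback(), sketch */
    resetStringInfo(&reply_message);
    pq_sendbyte(&reply_message, 'h');
    pq_sendint64(&reply_message, GetCurrentTimestamp());
    pq_sendint(&reply_message, xmin, 4);            /* data xmin, as today */
    pq_sendint(&reply_message, catalog_xmin, 4);    /* new: no longer folded into xmin */

with the walsender then storing the new field in the physical slot's
catalog_xmin rather than its xmin.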

>> * Separate the catalog_xmin used by vacuum from ProcArray's 
>> replication_slot_catalog_xmin,
>>   requiring that xlog be emitted before vacuum can remove no longer needed 
>> catalogs, store
>>   it in checkpoints, make vacuum and bgwriter advance it.
>
> I can't parse that sentence.

We now write an xlog record before allowing the catalog_xmin in
ProcArray's replication_slot_catalog_xmin to take effect and permit
catalog tuples to be removed. This is achieved by making vacuum use a separate
field in ShmemVariableCache, oldestCatalogXmin. When vacuum looks up
the new xmin from GetOldestXmin, it copies
ProcArray.replication_slot_catalog_xmin to
ShmemVariableCache.oldestCatalogXmin, writing an xlog record to ensure
we remember the new value and ensure standbys know about it.

This provides a guarantee to standbys that all catalog tuples >=
ShmemVariableCache.oldestCatalogXmin are protected from vacuum and
lets them discover when that threshold advances.

The reason we cannot use the xid field in existing vacuum xlog records
is that the downstream has no way to know if the xact affected
catalogs and therefore whether it should advance its idea of
catalog_xmin or not. It can't get a Relation for the affected
relfilenode because it can't use the relcache during redo. We'd have
to add a flag to every vacuum record indicating whether it affected
catalogs, which is not fun, and vacuum might not always even know. And
the standby would still need a way to keep track of the oldest valid
catalog_xmin across restart without the ability to write it to
checkpoints.

It's a lot simpler and cheaper to have the master do it.
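
In code terms the master-side flow is roughly this (simplified; locking
and the re-check under ProcArrayLock elided):

    /* UpdateOldestCatalogXmin(), simplified */
    vacuum_catalog_xmin = ShmemVariableCache->oldestCatalogXmin;
    slots_catalog_xmin = procArray->replication_slot_catalog_xmin;

    if (CatalogXminNeedsUpdate(vacuum_catalog_xmin, slots_catalog_xmin))
    {
        /* xlog it first, so standbys always hear about the advance... */
        XactLogCatalogXminUpdate(slots_catalog_xmin);
        /* ...and only then let vacuum honour the new threshold */
        SetOldestCatalogXmin(slots_catalog_xmin);
    }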

>> * Add a new recovery conflict type for conflict with catalog_xmin. Abort
>>   in-progress logical decoding sessions with conflict with recovery where 
>> needed
>>   catalog_xmin is too old
>
> Are we retaining WAL for slots broken in that way?

Yes, until the slot is dropped.

If I added a persistent flag on the slot to indicate that the slot is
invalid, then we could ignore it for purposes of WAL retention. It
seemed unnecessary at this stage.

>> * Make extra efforts to reserve master's catalog_xmin during decoding startup
>>   on standby.
>
> What does that mean?

WaitForMasterCatalogXminReservation(...)

I don't like it. At all. I'd rather have hot standby feedback replies
so we can know for sure when the master has locked in our feedback.
It's my most disliked part of this patch.

>> * Remove checks preventing starting logical decoding on standby
>
> To me that's too many different things in one commit.  A bunch of them
> seem like it'd be good if they'd get independent buildfarm cycles too.

I agree with you. I had them all separate before and was told that
there were too many patches. I also had fixes that spanned multiple
patches and were difficult to split up effectively.

I'd like to split it roughly along the lines of the bulleted items,
but I don't want to do it only to have someone else tell me to just
squash it again and waste all the work (again). I'll take the risk I
guess.

>> diff --git a/src/backend/access/heap/rewriteheap.c 
>> b/src/backend/access/heap/rewriteheap.c
>> index d7f65a5..36bbb98 100644
>> --- a/src/backend/access/heap/rewriteheap.c
>> +++ b/src/backend/access/heap/rewriteheap.c
>> @@ -812,7 +812,8 @@ logical_begin_heap_rewrite(RewriteState state)
>>       if (!state->rs_logical_rewrite)
>>               return;
>>
>> -     ProcArrayGetReplicationSlotXmin(NULL, &logical_xmin);
>> +     /* Use the catalog_xmin being retained by vacuum */
>> +     ProcArrayGetReplicationSlotXmin(NULL, &logical_xmin, NULL);
>
> What does that comment mean? Vacuum isn't the only thing that prunes old
> records.

I mean to refer to ShmemVariableCache.oldestCatalogXmin, the effective
catalog xmin used for record removal, not
ProcArray.replication_slot_catalog_xmin, the pending catalog_xmin for
local slots.

i.e. use the catalog_xmin that we've recorded in WAL and promised to standbys.

I agree the comment is unclear. Not sure how to improve it without
making it overly long though.
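
Maybe something along these lines (just a suggestion):

    /*
     * Use the catalog_xmin that has already been written to xlog and
     * promised to standbys, not the pending value computed from local
     * slots that vacuum may not have acted on yet.
     */
    ProcArrayGetReplicationSlotXmin(NULL, &logical_xmin, NULL);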

>> +/*
>> + * Set the global oldest catalog_xmin used to determine when tuples
>> + * may be removed from catalogs and user-catalogs accessible from logical
>> + * decoding.
>> + *
>> + * Only to be called from the startup process or by 
>> UpdateOldestCatalogXmin(),
>> + * which ensures the update is properly written to xlog first.
>> + */
>> +void
>> +SetOldestCatalogXmin(TransactionId oldestCatalogXmin)
>> +{
>> +     Assert(InRecovery || !IsUnderPostmaster || AmStartupProcess() || 
>> LWLockHeldByMe(ProcArrayLock));
>
> Uh, that's long-ish.  And doesn't agree with the comment above
> (s/startup process/process performing recovery/?).
>
> This is a long enough list that I'd consider just dropping the assert.

Fair enough.

>> +     else if (info == XLOG_XACT_CATALOG_XMIN_ADV)
>> +     {
>> +             xl_xact_catalog_xmin_advance *xlrec = 
>> (xl_xact_catalog_xmin_advance *) XLogRecGetData(record);
>> +
>> +             /*
>> +              * Unless logical decoding is possible on this node, we don't 
>> care about
>> +              * this record.
>> +              */
>> +             if (!XLogLogicalInfoActive() || max_replication_slots == 0)
>> +                     return;
>
> Too many negatives for my taste, but whatever.

Also removed in the latest version, since it turned out not to be accurate.

I had made the incorrect assumption that our global catalog_xmin was
necessarily 0 when wal_level < logical. But this is not the case, per
the new TAP tests in the latest patch. We can still have logical slots
from when wal_level was logical, with a valid catalog_xmin, after we
restart into wal_level = replica.

>> +             /*
>> +              * Apply the new catalog_xmin limit immediately. New decoding 
>> sessions
>> +              * will refuse to start if their slot is past it, and old ones 
>> will
>> +              * notice when we signal them with a recovery conflict. 
>> There's no
>> +              * effect on the catalogs themselves yet, so it's safe for 
>> backends
>> +              * with older catalog_xmins to still exist.
>> +              *
>> +              * We don't have to take ProcArrayLock since only the startup 
>> process
>> +              * is allowed to change oldestCatalogXmin when we're in 
>> recovery.
>> +              */
>> +             SetOldestCatalogXmin(xlrec->new_catalog_xmin);
>
> Which seems to rely on ResolveRecoveryConflictWithLogicalDecoding's
> lwlock acquisition for barriers?

I don't yet have a really solid grasp of memory ordering and barrier
issues in multiprocessing. As I understand it, processes created after
this point aren't going to see the old value; they'll fork() with a
current snapshot of memory, so either they'll see the new value or
they'll be captured by our
ResolveRecoveryConflictWithLogicalDecoding() run (assuming they don't
exit first).

New decoding sessions in existing backends would be an issue. They
call EnsureActiveLogicalSlotValid(), which performs a volatile read of
ShmemVariableCache->oldestCatalogXmin. But that isn't sufficient, is
it? We need a write barrier in SetOldestCatalogXmin and a read barrier
in EnsureActiveLogicalSlotValid.

I'll fix that. Thanks very much.
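
i.e. something like this, using the existing pg_write_barrier() /
pg_read_barrier() macros (sketch; the error message is illustrative):

    /* SetOldestCatalogXmin(), writer side */
    ShmemVariableCache->oldestCatalogXmin = oldestCatalogXmin;
    pg_write_barrier();

    /* EnsureActiveLogicalSlotValid(), reader side */
    pg_read_barrier();
    if (TransactionIdPrecedes(MyReplicationSlot->data.catalog_xmin,
                              ShmemVariableCache->oldestCatalogXmin))
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("replication slot requires catalog rows that have been removed")));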


>> +/*
>> + * Record when we advance the catalog_xmin used for tuple removal
>> + * so standbys find out before we remove catalog tuples they might
>> + * need for logical decoding.
>> + */
>> +XLogRecPtr
>> +XactLogCatalogXminUpdate(TransactionId new_catalog_xmin)
>> +{
>> +     XLogRecPtr ptr = InvalidXLogRecPtr;
>> +
>> +     if (XLogInsertAllowed())
>> +     {
>> +             xl_xact_catalog_xmin_advance xlrec;
>> +
>> +             xlrec.new_catalog_xmin = new_catalog_xmin;
>> +
>> +             XLogBeginInsert();
>> +             XLogRegisterData((char *) &xlrec, 
>> SizeOfXactCatalogXminAdvance);
>> +
>> +             ptr = XLogInsert(RM_XACT_ID, XLOG_XACT_CATALOG_XMIN_ADV);
>> +     }
>
> Huh, why is this test needed and ok?

Good point. It isn't anymore.

I previously had catalog_xmin advances on replicas running a similar
path and skipping xlog, but that was fragile. Now
UpdateOldestCatalogXmin() is only called from the master, per the
assertion at the start, so it's unnecessary to test
XLogInsertAllowed() here.

Fixed.


>> @@ -9449,6 +9456,16 @@ XLogReportParameters(void)
>>                       XLogFlush(recptr);
>>               }
>>
>> +             /*
>> +              * If wal_level was lowered from WAL_LEVEL_LOGICAL we no longer
>> +              * require oldestCatalogXmin in checkpoints and it no longer
>> +              * makes sense, so update shmem and xlog the change. This will
>> +              * get written out in the next checkpoint.
>> +              */
>> +             if (ControlFile->wal_level >= WAL_LEVEL_LOGICAL &&
>> +                     wal_level < WAL_LEVEL_LOGICAL)
>> +                     UpdateOldestCatalogXmin(true);
>
> What if we crash before this happens?

We run XLogReportParameters before we set ControlFile->state =
DB_IN_PRODUCTION, so we'd re-run recovery and call it again next time
through.

But as it turns out, the above is neither necessary nor correct
anyway; it relies on the invalid assumption that catalog_xmin is 0
when wal_level < logical. Per above, that's not the case, so we can't
short-circuit catalog_xmin logging when wal_level = replica.

>> diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
>> index ff633fa..2d16bf0 100644
>> --- a/src/backend/commands/vacuum.c
>> +++ b/src/backend/commands/vacuum.c
>> @@ -518,6 +518,15 @@ vacuum_set_xid_limits(Relation rel,
>>       MultiXactId safeMxactLimit;
>>
>>       /*
>> +      * When logical decoding is enabled, we must write any advance of
>> +      * catalog_xmin to xlog before we allow VACUUM to remove those tuples.
>> +      * This ensures that any standbys doing logical decoding can cancel
>> +      * decoding sessions and invalidate slots if we remove tuples they
>> +      * still need.
>> +      */
>> +     UpdateOldestCatalogXmin(false);
>
> I'm on a first read-through through this, but it appears you don't do
> anything similar in heap_page_prune()?  And we can't just start emitting
> loads of additional records there, because it's called much more often...

vacuum_set_xid_limits sets OldestXmin in lazy_vacuum_rel, which is the
OldestXmin passed to heap_page_prune.

I could Assert in heap_page_prune that its OldestXmin precedes or
equals ShmemVariableCache->oldestCatalogXmin, I guess. It seemed
unnecessary.
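
Something like this, if we want it (hypothetical, not in the current
patch; guarded so it only applies to catalog relations):

    Assert(!RelationIsAccessibleInLogicalDecoding(relation) ||
           !TransactionIdIsValid(ShmemVariableCache->oldestCatalogXmin) ||
           TransactionIdPrecedesOrEquals(OldestXmin,
                                         ShmemVariableCache->oldestCatalogXmin));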


>> +             /*
>> +              * The walreceiver should be running when we try to create a 
>> slot. If
>> +              * we're unlucky enough to catch the walreceiver just as it's
>> +              * restarting after an error, well, the client can just retry. 
>> We don't
>> +              * bother to sleep and re-check.
>> +              */
>> +             if (!walrcv_running)
>> +                     ereport(ERROR,
>> +                                     
>> (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
>> +                                      errmsg("streaming replication is not 
>> active"),
>> +                                      errhint("Logical decoding on standby 
>> requires that streaming replication be configured and active. Ensure that 
>> primary_conninfo is correct in recovery.conf and check for streaming 
>> replication errors in the logs.")));
>
>
> That seems quite problematic. What if there's a momentaneous connection
> failure?  This also has the issue that just because you checked that
> walrcv_running at some point, doesn't guarantee anything by the time you
> actually check.  Seems like life were easier if recovery.conf were
> guc-ified already - checking for primary_conninfo/primary_slot_name etc
> wouldn't have that issue (and can't be changed while running).

Yes, I very much wish walreceiver were already GUC-ified. I'd rather
test primary_conninfo and primary_slot_name.

> Usage of a slot doesn't actually guarantee much in cascased setups, does
> it?

It doesn't entirely eliminate the potential for a race with catalog
removal, but neither does hot_standby_feedback on a non-cascading
setup. Right now we tolerate that race and the risk that the slot may
become invalid. The application can prevent that by making sure it has
a slot on the master, and that the standby has caught up past the
master's lsn as of that slot's creation, before it creates a slot on
the standby.

That's part of the reason for the hoop-jumping around catalog_xmin
advance: to make sure we know, for certain, whether it's safe to
decode from a slot given that we haven't been able to directly enforce
our xmin on the master.

To get rid of that race without application intervention, we need the
ability for a feedback message to flow up the cascade, and a reply
that specifically matches that feedback message (or at least that
individual downstream node) to flow down the cascade.

I'm working on just that, but there's no way it'll be ready for pg10
obviously, and it has some difficult issues. It's actually intended to
help prevent conflict with standby cancels shortly after hot_standby
starts up, but it'll help with slot creation too.

Even with all that, we'll still need some kind of xlog'd catalog_xmin
knowledge, because users can do silly things like dropping the slot
that connects the standby to the master and re-creating it, which
un-pins the catalog_xmin the standby needs on the master. We don't
want to risk messily crashing if that happens.

>> @@ -266,7 +306,9 @@ CreateInitDecodingContext(char *plugin,
>>        * xmin horizons by other backends, get the safe decoding xid, and 
>> inform
>>        * the slot machinery about the new limit. Once that's done the
>>        * ProcArrayLock can be released as the slot machinery now is
>> -      * protecting against vacuum.
>> +      * protecting against vacuum - if we're on the master. If we're 
>> running on
>> +      * a replica, we have to wait until hot_standby_feedback locks in our
>> +      * needed catalogs, per details on 
>> WaitForMasterCatalogXminReservation().
>>        * ----
>>        */
>>       LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
>> @@ -276,6 +318,12 @@ CreateInitDecodingContext(char *plugin,
>>
>>       ReplicationSlotsComputeRequiredXmin(true);
>>
>> +     if (RecoveryInProgress())
>> +             WaitForMasterCatalogXminReservation(slot);
>> +
>> +     
>> Assert(TransactionIdPrecedesOrEquals(ShmemVariableCache->oldestCatalogXmin,
>> +                                                                            
>>   slot->data.catalog_xmin));
>> +
>>       LWLockRelease(ProcArrayLock);
>
> I think it's quite a bad idea to do a blocking operation like
> WaitForMasterCatalogXminReservation while holding ProcArrayLock.
>
>
>> +/*
>> + * Wait until the master's catalog_xmin is set, advancing our catalog_xmin
>> + * if needed. Caller must hold exclusive ProcArrayLock, which this function 
>> will
>> + * temporarily release while sleeping but will re-acquire.
>
> Ah. I see. Hm :(.

Exactly.

I'm increasingly inclined to rip that out and make preventing races
with master catalog removal the application's problem. Create a slot
on the master first, or accept that you may have to retry.

>
>> + * When we're creating a slot on a standby we can't directly set the
>> + * master's catalog_xmin; the catalog_xmin is set locally, then relayed
>> + * over hot_standby_feedback. The master may remove the catalogs we
>> + * asked to reserve between when we set a local catalog_xmin and when
>> + * hs feedback makes that take effect on the master. We need a feedback
>> + * reply mechanism here, where:
>> + *
>> + * - we tentatively reserve catalog_xmin locally
>
> Will that already trigger recovery conflicts?

If we already have local active slots, we'll be using their existing
catalog_xmin and there's no issue.

If we don't already have local slots, the only potential conflict is
with this backend. It could cause a conflict if we replayed a greatly
advanced catalog_xmin from the master before we got the chance to
advance our local one accordingly.

>> + * - we wake the walreceiver by setting its latch
>> + * - walreceiver sends hs_feedback
>> + * - upstream walsender sends a new 'hs_feedback reply' message with
>> + *   actual (xmin, catalog_xmin) reserved.
>> + * - walreceiver sees reply and updates ShmemVariableCache or some other
>> + *   handy bit of shmem with hs feedback reservations from reply
>
> "or some other handy bit"?

Ha.  Will fix.

>> + * - we poll the reservations while we wait
>> + * - we set our catalog_xmin to that value, which might be later if
>> + *   we missed our requested reservation, or might be earlier if
>> + *   someone else is holding down catalog_xmin on master. We got a hs
>> + *   feedback reply so we know it's reserved.
>> + *
>> + * For cascading, the actual reservation will need to cascade up the
>> + * chain by walsender setting its own walreceiver's latch in turn, etc.
>> + *
>> + * For now, we just set the local slot catalog_xmin and sleep until
>> + * oldestCatalogXmin equals or passes our reservation. This is fine if we're
>> + * the only decoding session, but it is vulnerable to races if slots on the
>> + * master or other decoding sessions on other standbys connected to the same
>> + * master exist. They might advance their reservation before our hs_feedback
>> + * locks it down, allowing vacuum to remove tuples we need. So we might 
>> start
>> + * decoding on our slot then error with a conflict with recovery when we see
>> + * catalog_xmin advance.
>> + */
>
> I was about to list some of these issues.  That's a bit unsatisfying.

I concur. I just don't have a better answer.

I think I'd like to rip it out and make it the application's problem
until we can do it right.

>
>
> Pondering this for a bit, but I'm ~9h into a flight, so maybe not
> tonight^Wthis morning^Wwhaaaa.
>
>
>> +static void
>> +WaitForMasterCatalogXminReservation(ReplicationSlot *slot)
>> +{
>
>
> This comment seems to duplicate some of the function header
> comment. Such duplication usually leads to either or both getting out of
> date rather quickly.
>
>
> Not commenting line-by-line on the code here, but I'm extremely doubtful
> that this approach is stable enough, and that the effect of holding
> ProcArrayLock exclusively over prolonged amounts of time is acceptable.
>
>> +     ReplicationSlotsComputeRequiredXmin(true);
>>
> Why do we need this? The caller does it too, no?

Because we force a walsender update immediately and want a current value.

It's cheap enough to run during slot creation.

>> +     /* Tell the master what catalog_xmin we settled on */
>> +     WalRcvForceReply();
>> +
>> +     /* Reset ps display if we changed it */
>> +     if (new_status)
>> +     {
>> +             set_ps_display(new_status, false);
>> +             pfree(new_status);
>> +     }
>
> We really shouldn't do stuff like this while holding ProcArrayLock.

Yeah, good point.

>> +/*
>> + * Test to see if the active logical slot is usable.
>> + */
>> +static void
>> +EnsureActiveLogicalSlotValid()
>> +{
>
> Missing (void).

Augh, C++ still has its tentacles in my brain.

>> +/*
>> + * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
>> + * passed database oid. The caller should hold an exclusive lock on the 
>> database
>> + * to ensure no replication slots on the database are in use.
>
> Stuff like this really should be it's own commit.  It can trivially be
> tested on its own, is useful on its own (just have DROP DATABASE do it),

Agreed, will do.

>> + * If we fail here we'll leave the in-memory state of replication slots
>> + * inconsistent with its on-disk state, so we need to PANIC.
>
> We worked quite hard to make it extremely unlikely for that to happen in
> practice.  I also don't see why there should be any new PANICs in this
> code.

I didn't figure out a sensible way not to. I'll revisit that.

>> +void
>> +ReplicationSlotsDropDBSlots(Oid dboid)
>> +{
>> +     int                     i;
>> +
>> +     if (max_replication_slots <= 0)
>> +             return;
>> +
>> +     /*
>> +      * We only need a shared lock here even though we activate slots,
>> +      * because we have an exclusive lock on the database we're dropping
>> +      * slots on and don't touch other databases' slots.
>> +      */
>> +     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
>
> Hm? Acquiring a slot always only takes a shared lock, no?
>
> I don't really see how "database is locked" guarantees enough for your
> logic - it's already possible to drop slots from other databases, and
> dropping a slot acquires it temporarily?

You can drop slots from other DBs.

Ugh. Right. That's a frustrating oversight. I'll have to revisit that logic.

>
>> +     for (i = 0; i < max_replication_slots; i++)
>> +     {
>> +             ReplicationSlot *s;
>> +             NameData slotname;
>> +             int active_pid;
>> +
>> +             s = &ReplicationSlotCtl->replication_slots[i];
>> +
>> +             /* cannot change while ReplicationSlotCtlLock is held */
>> +             if (!s->in_use)
>> +                     continue;
>> +
>> +             /* only logical slots are database specific, skip */
>> +             if (!SlotIsLogical(s))
>> +                     continue;
>> +
>> +             /* not our database, skip */
>> +             if (s->data.database != dboid)
>> +                     continue;
>> +
>> +             /* Claim the slot, as if ReplicationSlotAcquire()ing */
>> +             SpinLockAcquire(&s->mutex);
>> +             strncpy(NameStr(slotname), NameStr(s->data.name), NAMEDATALEN);
>> +             NameStr(slotname)[NAMEDATALEN-1] = '\0';
>> +             active_pid = s->active_pid;
>> +             if (active_pid == 0)
>> +             {
>> +                     MyReplicationSlot = s;
>> +                     s->active_pid = MyProcPid;
>> +             }
>> +             SpinLockRelease(&s->mutex);
>> +
>> +             /*
>> +              * The caller should have an exclusive lock on the database so
>> +              * we'll never have any in-use slots, but just in case...
>> +              */
>> +             if (active_pid)
>> +                     elog(PANIC, "replication slot %s is in use by pid %d",
>> +                              NameStr(slotname), active_pid);
>
> So, yea, this doesn't seem ok. Why don't we just ERROR out, instead of
> PANICing? There seems to be absolutely no correctness reason for a PANIC
> here?

We've acquired the slot, but it's marked active by another backend.
Something broke. But you're right, PANIC is an overreaction.

>> +             /*
>> +              * To avoid largely duplicating ReplicationSlotDropAcquired() 
>> or
>> +              * complicating it with already_locked flags for ProcArrayLock,
>> +              * ReplicationSlotControlLock and 
>> ReplicationSlotAllocationLock, we
>> +              * just release our ReplicationSlotControlLock to drop the 
>> slot.
>> +              *
>> +              * There's no race here: we acquired this slot, and no slot 
>> "behind"
>> +              * our scan can be created or become active with our target 
>> dboid due
>> +              * to our exclusive lock on the DB.
>> +              */
>> +             LWLockRelease(ReplicationSlotControlLock);
>> +             ReplicationSlotDropAcquired();
>> +             LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
>
> I don't see much problem with this, but I'd change the code so you
> simply do a goto restart; if you released the slot.  Then there's a lot
> less chance / complications around temporarily releasing
> ReplicationSlotControlLock.

Good idea.
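
So roughly (sketch; the slot-claiming part stays as it is now):

    restart:
        LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
        for (i = 0; i < max_replication_slots; i++)
        {
            ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

            if (!s->in_use || !SlotIsLogical(s) || s->data.database != dboid)
                continue;

            /* ... claim the slot as before ... */

            /*
             * Drop with the control lock released, then rescan from the top,
             * since the array may have changed while the lock was released.
             */
            LWLockRelease(ReplicationSlotControlLock);
            ReplicationSlotDropAcquired();
            goto restart;
        }
        LWLockRelease(ReplicationSlotControlLock);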

>> +                                              *
>> +                                              * If logical decoding 
>> information is enabled, we also
>> +                                              * send immediate hot standby 
>> feedback so as to reduce
>> +                                              * the delay before our needed 
>> catalogs are locked in.
>
> "logical decoding information ... enabled"

XLogLogicalInfoActive()

> and "catalogs are locked in"

Yeah, fair.

> are a bit too imprecise descriptions for my taste.

Will adjust.


>
>
>> +             xmin = GetOldestXmin(NULL,
>> +                                                      false, /* don't 
>> ignore vacuum */
>> +                                                      true /* ignore 
>> catalog xmin */);
>> +
>> +             /*
>> +              * The catalog_Xmin reported by GetOldestXmin is the effective
>> +              * catalog_xmin used by vacuum, as set by 
>> xl_xact_catalog_xmin_advance
>> +              * records from the master. Sending it back to the master 
>> would be
>> +              * circular and prevent its catalog_xmin ever advancing once 
>> set.
>> +              * We should only send the catalog_xmin we actually need for 
>> slots.
>> +              */
>> +             ProcArrayGetReplicationSlotXmin(NULL, NULL, &catalog_xmin);
>
> Given that you don't have catalog_xmin set by GetOldestXmin that comment
> is a bit misleading.

It is. Too many revisions with too much time between them. Fixing.

>> @@ -1427,19 +1436,93 @@ GetOldestXmin(Relation rel, bool ignoreVacuum)
>>               NormalTransactionIdPrecedes(replication_slot_xmin, result))
>>               result = replication_slot_xmin;
>>
>> +     if (!ignoreCatalogXmin && (rel == NULL || 
>> RelationIsAccessibleInLogicalDecoding(rel)))
>> +     {
>> +             /*
>> +              * After locks have been released and defer_cleanup_age has 
>> been applied,
>> +              * check whether we need to back up further to make logical 
>> decoding
>> +              * safe. We need to do so if we're computing the global limit 
>> (rel =
>> +              * NULL) or if the passed relation is a catalog relation of 
>> some kind.
>> +              */
>> +             if (TransactionIdIsValid(replication_slot_catalog_xmin) &&
>> +                     
>> NormalTransactionIdPrecedes(replication_slot_catalog_xmin, result))
>> +                     result = replication_slot_catalog_xmin;
>> +     }
>
> The nesting of these checks, and the comments about them, is a bit
> weird.

Agreed. I didn't find it readable as a single test, and it wasn't
clear how to comment on just the inner part without splitting it up.
It's easy enough to merge; I just found the merged form less readable
and harder to understand.
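
Merged, it'd be something like:

    /*
     * Back up further if needed to make logical decoding safe. Applies when
     * computing the global limit (rel == NULL) or one for a catalog relation.
     */
    if (!ignoreCatalogXmin &&
        (rel == NULL || RelationIsAccessibleInLogicalDecoding(rel)) &&
        TransactionIdIsValid(replication_slot_catalog_xmin) &&
        NormalTransactionIdPrecedes(replication_slot_catalog_xmin, result))
        result = replication_slot_catalog_xmin;

which isn't terrible, I just find the split-out form easier to follow.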

>> +/*
>> + * Return true if ShmemVariableCache->oldestCatalogXmin needs to be updated
>> + * to reflect an advance in procArray->replication_slot_catalog_xmin or
>> + * it becoming newly set or unset.
>> + *
>> + */
>> +static bool
>> +CatalogXminNeedsUpdate(TransactionId vacuum_catalog_xmin, TransactionId 
>> slots_catalog_xmin)
>> +{
>> +     return (TransactionIdPrecedes(vacuum_catalog_xmin, slots_catalog_xmin)
>> +                     || (TransactionIdIsValid(vacuum_catalog_xmin) != 
>> TransactionIdIsValid(slots_catalog_xmin)));
>> +}
>
> Your lines are really long - pgindent (which you really should run) will
> much this.  I think it'd be better to rephrase this.

Thanks. Will.

IIRC pgindent created a LOT of unrelated noise at the time I was
working on it, but I'll recheck.
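
For that particular function, rephrasing it keeps the lines short
regardless of pgindent, e.g.:

    static bool
    CatalogXminNeedsUpdate(TransactionId vacuum_catalog_xmin,
                           TransactionId slots_catalog_xmin)
    {
        /* becoming set or unset is a change too, not just advancing */
        if (TransactionIdIsValid(vacuum_catalog_xmin) !=
            TransactionIdIsValid(slots_catalog_xmin))
            return true;

        return TransactionIdPrecedes(vacuum_catalog_xmin, slots_catalog_xmin);
    }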

>
>
>> +/*
>> + * If necessary, copy the current catalog_xmin needed by repliation slots to
>
> Typo: repliation

Thanks.

>> + * the effective catalog_xmin used for dead tuple removal.
>> + *
>> + * When logical decoding is enabled we write a WAL record before advancing 
>> the
>> + * effective value so that standbys find out if catalog tuples they still 
>> need
>> + * get removed, and can properly cancel decoding sessions and invalidate 
>> slots.
>> + *
>> + * The 'force' option is used when we're turning WAL_LEVEL_LOGICAL off
>> + * and need to clear the shmem state, since we want to bypass the wal_level
>> + * check and force xlog writing.
>> + */
>> +void
>> +UpdateOldestCatalogXmin(bool force)
>
> I'm a bit confused by this function and variable name.  What does
>
> +       TransactionId oldestCatalogXmin; /* oldest xid where complete catalog 
> state
> +                                                                         * 
> is guaranteed to still exist */
>
> mean?  I complained about the overall justification in the commit
> already, but looking at this commit alone, the justification for this
> part of the change is quite hard to understand.

Standbys have no way to know what catalog row versions are guaranteed to exist.

They know, from vacuum xlog records, when we remove row versions,
index entries, etc. associated with a transaction. But the standby has
no way to know whether the affected relation is a catalog or not; it
only knows the relfilenode. So it can't maintain a local notion of
"the effective global catalog_xmin on the master as of the last xlog
record I replayed".

I could add is_catalog flags to all the vacuum xlog records via a
secondary struct that's only added when wal_level = logical, but that
seems pretty awful and likely to be very noisy. It also wouldn't help
the standby know, at startup, what the current catalog_xmin of the
master is since it won't be in a checkpoint or the control file.
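
For comparison, the dedicated record the patch emits carries just the
new value, roughly:

    typedef struct xl_xact_catalog_xmin_advance
    {
        TransactionId new_catalog_xmin;
    } xl_xact_catalog_xmin_advance;

    #define SizeOfXactCatalogXminAdvance \
        (offsetof(xl_xact_catalog_xmin_advance, new_catalog_xmin) + sizeof(TransactionId))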

>
>
>> +{
>> +     TransactionId vacuum_catalog_xmin;
>> +     TransactionId slots_catalog_xmin;
>> +
>> +     /*
>> +      * If we're not recording logical decoding information, catalog_xmin
>> +      * must be unset and we don't need to do any work here.
>
> If we don't need to do any work, shouldn't we return early?

Yes.

>> +     if (CatalogXminNeedsUpdate(vacuum_catalog_xmin, slots_catalog_xmin) || 
>> force)
>> +     {
>> +             XactLogCatalogXminUpdate(slots_catalog_xmin);
>> +
>> +             LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
>> +             /*
>> +              * A concurrent updater could've changed these values so we 
>> need to re-check
>> +              * under ProcArrayLock before updating.
>> +              */
>> +             vacuum_catalog_xmin = *((volatile 
>> TransactionId*)&ShmemVariableCache->oldestCatalogXmin);
>> +             slots_catalog_xmin = *((volatile 
>> TransactionId*)&procArray->replication_slot_catalog_xmin);
>
> why are there volatile reads here?

Because I didn't understand volatile well enough. It's not a memory
barrier and provides no guarantee that we're seeing recent values.

It should probably just take ProcArrayLock.
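
Since the lock acquire/release already provides the needed barriers,
plain reads under ProcArrayLock should do, i.e. (sketch):

    LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
    vacuum_catalog_xmin = ShmemVariableCache->oldestCatalogXmin;
    slots_catalog_xmin = procArray->replication_slot_catalog_xmin;
    if (CatalogXminNeedsUpdate(vacuum_catalog_xmin, slots_catalog_xmin))
        SetOldestCatalogXmin(slots_catalog_xmin);
    LWLockRelease(ProcArrayLock);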

>> +             if (CatalogXminNeedsUpdate(vacuum_catalog_xmin, 
>> slots_catalog_xmin))
>> +                     SetOldestCatalogXmin(slots_catalog_xmin);
>
> Why don't we check force here, but above?

Good point.

I've removed force anyway, in the latest revision. Same reason as
given above re the StartupXLOG parameters check stuff.

>> @@ -2167,14 +2250,20 @@ GetOldestSafeDecodingTransactionId(void)
>>       oldestSafeXid = ShmemVariableCache->nextXid;
>>
>>       /*
>> -      * If there's already a slot pegging the xmin horizon, we can start 
>> with
>> -      * that value, it's guaranteed to be safe since it's computed by this
>> -      * routine initially and has been enforced since.
>> +      * If there's already an effectiveCatalogXmin held down by vacuum
>> +      * it's definitely safe to start there, and it can't advance
>> +      * while we hold ProcArrayLock.
>
> What does "held down by vacuum" mean?

Brain fart. Held down by an existing slot. Comment also needs rewording.

>>  /*
>> + * Notify a logical decoding session that it conflicts with a
>> + * newly set catalog_xmin from the master.
>> + */
>> +void
>> +CancelLogicalDecodingSessionWithRecoveryConflict(pid_t session_pid)
>> +{
>> +     ProcArrayStruct *arrayP = procArray;
>> +     int                     index;
>> +
>> +     /*
>> +      * We have to scan ProcArray to find the process and set a pending 
>> recovery
>> +      * conflict even though we know the pid. At least we can get the 
>> BackendId
>> +      * and void a ProcSignal scan later.
>> +      *
>> +      * The pid might've gone away, in which case we got the desired
>> +      * outcome anyway.
>> +      */
>> +     LWLockAcquire(ProcArrayLock, LW_SHARED);
>> +
>> +     for (index = 0; index < arrayP->numProcs; index++)
>> +     {
>> +             int                     pgprocno = arrayP->pgprocnos[index];
>> +             volatile PGPROC *proc = &allProcs[pgprocno];
>> +
>> +             if (proc->pid == session_pid)
>> +             {
>> +                     VirtualTransactionId procvxid;
>> +
>> +                     GET_VXID_FROM_PGPROC(procvxid, *proc);
>> +
>> +                     proc->recoveryConflictPending = true;
>> +
>> +                     /*
>> +                      * Kill the pid if it's still here. If not, that's 
>> what we
>> +                      * wanted so ignore any errors.
>> +                      */
>> +                     (void) SendProcSignal(session_pid,
>> +                             PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN, 
>> procvxid.backendId);
>> +
>> +                     break;
>> +             }
>> +     }
>> +
>> +     LWLockRelease(ProcArrayLock);
>
> Doesn't seem ok to do this while holding ProcArrayLock.

Fair enough. And I guess it's safe enough to take and release it,
since processes that start after we release it won't be at risk of
cancellation, so we don't care whether the scan sees them.
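
Sketch of what I have in mind, using the names already in the patch:

    BackendId   backend_id = InvalidBackendId;

    LWLockAcquire(ProcArrayLock, LW_SHARED);
    for (index = 0; index < arrayP->numProcs; index++)
    {
        PGPROC *proc = &allProcs[arrayP->pgprocnos[index]];

        if (proc->pid == session_pid)
        {
            proc->recoveryConflictPending = true;
            backend_id = proc->backendId;
            break;
        }
    }
    LWLockRelease(ProcArrayLock);

    /* Signal outside the lock; if the pid has already gone away, that's fine. */
    if (backend_id != InvalidBackendId)
        (void) SendProcSignal(session_pid,
                              PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN,
                              backend_id);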



>> +/*
>> + * Scan to see if any clients are using replication slots that are below the
>> + * new catalog_xmin theshold and sigal them to terminate with a recovery
>> + * conflict.
>> + *
>> + * We already applied the new catalog_xmin record and updated the shmem
>> + * catalog_xmin state, so new clients that try to use a replication slot
>> + * whose on-disk catalog_xmin is below the new threshold will ERROR, and we
>> + * don't have to guard against them here.
>> + *
>> + * Replay can only continue safely once every slot that needs the catalogs
>> + * we're going to free for removal is gone. So if any conflicting sessions
>> + * exist, wait for any standby conflict grace period then signal them to 
>> exit.
>> + *
>> + * The master might clear its reserved catalog_xmin if all upstream slots 
>> are
>> + * removed or clear their feedback reservations, sending us
>> + * InvalidTransactionId. If we're concurrently trying to create a new slot 
>> and
>> + * reserve catalogs the InvalidXid reservation report might come in while we
>> + * have a slot waiting for hs_feedback confirmation of its reservation. That
>> + * would cause the waiting process to get canceled with a conflict with
>> + * recovery here since its tentative reservation conflicts with the master's
>> + * report of 'nothing reserved'. To allow it to continue to seek a 
>> startpoint
>> + * we ignore slots whose catalog_xmin is >= nextXid, indicating that they're
>> + * still looking for where to start. We'll sometimes notice a conflict but 
>> the
>> + * slot will advance its catalog_xmin to a more recent nextXid and cease to
>> + * conflict when we re-check. (The alternative is to track slots being 
>> created
>> + * differently to slots actively decoding in shmem, which seems 
>> unnecessary. Or
>> + * to separate the 'tentative catalog_xmin reservation' of a slot from its
>> + * actual needed catalog_xmin.)
>> + *
>> + * We can't use ResolveRecoveryConflictWithVirtualXIDs() here because
>> + * walsender-based logical decoding sessions won't have any virtualxid for 
>> much
>> + * of their life and the end of their virtualxids doesn't mean the end of a
>> + * potential conflict. It would also cancel too aggressively, since it cares
>> + * about the backend's xmin and logical decoding only needs the 
>> catalog_xmin.
>> + */
>
> The use of "we" seems confusing here, because it's not the same process.
>
> Generally I think your comments need to be edited a bit for brevity and
> preciseness.

Will work on it.

Me, verbose? Really?

>> +void
>> +ResolveRecoveryConflictWithLogicalDecoding(TransactionId new_catalog_xmin)
>> +{
>> +     int i;
>> +
>> +     if (!InHotStandby)
>> +             /* nobody can be actively using logical slots */
>> +             return;
>> +
>> +     /* Already applied new limit, can't have replayed later one yet */
>> +     Assert(ShmemVariableCache->oldestCatalogXmin == new_catalog_xmin);
>> +
>> +     /*
>> +      * Find the first conflicting active slot and wait for it to be free,
>> +      * signalling it if necessary, then repeat until there are no more
>> +      * conflicts.
>> +      */
>> +     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
>> +     for (i = 0; i < max_replication_slots; i++)
>> +     {
>
> I'm pretty strongly against any code outside of slot.c doing this.

IIRC I originally tried to do that as part of slot.c but found that it
resulted in other ugliness relating to access to other structures. But
I can't remember what anymore, so I'll revisit it.

>
>
>> @@ -2789,12 +2797,13 @@ RecoveryConflictInterrupt(ProcSignalReason reason)
>>               Assert(RecoveryConflictPending && (QueryCancelPending || 
>> ProcDiePending));
>>
>>               /*
>> -              * All conflicts apart from database cause dynamic errors 
>> where the
>> -              * command or transaction can be retried at a later point with 
>> some
>> -              * potential for success. No need to reset this, since 
>> non-retryable
>> -              * conflict errors are currently FATAL.
>> +              * All conflicts apart from database and catalog_xmin cause 
>> dynamic
>> +              * errors where the command or transaction can be retried at a 
>> later
>> +              * point with some potential for success. No need to reset 
>> this, since
>> +              * non-retryable conflict errors are currently FATAL.
>>                */
>> -             if (reason == PROCSIG_RECOVERY_CONFLICT_DATABASE)
>> +             if (reason == PROCSIG_RECOVERY_CONFLICT_DATABASE ||
>> +                     reason == PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN)
>>                       RecoveryConflictRetryable = false;
>>       }
>
> Hm. Why is this a non-retryable error?

The global catalog_xmin isn't going to go backwards, so if the slot
needs a given catalog_xmin and we want to discard it...

... then we should give it a while to catch up. Right. It should be
retryable.


> Ok, landing soon.  Gotta finish here.

Greatly appreciated; I know it's not the nicest thing to review.
>
> 0002 should be doable as a whole this release, I have severe doubts that
> 0003 as a whole has a chance for 10 - the code is in quite a raw shape,
> there's a significant number of open ends.  I'd suggest breaking of bits
> that are independently useful, and work on getting those committed.

I'll be doing that, yes.

I really want some way to create slots on replicas, advance them to
follow the master's position, and have them usable after promotion to
master.

I don't think live decoding on a replica is ready yet, though I'd find
the ability to shift decoding workloads to replicas rather nice once
it is.

-- 
 Craig Ringer                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services

