On Thu, Jan 11, 2024 at 7:28 AM Peter Smith <smithpb2...@gmail.com> wrote:
>
> Here are some review comments for patch v58-0002
Thank you for the feedback. These are addressed in v60. Please find my
responses inline for a few of them.

> (FYI - I quickly checked with the latest v59-0002 and AFAIK all these
> review comments below are still relevant)
>
> ======
> Commit message
>
> 1.
> If a logical slot is invalidated on the primary, slot on the standby is also
> invalidated.
>
> ~
>
> /slot on the standby/then that slot on the standby/
>
> ======
> doc/src/sgml/logicaldecoding.sgml
>
> 2.
> In order to resume logical replication after failover from the synced
> logical slots, it is required that 'conninfo' in subscriptions are
> altered to point to the new primary server using ALTER SUBSCRIPTION
> ... CONNECTION. It is recommended that subscriptions are first
> disabled before promoting the standby and are enabled back once these
> are altered as above after failover.
>
> ~
>
> Minor rewording mainly to reduce a long sentence.
>
> SUGGESTION
> To resume logical replication after failover from the synced logical
> slots, the subscription's 'conninfo' must be altered to point to the
> new primary server. This is done using ALTER SUBSCRIPTION ...
> CONNECTION. It is recommended that subscriptions are first disabled
> before promoting the standby and are enabled back after altering the
> connection string.
>
> ======
> doc/src/sgml/system-views.sgml
>
> 3.
> + <entry role="catalog_table_entry"><para role="column_definition">
> + <structfield>synced</structfield> <type>bool</type>
> + </para>
> + <para>
> + True if this logical slot was synced from a primary server.
> + </para>
> + <para>
>
> SUGGESTION
> True if this is a logical slot that was synced from a primary server.
>
> ======
> src/backend/access/transam/xlogrecovery.c
>
> 4.
> + /*
> + * Shutdown the slot sync workers to prevent potential conflicts between
> + * user processes and slotsync workers after a promotion.
> + *
> + * We do not update the 'synced' column from true to false here, as any
> + * failed update could leave some slot's 'synced' column as false. This
> + * could cause issues during slot sync after restarting the server as a
> + * standby. While updating after switching to the new timeline is an
> + * option, it does not simplify the handling for 'synced' column.
> + * Therefore, we retain the 'synced' column as true after promotion as they
> + * can provide useful information about their origin.
> + */
>
> Minor comment wording changes.
>
> BEFORE
> ...any failed update could leave some slot's 'synced' column as false.
> SUGGESTION
> ...any failed update could leave 'synced' column false for some slots.
>
> ~
>
> BEFORE
> Therefore, we retain the 'synced' column as true after promotion as
> they can provide useful information about their origin.
> SUGGESTION
> Therefore, we retain the 'synced' column as true after promotion as it
> may provide useful information about the slot origin.
>
> ======
> src/backend/replication/logical/slotsync.c
>
> 5.
> + * While creating the slot on physical standby, if the local restart_lsn
> and/or
> + * local catalog_xmin is ahead of those on the remote then the worker cannot
> + * create the local slot in sync with the primary server because that would
> + * mean moving the local slot backwards and the standby might not have WALs
> + * retained for old LSN. In this case, the worker will mark the slot as
> + * RS_TEMPORARY. Once the primary server catches up, it will move the slot to
> + * RS_PERSISTENT and will perform the sync periodically.
>
> /will move the slot to RS_PERSISTENT/will mark the slot as RS_PERSISTENT/
>
> ~~~
>
> 6. drop_synced_slots_internal
> +/*
> + * Helper function for drop_obsolete_slots()
> + *
> + * Drops synced slot identified by the passed in name.
> + */
> +static void
> +drop_synced_slots_internal(const char *name, bool nowait)
> +{
> + Assert(MyReplicationSlot == NULL);
> +
> + ReplicationSlotAcquire(name, nowait);
> +
> + Assert(MyReplicationSlot->data.synced);
> +
> + ReplicationSlotDropAcquired();
> +}
>
> IMO you don't need this function. AFAICT it is only called from one
> place and does not result in fewer lines of code.
>
> ~~~
>
> 7. get_local_synced_slots
>
> + /* Check if it is logical synchronized slot */
> + if (s->in_use && SlotIsLogical(s) && s->data.synced)
> + {
> + local_slots = lappend(local_slots, s);
> + }
>
> Do you need to check SlotIsLogical(s) here? I thought s->data.synced
> can never be true for physical slots. I felt you could write this like
> below:
>
> if (s->in_use && s->data.synced)
> {
> Assert(SlotIsLogical(s));
> local_slots = lappend(local_slots, s);
> }
>
> ~~~
>
> 8. check_sync_slot_on_remote
>
> +static bool
> +check_sync_slot_on_remote(ReplicationSlot *local_slot, List *remote_slots,
> + bool *locally_invalidated)
> +{
> + ListCell *lc;
> +
> + foreach(lc, remote_slots)
> + {
> + RemoteSlot *remote_slot = (RemoteSlot *) lfirst(lc);
>
> I think you can use the new style foreach_ptr list macros here.
>
> ~~~
>
> 9. drop_obsolete_slots
>
> +drop_obsolete_slots(List *remote_slot_list)
> +{
> + List *local_slots = NIL;
> + ListCell *lc;
> +
> + local_slots = get_local_synced_slots();
> +
> + foreach(lc, local_slots)
> + {
> + ReplicationSlot *local_slot = (ReplicationSlot *) lfirst(lc);
>
> I think you can use the new style foreach_ptr list macros here.
>
> ~~~
>
> 10. reserve_wal_for_slot
>
> + Assert(slot != NULL);
> + Assert(slot->data.restart_lsn == InvalidXLogRecPtr);
>
> You can use the macro XLogRecPtrIsInvalid(slot->data.restart_lsn)
>
> ~~~
>
> 11. update_and_persist_slot
>
> +/*
> + * Update the LSNs and persist the slot for further syncs if the remote
> + * restart_lsn and catalog_xmin have caught up with the local ones.
> Otherwise,
> + * persist the slot and return.
> + *
> + * Return true if the slot is marked READY, otherwise false.
> + */
> +static bool
> +update_and_persist_slot(RemoteSlot *remote_slot)
>
> 11a.
> The comment says "Otherwise, persist the slot and return" but there is
> a return false which doesn't seem to persist anything so it seems
> contrary to the comment.
>
> ~
>
> 11b.
> "slot is marked READY" -- IIUC the synced states no longer exist in
> v58 so this comment maybe should not be referring to READY anymore. Or
> maybe there just needs to be more explanation about the difference
> between 'synced' and the state you call "READY".
>
> ~~~
>
> 12. synchronize_one_slot
>
> + * The slot is created as a temporary slot and stays in same state until the
> + * initialization is complete. The initialization is considered to be
> completed
> + * once the remote_slot catches up with locally reserved position and local
> + * slot is updated. The slot is then persisted.
>
> I think this comment is related to the "READY" mentioned by
> update_and_persist_slot. Still, perhaps the terminology needs to be
> made consistent across all these comments -- e.g. "considered to be
> completed" versus "READY" versus "sync-ready" etc.
>
> ~~~
>
> 13.
> + ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY,
> + remote_slot->two_phase,
> + remote_slot->failover,
> + true);
>
>
> This review comment is similar to elsewhere in this post. Consider
> commenting on the new parameter like "true /* synced */"
>
> ~~~
>
> 14. synchronize_slots
>
> + /*
> + * It is possible to get null values for LSN and Xmin if slot is
> + * invalidated on the primary server, so handle accordingly.
> + */
> + remote_slot->confirmed_lsn = !slot_attisnull(tupslot, 3) ?
> + DatumGetLSN(slot_getattr(tupslot, 3, &isnull)) :
> + InvalidXLogRecPtr;
> +
> + remote_slot->restart_lsn = !slot_attisnull(tupslot, 4) ?
> + DatumGetLSN(slot_getattr(tupslot, 4, &isnull)) :
> + InvalidXLogRecPtr;
> +
> + remote_slot->catalog_xmin = !slot_attisnull(tupslot, 5) ?
> + DatumGetTransactionId(slot_getattr(tupslot, 5, &isnull)) :
> + InvalidTransactionId;
>
> Isn't this the same functionality as the older v51 code that was
> written differently? I felt the old style (without ignoring the
> 'isnull') was more readable.
>
> v51
> + remote_slot->confirmed_lsn = DatumGetLSN(slot_getattr(tupslot, 3, &isnull));
> + if (isnull)
> + remote_slot->confirmed_lsn = InvalidXLogRecPtr;
>
> v58
> + remote_slot->confirmed_lsn = !slot_attisnull(tupslot, 3) ?
> + DatumGetLSN(slot_getattr(tupslot, 3, &isnull)) :
> + InvalidXLogRecPtr;
>
> If you prefer a ternary, it might be cleaner to do it like:

We got a CFBot failure where v51's way was crashing in a 32-bit
environment: there, a Datum for an int64 is passed as a pointer, so it
resulted in a NULL pointer dereference when slot_getattr() returned
NULL. Please see DatumGetInt64().

> Datum d;
> ...
> d = slot_getattr(tupslot, 3, &isnull);
> remote_slot->confirmed_lsn = isnull ? InvalidXLogRecPtr : DatumGetLSN(d);

Okay, I see. This can also be done; I missed this line earlier and can
consider it in the next version.

> ~~~
>
> 15.
> +
> + /* Drop local slots that no longer need to be synced. */
> + drop_obsolete_slots(remote_slot_list);
> +
> + /* Now sync the slots locally */
> + foreach(lc, remote_slot_list)
> + {
> + RemoteSlot *remote_slot = (RemoteSlot *) lfirst(lc);
> +
> + some_slot_updated |= synchronize_one_slot(wrconn, remote_slot);
> + }
>
> Here you can use the new list macro like foreach_ptr.
>
> ~~~
>
> 16. ReplSlotSyncWorkerMain
>
> + wrconn = walrcv_connect(PrimaryConnInfo, true, false,
> + cluster_name[0] ?
cluster_name : "slotsyncworker",
> + &err);
> + if (wrconn == NULL)
> + ereport(ERROR,
> + errcode(ERRCODE_CONNECTION_FAILURE),
> + errmsg("could not connect to the primary server: %s", err));
>
>
> Typically, I saw other PG code doing "if (!wrconn)" instead of "if
> (wrconn == NULL)"
>
>
> ======
> src/backend/replication/slotfuncs.c
>
> 17. create_physical_replication_slot
>
> ReplicationSlotCreate(name, false,
> temporary ? RS_TEMPORARY : RS_PERSISTENT, false,
> - false);
> + false, false);
>
> IMO passing parameters like "false, false, false" becomes a bit
> difficult to understand from the caller's POV so it might be good to
> comment on the parameter like:
>
> ReplicationSlotCreate(name, false,
> temporary ? RS_TEMPORARY : RS_PERSISTENT, false,
> false, false /* synced */);
>
> (there are a few other places like this where the same review comment applies)
>
> ~~~
>
> 18. create_logical_replication_slot
>
> ReplicationSlotCreate(name, true,
> temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase,
> - failover);
> + failover, false);
>
> Same as above. Maybe comment on the parameter like "false /* synced */"
>
> ~~~
>
> 19. pg_get_replication_slots
>
> case RS_INVAL_WAL_REMOVED:
> - values[i++] = CStringGetTextDatum("wal_removed");
> + values[i++] = CStringGetTextDatum(SLOT_INVAL_WAL_REMOVED_TEXT);
> break;
>
> case RS_INVAL_HORIZON:
> - values[i++] = CStringGetTextDatum("rows_removed");
> + values[i++] = CStringGetTextDatum(SLOT_INVAL_HORIZON_TEXT);
> break;
>
> case RS_INVAL_WAL_LEVEL:
> - values[i++] = CStringGetTextDatum("wal_level_insufficient");
> + values[i++] = CStringGetTextDatum(SLOT_INVAL_WAL_LEVEL_TEXT);
> break;
>
> IMO this code and the #defines that it uses can be written and pushed
> as an independent patch.

Okay, let me review this one along with #22, which mentions the same.

> ======
> src/backend/replication/walsender.c
>
> 20. CreateReplicationSlot
>
> ReplicationSlotCreate(cmd->slotname, false,
> cmd->temporary ?
RS_TEMPORARY : RS_PERSISTENT,
> - false, false);
> + false, false, false);
>
> Consider commenting the parameter like "false /* synced */"
>
> ~~~
>
> 21.
> ReplicationSlotCreate(cmd->slotname, true,
> cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
> - two_phase, failover);
> + two_phase, failover, false);
>
> Consider commenting the parameter like "false /* synced */"
>
> ======
> src/include/replication/slot.h
>
> 22.
> +/*
> + * The possible values for 'conflict_reason' returned in
> + * pg_get_replication_slots.
> + */
> +#define SLOT_INVAL_WAL_REMOVED_TEXT "wal_removed"
> +#define SLOT_INVAL_HORIZON_TEXT "rows_removed"
> +#define SLOT_INVAL_WAL_LEVEL_TEXT "wal_level_insufficient"
>
> IMO these #defines and also the code in pg_get_replication_slots()
> that uses them can be written and pushed as an independent patch.
>
> ======
> .../t/050_standby_failover_slots_sync.pl
>
> 23.
> +# Wait for the standby to start sync
> +$standby1->start;
>
> But there is no waiting here? Maybe the comment should say like "Start
> the standby so that slot syncing can begin"
>
> ~~~
>
> 24.
> +# Wait for the standby to finish sync
> +my $offset = -s $standby1->logfile;
> +$standby1->wait_for_log(
> + qr/LOG: ( [A-Z0-9]+:)? newly locally created slot \"lsub1_slot\" is
> sync-ready now/,
> + $offset);
>
> SUGGESTION
> # Wait for the standby to finish slot syncing
>
> ~~~
>
> 25.
> +# Confirm that logical failover slot is created on the standby and is sync
> +# ready.
> +is($standby1->safe_psql('postgres',
> + q{SELECT failover, synced FROM pg_replication_slots WHERE slot_name
> = 'lsub1_slot';}),
> + "t|t",
> + 'logical slot has failover as true and synced as true on standby');
>
> SUGGESTION
> # Confirm that the logical failover slot is created on the standby and
> is flagged as 'synced'
>
> ~~~
>
> 26.
> +$subscriber1->safe_psql(
> + 'postgres', qq[
> + CREATE TABLE tab_int (a int PRIMARY KEY);
> + ALTER SUBSCRIPTION regress_mysub1 REFRESH PUBLICATION;
> +]);
> +
> +$subscriber1->wait_for_subscription_sync;
>
> Add a comment like
>
> # Subscribe to the new table data and wait for it to arrive
>
> ~~~
>
> 27.
> +# Disable hot_standby_feedback temporarily to stop slot sync worker otherwise
> +# the concerned testing scenarios here may be interrupted by different error:
> +# 'ERROR: replication slot is active for PID ..'
> +
> +$standby1->safe_psql('postgres', 'ALTER SYSTEM SET
> hot_standby_feedback = off;');
> +$standby1->restart;
>
> Remove the blank line.
>
> ~~~
>
> 28.
> +is($standby1->safe_psql('postgres',
> + q{SELECT slot_name FROM pg_replication_slots WHERE slot_name =
> 'lsub1_slot';}),
> + 'lsub1_slot',
> + 'synced slot retained on the new primary');
>
> There should be some comment like:
>
> SUGGESTION
> # Confirm the synced slot 'lsub1_slot' is retained on the new primary
>
> ~~~
>
> 29.
> +# Confirm that data in tab_int replicated on subscriber
> +is( $subscriber1->safe_psql('postgres', q{SELECT count(*) FROM tab_int;}),
> + "20",
> + 'data replicated from the new primary');
>
> /replicated on subscriber/replicated on the subscriber/
>
>
> ======
> Kind Regards,
> Peter Smith.
> Fujitsu Australia