On Monday, December 11, 2023 3:32 PM Peter Smith <smithpb2...@gmail.com> > > Here are some review comments for v44-0001 > > ====== > src/backend/replication/slot.c > > > 1. ReplicationSlotCreate > > * during getting changes, if the two_phase option is enabled it can skip > * prepare because by that time start decoding point has been moved. So > the > * user will only get commit prepared. > + * failover: Allows the slot to be synced to physical standbys so that > logical > + * replication can be resumed after failover. > */ > void > ReplicationSlotCreate(const char *name, bool db_specific, > > ~ > > /Allows the slot.../If enabled, allows the slot.../
Changed. > > ====== > > 2. validate_standby_slots > > +validate_standby_slots(char **newval) > +{ > + char *rawname; > + List *elemlist; > + ListCell *lc; > + bool ok = true; > + > + /* Need a modifiable copy of string */ rawname = pstrdup(*newval); > + > + /* Verify syntax and parse string into list of identifiers */ if (!(ok > + = SplitIdentifierString(rawname, ',', &elemlist))) > + GUC_check_errdetail("List syntax is invalid."); > + > + /* > + * If there is a syntax error in the name or if the replication slots' > + * data is not initialized yet (i.e., we are in the startup process), > + skip > + * the slot verification. > + */ > + if (!ok || !ReplicationSlotCtl) > + { > + pfree(rawname); > + list_free(elemlist); > + return ok; > + } > > > 2a. > You don't need to initialize 'ok' during declaration because it is assigned > immediately anyway. > > ~ > > 2b. > AFAIK assignment within a conditional like this is not a normal PG coding > style > unless there is no other way to do it. > Changed. > ~ > > 2c. > /into list/into a list/ > > SUGGESTION > /* Verify syntax and parse string into a list of identifiers */ ok = > SplitIdentifierString(rawname, ',', &elemlist); if (!ok) > GUC_check_errdetail("List syntax is invalid."); > > Changed. > ~~~ > > 3. assign_standby_slot_names > > + if (!SplitIdentifierString(standby_slot_names_cpy, ',', > + &standby_slots)) { > + /* This should not happen if GUC checked check_standby_slot_names. */ > + elog(ERROR, "list syntax is invalid"); } > > This error here and in validate_standby_slots() are different -- "list" versus > "List". > The message has been changed to "invalid list syntax" to be consistent with other elog. > ====== > src/backend/replication/walsender.c > > > 4. WalSndFilterStandbySlots > > > + foreach(lc, standby_slots_cpy) > + { > + char *name = lfirst(lc); > + XLogRecPtr restart_lsn = InvalidXLogRecPtr; bool invalidated = false; > + char *warningfmt = NULL; > + ReplicationSlot *slot; > + > + slot = SearchNamedReplicationSlot(name, true); > + > + if (slot && SlotIsPhysical(slot)) > + { > + SpinLockAcquire(&slot->mutex); > + restart_lsn = slot->data.restart_lsn; > + invalidated = slot->data.invalidated != RS_INVAL_NONE; > + SpinLockRelease(&slot->mutex); } > + > + /* Continue if the current slot hasn't caught up. */ if (!invalidated > + && !XLogRecPtrIsInvalid(restart_lsn) && restart_lsn < wait_for_lsn) { > + /* Log warning if no active_pid for this physical slot */ if > + (slot->active_pid == 0) ereport(WARNING, errmsg("replication slot > + \"%s\" specified in parameter \"%s\" does > not have active_pid", > + name, "standby_slot_names"), > + errdetail("Logical replication is waiting on the " > + "standby associated with \"%s\"", name), errhint("Consider starting > + standby associated with " > + "\"%s\" or amend standby_slot_names", name)); > + > + continue; > + } > + else if (!slot) > + { > + /* > + * It may happen that the slot specified in standby_slot_names GUC > + * value is dropped, so let's skip over it. > + */ > + warningfmt = _("replication slot \"%s\" specified in parameter > \"%s\" does not exist, ignoring"); > + } > + else if (SlotIsLogical(slot)) > + { > + /* > + * If a logical slot name is provided in standby_slot_names, issue > + * a WARNING and skip it. Although logical slots are disallowed in > + * the GUC check_hook(validate_standby_slots), it is still > + * possible for a user to drop an existing physical slot and > + * recreate a logical slot with the same name. Since it is > + * harmless, a WARNING should be enough, no need to error-out. > + */ > + warningfmt = _("cannot have logical replication slot \"%s\" in > parameter \"%s\", ignoring"); > + } > + else if (XLogRecPtrIsInvalid(restart_lsn) || invalidated) { > + /* > + * Specified physical slot may have been invalidated, so there is no > + point > + * in waiting for it. > + */ > + warningfmt = _("physical slot \"%s\" specified in parameter \"%s\" > has been invalidated, ignoring"); > + } > + else > + { > + Assert(restart_lsn >= wait_for_lsn); > + } > > This if/else chain seems structured awkwardly. IMO it would be tidier to > eliminate the NULL slot and IsLogicalSlot up-front, which would also simplify > some of the subsequent conditions > > SUGGESTION > > slot = SearchNamedReplicationSlot(name, true); > > if (!slot) > { > ... > } > else if (SlotIsLogical(slot)) > { > ... > } > else > { > Assert(SlotIsPhysical(slot)) > > SpinLockAcquire(&slot->mutex); > restart_lsn = slot->data.restart_lsn; > invalidated = slot->data.invalidated != RS_INVAL_NONE; > SpinLockRelease(&slot->mutex); > > if (XLogRecPtrIsInvalid(restart_lsn) || invalidated) > { > ... > } > else if (!invalidated && !XLogRecPtrIsInvalid(restart_lsn) && restart_lsn < > wait_for_lsn) > { > ... > } > else > { > Assert(restart_lsn >= wait_for_lsn); > } > } > Changed. > ~~~~ > > 5. WalSndWaitForWal > > + else > + { > + /* already caught up and doesn't need to wait for standby_slots */ > break; > + } > > /Already/already/ > Changed. > ====== > src/test/recovery/t/050_standby_failover_slots_sync.pl > > > 6. > +$subscriber1->safe_psql('postgres', > + "CREATE TABLE tab_int (a int PRIMARY KEY);"); > + > +# Create a subscription with failover = true > +$subscriber1->safe_psql('postgres', > + "CREATE SUBSCRIPTION regress_mysub1 CONNECTION > '$publisher_connstr' " > + . "PUBLICATION regress_mypub WITH (slot_name = lsub1_slot, > failover = true);" > +); > > > Consider combining these DDL statements. > Changed. > ~~~ > > 7. > +$subscriber2->safe_psql('postgres', > + "CREATE TABLE tab_int (a int PRIMARY KEY);"); > +$subscriber2->safe_psql('postgres', > + "CREATE SUBSCRIPTION regress_mysub2 CONNECTION > '$publisher_connstr' " > + . "PUBLICATION regress_mypub WITH (slot_name = lsub2_slot);"); > > Consider combining these DDL statements > Changed. > ~~~ > > 8. > +# Stop the standby associated with specified physical replication slot > +so that # the logical replication slot won't receive changes until the > +standby slot's # restart_lsn is advanced or the slots is removed from > +the standby_slot_names # list $publisher->safe_psql('postgres', > +"TRUNCATE tab_int;"); $publisher->wait_for_catchup('regress_mysub1'); > +$standby1->stop; > > /with specified/with the specified/ > > /or the slots is/or the slot is/ > Changed. > ~~~ > > 9. > +# Create some data on primary > > /on primary/on the primary/ > Changed. > ~~~ > > 10. > +$result = > + $subscriber1->safe_psql('postgres', "SELECT count(*) = 10 FROM > +tab_int;"); is($result, 't', > + "subscriber1 doesn't get data as the sb1_slot doesn't catch up"); > > > I felt instead of checking for 10 maybe it's more consistent with the previous > code to assign again that $primary_row_count variable to 20; > > Then check that those primary rows are not all yet received like: > > SELECT count(*) < $primary_row_count FROM tab_int; > I think we'd better check the accurate number here to make sure the number is what we expect. > ~~~ > > 11. > +# Now that the standby lsn has advanced, primary must send the decoded > +# changes to the subscription. > +$publisher->wait_for_catchup('regress_mysub1'); > +$result = > + $subscriber1->safe_psql('postgres', "SELECT count(*) = 20 FROM > +tab_int;"); is($result, 't', > + "subscriber1 gets data from primary after standby1 is removed from > the standby_slot_names list" > +); > > /primary must/the primary must/ > > (continuing the suggestion from the previous review comment) > > Now this SQL can use the variable too: > > subscriber1->safe_psql('postgres', "SELECT count(*) = > $primary_row_count FROM tab_int;"); > Changed. > ~~~ > > 12. > + > +# Create another subscription enabling failover > +$subscriber1->safe_psql('postgres', > + "CREATE SUBSCRIPTION regress_mysub3 CONNECTION > '$publisher_connstr' " > + . "PUBLICATION regress_mypub WITH (slot_name = lsub3_slot, > copy_data=false, failover = true, create_slot = false);" > +); > > > Maybe give some more information in that comment: > > SUGGESTION > Create another subscription (using the same slot created above) that enables > failover. > Added. Best Regards, Hou zj