On Monday, December 11, 2023 3:32 PM Peter Smith <smithpb2...@gmail.com>
> 
> Here are some review comments for v44-0001
> 
> ======
> src/backend/replication/slot.c
> 
> 
> 1. ReplicationSlotCreate
> 
>   *     during getting changes, if the two_phase option is enabled it can skip
>   *     prepare because by that time start decoding point has been moved. So
> the
>   *     user will only get commit prepared.
> + * failover: Allows the slot to be synced to physical standbys so that 
> logical
> + *     replication can be resumed after failover.
>   */
>  void
>  ReplicationSlotCreate(const char *name, bool db_specific,
> 
> ~
> 
> /Allows the slot.../If enabled, allows the slot.../

Changed.

> 
> ======
> 
> 2. validate_standby_slots
> 
> +validate_standby_slots(char **newval)
> +{
> + char    *rawname;
> + List    *elemlist;
> + ListCell   *lc;
> + bool ok = true;
> +
> + /* Need a modifiable copy of string */ rawname = pstrdup(*newval);
> +
> + /* Verify syntax and parse string into list of identifiers */ if (!(ok
> + = SplitIdentifierString(rawname, ',', &elemlist)))
> + GUC_check_errdetail("List syntax is invalid.");
> +
> + /*
> + * If there is a syntax error in the name or if the replication slots'
> + * data is not initialized yet (i.e., we are in the startup process),
> + skip
> + * the slot verification.
> + */
> + if (!ok || !ReplicationSlotCtl)
> + {
> + pfree(rawname);
> + list_free(elemlist);
> + return ok;
> + }
> 
> 
> 2a.
> You don't need to initialize 'ok' during declaration because it is assigned
> immediately anyway.
> 
> ~
> 
> 2b.
> AFAIK assignment within a conditional like this is not a normal PG coding 
> style
> unless there is no other way to do it.
> 

Changed.

> ~
> 
> 2c.
> /into list/into a list/
> 
> SUGGESTION
> /* Verify syntax and parse string into a list of identifiers */ ok =
> SplitIdentifierString(rawname, ',', &elemlist); if (!ok)
>   GUC_check_errdetail("List syntax is invalid.");
> 
> 

Changed.

> ~~~
> 
> 3. assign_standby_slot_names
> 
> + if (!SplitIdentifierString(standby_slot_names_cpy, ',',
> + &standby_slots)) {
> + /* This should not happen if GUC checked check_standby_slot_names. */
> + elog(ERROR, "list syntax is invalid"); }
> 
> This error here and in validate_standby_slots() are different -- "list" versus
> "List".
> 

The message has been changed to "invalid list syntax" to be consistent with 
other elog.

> ======
> src/backend/replication/walsender.c
> 
> 
> 4. WalSndFilterStandbySlots
> 
> 
> + foreach(lc, standby_slots_cpy)
> + {
> + char    *name = lfirst(lc);
> + XLogRecPtr restart_lsn = InvalidXLogRecPtr; bool invalidated = false;
> + char    *warningfmt = NULL;
> + ReplicationSlot *slot;
> +
> + slot = SearchNamedReplicationSlot(name, true);
> +
> + if (slot && SlotIsPhysical(slot))
> + {
> + SpinLockAcquire(&slot->mutex);
> + restart_lsn = slot->data.restart_lsn;
> + invalidated = slot->data.invalidated != RS_INVAL_NONE;
> + SpinLockRelease(&slot->mutex); }
> +
> + /* Continue if the current slot hasn't caught up. */ if (!invalidated
> + && !XLogRecPtrIsInvalid(restart_lsn) && restart_lsn < wait_for_lsn) {
> + /* Log warning if no active_pid for this physical slot */ if
> + (slot->active_pid == 0) ereport(WARNING, errmsg("replication slot
> + \"%s\" specified in parameter \"%s\" does
> not have active_pid",
> +    name, "standby_slot_names"),
> + errdetail("Logical replication is waiting on the "
> +   "standby associated with \"%s\"", name), errhint("Consider starting
> + standby associated with "
> + "\"%s\" or amend standby_slot_names", name));
> +
> + continue;
> + }
> + else if (!slot)
> + {
> + /*
> + * It may happen that the slot specified in standby_slot_names GUC
> + * value is dropped, so let's skip over it.
> + */
> + warningfmt = _("replication slot \"%s\" specified in parameter
> \"%s\" does not exist, ignoring");
> + }
> + else if (SlotIsLogical(slot))
> + {
> + /*
> + * If a logical slot name is provided in standby_slot_names, issue
> + * a WARNING and skip it. Although logical slots are disallowed in
> + * the GUC check_hook(validate_standby_slots), it is still
> + * possible for a user to drop an existing physical slot and
> + * recreate a logical slot with the same name. Since it is
> + * harmless, a WARNING should be enough, no need to error-out.
> + */
> + warningfmt = _("cannot have logical replication slot \"%s\" in
> parameter \"%s\", ignoring");
> + }
> + else if (XLogRecPtrIsInvalid(restart_lsn) || invalidated) {
> + /*
> + * Specified physical slot may have been invalidated, so there is no
> + point
> + * in waiting for it.
> + */
> + warningfmt = _("physical slot \"%s\" specified in parameter \"%s\"
> has been invalidated, ignoring");
> + }
> + else
> + {
> + Assert(restart_lsn >= wait_for_lsn);
> + }
> 
> This if/else chain seems structured awkwardly. IMO it would be tidier to
> eliminate the NULL slot and IsLogicalSlot up-front, which would also simplify
> some of the subsequent conditions
> 
> SUGGESTION
> 
> slot = SearchNamedReplicationSlot(name, true);
> 
> if (!slot)
> {
> ...
> }
> else if (SlotIsLogical(slot))
> {
> ...
> }
> else
> {
>   Assert(SlotIsPhysical(slot))
> 
>   SpinLockAcquire(&slot->mutex);
>   restart_lsn = slot->data.restart_lsn;
>   invalidated = slot->data.invalidated != RS_INVAL_NONE;
>   SpinLockRelease(&slot->mutex);
> 
>   if (XLogRecPtrIsInvalid(restart_lsn) || invalidated)
>   {
>   ...
>   }
>   else if (!invalidated && !XLogRecPtrIsInvalid(restart_lsn) && restart_lsn <
> wait_for_lsn)
>   {
>   ...
>   }
>   else
>   {
>     Assert(restart_lsn >= wait_for_lsn);
>   }
> }
> 

Changed.

> ~~~~
> 
> 5. WalSndWaitForWal
> 
> + else
> + {
> + /* already caught up and doesn't need to wait for standby_slots */
>   break;
> + }
> 
> /Already/already/
> 

Changed.

> ======
> src/test/recovery/t/050_standby_failover_slots_sync.pl
> 
> 
> 6.
> +$subscriber1->safe_psql('postgres',
> + "CREATE TABLE tab_int (a int PRIMARY KEY);");
> +
> +# Create a subscription with failover = true
> +$subscriber1->safe_psql('postgres',
> +     "CREATE SUBSCRIPTION regress_mysub1 CONNECTION
> '$publisher_connstr' "
> +   . "PUBLICATION regress_mypub WITH (slot_name = lsub1_slot,
> failover = true);"
> +);
> 
> 
> Consider combining these DDL statements.
> 

Changed.

> ~~~
> 
> 7.
> +$subscriber2->safe_psql('postgres',
> + "CREATE TABLE tab_int (a int PRIMARY KEY);");
> +$subscriber2->safe_psql('postgres',
> +     "CREATE SUBSCRIPTION regress_mysub2 CONNECTION
> '$publisher_connstr' "
> +   . "PUBLICATION regress_mypub WITH (slot_name = lsub2_slot);");
> 
> Consider combining these DDL statements
> 

Changed.

> ~~~
> 
> 8.
> +# Stop the standby associated with specified physical replication slot
> +so that # the logical replication slot won't receive changes until the
> +standby slot's # restart_lsn is advanced or the slots is removed from
> +the standby_slot_names # list $publisher->safe_psql('postgres',
> +"TRUNCATE tab_int;"); $publisher->wait_for_catchup('regress_mysub1');
> +$standby1->stop;
> 
> /with specified/with the specified/
> 
> /or the slots is/or the slot is/
> 

Changed.

> ~~~
> 
> 9.
> +# Create some data on primary
> 
> /on primary/on the primary/
> 

Changed.

> ~~~
> 
> 10.
> +$result =
> +  $subscriber1->safe_psql('postgres', "SELECT count(*) = 10 FROM
> +tab_int;"); is($result, 't',
> + "subscriber1 doesn't get data as the sb1_slot doesn't catch up");
> 
> 
> I felt instead of checking for 10 maybe it's more consistent with the previous
> code to assign again that $primary_row_count variable to 20;
> 
> Then check that those primary rows are not all yet received like:
> 
> SELECT count(*) < $primary_row_count FROM tab_int;
> 

I think we'd better check the accurate number here to make sure the number is 
what we expect.

> ~~~
> 
> 11.
> +# Now that the standby lsn has advanced, primary must send the decoded
> +# changes to the subscription.
> +$publisher->wait_for_catchup('regress_mysub1');
> +$result =
> +  $subscriber1->safe_psql('postgres', "SELECT count(*) = 20 FROM
> +tab_int;"); is($result, 't',
> + "subscriber1 gets data from primary after standby1 is removed from
> the standby_slot_names list"
> +);
> 
> /primary must/the primary must/
> 
> (continuing the suggestion from the previous review comment)
> 
> Now this SQL can use the variable too:
> 
> subscriber1->safe_psql('postgres', "SELECT count(*) =
> $primary_row_count FROM tab_int;");
> 

Changed.

> ~~~
> 
> 12.
> +
> +# Create another subscription enabling failover
> +$subscriber1->safe_psql('postgres',
> +     "CREATE SUBSCRIPTION regress_mysub3 CONNECTION
> '$publisher_connstr' "
> +   . "PUBLICATION regress_mypub WITH (slot_name = lsub3_slot,
> copy_data=false, failover = true, create_slot = false);"
> +);
> 
> 
> Maybe give some more information in that comment:
> 
> SUGGESTION
> Create another subscription (using the same slot created above) that enables
> failover.
> 

Added.

Best Regards,
Hou zj

Reply via email to