Hi Vignesh, I have reviewed your latest patchset:
v20240814-0001. No comments
v20240814-0002. No comments
v20240814-0003. No comments
v20240814-0004. See below
v20240814-0005. No comments
//////
v20240814-0004.
======
src/backend/commands/subscriptioncmds.c
CreateSubscription:
nit - XXX comments
AlterSubscription_refresh:
nit - unnecessary parens in ereport
AlterSubscription:
nit - unnecessary parens in ereport
fetch_sequence_list:
nit - unnecessary parens in ereport
======
.../replication/logical/sequencesync.c
1. fetch_remote_sequence_data
+ * Returns:
+ * - TRUE if there are discrepancies between the sequence parameters in
+ * the publisher and subscriber.
+ * - FALSE if the parameters match.
+ */
+static bool
+fetch_remote_sequence_data(WalReceiverConn *conn, Oid relid, Oid remoteid,
+ char *nspname, char *relname, int64 *log_cnt,
+ bool *is_called, XLogRecPtr *page_lsn,
+ int64 *last_value)
IMO it is more natural to return TRUE for good results and FALSE for
bad ones. (FYI, I have implemented this reversal in the nitpicks
attachment).
~
nit - swapped columns seqmin and seqmax in the SQL to fetch them in
the natural order
nit - unnecessary parens in ereport
~~~
copy_sequence:
nit - update function comment to document the output parameter
nit - Assert that *sequence_mismatch is false on entry to this function
nit - tweak wrapping and add \n in the SQL
nit - unnecessary parens in ereport
report_sequence_mismatch:
nit - modify function comment
nit - function name changed
/report_sequence_mismatch/report_mismatched_sequences/ (now plural
(and more like the other one)
append_mismatched_sequences:
nit - param name /rel/seqrel/
~~~
2. LogicalRepSyncSequences:
+ Relation sequence_rel;
+ XLogRecPtr sequence_lsn;
+ bool sequence_mismatch;
The 'sequence_mismatch' variable must be initialized false, otherwise
we cannot trust it gets assigned.
~
LogicalRepSyncSequences:
nit - unnecessary parens in ereport
nit - move the for-loop variable declaration
nit - remove a blank line
process_syncing_sequences_for_apply:
nit - variable declaration indent
======
Kind Regards,
Peter Smith.
Fujitsu Australia
diff --git a/src/backend/commands/subscriptioncmds.c
b/src/backend/commands/subscriptioncmds.c
index 9fff288..22115bd 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -726,10 +726,10 @@ CreateSubscription(ParseState *pstate,
CreateSubscriptionStmt *stmt,
recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
/*
- * XXX: If the subscription is for a sequence-only publication,
- * creating this origin is unnecessary at this point. It can be created
- * later during the ALTER SUBSCRIPTION ... REFRESH command, if the
- * publication is updated to include tables or tables in schemas.
+ * XXX: If the subscription is for a sequence-only publication, creating
+ * this origin is unnecessary. It can be created later during the ALTER
+ * SUBSCRIPTION ... REFRESH command, if the publication is updated to
+ * include tables or tables in schemas.
*/
ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname,
sizeof(originname));
replorigin_create(originname);
@@ -800,9 +800,9 @@ CreateSubscription(ParseState *pstate,
CreateSubscriptionStmt *stmt,
* export it.
*
* XXX: If the subscription is for a sequence-only
publication,
- * creating this slot is not necessary at the moment.
It can be
- * created during the ALTER SUBSCRIPTION ... REFRESH
command if the
- * publication is updated to include tables or tables
in schema.
+ * creating this slot. It can be created later during
the ALTER
+ * SUBSCRIPTION ... REFRESH command, if the publication
is updated
+ * to include tables or tables in schema.
*/
if (opts.create_slot)
{
@@ -1021,9 +1021,9 @@ AlterSubscription_refresh(Subscription *sub, bool
copy_data,
copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
InvalidXLogRecPtr, true);
ereport(DEBUG1,
- (errmsg_internal("%s \"%s.%s\"
added to subscription \"%s\"",
-
relkind == RELKIND_SEQUENCE ? "sequence" : "table",
-
rv->schemaname, rv->relname, sub->name)));
+ errmsg_internal("%s \"%s.%s\"
added to subscription \"%s\"",
+
relkind == RELKIND_SEQUENCE ? "sequence" : "table",
+
rv->schemaname, rv->relname, sub->name));
}
}
@@ -1125,11 +1125,11 @@ AlterSubscription_refresh(Subscription *sub, bool
copy_data,
}
ereport(DEBUG1,
- (errmsg_internal("%s \"%s.%s\"
removed from subscription \"%s\"",
+ errmsg_internal("%s \"%s.%s\"
removed from subscription \"%s\"",
relkind == RELKIND_SEQUENCE ? "sequence" : "table",
get_namespace_name(get_rel_namespace(relid)),
get_rel_name(relid),
-
sub->name)));
+
sub->name));
}
}
@@ -1615,8 +1615,8 @@ AlterSubscription(ParseState *pstate,
AlterSubscriptionStmt *stmt,
{
if (!sub->enabled)
ereport(ERROR,
-
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("ALTER
SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES is not allowed for disabled
subscriptions")));
+
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("ALTER
SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES is not allowed for disabled
subscriptions"));
PreventInTransactionBlock(isTopLevel, "ALTER
SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES");
@@ -2494,8 +2494,8 @@ fetch_sequence_list(WalReceiverConn *wrconn, char
*subname, List *publications)
if (res->status != WALRCV_OK_TUPLES)
ereport(ERROR,
- (errmsg("could not receive list of sequences
from the publisher: %s",
- res->err)));
+ errmsg("could not receive list of sequences
from the publisher: %s",
+ res->err));
/* Process sequences. */
slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
diff --git a/src/backend/replication/logical/sequencesync.c
b/src/backend/replication/logical/sequencesync.c
index 8211121..1f45564 100644
--- a/src/backend/replication/logical/sequencesync.c
+++ b/src/backend/replication/logical/sequencesync.c
@@ -82,9 +82,8 @@ List *sequence_states_not_ready = NIL;
* - last_value: The last value of the sequence.
*
* Returns:
- * - TRUE if there are discrepancies between the sequence parameters in
- * the publisher and subscriber.
- * - FALSE if the parameters match.
+ * - TRUE if parameters match for the local and remote sequences.
+ * - FALSE if parameters differ for the local and remote sequences.
*/
static bool
fetch_remote_sequence_data(WalReceiverConn *conn, Oid relid, Oid remoteid,
@@ -101,17 +100,17 @@ fetch_remote_sequence_data(WalReceiverConn *conn, Oid
relid, Oid remoteid,
Oid seqtypid;
int64 seqstart;
int64 seqincrement;
- int64 seqmax;
int64 seqmin;
+ int64 seqmax;
bool seqcycle;
- bool seq_not_match = false;
+ bool seq_params_match;
HeapTuple tup;
Form_pg_sequence seqform;
initStringInfo(&cmd);
appendStringInfo(&cmd,
"SELECT last_value, log_cnt,
is_called, page_lsn,\n"
- "seqtypid, seqstart, seqincrement,
seqmax, seqmin, seqcycle\n"
+ "seqtypid, seqstart, seqincrement,
seqmin, seqmax, seqcycle\n"
"FROM pg_sequence_state(%d),
pg_sequence where seqrelid = %d",
remoteid, remoteid);
@@ -120,16 +119,16 @@ fetch_remote_sequence_data(WalReceiverConn *conn, Oid
relid, Oid remoteid,
if (res->status != WALRCV_OK_TUPLES)
ereport(ERROR,
- (errmsg("could not receive sequence list from
the publisher: %s",
- res->err)));
+ errmsg("could not receive sequence list from
the publisher: %s",
+ res->err));
/* Process the sequence. */
slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
ereport(ERROR,
- (errcode(ERRCODE_UNDEFINED_OBJECT),
- errmsg("sequence \"%s.%s\" not found on
publisher",
- nspname, relname)));
+ errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("sequence \"%s.%s\" not found on
publisher",
+ nspname, relname));
*last_value = DatumGetInt64(slot_getattr(slot, 1, &isnull));
Assert(!isnull);
@@ -152,10 +151,10 @@ fetch_remote_sequence_data(WalReceiverConn *conn, Oid
relid, Oid remoteid,
seqincrement = DatumGetInt64(slot_getattr(slot, 7, &isnull));
Assert(!isnull);
- seqmax = DatumGetInt64(slot_getattr(slot, 8, &isnull));
+ seqmin = DatumGetInt64(slot_getattr(slot, 8, &isnull));
Assert(!isnull);
- seqmin = DatumGetInt64(slot_getattr(slot, 9, &isnull));
+ seqmax = DatumGetInt64(slot_getattr(slot, 9, &isnull));
Assert(!isnull);
seqcycle = DatumGetBool(slot_getattr(slot, 10, &isnull));
@@ -169,16 +168,17 @@ fetch_remote_sequence_data(WalReceiverConn *conn, Oid
relid, Oid remoteid,
seqform = (Form_pg_sequence) GETSTRUCT(tup);
- if (seqform->seqtypid != seqtypid || seqform->seqmin != seqmin ||
- seqform->seqmax != seqmax || seqform->seqstart != seqstart ||
- seqform->seqincrement != seqincrement || seqform->seqcycle !=
seqcycle)
- seq_not_match = true;
+ seq_params_match = seqform->seqtypid == seqtypid &&
+ seqform->seqmin == seqmin && seqform->seqmax == seqmax &&
+ seqform->seqcycle == seqcycle &&
+ seqform->seqstart == seqstart &&
+ seqform->seqincrement == seqincrement;
ReleaseSysCache(tup);
ExecDropSingleTupleTableSlot(slot);
walrcv_clear_result(res);
- return seq_not_match;
+ return seq_params_match;
}
/*
@@ -187,6 +187,9 @@ fetch_remote_sequence_data(WalReceiverConn *conn, Oid
relid, Oid remoteid,
* Fetch the sequence value from the publisher and set the subscriber sequence
* with the same value. Caller is responsible for locking the local
* relation.
+ *
+ * The output parameter 'sequence_mismatch' indicates if a local/remote
+ * sequence parameter mismatch was detected.
*/
static XLogRecPtr
copy_sequence(WalReceiverConn *conn, Relation rel,
@@ -207,14 +210,15 @@ copy_sequence(WalReceiverConn *conn, Relation rel,
char *relname = RelationGetRelationName(rel);
Oid relid = RelationGetRelid(rel);
+ Assert(!*sequence_mismatch);
+
/* Fetch Oid. */
initStringInfo(&cmd);
- appendStringInfo(&cmd, "SELECT c.oid, c.relkind"
- " FROM pg_catalog.pg_class c"
- " INNER JOIN pg_catalog.pg_namespace
n"
- " ON (c.relnamespace = n.oid)"
- " WHERE n.nspname = %s"
- " AND c.relname = %s",
+ appendStringInfo(&cmd, "SELECT c.oid, c.relkind\n"
+ "FROM pg_catalog.pg_class c\n"
+ "INNER JOIN pg_catalog.pg_namespace
n\n"
+ " ON (c.relnamespace = n.oid)\n"
+ "WHERE n.nspname = %s AND c.relname =
%s",
quote_literal_cstr(nspname),
quote_literal_cstr(relname));
@@ -222,16 +226,16 @@ copy_sequence(WalReceiverConn *conn, Relation rel,
lengthof(tableRow), tableRow);
if (res->status != WALRCV_OK_TUPLES)
ereport(ERROR,
- (errcode(ERRCODE_CONNECTION_FAILURE),
- errmsg("sequence \"%s.%s\" info could not be
fetched from publisher: %s",
- nspname, relname, res->err)));
+ errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("sequence \"%s.%s\" info could not be
fetched from publisher: %s",
+ nspname, relname, res->err));
slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
ereport(ERROR,
- (errcode(ERRCODE_UNDEFINED_OBJECT),
- errmsg("sequence \"%s.%s\" not found on
publisher",
- nspname, relname)));
+ errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("sequence \"%s.%s\" not found on
publisher",
+ nspname, relname));
remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
Assert(!isnull);
@@ -242,7 +246,7 @@ copy_sequence(WalReceiverConn *conn, Relation rel,
ExecDropSingleTupleTableSlot(slot);
walrcv_clear_result(res);
- *sequence_mismatch = fetch_remote_sequence_data(conn, relid, remoteid,
+ *sequence_mismatch = !fetch_remote_sequence_data(conn, relid, remoteid,
nspname, relname,
&seq_log_cnt, &seq_is_called,
&seq_page_lsn, &seq_last_value);
@@ -255,12 +259,12 @@ copy_sequence(WalReceiverConn *conn, Relation rel,
}
/*
- * report_sequence_mismatch
+ * report_mismatched_sequences
*
- * Records details of sequence mismatches as a warning.
+ * Report any sequence mismatches as a single warning log.
*/
static void
-report_sequence_mismatch(StringInfo warning_sequences)
+report_mismatched_sequences(StringInfo warning_sequences)
{
if (warning_sequences->len)
{
@@ -269,6 +273,7 @@ report_sequence_mismatch(StringInfo warning_sequences)
errmsg("parameters differ for the remote and
local sequences (%s) for subscription \"%s\"",
warning_sequences->data,
MySubscription->name),
errhint("Alter/Re-create local sequences to
have the same parameters as the remote sequences."));
+
resetStringInfo(warning_sequences);
}
}
@@ -280,14 +285,14 @@ report_sequence_mismatch(StringInfo warning_sequences)
* and subscriber to the warning_sequences string.
*/
static void
-append_mismatched_sequences(StringInfo warning_sequences, Relation rel)
+append_mismatched_sequences(StringInfo warning_sequences, Relation seqrel)
{
if (warning_sequences->len)
appendStringInfoString(warning_sequences, ", ");
appendStringInfo(warning_sequences, "\"%s.%s\"",
-
get_namespace_name(RelationGetNamespace(rel)),
- RelationGetRelationName(rel));
+
get_namespace_name(RelationGetNamespace(seqrel)),
+ RelationGetRelationName(seqrel));
}
/*
@@ -355,15 +360,15 @@ LogicalRepSyncSequences(void)
slotname, &err);
if (LogRepWorkerWalRcvConn == NULL)
ereport(ERROR,
- (errcode(ERRCODE_CONNECTION_FAILURE),
- errmsg("could not connect to the publisher:
%s", err)));
+ errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not connect to the publisher:
%s", err));
seq_count = list_length(sequences_not_synced);
foreach_ptr(SubscriptionRelState, seqinfo, sequences_not_synced)
{
Relation sequence_rel;
XLogRecPtr sequence_lsn;
- bool sequence_mismatch;
+ bool sequence_mismatch = false;
CHECK_FOR_INTERRUPTS();
@@ -422,7 +427,7 @@ LogicalRepSyncSequences(void)
if (sequence_mismatch)
append_mismatched_sequences(warning_sequences,
sequence_rel);
- report_sequence_mismatch(warning_sequences);
+ report_mismatched_sequences(warning_sequences);
PG_RE_THROW();
}
PG_END_TRY();
@@ -444,11 +449,9 @@ LogicalRepSyncSequences(void)
if (((curr_seq % MAX_SEQUENCES_SYNC_PER_BATCH) == 0) ||
curr_seq == seq_count)
{
- /* Obtain the starting index of the current batch. */
- int i = (curr_seq - 1) - ((curr_seq
- 1) % MAX_SEQUENCES_SYNC_PER_BATCH);
-
/* LOG all the sequences synchronized during current
batch. */
- for (; i < curr_seq; i++)
+ for (int i = (curr_seq - 1) - ((curr_seq - 1) %
MAX_SEQUENCES_SYNC_PER_BATCH);
+ i < curr_seq; i++)
{
SubscriptionRelState *done_seq;
@@ -459,7 +462,7 @@ LogicalRepSyncSequences(void)
get_subscription_name(subid, false), get_rel_name(done_seq->relid)));
}
- report_sequence_mismatch(warning_sequences);
+ report_mismatched_sequences(warning_sequences);
ereport(LOG,
errmsg("logical replication
synchronized %d of %d sequences for subscription \"%s\" ",
@@ -469,7 +472,6 @@ LogicalRepSyncSequences(void)
CommitTransactionCommand();
start_txn = true;
}
-
}
list_free_deep(sequences_not_synced);
@@ -554,7 +556,7 @@ process_syncing_sequences_for_apply(void)
foreach_ptr(SubscriptionRelState, rstate, sequence_states_not_ready)
{
LogicalRepWorker *syncworker;
- int nsyncworkers;
+ int nsyncworkers;
if (!started_tx)
{