Hi, here are some review comments for patch v18-0001.
======
doc/src/sgml/protocol.sgml
nitpick - Although it is no fault of your patch, IMO it would be nicer for
the TWO_PHASE description (of CREATE REPLICATION SLOT) to also be in the
same consistent order as what you have (e.g. below FAILOVER). So I moved it.
======
src/backend/access/transam/twophase.c
LookupGXactBySubid:
nitpick - add a blank line before return
======
src/backend/commands/subscriptioncmds.c
CommonChecksForFailoverAndTwophase:
nitpick - added Assert for the generic-looking "option" parameter name
nitpick - modified comment about transaction block
~~~
1. AlterSubscription
+ * Workers may still survive even if the subscription has
+ * been disabled. They may read the pg_subscription
+ * catalog and detect that the twophase parameter is
+ * updated, which causes the assertion failure. Ensure
+ * workers have already been exited to avoid it.
"which causes the assertion failure" -- what assertion failure is that? The
comment is not very clear.
~
nitpick - in comment /twophase/two_phase/
nitpick - typo /acoordingly/accordingly/
======
src/backend/replication/logical/launcher.c
logicalrep_workers_find:
nitpick - /require_lock/acquire_lock/
nitpick - take the Assert out of the else.
======
src/backend/replication/slot.c
nitpick - refactor the code to check (failover) only one time. See the
nitpicks attachment.
~
2. ParseAlterReplSlotOptions
nitpick -- IMO the ParseAlterReplSlotOptions(). function does more harm
than good here by adding the unnecessary complexity of messing around with
multiple parameters that are passed-by-reference. All this would be simpler
if it was just coded inline in the AlterReplicationSlot() function, which
is the only caller. I've refactored all this to demonstrate (see nitpicks
attachment)
======
src/include/replication/worker_internal.h
nitpick - /require_lock/acquire_lock/
======
src/test/regress/sql/subscription.sql
nitpick - tweak comments
======
src/test/subscription/t/021_twophase.pl
nitpick - change comment style to indicate each test part better.
======
99.
Please also see the attached diffs patch which implements any nitpicks
mentioned above.
======
Kind Regards,
Peter Smith.
Fujitsu Australia
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 3ac4a4b..cba6661 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -2050,21 +2050,6 @@ psql "dbname=postgres replication=database" -c
"IDENTIFY_SYSTEM;"
<variablelist>
<varlistentry>
- <term><literal>TWO_PHASE [ <replaceable
class="parameter">boolean</replaceable> ]</literal></term>
- <listitem>
- <para>
- If true, this logical replication slot supports decoding of two-phase
- commit. With this option, commands related to two-phase commit such
as
- <literal>PREPARE TRANSACTION</literal>, <literal>COMMIT
PREPARED</literal>
- and <literal>ROLLBACK PREPARED</literal> are decoded and transmitted.
- The transaction will be decoded and transmitted at
- <literal>PREPARE TRANSACTION</literal> time.
- The default is false.
- </para>
- </listitem>
- </varlistentry>
-
- <varlistentry>
<term><literal>RESERVE_WAL [ <replaceable
class="parameter">boolean</replaceable> ]</literal></term>
<listitem>
<para>
@@ -2104,6 +2089,21 @@ psql "dbname=postgres replication=database" -c
"IDENTIFY_SYSTEM;"
</para>
</listitem>
</varlistentry>
+
+ <varlistentry>
+ <term><literal>TWO_PHASE [ <replaceable
class="parameter">boolean</replaceable> ]</literal></term>
+ <listitem>
+ <para>
+ If true, this logical replication slot supports decoding of two-phase
+ commit. With this option, commands related to two-phase commit such
as
+ <literal>PREPARE TRANSACTION</literal>, <literal>COMMIT
PREPARED</literal>
+ and <literal>ROLLBACK PREPARED</literal> are decoded and transmitted.
+ The transaction will be decoded and transmitted at
+ <literal>PREPARE TRANSACTION</literal> time.
+ The default is false.
+ </para>
+ </listitem>
+ </varlistentry>
</variablelist>
<para>
diff --git a/src/backend/access/transam/twophase.c
b/src/backend/access/transam/twophase.c
index 35bce68..f3c6e1f 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -2741,5 +2741,6 @@ LookupGXactBySubid(Oid subid)
}
}
LWLockRelease(TwoPhaseStateLock);
+
return found;
}
diff --git a/src/backend/commands/subscriptioncmds.c
b/src/backend/commands/subscriptioncmds.c
index 6995a62..3703cf6 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -1076,6 +1076,8 @@ CheckAlterSubOption(Subscription *sub, const char
*option, bool isTopLevel)
{
StringInfoData cmd;
+ Assert(strstr("two_phase,failover", option));
+
if (!sub->slotname)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -1098,8 +1100,8 @@ CheckAlterSubOption(Subscription *sub, const char
*option, bool isTopLevel)
appendStringInfo(&cmd, "ALTER SUBSCRIPTION ... SET (%s)", option);
/*
- * The changed option of the slot can't be rolled back: prevent we are
in
- * the transaction state.
+ * The changed option of the slot can't be rolled back, so disallow if
we
+ * are in a transaction block.
*/
PreventInTransactionBlock(isTopLevel, cmd.data);
@@ -1282,7 +1284,7 @@ AlterSubscription(ParseState *pstate,
AlterSubscriptionStmt *stmt,
/*
* Workers may still survive even if
the subscription has
* been disabled. They may read the
pg_subscription
- * catalog and detect that the twophase
parameter is
+ * catalog and detect that the
two_phase parameter is
* updated, which causes the assertion
failure. Ensure
* workers have already been exited to
avoid it.
*/
@@ -1304,7 +1306,7 @@ AlterSubscription(ParseState *pstate,
AlterSubscriptionStmt *stmt,
errmsg("cannot
disable two_phase when prepared transactions are present"),
errhint("Resolve these transactions and try again.")));
- /* Change system catalog acoordingly */
+ /* Change system catalog accordingly */
values[Anum_pg_subscription_subtwophasestate - 1] =
CharGetDatum(opts.twophase ?
LOGICALREP_TWOPHASE_STATE_PENDING :
diff --git a/src/backend/replication/logical/launcher.c
b/src/backend/replication/logical/launcher.c
index 45744b7..c566d50 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -272,15 +272,15 @@ logicalrep_worker_find(Oid subid, Oid relid, bool
only_running)
* the subscription, instead of just one.
*/
List *
-logicalrep_workers_find(Oid subid, bool only_running, bool require_lock)
+logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
{
int i;
List *res = NIL;
- if (require_lock)
+ if (acquire_lock)
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
- else
- Assert(LWLockHeldByMe(LogicalRepWorkerLock));
+
+ Assert(LWLockHeldByMe(LogicalRepWorkerLock));
/* Search for attached worker for a given subscription id. */
for (i = 0; i < max_logical_replication_workers; i++)
@@ -291,7 +291,7 @@ logicalrep_workers_find(Oid subid, bool only_running, bool
require_lock)
res = lappend(res, w);
}
- if (require_lock)
+ if (acquire_lock)
LWLockRelease(LogicalRepWorkerLock);
return res;
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 2f167a2..e75f24b 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -842,23 +842,25 @@ ReplicationSlotAlter(const char *name, bool *failover,
bool *two_phase)
" on the standby"));
}
- /*
- * Do not allow users to enable failover for temporary slots as we do
not
- * support syncing temporary slots to the standby.
- */
- if (failover && *failover &&
- MyReplicationSlot->data.persistency == RS_TEMPORARY)
+ if (failover)
+ {
+ /*
+ * Do not allow users to enable failover for temporary slots as
we do not
+ * support syncing temporary slots to the standby.
+ */
+ if (*failover && MyReplicationSlot->data.persistency ==
RS_TEMPORARY)
ereport(ERROR,
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot enable failover for a temporary
replication slot"));
- if (failover && MyReplicationSlot->data.failover != *failover)
- {
- SpinLockAcquire(&MyReplicationSlot->mutex);
- MyReplicationSlot->data.failover = *failover;
- SpinLockRelease(&MyReplicationSlot->mutex);
+ if (MyReplicationSlot->data.failover != *failover)
+ {
+ SpinLockAcquire(&MyReplicationSlot->mutex);
+ MyReplicationSlot->data.failover = *failover;
+ SpinLockRelease(&MyReplicationSlot->mutex);
- update_slot = true;
+ update_slot = true;
+ }
}
if (two_phase && MyReplicationSlot->data.two_phase != *two_phase)
diff --git a/src/backend/replication/walsender.c
b/src/backend/replication/walsender.c
index 04f65e0..af8e958 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1405,56 +1405,42 @@ DropReplicationSlot(DropReplicationSlotCmd *cmd)
ReplicationSlotDrop(cmd->slotname, !cmd->wait);
}
+
/*
- * Process extra options given to ALTER_REPLICATION_SLOT.
+ * Change the definition of a replication slot.
*/
static void
-ParseAlterReplSlotOptions(AlterReplicationSlotCmd *cmd,
- bool *failover_given, bool
*failover,
- bool *two_phase_given, bool
*two_phase)
+AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
{
- *failover_given = false;
- *two_phase_given = false;
+ bool failover_given = false;
+ bool two_phase_given = false;
+ bool failover;
+ bool two_phase;
/* Parse options */
foreach_ptr(DefElem, defel, cmd->options)
{
if (strcmp(defel->defname, "failover") == 0)
{
- if (*failover_given)
+ if (failover_given)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or
redundant options")));
- *failover_given = true;
- *failover = defGetBoolean(defel);
+ failover_given = true;
+ failover = defGetBoolean(defel);
}
else if (strcmp(defel->defname, "two_phase") == 0)
{
- if (*two_phase_given)
+ if (two_phase_given)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or
redundant options")));
- *two_phase_given = true;
- *two_phase = defGetBoolean(defel);
+ two_phase_given = true;
+ two_phase = defGetBoolean(defel);
}
else
elog(ERROR, "unrecognized option: %s", defel->defname);
}
-}
-
-/*
- * Change the definition of a replication slot.
- */
-static void
-AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
-{
- bool failover_given;
- bool two_phase_given;
- bool failover;
- bool two_phase;
-
- ParseAlterReplSlotOptions(cmd, &failover_given, &failover,
- &two_phase_given,
&two_phase);
ReplicationSlotAlter(cmd->slotname,
failover_given ? &failover :
NULL,
diff --git a/src/include/replication/worker_internal.h
b/src/include/replication/worker_internal.h
index 990f524..9646261 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -241,7 +241,7 @@ extern void logicalrep_worker_attach(int slot);
extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
bool only_running);
extern List *logicalrep_workers_find(Oid subid, bool only_running,
- bool
require_lock);
+ bool
acquire_lock);
extern bool logicalrep_worker_launch(LogicalRepWorkerType wtype,
Oid
dbid, Oid subid, const char *subname,
Oid
userid, Oid relid,
diff --git a/src/test/regress/expected/subscription.out
b/src/test/regress/expected/subscription.out
index 51fa4b9..40e1a07 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -377,7 +377,7 @@ HINT: To initiate replication, you must manually create
the replication slot, e
regress_testsub | regress_subscription_user | f | {testpub} | f
| off | p | f | any | t
| f | f | off | dbname=regress_doesnotexist |
0/0
(1 row)
--- We can alter streaming when two_phase enabled
+-- we can alter streaming when two_phase is enabled
ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
\dRs+
List of subscriptions
diff --git a/src/test/regress/sql/subscription.sql
b/src/test/regress/sql/subscription.sql
index a3886d7..b64f419 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -256,7 +256,7 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION
'dbname=regress_doesnotexist' PUB
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist'
PUBLICATION testpub WITH (connect = false, two_phase = true);
\dRs+
--- We can alter streaming when two_phase enabled
+-- we can alter streaming when two_phase is enabled
ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
\dRs+
diff --git a/src/test/subscription/t/021_twophase.pl
b/src/test/subscription/t/021_twophase.pl
index 4e8f627..66265c7 100644
--- a/src/test/subscription/t/021_twophase.pl
+++ b/src/test/subscription/t/021_twophase.pl
@@ -371,8 +371,8 @@ is($result, qq(2), 'replicated data in subscriber table');
$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
###############################
-# Disable the subscription and alter it to two_phase = false,
-# then verify that the altered subscription reflects the two_phase option.
+# Alter the subscription to two_phase = false.
+# Verify that the altered subscription reflects the two_phase option.
###############################
# Alter subscription two_phase to false
@@ -395,7 +395,10 @@ $result = $node_subscriber->safe_psql('postgres',
);
is($result, qq(d), 'two-phase should be disabled');
-# Now do a prepare on the publisher and make sure that it is not replicated.
+###############################
+# Now do a prepare on the publisher.
+# Verify that it is not replicated.
+###############################
$node_publisher->safe_psql(
'postgres', qq{
BEGIN;
@@ -411,7 +414,10 @@ $result = $node_subscriber->safe_psql('postgres',
"SELECT count(*) FROM pg_prepared_xacts;");
is($result, qq(0), 'should be no prepared transactions on subscriber');
-# Now commit the insert and verify that it is replicated
+###############################
+# Now commit the insert.
+# Verify that it is replicated.
+###############################
$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'newgid';");
# Wait for the subscriber to catchup
@@ -422,7 +428,10 @@ $result =
$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_copy;");
is($result, qq(3), 'replicated data in subscriber table');
-# Alter subscription two_phase to true
+###############################
+# Alter the subscription to two_phase = true.
+# Verify that the altered subscription reflects the two_phase option.
+###############################
$node_subscriber->safe_psql('postgres',
"ALTER SUBSCRIPTION tap_sub_copy DISABLE;");
$node_subscriber->poll_query_until('postgres',