On Sun, Jan 22, 2023, at 9:42 AM, Takamichi Osumi (Fujitsu) wrote:
> On Saturday, January 21, 2023 3:36 AM I wrote:
> > Kindly have a look at the patch v18.
> I've conducted some refactoring for v18.
> Now the latest patch should be tidier and
> the comments would be clearer and more aligned as a whole.
>
> Attached the updated patch v19.
[I haven't been following this thread for a long time...]
Good to know that you keep improving this patch. I have a few suggestions that
were easier to provide a patch on top of your latest patch than to provide an
inline suggestions.
There are a few documentation polishing. Let me comment some of them above.
- The length of time (ms) to delay the application of changes.
+ Total time spent delaying the application of changes, in milliseconds
I don't remember if I suggested this description for catalog but IMO the
suggestion reads better for me.
- For time-delayed logical replication (i.e. when the subscription is
- created with parameter min_apply_delay > 0), the apply worker sends a
- Standby Status Update message to the publisher with a period of
- <literal>wal_receiver_status_interval</literal>. Make sure to set
- <literal>wal_receiver_status_interval</literal> less than the
- <literal>wal_sender_timeout</literal> on the publisher, otherwise, the
- walsender will repeatedly terminate due to the timeout errors. If
- <literal>wal_receiver_status_interval</literal> is set to zero, the
apply
- worker doesn't send any feedback messages during the subscriber's
- <literal>min_apply_delay</literal> period. See
- <xref linkend="sql-createsubscription"/> for details.
+ For time-delayed logical replication, the apply worker sends a feedback
+ message to the publisher every
+ <varname>wal_receiver_status_interval</varname> milliseconds. Make sure
+ to set <varname>wal_receiver_status_interval</varname> less than the
+ <varname>wal_sender_timeout</varname> on the publisher, otherwise, the
+ <literal>walsender</literal> will repeatedly terminate due to timeout
+ error. If <varname>wal_receiver_status_interval</varname> is set to
+ zero, the apply worker doesn't send any feedback messages during the
+ <literal>min_apply_delay</literal> interval.
I removed the parenthesis explanation about time-delayed logical replication.
If you are reading the documentation and does not know what it means you should
(a) read the logical replication chapter or (b) check the glossary (maybe a new
entry should be added). I also removed the Standby status Update message but it
is a low level detail; let's refer to it as feedback message as the other
sentences do. I changed "literal" to "varname" that's the correct tag for
parameters. I replace "period" with "interval" that was the previous
terminology. IMO we should be uniform, use one or the other.
- The subscriber replication can be instructed to lag behind the publisher
- side changes by specifying the <literal>min_apply_delay</literal>
- subscription parameter. See <xref linkend="sql-createsubscription"/> for
- details.
+ A logical replication subscription can delay the application of changes by
+ specifying the <literal>min_apply_delay</literal> subscription parameter.
+ See <xref linkend="sql-createsubscription"/> for details.
This feature refers to a specific subscription, hence, "logical replication
subscription" instead of "subscriber replication".
+ if (IsSet(opts->specified_opts, SUBOPT_MIN_APPLY_DELAY))
+ errorConflictingDefElem(defel, pstate);
+
Peter S referred to this missing piece of code too.
-int
+static int
defGetMinApplyDelay(DefElem *def)
{
It seems you forgot static keyword.
- elog(DEBUG2, "time-delayed replication for txid %u, min_apply_delay =
%lld ms, Remaining wait time: %ld ms",
- xid, (long long) MySubscription->minapplydelay, diffms);
+ elog(DEBUG2, "time-delayed replication for txid %u, min_apply_delay = "
INT64_FORMAT " ms, remaining wait time: %ld ms",
+ xid, MySubscription->minapplydelay, diffms);
int64 should use format modifier INT64_FORMAT.
- (long) wal_receiver_status_interval * 1000,
+ wal_receiver_status_interval * 1000L,
Cast is not required. I added a suffix to the constant.
- elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush
%X/%X in-delayed: %d",
+ elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush
%X/%X, apply delay: %s",
force,
LSN_FORMAT_ARGS(recvpos),
LSN_FORMAT_ARGS(writepos),
LSN_FORMAT_ARGS(flushpos),
- in_delayed_apply);
+ in_delayed_apply? "yes" : "no");
It is better to use a string to represent the yes/no option.
- gettext_noop("Min apply delay (ms)"));
+ gettext_noop("Min apply delay"));
I don't know if it was discussed but we don't add units to headers. When I
think about this parameter representation (internal and external), I decided to
use the previous code because it provides a unit for external representation. I
understand that using the same representation as recovery_min_apply_delay is
good but the current code does not handle the external representation
accordingly. (recovery_min_apply_delay uses the GUC machinery to adds the unit
but for min_apply_delay, it doesn't).
# Setup for streaming case
-$node_publisher->append_conf('postgres.conf',
+$node_publisher->append_conf('postgresql.conf',
'logical_decoding_mode = immediate');
$node_publisher->reload;
Fix configuration file name.
Maybe tests should do a better job. I think check_apply_delay_time is fragile
because it does not guarantee that time is not shifted. Time-delayed
replication is a subscriber feature and to check its correctness it should
check the logs.
# Note that we cannot call check_apply_delay_log() here because there is a
# possibility that the delay is skipped. The event happens when the WAL
# replication between publisher and subscriber is delayed due to a mechanical
# problem. The log output will be checked later - substantial delay-time case.
If you might not use the logs for it, it should adjust the min_apply_delay, no?
It does not exercise the min_apply_delay vs parallel streaming mode.
+ /*
+ * The combination of parallel streaming mode and
+ * min_apply_delay is not allowed.
+ */
+ if (opts.streaming == LOGICALREP_STREAM_PARALLEL)
+ if ((IsSet(opts.specified_opts, SUBOPT_MIN_APPLY_DELAY)
&& opts.min_apply_delay > 0) ||
+ (!IsSet(opts.specified_opts,
SUBOPT_MIN_APPLY_DELAY) && sub->minapplydelay > 0))
+ ereport(ERROR,
+
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot enable %s mode for
subscription with %s",
+ "streaming = parallel",
"min_apply_delay"));
+
Is this code correct? I also didn't like this message. "cannot enable streaming
= parallel mode for subscription with min_apply_delay" is far from a good error
message. How about refer parallelism to "parallel streaming mode".
--
Euler Taveira
EDB https://www.enterprisedb.com/
From 5024325284ee3b4a4dc0a6a1cc6457ed5608cb46 Mon Sep 17 00:00:00 2001
From: Euler Taveira <[email protected]>
Date: Mon, 23 Jan 2023 15:52:55 -0300
Subject: [PATCH] Euler's review
---
doc/src/sgml/catalogs.sgml | 2 +-
doc/src/sgml/config.sgml | 20 ++-
doc/src/sgml/logical-replication.sgml | 7 +-
doc/src/sgml/ref/create_subscription.sgml | 13 +-
src/backend/commands/subscriptioncmds.c | 13 +-
src/backend/replication/logical/worker.c | 40 +++---
src/bin/psql/describe.c | 2 +-
src/test/regress/expected/subscription.out | 160 ++++++++++-----------
src/test/subscription/t/032_apply_delay.pl | 8 +-
9 files changed, 133 insertions(+), 132 deletions(-)
diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index bf3c05241c..0bdb683296 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -7878,7 +7878,7 @@ SCRAM-SHA-256$<replaceable><iteration count></replaceable>:<replaceable>&l
<structfield>subminapplydelay</structfield> <type>int8</type>
</para>
<para>
- The length of time (ms) to delay the application of changes.
+ Total time spent delaying the application of changes, in milliseconds
</para></entry>
</row>
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 39244bf64a..a15723d74f 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4788,17 +4788,15 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
command line.
</para>
<para>
- For time-delayed logical replication (i.e. when the subscription is
- created with parameter min_apply_delay > 0), the apply worker sends a
- Standby Status Update message to the publisher with a period of
- <literal>wal_receiver_status_interval</literal>. Make sure to set
- <literal>wal_receiver_status_interval</literal> less than the
- <literal>wal_sender_timeout</literal> on the publisher, otherwise, the
- walsender will repeatedly terminate due to the timeout errors. If
- <literal>wal_receiver_status_interval</literal> is set to zero, the apply
- worker doesn't send any feedback messages during the subscriber's
- <literal>min_apply_delay</literal> period. See
- <xref linkend="sql-createsubscription"/> for details.
+ For time-delayed logical replication, the apply worker sends a feedback
+ message to the publisher every
+ <varname>wal_receiver_status_interval</varname> milliseconds. Make sure
+ to set <varname>wal_receiver_status_interval</varname> less than the
+ <varname>wal_sender_timeout</varname> on the publisher, otherwise, the
+ <literal>walsender</literal> will repeatedly terminate due to timeout
+ error. If <varname>wal_receiver_status_interval</varname> is set to
+ zero, the apply worker doesn't send any feedback messages during the
+ <literal>min_apply_delay</literal> interval.
</para>
</listitem>
</varlistentry>
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 863af11a47..d8ae93f88d 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -248,10 +248,9 @@
</para>
<para>
- The subscriber replication can be instructed to lag behind the publisher
- side changes by specifying the <literal>min_apply_delay</literal>
- subscription parameter. See <xref linkend="sql-createsubscription"/> for
- details.
+ A logical replication subscription can delay the application of changes by
+ specifying the <literal>min_apply_delay</literal> subscription parameter.
+ See <xref linkend="sql-createsubscription"/> for details.
</para>
<sect2 id="logical-replication-subscription-slot">
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 76ee9c0b3d..97ca9f8d9e 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -378,10 +378,11 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
</para>
<warning>
<para>
- Delaying the replication can mean there is a much longer time between making
- a change on the publisher, and that change being committed on the subscriber.
- This can impact the performance of synchronous replication.
- See <xref linkend="guc-synchronous-commit"/>.
+ Delaying the replication can mean there is a much longer time
+ between making a change on the publisher, and that change being
+ committed on the subscriber. This can impact the performance of
+ synchronous replication. See <xref linkend="guc-synchronous-commit"/>
+ parameter.
</para>
</warning>
</listitem>
@@ -452,8 +453,8 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
</para>
<para>
- A non-zero <literal>min_apply_delay</literal> parameter is not allowed when streaming
- in parallel mode.
+ A non-zero <literal>min_apply_delay</literal> parameter is not allowed when
+ streaming in parallel mode.
</para>
<para>
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 11e9e9160a..d5fa7a95a9 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -331,6 +331,9 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
else if (IsSet(supported_opts, SUBOPT_MIN_APPLY_DELAY) &&
strcmp(defel->defname, "min_apply_delay") == 0)
{
+ if (IsSet(opts->specified_opts, SUBOPT_MIN_APPLY_DELAY))
+ errorConflictingDefElem(defel, pstate);
+
opts->specified_opts |= SUBOPT_MIN_APPLY_DELAY;
opts->min_apply_delay = defGetMinApplyDelay(defel);
}
@@ -2261,11 +2264,11 @@ defGetStreamingMode(DefElem *def)
/*
- * Extract the min_apply_delay mode value from a DefElem. This is very similar
- * to PGC_INT case of parse_and_validate_value(), because min_apply_delay
+ * Extract the min_apply_delay value from a DefElem. This is very similar to
+ * parse_and_validate_value() for integer values, because min_apply_delay
* accepts the same string as recovery_min_apply_delay.
*/
-int
+static int
defGetMinApplyDelay(DefElem *def)
{
char *value;
@@ -2294,8 +2297,8 @@ defGetMinApplyDelay(DefElem *def)
hintmsg ? errhint("%s", _(hintmsg)) : 0));
/*
- * Check lower bound. parse_int() has been already confirmed that result
- * is equal to or smaller than INT_MAX.
+ * Check lower bound. parse_int() has already been confirmed that result
+ * is less than or equal to INT_MAX.
*/
if (result < 0)
ereport(ERROR,
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index eeac69ea13..00fe29fc20 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -320,11 +320,11 @@ bool in_remote_transaction = false;
static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
/*
- * In order to avoid walsender timeout for time-delayed replication the worker
- * process keeps sending feedback messages during the delay period.
- * Meanwhile, the feature delays the apply before starting the
- * transaction and thus we don't write WALs for the suspended changes during
- * the wait. When the worker process sends a feedback message
+ * In order to avoid walsender timeout for time-delayed logical replication the
+ * apply worker keeps sending feedback messages during the delay interval.
+ * Meanwhile, the feature delays the apply before the start of the
+ * transaction and thus we don't write WAL records for the suspended changes during
+ * the wait. When the apply worker sends a feedback message
* during the delay, we should not make positions of the flushed and apply LSN
* overwritten by the last received latest LSN. See send_feedback() for details.
*/
@@ -1090,20 +1090,20 @@ maybe_delay_apply(TransactionId xid, TimestampTz finish_ts)
if (diffms <= 0)
break;
- elog(DEBUG2, "time-delayed replication for txid %u, min_apply_delay = %lld ms, Remaining wait time: %ld ms",
- xid, (long long) MySubscription->minapplydelay, diffms);
+ elog(DEBUG2, "time-delayed replication for txid %u, min_apply_delay = " INT64_FORMAT " ms, remaining wait time: %ld ms",
+ xid, MySubscription->minapplydelay, diffms);
/*
* Call send_feedback() to prevent the publisher from exiting by
* timeout during the delay, when wal_receiver_status_interval is
* available.
*/
- if (wal_receiver_status_interval > 0
- && diffms > wal_receiver_status_interval * 1000)
+ if (wal_receiver_status_interval > 0 &&
+ diffms > wal_receiver_status_interval * 1000L)
{
WaitLatch(MyLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
- (long) wal_receiver_status_interval * 1000,
+ wal_receiver_status_interval * 1000L,
WAIT_EVENT_RECOVERY_APPLY_DELAY);
send_feedback(last_received, true, false, true);
}
@@ -2135,8 +2135,8 @@ ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno,
/*
* Common spoolfile processing.
*
- * The commit/prepare time for streaming transaction is required to achieve
- * time-delayed replication.
+ * The commit/prepare time (finish_ts) for streamed transactions is required
+ * for time-delayed logical replication.
*/
void
apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
@@ -3869,12 +3869,12 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply, bool in_delayed
* No outstanding transactions to flush, we can report the latest received
* position. This is important for synchronous replication.
*
- * If the subscriber side apply is delayed (because of time-delayed
- * replication) then do not tell the publisher that the received latest
- * LSN is already applied and flushed, otherwise, it leads to the
- * publisher side making a wrong assumption of logical replication
- * progress. Instead, we just send a feedback message to avoid a publisher
- * timeout during the delay.
+ * If the logical replication subscription is delayed (min_apply_delay
+ * parameter) then do not inform the publisher that the received latest LSN
+ * is already applied and flushed, otherwise, the publisher will make a
+ * wrong assumption about the logical replication progress. Instead, it
+ * just sends a feedback message to avoid a replication timeout during the
+ * delay.
*/
if (!have_pending_txes && !in_delayed_apply)
flushpos = writepos = recvpos;
@@ -3913,12 +3913,12 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply, bool in_delayed
pq_sendint64(reply_message, now); /* sendTime */
pq_sendbyte(reply_message, requestReply); /* replyRequested */
- elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X in-delayed: %d",
+ elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X, apply delay: %s",
force,
LSN_FORMAT_ARGS(recvpos),
LSN_FORMAT_ARGS(writepos),
LSN_FORMAT_ARGS(flushpos),
- in_delayed_apply);
+ in_delayed_apply? "yes" : "no");
walrcv_send(LogRepWorkerWalRcvConn,
reply_message->data, reply_message->len);
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index 8a27063bed..81d4607a1c 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -6533,7 +6533,7 @@ describeSubscriptions(const char *pattern, bool verbose)
", suborigin AS \"%s\"\n"
", subminapplydelay AS \"%s\"\n",
gettext_noop("Origin"),
- gettext_noop("Min apply delay (ms)"));
+ gettext_noop("Min apply delay"));
appendPQExpBuffer(&buf,
", subsynccommit AS \"%s\"\n"
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 1230bcb096..977f73fe9b 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -114,18 +114,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
\dRs+ regress_testsub4
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit | Conninfo | Skip LSN
-------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+-----------------------------+----------
- regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | none | 0 | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit | Conninfo | Skip LSN
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | none | 0 | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
\dRs+ regress_testsub4
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit | Conninfo | Skip LSN
-------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+-----------------------------+----------
- regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | any | 0 | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit | Conninfo | Skip LSN
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | any | 0 | off | dbname=regress_doesnotexist | 0/0
(1 row)
DROP SUBSCRIPTION regress_testsub3;
@@ -135,10 +135,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | 0 | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | 0 | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -155,10 +155,10 @@ ERROR: unrecognized subscription parameter: "create_slot"
-- ok
ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | 0 | off | dbname=regress_doesnotexist2 | 0/12345
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+------------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | 0 | off | dbname=regress_doesnotexist2 | 0/12345
(1 row)
-- ok - with lsn = NONE
@@ -167,10 +167,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE);
ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0');
ERROR: invalid WAL location (LSN): 0/0
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | 0 | off | dbname=regress_doesnotexist2 | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+------------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | 0 | off | dbname=regress_doesnotexist2 | 0/0
(1 row)
BEGIN;
@@ -202,10 +202,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar);
ERROR: invalid value for parameter "synchronous_commit": "foobar"
HINT: Available values: local, remote_write, remote_apply, on, off.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit | Conninfo | Skip LSN
----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+------------------------------+----------
- regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | 0 | local | dbname=regress_doesnotexist2 | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit | Conninfo | Skip LSN
+---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+------------------------------+----------
+ regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | 0 | local | dbname=regress_doesnotexist2 | 0/0
(1 row)
-- rename back to keep the rest simple
@@ -239,19 +239,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | t | off | d | f | any | 0 | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | t | off | d | f | any | 0 | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (binary = false);
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | 0 | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | 0 | off | dbname=regress_doesnotexist | 0/0
(1 row)
DROP SUBSCRIPTION regress_testsub;
@@ -263,27 +263,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | 0 | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | 0 | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | 0 | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | 0 | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | 0 | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | 0 | off | dbname=regress_doesnotexist | 0/0
(1 row)
-- fail - publication already exists
@@ -298,10 +298,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr
ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
ERROR: publication "testpub1" is already in subscription "regress_testsub"
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | 0 | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | 0 | off | dbname=regress_doesnotexist | 0/0
(1 row)
-- fail - publication used more than once
@@ -316,10 +316,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub"
-- ok - delete publications
ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | 0 | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | 0 | off | dbname=regress_doesnotexist | 0/0
(1 row)
DROP SUBSCRIPTION regress_testsub;
@@ -355,10 +355,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | p | f | any | 0 | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | off | p | f | any | 0 | off | dbname=regress_doesnotexist | 0/0
(1 row)
--fail - alter of two_phase option not supported.
@@ -367,10 +367,10 @@ ERROR: unrecognized subscription parameter: "two_phase"
-- but can alter streaming when two_phase enabled
ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | 0 | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | 0 | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -380,10 +380,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | 0 | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | 0 | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -396,18 +396,18 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | 0 | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | 0 | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | t | any | 0 | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | t | any | 0 | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -425,19 +425,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | 123 | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | 123 | off | dbname=regress_doesnotexist | 0/0
(1 row)
-- success -- min_apply_delay value with unit is converted into ms and stored as an integer
ALTER SUBSCRIPTION regress_testsub SET (min_apply_delay = '1 d');
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | 86400000 | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | 86400000 | off | dbname=regress_doesnotexist | 0/0
(1 row)
-- fail - alter subscription with streaming = parallel should fail when time-delayed replication is set
diff --git a/src/test/subscription/t/032_apply_delay.pl b/src/test/subscription/t/032_apply_delay.pl
index 37388e474f..1b6bc1ef80 100644
--- a/src/test/subscription/t/032_apply_delay.pl
+++ b/src/test/subscription/t/032_apply_delay.pl
@@ -16,7 +16,7 @@ sub check_apply_delay_log
{
my ($node_subscriber, $offset, $expected) = @_;
- my $log_location = $node_subscriber->wait_for_log(qr/time-delayed replication for txid (\d+), min_apply_delay = (\d+) ms, Remaining wait time: (\d+) ms/, $offset);
+ my $log_location = $node_subscriber->wait_for_log(qr/time-delayed replication for txid (\d+), min_apply_delay = (\d+) ms, remaining wait time: (\d+) ms/, $offset);
cmp_ok($log_location, '>', $offset,
"logfile contains triggered logical replication apply delay"
@@ -25,7 +25,7 @@ sub check_apply_delay_log
# Get the delay time from the server log
my $contents = slurp_file($node_subscriber->logfile, $offset);
$contents =~
- qr/time-delayed replication for txid (\d+), min_apply_delay = (\d+) ms, Remaining wait time: (\d+) ms/
+ qr/time-delayed replication for txid (\d+), min_apply_delay = (\d+) ms, remaining wait time: (\d+) ms/
or die "could not get the apply worker wait time";
my $logged_delay = $3;
@@ -87,7 +87,7 @@ $node_publisher->safe_psql('postgres',
my $appname = 'tap_sub';
-# Create a subscription that applies the trasaction after 50 milliseconds delay
+# Create a subscription that applies the transaction after 50 milliseconds delay
$node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (copy_data = off, min_apply_delay = '50ms', streaming = 'on')"
);
@@ -114,7 +114,7 @@ is($result, qq(2|1|2), 'check if the new rows were applied to subscriber');
check_apply_delay_time($node_publisher, $node_subscriber, '2', '0.05');
# Setup for streaming case
-$node_publisher->append_conf('postgres.conf',
+$node_publisher->append_conf('postgresql.conf',
'logical_decoding_mode = immediate');
$node_publisher->reload;
--
2.30.2