On Wed, Mar 23, 2022, at 6:19 PM, Euler Taveira wrote:
> On Mon, Mar 21, 2022, at 10:09 PM, Euler Taveira wrote:
>> On Mon, Mar 21, 2022, at 10:04 PM, Andres Freund wrote:
>>> On 2022-03-20 21:40:40 -0300, Euler Taveira wrote:
>>> > On Mon, Feb 28, 2022, at 9:18 PM, Euler Taveira wrote:
>>> > > Long time, no patch. Here it is. I will provide documentation in the
>>> > > next version. I would appreciate some feedback.
>>> > This patch is broken since commit
>>> > 705e20f8550c0e8e47c0b6b20b5f5ffd6ffd9e33. I rebased it.
>>>
>>> This fails tests, specifically it seems psql crashes:
>>> https://cirrus-ci.com/task/6592281292570624?logs=cores#L46
>> Yeah. I forgot to test this patch with cassert before sending it. :( I didn't
>> send a new patch because there is another issue (with int128) that I'm
>> currently reworking. I'll send another patch soon.
> Here is another version after rebasing it. In this version I fixed the psql
> issue and rewrote interval_to_ms function.
Since the previous version, I added support for streamed transactions. For
streamed transactions, the delay is applied when the STREAM COMMIT message is
handled. That's ok as long as we add the delay before applying the spooled
messages. Hence, we guarantee that the delay is applied *before* each
transaction. The same logic is applied to prepared transactions: the delay is
introduced before applying the spooled messages in the STREAM PREPARE message.
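The timing rule described above (wait until the remote commit or prepare time
plus min_apply_delay, and do not wait at all if that moment has already
passed) can be sketched in standalone C. This is only an illustration under
stated assumptions, not code from the patch: `sketch_timestamp_us` and
`remaining_delay_ms` are hypothetical names, timestamps are assumed to be
microsecond counts, and arithmetic overflow is not handled.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-in for a timestamp: microseconds since some epoch. */
typedef int64_t sketch_timestamp_us;

/*
 * Return how many milliseconds an apply worker would still have to wait
 * before applying a transaction whose commit (or prepare) time on the
 * publisher is remote_time, given the subscriber's current time "now" and
 * a configured delay in milliseconds.
 *
 * Returns 0 when no delay is configured or when the target time has
 * already passed. Overflow is ignored in this sketch.
 */
static int64_t
remaining_delay_ms(sketch_timestamp_us remote_time,
                   sketch_timestamp_us now,
                   int64_t min_apply_delay_ms)
{
    int64_t apply_at_us;
    int64_t diff_us;

    assert(min_apply_delay_ms >= 0);

    /* nothing to do if no delay is set */
    if (min_apply_delay_ms == 0)
        return 0;

    /* earliest moment at which the transaction may be applied */
    apply_at_us = remote_time + min_apply_delay_ms * INT64_C(1000);

    diff_us = apply_at_us - now;
    if (diff_us <= 0)
        return 0;               /* already late enough: apply immediately */

    /* round up so that a caller sleeping this long never wakes up early */
    return (diff_us + 999) / 1000;
}
```

A caller would sleep for the returned number of milliseconds and then
recompute, since the sleep may end early. Because the value is derived from
the publisher's timestamp and the subscriber's clock, any time already spent
in decoding and transfer shortens the wait rather than adding to it.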

Tests were refactored a bit. A test for streamed transactions was included too.

Version 4 is attached.

--
Euler Taveira
EDB   https://www.enterprisedb.com/

From 7dd7a3523ed8e7a3494e7ec25ddc0af8ed4cf4d3 Mon Sep 17 00:00:00 2001
From: Euler Taveira <euler.tave...@enterprisedb.com>
Date: Sat, 6 Nov 2021 11:31:10 -0300
Subject: [PATCH v4] Time-delayed logical replication subscriber

Similar to physical replication, a time-delayed copy of the data for
logical replication is useful for some scenarios (specially to fix
errors that might cause data loss).

If the subscriber sets min_apply_delay parameter, the logical
replication worker will delay the transaction commit for min_apply_delay
milliseconds.

The delay is calculated between the WAL time stamp and the current time
on the subscriber.

The delay occurs only on WAL records for transaction begins. Regular and
prepared transactions are covered. Streamed transactions are also
covered.

Author: Euler Taveira
Discussion: https://postgr.es/m/CAB-JLwYOYwL=xtyaxkih5ctm_vm8kjkh7aaitckvmch4rzr...@mail.gmail.com
---
 doc/src/sgml/catalogs.sgml                 |   9 ++
 doc/src/sgml/ref/alter_subscription.sgml   |   5 +-
 doc/src/sgml/ref/create_subscription.sgml  |  31 ++++-
 src/backend/catalog/pg_subscription.c      |   1 +
 src/backend/catalog/system_views.sql       |   2 +-
 src/backend/commands/subscriptioncmds.c    |  46 ++++++-
 src/backend/replication/logical/worker.c   |  82 ++++++++++++
 src/backend/utils/adt/timestamp.c          |  32 +++++
 src/bin/pg_dump/pg_dump.c                  |  16 ++-
 src/bin/pg_dump/pg_dump.h                  |   1 +
 src/bin/psql/describe.c                    |   8 +-
 src/bin/psql/tab-complete.c                |   7 +-
 src/include/catalog/pg_subscription.h      |   3 +
 src/include/datatype/timestamp.h           |   2 +
 src/include/utils/timestamp.h              |   2 +
 src/test/regress/expected/subscription.out | 149 ++++++++++++---------
 src/test/regress/sql/subscription.sql      |  20 +++
 src/test/subscription/t/032_apply_delay.pl | 110 +++++++++++++++
 18 files changed, 455 insertions(+), 71 deletions(-)
 create mode 100644 src/test/subscription/t/032_apply_delay.pl

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 25b02c4e37..9b94b7aef2 100644
--- a/doc/src/sgml/catalogs.sgml
+++ 
b/doc/src/sgml/catalogs.sgml @@ -7833,6 +7833,15 @@ SCRAM-SHA-256$<replaceable><iteration count></replaceable>:<replaceable>&l </para></entry> </row> + <row> + <entry role="catalog_table_entry"><para role="column_definition"> + <structfield>subapplydelay</structfield> <type>int8</type> + </para> + <para> + Delay the application of changes by a specified amount of time. + </para></entry> + </row> + <row> <entry role="catalog_table_entry"><para role="column_definition"> <structfield>subname</structfield> <type>name</type> diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 353ea5def2..ae9d625f9d 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -207,8 +207,9 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO < information. The parameters that can be altered are <literal>slot_name</literal>, <literal>synchronous_commit</literal>, - <literal>binary</literal>, <literal>streaming</literal>, and - <literal>disable_on_error</literal>. + <literal>binary</literal>, <literal>streaming</literal>, + <literal>disable_on_error</literal>, and + <literal>min_apply_delay</literal>. </para> </listitem> </varlistentry> diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 34b3264b26..ae80db8a3d 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -302,7 +302,36 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl </para> </listitem> </varlistentry> - </variablelist></para> + + <varlistentry> + <term><literal>min_apply_delay</literal> (<type>integer</type>)</term> + <listitem> + <para> + By default, subscriber applies changes as soon as possible. 
Similar + to the physical replication feature + (<xref linkend="guc-recovery-min-apply-delay"/>), it may be useful to + have a time-delayed copy of data for logical replication. This + parameter allows you to delay the application of changes by a + specified amount of time. If this value is specified without units, + it is taken as milliseconds. The default is zero, adding no delay. + </para> + <para> + The delay occurs only on WAL records for transaction begins and after + the initial table synchronization. It is possible that the + replication delay between publisher and subscriber exceeds the value + of this parameter, in which case no delay is added. Note that the + delay is calculated between the WAL time stamp as written on + publisher and the current time on the subscriber. Delays in logical + decoding and in transfer the transaction may reduce the actual wait + time. If the system clocks on publisher and subscriber are not + synchronized, this may lead to apply changes earlier than expected. + This is not a major issue because a typical setting of this parameter + are much larger than typical time deviations between servers. 
+ </para> + </listitem> + </varlistentry> + </variablelist> + </para> </listitem> </varlistentry> diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 8856ce3b50..42915a2ea9 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -64,6 +64,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->oid = subid; sub->dbid = subform->subdbid; sub->skiplsn = subform->subskiplsn; + sub->applydelay = subform->subapplydelay; sub->name = pstrdup(NameStr(subform->subname)); sub->owner = subform->subowner; sub->enabled = subform->subenabled; diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index fedaed533b..6610edb75f 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1297,7 +1297,7 @@ REVOKE ALL ON pg_replication_origin_status FROM public; -- All columns of pg_subscription except subconninfo are publicly readable. REVOKE ALL ON pg_subscription FROM public; -GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled, +GRANT SELECT (oid, subdbid, subskiplsn, subapplydelay, subname, subowner, subenabled, subbinary, substream, subtwophasestate, subdisableonerr, subslotname, subsynccommit, subpublications) ON pg_subscription TO public; diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index e2852286a7..2c5125f979 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -47,6 +47,7 @@ #include "utils/memutils.h" #include "utils/pg_lsn.h" #include "utils/syscache.h" +#include "utils/timestamp.h" /* * Options that can be specified by the user in CREATE/ALTER SUBSCRIPTION @@ -64,6 +65,7 @@ #define SUBOPT_TWOPHASE_COMMIT 0x00000200 #define SUBOPT_DISABLE_ON_ERR 0x00000400 #define SUBOPT_LSN 0x00000800 +#define SUBOPT_MIN_APPLY_DELAY 0x00001000 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & 
(bits)) == (bits)) @@ -87,6 +89,7 @@ typedef struct SubOpts bool twophase; bool disableonerr; XLogRecPtr lsn; + int64 min_apply_delay; } SubOpts; static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); @@ -292,12 +295,35 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_LSN; opts->lsn = lsn; } + else if (IsSet(supported_opts, SUBOPT_MIN_APPLY_DELAY) && + strcmp(defel->defname, "min_apply_delay") == 0) + { + char *val; + Interval *interval; + + if (IsSet(opts->specified_opts, SUBOPT_MIN_APPLY_DELAY)) + errorConflictingDefElem(defel, pstate); + + opts->specified_opts |= SUBOPT_MIN_APPLY_DELAY; + val = defGetString(defel); + + interval = DatumGetIntervalP(DirectFunctionCall3(interval_in, + CStringGetDatum(val), + ObjectIdGetDatum(InvalidOid), + Int32GetDatum(-1))); + opts->min_apply_delay = interval_to_ms(interval); + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("unrecognized subscription parameter: \"%s\"", defel->defname))); } + if (opts->min_apply_delay < 0) + ereport(ERROR, + errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE), + errmsg("option \"%s\" must not be negative", "min_apply_delay")); + /* * We've been explicitly asked to not connect, that requires some * additional processing. 
@@ -530,7 +556,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA | SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT | - SUBOPT_DISABLE_ON_ERR); + SUBOPT_DISABLE_ON_ERR | SUBOPT_MIN_APPLY_DELAY); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* @@ -595,6 +621,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, values[Anum_pg_subscription_oid - 1] = ObjectIdGetDatum(subid); values[Anum_pg_subscription_subdbid - 1] = ObjectIdGetDatum(MyDatabaseId); values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr); + values[Anum_pg_subscription_subapplydelay - 1] = Int64GetDatum(opts.min_apply_delay); values[Anum_pg_subscription_subname - 1] = DirectFunctionCall1(namein, CStringGetDatum(stmt->subname)); values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner); @@ -1070,6 +1097,12 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, replaces[Anum_pg_subscription_subdisableonerr - 1] = true; } + if (IsSet(opts.specified_opts, SUBOPT_MIN_APPLY_DELAY)) + { + values[Anum_pg_subscription_subapplydelay - 1] = + Int64GetDatum(opts.min_apply_delay); + replaces[Anum_pg_subscription_subapplydelay - 1] = true; + } update_tuple = true; break; @@ -1093,6 +1126,17 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, if (opts.enabled) ApplyLauncherWakeupAtCommit(); + /* + * If this subscription has been disabled and it has an apply + * delay set, wake up the logical replication worker to finish + * it as soon as possible. 
+ */ + if (!opts.enabled && sub->applydelay > 0) + { + elog(DEBUG1, "subscription has been disabled"); + logicalrep_worker_wakeup(sub->oid, InvalidOid); + } + update_tuple = true; break; } diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 38e3b1c1b3..db1cc2477c 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -252,6 +252,7 @@ WalReceiverConn *LogRepWorkerWalRcvConn = NULL; Subscription *MySubscription = NULL; static bool MySubscriptionValid = false; +TimestampTz MySubscriptionMinApplyDelayUntil = 0; bool in_remote_transaction = false; static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr; @@ -324,6 +325,8 @@ static void maybe_reread_subscription(void); static void DisableSubscriptionAndExit(void); +static void apply_delay(TimestampTz ts); + /* prototype needed because of stream_commit */ static void apply_dispatch(StringInfo s); @@ -803,6 +806,53 @@ slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, ExecStoreVirtualTuple(slot); } +/* + * Apply the informed delay for the transaction. + * + * A regular transaction uses the commit time to calculate the delay. A + * prepared transaction uses the prepare time to calculate the delay. + */ +static void +apply_delay(TimestampTz ts) +{ + /* nothing to do if no delay set */ + if (MySubscription->applydelay <= 0) + return; + + /* set apply delay */ + MySubscriptionMinApplyDelayUntil = TimestampTzPlusMilliseconds(TimestampTzGetDatum(ts), + MySubscription->applydelay); + + while (true) + { + int diffms; + + diffms = TimestampDifferenceMilliseconds(GetCurrentTimestamp(), MySubscriptionMinApplyDelayUntil); + + /* + * Exit without arming the latch if it's already past time to apply + * this transaction. 
+ */ + if (diffms <= 0) + break; + + elog(DEBUG2, "logical replication apply delay: %u ms", diffms); + + WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + diffms, + WAIT_EVENT_RECOVERY_APPLY_DELAY); + ResetLatch(MyLatch); + + CHECK_FOR_INTERRUPTS(); + } + + /* + * Delay applied. Reset state. + */ + MySubscriptionMinApplyDelayUntil = 0; +} + /* * Handle BEGIN message. */ @@ -814,6 +864,9 @@ apply_handle_begin(StringInfo s) logicalrep_read_begin(s, &begin_data); set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn); + /* Should we delay the current transaction? */ + apply_delay(begin_data.committime); + remote_final_lsn = begin_data.final_lsn; maybe_start_skipping_changes(begin_data.final_lsn); @@ -868,6 +921,9 @@ apply_handle_begin_prepare(StringInfo s) logicalrep_read_begin_prepare(s, &begin_data); set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn); + /* Should we delay the current prepared transaction? */ + apply_delay(begin_data.prepare_time); + remote_final_lsn = begin_data.prepare_lsn; maybe_start_skipping_changes(begin_data.prepare_lsn); @@ -1090,6 +1146,19 @@ apply_handle_stream_prepare(StringInfo s) elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid); + /* + * Should we delay the current prepared transaction? + * + * Although the delay is applied in BEGIN PREPARE messages, streamed + * prepared transactions apply the delay in a STREAM PREPARE message. + * That's ok because no changes have been applied yet + * (apply_spooled_messages() will do it). + * The STREAM START message does not contain a prepare time (it will be + * available when the in-progress prepared transaction finishes), hence, it + * was not possible to apply a delay at that time. + */ + apply_delay(prepare_data.prepare_time); + /* Replay all the spooled operations. 
*/ apply_spooled_messages(prepare_data.xid, prepare_data.prepare_lsn); @@ -1481,6 +1550,19 @@ apply_handle_stream_commit(StringInfo s) elog(DEBUG1, "received commit for streamed transaction %u", xid); + /* + * Should we delay the current transaction? + * + * Although the delay is applied in BEGIN messages, streamed transactions + * apply the delay in a STREAM COMMIT message. That's ok because no changes + * have been applied yet (apply_spooled_messages() will do it). + * The STREAM START message would be a natural choice for this delay but + * there is no commit time yet (it will be available when the in-progress + * transaction finishes), hence, it was not possible to apply a delay at + * that time. + */ + apply_delay(commit_data.committime); + apply_spooled_messages(xid, commit_data.commit_lsn); apply_handle_commit_internal(&commit_data); diff --git a/src/backend/utils/adt/timestamp.c b/src/backend/utils/adt/timestamp.c index f70f829d83..dc9f2d6677 100644 --- a/src/backend/utils/adt/timestamp.c +++ b/src/backend/utils/adt/timestamp.c @@ -2411,6 +2411,38 @@ interval_cmp_internal(Interval *interval1, Interval *interval2) return int128_compare(span1, span2); } +/* + * Given an Interval returns the number of milliseconds. + */ +int64 +interval_to_ms(const Interval *interval) +{ + int64 days; + int64 ms; + int64 result; + + days = interval->month * INT64CONST(30); + days += interval->day; + + /* + * The following operations use these special functions to detect overflow. + * Number of ms per informed days. + */ + if (pg_mul_s64_overflow(days, MSECS_PER_DAY, &result)) + ereport(ERROR, + (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE), + errmsg("bigint out of range"))); + + /* adds portion time (in ms) to the previous result. 
*/ + ms = interval->time / INT64CONST(1000); + if (pg_add_s64_overflow(result, ms, &result)) + ereport(ERROR, + (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE), + errmsg("bigint out of range"))); + + return result; +} + Datum interval_eq(PG_FUNCTION_ARGS) { diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index c871cb727d..b86234ab94 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -4410,6 +4410,7 @@ getSubscriptions(Archive *fout) int i_subsynccommit; int i_subpublications; int i_subbinary; + int i_subapplydelay; int i, ntups; @@ -4454,13 +4455,18 @@ getSubscriptions(Archive *fout) if (fout->remoteVersion >= 150000) appendPQExpBufferStr(query, " s.subtwophasestate,\n" - " s.subdisableonerr\n"); + " s.subdisableonerr,\n"); else appendPQExpBuffer(query, " '%c' AS subtwophasestate,\n" - " false AS subdisableonerr\n", + " false AS subdisableonerr,\n", LOGICALREP_TWOPHASE_STATE_DISABLED); + if (fout->remoteVersion >= 160000) + appendPQExpBufferStr(query, " s.subapplydelay\n"); + else + appendPQExpBufferStr(query, " 0 AS subapplydelay\n"); + appendPQExpBufferStr(query, "FROM pg_subscription s\n" "WHERE s.subdbid = (SELECT oid FROM pg_database\n" @@ -4486,6 +4492,7 @@ getSubscriptions(Archive *fout) i_substream = PQfnumber(res, "substream"); i_subtwophasestate = PQfnumber(res, "subtwophasestate"); i_subdisableonerr = PQfnumber(res, "subdisableonerr"); + i_subapplydelay = PQfnumber(res, "subapplydelay"); subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo)); @@ -4515,6 +4522,8 @@ getSubscriptions(Archive *fout) pg_strdup(PQgetvalue(res, i, i_subtwophasestate)); subinfo[i].subdisableonerr = pg_strdup(PQgetvalue(res, i, i_subdisableonerr)); + subinfo[i].subapplydelay = + strtoi64(PQgetvalue(res, i, i_subapplydelay), NULL, 10); /* Decide whether we want to dump it */ selectDumpableObject(&(subinfo[i].dobj), fout); @@ -4591,6 +4600,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) if (strcmp(subinfo->subsynccommit, 
"off") != 0) appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit)); + if (subinfo->subapplydelay > 0) + appendPQExpBuffer(query, ", min_apply_delay = '" INT64_FORMAT " ms'", subinfo->subapplydelay); + appendPQExpBufferStr(query, ");\n"); if (subinfo->dobj.dump & DUMP_COMPONENT_DEFINITION) diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index 1d21c2906f..3017273c7b 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -660,6 +660,7 @@ typedef struct _SubscriptionInfo char *subtwophasestate; char *subdisableonerr; char *subsynccommit; + int64 subapplydelay; char *subpublications; } SubscriptionInfo; diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 88d92a08ae..c7e93c555f 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -6351,7 +6351,7 @@ describeSubscriptions(const char *pattern, bool verbose) PGresult *res; printQueryOpt myopt = pset.popt; static const bool translate_columns[] = {false, false, false, false, - false, false, false, false, false, false, false}; + false, false, false, false, false, false, false, false}; if (pset.sversion < 100000) { @@ -6393,6 +6393,12 @@ describeSubscriptions(const char *pattern, bool verbose) gettext_noop("Two-phase commit"), gettext_noop("Disable on error")); + /* min_apply_delay is only supported in v16 and higher */ + if (pset.sversion >= 160000) + appendPQExpBuffer(&buf, + ", subapplydelay AS \"%s\"\n", + gettext_noop("Apply delay")); + appendPQExpBuffer(&buf, ", subsynccommit AS \"%s\"\n" ", subconninfo AS \"%s\"\n", diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index c5cafe6f4b..3e012f5162 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -1873,7 +1873,8 @@ psql_completion(const char *text, int start, int end) COMPLETE_WITH("(", "PUBLICATION"); /* ALTER SUBSCRIPTION <name> SET ( */ else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "(")) - 
COMPLETE_WITH("binary", "slot_name", "streaming", "synchronous_commit", "disable_on_error"); + COMPLETE_WITH("binary", "slot_name", "streaming", "synchronous_commit", + "disable_on_error", "min_apply_delay"); /* ALTER SUBSCRIPTION <name> SKIP ( */ else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SKIP", "(")) COMPLETE_WITH("lsn"); @@ -3152,8 +3153,8 @@ psql_completion(const char *text, int start, int end) /* Complete "CREATE SUBSCRIPTION <name> ... WITH ( <opt>" */ else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "(")) COMPLETE_WITH("binary", "connect", "copy_data", "create_slot", - "enabled", "slot_name", "streaming", - "synchronous_commit", "two_phase", "disable_on_error"); + "enabled", "slot_name", "streaming", "synchronous_commit", + "two_phase", "disable_on_error", "min_apply_delay"); /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */ diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index d1260f590c..fda97dab56 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -58,6 +58,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW XLogRecPtr subskiplsn; /* All changes finished at this LSN are * skipped */ + int64 subapplydelay; /* Replication apply delay */ + NameData subname; /* Name of the subscription */ Oid subowner BKI_LOOKUP(pg_authid); /* Owner of the subscription */ @@ -104,6 +106,7 @@ typedef struct Subscription * in */ XLogRecPtr skiplsn; /* All changes finished at this LSN are * skipped */ + int64 applydelay; /* Replication apply delay */ char *name; /* Name of the subscription */ Oid owner; /* Oid of the subscription owner */ bool enabled; /* Indicates if the subscription is enabled */ diff --git a/src/include/datatype/timestamp.h b/src/include/datatype/timestamp.h index d155f1b03b..d5bbfad1c4 100644 --- a/src/include/datatype/timestamp.h +++ 
b/src/include/datatype/timestamp.h @@ -127,6 +127,8 @@ struct pg_itm_in #define SECS_PER_MINUTE 60 #define MINS_PER_HOUR 60 +#define MSECS_PER_DAY INT64CONST(86400000) + #define USECS_PER_DAY INT64CONST(86400000000) #define USECS_PER_HOUR INT64CONST(3600000000) #define USECS_PER_MINUTE INT64CONST(60000000) diff --git a/src/include/utils/timestamp.h b/src/include/utils/timestamp.h index edf3a97318..91709035da 100644 --- a/src/include/utils/timestamp.h +++ b/src/include/utils/timestamp.h @@ -78,6 +78,8 @@ extern bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec); +extern int64 interval_to_ms(const Interval *interval); + extern TimestampTz time_t_to_timestamptz(pg_time_t tm); extern pg_time_t timestamptz_to_time_t(TimestampTz t); diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index 5db7146e06..b35381e065 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -76,10 +76,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Apply delay | Synchronous commit | Conninfo | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+-------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false); @@ -96,10 +96,10 @@ ERROR: unrecognized subscription parameter: "create_slot" -- ok ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345'); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | off | dbname=regress_doesnotexist2 | 0/12345 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Apply delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+-------------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | 0 | off | dbname=regress_doesnotexist2 | 0/12345 (1 row) -- ok - with lsn = NONE @@ -108,10 +108,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE); ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0'); ERROR: invalid WAL location (LSN): 0/0 \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN 
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | off | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Apply delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+-------------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | 0 | off | dbname=regress_doesnotexist2 | 0/0 (1 row) BEGIN; @@ -143,10 +143,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar); ERROR: invalid value for parameter "synchronous_commit": "foobar" HINT: Available values: local, remote_write, remote_apply, on, off. 
\dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------+---------- - regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | local | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Apply delay | Synchronous commit | Conninfo | Skip LSN +---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+-------------+--------------------+------------------------------+---------- + regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | 0 | local | dbname=regress_doesnotexist2 | 0/0 (1 row) -- rename back to keep the rest simple @@ -179,19 +179,19 @@ ERROR: binary requires a Boolean value CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... 
REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | t | f | d | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Apply delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+-------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | t | f | d | f | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (binary = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Apply delay | Synchronous commit | Conninfo | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+-------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -202,19 +202,19 @@ ERROR: streaming requires a Boolean value CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | t | d | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Apply delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+-------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | t | d | f | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Apply delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+-------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) -- fail - publication already exists @@ -229,10 +229,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false); ERROR: publication "testpub1" is already in subscription "regress_testsub" \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Apply delay | Synchronous commit | Conninfo | Skip LSN 
+-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+-------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | f | d | f | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) -- fail - publication used more then once @@ -247,10 +247,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub" -- ok - delete publications ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Apply delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+-------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -284,10 +284,10 @@ ERROR: two_phase requires a Boolean value CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... 
REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | p | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Apply delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+-------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | p | f | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) --fail - alter of two_phase option not supported. 
@@ -296,10 +296,10 @@ ERROR: unrecognized subscription parameter: "two_phase" -- but can alter streaming when two_phase enabled ALTER SUBSCRIPTION regress_testsub SET (streaming = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Apply delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+-------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -308,10 +308,10 @@ DROP SUBSCRIPTION regress_testsub; CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true, two_phase = true); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... 
REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Apply delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+-------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -323,18 +323,47 @@ ERROR: disable_on_error requires a Boolean value CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, disable_on_error = false); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... 
REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Apply delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+-------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | t | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Apply delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+-------------+--------------------+-----------------------------+---------- + regress_testsub | 
regress_subscription_user | f | {testpub} | f | f | d | t | 0 | off | dbname=regress_doesnotexist | 0/0 +(1 row) + +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; +-- fail -- min_apply_delay must be a non-negative integer +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, min_apply_delay = foo); +ERROR: invalid input syntax for type interval: "foo" +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, min_apply_delay = -1); +ERROR: option "min_apply_delay" must not be negative +-- success -- 123 ms +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, min_apply_delay = 123); +WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Apply delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+-------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | 123000 | off | dbname=regress_doesnotexist | 0/0 +(1 row) + +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; +-- success -- interval is converted into ms and stored as integer +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, min_apply_delay = '4h 27min 35s'); +WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... 
REFRESH PUBLICATION to subscribe the tables +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Apply delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+-------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | 16055000 | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index 74c38ead5d..51dade37b1 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -254,6 +254,26 @@ ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); DROP SUBSCRIPTION regress_testsub; +-- fail -- min_apply_delay must be a non-negative integer +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, min_apply_delay = foo); +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, min_apply_delay = -1); + +-- success -- 123 ms +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, min_apply_delay = 123); + +\dRs+ + +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; + +-- success -- interval is converted into ms and stored as integer +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, min_apply_delay = '4h 27min 35s'); + +\dRs+ + +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; + RESET 
SESSION AUTHORIZATION; DROP ROLE regress_subscription_user; DROP ROLE regress_subscription_user2; diff --git a/src/test/subscription/t/032_apply_delay.pl b/src/test/subscription/t/032_apply_delay.pl new file mode 100644 index 0000000000..14591323b6 --- /dev/null +++ b/src/test/subscription/t/032_apply_delay.pl @@ -0,0 +1,110 @@ + +# Copyright (c) 2022, PostgreSQL Global Development Group + +# Test replication apply delay +use strict; +use warnings; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +# Create publisher node +my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', + 'logical_decoding_work_mem = 64kB'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); +$node_subscriber->init; +$node_subscriber->append_conf('postgresql.conf', + "log_min_messages = debug2"); +$node_subscriber->start; + +# Create some preexisting content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b varchar)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)" +); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . 
' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); + +my $appname = 'tap_sub'; +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on, min_apply_delay = '2s')" +); + +$node_publisher->wait_for_catchup($appname); + +# Check log starting now for logical replication apply delay +my $log_location = -s $node_subscriber->logfile; + +# Also wait for initial table sync to finish +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +my $result = + $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM test_tab"); +is($result, qq(2|1|2), 'check initial data was copied to subscriber'); + +# new row to trigger apply delay +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab VALUES (3, 'baz')"); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM test_tab"); +is($result, qq(3|1|3), 'check if the new row was applied to subscriber'); + +check_apply_delay_log("logical replication apply delay"); + +# Test streamed transaction. +# Insert, update and delete enough rows to exceed 64kB limit. 
+$node_publisher->safe_psql( + 'postgres', q{ +BEGIN; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(4, 5000) s(i); +UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; +DELETE FROM test_tab WHERE mod(a,3) = 0; +COMMIT; +}); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM test_tab"); +is($result, qq(3334|1|5000), 'check if the new rows were applied to subscriber'); + +check_apply_delay_log("logical replication apply delay"); + +$node_subscriber->stop; +$node_publisher->stop; + +done_testing(); + +sub check_apply_delay_log +{ + my $message = shift; + my $old_log_location = $log_location; + + $log_location = $node_subscriber->wait_for_log(qr/$message/, $log_location); + + cmp_ok($log_location, '>', $old_log_location, + "logfile contains triggered logical replication apply delay" + ); +} -- 2.30.2
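As a sanity check on the "Apply delay" values in the expected regression output: a bare 123 is read by interval input as 123 seconds, so it is stored as 123000 ms even though the test comment says "123 ms", and '4h 27min 35s' is stored as 16055000 ms. The sketch below only reproduces that arithmetic. delay_to_ms is a hypothetical name used for illustration and is not the patch's interval_to_ms function.

```python
from datetime import timedelta


def delay_to_ms(delay: timedelta) -> int:
    """Hypothetical helper (not from the patch): whole milliseconds in a delay."""
    return delay // timedelta(milliseconds=1)


# min_apply_delay = 123: a unitless interval is taken as seconds.
print(delay_to_ms(timedelta(seconds=123)))

# min_apply_delay = '4h 27min 35s'
print(delay_to_ms(timedelta(hours=4, minutes=27, seconds=35)))
```

The two printed values should match the 123000 and 16055000 shown in the \dRs+ output of the regression test.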