On 25/01/17 18:16, Peter Eisentraut wrote:
> On 1/22/17 8:11 PM, Petr Jelinek wrote:
>> 0001 - Changes the libpqrcv_connect to use async libpq api so that it
>> won't get stuck forever in case of connect is stuck. This is preexisting
>> bug that also affects walreceiver but it's less visible there as there
>> is no SQL interface to initiate connection there.
>
> Probably a mistake here:
>
> +            case PGRES_POLLING_READING:
> +                extra_flag = WL_SOCKET_READABLE;
> +                /* pass through */
> +            case PGRES_POLLING_WRITING:
> +                extra_flag = WL_SOCKET_WRITEABLE;
>
> extra_flag gets overwritten in the reading case.
>

Eh, reworked that to just an if statement, as the switch does not really
buy us anything there.

> Please elaborate in the commit message what this change is for.
>

Okay.

>> 0002 - Close replication connection when CREATE SUBSCRIPTION gets
>> canceled (otherwise walsender on the other side may stay in idle in
>> transaction state).
>
> committed

Thanks!

>
>> 0003 - Fixes buffer initialization in walsender that I found when
>> testing the above two. This one should be back-patched to 9.4 since it's
>> broken since then.
>
> Can you explain more in which code path this problem occurs?

With the existing code base, anything that calls WalSndWaitForWal (it
calls ProcessRepliesIfAny()), which is called from
logical_read_xlog_page, which is given as a callback to logical decoding
in CreateReplicationSlot and StartLogicalReplication.

The reason why I decided to put it into init is that following all the
paths to where the buffers are used is rather complicated due to various
callbacks, so if anybody else starts poking around in the future it
might easily get broken again if we don't initialize those
unconditionally (plus the memory footprint is a few kB, and in the usual
use of WalSender they will eventually be initialized anyway as they are
needed for streaming).

> I think we should get rid of the global variables and give each function
> its own buffer that it initializes the first time through. Otherwise
> we'll keep having to worry about this.
>

Because of the above, it would mean some refactoring in the logical
decoding APIs, not just in WalSender, so that would not be backpatchable
(and in general it's a much bigger patch then).

>> 0004 - Fixes the foreign key issue reported by Thom Brown and also adds
>> tests for FK and trigger handling.
>
> I think the trigger handing should go into execReplication.c.
>

Not in the current state. Eventually (and I am afraid that is PG11
material at this point, as we still have partitioned tables support and
initial data copy to finish in this release) we'll want to move all the
executor state code to execReplication.c and do less reinitialization,
but in the current code the trigger stuff belongs to the worker IMHO.

>> 0005 - Adds support for renaming publications and subscriptions.
>
> Could those not be handled in the generic rename support in
> ExecRenameStmt()?

Yes, it seems they can.

Attached is an updated version of the uncommitted patches.

-- 
  Petr Jelinek                  http://www.2ndQuadrant.com/
  PostgreSQL Development, 24x7 Support, Training & Services
From 980ce872862e7a9abcbec14864721103507b5136 Mon Sep 17 00:00:00 2001 From: Petr Jelinek <pjmodos@pjmodos.net> Date: Fri, 20 Jan 2017 21:20:56 +0100 Subject: [PATCH 1/4] Use asynchronous connect API in libpqwalreceiver This makes the connection attempt from CREATE SUBSCRIPTION and from WalReceiver interruptable by user in case the libpq connection is hanging. The previous coding required immediate shutdown (SIGQUIT) of PostgreSQL in that situation. --- src/backend/postmaster/pgstat.c | 4 +- .../libpqwalreceiver/libpqwalreceiver.c | 51 +++++++++++++++++++++- src/include/pgstat.h | 2 +- 3 files changed, 52 insertions(+), 5 deletions(-) diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 7176cf1..19ad6b5 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -3340,8 +3340,8 @@ pgstat_get_wait_client(WaitEventClient w) case WAIT_EVENT_WAL_RECEIVER_WAIT_START: event_name = "WalReceiverWaitStart"; break; - case WAIT_EVENT_LIBPQWALRECEIVER_READ: - event_name = "LibPQWalReceiverRead"; + case WAIT_EVENT_LIBPQWALRECEIVER: + event_name = "LibPQWalReceiver"; break; case WAIT_EVENT_WAL_SENDER_WAIT_WAL: event_name = "WalSenderWaitForWAL"; diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 44a89c7..536324c 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -112,6 +112,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname, char **err) { WalReceiverConn *conn; + PostgresPollingStatusType status; const char *keys[5]; const char *vals[5]; int i = 0; @@ -138,7 +139,53 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname, vals[i] = NULL; conn = palloc0(sizeof(WalReceiverConn)); - conn->streamConn = PQconnectdbParams(keys, vals, /* expand_dbname = */ true); + conn->streamConn = 
PQconnectStartParams(keys, vals, + /* expand_dbname = */ true); + /* Check for conn status. */ + if (PQstatus(conn->streamConn) == CONNECTION_BAD) + { + *err = pstrdup(PQerrorMessage(conn->streamConn)); + return NULL; + } + + /* Poll connection. */ + do + { + int rc; + + /* Determine current state of the connection. */ + status = PQconnectPoll(conn->streamConn); + + /* Sleep a bit if waiting for socket. */ + if (status == PGRES_POLLING_READING || + status == PGRES_POLLING_WRITING) + { + int extra_flag; + + extra_flag = (status == PGRES_POLLING_READING) ? WL_SOCKET_READABLE : + WL_SOCKET_WRITEABLE; + + ResetLatch(&MyProc->procLatch); + rc = WaitLatchOrSocket(&MyProc->procLatch, + WL_POSTMASTER_DEATH | + WL_LATCH_SET | extra_flag, + PQsocket(conn->streamConn), + 0, + WAIT_EVENT_LIBPQWALRECEIVER); + + /* Emergency bailout. */ + if (rc & WL_POSTMASTER_DEATH) + exit(1); + + /* Interrupted. */ + if (rc & WL_LATCH_SET) + CHECK_FOR_INTERRUPTS(); + } + + /* Otherwise loop until we have OK or FAILED status. */ + } while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED); + + /* Check the status. */ if (PQstatus(conn->streamConn) != CONNECTION_OK) { *err = pstrdup(PQerrorMessage(conn->streamConn)); @@ -521,7 +568,7 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query) WL_LATCH_SET, PQsocket(streamConn), 0, - WAIT_EVENT_LIBPQWALRECEIVER_READ); + WAIT_EVENT_LIBPQWALRECEIVER); if (rc & WL_POSTMASTER_DEATH) exit(1); diff --git a/src/include/pgstat.h b/src/include/pgstat.h index de8225b..e088474 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -764,7 +764,7 @@ typedef enum WAIT_EVENT_CLIENT_WRITE, WAIT_EVENT_SSL_OPEN_SERVER, WAIT_EVENT_WAL_RECEIVER_WAIT_START, - WAIT_EVENT_LIBPQWALRECEIVER_READ, + WAIT_EVENT_LIBPQWALRECEIVER, WAIT_EVENT_WAL_SENDER_WAIT_WAL, WAIT_EVENT_WAL_SENDER_WRITE_DATA } WaitEventClient; -- 2.7.4
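
The control flow of the polling loop in the patch above can be sketched without libpq. Everything in this sketch is a stand-in: `FakeConn`, `fake_poll`, `connect_async`, the `WAIT_*` flags, and the `cancel_after` parameter are hypothetical names that only model a connection needing a few read/write waits before it settles. The real code sleeps on a latch and socket and relies on interrupt processing rather than a counter. The sketch shows two things: the wait flag is chosen from the status that was actually returned, and a cancel request can end the loop between waits.

```c
#include <assert.h>

/* Hypothetical stand-ins for the polling status, result and wait flags. */
typedef enum { POLL_FAILED = 0, POLL_READING, POLL_WRITING, POLL_OK } PollStatus;
typedef enum { CONNECT_OK, CONNECT_FAILED, CONNECT_CANCELLED } ConnectResult;
enum { WAIT_READABLE = 1 << 1, WAIT_WRITEABLE = 1 << 2 };

/* A fake connection that reports a scripted sequence of states. */
typedef struct
{
    const PollStatus *script;
    int         nsteps;
    int         pos;
    int         last_wait_flag;
} FakeConn;

static PollStatus
fake_poll(FakeConn *conn)
{
    assert(conn->pos < conn->nsteps);
    return conn->script[conn->pos++];
}

/*
 * Drive the connection until it reports OK or FAILED.  cancel_after is the
 * number of waits after which a simulated cancel request arrives; a
 * negative value means "never".
 */
static ConnectResult
connect_async(FakeConn *conn, int cancel_after)
{
    PollStatus  status;
    int         waits = 0;

    do
    {
        status = fake_poll(conn);

        if (status == POLL_READING || status == POLL_WRITING)
        {
            /* Pick the wait flag from the status actually returned. */
            conn->last_wait_flag = (status == POLL_READING) ?
                WAIT_READABLE : WAIT_WRITEABLE;

            /* A real caller would sleep on the socket here. */
            waits++;
            if (cancel_after >= 0 && waits > cancel_after)
                return CONNECT_CANCELLED;
        }
    } while (status != POLL_OK && status != POLL_FAILED);

    return (status == POLL_OK) ? CONNECT_OK : CONNECT_FAILED;
}
```

With a synchronous connect call there is no point between "start" and "done" at which the cancel check could run, which is the motivation given in the commit message.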
From a4003b904e9edb6f8043c33b65ce6791feb6cce3 Mon Sep 17 00:00:00 2001 From: Petr Jelinek <pjmodos@pjmodos.net> Date: Fri, 20 Jan 2017 22:22:53 +0100 Subject: [PATCH 2/4] Always initialize stringinfo buffers in walsender --- src/backend/replication/walsender.c | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 5909b7d..8dd7a6b 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -244,6 +244,14 @@ InitWalSender(void) */ MarkPostmasterChildWalSender(); SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE); + + /* + * Allocate buffers that will be used for each outgoing and incoming + * message. + */ + initStringInfo(&output_message); + initStringInfo(&reply_message); + initStringInfo(&tmpbuf); } /* @@ -819,8 +827,6 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL); } - initStringInfo(&output_message); - if (cmd->kind == REPLICATION_KIND_LOGICAL) { LogicalDecodingContext *ctx; @@ -1814,14 +1820,6 @@ static void WalSndLoop(WalSndSendDataCallback send_data) { /* - * Allocate buffers that will be used for each outgoing and incoming - * message. We do this just once to reduce palloc overhead. - */ - initStringInfo(&output_message); - initStringInfo(&reply_message); - initStringInfo(&tmpbuf); - - /* * Initialize the last reply timestamp. That enables timeout processing * from hereon. */ -- 2.7.4
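
The argument for initializing the buffers in one place can be illustrated with a toy buffer type. `Buf`, `buf_init`, `buf_append`, `sender_init`, and the two handler functions are hypothetical names, not the StringInfo API or the walsender code. The sketch only shows that once initialization lives in the common init function, every entry point can use the buffers without having to remember to set them up first.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical growable buffer, loosely modeled on a string buffer. */
typedef struct
{
    char       *data;
    size_t      len;
    size_t      cap;
} Buf;

static Buf  output_message;
static Buf  reply_message;

static void
buf_init(Buf *b)
{
    b->cap = 64;
    b->len = 0;
    b->data = malloc(b->cap);
    assert(b->data != NULL);
    b->data[0] = '\0';
}

static void
buf_append(Buf *b, const char *s)
{
    size_t      n = strlen(s);

    /* Appending to a buffer that was never initialized is the bug class. */
    assert(b->data != NULL);
    while (b->len + n + 1 > b->cap)
    {
        b->cap *= 2;
        b->data = realloc(b->data, b->cap);
        assert(b->data != NULL);
    }
    memcpy(b->data + b->len, s, n + 1);
    b->len += n;
}

/* Single initialization point, called once at startup. */
static void
sender_init(void)
{
    buf_init(&output_message);
    buf_init(&reply_message);
}

/* Two independent entry points; neither has to initialize anything. */
static size_t
handle_create_slot(void)
{
    buf_append(&output_message, "slot");
    return output_message.len;
}

static size_t
handle_reply(void)
{
    buf_append(&reply_message, "ack");
    return reply_message.len;
}
```

The per-function lazy initialization suggested in the review would avoid the globals, at the cost of the wider refactoring discussed in the message.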
From 85d8ef1ef6881660547870bcce96b2693d4f9530 Mon Sep 17 00:00:00 2001 From: Petr Jelinek <pjmodos@pjmodos.net> Date: Sun, 22 Jan 2017 23:16:57 +0100 Subject: [PATCH 3/4] Fix after trigger execution in logical replication --- src/backend/replication/logical/worker.c | 15 ++++ src/test/subscription/t/003_constraints.pl | 113 +++++++++++++++++++++++++++++ 2 files changed, 128 insertions(+) create mode 100644 src/test/subscription/t/003_constraints.pl diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 9383960..293140c 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -176,6 +176,9 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel) if (resultRelInfo->ri_TrigDesc) estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate); + /* Prepare to catch AFTER triggers. */ + AfterTriggerBeginQuery(); + return estate; } @@ -536,6 +539,10 @@ apply_handle_insert(StringInfo s) /* Cleanup. */ ExecCloseIndices(estate->es_result_relation_info); PopActiveSnapshot(); + + /* Handle queued AFTER triggers. */ + AfterTriggerEndQuery(estate); + ExecResetTupleTable(estate->es_tupleTable, false); FreeExecutorState(estate); @@ -676,6 +683,10 @@ apply_handle_update(StringInfo s) /* Cleanup. */ ExecCloseIndices(estate->es_result_relation_info); PopActiveSnapshot(); + + /* Handle queued AFTER triggers. */ + AfterTriggerEndQuery(estate); + EvalPlanQualEnd(&epqstate); ExecResetTupleTable(estate->es_tupleTable, false); FreeExecutorState(estate); @@ -763,6 +774,10 @@ apply_handle_delete(StringInfo s) /* Cleanup. */ ExecCloseIndices(estate->es_result_relation_info); PopActiveSnapshot(); + + /* Handle queued AFTER triggers. 
*/ + AfterTriggerEndQuery(estate); + EvalPlanQualEnd(&epqstate); ExecResetTupleTable(estate->es_tupleTable, false); FreeExecutorState(estate); diff --git a/src/test/subscription/t/003_constraints.pl b/src/test/subscription/t/003_constraints.pl new file mode 100644 index 0000000..203322f --- /dev/null +++ b/src/test/subscription/t/003_constraints.pl @@ -0,0 +1,113 @@ +# Basic logical replication test +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 4; + +# Initialize publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Setup structure on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_fk (bid int PRIMARY KEY);"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_fk_ref (id int PRIMARY KEY, bid int REFERENCES tab_fk (bid));"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_fk (bid int PRIMARY KEY);"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_fk_ref (id int PRIMARY KEY, bid int REFERENCES tab_fk (bid));"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . 
' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub FOR ALL TABLES;"); + +my $appname = 'tap_sub'; +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub"); + +# Wait for subscriber to finish initialization +my $caughtup_query = +"SELECT pg_current_xlog_location() <= replay_location FROM pg_stat_replication WHERE application_name = '$appname';"; +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_fk (bid) VALUES (1);"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_fk_ref (id, bid) VALUES (1,1);"); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# Check data on subscriber +my $result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), min(bid), max(bid) FROM tab_fk"); +is($result, qq(1|1|1), 'check replicated tab_fk inserts on subscriber'); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), min(bid), max(bid) FROM tab_fk_ref"); +is($result, qq(1|1|1), 'check replicated tab_fk_ref inserts on subscriber'); + +# Drop the fk on provider +$node_publisher->safe_psql('postgres', + "DROP TABLE tab_fk CASCADE;"); + +# Insert data +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_fk_ref (id, bid) VALUES (2,2);"); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# FK is not enforced on subscriber +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), min(bid), max(bid) FROM tab_fk_ref"); +is($result, qq(2|1|2), 'check FK ignored on subscriber'); + +# Add replica trigger +$node_subscriber->safe_psql('postgres', qq{ +CREATE FUNCTION filter_basic_dml_fn() RETURNS TRIGGER AS \$\$ 
+BEGIN + IF (TG_OP = 'INSERT') THEN + IF (NEW.id < 10) THEN + RETURN NEW; + ELSE + RETURN NULL; + END IF; + ELSE + RAISE WARNING 'Unknown action'; + RETURN NULL; + END IF; +END; +\$\$ LANGUAGE plpgsql; +CREATE TRIGGER filter_basic_dml_trg +BEFORE INSERT ON tab_fk_ref +FOR EACH ROW EXECUTE PROCEDURE filter_basic_dml_fn(); +ALTER TABLE tab_fk_ref ENABLE REPLICA TRIGGER filter_basic_dml_trg; +}); + +# Insert data +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_fk_ref (id, bid) VALUES (10,10);"); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# The row should be skipped on subscriber +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), min(bid), max(bid) FROM tab_fk_ref"); +is($result, qq(2|1|2), 'check skipped insert on subscriber'); + +$node_subscriber->stop('fast'); +$node_publisher->stop('fast'); -- 2.7.4
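
The begin/end bracketing that the patch above adds to the apply handlers can be pictured as a deferred event queue. The sketch below is a toy model with hypothetical names (`trigger_begin_query`, `trigger_queue_event`, `trigger_end_query`, `apply_insert`); it is not the PostgreSQL trigger manager. It only shows that events queued while a change is applied run when the bracket is closed, and would never run if the closing call were missing.

```c
#include <assert.h>

#define MAX_EVENTS 16

typedef void (*EventFn) (int row_id);

/* Toy per-query event queue. */
static EventFn queued_fn[MAX_EVENTS];
static int  queued_arg[MAX_EVENTS];
static int  nqueued = 0;

/* Counts how many queued events have actually fired. */
static int  fired_count = 0;

static void
trigger_begin_query(void)
{
    nqueued = 0;
}

static void
trigger_queue_event(EventFn fn, int row_id)
{
    assert(nqueued < MAX_EVENTS);
    queued_fn[nqueued] = fn;
    queued_arg[nqueued] = row_id;
    nqueued++;
}

static void
trigger_end_query(void)
{
    /* Fire everything queued since the matching begin call. */
    for (int i = 0; i < nqueued; i++)
        queued_fn[i] (queued_arg[i]);
    nqueued = 0;
}

static void
count_row(int row_id)
{
    (void) row_id;
    fired_count++;
}

/* Shape of an apply handler: open the bracket, do the work, close it. */
static void
apply_insert(int row_id)
{
    trigger_begin_query();
    /* ... the row itself would be inserted here ... */
    trigger_queue_event(count_row, row_id);
    trigger_end_query();
}
```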
From 754b061abd52e75a9af8c1119ec7cef08d9632c4 Mon Sep 17 00:00:00 2001 From: Petr Jelinek <pjmodos@pjmodos.net> Date: Thu, 19 Jan 2017 00:59:01 +0100 Subject: [PATCH 4/4] Add RENAME support for PUBLICATIONs and SUBSCRIPTIONs --- src/backend/commands/alter.c | 15 +++++++++++++++ src/backend/parser/gram.y | 18 ++++++++++++++++++ src/backend/replication/logical/worker.c | 16 +++++++++++++++- src/bin/psql/tab-complete.c | 6 ++++-- src/test/regress/expected/publication.out | 10 +++++++++- src/test/regress/expected/subscription.out | 10 +++++++++- src/test/regress/sql/publication.sql | 6 +++++- src/test/regress/sql/subscription.sql | 6 +++++- src/test/subscription/t/001_rep_changes.pl | 11 ++++++++++- 9 files changed, 90 insertions(+), 8 deletions(-) diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c index 768fcc8..8498880 100644 --- a/src/backend/commands/alter.c +++ b/src/backend/commands/alter.c @@ -32,6 +32,7 @@ #include "catalog/pg_opclass.h" #include "catalog/pg_opfamily.h" #include "catalog/pg_proc.h" +#include "catalog/pg_subscription.h" #include "catalog/pg_ts_config.h" #include "catalog/pg_ts_dict.h" #include "catalog/pg_ts_parser.h" @@ -90,6 +91,12 @@ report_name_conflict(Oid classId, const char *name) case LanguageRelationId: msgfmt = gettext_noop("language \"%s\" already exists"); break; + case PublicationRelationId: + msgfmt = gettext_noop("publication \"%s\" already exists"); + break; + case SubscriptionRelationId: + msgfmt = gettext_noop("subscription \"%s\" already exists"); + break; default: elog(ERROR, "unsupported object class %u", classId); break; @@ -256,6 +263,12 @@ AlterObjectRename_internal(Relation rel, Oid objectId, const char *new_name) IsThereOpFamilyInNamespace(new_name, opf->opfmethod, opf->opfnamespace); } + else if (classId == SubscriptionRelationId) + { + if (SearchSysCacheExists2(SUBSCRIPTIONNAME, MyDatabaseId, + CStringGetDatum(new_name))) + report_name_conflict(classId, new_name); + } else if (nameCacheId >= 0) { 
if (OidIsValid(namespaceId)) @@ -365,6 +378,8 @@ ExecRenameStmt(RenameStmt *stmt) case OBJECT_TSDICTIONARY: case OBJECT_TSPARSER: case OBJECT_TSTEMPLATE: + case OBJECT_PUBLICATION: + case OBJECT_SUBSCRIPTION: { ObjectAddress address; Relation catalog; diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index a4edea0..03b9bc3 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -8475,6 +8475,24 @@ RenameStmt: ALTER AGGREGATE aggregate_with_argtypes RENAME TO name n->missing_ok = false; $$ = (Node *)n; } + | ALTER PUBLICATION name RENAME TO name + { + RenameStmt *n = makeNode(RenameStmt); + n->renameType = OBJECT_PUBLICATION; + n->object = list_make1(makeString($3)); + n->newname = $6; + n->missing_ok = false; + $$ = (Node *)n; + } + | ALTER SUBSCRIPTION name RENAME TO name + { + RenameStmt *n = makeNode(RenameStmt); + n->renameType = OBJECT_SUBSCRIPTION; + n->object = list_make1(makeString($3)); + n->newname = $6; + n->missing_ok = false; + $$ = (Node *)n; + } ; opt_column: COLUMN { $$ = COLUMN; } diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 293140c..3aba7b6 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1265,6 +1265,21 @@ reread_subscription(void) } /* + * Exit if subscription name was changed (it's used for + * fallback_application_name). The launcher will start new worker. + */ + if (strcmp(newsub->name, MySubscription->name) != 0) + { + ereport(LOG, + (errmsg("logical replication worker for subscription \"%s\" will " + "restart because subscription was renamed", + MySubscription->name))); + + walrcv_disconnect(wrconn); + proc_exit(0); + } + + /* * Exit if publication list was changed. The launcher will start * new worker. */ @@ -1297,7 +1312,6 @@ reread_subscription(void) /* Check for other changes that should never happen too. 
*/ if (newsub->dbid != MySubscription->dbid || - strcmp(newsub->name, MySubscription->name) != 0 || strcmp(newsub->slotname, MySubscription->slotname) != 0) { elog(ERROR, "subscription %u changed unexpectedly", diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index d6fffcf..a8256e8 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -1438,7 +1438,8 @@ psql_completion(const char *text, int start, int end) /* ALTER PUBLICATION <name> ...*/ else if (Matches3("ALTER","PUBLICATION",MatchAny)) { - COMPLETE_WITH_LIST5("WITH", "ADD TABLE", "SET TABLE", "DROP TABLE", "OWNER TO"); + COMPLETE_WITH_LIST6("WITH", "ADD TABLE", "SET TABLE", "DROP TABLE", + "OWNER TO", "RENAME TO"); } /* ALTER PUBLICATION <name> .. WITH ( ... */ else if (HeadMatches3("ALTER", "PUBLICATION",MatchAny) && TailMatches2("WITH", "(")) @@ -1449,7 +1450,8 @@ psql_completion(const char *text, int start, int end) /* ALTER SUBSCRIPTION <name> ... */ else if (Matches3("ALTER","SUBSCRIPTION",MatchAny)) { - COMPLETE_WITH_LIST6("WITH", "CONNECTION", "SET PUBLICATION", "ENABLE", "DISABLE", "OWNER TO"); + COMPLETE_WITH_LIST7("WITH", "CONNECTION", "SET PUBLICATION", "ENABLE", + "DISABLE", "OWNER TO", "RENAME TO"); } else if (HeadMatches3("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches2("WITH", "(")) { diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out index 5784b0f..6416fbb 100644 --- a/src/test/regress/expected/publication.out +++ b/src/test/regress/expected/publication.out @@ -148,7 +148,15 @@ DROP TABLE testpub_tbl1; t | t | t (1 row) -DROP PUBLICATION testpub_default; +ALTER PUBLICATION testpub_default RENAME TO testpub_foo; +\dRp testpub_foo + List of publications + Name | Owner | Inserts | Updates | Deletes +-------------+--------------------------+---------+---------+--------- + testpub_foo | regress_publication_user | t | t | t +(1 row) + +DROP PUBLICATION testpub_foo; DROP PUBLICATION testpib_ins_trunct; DROP 
PUBLICATION testpub_fortbl; DROP SCHEMA pub_test CASCADE; diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index 2ccec98..cb1ab4e 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -61,6 +61,14 @@ ALTER SUBSCRIPTION testsub DISABLE; (1 row) COMMIT; -DROP SUBSCRIPTION testsub NODROP SLOT; +ALTER SUBSCRIPTION testsub RENAME TO testsub_foo; +\dRs + List of subscriptions + Name | Owner | Enabled | Publication +-------------+---------------------------+---------+-------------------- + testsub_foo | regress_subscription_user | f | {testpub,testpub1} +(1 row) + +DROP SUBSCRIPTION testsub_foo NODROP SLOT; RESET SESSION AUTHORIZATION; DROP ROLE regress_subscription_user; diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql index 8779788..9563ea1 100644 --- a/src/test/regress/sql/publication.sql +++ b/src/test/regress/sql/publication.sql @@ -73,7 +73,11 @@ DROP TABLE testpub_tbl1; \dRp+ testpub_default -DROP PUBLICATION testpub_default; +ALTER PUBLICATION testpub_default RENAME TO testpub_foo; + +\dRp testpub_foo + +DROP PUBLICATION testpub_foo; DROP PUBLICATION testpib_ins_trunct; DROP PUBLICATION testpub_fortbl; diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index 68c17d5..fce6069 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -38,7 +38,11 @@ ALTER SUBSCRIPTION testsub DISABLE; COMMIT; -DROP SUBSCRIPTION testsub NODROP SLOT; +ALTER SUBSCRIPTION testsub RENAME TO testsub_foo; + +\dRs + +DROP SUBSCRIPTION testsub_foo NODROP SLOT; RESET SESSION AUTHORIZATION; DROP ROLE regress_subscription_user; diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl index b51740b..a9c4b01 100644 --- a/src/test/subscription/t/001_rep_changes.pl +++ b/src/test/subscription/t/001_rep_changes.pl @@ -169,8 +169,17 @@ 
$result = $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_full"); is($result, qq(11|0|100), 'check replicated insert after alter publication'); +# check restart on rename +$oldpid = $node_publisher->safe_psql('postgres', + "SELECT pid FROM pg_stat_replication WHERE application_name = '$appname';"); +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub RENAME TO tap_sub_renamed"); +$node_publisher->poll_query_until('postgres', + "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname';") + or die "Timed out while waiting for apply to restart"; + # check all the cleanup -$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub"); +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_renamed"); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_subscription"); -- 2.7.4
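
The change to `reread_subscription` in the patch above separates a change that warrants a clean worker exit (a rename) from changes that the patch treats as unexpected. A minimal sketch of that decision follows, using a hypothetical `SubInfo` struct and `classify_change` function in place of the real catalog structures; the field set mirrors only what the patch as posted compares.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical snapshot of the subscription fields being compared. */
typedef struct
{
    unsigned int dbid;
    const char *name;
    const char *slotname;
} SubInfo;

typedef enum { CHANGE_NONE, CHANGE_RESTART, CHANGE_UNEXPECTED } ChangeKind;

static ChangeKind
classify_change(const SubInfo *prev, const SubInfo *cur)
{
    assert(prev != NULL && cur != NULL);

    /* A rename is legitimate: exit so a fresh worker picks up the new name. */
    if (strcmp(cur->name, prev->name) != 0)
        return CHANGE_RESTART;

    /* The patch as posted treats changes to these fields as errors. */
    if (cur->dbid != prev->dbid ||
        strcmp(cur->slotname, prev->slotname) != 0)
        return CHANGE_UNEXPECTED;

    return CHANGE_NONE;
}
```

In the patch the restart case disconnects and exits with status 0, leaving it to the launcher to start a new worker, which is what the TAP test checks by waiting for a different walsender pid.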