On 2020-Sep-15, Alvaro Herrera wrote: > I overlooked this in 2f9661311b83. From this perspective, I agree it > looks wrong. We still have to send *some* completion tag (the 'C' > message), but maybe we can invent a separate entry point in dest.c for > that -- EndReplicationCommand() or some such -- that takes values from a > separate enum?
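It turns out to be simpler than that, actually; we don't need to build a lot of infrastructure. A small EndReplicationCommand() in dest.c that takes the command tag as a plain string and sends the 'C' message directly is enough, with no separate enum required; see the patch below. -- Álvaro Herrera https://www.2ndQuadrant.com/ PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services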
commit 2c1eab8dd3eade63598a34f81d8b829d819f6ab5 Author: Alvaro Herrera <alvhe...@alvh.no-ip.org> AuthorDate: Wed Sep 16 13:04:38 2020 -0300 CommitDate: Wed Sep 16 13:04:38 2020 -0300 replication command end diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 67093383e6..a48f2c3c64 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1122,11 +1122,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) static void DropReplicationSlot(DropReplicationSlotCmd *cmd) { - QueryCompletion qc; - ReplicationSlotDrop(cmd->slotname, !cmd->wait); - SetQueryCompletion(&qc, CMDTAG_DROP_REPLICATION_SLOT, 0); - EndCommand(&qc, DestRemote, false); } /* @@ -1519,7 +1515,6 @@ exec_replication_command(const char *cmd_string) Node *cmd_node; MemoryContext cmd_context; MemoryContext old_context; - QueryCompletion qc; /* * If WAL sender has been told that shutdown is getting close, switch its @@ -1620,19 +1615,23 @@ exec_replication_command(const char *cmd_string) { case T_IdentifySystemCmd: IdentifySystem(); + EndReplicationCommand("IDENTIFY_SYSTEM"); break; case T_BaseBackupCmd: PreventInTransactionBlock(true, "BASE_BACKUP"); SendBaseBackup((BaseBackupCmd *) cmd_node); + EndReplicationCommand("BASE_BACKUP"); break; case T_CreateReplicationSlotCmd: CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node); + EndReplicationCommand("CREATE_REPLICATION_SLOT"); break; case T_DropReplicationSlotCmd: DropReplicationSlot((DropReplicationSlotCmd *) cmd_node); + EndReplicationCommand("DROP_REPLICATION_SLOT"); break; case T_StartReplicationCmd: @@ -1646,6 +1645,8 @@ exec_replication_command(const char *cmd_string) else StartLogicalReplication(cmd); + /* callees already sent their own completion message */ + Assert(xlogreader != NULL); break; } @@ -1653,6 +1654,7 @@ exec_replication_command(const char *cmd_string) case T_TimeLineHistoryCmd: PreventInTransactionBlock(true, "TIMELINE_HISTORY"); 
SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node); + EndReplicationCommand("TIMELINE_HISTORY"); break; case T_VariableShowStmt: @@ -1664,6 +1666,7 @@ exec_replication_command(const char *cmd_string) StartTransactionCommand(); GetPGVariable(n->name, dest); CommitTransactionCommand(); + EndReplicationCommand("SHOW"); } break; @@ -1676,10 +1679,6 @@ exec_replication_command(const char *cmd_string) MemoryContextSwitchTo(old_context); MemoryContextDelete(cmd_context); - /* Send CommandComplete message */ - SetQueryCompletion(&qc, CMDTAG_SELECT, 0); - EndCommand(&qc, DestRemote, true); - /* Report to pgstat that this process is now idle */ pgstat_report_activity(STATE_IDLE, NULL); debug_query_string = NULL; diff --git a/src/backend/tcop/dest.c b/src/backend/tcop/dest.c index 7208751ec7..3a1be6b999 100644 --- a/src/backend/tcop/dest.c +++ b/src/backend/tcop/dest.c @@ -211,6 +211,15 @@ EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_o } } +/* + * Simplified version EndCommand, for use of replication commands + */ +void +EndReplicationCommand(const char *commandTag) +{ + pq_putmessage('C', commandTag, strlen(commandTag) + 1); +} + /* ---------------- * NullCommand - tell dest that an empty query string was recognized * diff --git a/src/include/tcop/cmdtaglist.h b/src/include/tcop/cmdtaglist.h index 8ef0f55e74..be94852bbd 100644 --- a/src/include/tcop/cmdtaglist.h +++ b/src/include/tcop/cmdtaglist.h @@ -157,7 +157,6 @@ PG_CMDTAG(CMDTAG_DROP_OWNED, "DROP OWNED", true, false, false) PG_CMDTAG(CMDTAG_DROP_POLICY, "DROP POLICY", true, false, false) PG_CMDTAG(CMDTAG_DROP_PROCEDURE, "DROP PROCEDURE", true, false, false) PG_CMDTAG(CMDTAG_DROP_PUBLICATION, "DROP PUBLICATION", true, false, false) -PG_CMDTAG(CMDTAG_DROP_REPLICATION_SLOT, "DROP REPLICATION SLOT", false, false, false) PG_CMDTAG(CMDTAG_DROP_ROLE, "DROP ROLE", false, false, false) PG_CMDTAG(CMDTAG_DROP_ROUTINE, "DROP ROUTINE", true, false, false) PG_CMDTAG(CMDTAG_DROP_RULE, "DROP RULE", 
true, false, false) diff --git a/src/include/tcop/dest.h b/src/include/tcop/dest.h index 662ce8a56f..2e07f1516d 100644 --- a/src/include/tcop/dest.h +++ b/src/include/tcop/dest.h @@ -139,6 +139,7 @@ extern void BeginCommand(CommandTag commandTag, CommandDest dest); extern DestReceiver *CreateDestReceiver(CommandDest dest); extern void EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output); +extern void EndReplicationCommand(const char *commandTag); /* Additional functions that go with destination management, more or less. */