On Tue, Apr 06, 2021 at 02:25:05PM +0900, Amit Langote wrote:
> While updating the patch to do so, it occurred to me that maybe we
> could move the ExecInitResultRelation() call into
> create_estate_for_relation() too, in the spirit of removing
> duplicative code.  See attached updated patch.  Actually I remember
> proposing that as part of the commit you shared in your earlier email,
> but for some reason it didn't end up in the commit.  I now think maybe
> we should do that after all.

So, I have been studying 1375422c and this thread.  Using
ExecCloseRangeTableRelations() when cleaning up the executor state is
reasonable as of the equivalent call to ExecInitRangeTable().  Now,
there is something that itched me quite a lot while reviewing the
proposed patch: ExecCloseResultRelations() is missing, but other
code paths make an effort to mirror any calls of ExecInitRangeTable()
with it.  Looking closer, I think that the worker code is actually
confused with the handling of the opening and closing of the indexes
needed by a result relation once you use that, because some code paths
related to tuple routing for partitions may, or may not, need to do
that.  However, once the code integrates with ExecInitRangeTable() and
ExecCloseResultRelations(), the index handlings could get much simpler
to understand as the internal apply routines for INSERT/UPDATE/DELETE
have no need to think about the opening or closing them.  Well,
mostly, to be honest.

There are two exceptions when it comes the tuple routing for
partitioned tables, one for INSERT/DELETE as the result relation found
at the top of apply_handle_tuple_routing() can be used, and a second
for the UPDATE case as it is necessary to re-route the tuple to the
new partition, as it becomes necessary to open and close the indexes
of the new partition relation where a tuple is sent to.  I think that
there is a lot of room for a much better integration in terms of
estate handling for this stuff with the executor, but that would be
too invasive for v14 post feature freeze, and I am not sure what a
good design would be.

Related to that, I found confusing that the patch was passing down a
result relation from create_estate_for_relation() for something that's
just stored in the executor state.  Having a "close" routine that maps
to the "create" routine gives a good vibe, though "close" is usually
used in parallel of "open" in the PG code, and instead of "free" I
have found "finish" a better term.

Another thing, and I think that it is a good change, is that it is
necessary to push a snapshot in the worker process before creating the
executor state as any index predicates of the result relation are
going to need that when opened.  My impression of the code of worker.c
is that the amount of code duplication is quite high between the three
DML code paths, with the update tuple routing logic being a space of
improvements on its own, and that it could gain in clarity with more
refactoring work around the executor states, but I am fine to leave
that as future work.  That's too late for v14.

Attached is v5 that I am finishing with.  Much more could be done but
I don't want to do something too invasive at this stage of the game.
There are a couple of extra relations in terms of relations opened for
a partitioned table within create_estate_for_relation() when
redirecting to the tuple routing, but my guess is that this would be
better in the long-term.  We could bypass doing that when working on a
partitioned table, but I really don't think that this code should be
made more confusing and that we had better apply
ExecCloseResultRelations() for all the relations faced.  That's
simpler to reason about IMO.
--
Michael
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index fb3ba5c415..704fba28b3 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -239,8 +239,7 @@ static bool FindReplTupleInLocalRel(EState *estate, Relation localrel,
 									LogicalRepRelation *remoterel,
 									TupleTableSlot *remoteslot,
 									TupleTableSlot **localslot);
-static void apply_handle_tuple_routing(ResultRelInfo *relinfo,
-									   EState *estate,
+static void apply_handle_tuple_routing(EState *estate,
 									   TupleTableSlot *remoteslot,
 									   LogicalRepTupleData *newtup,
 									   LogicalRepRelMapEntry *relmapentry,
@@ -337,14 +336,16 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
 /*
  * Executor state preparation for evaluation of constraint expressions,
  * indexes and triggers.
- *
- * This is based on similar code in copy.c
  */
 static EState *
 create_estate_for_relation(LogicalRepRelMapEntry *rel)
 {
 	EState	   *estate;
 	RangeTblEntry *rte;
+	ResultRelInfo *resultRelInfo;
+
+	/* This had better make sure that a snapshot is active */
+	Assert(ActiveSnapshotSet());
 
 	estate = CreateExecutorState();
 
@@ -357,12 +358,41 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel)
 
 	estate->es_output_cid = GetCurrentCommandId(true);
 
+	resultRelInfo = makeNode(ResultRelInfo);
+	ExecInitResultRelation(estate, resultRelInfo, 1);
+	ExecOpenIndices(resultRelInfo, false);
+
 	/* Prepare to catch AFTER triggers. */
 	AfterTriggerBeginQuery();
 
 	return estate;
 }
 
+/*
+ * Close and cleanup the executor state created by
+ * create_estate_for_relation().
+ */
+static void
+finish_estate_for_relation(EState *estate)
+{
+	/* only one result relation should be set here */
+	Assert(list_length(estate->es_opened_result_relations) == 1);
+
+	/* Handle any queued AFTER triggers. */
+	AfterTriggerEndQuery(estate);
+
+	/* Cleanup. */
+	ExecResetTupleTable(estate->es_tupleTable, false);
+
+	/*
+	 * Close any Relations that have been opened for range table entries or
+	 * result relations.
+	 */
+	ExecCloseResultRelations(estate);
+	ExecCloseRangeTableRelations(estate);
+	FreeExecutorState(estate);
+}
+
 /*
  * Executes default values for columns for which we can't map to remote
  * relation columns.
@@ -1142,7 +1172,6 @@ GetRelationIdentityOrPK(Relation rel)
 static void
 apply_handle_insert(StringInfo s)
 {
-	ResultRelInfo *resultRelInfo;
 	LogicalRepRelMapEntry *rel;
 	LogicalRepTupleData newtup;
 	LogicalRepRelId relid;
@@ -1167,16 +1196,14 @@ apply_handle_insert(StringInfo s)
 		return;
 	}
 
+	/* Input functions may need an active snapshot, so get one */
+	PushActiveSnapshot(GetTransactionSnapshot());
+
 	/* Initialize the executor state. */
 	estate = create_estate_for_relation(rel);
 	remoteslot = ExecInitExtraTupleSlot(estate,
 										RelationGetDescr(rel->localrel),
 										&TTSOpsVirtual);
-	resultRelInfo = makeNode(ResultRelInfo);
-	InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
-
-	/* Input functions may need an active snapshot, so get one */
-	PushActiveSnapshot(GetTransactionSnapshot());
 
 	/* Process and store remote tuple in the slot */
 	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
@@ -1186,19 +1213,14 @@ apply_handle_insert(StringInfo s)
 
 	/* For a partitioned table, insert the tuple into a partition. */
 	if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
-		apply_handle_tuple_routing(resultRelInfo, estate,
-								   remoteslot, NULL, rel, CMD_INSERT);
+		apply_handle_tuple_routing(estate, remoteslot, NULL, rel, CMD_INSERT);
 	else
-		apply_handle_insert_internal(resultRelInfo, estate,
+		apply_handle_insert_internal(estate->es_result_relations[0], estate,
 									 remoteslot);
 
 	PopActiveSnapshot();
 
-	/* Handle queued AFTER triggers. */
-	AfterTriggerEndQuery(estate);
-
-	ExecResetTupleTable(estate->es_tupleTable, false);
-	FreeExecutorState(estate);
+	finish_estate_for_relation(estate);
 
 	logicalrep_rel_close(rel, NoLock);
 
@@ -1210,13 +1232,8 @@ static void
 apply_handle_insert_internal(ResultRelInfo *relinfo,
 							 EState *estate, TupleTableSlot *remoteslot)
 {
-	ExecOpenIndices(relinfo, false);
-
 	/* Do the insert. */
 	ExecSimpleRelationInsert(relinfo, estate, remoteslot);
-
-	/* Cleanup. */
-	ExecCloseIndices(relinfo);
 }
 
 /*
@@ -1260,7 +1277,6 @@ check_relation_updatable(LogicalRepRelMapEntry *rel)
 static void
 apply_handle_update(StringInfo s)
 {
-	ResultRelInfo *resultRelInfo;
 	LogicalRepRelMapEntry *rel;
 	LogicalRepRelId relid;
 	EState	   *estate;
@@ -1292,13 +1308,13 @@ apply_handle_update(StringInfo s)
 	/* Check if we can do the update. */
 	check_relation_updatable(rel);
 
+	PushActiveSnapshot(GetTransactionSnapshot());
+
 	/* Initialize the executor state. */
 	estate = create_estate_for_relation(rel);
 	remoteslot = ExecInitExtraTupleSlot(estate,
 										RelationGetDescr(rel->localrel),
 										&TTSOpsVirtual);
-	resultRelInfo = makeNode(ResultRelInfo);
-	InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
 
 	/*
 	 * Populate updatedCols so that per-column triggers can fire, and so
@@ -1327,8 +1343,6 @@ apply_handle_update(StringInfo s)
 	/* Also populate extraUpdatedCols, in case we have generated columns */
 	fill_extraUpdatedCols(target_rte, rel->localrel);
 
-	PushActiveSnapshot(GetTransactionSnapshot());
-
 	/* Build the search tuple. */
 	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
 	slot_store_data(remoteslot, rel,
@@ -1337,19 +1351,14 @@ apply_handle_update(StringInfo s)
 
 	/* For a partitioned table, apply update to correct partition. */
 	if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
-		apply_handle_tuple_routing(resultRelInfo, estate,
-								   remoteslot, &newtup, rel, CMD_UPDATE);
+		apply_handle_tuple_routing(estate, remoteslot, &newtup, rel, CMD_UPDATE);
 	else
-		apply_handle_update_internal(resultRelInfo, estate,
+		apply_handle_update_internal(estate->es_result_relations[0], estate,
 									 remoteslot, &newtup, rel);
 
 	PopActiveSnapshot();
 
-	/* Handle queued AFTER triggers. */
-	AfterTriggerEndQuery(estate);
-
-	ExecResetTupleTable(estate->es_tupleTable, false);
-	FreeExecutorState(estate);
+	finish_estate_for_relation(estate);
 
 	logicalrep_rel_close(rel, NoLock);
 
@@ -1370,7 +1379,6 @@ apply_handle_update_internal(ResultRelInfo *relinfo,
 	MemoryContext oldctx;
 
 	EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
-	ExecOpenIndices(relinfo, false);
 
 	found = FindReplTupleInLocalRel(estate, localrel,
 									&relmapentry->remoterel,
@@ -1409,7 +1417,6 @@ apply_handle_update_internal(ResultRelInfo *relinfo,
 	}
 
 	/* Cleanup. */
-	ExecCloseIndices(relinfo);
 	EvalPlanQualEnd(&epqstate);
 }
 
@@ -1421,7 +1428,6 @@ apply_handle_update_internal(ResultRelInfo *relinfo,
 static void
 apply_handle_delete(StringInfo s)
 {
-	ResultRelInfo *resultRelInfo;
 	LogicalRepRelMapEntry *rel;
 	LogicalRepTupleData oldtup;
 	LogicalRepRelId relid;
@@ -1449,15 +1455,13 @@ apply_handle_delete(StringInfo s)
 	/* Check if we can do the delete. */
 	check_relation_updatable(rel);
 
+	PushActiveSnapshot(GetTransactionSnapshot());
+
 	/* Initialize the executor state. */
 	estate = create_estate_for_relation(rel);
 	remoteslot = ExecInitExtraTupleSlot(estate,
 										RelationGetDescr(rel->localrel),
 										&TTSOpsVirtual);
-	resultRelInfo = makeNode(ResultRelInfo);
-	InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
-
-	PushActiveSnapshot(GetTransactionSnapshot());
 
 	/* Build the search tuple. */
 	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
@@ -1466,19 +1470,14 @@ apply_handle_delete(StringInfo s)
 
 	/* For a partitioned table, apply delete to correct partition. */
 	if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
-		apply_handle_tuple_routing(resultRelInfo, estate,
-								   remoteslot, NULL, rel, CMD_DELETE);
+		apply_handle_tuple_routing(estate, remoteslot, NULL, rel, CMD_DELETE);
 	else
-		apply_handle_delete_internal(resultRelInfo, estate,
+		apply_handle_delete_internal(estate->es_result_relations[0], estate,
 									 remoteslot, &rel->remoterel);
 
 	PopActiveSnapshot();
 
-	/* Handle queued AFTER triggers. */
-	AfterTriggerEndQuery(estate);
-
-	ExecResetTupleTable(estate->es_tupleTable, false);
-	FreeExecutorState(estate);
+	finish_estate_for_relation(estate);
 
 	logicalrep_rel_close(rel, NoLock);
 
@@ -1497,7 +1496,6 @@ apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate,
 	bool		found;
 
 	EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
-	ExecOpenIndices(relinfo, false);
 
 	found = FindReplTupleInLocalRel(estate, localrel, remoterel,
 									remoteslot, &localslot);
@@ -1520,7 +1518,6 @@ apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate,
 	}
 
 	/* Cleanup. */
-	ExecCloseIndices(relinfo);
 	EvalPlanQualEnd(&epqstate);
 }
 
@@ -1561,13 +1558,13 @@ FindReplTupleInLocalRel(EState *estate, Relation localrel,
  * This handles insert, update, delete on a partitioned table.
  */
 static void
-apply_handle_tuple_routing(ResultRelInfo *relinfo,
-						   EState *estate,
+apply_handle_tuple_routing(EState *estate,
 						   TupleTableSlot *remoteslot,
 						   LogicalRepTupleData *newtup,
 						   LogicalRepRelMapEntry *relmapentry,
 						   CmdType operation)
 {
+	ResultRelInfo *relinfo = estate->es_result_relations[0];
 	Relation	parentrel = relinfo->ri_RelationDesc;
 	ModifyTableState *mtstate = NULL;
 	PartitionTupleRouting *proute = NULL;
@@ -1577,6 +1574,9 @@ apply_handle_tuple_routing(ResultRelInfo *relinfo,
 	TupleConversionMap *map;
 	MemoryContext oldctx;
 
+	/* only the partitioned table should be set as result relation */
+	Assert(list_length(estate->es_opened_result_relations) == 1);
+
 	/* ModifyTableState is needed for ExecFindPartition(). */
 	mtstate = makeNode(ModifyTableState);
 	mtstate->ps.plan = NULL;
@@ -1614,17 +1614,22 @@ apply_handle_tuple_routing(ResultRelInfo *relinfo,
 	}
 	MemoryContextSwitchTo(oldctx);
 
+
 	switch (operation)
 	{
 		case CMD_INSERT:
+			ExecOpenIndices(partrelinfo, false);
 			apply_handle_insert_internal(partrelinfo, estate,
 										 remoteslot_part);
+			ExecCloseIndices(partrelinfo);
 			break;
 
 		case CMD_DELETE:
+			ExecOpenIndices(partrelinfo, false);
 			apply_handle_delete_internal(partrelinfo, estate,
 										 remoteslot_part,
 										 &relmapentry->remoterel);
+			ExecCloseIndices(partrelinfo);
 			break;
 
 		case CMD_UPDATE:
@@ -1766,8 +1771,11 @@ apply_handle_tuple_routing(ResultRelInfo *relinfo,
 						slot_getallattrs(remoteslot);
 					}
 					MemoryContextSwitchTo(oldctx);
+
+					ExecOpenIndices(partrelinfo_new, false);
 					apply_handle_insert_internal(partrelinfo_new, estate,
 												 remoteslot_part);
+					ExecCloseIndices(partrelinfo_new);
 				}
 			}
 			break;

Attachment: signature.asc
Description: PGP signature

Reply via email to