From 43eeea0bc35204440d262224b56efca958b33428 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@enterprisedb.com>
Date: Mon, 4 Dec 2017 11:52:11 +1300
Subject: [PATCH] Fix EXPLAIN ANALYZE of hash join when the leader doesn't
 execute.

If a hash join appears in a parallel query, there may be no hash table
available for explain.c to inspect in the leader even though one may have
been built in other processes.  This can happen either because
parallel_leader_participation was set to off or because the leader happened
to hit the end of the outer relation immediately (even though the relation
as a whole is not empty) and decided not to build the hash table.
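
For example, with the leader not participating, the Buckets/Batches/Memory
Usage line was previously missing from EXPLAIN ANALYZE output for a query
along the lines of the regression test added below (the "simple" table and
the settings are borrowed from that test):

    set max_parallel_workers_per_gather = 2;
    set parallel_leader_participation = off;
    explain (analyze, costs off)
      select count(*) from simple r join simple s using (id);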

Commit bf11e7ee introduced a way for workers to exchange instrumentation via
the DSM segment for Sort nodes even though Sort is not parallel-aware.  This
commit does the same for Hash nodes, so that explain.c has a way to find
instrumentation data from an arbitrary participant that actually built the
hash table.

Author: Thomas Munro
Reviewed-By:
Discussion: https://postgr.es/m/CAEepm%3D3DUQC2-z252N55eOcZBer6DPdM%3DFzrxH9dZc5vYLsjaA%40mail.gmail.com
---
 src/backend/commands/explain.c      | 58 +++++++++++++++++------
 src/backend/executor/execParallel.c | 31 ++++++++++---
 src/backend/executor/execProcnode.c |  3 ++
 src/backend/executor/nodeHash.c     | 91 +++++++++++++++++++++++++++++++++++++
 src/include/executor/nodeHash.h     |  8 ++++
 src/include/nodes/execnodes.h       | 26 +++++++++++
 src/test/regress/expected/join.out  | 15 ++++++
 src/test/regress/sql/join.sql       | 11 +++++
 8 files changed, 222 insertions(+), 21 deletions(-)

diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 447f69d044e..1ffe635d66c 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -2379,34 +2379,62 @@ show_sort_info(SortState *sortstate, ExplainState *es)
 static void
 show_hash_info(HashState *hashstate, ExplainState *es)
 {
-	HashJoinTable hashtable;
+	HashInstrumentation *hinstrument = NULL;
 
-	hashtable = hashstate->hashtable;
+	/*
+	 * In a parallel query, the leader process may or may not have run the
+	 * hash join, and even if it did it may not have built a hash table due to
+	 * timing (if it started late it might have seen no tuples in the outer
+	 * relation and skipped building the hash table).  Therefore we have to be
+	 * prepared to get instrumentation data from a worker if there is no hash
+	 * table.
+	 */
+	if (hashstate->hashtable)
+	{
+		hinstrument = (HashInstrumentation *)
+			palloc(sizeof(HashInstrumentation));
+		ExecHashGetInstrumentation(hinstrument, hashstate->hashtable);
+	}
+	else if (hashstate->shared_info)
+	{
+		SharedHashInfo *shared_info = hashstate->shared_info;
+		int		i;
+
+		/* Find the first worker that built a hash table. */
+		for (i = 0; i < shared_info->num_workers; ++i)
+		{
+			if (shared_info->hinstrument[i].nbatch > 0)
+			{
+				hinstrument = &shared_info->hinstrument[i];
+				break;
+			}
+		}
+	}
 
-	if (hashtable)
+	if (hinstrument)
 	{
-		long		spacePeakKb = (hashtable->spacePeak + 1023) / 1024;
+		long		spacePeakKb = (hinstrument->space_peak + 1023) / 1024;
 
 		if (es->format != EXPLAIN_FORMAT_TEXT)
 		{
-			ExplainPropertyLong("Hash Buckets", hashtable->nbuckets, es);
+			ExplainPropertyLong("Hash Buckets", hinstrument->nbuckets, es);
 			ExplainPropertyLong("Original Hash Buckets",
-								hashtable->nbuckets_original, es);
-			ExplainPropertyLong("Hash Batches", hashtable->nbatch, es);
+								hinstrument->nbuckets_original, es);
+			ExplainPropertyLong("Hash Batches", hinstrument->nbatch, es);
 			ExplainPropertyLong("Original Hash Batches",
-								hashtable->nbatch_original, es);
+								hinstrument->nbatch_original, es);
 			ExplainPropertyLong("Peak Memory Usage", spacePeakKb, es);
 		}
-		else if (hashtable->nbatch_original != hashtable->nbatch ||
-				 hashtable->nbuckets_original != hashtable->nbuckets)
+		else if (hinstrument->nbatch_original != hinstrument->nbatch ||
+				 hinstrument->nbuckets_original != hinstrument->nbuckets)
 		{
 			appendStringInfoSpaces(es->str, es->indent * 2);
 			appendStringInfo(es->str,
 							 "Buckets: %d (originally %d)  Batches: %d (originally %d)  Memory Usage: %ldkB\n",
-							 hashtable->nbuckets,
-							 hashtable->nbuckets_original,
-							 hashtable->nbatch,
-							 hashtable->nbatch_original,
+							 hinstrument->nbuckets,
+							 hinstrument->nbuckets_original,
+							 hinstrument->nbatch,
+							 hinstrument->nbatch_original,
 							 spacePeakKb);
 		}
 		else
@@ -2414,7 +2442,7 @@ show_hash_info(HashState *hashstate, ExplainState *es)
 			appendStringInfoSpaces(es->str, es->indent * 2);
 			appendStringInfo(es->str,
 							 "Buckets: %d  Batches: %d  Memory Usage: %ldkB\n",
-							 hashtable->nbuckets, hashtable->nbatch,
+							 hinstrument->nbuckets, hinstrument->nbatch,
 							 spacePeakKb);
 		}
 	}
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 53c5254be13..f2adfb15ef5 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -29,6 +29,7 @@
 #include "executor/nodeBitmapHeapscan.h"
 #include "executor/nodeCustom.h"
 #include "executor/nodeForeignscan.h"
+#include "executor/nodeHash.h"
 #include "executor/nodeIndexscan.h"
 #include "executor/nodeIndexonlyscan.h"
 #include "executor/nodeSeqscan.h"
@@ -259,6 +260,10 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
 				ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate,
 									   e->pcxt);
 			break;
+		case T_HashState:
+			/* even when not parallel-aware */
+			ExecHashEstimate((HashState *) planstate, e->pcxt);
+			break;
 		case T_SortState:
 			/* even when not parallel-aware */
 			ExecSortEstimate((SortState *) planstate, e->pcxt);
@@ -458,6 +463,10 @@ ExecParallelInitializeDSM(PlanState *planstate,
 				ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate,
 											d->pcxt);
 			break;
+		case T_HashState:
+			/* even when not parallel-aware */
+			ExecHashInitializeDSM((HashState *) planstate, d->pcxt);
+			break;
 		case T_SortState:
 			/* even when not parallel-aware */
 			ExecSortInitializeDSM((SortState *) planstate, d->pcxt);
@@ -928,12 +937,18 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate,
 	planstate->worker_instrument->num_workers = instrumentation->num_workers;
 	memcpy(&planstate->worker_instrument->instrument, instrument, ibytes);
 
-	/*
-	 * Perform any node-type-specific work that needs to be done.  Currently,
-	 * only Sort nodes need to do anything here.
-	 */
-	if (IsA(planstate, SortState))
-		ExecSortRetrieveInstrumentation((SortState *) planstate);
+	/* Perform any node-type-specific work that needs to be done. */
+	switch (nodeTag(planstate))
+	{
+		case T_SortState:
+			ExecSortRetrieveInstrumentation((SortState *) planstate);
+			break;
+		case T_HashState:
+			ExecHashRetrieveInstrumentation((HashState *) planstate);
+			break;
+		default:
+			break;
+	}
 
 	return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation,
 								 instrumentation);
@@ -1160,6 +1175,10 @@ ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
 				ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate,
 											   pwcxt);
 			break;
+		case T_HashState:
+			/* even when not parallel-aware */
+			ExecHashInitializeWorker((HashState *) planstate, pwcxt);
+			break;
 		case T_SortState:
 			/* even when not parallel-aware */
 			ExecSortInitializeWorker((SortState *) planstate, pwcxt);
diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index c1aa5064c90..9befca90161 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -751,6 +751,9 @@ ExecShutdownNode(PlanState *node)
 		case T_GatherMergeState:
 			ExecShutdownGatherMerge((GatherMergeState *) node);
 			break;
+		case T_HashState:
+			ExecShutdownHash((HashState *) node);
+			break;
 		default:
 			break;
 	}
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index f7cd8fb3472..afd7384e945 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -1637,6 +1637,97 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
 	}
 }
 
+/*
+ * Reserve space in the DSM segment for instrumentation data.
+ */
+void
+ExecHashEstimate(HashState *node, ParallelContext *pcxt)
+{
+	size_t		size;
+
+	size = mul_size(pcxt->nworkers, sizeof(HashInstrumentation));
+	size = add_size(size, offsetof(SharedHashInfo, hinstrument));
+	shm_toc_estimate_chunk(&pcxt->estimator, size);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+/*
+ * Set up a space in the DSM for all workers to record instrumentation data
+ * about their hash table.
+ */
+void
+ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt)
+{
+	size_t		size;
+
+	size = offsetof(SharedHashInfo, hinstrument) +
+		pcxt->nworkers * sizeof(HashInstrumentation);
+	node->shared_info = (SharedHashInfo *) shm_toc_allocate(pcxt->toc, size);
+	memset(node->shared_info, 0, size);
+	node->shared_info->num_workers = pcxt->nworkers;
+	shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id,
+				   node->shared_info);
+}
+
+/*
+ * Locate the DSM space for hash table instrumentation data that we'll write
+ * to at shutdown time.
+ */
+void
+ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwcxt)
+{
+	SharedHashInfo *shared_info;
+
+	shared_info = (SharedHashInfo *)
+		shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, true);
+	node->hinstrument = &shared_info->hinstrument[ParallelWorkerNumber];
+}
+
+/*
+ * Copy instrumentation data from this worker's hash table (if it built one)
+ * to DSM memory so the leader can retrieve it.  This must be done in
+ * ExecShutdownHash() rather than ExecEndHash() because the latter runs after
+ * we've detached from the DSM segment.
+ */
+void
+ExecShutdownHash(HashState *node)
+{
+	if (node->hinstrument && node->hashtable)
+		ExecHashGetInstrumentation(node->hinstrument, node->hashtable);
+}
+
+/*
+ * Retrieve instrumentation data from workers before the DSM segment is
+ * detached, so that EXPLAIN can access it.
+ */
+void
+ExecHashRetrieveInstrumentation(HashState *node)
+{
+	SharedHashInfo *shared_info = node->shared_info;
+	size_t		size;
+
+	/* Replace node->shared_info with a copy in backend-local memory. */
+	size = offsetof(SharedHashInfo, hinstrument) +
+		shared_info->num_workers * sizeof(HashInstrumentation);
+	node->shared_info = palloc(size);
+	memcpy(node->shared_info, shared_info, size);
+}
+
+/*
+ * Copy the instrumentation data from 'hashtable' into a HashInstrumentation
+ * struct.
+ */
+void
+ExecHashGetInstrumentation(HashInstrumentation *instrument,
+						   HashJoinTable hashtable)
+{
+	instrument->nbuckets = hashtable->nbuckets;
+	instrument->nbuckets_original = hashtable->nbuckets_original;
+	instrument->nbatch = hashtable->nbatch;
+	instrument->nbatch_original = hashtable->nbatch_original;
+	instrument->space_peak = hashtable->spacePeak;
+}
+
 /*
  * Allocate 'size' bytes from the currently active HashMemoryChunk
  */
diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h
index 3ae556fb6c5..0974f1edc21 100644
--- a/src/include/executor/nodeHash.h
+++ b/src/include/executor/nodeHash.h
@@ -14,6 +14,7 @@
 #ifndef NODEHASH_H
 #define NODEHASH_H
 
+#include "access/parallel.h"
 #include "nodes/execnodes.h"
 
 extern HashState *ExecInitHash(Hash *node, EState *estate, int eflags);
@@ -48,5 +49,12 @@ extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 						int *numbatches,
 						int *num_skew_mcvs);
 extern int	ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue);
+extern void ExecHashEstimate(HashState *node, ParallelContext *pcxt);
+extern void ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt);
+extern void ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwcxt);
+extern void ExecHashRetrieveInstrumentation(HashState *node);
+extern void ExecShutdownHash(HashState *node);
+extern void ExecHashGetInstrumentation(HashInstrumentation *instrument,
+									   HashJoinTable hashtable);
 
 #endif							/* NODEHASH_H */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index e05bc04f525..6c322e57c00 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1980,6 +1980,29 @@ typedef struct GatherMergeState
 	struct binaryheap *gm_heap; /* binary heap of slot indices */
 } GatherMergeState;
 
+/* ----------------
+ *	 Values displayed by EXPLAIN ANALYZE
+ * ----------------
+ */
+typedef struct HashInstrumentation
+{
+	int			nbuckets;
+	int			nbuckets_original;
+	int			nbatch;
+	int			nbatch_original;
+	size_t		space_peak;
+} HashInstrumentation;
+
+/* ----------------
+ *	 Shared memory container for per-worker hash information
+ * ----------------
+ */
+typedef struct SharedHashInfo
+{
+	int			num_workers;
+	HashInstrumentation hinstrument[FLEXIBLE_ARRAY_MEMBER];
+} SharedHashInfo;
+
 /* ----------------
  *	 HashState information
  * ----------------
@@ -1990,6 +2013,9 @@ typedef struct HashState
 	HashJoinTable hashtable;	/* hash table for the hashjoin */
 	List	   *hashkeys;		/* list of ExprState nodes */
 	/* hashkeys is same as parent's hj_InnerHashKeys */
+
+	SharedHashInfo *shared_info;	/* one entry per worker */
+	HashInstrumentation *hinstrument;	/* this worker's entry */
 } HashState;
 
 /* ----------------
diff --git a/src/test/regress/expected/join.out b/src/test/regress/expected/join.out
index b7d17900978..001d96dc2d8 100644
--- a/src/test/regress/expected/join.out
+++ b/src/test/regress/expected/join.out
@@ -6173,6 +6173,21 @@ $$);
 
 rollback to settings;
 -- A couple of other hash join tests unrelated to work_mem management.
+-- Check that EXPLAIN ANALYZE has data even if the leader doesn't participate
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '4MB';
+set local parallel_leader_participation = off;
+select * from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+ original | final 
+----------+-------
+        1 |     1
+(1 row)
+
+rollback to settings;
 -- A full outer join where every record is matched.
 -- non-parallel
 savepoint settings;
diff --git a/src/test/regress/sql/join.sql b/src/test/regress/sql/join.sql
index c6d4a513e86..882601b3388 100644
--- a/src/test/regress/sql/join.sql
+++ b/src/test/regress/sql/join.sql
@@ -2159,6 +2159,17 @@ rollback to settings;
 
 -- A couple of other hash join tests unrelated to work_mem management.
 
+-- Check that EXPLAIN ANALYZE has data even if the leader doesn't participate
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '4MB';
+set local parallel_leader_participation = off;
+select * from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+rollback to settings;
+
 -- A full outer join where every record is matched.
 
 -- non-parallel
-- 
2.15.0

