On Tue, Jun 18, 2019 at 3:24 PM Melanie Plageman <melanieplage...@gmail.com>
wrote:

>
> These questions will probably make a lot more sense with corresponding
> code, so I will follow up with the second version of the state machine
> patch once I finish it.
>
>
I have changed the state machine and resolved the questions I had
raised in the previous email. This seems to work for the parallel and
non-parallel cases. I have not yet rewritten the unmatched outer tuple
status as a bitmap in a spill file (for ease of debugging).

Before doing that, I wanted to ask what a desirable fallback condition
would be. In this patch, fallback to hashloop join happens only when
inserting tuples into the hashtable after batch 0 when inserting
another tuple from the batch file would exceed work_mem. This means
you can't increase nbatches, which, I would think is undesirable.

I thought a bit about when fallback should happen. So, let's say that
we would like to fallback to hashloop join when we have increased
nbatches X times. At that point, since we do not want to fall back to
hashloop join for all batches, we have to make a decision. After
increasing nbatches the Xth time, do we then fall back for all batches
for which inserting inner tuples exceeds work_mem? Do we use this
strategy but work_mem + some fudge factor?

Or, do we instead try to determine if data skew led us to increase
nbatches both times and then determine which batch, given new
nbatches, contains that data, set fallback to true only for that
batch, and let all other batches use the existing logic (with no
fallback option) unless they contain a value which leads to increasing
nbatches X number of times?

-- 
Melanie Plageman
From 2d6fec7d2bac90a41d4ec88ad5ac2011562a14a1 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplage...@gmail.com>
Date: Mon, 10 Jun 2019 10:54:42 -0700
Subject: [PATCH v2] hashloop fallback

First part is to "chunk" the inner file into arbitrary partitions of
work_mem size

This chunks inner file and makes it so that the offset is along tuple
bounds.

Note that this makes it impossible to increase nbatches during the
loading of batches after initial hashtable creation

In preparation for doing this chunking, separate advance batch and load
batch. advance batch only if page offset is reset to 0, then load that
part of the batch

Second part was to: implement outer tuple batch rewinding per chunk of
inner batch

Would be a simple rewind and replay of outer side for each chunk of
inner if it weren't for LOJ.
Because we need to wait to emit NULL-extended tuples for LOJ until after
all chunks of inner have been processed.
To do this, make a list with an entry for each outer tuple and keep
track of its match status. Also, keep track of its offset so that we can
access the file at that offset in case the tuples are not processed in
order (like in parallel case)

For non-hashloop fallback scenario, this list should not be constructed
and unmatched outer tuples should be emitted as they are encountered.
---
 src/backend/executor/nodeHashjoin.c       | 379 ++++++++++++++++----
 src/include/executor/hashjoin.h           |  10 +
 src/include/nodes/execnodes.h             |  12 +
 src/test/regress/expected/adaptive_hj.out | 402 ++++++++++++++++++++++
 src/test/regress/parallel_schedule        |   2 +-
 src/test/regress/serial_schedule          |   1 +
 src/test/regress/sql/adaptive_hj.sql      |  39 +++
 7 files changed, 770 insertions(+), 75 deletions(-)
 create mode 100644 src/test/regress/expected/adaptive_hj.out
 create mode 100644 src/test/regress/sql/adaptive_hj.sql

diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 8484a287e7..e46b453a9b 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -124,9 +124,11 @@
 #define HJ_BUILD_HASHTABLE		1
 #define HJ_NEED_NEW_OUTER		2
 #define HJ_SCAN_BUCKET			3
-#define HJ_FILL_OUTER_TUPLE		4
-#define HJ_FILL_INNER_TUPLES	5
-#define HJ_NEED_NEW_BATCH		6
+#define HJ_FILL_INNER_TUPLES    4
+#define HJ_NEED_NEW_BATCH		5
+#define HJ_NEED_NEW_INNER_CHUNK 6
+#define HJ_ADAPTIVE_EMIT_UNMATCHED_OUTER_INIT 7
+#define HJ_ADAPTIVE_EMIT_UNMATCHED_OUTER 8
 
 /* Returns true if doing null-fill on outer relation */
 #define HJ_FILL_OUTER(hjstate)	((hjstate)->hj_NullInnerTupleSlot != NULL)
@@ -143,10 +145,15 @@ static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
 												 BufFile *file,
 												 uint32 *hashvalue,
 												 TupleTableSlot *tupleSlot);
-static bool ExecHashJoinNewBatch(HashJoinState *hjstate);
+static bool ExecHashJoinAdvanceBatch(HashJoinState *hjstate);
 static bool ExecParallelHashJoinNewBatch(HashJoinState *hjstate);
 static void ExecParallelHashJoinPartitionOuter(HashJoinState *node);
+static bool LoadInner(HashJoinState *hjstate);
+static TupleTableSlot *ExecHashJoinGetOuterTupleAtOffset(HashJoinState *hjstate, off_t offset);
+static void rewindOuter(BufFile *bufFile);
 
+static TupleTableSlot *
+emitUnmatchedOuterTuple(ExprState *otherqual, ExprContext *econtext, HashJoinState *hjstate);
 
 /* ----------------------------------------------------------------
  *		ExecHashJoinImpl
@@ -198,6 +205,8 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 	 */
 	for (;;)
 	{
+		bool done = false;
+
 		/*
 		 * It's possible to iterate this loop many times before returning a
 		 * tuple, in some pathological cases such as needing to move much of
@@ -209,7 +218,7 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 		switch (node->hj_JoinState)
 		{
 			case HJ_BUILD_HASHTABLE:
-
+				elog(DEBUG1, "HJ_BUILD_HASHTABLE");
 				/*
 				 * First time through: build hash table for inner relation.
 				 */
@@ -343,7 +352,7 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 				/* FALL THRU */
 
 			case HJ_NEED_NEW_OUTER:
-
+				elog(DEBUG1, "HJ_NEED_NEW_OUTER");
 				/*
 				 * We don't have an outer tuple, try to get the next one
 				 */
@@ -357,20 +366,29 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 
 				if (TupIsNull(outerTupleSlot))
 				{
-					/* end of batch, or maybe whole join */
+					/*
+					 * end of batch, or maybe whole join
+					 * for hashloop fallback, all we know is outer batch is exhausted
+					 * inner could have more chunks
+					 */
 					if (HJ_FILL_INNER(node))
 					{
 						/* set up to scan for unmatched inner tuples */
 						ExecPrepHashTableForUnmatched(node);
 						node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+						break;
 					}
-					else
-						node->hj_JoinState = HJ_NEED_NEW_BATCH;
-					continue;
+					node->hj_JoinState = HJ_NEED_NEW_INNER_CHUNK;
+					break;
 				}
-
+				/*
+				 * only initialize this to false during the first chunk --
+				 * otherwise, we will be resetting hj_MatchedOuter
+				 * to false for an outer tuple that has already matched an inner tuple
+				 */
+				if (node->first_chunk || hashtable->curbatch == 0)
+					node->hj_MatchedOuter = false;
 				econtext->ecxt_outertuple = outerTupleSlot;
-				node->hj_MatchedOuter = false;
 
 				/*
 				 * Find the corresponding bucket for this tuple in the main
@@ -410,6 +428,48 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 					continue;
 				}
 
+				/*
+				 * We need to construct the linked list of match statuses on the first chunk.
+				 * Note that node->first_chunk isn't true until HJ_NEED_NEW_BATCH
+				 * so this means that we don't construct this list on batch 0.
+				 * This list should also only be constructed for hashloop fallback
+				 */
+				if (node->first_chunk && hashtable->outerBatchFile && node->hashloop_fallback == true)
+				{
+					BufFile *outerFile = hashtable->outerBatchFile[batchno];
+
+					if (outerFile != NULL)
+					{
+						OuterOffsetMatchStatus *outerOffsetMatchStatus = NULL;
+
+						outerOffsetMatchStatus = palloc(sizeof(struct OuterOffsetMatchStatus));
+						outerOffsetMatchStatus->match_status = false;
+						outerOffsetMatchStatus->outer_tuple_start_offset = 0L;
+						outerOffsetMatchStatus->next = NULL;
+
+						if (node->first_outer_offset_match_status != NULL)
+						{
+							node->current_outer_offset_match_status->next = outerOffsetMatchStatus;
+							node->current_outer_offset_match_status = outerOffsetMatchStatus;
+						}
+						else /* node->first_outer_offset_match_status == NULL */
+						{
+							node->first_outer_offset_match_status = outerOffsetMatchStatus;
+							node->current_outer_offset_match_status = node->first_outer_offset_match_status;
+						}
+
+						outerOffsetMatchStatus->outer_tuple_val = DatumGetInt32(outerTupleSlot->tts_values[0]);
+						outerOffsetMatchStatus->outer_tuple_start_offset = node->HJ_NEED_NEW_OUTER_tup_start;
+					}
+				}
+				else if (node->hj_HashTable->curbatch > 0)
+				{
+					if (node->current_outer_offset_match_status == NULL)
+						node->current_outer_offset_match_status = node->first_outer_offset_match_status;
+					else
+						node->current_outer_offset_match_status = node->current_outer_offset_match_status->next;
+				}
+
 				/* OK, let's scan the bucket for matches */
 				node->hj_JoinState = HJ_SCAN_BUCKET;
 
@@ -417,28 +477,32 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 
 			case HJ_SCAN_BUCKET:
 
+				elog(DEBUG1, "HJ_SCAN_BUCKET");
 				/*
 				 * Scan the selected hash bucket for matches to current outer
 				 */
 				if (parallel)
-				{
-					if (!ExecParallelScanHashBucket(node, econtext))
-					{
-						/* out of matches; check for possible outer-join fill */
-						node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
-						continue;
-					}
-				}
+					done = !ExecParallelScanHashBucket(node, econtext);
 				else
+					done = !ExecScanHashBucket(node, econtext);
+
+				if (done)
 				{
-					if (!ExecScanHashBucket(node, econtext))
+					/*
+					 * The current outer tuple has run out of matches, so check
+					 * whether to emit a dummy outer-join tuple.  Whether we emit
+					 * one or not, the next state is NEED_NEW_OUTER.
+					 */
+					node->hj_JoinState = HJ_NEED_NEW_OUTER;
+
+					if (node->hj_HashTable->curbatch == 0 || node->hashloop_fallback == false)
 					{
-						/* out of matches; check for possible outer-join fill */
-						node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
-						continue;
+						TupleTableSlot *slot = emitUnmatchedOuterTuple(otherqual, econtext, node);
+						if (slot != NULL)
+							return slot;
 					}
+					continue;
 				}
-
 				/*
 				 * We've got a match, but still need to test non-hashed quals.
 				 * ExecScanHashBucket already set up all the state needed to
@@ -455,6 +519,8 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 				{
 					node->hj_MatchedOuter = true;
 					HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));
+					if (node->current_outer_offset_match_status)
+						node->current_outer_offset_match_status->match_status = true;
 
 					/* In an antijoin, we never return a matched tuple */
 					if (node->js.jointype == JOIN_ANTI)
@@ -480,33 +546,9 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 					InstrCountFiltered1(node, 1);
 				break;
 
-			case HJ_FILL_OUTER_TUPLE:
-
-				/*
-				 * The current outer tuple has run out of matches, so check
-				 * whether to emit a dummy outer-join tuple.  Whether we emit
-				 * one or not, the next state is NEED_NEW_OUTER.
-				 */
-				node->hj_JoinState = HJ_NEED_NEW_OUTER;
-
-				if (!node->hj_MatchedOuter &&
-					HJ_FILL_OUTER(node))
-				{
-					/*
-					 * Generate a fake join tuple with nulls for the inner
-					 * tuple, and return it if it passes the non-join quals.
-					 */
-					econtext->ecxt_innertuple = node->hj_NullInnerTupleSlot;
-
-					if (otherqual == NULL || ExecQual(otherqual, econtext))
-						return ExecProject(node->js.ps.ps_ProjInfo);
-					else
-						InstrCountFiltered2(node, 1);
-				}
-				break;
-
 			case HJ_FILL_INNER_TUPLES:
 
+				elog(DEBUG1, "HJ_FILL_INNER_TUPLES");
 				/*
 				 * We have finished a batch, but we are doing right/full join,
 				 * so any unmatched inner tuples in the hashtable have to be
@@ -515,7 +557,7 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 				if (!ExecScanHashTableForUnmatched(node, econtext))
 				{
 					/* no more unmatched tuples */
-					node->hj_JoinState = HJ_NEED_NEW_BATCH;
+					node->hj_JoinState = HJ_NEED_NEW_INNER_CHUNK;
 					continue;
 				}
 
@@ -533,6 +575,7 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 
 			case HJ_NEED_NEW_BATCH:
 
+				elog(DEBUG1, "HJ_NEED_NEW_BATCH");
 				/*
 				 * Try to advance to next batch.  Done if there are no more.
 				 */
@@ -543,10 +586,100 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 				}
 				else
 				{
-					if (!ExecHashJoinNewBatch(node))
-						return NULL;	/* end of parallel-oblivious join */
+					if (node->first_outer_offset_match_status && HJ_FILL_OUTER(node) && node->hashloop_fallback == true)
+					{
+						/*
+						 * For hashloop fallback, outer tuples are not emitted
+						 * until directly before advancing the batch (after all inner
+						 * chunks have been processed). node->hashloop_fallback should be
+						 * true because it is not reset to false until advancing the batches
+						 */
+						node->hj_JoinState = HJ_ADAPTIVE_EMIT_UNMATCHED_OUTER_INIT;
+						break;
+					}
+
+					if (!ExecHashJoinAdvanceBatch(node))
+						return NULL;    /* end of parallel-oblivious join */
+
+					rewindOuter(node->hj_HashTable->outerBatchFile[node->hj_HashTable->curbatch]);
+					LoadInner(node);
+
+					/*
+					 * If we just loaded the first chunk of a new inner batch,
+					 * we should reset head of the list of outer tuple match statuses
+					 * so we can construct a new list for the new corresponding outer batch file
+					 * Doing it here works because we have not created any of the structs
+					 * of match statuses for the outer tuples until HJ_NEED_NEW_OUTER
+					 *
+					 * Even if we are not at the beginning of a new inner batch, we need
+					 * to reset the pointer to the current match status object for the current
+					 * outer tuple before transitioning to HJ_NEED_NEW_OUTER as a way of
+					 * rewinding the list.
+					 * we use the status of current -- NULL or non-NULL to determine in
+					 * HJ_NEED_NEW_OUTER if we should advance to the next item in the list or
+					 * "rewind" by setting head to current.
+					 */
+					if (node->first_chunk)
+						node->first_outer_offset_match_status = NULL;
+					node->current_outer_offset_match_status = NULL;
+				}
+				node->hj_JoinState = HJ_NEED_NEW_OUTER;
+				break;
+
+			case HJ_NEED_NEW_INNER_CHUNK:
+
+				elog(DEBUG1, "HJ_NEED_NEW_INNER_CHUNK");
+
+				// ?? Will inner_page_offset != 0 ever when curbatch == 0 ?
+				if (node->inner_page_offset == 0L || node->hj_HashTable->curbatch == 0) // inner batch is exhausted
+				{
+					/*
+					 * either it is the fallback case and there are no more chunks
+					 * or, there were never chunks because this is the non-fallback case
+					 * or this is batch 0
+					 * in any of these cases, load next batch
+					 */
+					node->hj_JoinState = HJ_NEED_NEW_BATCH;
+					break;
 				}
 				node->hj_JoinState = HJ_NEED_NEW_OUTER;
+				/*
+				 * Rewind outer batch file (if present), so that we can start reading it.
+				 */
+				rewindOuter(node->hj_HashTable->outerBatchFile[node->hj_HashTable->curbatch]);
+				LoadInner(node);
+				node->current_outer_offset_match_status = NULL;
+				break;
+
+			case HJ_ADAPTIVE_EMIT_UNMATCHED_OUTER_INIT:
+
+				node->cursor = node->first_outer_offset_match_status;
+				node->first_outer_offset_match_status = NULL;
+				node->hj_JoinState = HJ_ADAPTIVE_EMIT_UNMATCHED_OUTER;
+				/* fall through */
+
+			case HJ_ADAPTIVE_EMIT_UNMATCHED_OUTER:
+				while (node->cursor)
+				{
+					if (node->cursor->match_status == true)
+					{
+						node->cursor = node->cursor->next;
+						continue;
+					}
+					/*
+					 * if it is not a match, go to the offset in the page that it specifies
+					 * and emit it NULL-extended
+					 */
+					econtext->ecxt_outertuple = ExecHashJoinGetOuterTupleAtOffset(node, node->cursor->outer_tuple_start_offset);
+					econtext->ecxt_innertuple = node->hj_NullInnerTupleSlot;
+					node->cursor = node->cursor->next;
+					return ExecProject(node->js.ps.ps_ProjInfo);
+				}
+				node->cursor = NULL;
+				/*
+				 * came here from HJ_NEED_NEW_BATCH, so go back there
+				 */
+				node->hj_JoinState = HJ_NEED_NEW_BATCH;
 				break;
 
 			default:
@@ -628,6 +761,13 @@ ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
 	hjstate->js.ps.ExecProcNode = ExecHashJoin;
 	hjstate->js.jointype = node->join.jointype;
 
+	hjstate->hashloop_fallback = false;
+	hjstate->inner_page_offset = 0L;
+	hjstate->first_chunk = false;
+	hjstate->HJ_NEED_NEW_OUTER_tup_start = 0L;
+	hjstate->HJ_NEED_NEW_OUTER_tup_end = 0L;
+	hjstate->current_outer_offset_match_status = NULL;
+	hjstate->first_outer_offset_match_status = NULL;
 	/*
 	 * Miscellaneous initialization
 	 *
@@ -765,6 +905,8 @@ ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
 	hjstate->hj_MatchedOuter = false;
 	hjstate->hj_OuterNotEmpty = false;
 
+	hjstate->cursor = NULL;
+
 	return hjstate;
 }
 
@@ -805,6 +947,59 @@ ExecEndHashJoin(HashJoinState *node)
 	ExecEndNode(innerPlanState(node));
 }
 
+static TupleTableSlot *
+ExecHashJoinGetOuterTupleAtOffset(HashJoinState *hjstate, off_t offset)
+{
+	HashJoinTable hashtable = hjstate->hj_HashTable;
+	int curbatch = hashtable->curbatch;
+	TupleTableSlot *slot;
+	uint32 hashvalue;
+
+	BufFile    *file = hashtable->outerBatchFile[curbatch];
+	/* ? should fileno always be 0? */
+	if (BufFileSeek(file, 0, offset, SEEK_SET))
+		ereport(ERROR,
+				(errcode_for_file_access(),
+						errmsg("could not rewind hash-join temporary file: %m")));
+
+	slot = ExecHashJoinGetSavedTuple(hjstate,
+									 file,
+									 &hashvalue,
+									 hjstate->hj_OuterTupleSlot);
+	return slot;
+}
+
+static void rewindOuter(BufFile *bufFile)
+{
+	if (bufFile != NULL)
+	{
+		if (BufFileSeek(bufFile, 0, 0L, SEEK_SET))
+			ereport(ERROR,
+				(errcode_for_file_access(),
+					errmsg("could not rewind hash-join temporary file: %m")));
+	}
+}
+
+static TupleTableSlot *
+emitUnmatchedOuterTuple(ExprState *otherqual, ExprContext *econtext, HashJoinState *hjstate)
+{
+	if (hjstate->hj_MatchedOuter)
+		return NULL;
+
+	if (!HJ_FILL_OUTER(hjstate))
+		return NULL;
+
+	econtext->ecxt_innertuple = hjstate->hj_NullInnerTupleSlot;
+	/*
+	 * Generate a fake join tuple with nulls for the inner
+	 * tuple, and return it if it passes the non-join quals.
+	 */
+	if (otherqual == NULL || ExecQual(otherqual, econtext))
+		return ExecProject(hjstate->js.ps.ps_ProjInfo);
+
+	InstrCountFiltered2(hjstate, 1);
+	return NULL;
+}
 /*
  * ExecHashJoinOuterGetTuple
  *
@@ -951,20 +1146,17 @@ ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
 }
 
 /*
- * ExecHashJoinNewBatch
+ * ExecHashJoinAdvanceBatch
  *		switch to a new hashjoin batch
  *
  * Returns true if successful, false if there are no more batches.
  */
 static bool
-ExecHashJoinNewBatch(HashJoinState *hjstate)
+ExecHashJoinAdvanceBatch(HashJoinState *hjstate)
 {
 	HashJoinTable hashtable = hjstate->hj_HashTable;
 	int			nbatch;
 	int			curbatch;
-	BufFile    *innerFile;
-	TupleTableSlot *slot;
-	uint32		hashvalue;
 
 	nbatch = hashtable->nbatch;
 	curbatch = hashtable->curbatch;
@@ -1039,10 +1231,32 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
 		curbatch++;
 	}
 
+	hjstate->inner_page_offset = 0L;
+	hjstate->first_chunk = true;
+	hjstate->hashloop_fallback = false; /* new batch, so start it off false */
 	if (curbatch >= nbatch)
 		return false;			/* no more batches */
 
 	hashtable->curbatch = curbatch;
+	return true;
+}
+
+/*
+ * Returns true if there are more chunks left, false otherwise
+ */
+static bool LoadInner(HashJoinState *hjstate)
+{
+	HashJoinTable hashtable = hjstate->hj_HashTable;
+	int curbatch = hashtable->curbatch;
+	BufFile    *innerFile;
+	TupleTableSlot *slot;
+	uint32		hashvalue;
+
+	off_t tup_start_offset;
+	off_t chunk_start_offset;
+	off_t tup_end_offset;
+	int64 current_saved_size;
+	int current_fileno;
 
 	/*
 	 * Reload the hash table with the new inner batch (which could be empty)
@@ -1051,45 +1265,58 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
 
 	innerFile = hashtable->innerBatchFile[curbatch];
 
+	/*
+	 * Reset this even if the innerfile is not null
+	 */
+	hjstate->first_chunk = hjstate->inner_page_offset == 0L;
+
 	if (innerFile != NULL)
 	{
-		if (BufFileSeek(innerFile, 0, 0L, SEEK_SET))
+		/* should fileno always be 0? */
+		if (BufFileSeek(innerFile, 0, hjstate->inner_page_offset, SEEK_SET))
 			ereport(ERROR,
 					(errcode_for_file_access(),
 					 errmsg("could not rewind hash-join temporary file: %m")));
 
+		chunk_start_offset = hjstate->inner_page_offset;
+		tup_end_offset = hjstate->inner_page_offset;
 		while ((slot = ExecHashJoinGetSavedTuple(hjstate,
 												 innerFile,
 												 &hashvalue,
 												 hjstate->hj_HashTupleSlot)))
 		{
+			/* next tuple's start is last tuple's end */
+			tup_start_offset = tup_end_offset;
+			/* after we got the tuple, figure out what the offset is */
+			BufFileTell(innerFile, &current_fileno, &tup_end_offset);
+			current_saved_size = tup_end_offset - chunk_start_offset;
+			if (current_saved_size > work_mem)
+			{
+				hjstate->inner_page_offset = tup_start_offset;
+				hjstate->hashloop_fallback = true;
+				return true;
+			}
+			hjstate->inner_page_offset = tup_end_offset;
 			/*
-			 * NOTE: some tuples may be sent to future batches.  Also, it is
-			 * possible for hashtable->nbatch to be increased here!
+			 * NOTE: some tuples may be sent to future batches.
+			 * With current hashloop patch, however, it is not possible
+			 * for hashtable->nbatch to be increased here
 			 */
 			ExecHashTableInsert(hashtable, slot, hashvalue);
 		}
 
+		/* this is the end of the file */
+		hjstate->inner_page_offset = 0L;
+
 		/*
-		 * after we build the hash table, the inner batch file is no longer
+		 * after we processed all chunks, the inner batch file is no longer
 		 * needed
 		 */
 		BufFileClose(innerFile);
 		hashtable->innerBatchFile[curbatch] = NULL;
 	}
 
-	/*
-	 * Rewind outer batch file (if present), so that we can start reading it.
-	 */
-	if (hashtable->outerBatchFile[curbatch] != NULL)
-	{
-		if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0L, SEEK_SET))
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not rewind hash-join temporary file: %m")));
-	}
-
-	return true;
+	return false;
 }
 
 /*
@@ -1270,6 +1497,8 @@ ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
 	uint32		header[2];
 	size_t		nread;
 	MinimalTuple tuple;
+	int dummy_fileno;
+
 
 	/*
 	 * We check for interrupts here because this is typically taken as an
@@ -1278,6 +1507,7 @@ ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
 	 */
 	CHECK_FOR_INTERRUPTS();
 
+	BufFileTell(file, &dummy_fileno, &hjstate->HJ_NEED_NEW_OUTER_tup_start);
 	/*
 	 * Since both the hash value and the MinimalTuple length word are uint32,
 	 * we can read them both in one BufFileRead() call without any type
@@ -1304,6 +1534,7 @@ ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
 				(errcode_for_file_access(),
 				 errmsg("could not read from hash-join temporary file: %m")));
 	ExecForceStoreMinimalTuple(tuple, tupleSlot, true);
+	BufFileTell(file, &dummy_fileno, &hjstate->HJ_NEED_NEW_OUTER_tup_end);
 	return tupleSlot;
 }
 
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index 2c94b926d3..bd5aeba74c 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -59,6 +59,16 @@
  * if so, we just dump them out to the correct batch file.
  * ----------------------------------------------------------------
  */
+struct OuterOffsetMatchStatus;
+typedef struct OuterOffsetMatchStatus OuterOffsetMatchStatus;
+
+struct OuterOffsetMatchStatus
+{
+	bool match_status;
+	off_t outer_tuple_start_offset;
+	int32 outer_tuple_val;
+	struct OuterOffsetMatchStatus *next;
+};
 
 /* these are in nodes/execnodes.h: */
 /* typedef struct HashJoinTupleData *HashJoinTuple; */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 99b9fa414f..5edf48ae67 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -42,6 +42,8 @@ struct RangeTblEntry;			/* avoid including parsenodes.h here */
 struct ExprEvalStep;			/* avoid including execExpr.h everywhere */
 struct CopyMultiInsertBuffer;
 
+struct OuterOffsetMatchStatus;
+
 
 /* ----------------
  *		ExprState node
@@ -1899,6 +1901,16 @@ typedef struct HashJoinState
 	int			hj_JoinState;
 	bool		hj_MatchedOuter;
 	bool		hj_OuterNotEmpty;
+
+	bool hashloop_fallback;
+	off_t inner_page_offset;
+	bool first_chunk;
+	struct OuterOffsetMatchStatus *first_outer_offset_match_status;
+	struct OuterOffsetMatchStatus *current_outer_offset_match_status;
+	struct OuterOffsetMatchStatus *cursor;
+
+	off_t HJ_NEED_NEW_OUTER_tup_start;
+	off_t HJ_NEED_NEW_OUTER_tup_end;
 } HashJoinState;
 
 
diff --git a/src/test/regress/expected/adaptive_hj.out b/src/test/regress/expected/adaptive_hj.out
new file mode 100644
index 0000000000..a687ecf759
--- /dev/null
+++ b/src/test/regress/expected/adaptive_hj.out
@@ -0,0 +1,402 @@
+drop table if exists t1;
+NOTICE:  table "t1" does not exist, skipping
+drop table if exists t2;
+NOTICE:  table "t2" does not exist, skipping
+create table t1(a int);
+create table t2(b int);
+insert into t1 values(1),(2);
+insert into t2 values(2),(3),(11);
+insert into t1 select i from generate_series(1,10)i;
+insert into t2 select i from generate_series(2,10)i;
+insert into t1 select 2 from generate_series(1,5)i;
+insert into t2 select 2 from generate_series(2,7)i;
+set work_mem=64;
+set enable_mergejoin to off;
+select * from t1 left outer join t2 on a = b order by b;
+ a  | b  
+----+----
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  3 |  3
+  3 |  3
+  4 |  4
+  5 |  5
+  6 |  6
+  7 |  7
+  8 |  8
+  9 |  9
+ 10 | 10
+  1 |   
+  1 |   
+(67 rows)
+
+select count(*) from t1 left outer join t2 on a = b;
+ count 
+-------
+    67
+(1 row)
+
+select * from t1, t2 where a = b;
+ a  | b  
+----+----
+  5 |  5
+  3 |  3
+  3 |  3
+  4 |  4
+  7 |  7
+  6 |  6
+  9 |  9
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  8 |  8
+ 10 | 10
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+(65 rows)
+
+select count(*) from t1, t2 where a = b;
+ count 
+-------
+    65
+(1 row)
+
+select * from t1 right outer join t2 on a = b order by b;
+ a  | b  
+----+----
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  3 |  3
+  3 |  3
+  4 |  4
+  5 |  5
+  6 |  6
+  7 |  7
+  8 |  8
+  9 |  9
+ 10 | 10
+    | 11
+(66 rows)
+
+select count(*) from t1 right outer join t2 on a = b;
+ count 
+-------
+    66
+(1 row)
+
+select * from t1 full outer join t2 on a = b order by b;
+ a  | b  
+----+----
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  2 |  2
+  3 |  3
+  3 |  3
+  4 |  4
+  5 |  5
+  6 |  6
+  7 |  7
+  8 |  8
+  9 |  9
+ 10 | 10
+    | 11
+  1 |   
+  1 |   
+(68 rows)
+
+select count(*) from t1 full outer join t2 on a = b;
+ count 
+-------
+    68
+(1 row)
+
+truncate table t1;
+insert into t1 values (1),(2),(2),(3);
+truncate table t2;
+insert into t2 values(2),(2),(3),(3),(4);
+set work_mem=64;
+set enable_mergejoin to off;
+select * from t1 left outer join t2 on a = b order by b;
+ a | b 
+---+---
+ 2 | 2
+ 2 | 2
+ 2 | 2
+ 2 | 2
+ 3 | 3
+ 3 | 3
+ 1 |  
+(7 rows)
+
+select count(*) from t1 left outer join t2 on a = b;
+ count 
+-------
+     7
+(1 row)
+
+select * from t1, t2 where a = b;
+ a | b 
+---+---
+ 3 | 3
+ 3 | 3
+ 2 | 2
+ 2 | 2
+ 2 | 2
+ 2 | 2
+(6 rows)
+
+select count(*) from t1, t2 where a = b;
+ count 
+-------
+     6
+(1 row)
+
+select * from t1 right outer join t2 on a = b order by b;
+ a | b 
+---+---
+ 2 | 2
+ 2 | 2
+ 2 | 2
+ 2 | 2
+ 3 | 3
+ 3 | 3
+   | 4
+(7 rows)
+
+select count(*) from t1 right outer join t2 on a = b;
+ count 
+-------
+     7
+(1 row)
+
+select * from t1 full outer join t2 on a = b order by b;
+ a | b 
+---+---
+ 2 | 2
+ 2 | 2
+ 2 | 2
+ 2 | 2
+ 3 | 3
+ 3 | 3
+   | 4
+ 1 |  
+(8 rows)
+
+select count(*) from t1 full outer join t2 on a = b;
+ count 
+-------
+     8
+(1 row)
+
diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule
index 8fb55f045e..7492c2c45b 100644
--- a/src/test/regress/parallel_schedule
+++ b/src/test/regress/parallel_schedule
@@ -78,7 +78,7 @@ test: brin gin gist spgist privileges init_privs security_label collate matview
 # ----------
 # Another group of parallel tests
 # ----------
-test: create_table_like alter_generic alter_operator misc async dbsize misc_functions sysviews tsrf tidscan
+test: create_table_like alter_generic alter_operator misc async dbsize misc_functions sysviews tsrf tidscan adaptive_hj
 
 # rules cannot run concurrently with any test that creates
 # a view or rule in the public schema
diff --git a/src/test/regress/serial_schedule b/src/test/regress/serial_schedule
index a39ca1012a..17099bf604 100644
--- a/src/test/regress/serial_schedule
+++ b/src/test/regress/serial_schedule
@@ -91,6 +91,7 @@ test: subselect
 test: union
 test: case
 test: join
+test: adaptive_hj
 test: aggregates
 test: transactions
 ignore: random
diff --git a/src/test/regress/sql/adaptive_hj.sql b/src/test/regress/sql/adaptive_hj.sql
new file mode 100644
index 0000000000..76c041e6f1
--- /dev/null
+++ b/src/test/regress/sql/adaptive_hj.sql
@@ -0,0 +1,39 @@
+drop table if exists t1;
+drop table if exists t2;
+create table t1(a int);
+create table t2(b int);
+
+insert into t1 values(1),(2);
+insert into t2 values(2),(3),(11);
+insert into t1 select i from generate_series(1,10)i;
+insert into t2 select i from generate_series(2,10)i;
+insert into t1 select 2 from generate_series(1,5)i;
+insert into t2 select 2 from generate_series(2,7)i;
+set work_mem=64;
+set enable_mergejoin to off;
+
+select * from t1 left outer join t2 on a = b order by b;
+select count(*) from t1 left outer join t2 on a = b;
+select * from t1, t2 where a = b;
+select count(*) from t1, t2 where a = b;
+select * from t1 right outer join t2 on a = b order by b;
+select count(*) from t1 right outer join t2 on a = b;
+select * from t1 full outer join t2 on a = b order by b;
+select count(*) from t1 full outer join t2 on a = b;
+
+truncate table t1;
+insert into t1 values (1),(2),(2),(3);
+truncate table t2;
+insert into t2 values(2),(2),(3),(3),(4);
+
+set work_mem=64;
+set enable_mergejoin to off;
+
+select * from t1 left outer join t2 on a = b order by b;
+select count(*) from t1 left outer join t2 on a = b;
+select * from t1, t2 where a = b;
+select count(*) from t1, t2 where a = b;
+select * from t1 right outer join t2 on a = b order by b;
+select count(*) from t1 right outer join t2 on a = b;
+select * from t1 full outer join t2 on a = b order by b;
+select count(*) from t1 full outer join t2 on a = b;
-- 
2.22.0

Reply via email to