On Sun, Oct 14, 2018 at 11:11 PM Andrey Borodin <x4...@yandex-team.ru> wrote:
> > On 14 Oct 2018, at 9:18, Thomas Munro <thomas.mu...@enterprisedb.com>
> > wrote:
> >
> > +               /* Prefetch the bucket for the next key */
> > +               uint32 next_hash = hash_uint32(DatumGetInt32(keyval) + 1);
> > +               uint32 next_bucket = next_hash % hashtable->nbuckets;
> > +               
> > __builtin_prefetch(&hashtable->buckets.unshared[next_bucket]);
>
>
> +1, I also think that we should use __builtin_prefetch these days (years, 
> actually).
> Right after listening to Anastassia Ailamaki's (the author of the 
> referenced paper) talk at VLDB, I proposed doing that for B-tree [0], but 
> never really pursued the idea.

The above was obviously just a hard-coded hack that "knew" the next
key would be n + 1.  I've been wondering how you might abstract real
look-ahead with the shiny new TupleTableSlot design.  Here's a concept
I came up with: ExecScrollSlot(slot, 1) -> bool, which tries to look
ahead to the next tuple, if there is one.  I suppose there could be a
kind of scrolling that actually consumes tuples (for true batch-mode
tuple processing in tight inner loops, for example a hash table build),
and scrolling that merely peeks ahead (what I'm describing so far).  I'm
keen to hear other ideas about how that could look, because I know
that "vector" and "batch" mode tuple processing are ideas that people
have been bouncing around for a while.  Flame away.
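
To make that concrete outside the executor, here's a toy, self-contained
model of the peek-ahead version (explicitly not PostgreSQL code: Scan,
scroll_scan(), hash_key() and NBUCKETS are all made up for illustration,
and __builtin_prefetch assumes GCC/Clang).  The scan exposes a relative
peek, and the probe loop uses it to prefetch the bucket the next key
will touch while it processes the current one:

/*
 * Toy standalone model of the peek-ahead idea -- not PostgreSQL code.
 * scroll_scan(scan, 1, &key) peeks at the key one position ahead without
 * consuming it, so the probe loop can prefetch the bucket that the next
 * probe will touch while it processes the current tuple.
 */
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

#define NBUCKETS (1 << 20)

typedef struct Scan
{
	const uint32_t *keys;
	size_t		nkeys;
	size_t		pos;
} Scan;

/* Relative peek: fetch the key n positions ahead, if it exists. */
static int
scroll_scan(Scan *scan, size_t n, uint32_t *key)
{
	if (scan->pos + n >= scan->nkeys)
		return 0;
	*key = scan->keys[scan->pos + n];
	return 1;
}

static uint32_t
hash_key(uint32_t key)
{
	/* Stand-in for hash_uint32(); any mixing function will do here. */
	key ^= key >> 16;
	key *= 0x85ebca6b;
	key ^= key >> 13;
	return key;
}

int
main(void)
{
	enum { NKEYS = 10 * 1024 * 1024 };
	uint32_t   *keys = malloc(NKEYS * sizeof(uint32_t));
	uint32_t   *buckets = calloc(NBUCKETS, sizeof(uint32_t));
	Scan		scan = {keys, NKEYS, 0};
	uint64_t	sum = 0;

	for (size_t i = 0; i < NKEYS; i++)
		keys[i] = (uint32_t) rand();

	while (scan.pos < scan.nkeys)
	{
		uint32_t	next_key;

		/* Peek ahead; prefetch the bucket the next probe will touch. */
		if (scroll_scan(&scan, 1, &next_key))
			__builtin_prefetch(&buckets[hash_key(next_key) % NBUCKETS]);

		/* "Probe" the current tuple's bucket. */
		sum += buckets[hash_key(keys[scan.pos]) % NBUCKETS]++;
		scan.pos++;
	}

	printf("checksum: %llu\n", (unsigned long long) sum);
	free(keys);
	free(buckets);
	return 0;
}

The ExecScrollSlot() version in the patch has the same shape, with the
slot's scrollslot callback playing the role of scroll_scan().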

POC patch attached.  I never got around to figuring out why it breaks
anti-joins (hence some regression test failures) or filling out
various other important details (see the commit message for a list), but I
figured it was better off on the mailing list than hiding in a drawer,
anyway.

Here are some example times for a trivial join on my laptop.  Note
that this prefetches only during the probe phase, not during the build
phase, which should also be possible.  I didn't get around to trying
deeper prefetch pipelines as discussed earlier, but those seemed to
produce diminishing returns with hardcoded tests like the one in the
earlier message (a toy sketch of what a deeper pipeline could look
like follows the timings).

shared_buffers = '3GB'

create table r as select generate_series(1, 40000000)::int i;
create table s as select generate_series(1, 10000000)::int i;
analyze;

set max_parallel_workers_per_gather = 0;
set work_mem = '2GB';
select pg_prewarm('r'::regclass);
select pg_prewarm('s'::regclass);

select count(*) from r join s using (i);

Master:  00:14.230
Patched: 00:11.818
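
For reference, here's roughly what a deeper pipeline could look like,
in the same toy setting as the sketch earlier in this mail (it reuses
hash_key() and NBUCKETS from there; PREFETCH_DISTANCE = 4 is an
arbitrary choice, not a tuned value).  The next few hash values sit in
a small ring, so each key is hashed once, its bucket prefetched early,
and the saved hash consumed a few iterations later:

/*
 * Toy sketch of a prefetch pipeline with distance > 1 -- again not
 * PostgreSQL code; reuses hash_key() and NBUCKETS from the sketch above.
 */
#define PREFETCH_DISTANCE 4

static uint64_t
probe_pipelined(const uint32_t *keys, size_t nkeys, uint32_t *buckets)
{
	uint32_t	ring[PREFETCH_DISTANCE];
	uint64_t	sum = 0;

	/* Prime the pipeline: hash and prefetch the first D buckets. */
	for (size_t i = 0; i < (size_t) PREFETCH_DISTANCE && i < nkeys; i++)
	{
		ring[i] = hash_key(keys[i]) % NBUCKETS;
		__builtin_prefetch(&buckets[ring[i]]);
	}

	for (size_t i = 0; i < nkeys; i++)
	{
		uint32_t	bucket = ring[i % PREFETCH_DISTANCE];

		/* Refill the slot just consumed with the key D positions ahead. */
		if (i + PREFETCH_DISTANCE < nkeys)
		{
			uint32_t	b = hash_key(keys[i + PREFETCH_DISTANCE]) % NBUCKETS;

			ring[i % PREFETCH_DISTANCE] = b;
			__builtin_prefetch(&buckets[b]);
		}

		sum += buckets[bucket]++;
	}

	return sum;
}

Doing that through ExecScrollSlot() would presumably need a look-ahead
window bigger than the current BUFFERHEAPTUPLE_SCROLL_SIZE of 2.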


--
Thomas Munro
https://enterprisedb.com
From a352a861372aa0738746fce0b5a5dc962bb87d95 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Wed, 10 Apr 2019 15:19:23 +1200
Subject: [PATCH] Experimental ExecScrollSlot() for hash join prefetch.

Incomplet, inkorrect, proof-of-concept code only.

Problems with ExecScrollSlot():
* needs a configure test
* not preserving deform state when scrolling back and forth
* should ExecScrollSlot(n) be absolute or relative?
* how could we have a "consuming" mode, so that hash build could use a tight
  inner loop over all the tuples in a page?
* look-ahead size of just 1 for now!
* only works with page-mode BufferHeapTupleTableSlot with no key test
* ...

Problems with hash join prefetch:
* anti-joins are broken (!), see make check, haven't looked into why yet
* ignoring skew hash table
* not prefetching in hash build phase
* need parallel hash join support
* ...

Author: Thomas Munro
Discussion: https://postgr.es/m/flat/CA%2Bq6zcXg5-Rc4k0JY%2B7%3DgEDGWjCVp0X9t7JdnCuaAfeNmtTEZQ%40mail.gmail.com#35d34634efe8315587efd0b0da9775fc
---
 src/backend/access/heap/heapam.c         | 21 +++++++++++++-
 src/backend/access/heap/heapam_handler.c | 10 +++----
 src/backend/executor/execTuples.c        | 36 +++++++++++++++++++++--
 src/backend/executor/nodeHash.c          | 37 ++++++++++++++++++++++++
 src/backend/executor/nodeHashjoin.c      |  4 +++
 src/include/access/heapam.h              |  1 +
 src/include/executor/hashjoin.h          |  5 ++++
 src/include/executor/nodeHash.h          |  6 ++++
 src/include/executor/tuptable.h          | 26 ++++++++++++++++-
 9 files changed, 136 insertions(+), 10 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index a05b6a07ad..6e66acb688 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -292,6 +292,7 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
 	scan->rs_numblocks = InvalidBlockNumber;
 	scan->rs_inited = false;
 	scan->rs_ctup.t_data = NULL;
+	scan->rs_ntup.t_data = NULL;
 	ItemPointerSetInvalid(&scan->rs_ctup.t_self);
 	scan->rs_cbuf = InvalidBuffer;
 	scan->rs_cblock = InvalidBlockNumber;
@@ -493,6 +494,8 @@ heapgettup(HeapScanDesc scan,
 	int			linesleft;
 	ItemId		lpp;
 
+	scan->rs_ntup.t_data = NULL;
+
 	/*
 	 * calculate next starting lineoff, given scan direction
 	 */
@@ -807,6 +810,8 @@ heapgettup_pagemode(HeapScanDesc scan,
 	int			linesleft;
 	ItemId		lpp;
 
+	scan->rs_ntup.t_data = NULL;
+
 	/*
 	 * calculate next starting lineindex, given scan direction
 	 */
@@ -983,6 +988,18 @@ heapgettup_pagemode(HeapScanDesc scan,
 			else
 			{
 				scan->rs_cindex = lineindex;
+
+				/* Allow executor nodes to scroll to the next tuple. */
+				if (linesleft > 1)
+				{
+					lineoff = scan->rs_vistuples[lineindex + (backward ? -1 : 1)];
+					lpp = PageGetItemId(dp, lineoff);
+					Assert(ItemIdIsNormal(lpp));
+					tuple = &scan->rs_ntup;
+					tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
+					tuple->t_len = ItemIdGetLength(lpp);
+					ItemPointerSet(&(tuple->t_self), page, lineoff);
+				}
 				return;
 			}
 
@@ -1356,7 +1373,9 @@ heap_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *s
 
 	pgstat_count_heap_getnext(scan->rs_base.rs_rd);
 
-	ExecStoreBufferHeapTuple(&scan->rs_ctup, slot,
+	ExecStoreBufferHeapTuple(&scan->rs_ctup,
+							 scan->rs_ntup.t_data ? &scan->rs_ntup : NULL,
+							 slot,
 							 scan->rs_cbuf);
 	return true;
 }
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 1d8ca2429f..8c1a3c6564 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -163,7 +163,7 @@ heapam_index_fetch_tuple(struct IndexFetchTableData *scan,
 		*call_again = !IsMVCCSnapshot(snapshot);
 
 		slot->tts_tableOid = RelationGetRelid(scan->rel);
-		ExecStoreBufferHeapTuple(&bslot->base.tupdata, slot, hscan->xs_cbuf);
+		ExecStoreBufferHeapTuple(&bslot->base.tupdata, NULL, slot, hscan->xs_cbuf);
 	}
 	else
 	{
@@ -1104,7 +1104,7 @@ heapam_scan_analyze_next_tuple(TableScanDesc scan, TransactionId OldestXmin,
 
 		if (sample_it)
 		{
-			ExecStoreBufferHeapTuple(targtuple, slot, hscan->rs_cbuf);
+			ExecStoreBufferHeapTuple(targtuple, NULL, slot, hscan->rs_cbuf);
 			hscan->rs_cindex++;
 
 			/* note that we leave the buffer locked here! */
@@ -1578,7 +1578,7 @@ heapam_index_build_range_scan(Relation heapRelation,
 		MemoryContextReset(econtext->ecxt_per_tuple_memory);
 
 		/* Set up for predicate or expression evaluation */
-		ExecStoreBufferHeapTuple(heapTuple, slot, hscan->rs_cbuf);
+		ExecStoreBufferHeapTuple(heapTuple, NULL, slot, hscan->rs_cbuf);
 
 		/*
 		 * In a partial index, discard tuples that don't satisfy the
@@ -2220,7 +2220,7 @@ heapam_scan_bitmap_next_tuple(TableScanDesc scan,
 	 * Set up the result slot to point to this tuple.  Note that the slot
 	 * acquires a pin on the buffer.
 	 */
-	ExecStoreBufferHeapTuple(&hscan->rs_ctup,
+	ExecStoreBufferHeapTuple(&hscan->rs_ctup, NULL,
 							 slot,
 							 hscan->rs_cbuf);
 
@@ -2374,7 +2374,7 @@ heapam_scan_sample_next_tuple(TableScanDesc scan, SampleScanState *scanstate,
 			if (!pagemode)
 				LockBuffer(hscan->rs_cbuf, BUFFER_LOCK_UNLOCK);
 
-			ExecStoreBufferHeapTuple(tuple, slot, hscan->rs_cbuf);
+			ExecStoreBufferHeapTuple(tuple, NULL, slot, hscan->rs_cbuf);
 
 			/* Count successfully-fetched tuples as heap fetches */
 			pgstat_count_heap_getnext(scan->rs_rd);
diff --git a/src/backend/executor/execTuples.c b/src/backend/executor/execTuples.c
index 41fa374b6f..715b7dcb7d 100644
--- a/src/backend/executor/execTuples.c
+++ b/src/backend/executor/execTuples.c
@@ -76,6 +76,7 @@ slot_deform_heap_tuple(TupleTableSlot *slot, HeapTuple tuple, uint32 *offp,
 					   int natts);
 static inline void tts_buffer_heap_store_tuple(TupleTableSlot *slot,
 											   HeapTuple tuple,
+											   HeapTuple next,
 											   Buffer buffer,
 											   bool transfer_pin);
 static void tts_heap_store_tuple(TupleTableSlot *slot, HeapTuple tuple, bool shouldFree);
@@ -664,6 +665,7 @@ tts_buffer_heap_clear(TupleTableSlot *slot)
 	ItemPointerSetInvalid(&slot->tts_tid);
 	bslot->base.tuple = NULL;
 	bslot->base.off = 0;
+	bslot->ntuples = 0;
 	bslot->buffer = InvalidBuffer;
 }
 
@@ -737,6 +739,9 @@ tts_buffer_heap_materialize(TupleTableSlot *slot)
 	 */
 	bslot->base.off = 0;
 	slot->tts_nvalid = 0;
+
+	bslot->ntuples = 1;
+	bslot->tuples[0] = bslot->base.tuple;
 }
 
 static void
@@ -767,7 +772,7 @@ tts_buffer_heap_copyslot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
 	{
 		Assert(BufferIsValid(bsrcslot->buffer));
 
-		tts_buffer_heap_store_tuple(dstslot, bsrcslot->base.tuple,
+		tts_buffer_heap_store_tuple(dstslot, bsrcslot->base.tuple, NULL,
 									bsrcslot->buffer, false);
 
 		/*
@@ -779,6 +784,24 @@ tts_buffer_heap_copyslot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
 		memcpy(&bdstslot->base.tupdata, bdstslot->base.tuple, sizeof(HeapTupleData));
 		bdstslot->base.tuple = &bdstslot->base.tupdata;
 	}
+	bdstslot->tuples[0] = bdstslot->base.tuple;
+	bdstslot->ntuples = 1;
+}
+
+static bool
+tts_buffer_heap_scrollslot(TupleTableSlot *slot, int n)
+{
+	BufferHeapTupleTableSlot *bslot = (BufferHeapTupleTableSlot *) slot;
+
+	if (n >= bslot->ntuples)
+		return false;
+
+	bslot->base.tuple = bslot->tuples[n];
+	bslot->base.off = 0;
+	slot->tts_nvalid = 0;
+	slot->tts_tid = bslot->base.tuple->t_self;
+
+	return true;
 }
 
 static HeapTuple
@@ -822,6 +845,7 @@ tts_buffer_heap_copy_minimal_tuple(TupleTableSlot *slot)
 
 static inline void
 tts_buffer_heap_store_tuple(TupleTableSlot *slot, HeapTuple tuple,
+							HeapTuple next_tuple,
 							Buffer buffer, bool transfer_pin)
 {
 	BufferHeapTupleTableSlot *bslot = (BufferHeapTupleTableSlot *) slot;
@@ -838,6 +862,9 @@ tts_buffer_heap_store_tuple(TupleTableSlot *slot, HeapTuple tuple,
 	slot->tts_flags &= ~TTS_FLAG_EMPTY;
 	slot->tts_nvalid = 0;
 	bslot->base.tuple = tuple;
+	bslot->tuples[0] = tuple;
+	bslot->tuples[1] = next_tuple;
+	bslot->ntuples = next_tuple ? 2 : 1;
 	bslot->base.off = 0;
 	slot->tts_tid = tuple->t_self;
 
@@ -1050,6 +1077,7 @@ const TupleTableSlotOps TTSOpsBufferHeapTuple = {
 	.getsysattr = tts_buffer_heap_getsysattr,
 	.materialize = tts_buffer_heap_materialize,
 	.copyslot = tts_buffer_heap_copyslot,
+	.scrollslot = tts_buffer_heap_scrollslot,
 	.get_heap_tuple = tts_buffer_heap_get_heap_tuple,
 
 	/* A buffer heap tuple table slot can not "own" a minimal tuple. */
@@ -1339,6 +1367,7 @@ ExecStoreHeapTuple(HeapTuple tuple,
  *		into a specified slot in the tuple table.
  *
  *		tuple:	tuple to store
+ *		next_tuple: if available, the next tuple of a scan, in the same buffer
  *		slot:	TTSOpsBufferHeapTuple type slot to store it in
  *		buffer: disk buffer if tuple is in a disk page, else InvalidBuffer
  *
@@ -1353,6 +1382,7 @@ ExecStoreHeapTuple(HeapTuple tuple,
  */
 TupleTableSlot *
 ExecStoreBufferHeapTuple(HeapTuple tuple,
+						 HeapTuple next_tuple,
 						 TupleTableSlot *slot,
 						 Buffer buffer)
 {
@@ -1366,7 +1396,7 @@ ExecStoreBufferHeapTuple(HeapTuple tuple,
 
 	if (unlikely(!TTS_IS_BUFFERTUPLE(slot)))
 		elog(ERROR, "trying to store an on-disk heap tuple into wrong type of slot");
-	tts_buffer_heap_store_tuple(slot, tuple, buffer, false);
+	tts_buffer_heap_store_tuple(slot, tuple, next_tuple, buffer, false);
 
 	slot->tts_tableOid = tuple->t_tableOid;
 
@@ -1392,7 +1422,7 @@ ExecStorePinnedBufferHeapTuple(HeapTuple tuple,
 
 	if (unlikely(!TTS_IS_BUFFERTUPLE(slot)))
 		elog(ERROR, "trying to store an on-disk heap tuple into wrong type of slot");
-	tts_buffer_heap_store_tuple(slot, tuple, buffer, true);
+	tts_buffer_heap_store_tuple(slot, tuple, NULL, buffer, true);
 
 	slot->tts_tableOid = tuple->t_tableOid;
 
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 64eec91f8b..cdf5f4db1b 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -510,6 +510,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
 	hashtable->parallel_state = state->parallel_state;
 	hashtable->area = state->ps.state->es_query_dsa;
 	hashtable->batches = NULL;
+	hashtable->have_prefetch = false;
 
 #ifdef HJDEBUG
 	printf("Hashjoin %p: initial nbatch = %d, nbuckets = %d\n",
@@ -1769,6 +1770,34 @@ ExecParallelHashTableInsertCurrentBatch(HashJoinTable hashtable,
 		heap_free_minimal_tuple(tuple);
 }
 
+void
+ExecHashPrefetch(HashJoinTable hashtable,
+				 TupleTableSlot *slot,
+				 ExprContext *econtext,
+				 List *hashkeys,
+				 bool outer_tuple,
+				 bool keep_nulls)
+{
+	hashtable->have_prefetch = false;
+
+	if (ExecScrollSlot(slot, 1))
+	{
+		uint32		next_bucket;
+
+		hashtable->prefetch_result =
+			ExecHashGetHashValue(hashtable,
+								 econtext,
+								 hashkeys,
+								 outer_tuple,
+								 keep_nulls,
+								 &hashtable->prefetch_hash);
+		hashtable->have_prefetch = true;
+		next_bucket = hashtable->prefetch_hash % hashtable->nbuckets;
+		__builtin_prefetch(&hashtable->buckets.unshared[next_bucket]);
+		ExecScrollSlot(slot, 0);
+	}
+}
+
 /*
  * ExecHashGetHashValue
  *		Compute the hash value for a tuple
@@ -1796,6 +1825,14 @@ ExecHashGetHashValue(HashJoinTable hashtable,
 	int			i = 0;
 	MemoryContext oldContext;
 
+	/* Do we have a pre-computed result from ExecHashPrefetch()? */
+	if (hashtable->have_prefetch)
+	{
+		hashtable->have_prefetch = false;
+		*hashvalue = hashtable->prefetch_hash;
+		return hashtable->prefetch_result;
+	}
+
 	/*
 	 * We reset the eval context each time to reclaim any memory leaked in the
 	 * hashkey expressions.
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index aa43296e26..853eff4682 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -855,6 +855,10 @@ ExecHashJoinOuterGetTuple(PlanState *outerNode,
 				/* remember outer relation is not empty for possible rescan */
 				hjstate->hj_OuterNotEmpty = true;
 
+				ExecHashPrefetch(hashtable, slot, econtext,
+								 hjstate->hj_OuterHashKeys,
+								 true,	/* outer tuple */
+								 HJ_FILL_OUTER(hjstate));
 				return slot;
 			}
 
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 77e5e603b0..1b2cd06dc5 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -63,6 +63,7 @@ typedef struct HeapScanDescData
 	BufferAccessStrategy rs_strategy;	/* access strategy for reads */
 
 	HeapTupleData rs_ctup;		/* current tuple in scan, if any */
+	HeapTupleData rs_ntup;		/* next tuple in scan, if any */
 
 	/* these fields only used in page-at-a-time mode and for bitmap scans */
 	int			rs_cindex;		/* current tuple's index in vistuples */
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index 2c94b926d3..dd0d3ded3d 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -329,6 +329,11 @@ typedef struct HashJoinTableData
 	BufFile   **innerBatchFile; /* buffered virtual temp file per batch */
 	BufFile   **outerBatchFile; /* buffered virtual temp file per batch */
 
+	/* State used for prefetching cache lines for future tuples. */
+	bool		have_prefetch;
+	uint32		prefetch_hash;
+	bool		prefetch_result;
+
 	/*
 	 * Info about the datatype-specific hash functions for the datatypes being
 	 * hashed. These are arrays of the same length as the number of hash join
diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h
index 1233766023..8dab0c4e99 100644
--- a/src/include/executor/nodeHash.h
+++ b/src/include/executor/nodeHash.h
@@ -49,6 +49,12 @@ extern bool ExecHashGetHashValue(HashJoinTable hashtable,
 					 bool outer_tuple,
 					 bool keep_nulls,
 					 uint32 *hashvalue);
+extern void ExecHashPrefetch(HashJoinTable hashtable,
+							 TupleTableSlot *slot,
+							 ExprContext *econtext,
+							 List *hashkeys,
+							 bool outer_tuple,
+							 bool keep_nulls);
 extern void ExecHashGetBucketAndBatch(HashJoinTable hashtable,
 						  uint32 hashvalue,
 						  int *bucketno,
diff --git a/src/include/executor/tuptable.h b/src/include/executor/tuptable.h
index b0561ebe29..951ade0de8 100644
--- a/src/include/executor/tuptable.h
+++ b/src/include/executor/tuptable.h
@@ -179,6 +179,11 @@ struct TupleTableSlotOps
 	 */
 	void (*copyslot) (TupleTableSlot *dstslot, TupleTableSlot *srcslot);
 
+	/*
+	 * Scroll forward to a future tuple, if the slot allows it.
+	 */
+	bool (*scrollslot) (TupleTableSlot *slot, int n);
+
 	/*
 	 * Return a heap tuple "owned" by the slot. It is slot's responsibility to
 	 * free the memory consumed by the heap tuple. If the slot can not "own" a
@@ -258,6 +263,12 @@ typedef struct BufferHeapTupleTableSlot
 {
 	HeapTupleTableSlot base;
 
+#define BUFFERHEAPTUPLE_SCROLL_SIZE 2
+	/* Buffer to support scrolling. */
+	HeapTuple	tuples[BUFFERHEAPTUPLE_SCROLL_SIZE];
+	int			ntuples;
+	int			ctuple;
+
 	/*
 	 * If buffer is not InvalidBuffer, then the slot is holding a pin on the
 	 * indicated buffer page; drop the pin when we release the slot's
@@ -265,7 +276,7 @@ typedef struct BufferHeapTupleTableSlot
 	 * false in such a case, since presumably tts_tuple is pointing at the
 	 * buffer page.)
 	 */
-	Buffer		buffer;		/* tuple's buffer, or InvalidBuffer */
+	Buffer		buffer;		/* tuples' buffer, or InvalidBuffer */
 } BufferHeapTupleTableSlot;
 
 typedef struct MinimalTupleTableSlot
@@ -308,6 +319,7 @@ extern TupleTableSlot *ExecStoreHeapTuple(HeapTuple tuple,
 				   bool shouldFree);
 extern void ExecForceStoreHeapTuple(HeapTuple tuple, TupleTableSlot *slot);
 extern TupleTableSlot *ExecStoreBufferHeapTuple(HeapTuple tuple,
+						 HeapTuple peek_next,
 						 TupleTableSlot *slot,
 						 Buffer buffer);
 extern TupleTableSlot *ExecStorePinnedBufferHeapTuple(HeapTuple tuple,
@@ -480,6 +492,18 @@ ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
 	return dstslot;
 }
 
+/*
+ * Try to scroll to another tuple.
+ */
+static inline bool
+ExecScrollSlot(TupleTableSlot *slot, int n)
+{
+	Assert(!TTS_EMPTY(slot));
+
+	return slot->tts_ops->scrollslot && slot->tts_ops->scrollslot(slot, n);
+}
+
+
 #endif							/* FRONTEND */
 
 #endif							/* TUPTABLE_H */
-- 
2.21.0
