HIVE-10565: Native Vector Map Join doesn't handle filtering and matching on LEFT OUTER JOIN repeated key correctly (Matt McCline via Gunther Hagleitner)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2b9f2f5e Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2b9f2f5e Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2b9f2f5e Branch: refs/heads/master Commit: 2b9f2f5e2574e6e64ce9496dfe9ff6e085036fb1 Parents: 3fa7489 Author: Gunther Hagleitner <gunt...@apache.org> Authored: Thu May 14 15:42:04 2015 -0700 Committer: Gunther Hagleitner <gunt...@apache.org> Committed: Thu May 14 15:42:04 2015 -0700 ---------------------------------------------------------------------- .../test/resources/testconfiguration.properties | 10 + .../ql/exec/vector/VectorizedBatchUtil.java | 5 +- .../mapjoin/VectorMapJoinCommonOperator.java | 8 +- .../VectorMapJoinGenerateResultOperator.java | 47 +- ...pJoinInnerBigOnlyGenerateResultOperator.java | 53 +- .../VectorMapJoinInnerBigOnlyLongOperator.java | 15 +- ...ctorMapJoinInnerBigOnlyMultiKeyOperator.java | 15 +- ...VectorMapJoinInnerBigOnlyStringOperator.java | 12 +- ...ectorMapJoinInnerGenerateResultOperator.java | 39 +- .../mapjoin/VectorMapJoinInnerLongOperator.java | 17 +- .../VectorMapJoinInnerMultiKeyOperator.java | 19 +- .../VectorMapJoinInnerStringOperator.java | 17 +- ...orMapJoinLeftSemiGenerateResultOperator.java | 40 +- .../VectorMapJoinLeftSemiLongOperator.java | 13 +- .../VectorMapJoinLeftSemiMultiKeyOperator.java | 17 +- .../VectorMapJoinLeftSemiStringOperator.java | 17 +- ...ectorMapJoinOuterGenerateResultOperator.java | 805 ++++--- .../mapjoin/VectorMapJoinOuterLongOperator.java | 189 +- .../VectorMapJoinOuterMultiKeyOperator.java | 184 +- .../VectorMapJoinOuterStringOperator.java | 185 +- .../mapjoin/VectorMapJoinRowBytesContainer.java | 2 +- .../fast/VectorMapJoinFastBytesHashMap.java | 8 +- .../VectorMapJoinFastBytesHashMultiSet.java | 4 +- .../fast/VectorMapJoinFastBytesHashTable.java | 10 +- .../mapjoin/fast/VectorMapJoinFastKeyStore.java | 10 +- .../fast/VectorMapJoinFastLongHashMap.java | 2 
+- .../fast/VectorMapJoinFastLongHashTable.java | 18 +- .../fast/VectorMapJoinFastTableContainer.java | 2 +- .../fast/VectorMapJoinFastValueStore.java | 8 +- .../VectorMapJoinOptimizedLongCommon.java | 4 +- .../hive/ql/optimizer/physical/Vectorizer.java | 24 +- .../test/queries/clientpositive/vector_join30.q | 160 ++ .../clientpositive/vector_join_filters.q | 38 + .../queries/clientpositive/vector_join_nulls.q | 33 + .../clientpositive/vector_left_outer_join2.q | 2 + .../queries/clientpositive/vector_outer_join5.q | 173 ++ .../tez/acid_vectorization_partition.q.out | 20 +- .../clientpositive/tez/vector_join30.q.out | 1367 +++++++++++ .../tez/vector_join_filters.q.out | 222 ++ .../clientpositive/tez/vector_join_nulls.q.out | 195 ++ .../tez/vector_left_outer_join2.q.out | 20 +- .../tez/vector_left_outer_join3.q.out | 222 ++ .../clientpositive/tez/vector_outer_join5.q.out | 1328 +++++++++++ .../tez/vectorized_timestamp_ints_casts.q.out | 234 ++ .../results/clientpositive/vector_join30.q.out | 2194 ++++++++++++++++++ .../clientpositive/vector_join_filters.q.out | 222 ++ .../clientpositive/vector_join_nulls.q.out | 195 ++ .../vector_left_outer_join2.q.out | 8 +- .../clientpositive/vector_outer_join5.q.out | 1300 +++++++++++ 49 files changed, 8936 insertions(+), 796 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/2b9f2f5e/itests/src/test/resources/testconfiguration.properties ---------------------------------------------------------------------- diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index f9c9351..c79c36c 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -220,8 +220,12 @@ minitez.query.files.shared=alter_merge_2_orc.q,\ vector_groupby_3.q,\ vector_groupby_reduce.q,\ vector_if_expr.q,\ + vector_inner_join.q,\ 
vector_interval_1.q,\ vector_interval_2.q,\ + vector_join30.q,\ + vector_join_filters.q,\ + vector_join_nulls.q,\ vector_left_outer_join.q,\ vector_left_outer_join2.q,\ vector_leftsemi_mapjoin.q,\ @@ -230,6 +234,12 @@ minitez.query.files.shared=alter_merge_2_orc.q,\ vector_multi_insert.q,\ vector_non_string_partition.q,\ vector_orderby_5.q,\ + vector_outer_join0.q,\ + vector_outer_join1.q,\ + vector_outer_join2.q,\ + vector_outer_join3.q,\ + vector_outer_join4.q,\ + vector_outer_join5.q,\ vector_partition_diff_num_cols.q,\ vector_partitioned_date_time.q,\ vector_reduce_groupby_decimal.q,\ http://git-wip-us.apache.org/repos/asf/hive/blob/2b9f2f5e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java index dcea8ae..4a16b4c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java @@ -645,8 +645,7 @@ public class VectorizedBatchUtil { public static void debugDisplayOneRow(VectorizedRowBatch batch, int index, String prefix) { StringBuffer sb = new StringBuffer(); sb.append(prefix + " row " + index + " "); - for (int i = 0; i < batch.projectionSize; i++) { - int column = batch.projectedColumns[i]; + for (int column = 0; column < batch.cols.length; column++) { ColumnVector colVector = batch.cols[column]; if (colVector == null) { sb.append("(null colVector " + column + ")"); @@ -666,7 +665,7 @@ public class VectorizedBatchUtil { if (bytes == null) { sb.append("(Unexpected null bytes with start " + start + " length " + length + ")"); } else { - sb.append(displayBytes(bytes, start, length)); + sb.append("bytes: '" + displayBytes(bytes, start, length) + "'"); } } else if (colVector instanceof DecimalColumnVector) 
{ sb.append(((DecimalColumnVector) colVector).vector[index].toString()); http://git-wip-us.apache.org/repos/asf/hive/blob/2b9f2f5e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java index a9082eb..af78776 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java @@ -470,8 +470,8 @@ public abstract class VectorMapJoinCommonOperator extends MapJoinOperator implem LOG.debug(taskName + ", " + getOperatorId() + " VectorMapJoinCommonOperator constructor bigTableByteColumnVectorColumns " + Arrays.toString(bigTableByteColumnVectorColumns)); LOG.debug(taskName + ", " + getOperatorId() + " VectorMapJoinCommonOperator constructor smallTableByteColumnVectorColumns " + Arrays.toString(smallTableByteColumnVectorColumns)); - LOG.info(taskName + ", " + getOperatorId() + " VectorMapJoinCommonOperator constructor outputProjection " + Arrays.toString(outputProjection)); - LOG.info(taskName + ", " + getOperatorId() + " VectorMapJoinCommonOperator constructor outputTypeNames " + Arrays.toString(outputTypeNames)); + LOG.debug(taskName + ", " + getOperatorId() + " VectorMapJoinCommonOperator constructor outputProjection " + Arrays.toString(outputProjection)); + LOG.debug(taskName + ", " + getOperatorId() + " VectorMapJoinCommonOperator constructor outputTypeNames " + Arrays.toString(outputTypeNames)); } setupVOutContext(conf.getOutputColumnNames()); @@ -503,7 +503,7 @@ public abstract class VectorMapJoinCommonOperator extends MapJoinOperator implem */ protected void setupVOutContext(List<String> outputColumnNames) { if (LOG.isDebugEnabled()) { - 
LOG.info(taskName + ", " + getOperatorId() + " VectorMapJoinCommonOperator constructor outputColumnNames " + outputColumnNames); + LOG.debug(taskName + ", " + getOperatorId() + " VectorMapJoinCommonOperator constructor outputColumnNames " + outputColumnNames); } if (outputColumnNames.size() != outputProjection.length) { throw new RuntimeException("Output column names " + outputColumnNames + " length and output projection " + Arrays.toString(outputProjection) + " / " + Arrays.toString(outputTypeNames) + " length mismatch"); @@ -729,9 +729,9 @@ public abstract class VectorMapJoinCommonOperator extends MapJoinOperator implem * Common one time setup by native vectorized map join operator's processOp. */ protected void commonSetup(VectorizedRowBatch batch) throws HiveException { - LOG.info("VectorMapJoinInnerCommonOperator commonSetup begin..."); if (LOG.isDebugEnabled()) { + LOG.debug("VectorMapJoinInnerCommonOperator commonSetup begin..."); displayBatchColumns(batch, "batch"); displayBatchColumns(overflowBatch, "overflowBatch"); } http://git-wip-us.apache.org/repos/asf/hive/blob/2b9f2f5e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java index 860ebb5..32c126c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java @@ -373,10 +373,8 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC * The big table batch. * @param hashMapResult * The hash map results for the repeated key. - * @return - * The new count of selected rows. 
*/ - protected int generateHashMapResultRepeatedAll(VectorizedRowBatch batch, + protected void generateHashMapResultRepeatedAll(VectorizedRowBatch batch, VectorMapJoinHashMapResult hashMapResult) throws IOException, HiveException { int[] selected = batch.selected; @@ -400,7 +398,7 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC batch.selected, 0, batch.size); } - return numSel; + batch.size = numSel; } //----------------------------------------------------------------------------------------------- @@ -462,7 +460,7 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC // int length = output.getLength() - offset; rowBytesContainer.finishRow(); -// LOG.info("spillSerializeRow spilled batchIndex " + batchIndex + ", length " + length); +// LOG.debug("spillSerializeRow spilled batchIndex " + batchIndex + ", length " + length); } protected void spillHashMapBatch(VectorizedRowBatch batch, @@ -514,14 +512,18 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC smallTable); needHashTableSetup = true; - LOG.info(CLASS_NAME + " reloadHashTable!"); + if (LOG.isDebugEnabled()) { + LOG.debug(CLASS_NAME + " reloadHashTable!"); + } } @Override protected void reProcessBigTable(int partitionId) throws HiveException { - LOG.info(CLASS_NAME + " reProcessBigTable enter..."); + if (LOG.isDebugEnabled()) { + LOG.debug(CLASS_NAME + " reProcessBigTable enter..."); + } if (spillReplayBatch == null) { // The process method was not called -- no big table rows. 
@@ -544,14 +546,14 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC int offset = bigTable.currentOffset(); int length = bigTable.currentLength(); -// LOG.info(CLASS_NAME + " reProcessBigTable serialized row #" + rowCount + ", offset " + offset + ", length " + length); +// LOG.debug(CLASS_NAME + " reProcessBigTable serialized row #" + rowCount + ", offset " + offset + ", length " + length); bigTableVectorDeserializeRow.setBytes(bytes, offset, length); bigTableVectorDeserializeRow.deserializeByValue(spillReplayBatch, spillReplayBatch.size); spillReplayBatch.size++; if (spillReplayBatch.size == VectorizedRowBatch.DEFAULT_SIZE) { - LOG.info("reProcessBigTable going to call process with spillReplayBatch.size " + spillReplayBatch.size + " rows"); + // LOG.debug("reProcessBigTable going to call process with spillReplayBatch.size " + spillReplayBatch.size + " rows"); process(spillReplayBatch, posBigTable); // call process once we have a full batch spillReplayBatch.reset(); batchCount++; @@ -559,7 +561,7 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC } // Process the row batch that has less than DEFAULT_SIZE rows if (spillReplayBatch.size > 0) { - LOG.info("reProcessBigTable going to call process with spillReplayBatch.size " + spillReplayBatch.size + " rows"); + // LOG.debug("reProcessBigTable going to call process with spillReplayBatch.size " + spillReplayBatch.size + " rows"); process(spillReplayBatch, posBigTable); spillReplayBatch.reset(); batchCount++; @@ -570,7 +572,9 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC throw new HiveException(e); } - LOG.info(CLASS_NAME + " reProcessBigTable exit! " + rowCount + " row processed and " + batchCount + " batches processed"); + if (LOG.isDebugEnabled()) { + LOG.debug(CLASS_NAME + " reProcessBigTable exit! 
" + rowCount + " row processed and " + batchCount + " batches processed"); + } } @@ -632,7 +636,9 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC if (!aborted && overflowBatch.size > 0) { forwardOverflow(); } - LOG.info("VectorMapJoinInnerLongOperator closeOp " + batchCounter + " batches processed"); + if (LOG.isDebugEnabled()) { + LOG.debug("VectorMapJoinInnerLongOperator closeOp " + batchCounter + " batches processed"); + } } //----------------------------------------------------------------------------------------------- @@ -641,6 +647,23 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC * Debug. */ + public boolean verifyMonotonicallyIncreasing(int[] selected, int size) { + + if (size == 0) { + return true; + } + int prevBatchIndex = selected[0]; + + for (int i = 1; i < size; i++) { + int batchIndex = selected[i]; + if (batchIndex <= prevBatchIndex) { + return false; + } + prevBatchIndex = batchIndex; + } + return true; + } + public static String intArrayToRangesString(int selection[], int size) { if (size == 0) { return "[]"; http://git-wip-us.apache.org/repos/asf/hive/blob/2b9f2f5e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyGenerateResultOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyGenerateResultOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyGenerateResultOperator.java index 3132531..f18b982 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyGenerateResultOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyGenerateResultOperator.java @@ -129,22 +129,10 @@ public abstract class VectorMapJoinInnerBigOnlyGenerateResultOperator * @param batch * The big table batch with any 
matching and any non matching rows both as * selected in use. - * @param allMatchs - * A subset of the rows of the batch that are matches. * @param allMatchCount * Number of matches in allMatchs. - * @param equalKeySeriesValueCounts - * For each equal key series, whether the number of (empty) small table values. - * @param equalKeySeriesAllMatchIndices - * For each equal key series, the logical index into allMatchs. - * @param equalKeySeriesDuplicateCounts - * For each equal key series, the number of duplicates or equal keys. * @param equalKeySeriesCount * Number of single value matches. - * @param spills - * A subset of the rows of the batch that are spills. - * @param spillHashMapResultIndices - * For each entry in spills, the index into the hashMapResult. * @param spillCount * Number of spills in spills. * @param hashTableResults @@ -154,15 +142,16 @@ public abstract class VectorMapJoinInnerBigOnlyGenerateResultOperator * Number of entries in hashMapResults. * **/ - protected int finishInnerBigOnly(VectorizedRowBatch batch, - int[] allMatchs, int allMatchCount, - long[] equalKeySeriesValueCounts, int[] equalKeySeriesAllMatchIndices, - int[] equalKeySeriesDuplicateCounts, int equalKeySeriesCount, - int[] spills, int[] spillHashMapResultIndices, int spillCount, + protected void finishInnerBigOnly(VectorizedRowBatch batch, + int allMatchCount, int equalKeySeriesCount, int spillCount, VectorMapJoinHashTableResult[] hashTableResults, int hashMapResultCount) throws HiveException, IOException { - int numSel = 0; + // Get rid of spills before we start modifying the batch. + if (spillCount > 0) { + spillHashMapBatch(batch, hashTableResults, + spills, spillHashMapResultIndices, spillCount); + } /* * Optimize by running value expressions only over the matched rows. 
@@ -171,6 +160,7 @@ public abstract class VectorMapJoinInnerBigOnlyGenerateResultOperator performValueExpressions(batch, allMatchs, allMatchCount); } + int numSel = 0; for (int i = 0; i < equalKeySeriesCount; i++) { long count = equalKeySeriesValueCounts[i]; int allMatchesIndex = equalKeySeriesAllMatchIndices[i]; @@ -185,13 +175,8 @@ public abstract class VectorMapJoinInnerBigOnlyGenerateResultOperator duplicateCount, count); } } - - if (spillCount > 0) { - spillHashMapBatch(batch, hashTableResults, - spills, spillHashMapResultIndices, spillCount); - } - - return numSel; + batch.size = numSel; + batch.selectedInUse = true; } /** @@ -215,11 +200,11 @@ public abstract class VectorMapJoinInnerBigOnlyGenerateResultOperator int[] allMatchs, int allMatchesIndex, int duplicateCount, int numSel) throws HiveException, IOException { - // LOG.info("generateHashMultiSetResultSingleValue enter..."); + // LOG.debug("generateHashMultiSetResultSingleValue enter..."); // Generate result within big table batch itself. - // LOG.info("generateHashMultiSetResultSingleValue with big table..."); + // LOG.debug("generateHashMultiSetResultSingleValue with big table..."); for (int i = 0; i < duplicateCount; i++) { @@ -250,7 +235,7 @@ public abstract class VectorMapJoinInnerBigOnlyGenerateResultOperator int[] allMatchs, int allMatchesIndex, int duplicateCount, long count) throws HiveException, IOException { - // LOG.info("generateHashMultiSetResultMultiValue allMatchesIndex " + allMatchesIndex + " duplicateCount " + duplicateCount + " count " + count); + // LOG.debug("generateHashMultiSetResultMultiValue allMatchesIndex " + allMatchesIndex + " duplicateCount " + duplicateCount + " count " + count); // TODO: Look at repeating optimizations... 
@@ -309,11 +294,9 @@ public abstract class VectorMapJoinInnerBigOnlyGenerateResultOperator return 0; } - protected int finishInnerBigOnlyRepeated(VectorizedRowBatch batch, JoinUtil.JoinResult joinResult, + protected void finishInnerBigOnlyRepeated(VectorizedRowBatch batch, JoinUtil.JoinResult joinResult, VectorMapJoinHashMultiSetResult hashMultiSetResult) throws HiveException, IOException { - int numSel = 0; - switch (joinResult) { case MATCH: @@ -325,19 +308,21 @@ public abstract class VectorMapJoinInnerBigOnlyGenerateResultOperator } // Generate special repeated case. - numSel = generateHashMultiSetResultRepeatedAll(batch, hashMultiSetResult); + int numSel = generateHashMultiSetResultRepeatedAll(batch, hashMultiSetResult); + batch.size = numSel; + batch.selectedInUse = true; break; case SPILL: // Whole batch is spilled. spillBatchRepeated(batch, (VectorMapJoinHashTableResult) hashMultiSetResult); + batch.size = 0; break; case NOMATCH: // No match for entire batch. + batch.size = 0; break; } - - return numSel; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/2b9f2f5e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyLongOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyLongOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyLongOperator.java index 53a91d8..bb7efda 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyLongOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyLongOperator.java @@ -151,9 +151,6 @@ public class VectorMapJoinInnerBigOnlyLongOperator extends VectorMapJoinInnerBig } } - // We rebuild in-place the selected array with rows destine to be forwarded. 
- int numSel = 0; - /* * Single-Column Long specific declarations. */ @@ -198,7 +195,7 @@ public class VectorMapJoinInnerBigOnlyLongOperator extends VectorMapJoinInnerBig if (LOG.isDebugEnabled()) { LOG.debug(CLASS_NAME + " batch #" + batchCounter + " repeated joinResult " + joinResult.name()); } - numSel = finishInnerBigOnlyRepeated(batch, joinResult, hashMultiSetResults[0]); + finishInnerBigOnlyRepeated(batch, joinResult, hashMultiSetResults[0]); } else { /* @@ -358,17 +355,11 @@ public class VectorMapJoinInnerBigOnlyLongOperator extends VectorMapJoinInnerBig " hashMapResults " + Arrays.toString(Arrays.copyOfRange(hashMultiSetResults, 0, hashMultiSetResultCount))); } - numSel = finishInnerBigOnly(batch, - allMatchs, allMatchCount, - equalKeySeriesValueCounts, equalKeySeriesAllMatchIndices, - equalKeySeriesDuplicateCounts, equalKeySeriesCount, - spills, spillHashMapResultIndices, spillCount, + finishInnerBigOnly(batch, + allMatchCount, equalKeySeriesCount, spillCount, (VectorMapJoinHashTableResult[]) hashMultiSetResults, hashMultiSetResultCount); } - batch.selectedInUse = true; - batch.size = numSel; - if (batch.size > 0) { // Forward any remaining selected rows. 
forwardBigTableBatch(batch); http://git-wip-us.apache.org/repos/asf/hive/blob/2b9f2f5e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyMultiKeyOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyMultiKeyOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyMultiKeyOperator.java index 9553fa0..c36f668 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyMultiKeyOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyMultiKeyOperator.java @@ -156,9 +156,6 @@ public class VectorMapJoinInnerBigOnlyMultiKeyOperator extends VectorMapJoinInne } } - // We rebuild in-place the selected array with rows destine to be forwarded. - int numSel = 0; - /* * Multi-Key specific declarations. */ @@ -210,7 +207,7 @@ public class VectorMapJoinInnerBigOnlyMultiKeyOperator extends VectorMapJoinInne if (LOG.isDebugEnabled()) { LOG.debug(CLASS_NAME + " batch #" + batchCounter + " repeated joinResult " + joinResult.name()); } - numSel = finishInnerBigOnlyRepeated(batch, joinResult, hashMultiSetResults[0]); + finishInnerBigOnlyRepeated(batch, joinResult, hashMultiSetResults[0]); } else { /* @@ -371,17 +368,11 @@ public class VectorMapJoinInnerBigOnlyMultiKeyOperator extends VectorMapJoinInne " hashMapResults " + Arrays.toString(Arrays.copyOfRange(hashMultiSetResults, 0, hashMultiSetResultCount))); } - numSel = finishInnerBigOnly(batch, - allMatchs, allMatchCount, - equalKeySeriesValueCounts, equalKeySeriesAllMatchIndices, - equalKeySeriesDuplicateCounts, equalKeySeriesCount, - spills, spillHashMapResultIndices, spillCount, + finishInnerBigOnly(batch, + allMatchCount, equalKeySeriesCount, spillCount, (VectorMapJoinHashTableResult[]) hashMultiSetResults, hashMultiSetResultCount); } - 
batch.selectedInUse = true; - batch.size = numSel; - if (batch.size > 0) { // Forward any remaining selected rows. forwardBigTableBatch(batch); http://git-wip-us.apache.org/repos/asf/hive/blob/2b9f2f5e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyStringOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyStringOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyStringOperator.java index 17d0b63..87a11c0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyStringOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyStringOperator.java @@ -187,7 +187,7 @@ public class VectorMapJoinInnerBigOnlyStringOperator extends VectorMapJoinInnerB if (LOG.isDebugEnabled()) { LOG.debug(CLASS_NAME + " batch #" + batchCounter + " repeated joinResult " + joinResult.name()); } - numSel = finishInnerBigOnlyRepeated(batch, joinResult, hashMultiSetResults[0]); + finishInnerBigOnlyRepeated(batch, joinResult, hashMultiSetResults[0]); } else { /* @@ -347,17 +347,11 @@ public class VectorMapJoinInnerBigOnlyStringOperator extends VectorMapJoinInnerB " hashMapResults " + Arrays.toString(Arrays.copyOfRange(hashMultiSetResults, 0, hashMultiSetResultCount))); } - numSel = finishInnerBigOnly(batch, - allMatchs, allMatchCount, - equalKeySeriesValueCounts, equalKeySeriesAllMatchIndices, - equalKeySeriesDuplicateCounts, equalKeySeriesCount, - spills, spillHashMapResultIndices, spillCount, + finishInnerBigOnly(batch, + allMatchCount, equalKeySeriesCount, spillCount, (VectorMapJoinHashTableResult[]) hashMultiSetResults, hashMultiSetResultCount); } - batch.selectedInUse = true; - batch.size = numSel; - if (batch.size > 0) { // Forward any remaining selected rows. 
forwardBigTableBatch(batch); http://git-wip-us.apache.org/repos/asf/hive/blob/2b9f2f5e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerGenerateResultOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerGenerateResultOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerGenerateResultOperator.java index 3a5e4b2..ee1abd3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerGenerateResultOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerGenerateResultOperator.java @@ -147,38 +147,17 @@ public abstract class VectorMapJoinInnerGenerateResultOperator * @param batch * The big table batch with any matching and any non matching rows both as * selected in use. - * @param allMatchs - * A subset of the rows of the batch that are matches. * @param allMatchCount * Number of matches in allMatchs. - * @param equalKeySeriesHashMapResultIndices - * For each equal key series, the index into the hashMapResult. - * @param equalKeySeriesAllMatchIndices - * For each equal key series, the logical index into allMatchs. - * @param equalKeySeriesIsSingleValue - * For each equal key series, whether there is 1 or multiple small table values. - * @param equalKeySeriesDuplicateCounts - * For each equal key series, the number of duplicates or equal keys. * @param equalKeySeriesCount * Number of single value matches. - * @param spills - * A subset of the rows of the batch that are spills. - * @param spillHashMapResultIndices - * For each entry in spills, the index into the hashMapResult. * @param spillCount * Number of spills in spills. - * @param hashMapResults - * The array of all hash map results for the batch. * @param hashMapResultCount * Number of entries in hashMapResults. 
*/ - protected int finishInner(VectorizedRowBatch batch, - int[] allMatchs, int allMatchCount, - int[] equalKeySeriesHashMapResultIndices, int[] equalKeySeriesAllMatchIndices, - boolean[] equalKeySeriesIsSingleValue, int[] equalKeySeriesDuplicateCounts, - int equalKeySeriesCount, - int[] spills, int[] spillHashMapResultIndices, int spillCount, - VectorMapJoinHashMapResult[] hashMapResults, int hashMapResultCount) + protected void finishInner(VectorizedRowBatch batch, + int allMatchCount, int equalKeySeriesCount, int spillCount, int hashMapResultCount) throws HiveException, IOException { int numSel = 0; @@ -211,10 +190,11 @@ public abstract class VectorMapJoinInnerGenerateResultOperator spills, spillHashMapResultIndices, spillCount); } - return numSel; + batch.size = numSel; + batch.selectedInUse = true; } - protected int finishInnerRepeated(VectorizedRowBatch batch, JoinUtil.JoinResult joinResult, + protected void finishInnerRepeated(VectorizedRowBatch batch, JoinUtil.JoinResult joinResult, VectorMapJoinHashTableResult hashMapResult) throws HiveException, IOException { int numSel = 0; @@ -230,22 +210,19 @@ public abstract class VectorMapJoinInnerGenerateResultOperator } // Generate special repeated case. - numSel = generateHashMapResultRepeatedAll(batch, hashMapResults[0]); + generateHashMapResultRepeatedAll(batch, hashMapResults[0]); break; case SPILL: // Whole batch is spilled. spillBatchRepeated(batch, (VectorMapJoinHashTableResult) hashMapResults[0]); + batch.size = 0; break; case NOMATCH: // No match for entire batch. + batch.size = 0; break; } - /* - * Common repeated join result processing. 
- */ - - return numSel; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/2b9f2f5e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerLongOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerLongOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerLongOperator.java index b77a93c..9005d00 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerLongOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerLongOperator.java @@ -149,9 +149,6 @@ public class VectorMapJoinInnerLongOperator extends VectorMapJoinInnerGenerateRe } } - // We rebuild in-place the selected array with rows destine to be forwarded. - int numSel = 0; - /* * Single-Column Long specific declarations. */ @@ -196,7 +193,7 @@ public class VectorMapJoinInnerLongOperator extends VectorMapJoinInnerGenerateRe if (LOG.isDebugEnabled()) { LOG.debug(CLASS_NAME + " batch #" + batchCounter + " repeated joinResult " + joinResult.name()); } - numSel = finishInnerRepeated(batch, joinResult, hashMapResults[0]); + finishInnerRepeated(batch, joinResult, hashMapResults[0]); } else { /* @@ -356,18 +353,10 @@ public class VectorMapJoinInnerLongOperator extends VectorMapJoinInnerGenerateRe " hashMapResults " + Arrays.toString(Arrays.copyOfRange(hashMapResults, 0, hashMapResultCount))); } - numSel = finishInner(batch, - allMatchs, allMatchCount, - equalKeySeriesHashMapResultIndices, equalKeySeriesAllMatchIndices, - equalKeySeriesIsSingleValue, equalKeySeriesDuplicateCounts, - equalKeySeriesCount, - spills, spillHashMapResultIndices, spillCount, - hashMapResults, hashMapResultCount); + finishInner(batch, + allMatchCount, equalKeySeriesCount, spillCount, hashMapResultCount); } - batch.selectedInUse = true; - batch.size = numSel; - if 
(batch.size > 0) { // Forward any remaining selected rows. forwardBigTableBatch(batch); http://git-wip-us.apache.org/repos/asf/hive/blob/2b9f2f5e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerMultiKeyOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerMultiKeyOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerMultiKeyOperator.java index 938506b..b13ded6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerMultiKeyOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerMultiKeyOperator.java @@ -153,9 +153,6 @@ public class VectorMapJoinInnerMultiKeyOperator extends VectorMapJoinInnerGenera } } - // We rebuild in-place the selected array with rows destine to be forwarded. - int numSel = 0; - /* * Multi-Key specific declarations. */ @@ -207,7 +204,7 @@ public class VectorMapJoinInnerMultiKeyOperator extends VectorMapJoinInnerGenera if (LOG.isDebugEnabled()) { LOG.debug(CLASS_NAME + " batch #" + batchCounter + " repeated joinResult " + joinResult.name()); } - numSel = finishInnerRepeated(batch, joinResult, hashMapResults[0]); + finishInnerRepeated(batch, joinResult, hashMapResults[0]); } else { /* @@ -279,7 +276,7 @@ public class VectorMapJoinInnerMultiKeyOperator extends VectorMapJoinInnerGenera haveSaveKey = true; /* - * Multi-Key specific save key and lookup. + * Multi-Key specific save key. 
*/ temp = saveKeyOutput; @@ -368,18 +365,10 @@ public class VectorMapJoinInnerMultiKeyOperator extends VectorMapJoinInnerGenera " hashMapResults " + Arrays.toString(Arrays.copyOfRange(hashMapResults, 0, hashMapResultCount))); } - numSel = finishInner(batch, - allMatchs, allMatchCount, - equalKeySeriesHashMapResultIndices, equalKeySeriesAllMatchIndices, - equalKeySeriesIsSingleValue, equalKeySeriesDuplicateCounts, - equalKeySeriesCount, - spills, spillHashMapResultIndices, spillCount, - hashMapResults, hashMapResultCount); + finishInner(batch, + allMatchCount, equalKeySeriesCount, spillCount, hashMapResultCount); } - batch.selectedInUse = true; - batch.size = numSel; - if (batch.size > 0) { // Forward any remaining selected rows. forwardBigTableBatch(batch); http://git-wip-us.apache.org/repos/asf/hive/blob/2b9f2f5e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerStringOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerStringOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerStringOperator.java index f7dd8e2..9f10ff1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerStringOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerStringOperator.java @@ -140,9 +140,6 @@ public class VectorMapJoinInnerStringOperator extends VectorMapJoinInnerGenerate } } - // We rebuild in-place the selected array with rows destine to be forwarded. - int numSel = 0; - /* * Single-Column String specific declarations. 
*/ @@ -185,7 +182,7 @@ public class VectorMapJoinInnerStringOperator extends VectorMapJoinInnerGenerate if (LOG.isDebugEnabled()) { LOG.debug(CLASS_NAME + " batch #" + batchCounter + " repeated joinResult " + joinResult.name()); } - numSel = finishInnerRepeated(batch, joinResult, hashMapResults[0]); + finishInnerRepeated(batch, joinResult, hashMapResults[0]); } else { /* @@ -345,18 +342,10 @@ public class VectorMapJoinInnerStringOperator extends VectorMapJoinInnerGenerate " hashMapResults " + Arrays.toString(Arrays.copyOfRange(hashMapResults, 0, hashMapResultCount))); } - numSel = finishInner(batch, - allMatchs, allMatchCount, - equalKeySeriesHashMapResultIndices, equalKeySeriesAllMatchIndices, - equalKeySeriesIsSingleValue, equalKeySeriesDuplicateCounts, - equalKeySeriesCount, - spills, spillHashMapResultIndices, spillCount, - hashMapResults, hashMapResultCount); + finishInner(batch, + allMatchCount, equalKeySeriesCount, spillCount, hashMapResultCount); } - batch.selectedInUse = true; - batch.size = numSel; - if (batch.size > 0) { // Forward any remaining selected rows. 
forwardBigTableBatch(batch); http://git-wip-us.apache.org/repos/asf/hive/blob/2b9f2f5e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiGenerateResultOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiGenerateResultOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiGenerateResultOperator.java index 230f9fe..07393b2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiGenerateResultOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiGenerateResultOperator.java @@ -111,26 +111,23 @@ public abstract class VectorMapJoinLeftSemiGenerateResultOperator * @param batch * The big table batch with any matching and any non matching rows both as * selected in use. - * @param allMatchs - * A subset of the rows of the batch that are matches. * @param allMatchCount * Number of matches in allMatchs. - * @param spills - * A subset of the rows of the batch that are spills. - * @param spillHashMapResultIndices - * For each entry in spills, the index into the hashTableResults. * @param spillCount * Number of spills in spills. * @param hashTableResults * The array of all hash table results for the batch. We need the * VectorMapJoinHashTableResult for the spill information. */ - protected int finishLeftSemi(VectorizedRowBatch batch, - int[] allMatchs, int allMatchCount, - int[] spills, int[] spillHashMapResultIndices, int spillCount, + protected void finishLeftSemi(VectorizedRowBatch batch, + int allMatchCount, int spillCount, VectorMapJoinHashTableResult[] hashTableResults) throws HiveException, IOException { - int numSel; + // Get rid of spills before we start modifying the batch. 
+ if (spillCount > 0) { + spillHashMapBatch(batch, hashTableResults, + spills, spillHashMapResultIndices, spillCount); + } /* * Optimize by running value expressions only over the matched rows. @@ -139,14 +136,9 @@ public abstract class VectorMapJoinLeftSemiGenerateResultOperator performValueExpressions(batch, allMatchs, allMatchCount); } - numSel = generateHashSetResults(batch, allMatchs, allMatchCount); - - if (spillCount > 0) { - spillHashMapBatch(batch, hashTableResults, - spills, spillHashMapResultIndices, spillCount); - } - - return numSel; + int numSel = generateHashSetResults(batch, allMatchs, allMatchCount); + batch.size = numSel; + batch.selectedInUse = true; } /** @@ -199,11 +191,9 @@ public abstract class VectorMapJoinLeftSemiGenerateResultOperator return batch.size; } - protected int finishLeftSemiRepeated(VectorizedRowBatch batch, JoinUtil.JoinResult joinResult, + protected void finishLeftSemiRepeated(VectorizedRowBatch batch, JoinUtil.JoinResult joinResult, VectorMapJoinHashTableResult hashSetResult) throws HiveException, IOException { - int numSel = 0; - switch (joinResult) { case MATCH: @@ -215,19 +205,21 @@ public abstract class VectorMapJoinLeftSemiGenerateResultOperator } // Generate special repeated case. - numSel = generateHashSetResultRepeatedAll(batch); + int numSel = generateHashSetResultRepeatedAll(batch); + batch.size = numSel; + batch.selectedInUse = true; break; case SPILL: // Whole batch is spilled. spillBatchRepeated(batch, (VectorMapJoinHashTableResult) hashSetResult); + batch.size = 0; break; case NOMATCH: // No match for entire batch. 
+ batch.size = 0; break; } - - return numSel; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/2b9f2f5e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiLongOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiLongOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiLongOperator.java index 75aeefb..712978a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiLongOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiLongOperator.java @@ -151,9 +151,6 @@ public class VectorMapJoinLeftSemiLongOperator extends VectorMapJoinLeftSemiGene } } - // We rebuild in-place the selected array with rows destine to be forwarded. - int numSel = 0; - /* * Single-Column Long specific declarations. */ @@ -198,7 +195,7 @@ public class VectorMapJoinLeftSemiLongOperator extends VectorMapJoinLeftSemiGene if (LOG.isDebugEnabled()) { LOG.debug(CLASS_NAME + " batch #" + batchCounter + " repeated joinResult " + joinResult.name()); } - numSel = finishLeftSemiRepeated(batch, joinResult, hashSetResults[0]); + finishLeftSemiRepeated(batch, joinResult, hashSetResults[0]); } else { /* @@ -348,15 +345,11 @@ public class VectorMapJoinLeftSemiLongOperator extends VectorMapJoinLeftSemiGene " hashMapResults " + Arrays.toString(Arrays.copyOfRange(hashSetResults, 0, hashSetResultCount))); } - numSel = finishLeftSemi(batch, - allMatchs, allMatchCount, - spills, spillHashMapResultIndices, spillCount, + finishLeftSemi(batch, + allMatchCount, spillCount, (VectorMapJoinHashTableResult[]) hashSetResults); } - batch.selectedInUse = true; - batch.size = numSel; - if (batch.size > 0) { // Forward any remaining selected rows. 
forwardBigTableBatch(batch); http://git-wip-us.apache.org/repos/asf/hive/blob/2b9f2f5e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiMultiKeyOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiMultiKeyOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiMultiKeyOperator.java index ea287f4..b941431 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiMultiKeyOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiMultiKeyOperator.java @@ -155,9 +155,6 @@ public class VectorMapJoinLeftSemiMultiKeyOperator extends VectorMapJoinLeftSemi } } - // We rebuild in-place the selected array with rows destine to be forwarded. - int numSel = 0; - /* * Multi-Key specific declarations. */ @@ -210,7 +207,7 @@ public class VectorMapJoinLeftSemiMultiKeyOperator extends VectorMapJoinLeftSemi if (LOG.isDebugEnabled()) { LOG.debug(CLASS_NAME + " batch #" + batchCounter + " repeated joinResult " + joinResult.name()); } - numSel = finishLeftSemiRepeated(batch, joinResult, hashSetResults[0]); + finishLeftSemiRepeated(batch, joinResult, hashSetResults[0]); } else { /* @@ -291,6 +288,10 @@ public class VectorMapJoinLeftSemiMultiKeyOperator extends VectorMapJoinLeftSemi saveKeyOutput = currentKeyOutput; currentKeyOutput = temp; + /* + * Multi-key specific lookup key. 
+ */ + byte[] keyBytes = saveKeyOutput.getData(); int keyLength = saveKeyOutput.getLength(); saveJoinResult = hashSet.contains(keyBytes, 0, keyLength, hashSetResults[hashSetResultCount]); @@ -360,15 +361,11 @@ public class VectorMapJoinLeftSemiMultiKeyOperator extends VectorMapJoinLeftSemi " hashMapResults " + Arrays.toString(Arrays.copyOfRange(hashSetResults, 0, hashSetResultCount))); } - numSel = finishLeftSemi(batch, - allMatchs, allMatchCount, - spills, spillHashMapResultIndices, spillCount, + finishLeftSemi(batch, + allMatchCount, spillCount, (VectorMapJoinHashTableResult[]) hashSetResults); } - batch.selectedInUse = true; - batch.size = numSel; - if (batch.size > 0) { // Forward any remaining selected rows. forwardBigTableBatch(batch); http://git-wip-us.apache.org/repos/asf/hive/blob/2b9f2f5e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiStringOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiStringOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiStringOperator.java index 116cb81..9ff1141 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiStringOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiStringOperator.java @@ -142,9 +142,6 @@ public class VectorMapJoinLeftSemiStringOperator extends VectorMapJoinLeftSemiGe } } - // We rebuild in-place the selected array with rows destine to be forwarded. - int numSel = 0; - /* * Single-Column String specific declarations. 
*/ @@ -187,7 +184,7 @@ public class VectorMapJoinLeftSemiStringOperator extends VectorMapJoinLeftSemiGe if (LOG.isDebugEnabled()) { LOG.debug(CLASS_NAME + " batch #" + batchCounter + " repeated joinResult " + joinResult.name()); } - numSel = finishLeftSemiRepeated(batch, joinResult, hashSetResults[0]); + finishLeftSemiRepeated(batch, joinResult, hashSetResults[0]); } else { /* @@ -263,6 +260,10 @@ public class VectorMapJoinLeftSemiStringOperator extends VectorMapJoinLeftSemiGe saveKeyBatchIndex = batchIndex; + /* + * Single-Column String specific lookup key. + */ + byte[] keyBytes = vector[batchIndex]; int keyStart = start[batchIndex]; int keyLength = length[batchIndex]; @@ -333,15 +334,11 @@ public class VectorMapJoinLeftSemiStringOperator extends VectorMapJoinLeftSemiGe " hashMapResults " + Arrays.toString(Arrays.copyOfRange(hashSetResults, 0, hashSetResultCount))); } - numSel = finishLeftSemi(batch, - allMatchs, allMatchCount, - spills, spillHashMapResultIndices, spillCount, + finishLeftSemi(batch, + allMatchCount, spillCount, (VectorMapJoinHashTableResult[]) hashSetResults); } - batch.selectedInUse = true; - batch.size = numSel; - if (batch.size > 0) { // Forward any remaining selected rows. 
forwardBigTableBatch(batch); http://git-wip-us.apache.org/repos/asf/hive/blob/2b9f2f5e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterGenerateResultOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterGenerateResultOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterGenerateResultOperator.java index 7ef5574..57814fd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterGenerateResultOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterGenerateResultOperator.java @@ -70,15 +70,34 @@ public abstract class VectorMapJoinOuterGenerateResultOperator // generation. protected transient VectorMapJoinHashMapResult hashMapResults[]; - // Pre-allocated member for storing any matching row indexes during a processOp call. - protected transient int[] matchs; + // Pre-allocated member for remembering the big table's selected array at the beginning of + // the process method before applying any filter. For outer join we need to remember which + // rows did not match since they will appear in the outer join result with NULLs for the + // small table. + protected transient int[] inputSelected; - // Pre-allocated member for storing the mapping to the row batchIndex of the first of a series of - // equal keys that was looked up during a processOp call. - protected transient int[] matchHashMapResultIndices; + // Pre-allocated member for storing the (physical) batch index of matching row (single- or + // multi-small-table-valued) indexes during a process call. + protected transient int[] allMatchs; - // All matching and non-matching big table rows. - protected transient int[] nonSpills; + /* + * Pre-allocated members for storing information about equal key series for small-table matches. 
+ * + * ~HashMapResultIndices + * Index into the hashMapResults array for the match. + * ~AllMatchIndices + * (Logical) indices into allMatchs to the first row of a match of a + * possible series of duplicate keys. + * ~IsSingleValue + * Whether there is 1 or multiple small table values. + * ~DuplicateCounts + * The duplicate count for each matched key. + * + */ + protected transient int[] equalKeySeriesHashMapResultIndices; + protected transient int[] equalKeySeriesAllMatchIndices; + protected transient boolean[] equalKeySeriesIsSingleValue; + protected transient int[] equalKeySeriesDuplicateCounts; // Pre-allocated member for storing the (physical) batch index of rows that need to be spilled. protected transient int[] spills; @@ -86,8 +105,11 @@ public abstract class VectorMapJoinOuterGenerateResultOperator // Pre-allocated member for storing index into the hashSetResults for each spilled row. protected transient int[] spillHashMapResultIndices; - // Pre-allocated member for storing any non-matching row indexes during a processOp call. - protected transient int[] scratch1; + // Pre-allocated member for storing any non-spills, non-matches, or merged row indexes during a + // process method call. 
+ protected transient int[] nonSpills; + protected transient int[] noMatchs; + protected transient int[] merged; public VectorMapJoinOuterGenerateResultOperator() { super(); @@ -111,12 +133,23 @@ public abstract class VectorMapJoinOuterGenerateResultOperator for (int i = 0; i < hashMapResults.length; i++) { hashMapResults[i] = baseHashMap.createHashMapResult(); } - matchs = new int[batch.DEFAULT_SIZE]; - matchHashMapResultIndices = new int[batch.DEFAULT_SIZE]; - nonSpills = new int[batch.DEFAULT_SIZE]; + + inputSelected = new int[batch.DEFAULT_SIZE]; + + allMatchs = new int[batch.DEFAULT_SIZE]; + + equalKeySeriesHashMapResultIndices = new int[batch.DEFAULT_SIZE]; + equalKeySeriesAllMatchIndices = new int[batch.DEFAULT_SIZE]; + equalKeySeriesIsSingleValue = new boolean[batch.DEFAULT_SIZE]; + equalKeySeriesDuplicateCounts = new int[batch.DEFAULT_SIZE]; + spills = new int[batch.DEFAULT_SIZE]; spillHashMapResultIndices = new int[batch.DEFAULT_SIZE]; - scratch1 = new int[batch.DEFAULT_SIZE]; + + nonSpills = new int[batch.DEFAULT_SIZE]; + noMatchs = new int[batch.DEFAULT_SIZE]; + merged = new int[batch.DEFAULT_SIZE]; + } //----------------------------------------------------------------------------------------------- @@ -145,260 +178,372 @@ public abstract class VectorMapJoinOuterGenerateResultOperator } /** - * Generate the outer join output results for one vectorized row batch. - * - * Any filter expressions will apply now since hash map lookup for outer join is complete. + * Apply the value expression to rows in the (original) input selected array. * * @param batch - * The big table batch with any matching and any non matching rows both as - * selected in use. - * @param matchs - * A subset of the rows of the batch that are matches. - * @param matchHashMapResultIndices - * For each entry in matches, the index into the hashMapResult. - * @param matchSize - * Number of matches in matchs. 
- * @param nonSpills - * The rows of the batch that are both matches and non-matches. - * @param nonspillCount - * Number of rows in nonSpills. - * @param spills - * A subset of the rows of the batch that are spills. - * @param spillHashMapResultIndices - * For each entry in spills, the index into the hashMapResult. - * @param spillCount - * Number of spills in spills. - * @param hashMapResults - * The array of all hash map results for the batch. - * @param hashMapResultCount - * Number of entries in hashMapResults. - * @param scratch1 - * Pre-allocated storage to internal use. + * The vectorized row batch. + * @param inputSelectedInUse + * Whether the (original) input batch is selectedInUse. + * @param inputLogicalSize + * The (original) input batch size. */ - public int finishOuter(VectorizedRowBatch batch, - int[] matchs, int[] matchHashMapResultIndices, int matchCount, - int[] nonSpills, int nonSpillCount, - int[] spills, int[] spillHashMapResultIndices, int spillCount, - VectorMapJoinHashMapResult[] hashMapResults, int hashMapResultCount, - int[] scratch1) throws IOException, HiveException { - - int numSel = 0; - - // At this point we have determined the matching rows only for the ON equality condition(s). - // Implicitly, non-matching rows are those in the selected array minus matchs. + private void doValueExprOnInputSelected(VectorizedRowBatch batch, + boolean inputSelectedInUse, int inputLogicalSize) { - // Next, for outer join, apply any ON predicates to filter down the matches. - if (matchCount > 0 && bigTableFilterExpressions.length > 0) { + int saveBatchSize = batch.size; + int[] saveSelected = batch.selected; + boolean saveSelectedInUse = batch.selectedInUse; - System.arraycopy(matchs, 0, batch.selected, 0, matchCount); - batch.size = matchCount; + batch.size = inputLogicalSize; + batch.selected = inputSelected; + batch.selectedInUse = inputSelectedInUse; - // Non matches will be removed from the selected array. 
- for (VectorExpression ve : bigTableFilterExpressions) { + if (bigTableValueExpressions != null) { + for(VectorExpression ve: bigTableValueExpressions) { ve.evaluate(batch); } + } - // LOG.info("finishOuter" + - // " filtered batch.selected " + Arrays.toString(Arrays.copyOfRange(batch.selected, 0, batch.size))); - - // Fixup the matchHashMapResultIndices array. - if (batch.size < matchCount) { - int numMatch = 0; - int[] selected = batch.selected; - for (int i = 0; i < batch.size; i++) { - if (selected[i] == matchs[numMatch]) { - matchHashMapResultIndices[numMatch] = matchHashMapResultIndices[i]; - numMatch++; - if (numMatch == matchCount) { - break; - } - } - } - System.arraycopy(batch.selected, 0, matchs, 0, matchCount); + batch.size = saveBatchSize; + batch.selected = saveSelected; + batch.selectedInUse = saveSelectedInUse; + } + + /** + * Apply the value expression to rows specified by a selected array. + * + * @param batch + * The vectorized row batch. + * @param selected + * The (physical) batch indices to apply the expression to. + * @param size + * The size of selected. + */ + private void doValueExpr(VectorizedRowBatch batch, + int[] selected, int size) { + + int saveBatchSize = batch.size; + int[] saveSelected = batch.selected; + boolean saveSelectedInUse = batch.selectedInUse; + + batch.size = size; + batch.selected = selected; + batch.selectedInUse = true; + + if (bigTableValueExpressions != null) { + for(VectorExpression ve: bigTableValueExpressions) { + ve.evaluate(batch); } } - // LOG.info("finishOuter" + - // " matchs[" + matchCount + "] " + intArrayToRangesString(matchs, matchCount) + - // " matchHashMapResultIndices " + Arrays.toString(Arrays.copyOfRange(matchHashMapResultIndices, 0, matchCount))); - // Big table value expressions apply to ALL matching and non-matching rows. 
- if (bigTableValueExpressions != null) { + batch.size = saveBatchSize; + batch.selected = saveSelected; + batch.selectedInUse = saveSelectedInUse; + } - System.arraycopy(nonSpills, 0, batch.selected, 0, nonSpillCount); - batch.size = nonSpillCount; + /** + * Remove (subtract) members from the input selected array and produce the results into + * a difference array. + * + * @param inputSelectedInUse + * Whether the (original) input batch is selectedInUse. + * @param inputLogicalSize + * The (original) input batch size. + * @param remove + * The indices to remove. They must all be present in input selected array. + * @param removeSize + * The size of remove. + * @param difference + * The resulting difference -- the input selected array indices not in the + * remove array. + * @return + * The resulting size of the difference array. + * @throws HiveException + */ + private int subtractFromInputSelected(boolean inputSelectedInUse, int inputLogicalSize, + int[] remove, int removeSize, int[] difference) throws HiveException { - for (VectorExpression ve: bigTableValueExpressions) { - ve.evaluate(batch); + // if (!verifyMonotonicallyIncreasing(remove, removeSize)) { + // throw new HiveException("remove is not in sort order and unique"); + // } + + int differenceCount = 0; + + // Determine which rows are left. 
+ int removeIndex = 0; + if (inputSelectedInUse) { + for (int i = 0; i < inputLogicalSize; i++) { + int candidateIndex = inputSelected[i]; + if (removeIndex < removeSize && candidateIndex == remove[removeIndex]) { + removeIndex++; + } else { + difference[differenceCount++] = candidateIndex; + } + } + } else { + for (int candidateIndex = 0; candidateIndex < inputLogicalSize; candidateIndex++) { + if (removeIndex < removeSize && candidateIndex == remove[removeIndex]) { + removeIndex++; + } else { + difference[differenceCount++] = candidateIndex; + } + } + } + + if (removeIndex != removeSize) { + throw new HiveException("Not all batch indices removed"); + } + + // if (!verifyMonotonicallyIncreasing(difference, differenceCount)) { + // throw new HiveException("difference is not in sort order and unique"); + // } + + return differenceCount; + } + + /** + * Remove (subtract) members from an array and produce the results into + * a difference array. + + * @param all + * The selected array containing all members. + * @param allSize + * The size of all. + * @param remove + * The indices to remove. They must all be present in input selected array. + * @param removeSize + * The size of remove. + * @param difference + * The resulting difference -- the all array indices not in the + * remove array. + * @return + * The resulting size of the difference array. + * @throws HiveException + */ + private int subtract(int[] all, int allSize, + int[] remove, int removeSize, int[] difference) throws HiveException { + + // if (!verifyMonotonicallyIncreasing(remove, removeSize)) { + // throw new HiveException("remove is not in sort order and unique"); + // } + + int differenceCount = 0; + + // Determine which rows are left. 
+ int removeIndex = 0; + for (int i = 0; i < allSize; i++) { + int candidateIndex = all[i]; + if (removeIndex < removeSize && candidateIndex == remove[removeIndex]) { + removeIndex++; + } else { + difference[differenceCount++] = candidateIndex; } } - // Determine which rows are non matches by determining the delta between selected and - // matchs. - int[] noMatchs = scratch1; - int noMatchCount = 0; - if (matchCount < nonSpillCount) { - // Determine which rows are non matches. - int matchIndex = 0; - for (int i = 0; i < nonSpillCount; i++) { - int candidateIndex = nonSpills[i]; - if (matchIndex < matchCount && candidateIndex == matchs[matchIndex]) { - matchIndex++; + if (removeIndex != removeSize) { + throw new HiveException("Not all batch indices removed"); + } + + return differenceCount; + } + + /** + * Sort merge two select arrays so the resulting array is ordered by (batch) index. + * + * @param selected1 + * @param selected1Count + * @param selected2 + * @param selected2Count + * @param sortMerged + * The resulting sort merge of selected1 and selected2. + * @return + * The resulting size of the sortMerged array. 
+ * @throws HiveException + */ + private int sortMerge(int[] selected1, int selected1Count, + int[] selected2, int selected2Count, int[] sortMerged) throws HiveException { + + // if (!verifyMonotonicallyIncreasing(selected1, selected1Count)) { + // throw new HiveException("selected1 is not in sort order and unique"); + // } + + // if (!verifyMonotonicallyIncreasing(selected2, selected2Count)) { + // throw new HiveException("selected2 is not in sort order and unique"); + // } + + + int sortMergeCount = 0; + + int selected1Index = 0; + int selected2Index = 0; + for (int i = 0; i < selected1Count + selected2Count; i++) { + if (selected1Index < selected1Count && selected2Index < selected2Count) { + if (selected1[selected1Index] < selected2[selected2Index]) { + sortMerged[sortMergeCount++] = selected1[selected1Index++]; } else { - noMatchs[noMatchCount++] = candidateIndex; + sortMerged[sortMergeCount++] = selected2[selected2Index++]; } + } else if (selected1Index < selected1Count) { + sortMerged[sortMergeCount++] = selected1[selected1Index++]; + } else { + sortMerged[sortMergeCount++] = selected2[selected2Index++]; } } - // LOG.info("finishOuter" + - // " noMatchs[" + noMatchCount + "] " + intArrayToRangesString(noMatchs, noMatchCount)); + // if (!verifyMonotonicallyIncreasing(sortMerged, sortMergeCount)) { + // throw new HiveException("sortMerged is not in sort order and unique"); + // } - // When we generate results into the overflow batch, we may still end up with fewer rows - // in the big table batch. So, nulSel and the batch's selected array will be rebuilt with - // just the big table rows that need to be forwarded, minus any rows processed with the - // overflow batch. 
- if (matchCount > 0) { - numSel = generateOuterHashMapMatchResults(batch, - matchs, matchHashMapResultIndices, matchCount, - hashMapResults, numSel); - } + return sortMergeCount; + } - if (noMatchCount > 0) { - numSel = generateOuterHashMapNoMatchResults(batch, noMatchs, noMatchCount, numSel); - } + /** + * Generate the outer join output results for one vectorized row batch. + * + * @param batch + * The big table batch with any matching and any non matching rows both as + * selected in use. + * @param allMatchCount + * Number of matches in allMatchs. + * @param equalKeySeriesCount + * Number of single value matches. + * @param atLeastOneNonMatch + * Whether at least one row was a non-match. + * @param inputSelectedInUse + * A copy of the batch's selectedInUse flag on input to the process method. + * @param inputLogicalSize + * The batch's size on input to the process method. + * @param spillCount + * Number of spills in spills. + * @param hashMapResultCount + * Number of entries in hashMapResults. + */ + public void finishOuter(VectorizedRowBatch batch, + int allMatchCount, int equalKeySeriesCount, boolean atLeastOneNonMatch, + boolean inputSelectedInUse, int inputLogicalSize, + int spillCount, int hashMapResultCount) throws IOException, HiveException { + // Get rid of spills before we start modifying the batch. if (spillCount > 0) { spillHashMapBatch(batch, (VectorMapJoinHashTableResult[]) hashMapResults, spills, spillHashMapResultIndices, spillCount); } - return numSel; - } - - /** - * Generate the matching outer join output results for one row of a vectorized row batch into - * the overflow batch. - * - * @param batch - * The big table batch. - * @param batchIndex - * Index of the big table row. - * @param hashMapResult - * The hash map result with the small table values. 
- */ - private void copyOuterHashMapResultToOverflow(VectorizedRowBatch batch, int batchIndex, - VectorMapJoinHashMapResult hashMapResult) throws HiveException, IOException { - - // if (hashMapResult.isCappedCountAvailable()) { - // LOG.info("copyOuterHashMapResultToOverflow cappedCount " + hashMapResult.cappedCount()); - // } - ByteSegmentRef byteSegmentRef = hashMapResult.first(); - while (byteSegmentRef != null) { - - // Copy the BigTable values into the overflow batch. Since the overflow batch may - // not get flushed here, we must copy by value. - if (bigTableRetainedVectorCopy != null) { - bigTableRetainedVectorCopy.copyByValue(batch, batchIndex, - overflowBatch, overflowBatch.size); - } - - // Reference the keys we just copied above. - if (bigTableVectorCopyOuterKeys != null) { - bigTableVectorCopyOuterKeys.copyByReference(overflowBatch, overflowBatch.size, - overflowBatch, overflowBatch.size); - } - - if (smallTableVectorDeserializeRow != null) { - - byte[] bytes = byteSegmentRef.getBytes(); - int offset = (int) byteSegmentRef.getOffset(); - int length = byteSegmentRef.getLength(); - smallTableVectorDeserializeRow.setBytes(bytes, offset, length); - - smallTableVectorDeserializeRow.deserializeByValue(overflowBatch, overflowBatch.size); - } + int noMatchCount = 0; + if (spillCount > 0) { - ++overflowBatch.size; - if (overflowBatch.size == VectorizedRowBatch.DEFAULT_SIZE) { - forwardOverflow(); - } + // Subtract the spills to get all match and non-match rows. + int nonSpillCount = subtractFromInputSelected( + inputSelectedInUse, inputLogicalSize, spills, spillCount, nonSpills); - byteSegmentRef = hashMapResult.next(); + if (LOG.isDebugEnabled()) { + LOG.debug("finishOuter spillCount > 0" + + " nonSpills " + intArrayToRangesString(nonSpills, nonSpillCount)); + } + + // Big table value expressions apply to ALL matching and non-matching rows. 
+ if (bigTableValueExpressions != null) { + + doValueExpr(batch, nonSpills, nonSpillCount); + } - // LOG.info("copyOuterHashMapResultToOverflow overflowBatch.size " + overflowBatch.size); + + if (atLeastOneNonMatch) { + noMatchCount = subtract(nonSpills, nonSpillCount, allMatchs, allMatchCount, + noMatchs); + + if (LOG.isDebugEnabled()) { + LOG.debug("finishOuter spillCount > 0" + + " noMatchs " + intArrayToRangesString(noMatchs, noMatchCount)); + } - } + } + } else { - /** - * Generate the matching outer join output results for one vectorized row batch. - * - * For each matching row specified by parameter, get the one or more small table values and - * form join results. - * - * (Note: Since all matching and non-matching rows are selected and output for outer joins, - * we cannot use selected as the matching rows). - * - * @param batch - * The big table batch with any matching and any non matching rows both as - * selected in use. - * @param matchs - * A subset of the rows of the batch that are matches. - * @param matchHashMapResultIndices - * For each entry in matches, the index into the hashMapResult. - * @param matchSize - * Number of matches in matchs. - * @param hashMapResults - * The array of all hash map results for the batch. - * @param numSel - * The current count of rows in the rebuilding of the selected array. - * - * @return - * The new count of selected rows. - */ - protected int generateOuterHashMapMatchResults(VectorizedRowBatch batch, - int[] matchs, int[] matchHashMapResultIndices, int matchSize, - VectorMapJoinHashMapResult[] hashMapResults, int numSel) - throws IOException, HiveException { + // Run value expressions over original (whole) input batch. 
+ doValueExprOnInputSelected(batch, inputSelectedInUse, inputLogicalSize); - int[] selected = batch.selected; + if (atLeastOneNonMatch) { + noMatchCount = subtractFromInputSelected( + inputSelectedInUse, inputLogicalSize, allMatchs, allMatchCount, noMatchs); - // Generate result within big table batch when single small table value. Otherwise, copy - // to overflow batch. + if (LOG.isDebugEnabled()) { + LOG.debug("finishOuter spillCount == 0" + + " noMatchs " + intArrayToRangesString(noMatchs, noMatchCount)); + } + } + } - for (int i = 0; i < matchSize; i++) { - int batchIndex = matchs[i]; + // When we generate results into the overflow batch, we may still end up with fewer rows + // in the big table batch. So, nulSel and the batch's selected array will be rebuilt with + // just the big table rows that need to be forwarded, minus any rows processed with the + // overflow batch. + if (allMatchCount > 0) { + + int numSel = 0; + for (int i = 0; i < equalKeySeriesCount; i++) { + int hashMapResultIndex = equalKeySeriesHashMapResultIndices[i]; + VectorMapJoinHashMapResult hashMapResult = hashMapResults[hashMapResultIndex]; + int allMatchesIndex = equalKeySeriesAllMatchIndices[i]; + boolean isSingleValue = equalKeySeriesIsSingleValue[i]; + int duplicateCount = equalKeySeriesDuplicateCounts[i]; + + if (isSingleValue) { + numSel = generateHashMapResultSingleValue( + batch, hashMapResult, allMatchs, allMatchesIndex, duplicateCount, numSel); + } else { + generateHashMapResultMultiValue( + batch, hashMapResult, allMatchs, allMatchesIndex, duplicateCount); + } + } - int hashMapResultIndex = matchHashMapResultIndices[i]; - VectorMapJoinHashMapResult hashMapResult = hashMapResults[hashMapResultIndex]; + // The number of single value rows that were generated in the big table batch. 
+ batch.size = numSel; + batch.selectedInUse = true; - if (!hashMapResult.isSingleRow()) { + if (LOG.isDebugEnabled()) { + LOG.debug("finishOuter allMatchCount > 0" + + " batch.selected " + intArrayToRangesString(batch.selected, batch.size)); + } - // Multiple small table rows require use of the overflow batch. - copyOuterHashMapResultToOverflow(batch, batchIndex, hashMapResult); - } else { + } else { + batch.size = 0; + } - // Generate join result in big table batch. - ByteSegmentRef byteSegmentRef = hashMapResult.first(); + if (noMatchCount > 0) { + if (batch.size > 0) { + + generateOuterNulls(batch, noMatchs, noMatchCount); + + // Merge noMatchs and (match) selected. + int mergeCount = sortMerge( + noMatchs, noMatchCount, batch.selected, batch.size, merged); + + if (LOG.isDebugEnabled()) { + LOG.debug("finishOuter noMatchCount > 0 && batch.size > 0" + + " merged " + intArrayToRangesString(merged, mergeCount)); + } - if (bigTableVectorCopyOuterKeys != null) { - bigTableVectorCopyOuterKeys.copyByReference(batch, batchIndex, batch, batchIndex); - } + System.arraycopy(merged, 0, batch.selected, 0, mergeCount); + batch.size = mergeCount; + batch.selectedInUse = true; + } else { - if (smallTableVectorDeserializeRow != null) { + // We can use the whole batch for output of no matches. - byte[] bytes = byteSegmentRef.getBytes(); - int offset = (int) byteSegmentRef.getOffset(); - int length = byteSegmentRef.getLength(); - smallTableVectorDeserializeRow.setBytes(bytes, offset, length); + generateOuterNullsRepeatedAll(batch); - smallTableVectorDeserializeRow.deserializeByValue(batch, batchIndex); - } + System.arraycopy(noMatchs, 0, batch.selected, 0, noMatchCount); + batch.size = noMatchCount; + batch.selectedInUse = true; - // Remember this big table row was used for an output result. 
- selected[numSel++] = batchIndex; - } - } - return numSel; - } + if (LOG.isDebugEnabled()) { + LOG.debug("finishOuter noMatchCount > 0 && batch.size == 0" + + " batch.selected " + intArrayToRangesString(batch.selected, batch.size)); + } + } + } + } /** * Generate the non matching outer join output results for one vectorized row batch. @@ -412,72 +557,30 @@ public abstract class VectorMapJoinOuterGenerateResultOperator * A subset of the rows of the batch that are non matches. * @param noMatchSize * Number of non matches in noMatchs. - * @param numSel - * The current count of rows in the rebuilding of the selected array. - * - * @return - * The new count of selected rows. */ - protected int generateOuterHashMapNoMatchResults(VectorizedRowBatch batch, int[] noMatchs, - int noMatchSize, int numSel) throws IOException, HiveException { - int[] selected = batch.selected; - - // Generate result within big table batch with null small table results, using isRepeated - // if possible. + protected void generateOuterNulls(VectorizedRowBatch batch, int[] noMatchs, + int noMatchSize) throws IOException, HiveException { - if (numSel == 0) { + // Set null information in the small table results area. - // There were 0 matching rows -- so we can use the isRepeated optimization for the non - // matching rows. + for (int i = 0; i < noMatchSize; i++) { + int batchIndex = noMatchs[i]; // Mark any scratch small table scratch columns that would normally receive a copy of the - // key as null and repeating. + // key as null, too. for (int column : bigTableOuterKeyOutputVectorColumns) { ColumnVector colVector = batch.cols[column]; - colVector.isRepeating = true; colVector.noNulls = false; - colVector.isNull[0] = true; + colVector.isNull[batchIndex] = true; } - // Small table values are set to null and repeating. + // Small table values are set to null. 
for (int column : smallTableOutputVectorColumns) { ColumnVector colVector = batch.cols[column]; - colVector.isRepeating = true; colVector.noNulls = false; - colVector.isNull[0] = true; - } - - // Rebuild the selected array. - for (int i = 0; i < noMatchSize; i++) { - int batchIndex = noMatchs[i]; - selected[numSel++] = batchIndex; - } - } else { - - // Set null information in the small table results area. - - for (int i = 0; i < noMatchSize; i++) { - int batchIndex = noMatchs[i]; - - // Mark any scratch small table scratch columns that would normally receive a copy of the - // key as null, too. - for (int column : bigTableOuterKeyOutputVectorColumns) { - ColumnVector colVector = batch.cols[column]; - colVector.noNulls = false; - colVector.isNull[batchIndex] = true; - } - - // Small table values are set to null. - for (int column : smallTableOutputVectorColumns) { - ColumnVector colVector = batch.cols[column]; - colVector.noNulls = false; - colVector.isNull[batchIndex] = true; - } - - selected[numSel++] = batchIndex; + colVector.isNull[batchIndex] = true; } } - return numSel; } /** @@ -492,65 +595,114 @@ public abstract class VectorMapJoinOuterGenerateResultOperator * The hash map lookup result for the repeated key. * @param hashMapResults * The array of all hash map results for the batch. + * @param someRowsFilteredOut + * Whether some rows of the repeated key batch were knocked out by the filter. + * @param inputSelectedInUse + * A copy of the batch's selectedInUse flag on input to the process method. + * @param inputLogicalSize + * The batch's size on input to the process method. * @param scratch1 * Pre-allocated storage to internal use. + * @param scratch2 + * Pre-allocated storage to internal use. 
*/ - public int finishOuterRepeated(VectorizedRowBatch batch, JoinUtil.JoinResult joinResult, - VectorMapJoinHashMapResult hashMapResult, int[] scratch1) + public void finishOuterRepeated(VectorizedRowBatch batch, JoinUtil.JoinResult joinResult, + VectorMapJoinHashMapResult hashMapResult, boolean someRowsFilteredOut, + boolean inputSelectedInUse, int inputLogicalSize) throws IOException, HiveException { - int numSel = 0; + // LOG.debug("finishOuterRepeated batch #" + batchCounter + " " + joinResult.name() + " batch.size " + batch.size + " someRowsFilteredOut " + someRowsFilteredOut); - if (joinResult == JoinUtil.JoinResult.MATCH && bigTableFilterExpressions.length > 0) { + switch (joinResult) { + case MATCH: - // Since it is repeated, the evaluation of the filter will knock the whole batch out. - // But since we are doing outer join, we want to keep non-matches. + // Rows we looked up as one repeated key are a match. But filtered out rows + // need to be generated as non-matches, too. - // First, remember selected; - int[] rememberSelected = scratch1; - int rememberBatchSize = batch.size; - if (batch.selectedInUse) { - System.arraycopy(batch.selected, 0, rememberSelected, 0, batch.size); - } + if (someRowsFilteredOut) { - // Filter. - for (VectorExpression ve : bigTableFilterExpressions) { - ve.evaluate(batch); - } + // For the filtered out rows that didn't (logically) get looked up in the hash table, + // we need to generate no match results for those too... - // Convert a filter out to a non match. 
- if (batch.size == 0) { - joinResult = JoinUtil.JoinResult.NOMATCH; - if (batch.selectedInUse) { - System.arraycopy(rememberSelected, 0, batch.selected, 0, rememberBatchSize); - // LOG.info("finishOuterRepeated batch #" + batchCounter + " filter out converted to no matchs " + - // Arrays.toString(Arrays.copyOfRange(batch.selected, 0, rememberBatchSize))); - } else { - // LOG.info("finishOuterRepeated batch #" + batchCounter + " filter out converted to no matchs batch size " + - // rememberBatchSize); - } - batch.size = rememberBatchSize; - } - } + // Run value expressions over original (whole) input batch. + doValueExprOnInputSelected(batch, inputSelectedInUse, inputLogicalSize); - // LOG.info("finishOuterRepeated batch #" + batchCounter + " " + joinResult.name() + " batch.size " + batch.size); - switch (joinResult) { - case MATCH: - // Run our value expressions over whole batch. - if (bigTableValueExpressions != null) { - for(VectorExpression ve: bigTableValueExpressions) { - ve.evaluate(batch); + // Now calculate which rows were filtered out (they are logically no matches). + + // Determine which rows are non matches by determining the delta between inputSelected and + // (current) batch selected. + + int noMatchCount = subtractFromInputSelected( + inputSelectedInUse, inputLogicalSize, batch.selected, batch.size, noMatchs); + + generateOuterNulls(batch, noMatchs, noMatchCount); + + // Now generate the matchs. Single small table values will be put into the big table + // batch and come back in matchs. Any multiple small table value results will go into + // the overflow batch. + generateHashMapResultRepeatedAll(batch, hashMapResult); + + // Merge noMatchs and (match) selected. + int mergeCount = sortMerge( + noMatchs, noMatchCount, batch.selected, batch.size, merged); + + System.arraycopy(merged, 0, batch.selected, 0, mergeCount); + batch.size = mergeCount; + batch.selectedInUse = true; + } else { + + // Just run our value expressions over input batch. 
+ + if (bigTableValueExpressions != null) { + for(VectorExpression ve: bigTableValueExpressions) { + ve.evaluate(batch); + } } - } - // Use a common method applicable for inner and outer. - numSel = generateHashMapResultRepeatedAll(batch, hashMapResult); + generateHashMapResultRepeatedAll(batch, hashMapResult); + } break; + case SPILL: - // Whole batch is spilled. + + // Rows we looked up as one repeated key need to spill. But filtered out rows + // need to be generated as non-matches, too. + spillBatchRepeated(batch, (VectorMapJoinHashTableResult) hashMapResult); + + // After using selected to generate spills, generate non-matches, if any. + if (someRowsFilteredOut) { + + // Determine which rows are non matches by determining the delta between inputSelected and + // (current) batch selected. + + int noMatchCount = subtractFromInputSelected( + inputSelectedInUse, inputLogicalSize, batch.selected, batch.size, noMatchs); + + System.arraycopy(noMatchs, 0, batch.selected, 0, noMatchCount); + batch.size = noMatchCount; + batch.selectedInUse = true; + + generateOuterNullsRepeatedAll(batch); + } else { + batch.size = 0; + } + break; + case NOMATCH: + + if (someRowsFilteredOut) { + + // When the repeated no match is due to filtering, we need to restore the + // selected information. + + if (inputSelectedInUse) { + System.arraycopy(inputSelected, 0, batch.selected, 0, inputLogicalSize); + } + batch.size = inputLogicalSize; + } + // Run our value expressions over whole batch. if (bigTableValueExpressions != null) { for(VectorExpression ve: bigTableValueExpressions) { @@ -558,11 +710,9 @@ public abstract class VectorMapJoinOuterGenerateResultOperator } } - numSel = generateOuterNullsRepeatedAll(batch); + generateOuterNullsRepeatedAll(batch); break; } - - return numSel; } /** @@ -573,24 +723,8 @@ public abstract class VectorMapJoinOuterGenerateResultOperator * * @param batch * The big table batch. - * @return - * The new count of selected rows. 
*/ - protected int generateOuterNullsRepeatedAll(VectorizedRowBatch batch) throws HiveException { - - int[] selected = batch.selected; - boolean selectedInUse = batch.selectedInUse; - - // Generate result within big table batch using is repeated for null small table results. - - if (batch.selectedInUse) { - // The selected array is already filled in as we want it. - } else { - for (int i = 0; i < batch.size; i++) { - selected[i] = i; - } - batch.selectedInUse = true; - } + protected void generateOuterNullsRepeatedAll(VectorizedRowBatch batch) throws HiveException { for (int column : smallTableOutputVectorColumns) { ColumnVector colVector = batch.cols[column]; @@ -607,12 +741,5 @@ public abstract class VectorMapJoinOuterGenerateResultOperator colVector.isNull[0] = true; colVector.isRepeating = true; } - - // for (int i = 0; i < batch.size; i++) { - // int bigTableIndex = selected[i]; - // VectorizedBatchUtil.debugDisplayOneRow(batch, bigTableIndex, taskName + ", " + getOperatorId() + " VectorMapJoinCommonOperator generate generateOuterNullsRepeatedAll batch"); - // } - - return batch.size; } } \ No newline at end of file