HIVE-15573: Vectorization: Non-Uniform shuffle ReduceSink is not specialized (Matt McCline, reviewed by Gopal Vijayaraghavan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/27f27219 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/27f27219 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/27f27219 Branch: refs/heads/master Commit: 27f27219a2b965958f850a92bf581d7b9c3ddfb0 Parents: cd4fcef Author: Matt McCline <[email protected]> Authored: Mon Mar 20 01:03:12 2017 -0500 Committer: Matt McCline <[email protected]> Committed: Mon Mar 20 01:03:12 2017 -0500 ---------------------------------------------------------------------- .../hive/ql/exec/vector/VectorExtractRow.java | 19 + .../VectorReduceSinkCommonOperator.java | 173 ++--- .../VectorReduceSinkLongOperator.java | 2 +- .../VectorReduceSinkMultiKeyOperator.java | 2 +- .../VectorReduceSinkObjectHashOperator.java | 289 ++++++++ .../VectorReduceSinkStringOperator.java | 2 +- .../VectorReduceSinkUniformHashOperator.java | 218 ++++++ .../hive/ql/optimizer/physical/Vectorizer.java | 214 ++++-- .../hadoop/hive/ql/plan/ReduceSinkDesc.java | 15 +- .../hive/ql/plan/VectorReduceSinkDesc.java | 21 +- .../hive/ql/plan/VectorReduceSinkInfo.java | 96 +++ .../llap/dynpart_sort_opt_vectorization.q.out | 42 +- .../llap/vector_adaptor_usage_mode.q.out | 2 +- .../llap/vector_auto_smb_mapjoin_14.q.out | 15 +- .../llap/vector_between_columns.q.out | 18 +- .../clientpositive/llap/vector_between_in.q.out | 102 ++- .../llap/vector_binary_join_groupby.q.out | 13 +- .../clientpositive/llap/vector_char_2.q.out | 12 +- .../llap/vector_char_mapjoin1.q.out | 33 +- .../llap/vector_char_simple.q.out | 4 +- .../clientpositive/llap/vector_coalesce.q.out | 20 +- .../clientpositive/llap/vector_coalesce_2.q.out | 2 +- .../clientpositive/llap/vector_count.q.out | 16 +- .../llap/vector_count_distinct.q.out | 9 +- .../clientpositive/llap/vector_data_types.q.out | 4 +- .../llap/vector_decimal_aggregate.q.out | 2 +- .../llap/vector_decimal_expressions.q.out | 4 +- .../llap/vector_decimal_mapjoin.q.out 
| 2 +- .../llap/vector_decimal_round.q.out | 19 +- .../llap/vector_decimal_round_2.q.out | 36 +- .../clientpositive/llap/vector_distinct_2.q.out | 2 +- .../llap/vector_empty_where.q.out | 36 +- .../clientpositive/llap/vector_groupby4.q.out | 11 +- .../clientpositive/llap/vector_groupby6.q.out | 11 +- .../clientpositive/llap/vector_groupby_3.q.out | 2 +- .../llap/vector_groupby_grouping_id3.q.out | 4 +- .../llap/vector_groupby_mapjoin.q.out | 9 +- .../llap/vector_groupby_reduce.q.out | 34 +- .../llap/vector_grouping_sets.q.out | 4 +- .../clientpositive/llap/vector_if_expr.q.out | 9 +- .../llap/vector_include_no_sel.q.out | 8 +- .../clientpositive/llap/vector_inner_join.q.out | 18 +- .../clientpositive/llap/vector_interval_1.q.out | 72 +- .../clientpositive/llap/vector_interval_2.q.out | 90 +-- .../llap/vector_interval_arithmetic.q.out | 58 +- .../llap/vector_interval_mapjoin.q.out | 2 +- .../clientpositive/llap/vector_join30.q.out | 251 +++---- .../llap/vector_left_outer_join2.q.out | 8 +- .../llap/vector_leftsemi_mapjoin.q.out | 724 +++++++++---------- .../llap/vector_non_string_partition.q.out | 8 +- .../llap/vector_nullsafe_join.q.out | 36 +- .../llap/vector_number_compare_projection.q.out | 18 +- .../clientpositive/llap/vector_orderby_5.q.out | 9 +- .../llap/vector_outer_join0.q.out | 4 +- .../llap/vector_outer_join1.q.out | 15 +- .../llap/vector_outer_join2.q.out | 11 +- .../llap/vector_partition_diff_num_cols.q.out | 35 +- .../llap/vector_partitioned_date_time.q.out | 30 +- .../clientpositive/llap/vector_reduce1.q.out | 9 +- .../clientpositive/llap/vector_reduce2.q.out | 9 +- .../clientpositive/llap/vector_reduce3.q.out | 9 +- .../llap/vector_reduce_groupby_decimal.q.out | 6 +- .../llap/vector_string_concat.q.out | 6 +- .../llap/vector_tablesample_rows.q.out | 7 +- .../llap/vector_varchar_mapjoin1.q.out | 6 +- .../llap/vector_varchar_simple.q.out | 4 +- .../llap/vector_when_case_null.q.out | 2 +- .../clientpositive/llap/vectorization_0.q.out | 84 +-- 
.../clientpositive/llap/vectorization_17.q.out | 2 +- .../clientpositive/llap/vectorization_7.q.out | 8 +- .../clientpositive/llap/vectorization_8.q.out | 8 +- .../llap/vectorization_div0.q.out | 8 +- .../llap/vectorization_limit.q.out | 19 +- .../llap/vectorization_offset_limit.q.out | 4 +- .../llap/vectorization_short_regress.q.out | 72 +- .../llap/vectorized_bucketmapjoin1.q.out | 8 +- .../clientpositive/llap/vectorized_case.q.out | 14 +- .../llap/vectorized_date_funcs.q.out | 14 +- .../vectorized_dynamic_partition_pruning.q.out | 2 +- .../vectorized_dynamic_semijoin_reduction.q.out | 138 ++-- .../llap/vectorized_mapjoin.q.out | 2 +- .../llap/vectorized_mapjoin2.q.out | 9 +- .../clientpositive/llap/vectorized_ptf.q.out | 26 +- .../llap/vectorized_shufflejoin.q.out | 4 +- .../llap/vectorized_timestamp_funcs.q.out | 43 +- .../spark/vector_between_in.q.out | 122 ++-- .../spark/vector_count_distinct.q.out | 14 +- .../spark/vector_data_types.q.out | 4 +- .../spark/vector_decimal_aggregate.q.out | 7 +- .../spark/vector_distinct_2.q.out | 7 +- .../clientpositive/spark/vector_groupby_3.q.out | 7 +- .../clientpositive/spark/vector_orderby_5.q.out | 14 +- .../spark/vector_outer_join1.q.out | 7 +- .../spark/vector_outer_join2.q.out | 7 +- .../spark/vector_string_concat.q.out | 8 +- .../clientpositive/spark/vectorization_0.q.out | 84 +-- .../clientpositive/spark/vectorization_17.q.out | 2 +- .../spark/vectorization_div0.q.out | 8 +- .../spark/vectorization_short_regress.q.out | 72 +- .../clientpositive/spark/vectorized_case.q.out | 14 +- .../clientpositive/spark/vectorized_ptf.q.out | 48 +- .../spark/vectorized_shufflejoin.q.out | 18 +- .../spark/vectorized_timestamp_funcs.q.out | 43 +- .../clientpositive/tez/vector_aggregate_9.q.out | 4 - .../tez/vector_auto_smb_mapjoin_14.q.out | 15 +- .../tez/vector_between_columns.q.out | 18 +- .../clientpositive/tez/vector_between_in.q.out | 102 ++- .../tez/vector_binary_join_groupby.q.out | 13 +- .../tez/vector_cast_constant.q.out | 4 
- .../clientpositive/tez/vector_char_2.q.out | 12 +- .../tez/vector_char_mapjoin1.q.out | 33 +- .../clientpositive/tez/vector_char_simple.q.out | 4 +- .../clientpositive/tez/vector_coalesce.q.out | 20 +- .../clientpositive/tez/vector_coalesce_2.q.out | 2 +- .../tez/vector_non_string_partition.q.out | 8 +- .../clientpositive/tez/vectorization_div0.q.out | 8 +- .../tez/vectorization_limit.q.out | 19 +- .../vector_binary_join_groupby.q.out | 8 +- .../results/clientpositive/vector_char_2.q.out | 8 +- .../clientpositive/vector_char_mapjoin1.q.out | 12 +- .../clientpositive/vector_char_simple.q.out | 4 +- .../clientpositive/vector_coalesce.q.out | 20 +- .../clientpositive/vector_coalesce_2.q.out | 4 +- .../results/clientpositive/vector_count.q.out | 16 +- .../clientpositive/vector_data_types.q.out | 4 +- .../vector_decimal_aggregate.q.out | 4 +- .../vector_decimal_expressions.q.out | 4 +- .../clientpositive/vector_decimal_round.q.out | 8 +- .../clientpositive/vector_decimal_round_2.q.out | 16 +- .../clientpositive/vector_distinct_2.q.out | 4 +- .../clientpositive/vector_empty_where.q.out | 16 +- .../clientpositive/vector_groupby4.q.out | 4 +- .../clientpositive/vector_groupby6.q.out | 4 +- .../clientpositive/vector_groupby_3.q.out | 4 +- .../clientpositive/vector_groupby_reduce.q.out | 16 +- .../clientpositive/vector_grouping_sets.q.out | 8 +- .../results/clientpositive/vector_if_expr.q.out | 4 +- .../clientpositive/vector_include_no_sel.q.out | 4 +- .../clientpositive/vector_interval_1.q.out | 32 +- .../vector_interval_arithmetic.q.out | 28 +- .../vector_non_string_partition.q.out | 8 +- .../clientpositive/vector_orderby_5.q.out | 4 +- .../clientpositive/vector_outer_join1.q.out | 4 +- .../clientpositive/vector_outer_join2.q.out | 4 +- .../clientpositive/vector_outer_join3.q.out | 6 +- .../clientpositive/vector_outer_join4.q.out | 2 +- .../results/clientpositive/vector_reduce1.q.out | 4 +- .../results/clientpositive/vector_reduce2.q.out | 4 +- 
.../results/clientpositive/vector_reduce3.q.out | 4 +- .../vector_reduce_groupby_decimal.q.out | 4 +- .../clientpositive/vector_string_concat.q.out | 4 +- .../clientpositive/vector_varchar_simple.q.out | 4 +- .../clientpositive/vector_when_case_null.q.out | 4 +- .../clientpositive/vectorization_7.q.out | 8 +- .../clientpositive/vectorization_8.q.out | 8 +- .../clientpositive/vectorization_div0.q.out | 8 +- .../clientpositive/vectorization_limit.q.out | 16 +- .../vectorization_offset_limit.q.out | 4 +- .../clientpositive/vectorized_case.q.out | 8 +- .../clientpositive/vectorized_date_funcs.q.out | 4 +- .../clientpositive/vectorized_mapjoin2.q.out | 4 +- .../clientpositive/vectorized_timestamp.q.out | 4 +- .../vectorized_timestamp_funcs.q.out | 20 +- 163 files changed, 2469 insertions(+), 2099 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/27f27219/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java index 94eaf56..defaf90 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java @@ -150,6 +150,25 @@ public class VectorExtractRow { } /* + * Initialize using an ObjectInspector array and a column projection array. + */ + public void init(TypeInfo[] typeInfos, int[] projectedColumns) + throws HiveException { + + final int count = typeInfos.length; + allocateArrays(count); + + for (int i = 0; i < count; i++) { + + int projectionColumnNum = projectedColumns[i]; + + TypeInfo typeInfo = typeInfos[i]; + + initEntry(i, projectionColumnNum, typeInfo); + } + } + + /* * Initialize using data type names. * No projection -- the column range 0 .. 
types.size()-1 */ http://git-wip-us.apache.org/repos/asf/hive/blob/27f27219/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java index 42ca4b7..fc5aea5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java @@ -102,36 +102,29 @@ public abstract class VectorReduceSinkCommonOperator extends TerminalOperator<Re //--------------------------------------------------------------------------- // Whether there is to be a tag added to the end of each key and the tag value. - private transient boolean reduceSkipTag; - private transient byte reduceTagByte; + protected transient boolean reduceSkipTag; + protected transient byte reduceTagByte; // Binary sortable key serializer. protected transient BinarySortableSerializeWrite keyBinarySortableSerializeWrite; - // The serialized all null key and its hash code. - private transient byte[] nullBytes; - private transient int nullKeyHashCode; - // Lazy binary value serializer. - private transient LazyBinarySerializeWrite valueLazyBinarySerializeWrite; + protected transient LazyBinarySerializeWrite valueLazyBinarySerializeWrite; // This helper object serializes LazyBinary format reducer values from columns of a row // in a vectorized row batch. - private transient VectorSerializeRow<LazyBinarySerializeWrite> valueVectorSerializeRow; + protected transient VectorSerializeRow<LazyBinarySerializeWrite> valueVectorSerializeRow; // The output buffer used to serialize a value into. 
- private transient Output valueOutput; + protected transient Output valueOutput; // The hive key and bytes writable value needed to pass the key and value to the collector. - private transient HiveKey keyWritable; - private transient BytesWritable valueBytesWritable; + protected transient HiveKey keyWritable; + protected transient BytesWritable valueBytesWritable; // Where to write our key and value pairs. private transient OutputCollector out; - // The object that determines equal key series. - protected transient VectorKeySeriesSerialized serializedKeySeries; - private transient long numRows = 0; private transient long cntr = 1; private transient long logEveryNRows = 0; @@ -158,6 +151,8 @@ public abstract class VectorReduceSinkCommonOperator extends TerminalOperator<Re VectorizationContext vContext, OperatorDesc conf) throws HiveException { this(ctx); + LOG.info("VectorReduceSinkCommonOperator constructor"); + ReduceSinkDesc desc = (ReduceSinkDesc) conf; this.conf = desc; vectorDesc = (VectorReduceSinkDesc) desc.getVectorDesc(); @@ -247,6 +242,46 @@ public abstract class VectorReduceSinkCommonOperator extends TerminalOperator<Re protected void initializeOp(Configuration hconf) throws HiveException { super.initializeOp(hconf); + if (isLogDebugEnabled) { + LOG.debug("useUniformHash " + vectorReduceSinkInfo.getUseUniformHash()); + + LOG.debug("reduceSinkKeyColumnMap " + + (vectorReduceSinkInfo.getReduceSinkKeyColumnMap() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkKeyColumnMap()))); + LOG.debug("reduceSinkKeyTypeInfos " + + (vectorReduceSinkInfo.getReduceSinkKeyTypeInfos() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkKeyTypeInfos()))); + LOG.debug("reduceSinkKeyColumnVectorTypes " + + (vectorReduceSinkInfo.getReduceSinkKeyColumnVectorTypes() == null ? 
"NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkKeyColumnVectorTypes()))); + LOG.debug("reduceSinkKeyExpressions " + + (vectorReduceSinkInfo.getReduceSinkKeyExpressions() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkKeyExpressions()))); + + LOG.debug("reduceSinkValueColumnMap " + + (vectorReduceSinkInfo.getReduceSinkValueColumnMap() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkValueColumnMap()))); + LOG.debug("reduceSinkValueTypeInfos " + + (vectorReduceSinkInfo.getReduceSinkValueTypeInfos() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkValueTypeInfos()))); + LOG.debug("reduceSinkValueColumnVectorTypes " + + (vectorReduceSinkInfo.getReduceSinkValueColumnVectorTypes() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkValueColumnVectorTypes()))); + LOG.debug("reduceSinkValueExpressions " + + (vectorReduceSinkInfo.getReduceSinkValueExpressions() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkValueExpressions()))); + + LOG.debug("reduceSinkBucketColumnMap " + + (vectorReduceSinkInfo.getReduceSinkBucketColumnMap() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkBucketColumnMap()))); + LOG.debug("reduceSinkBucketTypeInfos " + + (vectorReduceSinkInfo.getReduceSinkBucketTypeInfos() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkBucketTypeInfos()))); + LOG.debug("reduceSinkBucketColumnVectorTypes " + + (vectorReduceSinkInfo.getReduceSinkBucketColumnVectorTypes() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkBucketColumnVectorTypes()))); + LOG.debug("reduceSinkBucketExpressions " + + (vectorReduceSinkInfo.getReduceSinkBucketExpressions() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkBucketExpressions()))); + + LOG.debug("reduceSinkPartitionColumnMap " + + (vectorReduceSinkInfo.getReduceSinkPartitionColumnMap() == null ? 
"NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkPartitionColumnMap()))); + LOG.debug("reduceSinkPartitionTypeInfos " + + (vectorReduceSinkInfo.getReduceSinkPartitionTypeInfos() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkPartitionTypeInfos()))); + LOG.debug("reduceSinkPartitionColumnVectorTypes " + + (vectorReduceSinkInfo.getReduceSinkPartitionColumnVectorTypes() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkPartitionColumnVectorTypes()))); + LOG.debug("reduceSinkPartitionExpressions " + + (vectorReduceSinkInfo.getReduceSinkPartitionExpressions() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkPartitionExpressions()))); + } + if (LOG.isDebugEnabled()) { // Determine the name of our map or reduce task for debug tracing. BaseWork work = Utilities.getMapWork(hconf); @@ -280,21 +315,6 @@ public abstract class VectorReduceSinkCommonOperator extends TerminalOperator<Re keyBinarySortableSerializeWrite = new BinarySortableSerializeWrite(columnSortOrder, columnNullMarker, columnNotNullMarker); - // Create all nulls key. 
- try { - Output nullKeyOutput = new Output(); - keyBinarySortableSerializeWrite.set(nullKeyOutput); - for (int i = 0; i < reduceSinkKeyColumnMap.length; i++) { - keyBinarySortableSerializeWrite.writeNull(); - } - int nullBytesLength = nullKeyOutput.getLength(); - nullBytes = new byte[nullBytesLength]; - System.arraycopy(nullKeyOutput.getData(), 0, nullBytes, 0, nullBytesLength); - nullKeyHashCode = HashCodeUtil.calculateBytesHashCode(nullBytes, 0, nullBytesLength); - } catch (Exception e) { - throw new HiveException(e); - } - valueLazyBinarySerializeWrite = new LazyBinarySerializeWrite(reduceSinkValueColumnMap.length); valueVectorSerializeRow = @@ -312,101 +332,6 @@ public abstract class VectorReduceSinkCommonOperator extends TerminalOperator<Re batchCounter = 0; } - @Override - public void process(Object row, int tag) throws HiveException { - - try { - VectorizedRowBatch batch = (VectorizedRowBatch) row; - - batchCounter++; - - if (batch.size == 0) { - if (LOG.isDebugEnabled()) { - LOG.debug(CLASS_NAME + " batch #" + batchCounter + " empty"); - } - return; - } - - // Perform any key expressions. Results will go into scratch columns. - if (reduceSinkKeyExpressions != null) { - for (VectorExpression ve : reduceSinkKeyExpressions) { - ve.evaluate(batch); - } - } - - // Perform any value expressions. Results will go into scratch columns. - if (reduceSinkValueExpressions != null) { - for (VectorExpression ve : reduceSinkValueExpressions) { - ve.evaluate(batch); - } - } - - serializedKeySeries.processBatch(batch); - - boolean selectedInUse = batch.selectedInUse; - int[] selected = batch.selected; - - int keyLength; - int logical; - int end; - int batchIndex; - do { - if (serializedKeySeries.getCurrentIsAllNull()) { - - // Use the same logic as ReduceSinkOperator.toHiveKey. 
- // - if (tag == -1 || reduceSkipTag) { - keyWritable.set(nullBytes, 0, nullBytes.length); - } else { - keyWritable.setSize(nullBytes.length + 1); - System.arraycopy(nullBytes, 0, keyWritable.get(), 0, nullBytes.length); - keyWritable.get()[nullBytes.length] = reduceTagByte; - } - keyWritable.setDistKeyLength(nullBytes.length); - keyWritable.setHashCode(nullKeyHashCode); - - } else { - - // One serialized key for 1 or more rows for the duplicate keys. - // LOG.info("reduceSkipTag " + reduceSkipTag + " tag " + tag + " reduceTagByte " + (int) reduceTagByte + " keyLength " + serializedKeySeries.getSerializedLength()); - // LOG.info("process offset " + serializedKeySeries.getSerializedStart() + " length " + serializedKeySeries.getSerializedLength()); - keyLength = serializedKeySeries.getSerializedLength(); - if (tag == -1 || reduceSkipTag) { - keyWritable.set(serializedKeySeries.getSerializedBytes(), - serializedKeySeries.getSerializedStart(), keyLength); - } else { - keyWritable.setSize(keyLength + 1); - System.arraycopy(serializedKeySeries.getSerializedBytes(), - serializedKeySeries.getSerializedStart(), keyWritable.get(), 0, keyLength); - keyWritable.get()[keyLength] = reduceTagByte; - } - keyWritable.setDistKeyLength(keyLength); - keyWritable.setHashCode(serializedKeySeries.getCurrentHashCode()); - } - - logical = serializedKeySeries.getCurrentLogical(); - end = logical + serializedKeySeries.getCurrentDuplicateCount(); - do { - batchIndex = (selectedInUse ? 
selected[logical] : logical); - - valueLazyBinarySerializeWrite.reset(); - valueVectorSerializeRow.serializeWrite(batch, batchIndex); - - valueBytesWritable.set(valueOutput.getData(), 0, valueOutput.getLength()); - - collect(keyWritable, valueBytesWritable); - } while (++logical < end); - - if (!serializedKeySeries.next()) { - break; - } - } while (true); - - } catch (Exception e) { - throw new HiveException(e); - } - } - protected void collect(BytesWritable keyWritable, Writable valueWritable) throws IOException { // Since this is a terminal operator, update counters explicitly - // forward is not called http://git-wip-us.apache.org/repos/asf/hive/blob/27f27219/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkLongOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkLongOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkLongOperator.java index 325f773..0bc1cd1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkLongOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkLongOperator.java @@ -32,7 +32,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; /* * Specialized class for native vectorized reduce sink that is reducing on a single long key column. 
*/ -public class VectorReduceSinkLongOperator extends VectorReduceSinkCommonOperator { +public class VectorReduceSinkLongOperator extends VectorReduceSinkUniformHashOperator { private static final long serialVersionUID = 1L; private static final String CLASS_NAME = VectorReduceSinkLongOperator.class.getName(); http://git-wip-us.apache.org/repos/asf/hive/blob/27f27219/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkMultiKeyOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkMultiKeyOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkMultiKeyOperator.java index 2027187..1cca94d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkMultiKeyOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkMultiKeyOperator.java @@ -32,7 +32,7 @@ import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerialize * Specialized class for native vectorized reduce sink that is reducing on multiple key columns * (or a single non-long / non-string column). 
*/ -public class VectorReduceSinkMultiKeyOperator extends VectorReduceSinkCommonOperator { +public class VectorReduceSinkMultiKeyOperator extends VectorReduceSinkUniformHashOperator { private static final long serialVersionUID = 1L; private static final String CLASS_NAME = VectorReduceSinkMultiKeyOperator.class.getName(); http://git-wip-us.apache.org/repos/asf/hive/blob/27f27219/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java new file mode 100644 index 0000000..6312c44 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java @@ -0,0 +1,289 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec.vector.reducesink; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.CompilationOpContext; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator.Counter; +import org.apache.hadoop.hive.ql.exec.TerminalOperator; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.vector.VectorExtractRow; +import org.apache.hadoop.hive.ql.exec.vector.VectorSerializeRow; +import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext; +import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; +import org.apache.hadoop.hive.ql.exec.vector.keyseries.VectorKeySeriesSerialized; +import org.apache.hadoop.hive.ql.io.HiveKey; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.BaseWork; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.plan.VectorReduceSinkDesc; +import org.apache.hadoop.hive.ql.plan.VectorReduceSinkInfo; +import org.apache.hadoop.hive.ql.plan.api.OperatorType; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.ByteStream.Output; +import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe; +import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite; +import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinarySerializeWrite; +import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hive.common.util.HashCodeUtil; + +import com.google.common.base.Preconditions; + +/** + * This class is uniform hash (common) operator class for native vectorized reduce sink. + */ +public class VectorReduceSinkObjectHashOperator extends VectorReduceSinkCommonOperator { + + private static final long serialVersionUID = 1L; + private static final String CLASS_NAME = VectorReduceSinkObjectHashOperator.class.getName(); + private static final Log LOG = LogFactory.getLog(CLASS_NAME); + + protected int[] reduceSinkBucketColumnMap; + protected TypeInfo[] reduceSinkBucketTypeInfos; + + protected VectorExpression[] reduceSinkBucketExpressions; + + protected int[] reduceSinkPartitionColumnMap; + protected TypeInfo[] reduceSinkPartitionTypeInfos; + + protected VectorExpression[] reduceSinkPartitionExpressions; + + // The above members are initialized by the constructor and must not be + // transient. 
+ //--------------------------------------------------------------------------- + + protected transient Output keyOutput; + protected transient VectorSerializeRow<BinarySortableSerializeWrite> keyVectorSerializeRow; + + private transient boolean hasBuckets; + private transient int numBuckets; + private transient ObjectInspector[] bucketObjectInspectors; + private transient VectorExtractRow bucketVectorExtractRow; + private transient Object[] bucketFieldValues; + + private transient boolean isPartitioned; + private transient ObjectInspector[] partitionObjectInspectors; + private transient VectorExtractRow partitionVectorExtractRow; + private transient Object[] partitionFieldValues; + private transient Random nonPartitionRandom; + + /** Kryo ctor. */ + protected VectorReduceSinkObjectHashOperator() { + super(); + } + + public VectorReduceSinkObjectHashOperator(CompilationOpContext ctx) { + super(ctx); + } + + public VectorReduceSinkObjectHashOperator(CompilationOpContext ctx, + VectorizationContext vContext, OperatorDesc conf) throws HiveException { + super(ctx, vContext, conf); + + LOG.info("VectorReduceSinkObjectHashOperator constructor vectorReduceSinkInfo " + vectorReduceSinkInfo); + + // This the is Object Hash class variation. 
+ Preconditions.checkState(!vectorReduceSinkInfo.getUseUniformHash()); + + reduceSinkBucketColumnMap = vectorReduceSinkInfo.getReduceSinkBucketColumnMap(); + reduceSinkBucketTypeInfos = vectorReduceSinkInfo.getReduceSinkBucketTypeInfos(); + reduceSinkBucketExpressions = vectorReduceSinkInfo.getReduceSinkBucketExpressions(); + + reduceSinkPartitionColumnMap = vectorReduceSinkInfo.getReduceSinkPartitionColumnMap(); + reduceSinkPartitionTypeInfos = vectorReduceSinkInfo.getReduceSinkPartitionTypeInfos(); + reduceSinkPartitionExpressions = vectorReduceSinkInfo.getReduceSinkPartitionExpressions(); + } + + private ObjectInspector[] getObjectInspectorArray(TypeInfo[] typeInfos) { + final int size = typeInfos.length; + ObjectInspector[] objectInspectors = new ObjectInspector[size]; + for(int i = 0; i < size; i++) { + TypeInfo typeInfo = typeInfos[i]; + ObjectInspector standardWritableObjectInspector = + TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(typeInfo); + objectInspectors[i] = standardWritableObjectInspector; + } + return objectInspectors; + } + + @Override + protected void initializeOp(Configuration hconf) throws HiveException { + super.initializeOp(hconf); + + keyOutput = new Output(); + keyBinarySortableSerializeWrite.set(keyOutput); + keyVectorSerializeRow = + new VectorSerializeRow<BinarySortableSerializeWrite>( + keyBinarySortableSerializeWrite); + keyVectorSerializeRow.init(reduceSinkKeyTypeInfos, reduceSinkKeyColumnMap); + + hasBuckets = false; + isPartitioned = false; + numBuckets = 0; + + // Object Hash. 
+ + numBuckets = conf.getNumBuckets(); + hasBuckets = (numBuckets > 0); + + if (hasBuckets) { + bucketObjectInspectors = getObjectInspectorArray(reduceSinkBucketTypeInfos); + bucketVectorExtractRow = new VectorExtractRow(); + bucketVectorExtractRow.init(reduceSinkBucketTypeInfos, reduceSinkBucketColumnMap); + bucketFieldValues = new Object[reduceSinkBucketTypeInfos.length]; + } + + isPartitioned = (conf.getPartitionCols() != null); + if (!isPartitioned) { + nonPartitionRandom = new Random(12345); + } else { + partitionObjectInspectors = getObjectInspectorArray(reduceSinkPartitionTypeInfos); + LOG.debug("*NEW* partitionObjectInspectors " + Arrays.toString(partitionObjectInspectors)); + partitionVectorExtractRow = new VectorExtractRow(); + partitionVectorExtractRow.init(reduceSinkPartitionTypeInfos, reduceSinkPartitionColumnMap); + partitionFieldValues = new Object[reduceSinkPartitionTypeInfos.length]; + } + } + + @Override + public void process(Object row, int tag) throws HiveException { + + try { + + VectorizedRowBatch batch = (VectorizedRowBatch) row; + + batchCounter++; + + if (batch.size == 0) { + if (LOG.isDebugEnabled()) { + LOG.debug(CLASS_NAME + " batch #" + batchCounter + " empty"); + } + return; + } + + // Perform any key expressions. Results will go into scratch columns. + if (reduceSinkKeyExpressions != null) { + for (VectorExpression ve : reduceSinkKeyExpressions) { + ve.evaluate(batch); + } + } + + // Perform any value expressions. Results will go into scratch columns. + if (reduceSinkValueExpressions != null) { + for (VectorExpression ve : reduceSinkValueExpressions) { + ve.evaluate(batch); + } + } + + // Perform any bucket expressions. Results will go into scratch columns. + if (reduceSinkBucketExpressions != null) { + for (VectorExpression ve : reduceSinkBucketExpressions) { + ve.evaluate(batch); + } + } + + // Perform any partition expressions. Results will go into scratch columns. 
+ if (reduceSinkPartitionExpressions != null) { + for (VectorExpression ve : reduceSinkPartitionExpressions) { + ve.evaluate(batch); + } + } + + final boolean selectedInUse = batch.selectedInUse; + int[] selected = batch.selected; + + final int size = batch.size; + for (int logical = 0; logical < size; logical++) { + final int batchIndex = (selectedInUse ? selected[logical] : logical); + + final int hashCode; + if (!hasBuckets) { + if (!isPartitioned) { + hashCode = nonPartitionRandom.nextInt(); + } else { + partitionVectorExtractRow.extractRow(batch, batchIndex, partitionFieldValues); + hashCode = + ObjectInspectorUtils.getBucketHashCode( + partitionFieldValues, partitionObjectInspectors); + } + } else { + bucketVectorExtractRow.extractRow(batch, batchIndex, bucketFieldValues); + final int bucketNum = + ObjectInspectorUtils.getBucketNumber( + bucketFieldValues, bucketObjectInspectors, numBuckets); + if (!isPartitioned) { + hashCode = nonPartitionRandom.nextInt() * 31 + bucketNum; + } else { + partitionVectorExtractRow.extractRow(batch, batchIndex, partitionFieldValues); + hashCode = + ObjectInspectorUtils.getBucketHashCode( + partitionFieldValues, partitionObjectInspectors) * 31 + bucketNum; + } + } + + keyBinarySortableSerializeWrite.reset(); + keyVectorSerializeRow.serializeWrite(batch, batchIndex); + + // One serialized key for 1 or more rows for the duplicate keys. 
+ final int keyLength = keyOutput.getLength(); + if (tag == -1 || reduceSkipTag) { + keyWritable.set(keyOutput.getData(), 0, keyLength); + } else { + keyWritable.setSize(keyLength + 1); + System.arraycopy(keyOutput.getData(), 0, keyWritable.get(), 0, keyLength); + keyWritable.get()[keyLength] = reduceTagByte; + } + keyWritable.setDistKeyLength(keyLength); + keyWritable.setHashCode(hashCode); + + valueLazyBinarySerializeWrite.reset(); + valueVectorSerializeRow.serializeWrite(batch, batchIndex); + + valueBytesWritable.set(valueOutput.getData(), 0, valueOutput.getLength()); + + collect(keyWritable, valueBytesWritable); + } + } catch (Exception e) { + throw new HiveException(e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/27f27219/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkStringOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkStringOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkStringOperator.java index b655e6e..a838f4c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkStringOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkStringOperator.java @@ -32,7 +32,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; /* * Specialized class for native vectorized reduce sink that is reducing on a single string key column. 
*/ -public class VectorReduceSinkStringOperator extends VectorReduceSinkCommonOperator { +public class VectorReduceSinkStringOperator extends VectorReduceSinkUniformHashOperator { private static final long serialVersionUID = 1L; private static final String CLASS_NAME = VectorReduceSinkStringOperator.class.getName(); http://git-wip-us.apache.org/repos/asf/hive/blob/27f27219/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkUniformHashOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkUniformHashOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkUniformHashOperator.java new file mode 100644 index 0000000..2dfa721 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkUniformHashOperator.java @@ -0,0 +1,218 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec.vector.reducesink; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.CompilationOpContext; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator.Counter; +import org.apache.hadoop.hive.ql.exec.TerminalOperator; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.vector.VectorExtractRow; +import org.apache.hadoop.hive.ql.exec.vector.VectorSerializeRow; +import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext; +import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; +import org.apache.hadoop.hive.ql.exec.vector.keyseries.VectorKeySeriesSerialized; +import org.apache.hadoop.hive.ql.io.HiveKey; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.BaseWork; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.plan.VectorReduceSinkDesc; +import org.apache.hadoop.hive.ql.plan.VectorReduceSinkInfo; +import org.apache.hadoop.hive.ql.plan.api.OperatorType; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.ByteStream.Output; +import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe; +import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite; +import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinarySerializeWrite; +import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hive.common.util.HashCodeUtil; + +/** + * This class is uniform hash (common) operator class for native vectorized reduce sink. + */ +public abstract class VectorReduceSinkUniformHashOperator extends VectorReduceSinkCommonOperator { + + private static final long serialVersionUID = 1L; + private static final String CLASS_NAME = VectorReduceSinkUniformHashOperator.class.getName(); + private static final Log LOG = LogFactory.getLog(CLASS_NAME); + + // The above members are initialized by the constructor and must not be + // transient. + //--------------------------------------------------------------------------- + + // The serialized all null key and its hash code. + private transient byte[] nullBytes; + private transient int nullKeyHashCode; + + // The object that determines equal key series. + protected transient VectorKeySeriesSerialized serializedKeySeries; + + + /** Kryo ctor. */ + protected VectorReduceSinkUniformHashOperator() { + super(); + } + + public VectorReduceSinkUniformHashOperator(CompilationOpContext ctx) { + super(ctx); + } + + public VectorReduceSinkUniformHashOperator(CompilationOpContext ctx, + VectorizationContext vContext, OperatorDesc conf) throws HiveException { + super(ctx, vContext, conf); + } + + @Override + protected void initializeOp(Configuration hconf) throws HiveException { + super.initializeOp(hconf); + + // Create all nulls key. 
+ try { + Output nullKeyOutput = new Output(); + keyBinarySortableSerializeWrite.set(nullKeyOutput); + for (int i = 0; i < reduceSinkKeyColumnMap.length; i++) { + keyBinarySortableSerializeWrite.writeNull(); + } + int nullBytesLength = nullKeyOutput.getLength(); + nullBytes = new byte[nullBytesLength]; + System.arraycopy(nullKeyOutput.getData(), 0, nullBytes, 0, nullBytesLength); + nullKeyHashCode = HashCodeUtil.calculateBytesHashCode(nullBytes, 0, nullBytesLength); + } catch (Exception e) { + throw new HiveException(e); + } + } + + @Override + public void process(Object row, int tag) throws HiveException { + + try { + VectorizedRowBatch batch = (VectorizedRowBatch) row; + + batchCounter++; + + if (batch.size == 0) { + if (LOG.isDebugEnabled()) { + LOG.debug(CLASS_NAME + " batch #" + batchCounter + " empty"); + } + return; + } + + // Perform any key expressions. Results will go into scratch columns. + if (reduceSinkKeyExpressions != null) { + for (VectorExpression ve : reduceSinkKeyExpressions) { + ve.evaluate(batch); + } + } + + // Perform any value expressions. Results will go into scratch columns. + if (reduceSinkValueExpressions != null) { + for (VectorExpression ve : reduceSinkValueExpressions) { + ve.evaluate(batch); + } + } + + serializedKeySeries.processBatch(batch); + + boolean selectedInUse = batch.selectedInUse; + int[] selected = batch.selected; + + int keyLength; + int logical; + int end; + int batchIndex; + do { + if (serializedKeySeries.getCurrentIsAllNull()) { + + // Use the same logic as ReduceSinkOperator.toHiveKey. 
+ // + if (tag == -1 || reduceSkipTag) { + keyWritable.set(nullBytes, 0, nullBytes.length); + } else { + keyWritable.setSize(nullBytes.length + 1); + System.arraycopy(nullBytes, 0, keyWritable.get(), 0, nullBytes.length); + keyWritable.get()[nullBytes.length] = reduceTagByte; + } + keyWritable.setDistKeyLength(nullBytes.length); + keyWritable.setHashCode(nullKeyHashCode); + + } else { + + // One serialized key for 1 or more rows for the duplicate keys. + // LOG.info("reduceSkipTag " + reduceSkipTag + " tag " + tag + " reduceTagByte " + (int) reduceTagByte + " keyLength " + serializedKeySeries.getSerializedLength()); + // LOG.info("process offset " + serializedKeySeries.getSerializedStart() + " length " + serializedKeySeries.getSerializedLength()); + keyLength = serializedKeySeries.getSerializedLength(); + if (tag == -1 || reduceSkipTag) { + keyWritable.set(serializedKeySeries.getSerializedBytes(), + serializedKeySeries.getSerializedStart(), keyLength); + } else { + keyWritable.setSize(keyLength + 1); + System.arraycopy(serializedKeySeries.getSerializedBytes(), + serializedKeySeries.getSerializedStart(), keyWritable.get(), 0, keyLength); + keyWritable.get()[keyLength] = reduceTagByte; + } + keyWritable.setDistKeyLength(keyLength); + keyWritable.setHashCode(serializedKeySeries.getCurrentHashCode()); + } + + logical = serializedKeySeries.getCurrentLogical(); + end = logical + serializedKeySeries.getCurrentDuplicateCount(); + do { + batchIndex = (selectedInUse ? 
selected[logical] : logical); + + valueLazyBinarySerializeWrite.reset(); + valueVectorSerializeRow.serializeWrite(batch, batchIndex); + + valueBytesWritable.set(valueOutput.getData(), 0, valueOutput.getLength()); + + collect(keyWritable, valueBytesWritable); + } while (++logical < end); + + if (!serializedKeySeries.next()) { + break; + } + } while (true); + + } catch (Exception e) { + throw new HiveException(e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/27f27219/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java index 32ec1d7..bf60c10 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java @@ -29,6 +29,7 @@ import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.Map.Entry; import java.util.Properties; import java.util.Set; @@ -62,6 +63,7 @@ import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinOuterMultiKeyO import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinOuterStringOperator; import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkLongOperator; import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkMultiKeyOperator; +import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkObjectHashOperator; import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkStringOperator; import org.apache.hadoop.hive.ql.exec.vector.udf.VectorUDFAdaptor; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type; @@ -143,6 +145,9 @@ import org.apache.hadoop.hive.ql.plan.VectorReduceSinkDesc; import 
org.apache.hadoop.hive.ql.plan.VectorReduceSinkInfo; import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc; import org.apache.hadoop.hive.ql.plan.VectorSelectDesc; +import org.apache.hadoop.hive.ql.plan.VectorReduceSinkDesc; +import org.apache.hadoop.hive.ql.plan.VectorReduceSinkInfo; +import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc; import org.apache.hadoop.hive.ql.plan.api.OperatorType; import org.apache.hadoop.hive.ql.udf.UDFAcos; import org.apache.hadoop.hive.ql.udf.UDFAsin; @@ -2909,9 +2914,6 @@ public class Vectorizer implements PhysicalPlanResolver { Operator<? extends OperatorDesc> op, VectorizationContext vContext, ReduceSinkDesc desc, VectorReduceSinkInfo vectorReduceSinkInfo) throws HiveException { - Operator<? extends OperatorDesc> vectorOp = null; - Class<? extends Operator<?>> opClass = null; - Type[] reduceSinkKeyColumnVectorTypes = vectorReduceSinkInfo.getReduceSinkKeyColumnVectorTypes(); // By default, we can always use the multi-key class. @@ -2948,18 +2950,23 @@ public class Vectorizer implements PhysicalPlanResolver { } } - switch (reduceSinkKeyType) { - case LONG: - opClass = VectorReduceSinkLongOperator.class; - break; - case STRING: - opClass = VectorReduceSinkStringOperator.class; - break; - case MULTI_KEY: - opClass = VectorReduceSinkMultiKeyOperator.class; - break; - default: - throw new HiveException("Unknown reduce sink key type " + reduceSinkKeyType); + Class<? 
extends Operator<?>> opClass = null; + if (vectorReduceSinkInfo.getUseUniformHash()) { + switch (reduceSinkKeyType) { + case LONG: + opClass = VectorReduceSinkLongOperator.class; + break; + case STRING: + opClass = VectorReduceSinkStringOperator.class; + break; + case MULTI_KEY: + opClass = VectorReduceSinkMultiKeyOperator.class; + break; + default: + throw new HiveException("Unknown reduce sink key type " + reduceSinkKeyType); + } + } else { + opClass = VectorReduceSinkObjectHashOperator.class; } VectorReduceSinkDesc vectorDesc = (VectorReduceSinkDesc) desc.getVectorDesc(); @@ -2967,9 +2974,17 @@ public class Vectorizer implements PhysicalPlanResolver { vectorDesc.setReduceSinkKeyType(reduceSinkKeyType); vectorDesc.setVectorReduceSinkInfo(vectorReduceSinkInfo); - vectorOp = OperatorFactory.getVectorOperator( - opClass, op.getCompilationOpContext(), op.getConf(), vContext); - LOG.info("Vectorizer vectorizeOperator reduce sink class " + vectorOp.getClass().getSimpleName()); + LOG.info("Vectorizer vectorizeOperator reduce sink class " + opClass.getSimpleName()); + + Operator<? extends OperatorDesc> vectorOp = null; + try { + vectorOp = OperatorFactory.getVectorOperator( + opClass, op.getCompilationOpContext(), op.getConf(), vContext); + } catch (Exception e) { + LOG.info("Vectorizer vectorizeOperator reduce sink class exception " + opClass.getSimpleName() + + " exception " + e); + throw new HiveException(e); + } return vectorOp; } @@ -2983,18 +2998,20 @@ public class Vectorizer implements PhysicalPlanResolver { VectorReduceSinkDesc vectorDesc = new VectorReduceSinkDesc(); desc.setVectorDesc(vectorDesc); - boolean isVectorizationReduceSinkNativeEnabled = HiveConf.getBoolVar(hiveConf, - HiveConf.ConfVars.HIVE_VECTORIZATION_REDUCESINK_NEW_ENABLED); + // Various restrictions. 
- String engine = HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE); - - boolean hasBuckets = desc.getBucketCols() != null && !desc.getBucketCols().isEmpty(); + // Set this if we encounter a condition we were not expecting. + boolean isUnexpectedCondition = false; - boolean hasTopN = desc.getTopN() >= 0; + boolean isVectorizationReduceSinkNativeEnabled = + HiveConf.getBoolVar(hiveConf, + HiveConf.ConfVars.HIVE_VECTORIZATION_REDUCESINK_NEW_ENABLED); - boolean useUniformHash = desc.getReducerTraits().contains(UNIFORM); + String engine = HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE); + + boolean hasTopN = (desc.getTopN() >= 0); - boolean hasDistinctColumns = desc.getDistinctColumnIndices().size() > 0; + boolean hasDistinctColumns = (desc.getDistinctColumnIndices().size() > 0); TableDesc keyTableDesc = desc.getKeySerializeInfo(); Class<? extends Deserializer> keySerializerClass = keyTableDesc.getDeserializerClass(); @@ -3004,28 +3021,6 @@ public class Vectorizer implements PhysicalPlanResolver { Class<? extends Deserializer> valueDeserializerClass = valueTableDesc.getDeserializerClass(); boolean isValueLazyBinary = (valueDeserializerClass == org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe.class); - // Remember the condition variables for EXPLAIN regardless. - vectorDesc.setIsVectorizationReduceSinkNativeEnabled(isVectorizationReduceSinkNativeEnabled); - vectorDesc.setEngine(engine); - vectorDesc.setHasBuckets(hasBuckets); - vectorDesc.setHasTopN(hasTopN); - vectorDesc.setUseUniformHash(useUniformHash); - vectorDesc.setHasDistinctColumns(hasDistinctColumns); - vectorDesc.setIsKeyBinarySortable(isKeyBinarySortable); - vectorDesc.setIsValueLazyBinary(isValueLazyBinary); - - // Many restrictions. 
- if (!isVectorizationReduceSinkNativeEnabled || - !isTezOrSpark || - hasBuckets || - hasTopN || - !useUniformHash || - hasDistinctColumns || - !isKeyBinarySortable || - !isValueLazyBinary) { - return false; - } - // We are doing work here we'd normally do in VectorGroupByCommonOperator's constructor. // So if we later decide not to specialize, we'll just waste any scratch columns allocated... @@ -3089,6 +3084,129 @@ public class Vectorizer implements PhysicalPlanResolver { vectorReduceSinkInfo.setReduceSinkValueColumnVectorTypes(reduceSinkValueColumnVectorTypes); vectorReduceSinkInfo.setReduceSinkValueExpressions(reduceSinkValueExpressions); + boolean useUniformHash = desc.getReducerTraits().contains(UNIFORM); + vectorReduceSinkInfo.setUseUniformHash(useUniformHash); + + boolean hasEmptyBuckets = false; + boolean hasNoPartitions = false; + if (useUniformHash) { + + // Check for unexpected conditions... + hasEmptyBuckets = + (desc.getBucketCols() != null && !desc.getBucketCols().isEmpty()) || + (desc.getPartitionCols().size() == 0); + + if (hasEmptyBuckets) { + LOG.info("Unexpected condition: UNIFORM hash and empty buckets"); + isUnexpectedCondition = true; + } + + hasNoPartitions = (desc.getPartitionCols() == null); + + if (hasNoPartitions) { + LOG.info("Unexpected condition: UNIFORM hash and no partitions"); + isUnexpectedCondition = true; + } + + } else { + + // Collect bucket and/or partition information for object hashing. 
+ + int[] reduceSinkBucketColumnMap = null; + TypeInfo[] reduceSinkBucketTypeInfos = null; + Type[] reduceSinkBucketColumnVectorTypes = null; + VectorExpression[] reduceSinkBucketExpressions = null; + + List<ExprNodeDesc> bucketDescs = desc.getBucketCols(); + if (bucketDescs != null) { + VectorExpression[] allBucketExpressions = vContext.getVectorExpressions(bucketDescs); + + reduceSinkBucketColumnMap = new int[bucketDescs.size()]; + reduceSinkBucketTypeInfos = new TypeInfo[bucketDescs.size()]; + reduceSinkBucketColumnVectorTypes = new Type[bucketDescs.size()]; + ArrayList<VectorExpression> reduceSinkBucketExpressionsList = new ArrayList<VectorExpression>(); + for (int i = 0; i < bucketDescs.size(); ++i) { + VectorExpression ve = allBucketExpressions[i]; + reduceSinkBucketColumnMap[i] = ve.getOutputColumn(); + reduceSinkBucketTypeInfos[i] = bucketDescs.get(i).getTypeInfo(); + reduceSinkBucketColumnVectorTypes[i] = + VectorizationContext.getColumnVectorTypeFromTypeInfo(reduceSinkBucketTypeInfos[i]); + if (!IdentityExpression.isColumnOnly(ve)) { + reduceSinkBucketExpressionsList.add(ve); + } + } + if (reduceSinkBucketExpressionsList.size() == 0) { + reduceSinkBucketExpressions = null; + } else { + reduceSinkBucketExpressions = reduceSinkBucketExpressionsList.toArray(new VectorExpression[0]); + } + } + + int[] reduceSinkPartitionColumnMap = null; + TypeInfo[] reduceSinkPartitionTypeInfos = null; + Type[] reduceSinkPartitionColumnVectorTypes = null; + VectorExpression[] reduceSinkPartitionExpressions = null; + + List<ExprNodeDesc> partitionDescs = desc.getPartitionCols(); + if (partitionDescs != null) { + VectorExpression[] allPartitionExpressions = vContext.getVectorExpressions(partitionDescs); + + reduceSinkPartitionColumnMap = new int[partitionDescs.size()]; + reduceSinkPartitionTypeInfos = new TypeInfo[partitionDescs.size()]; + reduceSinkPartitionColumnVectorTypes = new Type[partitionDescs.size()]; + ArrayList<VectorExpression> reduceSinkPartitionExpressionsList = 
new ArrayList<VectorExpression>(); + for (int i = 0; i < partitionDescs.size(); ++i) { + VectorExpression ve = allPartitionExpressions[i]; + reduceSinkPartitionColumnMap[i] = ve.getOutputColumn(); + reduceSinkPartitionTypeInfos[i] = partitionDescs.get(i).getTypeInfo(); + reduceSinkPartitionColumnVectorTypes[i] = + VectorizationContext.getColumnVectorTypeFromTypeInfo(reduceSinkPartitionTypeInfos[i]); + if (!IdentityExpression.isColumnOnly(ve)) { + reduceSinkPartitionExpressionsList.add(ve); + } + } + if (reduceSinkPartitionExpressionsList.size() == 0) { + reduceSinkPartitionExpressions = null; + } else { + reduceSinkPartitionExpressions = reduceSinkPartitionExpressionsList.toArray(new VectorExpression[0]); + } + } + + vectorReduceSinkInfo.setReduceSinkBucketColumnMap(reduceSinkBucketColumnMap); + vectorReduceSinkInfo.setReduceSinkBucketTypeInfos(reduceSinkBucketTypeInfos); + vectorReduceSinkInfo.setReduceSinkBucketColumnVectorTypes(reduceSinkBucketColumnVectorTypes); + vectorReduceSinkInfo.setReduceSinkBucketExpressions(reduceSinkBucketExpressions); + + vectorReduceSinkInfo.setReduceSinkPartitionColumnMap(reduceSinkPartitionColumnMap); + vectorReduceSinkInfo.setReduceSinkPartitionTypeInfos(reduceSinkPartitionTypeInfos); + vectorReduceSinkInfo.setReduceSinkPartitionColumnVectorTypes(reduceSinkPartitionColumnVectorTypes); + vectorReduceSinkInfo.setReduceSinkPartitionExpressions(reduceSinkPartitionExpressions); + } + + // Remember the condition variables for EXPLAIN regardless. + vectorDesc.setIsVectorizationReduceSinkNativeEnabled(isVectorizationReduceSinkNativeEnabled); + vectorDesc.setEngine(engine); + vectorDesc.setHasTopN(hasTopN); + vectorDesc.setHasDistinctColumns(hasDistinctColumns); + vectorDesc.setIsKeyBinarySortable(isKeyBinarySortable); + vectorDesc.setIsValueLazyBinary(isValueLazyBinary); + + // This indicates we logged an inconsistency (from our point-of-view) and will not make this + // operator native... 
+ vectorDesc.setIsUnexpectedCondition(isUnexpectedCondition); + + // Many restrictions. + if (!isVectorizationReduceSinkNativeEnabled || + !isTezOrSpark || + (useUniformHash && (hasEmptyBuckets || hasNoPartitions)) || + hasTopN || + hasDistinctColumns || + !isKeyBinarySortable || + !isValueLazyBinary || + isUnexpectedCondition) { + return false; + } + return true; } http://git-wip-us.apache.org/repos/asf/hive/blob/27f27219/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java index 11e9c20..38461d5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java @@ -534,15 +534,9 @@ public class ReduceSinkDesc extends AbstractOperatorDesc { engineInSupported, engineInSupportedCondName), new VectorizationCondition( - !vectorReduceSinkDesc.getHasBuckets(), - "No buckets"), - new VectorizationCondition( !vectorReduceSinkDesc.getHasTopN(), "No TopN"), new VectorizationCondition( - vectorReduceSinkDesc.getUseUniformHash(), - "Uniform Hash"), - new VectorizationCondition( !vectorReduceSinkDesc.getHasDistinctColumns(), "No DISTINCT columns"), new VectorizationCondition( @@ -552,6 +546,15 @@ public class ReduceSinkDesc extends AbstractOperatorDesc { vectorReduceSinkDesc.getIsValueLazyBinary(), "LazyBinarySerDe for values") }; + if (vectorReduceSinkDesc.getIsUnexpectedCondition()) { + VectorizationCondition[] newConditions = new VectorizationCondition[conditions.length + 1]; + System.arraycopy(conditions, 0, newConditions, 0, conditions.length); + newConditions[conditions.length] = + new VectorizationCondition( + false, + "NOT UnexpectedCondition"); + conditions = newConditions; + } return conditions; } 
http://git-wip-us.apache.org/repos/asf/hive/blob/27f27219/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorReduceSinkDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorReduceSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorReduceSinkDesc.java index 445dcca..d6230af 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorReduceSinkDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorReduceSinkDesc.java @@ -64,12 +64,11 @@ public class VectorReduceSinkDesc extends AbstractVectorDesc { private boolean isVectorizationReduceSinkNativeEnabled; private String engine; - private boolean hasBuckets; private boolean hasTopN; - private boolean useUniformHash; private boolean hasDistinctColumns; private boolean isKeyBinarySortable; private boolean isValueLazyBinary; + private boolean isUnexpectedCondition; /* * The following conditions are for native Vector ReduceSink. @@ -86,24 +85,12 @@ public class VectorReduceSinkDesc extends AbstractVectorDesc { public String getEngine() { return engine; } - public void setHasBuckets(boolean hasBuckets) { - this.hasBuckets = hasBuckets; - } - public boolean getHasBuckets() { - return hasBuckets; - } public void setHasTopN(boolean hasTopN) { this.hasTopN = hasTopN; } public boolean getHasTopN() { return hasTopN; } - public void setUseUniformHash(boolean useUniformHash) { - this.useUniformHash = useUniformHash; - } - public boolean getUseUniformHash() { - return useUniformHash; - } public void setHasDistinctColumns(boolean hasDistinctColumns) { this.hasDistinctColumns = hasDistinctColumns; } @@ -122,4 +109,10 @@ public class VectorReduceSinkDesc extends AbstractVectorDesc { public boolean getIsValueLazyBinary() { return isValueLazyBinary; } + public void setIsUnexpectedCondition(boolean isUnexpectedCondition) { + this.isUnexpectedCondition = isUnexpectedCondition; + } + public boolean getIsUnexpectedCondition() { + 
return isUnexpectedCondition; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/27f27219/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorReduceSinkInfo.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorReduceSinkInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorReduceSinkInfo.java index da6e606..5bafd5b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorReduceSinkInfo.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorReduceSinkInfo.java @@ -35,6 +35,8 @@ public class VectorReduceSinkInfo { private static final long serialVersionUID = 1L; + private boolean useUniformHash; + private int[] reduceSinkKeyColumnMap; private TypeInfo[] reduceSinkKeyTypeInfos; private Type[] reduceSinkKeyColumnVectorTypes; @@ -45,7 +47,19 @@ public class VectorReduceSinkInfo { private Type[] reduceSinkValueColumnVectorTypes; private VectorExpression[] reduceSinkValueExpressions; + private int[] reduceSinkBucketColumnMap; + private TypeInfo[] reduceSinkBucketTypeInfos; + private Type[] reduceSinkBucketColumnVectorTypes; + private VectorExpression[] reduceSinkBucketExpressions; + + private int[] reduceSinkPartitionColumnMap; + private TypeInfo[] reduceSinkPartitionTypeInfos; + private Type[] reduceSinkPartitionColumnVectorTypes; + private VectorExpression[] reduceSinkPartitionExpressions; + public VectorReduceSinkInfo() { + useUniformHash = false; + reduceSinkKeyColumnMap = null; reduceSinkKeyTypeInfos = null; reduceSinkKeyColumnVectorTypes = null; @@ -55,6 +69,24 @@ public class VectorReduceSinkInfo { reduceSinkValueTypeInfos = null; reduceSinkValueColumnVectorTypes = null; reduceSinkValueExpressions = null; + + reduceSinkBucketColumnMap = null; + reduceSinkBucketTypeInfos = null; + reduceSinkBucketColumnVectorTypes = null; + reduceSinkBucketExpressions = null; + + reduceSinkPartitionColumnMap = null; + reduceSinkPartitionTypeInfos = null; + 
reduceSinkPartitionColumnVectorTypes = null; + reduceSinkPartitionExpressions = null; + } + + public boolean getUseUniformHash() { + return useUniformHash; + } + + public void setUseUniformHash(boolean useUniformHash) { + this.useUniformHash = useUniformHash; } public int[] getReduceSinkKeyColumnMap() { @@ -120,4 +152,68 @@ public class VectorReduceSinkInfo { public void setReduceSinkValueExpressions(VectorExpression[] reduceSinkValueExpressions) { this.reduceSinkValueExpressions = reduceSinkValueExpressions; } + + public int[] getReduceSinkBucketColumnMap() { + return reduceSinkBucketColumnMap; + } + + public void setReduceSinkBucketColumnMap(int[] reduceSinkBucketColumnMap) { + this.reduceSinkBucketColumnMap = reduceSinkBucketColumnMap; + } + + public TypeInfo[] getReduceSinkBucketTypeInfos() { + return reduceSinkBucketTypeInfos; + } + + public void setReduceSinkBucketTypeInfos(TypeInfo[] reduceSinkBucketTypeInfos) { + this.reduceSinkBucketTypeInfos = reduceSinkBucketTypeInfos; + } + + public Type[] getReduceSinkBucketColumnVectorTypes() { + return reduceSinkBucketColumnVectorTypes; + } + + public void setReduceSinkBucketColumnVectorTypes(Type[] reduceSinkBucketColumnVectorTypes) { + this.reduceSinkBucketColumnVectorTypes = reduceSinkBucketColumnVectorTypes; + } + + public VectorExpression[] getReduceSinkBucketExpressions() { + return reduceSinkBucketExpressions; + } + + public void setReduceSinkBucketExpressions(VectorExpression[] reduceSinkBucketExpressions) { + this.reduceSinkBucketExpressions = reduceSinkBucketExpressions; + } + + public int[] getReduceSinkPartitionColumnMap() { + return reduceSinkPartitionColumnMap; + } + + public void setReduceSinkPartitionColumnMap(int[] reduceSinkPartitionColumnMap) { + this.reduceSinkPartitionColumnMap = reduceSinkPartitionColumnMap; + } + + public TypeInfo[] getReduceSinkPartitionTypeInfos() { + return reduceSinkPartitionTypeInfos; + } + + public void setReduceSinkPartitionTypeInfos(TypeInfo[] 
reduceSinkPartitionTypeInfos) { + this.reduceSinkPartitionTypeInfos = reduceSinkPartitionTypeInfos; + } + + public Type[] getReduceSinkPartitionColumnVectorTypes() { + return reduceSinkPartitionColumnVectorTypes; + } + + public void setReduceSinkPartitionColumnVectorTypes(Type[] reduceSinkPartitionColumnVectorTypes) { + this.reduceSinkPartitionColumnVectorTypes = reduceSinkPartitionColumnVectorTypes; + } + + public VectorExpression[] getReduceSinkPartitionExpressions() { + return reduceSinkPartitionExpressions; + } + + public void setReduceSinkPartitionExpressions(VectorExpression[] reduceSinkPartitionExpressions) { + this.reduceSinkPartitionExpressions = reduceSinkPartitionExpressions; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/27f27219/ql/src/test/results/clientpositive/llap/dynpart_sort_opt_vectorization.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/llap/dynpart_sort_opt_vectorization.q.out b/ql/src/test/results/clientpositive/llap/dynpart_sort_opt_vectorization.q.out index 0e16ff1..85d65d6 100644 --- a/ql/src/test/results/clientpositive/llap/dynpart_sort_opt_vectorization.q.out +++ b/ql/src/test/results/clientpositive/llap/dynpart_sort_opt_vectorization.q.out @@ -1106,10 +1106,10 @@ Table: over1k_part_buck_orc #### A masked pattern was here #### Partition Parameters: COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\"} - numFiles 8 + numFiles 2 numRows 32 rawDataSize 640 - totalSize 4524 + totalSize 1424 #### A masked pattern was here #### # Storage Information @@ -1147,10 +1147,10 @@ Table: over1k_part_buck_orc #### A masked pattern was here #### Partition Parameters: COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\"} - numFiles 4 - numRows 6 - rawDataSize 120 - totalSize 2004 + numFiles 2 + numRows 4 + rawDataSize 80 + totalSize 936 #### A masked pattern was here #### # Storage Information @@ -1188,10 +1188,10 @@ Table: over1k_part_buck_sort_orc #### A masked pattern was here 
#### Partition Parameters: COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\"} - numFiles 8 + numFiles 2 numRows 32 rawDataSize 640 - totalSize 4510 + totalSize 1416 #### A masked pattern was here #### # Storage Information @@ -1229,10 +1229,10 @@ Table: over1k_part_buck_sort_orc #### A masked pattern was here #### Partition Parameters: COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\"} - numFiles 4 - numRows 6 - rawDataSize 120 - totalSize 2004 + numFiles 2 + numRows 4 + rawDataSize 80 + totalSize 944 #### A masked pattern was here #### # Storage Information @@ -1283,7 +1283,7 @@ POSTHOOK: Input: default@over1k_part_buck_orc POSTHOOK: Input: default@over1k_part_buck_orc@t=27 POSTHOOK: Input: default@over1k_part_buck_orc@t=__HIVE_DEFAULT_PARTITION__ #### A masked pattern was here #### -38 +34 PREHOOK: query: select count(*) from over1k_part_buck_sort_orc PREHOOK: type: QUERY PREHOOK: Input: default@over1k_part_buck_sort_orc @@ -1296,7 +1296,7 @@ POSTHOOK: Input: default@over1k_part_buck_sort_orc POSTHOOK: Input: default@over1k_part_buck_sort_orc@t=27 POSTHOOK: Input: default@over1k_part_buck_sort_orc@t=__HIVE_DEFAULT_PARTITION__ #### A masked pattern was here #### -38 +34 PREHOOK: query: create table over1k_part2_orc( si smallint, i int, @@ -2472,9 +2472,9 @@ Table: over1k_part_buck_sort2_orc Partition Parameters: COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\"} numFiles 1 - numRows 3 - rawDataSize 78 - totalSize 81 + numRows 2 + rawDataSize 52 + totalSize 27 #### A masked pattern was here #### # Storage Information @@ -2534,8 +2534,6 @@ POSTHOOK: Input: default@over1k_part_buck_sort2_orc@t=__HIVE_DEFAULT_PARTITION__ 503 65628 4294967371 95.07 27 401 65779 4294967402 97.39 27 340 65677 4294967461 98.96 27 -409 65536 4294967490 46.97 NULL -374 65560 4294967516 65.43 NULL 473 65720 4294967324 80.74 NULL PREHOOK: query: explain select count(*) from over1k_part_buck_sort2_orc PREHOOK: type: QUERY @@ -2557,9 +2555,9 @@ STAGE PLANS: Map Operator Tree: TableScan alias: 
over1k_part_buck_sort2_orc - Statistics: Num rows: 19 Data size: 645 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 18 Data size: 611 Basic stats: COMPLETE Column stats: COMPLETE Select Operator - Statistics: Num rows: 19 Data size: 645 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 18 Data size: 611 Basic stats: COMPLETE Column stats: COMPLETE Group By Operator aggregations: count() mode: hash @@ -2605,4 +2603,4 @@ POSTHOOK: Input: default@over1k_part_buck_sort2_orc POSTHOOK: Input: default@over1k_part_buck_sort2_orc@t=27 POSTHOOK: Input: default@over1k_part_buck_sort2_orc@t=__HIVE_DEFAULT_PARTITION__ #### A masked pattern was here #### -19 +17 http://git-wip-us.apache.org/repos/asf/hive/blob/27f27219/ql/src/test/results/clientpositive/llap/vector_adaptor_usage_mode.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/llap/vector_adaptor_usage_mode.q.out b/ql/src/test/results/clientpositive/llap/vector_adaptor_usage_mode.q.out index 8b82442..5b17144 100644 --- a/ql/src/test/results/clientpositive/llap/vector_adaptor_usage_mode.q.out +++ b/ql/src/test/results/clientpositive/llap/vector_adaptor_usage_mode.q.out @@ -1101,7 +1101,7 @@ STAGE PLANS: Reduce Sink Vectorization: className: VectorReduceSinkStringOperator native: true - nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, No buckets IS true, No TopN IS true, Uniform Hash IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true + nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, No TopN IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true Statistics: Num rows: 5 Data size: 452 Basic stats: COMPLETE Column stats: NONE value 
expressions: _col1 (type: bigint) Execution mode: vectorized, llap http://git-wip-us.apache.org/repos/asf/hive/blob/27f27219/ql/src/test/results/clientpositive/llap/vector_auto_smb_mapjoin_14.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/llap/vector_auto_smb_mapjoin_14.q.out b/ql/src/test/results/clientpositive/llap/vector_auto_smb_mapjoin_14.q.out index fbb2676..10bd85e 100644 --- a/ql/src/test/results/clientpositive/llap/vector_auto_smb_mapjoin_14.q.out +++ b/ql/src/test/results/clientpositive/llap/vector_auto_smb_mapjoin_14.q.out @@ -285,10 +285,9 @@ STAGE PLANS: Reduce Output Operator sort order: Reduce Sink Vectorization: - className: VectorReduceSinkOperator - native: false - nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, No buckets IS true, No TopN IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true - nativeConditionsNotMet: Uniform Hash IS false + className: VectorReduceSinkObjectHashOperator + native: true + nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, No TopN IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE value expressions: _col0 (type: bigint) Reducer 3 @@ -532,7 +531,7 @@ STAGE PLANS: Reduce Sink Vectorization: className: VectorReduceSinkLongOperator native: true - nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, No buckets IS true, No TopN IS true, Uniform Hash IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true + nativeConditionsMet: 
hive.vectorized.execution.reducesink.new.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, No TopN IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true Statistics: Num rows: 5 Data size: 465 Basic stats: COMPLETE Column stats: NONE value expressions: _col1 (type: bigint) Reducer 3 @@ -587,7 +586,7 @@ STAGE PLANS: Reduce Sink Vectorization: className: VectorReduceSinkLongOperator native: true - nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, No buckets IS true, No TopN IS true, Uniform Hash IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true + nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, No TopN IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true Statistics: Num rows: 5 Data size: 465 Basic stats: COMPLETE Column stats: NONE value expressions: _col1 (type: bigint) @@ -1293,7 +1292,7 @@ STAGE PLANS: Reduce Sink Vectorization: className: VectorReduceSinkLongOperator native: true - nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, No buckets IS true, No TopN IS true, Uniform Hash IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true + nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, No TopN IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true Statistics: Num rows: 10 Data size: 930 Basic stats: COMPLETE Column stats: NONE Execution mode: vectorized, llap LLAP IO: all inputs @@ -1336,7 +1335,7 @@ STAGE PLANS: Reduce Sink Vectorization: 
className: VectorReduceSinkLongOperator native: true - nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, No buckets IS true, No TopN IS true, Uniform Hash IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true + nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, No TopN IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true Statistics: Num rows: 10 Data size: 930 Basic stats: COMPLETE Column stats: NONE Execution mode: vectorized, llap LLAP IO: all inputs http://git-wip-us.apache.org/repos/asf/hive/blob/27f27219/ql/src/test/results/clientpositive/llap/vector_between_columns.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/llap/vector_between_columns.q.out b/ql/src/test/results/clientpositive/llap/vector_between_columns.q.out index 8897163..923e579 100644 --- a/ql/src/test/results/clientpositive/llap/vector_between_columns.q.out +++ b/ql/src/test/results/clientpositive/llap/vector_between_columns.q.out @@ -162,10 +162,9 @@ STAGE PLANS: Reduce Output Operator sort order: Reduce Sink Vectorization: - className: VectorReduceSinkOperator - native: false - nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, No buckets IS true, No TopN IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true - nativeConditionsNotMet: Uniform Hash IS false + className: VectorReduceSinkObjectHashOperator + native: true + nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, No TopN IS true, No DISTINCT columns IS true, BinarySortableSerDe for 
keys IS true, LazyBinarySerDe for values IS true Statistics: Num rows: 5 Data size: 36 Basic stats: COMPLETE Column stats: NONE value expressions: _col0 (type: int), _col1 (type: smallint) Execution mode: vectorized, llap @@ -175,7 +174,7 @@ STAGE PLANS: enabledConditionsMet: hive.vectorized.use.vectorized.input.format IS true groupByVectorOutput: true inputFileFormats: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat - allNative: false + allNative: true usesVectorUDFAdaptor: false vectorized: true @@ -330,10 +329,9 @@ STAGE PLANS: Reduce Output Operator sort order: Reduce Sink Vectorization: - className: VectorReduceSinkOperator - native: false - nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, No buckets IS true, No TopN IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true - nativeConditionsNotMet: Uniform Hash IS false + className: VectorReduceSinkObjectHashOperator + native: true + nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, No TopN IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true Statistics: Num rows: 5 Data size: 36 Basic stats: COMPLETE Column stats: NONE value expressions: _col0 (type: int), _col1 (type: smallint) Execution mode: vectorized, llap @@ -343,7 +341,7 @@ STAGE PLANS: enabledConditionsMet: hive.vectorized.use.vectorized.input.format IS true groupByVectorOutput: true inputFileFormats: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat - allNative: false + allNative: true usesVectorUDFAdaptor: false vectorized: true
