Repository: hive Updated Branches: refs/heads/master 2422e1808 -> f7dea1060
HIVE-18523: Fix summary row in case there are no inputs (Zoltan Haindrich reviewed by Ashutosh Chauhan) Signed-off-by: Zoltan Haindrich <k...@rxd.hu> Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f942e72a Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f942e72a Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f942e72a Branch: refs/heads/master Commit: f942e72abf450f1f00f27261532a54f5f81d5170 Parents: 2422e18 Author: Zoltan Haindrich <k...@rxd.hu> Authored: Wed Feb 7 09:35:11 2018 +0100 Committer: Zoltan Haindrich <k...@rxd.hu> Committed: Wed Feb 7 09:35:11 2018 +0100 ---------------------------------------------------------------------- .../hadoop/hive/ql/exec/GroupByOperator.java | 25 +++++----- .../hadoop/hive/ql/exec/IConfigureJobConf.java | 30 ++++++++++++ .../apache/hadoop/hive/ql/exec/Utilities.java | 4 +- .../hadoop/hive/ql/exec/mr/ExecDriver.java | 1 + .../hadoop/hive/ql/exec/mr/ExecMapRunner.java | 40 ++++++++++++++++ .../hadoop/hive/ql/exec/mr/ExecMapper.java | 18 ++++--- .../hive/ql/exec/tez/HiveSplitGenerator.java | 1 - .../ql/exec/vector/VectorGroupByOperator.java | 40 ++++++++++------ .../hive/ql/io/CombineHiveInputFormat.java | 10 +++- .../hadoop/hive/ql/io/HiveInputFormat.java | 9 ++++ .../hadoop/hive/ql/io/NullRowsInputFormat.java | 14 ++++-- .../org/apache/hadoop/hive/ql/plan/MapWork.java | 49 ++++++++++++-------- .../apache/hadoop/hive/ql/plan/ReduceWork.java | 7 --- .../clientpositive/groupby_rollup_empty.q | 7 +++ .../clientpositive/groupby_rollup_empty.q.out | 28 +++++++++++ .../llap/groupby_rollup_empty.q.out | 28 +++++++++++ 16 files changed, 245 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/f942e72a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java ---------------------------------------------------------------------- diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java index 6a0f0de..6de979e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java @@ -36,13 +36,13 @@ import org.apache.hadoop.hive.common.type.TimestampTZ; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.llap.LlapDaemonInfo; import org.apache.hadoop.hive.ql.CompilationOpContext; -import org.apache.hadoop.hive.ql.exec.tez.TezContext; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.parse.OpParseContext; import org.apache.hadoop.hive.ql.plan.AggregationDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.GroupByDesc; +import org.apache.hadoop.hive.ql.plan.GroupByDesc.Mode; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.api.OperatorType; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; @@ -67,13 +67,14 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; import javolution.util.FastBitSet; /** * GroupBy operator implementation. 
*/ -public class GroupByOperator extends Operator<GroupByDesc> { +public class GroupByOperator extends Operator<GroupByDesc> implements IConfigureJobConf { private static final long serialVersionUID = 1L; private static final int NUMROWSESTIMATESIZE = 1000; @@ -1164,14 +1165,15 @@ public class GroupByOperator extends Operator<GroupByDesc> { } public static boolean shouldEmitSummaryRow(GroupByDesc desc) { - // exactly one reducer should emit the summary row - if (!firstReducer()) { - return false; - } // empty keyset is basically () if (desc.getKeys().size() == 0) { return true; } + + if (desc.getMode() != Mode.HASH && desc.getMode() != Mode.COMPLETE && desc.getMode() != Mode.PARTIAL1) { + return false; + } + int groupingSetPosition = desc.getGroupingSetPosition(); List<Integer> listGroupingSets = desc.getListGroupingSets(); // groupingSets are known at map/reducer side; but have to do real processing @@ -1185,13 +1187,12 @@ public class GroupByOperator extends Operator<GroupByDesc> { return false; } - public static boolean firstReducer() { - MapredContext ctx = TezContext.get(); - if (ctx != null && ctx instanceof TezContext) { - TezContext tezContext = (TezContext) ctx; - return tezContext.getTezProcessorContext().getTaskIndex() == 0; + @Override + public void configureJobConf(JobConf job) { + // only needed when grouping sets are present + if (conf.getGroupingSetPosition() > 0 && shouldEmitSummaryRow(conf)) { + job.setBoolean(Utilities.ENSURE_OPERATORS_EXECUTED, true); } - return true; } } http://git-wip-us.apache.org/repos/asf/hive/blob/f942e72a/ql/src/java/org/apache/hadoop/hive/ql/exec/IConfigureJobConf.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/IConfigureJobConf.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/IConfigureJobConf.java new file mode 100644 index 0000000..dd6da67 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/IConfigureJobConf.java @@ -0,0 
+1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec; + +import org.apache.hadoop.mapred.JobConf; + +/** + * Enables an operator to be able to make changes to the {@link JobConf}. + * + * Invoked during compilation phase only. 
+ */ +public interface IConfigureJobConf { + void configureJobConf(JobConf job); +} http://git-wip-us.apache.org/repos/asf/hive/blob/f942e72a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 941dd58..675ca12 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -52,7 +52,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Calendar; import java.util.Collection; -import java.util.Collections; import java.util.Enumeration; import java.util.HashMap; import java.util.HashSet; @@ -244,6 +243,7 @@ public final class Utilities { public static final String USE_VECTORIZED_INPUT_FILE_FORMAT = "USE_VECTORIZED_INPUT_FILE_FORMAT"; public static final String MAPNAME = "Map "; public static final String REDUCENAME = "Reducer "; + public static final String ENSURE_OPERATORS_EXECUTED = "ENSURE_OPERATORS_EXECUTED"; @Deprecated protected static final String DEPRECATED_MAPRED_DFSCLIENT_PARALLELISM_MAX = "mapred.dfsclient.parallelism.max"; @@ -361,7 +361,7 @@ public final class Utilities { } public static BaseWork getMergeWork(Configuration jconf) { - String currentMergePrefix = jconf.get(DagUtils.TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX); + String currentMergePrefix = jconf.get(DagUtils.TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX); if (StringUtils.isEmpty(currentMergePrefix)) { return null; } http://git-wip-us.apache.org/repos/asf/hive/blob/f942e72a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java index 976b537..b436e80 100644 --- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java @@ -256,6 +256,7 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop //See the javadoc on HiveOutputFormatImpl and HadoopShims.prepareJobOutput() job.setOutputFormat(HiveOutputFormatImpl.class); + job.setMapRunnerClass(ExecMapRunner.class); job.setMapperClass(ExecMapper.class); job.setMapOutputKeyClass(HiveKey.class); http://git-wip-us.apache.org/repos/asf/hive/blob/f942e72a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapRunner.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapRunner.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapRunner.java new file mode 100644 index 0000000..d8f8c2a --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapRunner.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec.mr; + +import java.io.IOException; + +import org.apache.hadoop.mapred.MapRunner; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; + +public class ExecMapRunner<K1, V1, K2, V2> extends MapRunner<K1, V1, K2, V2> { + @Override + public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output, Reporter reporter) throws IOException { + Mapper<K1, V1, K2, V2> mapper = getMapper(); + if (mapper instanceof ExecMapper) { + ExecMapper execMapper = (ExecMapper) mapper; + execMapper.ensureOutputInitialize(output, reporter); + } + super.run(input, output, reporter); + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/f942e72a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java index 150382a..99b33a3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java @@ -141,13 +141,7 @@ public class ExecMapper extends MapReduceBase implements Mapper { @Override public void map(Object key, Object value, OutputCollector output, Reporter reporter) throws IOException { - if (oc == null) { - oc = output; - rp = reporter; - OperatorUtils.setChildrenCollector(mo.getChildOperators(), output); - mo.setReporter(rp); - MapredContext.get().setReporter(reporter); - } + ensureOutputInitialize(output, reporter); // reset the execContext for each new row execContext.resetRow(); @@ -171,6 +165,16 @@ public class ExecMapper extends MapReduceBase implements Mapper { } } + public void ensureOutputInitialize(OutputCollector output, Reporter reporter) { + if (oc == null) { + oc = output; + rp = reporter; + 
OperatorUtils.setChildrenCollector(mo.getChildOperators(), output); + mo.setReporter(rp); + MapredContext.get().setReporter(reporter); + } + } + @Override public void close() { // No row was processed http://git-wip-us.apache.org/repos/asf/hive/blob/f942e72a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java index 98f4bc0..f3aa151 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java @@ -29,7 +29,6 @@ import java.util.Set; import com.google.common.base.Preconditions; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.tez.common.counters.TezCounters; http://git-wip-us.apache.org/repos/asf/hive/blob/f942e72a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java index 45d809a..e670409 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java @@ -35,8 +35,10 @@ import org.apache.hadoop.hive.common.type.DataTypePhysicalVariation; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.exec.GroupByOperator; +import org.apache.hadoop.hive.ql.exec.IConfigureJobConf; import org.apache.hadoop.hive.ql.exec.KeyWrapper; import org.apache.hadoop.hive.ql.exec.Operator; +import 
org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.vector.expressions.ConstantVectorExpression; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; @@ -55,6 +57,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.mapred.JobConf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,7 +72,7 @@ import com.google.common.base.Preconditions; * */ public class VectorGroupByOperator extends Operator<GroupByDesc> - implements VectorizationOperator, VectorizationContextRegion { + implements VectorizationOperator, VectorizationContextRegion, IConfigureJobConf { private static final Logger LOG = LoggerFactory.getLogger( VectorGroupByOperator.class.getName()); @@ -447,6 +450,20 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> if (!aborted) { flush(true); } + if (!aborted && sumBatchSize == 0 && GroupByOperator.shouldEmitSummaryRow(conf)) { + // in case the empty grouping set is present, but no output has been produced + // the "summary row" still needs to be emitted + VectorHashKeyWrapper kw = keyWrappersBatch.getVectorHashKeyWrappers()[0]; + kw.setNull(); + int pos = conf.getGroupingSetPosition(); + if (pos >= 0) { + long val = (1 << pos) - 1; + keyWrappersBatch.setLongValue(kw, pos, val); + } + VectorAggregationBufferRow groupAggregators = allocateAggregationBuffer(); + writeSingleRow(kw, groupAggregators); + } + } /** @@ -777,7 +794,6 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> private boolean first; private boolean isLastGroupBatch; - private boolean hasOutput; /** * The group vector key helper.
@@ -820,7 +836,6 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> @Override public void doProcessBatch(VectorizedRowBatch batch, boolean isFirstGroupingSet, boolean[] currentGroupingSetsOverrideIsNulls) throws HiveException { - hasOutput = true; if (first) { // Copy the group key to output batch now. We'll copy in the aggregates at the end of the group. first = false; @@ -849,16 +864,6 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> if (!aborted && !first && !isLastGroupBatch) { writeGroupRow(groupAggregators, buffer); } - if (!hasOutput && GroupByOperator.shouldEmitSummaryRow(conf)) { - VectorHashKeyWrapper kw = keyWrappersBatch.getVectorHashKeyWrappers()[0]; - kw.setNull(); - int pos = conf.getGroupingSetPosition(); - if (pos >= 0) { - long val = (1 << pos) - 1; - keyWrappersBatch.setLongValue(kw, pos, val); - } - writeSingleRow(kw , groupAggregators); - } } } @@ -1216,4 +1221,13 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> public VectorDesc getVectorDesc() { return vectorDesc; } + + @Override + public void configureJobConf(JobConf job) { + // only needed when grouping sets are present + if (conf.getGroupingSetPosition() > 0 && GroupByOperator.shouldEmitSummaryRow(conf)) { + job.setBoolean(Utilities.ENSURE_OPERATORS_EXECUTED, true); + } + } + } http://git-wip-us.apache.org/repos/asf/hive/blob/f942e72a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java index e4dfc00..b698987 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java @@ -464,9 +464,8 @@ public class CombineHiveInputFormat<K extends WritableComparable, V extends Writ CombineHiveInputSplit csplit = new 
CombineHiveInputSplit(job, is, pathToPartitionInfo); result.add(csplit); } - LOG.info("number of splits " + result.size()); - return result.toArray(new CombineHiveInputSplit[result.size()]); + return result.toArray(new InputSplit[result.size()]); } /** @@ -578,6 +577,13 @@ public class CombineHiveInputFormat<K extends WritableComparable, V extends Writ // clear work from ThreadLocal after splits generated in case of thread is reused in pool. Utilities.clearWorkMapForConf(job); + if (result.isEmpty() && paths.length > 0 && job.getBoolean(Utilities.ENSURE_OPERATORS_EXECUTED, false)) { + // If there are no inputs; the Execution engine skips the operator tree. + // To prevent it from happening; an opaque ZeroRows input is added here - when needed. + result.add( + new HiveInputSplit(new NullRowsInputFormat.DummyInputSplit(paths[0]), ZeroRowsInputFormat.class.getName())); + } + LOG.info("Number of all splits " + result.size()); perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS); return result.toArray(new InputSplit[result.size()]); http://git-wip-us.apache.org/repos/asf/hive/blob/f942e72a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index c3b846c..856b026 100755 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -209,6 +209,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable> } } + @Override public void configure(JobConf job) { this.job = job; } @@ -367,6 +368,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable> return instance; } + @Override public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException { HiveInputSplit hsplit = (HiveInputSplit) split; 
@@ -500,6 +502,12 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable> for (InputSplit is : iss) { result.add(new HiveInputSplit(is, inputFormatClass.getName())); } + if (iss.length == 0 && finalDirs.length > 0 && conf.getBoolean(Utilities.ENSURE_OPERATORS_EXECUTED, false)) { + // If there are no inputs; the Execution engine skips the operator tree. + // To prevent it from happening; an opaque ZeroRows input is added here - when needed. + result.add(new HiveInputSplit(new NullRowsInputFormat.DummyInputSplit(finalDirs[0].toString()), + ZeroRowsInputFormat.class.getName())); + } } public static Path[] processPathsForMmRead(List<Path> dirs, JobConf conf, @@ -592,6 +600,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable> return dirs; } + @Override public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { PerfLogger perfLogger = SessionState.getPerfLogger(); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.GET_SPLITS); http://git-wip-us.apache.org/repos/asf/hive/blob/f942e72a/ql/src/java/org/apache/hadoop/hive/ql/io/NullRowsInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/NullRowsInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/NullRowsInputFormat.java index 6a372a3..e632d43 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/NullRowsInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/NullRowsInputFormat.java @@ -57,7 +57,11 @@ public class NullRowsInputFormat implements InputFormat<NullWritable, NullWritab } public DummyInputSplit(String path) { - super(new Path(path, "null"), 0, 1, (String[])null); + this(new Path(path, "null")); + } + + public DummyInputSplit(Path path) { + super(path, 0, 1, (String[]) null); } } @@ -119,7 +123,9 @@ public class NullRowsInputFormat implements InputFormat<NullWritable, NullWritab @Override public boolean next(Object arg0, Object value) 
throws IOException { if (rbCtx != null) { - if (counter >= MAX_ROW) return false; + if (counter >= MAX_ROW) { + return false; + } makeNullVrb(value, MAX_ROW); counter = MAX_ROW; return true; @@ -163,7 +169,9 @@ public class NullRowsInputFormat implements InputFormat<NullWritable, NullWritab public InputSplit[] getSplits(JobConf conf, int arg1) throws IOException { // It's important to read the correct nulls! (in truth, the path is needed for SplitGrouper). String[] paths = conf.getTrimmedStrings(FileInputFormat.INPUT_DIR, (String[])null); - if (paths == null) throw new IOException("Cannot find path in conf"); + if (paths == null) { + throw new IOException("Cannot find path in conf"); + } InputSplit[] result = new InputSplit[paths.length]; for (int i = 0; i < paths.length; ++i) { result[i] = new DummyInputSplit(paths[i]); http://git-wip-us.apache.org/repos/asf/hive/blob/f942e72a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java index f2b2fc5..fa7a8a3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hive.ql.plan; import org.apache.hadoop.hive.common.StringInternUtils; -import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Utilities; import java.util.ArrayList; @@ -27,14 +26,12 @@ import java.util.Arrays; import java.util.BitSet; import java.util.Collection; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Properties; import java.util.Set; import org.apache.hadoop.conf.Configuration; @@ -42,6 +39,7 @@ import 
org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.exec.IConfigureJobConf; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.OperatorUtils; import org.apache.hadoop.hive.ql.exec.vector.VectorizedSupport.Support; @@ -54,9 +52,6 @@ import org.apache.hadoop.hive.ql.optimizer.physical.VectorizerReason; import org.apache.hadoop.hive.ql.parse.SplitSample; import org.apache.hadoop.hive.ql.plan.Explain.Level; import org.apache.hadoop.hive.ql.plan.Explain.Vectorization; -import org.apache.hadoop.hive.serde.serdeConstants; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.mapred.JobConf; import com.google.common.annotations.VisibleForTesting; @@ -184,7 +179,7 @@ public class MapWork extends BaseWork { public void addPathToAlias(Path path, ArrayList<String> aliases){ pathToAliases.put(path, aliases); } - + public void addPathToAlias(Path path, String newAlias){ ArrayList<String> aliases = pathToAliases.get(path); if (aliases == null) { @@ -194,11 +189,11 @@ public class MapWork extends BaseWork { aliases.add(newAlias.intern()); } - + public void removePathToAlias(Path path){ pathToAliases.remove(path); } - + /** * This is used to display and verify output of "Path -> Alias" in test framework. * @@ -244,7 +239,7 @@ public class MapWork extends BaseWork { public void removePathToPartitionInfo(Path path) { pathToPartitionInfo.remove(path); } - + /** * Derive additional attributes to be rendered by EXPLAIN. * TODO: this method is relied upon by custom input formats to set jobconf properties. 
@@ -303,15 +298,28 @@ public class MapWork extends BaseWork { private static String deriveLlapIoDescString(boolean isLlapOn, boolean canWrapAny, boolean hasPathToPartInfo, boolean hasLlap, boolean hasNonLlap, boolean hasAcid, boolean hasCacheOnly) { - if (!isLlapOn) return null; // LLAP IO is off, don't output. - if (!canWrapAny && !hasCacheOnly) return "no inputs"; // Cannot use with input formats. - if (!hasPathToPartInfo) return "unknown"; // No information to judge. - int varieties = (hasAcid ? 1 : 0) + (hasLlap ? 1 : 0) - + (hasCacheOnly ? 1 : 0) + (hasNonLlap ? 1 : 0); - if (varieties > 1) return "some inputs"; // Will probably never actually happen. - if (hasAcid) return "may be used (ACID table)"; - if (hasLlap) return "all inputs"; - if (hasCacheOnly) return "all inputs (cache only)"; + if (!isLlapOn) { + return null; // LLAP IO is off, don't output. + } + if (!canWrapAny && !hasCacheOnly) { + return "no inputs"; // Cannot use with input formats. + } + if (!hasPathToPartInfo) { + return "unknown"; // No information to judge. + } + int varieties = (hasAcid ? 1 : 0) + (hasLlap ? 1 : 0) + (hasCacheOnly ? 1 : 0) + (hasNonLlap ? 1 : 0); + if (varieties > 1) { + return "some inputs"; // Will probably never actually happen. 
+ } + if (hasAcid) { + return "may be used (ACID table)"; + } + if (hasLlap) { + return "all inputs"; + } + if (hasCacheOnly) { + return "all inputs (cache only)"; + } return "no inputs"; } @@ -641,6 +649,9 @@ public class MapWork extends BaseWork { for (FileSinkOperator fs : OperatorUtils.findOperators(mappers, FileSinkOperator.class)) { PlanUtils.configureJobConf(fs.getConf().getTableInfo(), job); } + for (IConfigureJobConf icjc : OperatorUtils.findOperators(mappers, IConfigureJobConf.class)) { + icjc.configureJobConf(job); + } } public void setDummyTableScan(boolean dummyTableScan) { http://git-wip-us.apache.org/repos/asf/hive/blob/f942e72a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java index ecfb118..ff5acbb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java @@ -21,7 +21,6 @@ package org.apache.hadoop.hive.ql.plan; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; -import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -29,18 +28,12 @@ import java.util.Set; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; -import org.apache.hadoop.hive.ql.exec.JoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.OperatorUtils; -import org.apache.hadoop.hive.ql.optimizer.physical.VectorizerReason; -import org.apache.hadoop.hive.ql.plan.BaseWork.BaseExplainVectorization; import org.apache.hadoop.hive.ql.plan.Explain.Level; import org.apache.hadoop.hive.ql.plan.Explain.Vectorization; -import org.apache.hadoop.hive.serde2.Deserializer; -import org.apache.hadoop.hive.serde2.SerDeUtils; import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.mapred.JobConf; -import org.apache.hive.common.util.ReflectionUtil; /** * ReduceWork represents all the information used to run a reduce task on the cluster. http://git-wip-us.apache.org/repos/asf/hive/blob/f942e72a/ql/src/test/queries/clientpositive/groupby_rollup_empty.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/groupby_rollup_empty.q b/ql/src/test/queries/clientpositive/groupby_rollup_empty.q index 432d8c4..b64eef9 100644 --- a/ql/src/test/queries/clientpositive/groupby_rollup_empty.q +++ b/ql/src/test/queries/clientpositive/groupby_rollup_empty.q @@ -22,6 +22,10 @@ from tx1 where a<0 group by rollup (b); +select '2 rows expected',sum(c) from tx1 group by rollup (a) +union all +select '2 rows expected',sum(c) from tx1 group by rollup (a); + -- non-empty table insert into tx1 values (1,1,1); @@ -64,6 +68,9 @@ from tx2 where a<0 group by a,b,d grouping sets ((), b, a, d); +select '2 rows expected',sum(c) from tx2 group by rollup (a) +union all +select '2 rows expected',sum(c) from tx2 group by rollup (a); insert into tx2 values (1,2,3,1.1,'x','b'), http://git-wip-us.apache.org/repos/asf/hive/blob/f942e72a/ql/src/test/results/clientpositive/groupby_rollup_empty.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/groupby_rollup_empty.q.out b/ql/src/test/results/clientpositive/groupby_rollup_empty.q.out index 7359140..2756d38 100644 --- a/ql/src/test/results/clientpositive/groupby_rollup_empty.q.out +++ b/ql/src/test/results/clientpositive/groupby_rollup_empty.q.out @@ -63,6 +63,20 @@ POSTHOOK: type: QUERY POSTHOOK: Input: default@tx1 #### A masked pattern was here #### NULL 1 NULL,1 +PREHOOK: query: select '2 rows expected',sum(c) from tx1 group by rollup (a) +union all +select '2 rows expected',sum(c) from tx1 group by rollup (a) 
+PREHOOK: type: QUERY +PREHOOK: Input: default@tx1 +#### A masked pattern was here #### +POSTHOOK: query: select '2 rows expected',sum(c) from tx1 group by rollup (a) +union all +select '2 rows expected',sum(c) from tx1 group by rollup (a) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tx1 +#### A masked pattern was here #### +2 rows expected NULL +2 rows expected NULL PREHOOK: query: insert into tx1 values (1,1,1) PREHOOK: type: QUERY PREHOOK: Input: _dummy_database@_dummy_table @@ -225,6 +239,20 @@ POSTHOOK: type: QUERY POSTHOOK: Input: default@tx2 #### A masked pattern was here #### NULL NULL asd 1 NULL,1 +PREHOOK: query: select '2 rows expected',sum(c) from tx2 group by rollup (a) +union all +select '2 rows expected',sum(c) from tx2 group by rollup (a) +PREHOOK: type: QUERY +PREHOOK: Input: default@tx2 +#### A masked pattern was here #### +POSTHOOK: query: select '2 rows expected',sum(c) from tx2 group by rollup (a) +union all +select '2 rows expected',sum(c) from tx2 group by rollup (a) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tx2 +#### A masked pattern was here #### +2 rows expected NULL +2 rows expected NULL PREHOOK: query: insert into tx2 values (1,2,3,1.1,'x','b'), (3,2,3,1.1,'y','b') http://git-wip-us.apache.org/repos/asf/hive/blob/f942e72a/ql/src/test/results/clientpositive/llap/groupby_rollup_empty.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/llap/groupby_rollup_empty.q.out b/ql/src/test/results/clientpositive/llap/groupby_rollup_empty.q.out index d2b5745..f2cda04 100644 --- a/ql/src/test/results/clientpositive/llap/groupby_rollup_empty.q.out +++ b/ql/src/test/results/clientpositive/llap/groupby_rollup_empty.q.out @@ -63,6 +63,20 @@ POSTHOOK: type: QUERY POSTHOOK: Input: default@tx1 #### A masked pattern was here #### NULL 1 NULL,1 +PREHOOK: query: select '2 rows expected',sum(c) from tx1 group by rollup (a) +union all +select '2 rows expected',sum(c) from tx1 
group by rollup (a) +PREHOOK: type: QUERY +PREHOOK: Input: default@tx1 +#### A masked pattern was here #### +POSTHOOK: query: select '2 rows expected',sum(c) from tx1 group by rollup (a) +union all +select '2 rows expected',sum(c) from tx1 group by rollup (a) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tx1 +#### A masked pattern was here #### +2 rows expected NULL +2 rows expected NULL PREHOOK: query: insert into tx1 values (1,1,1) PREHOOK: type: QUERY PREHOOK: Input: _dummy_database@_dummy_table @@ -234,6 +248,20 @@ POSTHOOK: type: QUERY POSTHOOK: Input: default@tx2 #### A masked pattern was here #### NULL NULL asd 1 NULL,1 +PREHOOK: query: select '2 rows expected',sum(c) from tx2 group by rollup (a) +union all +select '2 rows expected',sum(c) from tx2 group by rollup (a) +PREHOOK: type: QUERY +PREHOOK: Input: default@tx2 +#### A masked pattern was here #### +POSTHOOK: query: select '2 rows expected',sum(c) from tx2 group by rollup (a) +union all +select '2 rows expected',sum(c) from tx2 group by rollup (a) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tx2 +#### A masked pattern was here #### +2 rows expected NULL +2 rows expected NULL PREHOOK: query: insert into tx2 values (1,2,3,1.1,'x','b'), (3,2,3,1.1,'y','b')