Repository: hive
Updated Branches:
  refs/heads/master 2422e1808 -> f7dea1060


HIVE-18523: Fix summary row in case there are no inputs (Zoltan Haindrich reviewed by Ashutosh Chauhan)

Signed-off-by: Zoltan Haindrich <k...@rxd.hu>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f942e72a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f942e72a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f942e72a

Branch: refs/heads/master
Commit: f942e72abf450f1f00f27261532a54f5f81d5170
Parents: 2422e18
Author: Zoltan Haindrich <k...@rxd.hu>
Authored: Wed Feb 7 09:35:11 2018 +0100
Committer: Zoltan Haindrich <k...@rxd.hu>
Committed: Wed Feb 7 09:35:11 2018 +0100

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/GroupByOperator.java    | 25 +++++-----
 .../hadoop/hive/ql/exec/IConfigureJobConf.java  | 30 ++++++++++++
 .../apache/hadoop/hive/ql/exec/Utilities.java   |  4 +-
 .../hadoop/hive/ql/exec/mr/ExecDriver.java      |  1 +
 .../hadoop/hive/ql/exec/mr/ExecMapRunner.java   | 40 ++++++++++++++++
 .../hadoop/hive/ql/exec/mr/ExecMapper.java      | 18 ++++---
 .../hive/ql/exec/tez/HiveSplitGenerator.java    |  1 -
 .../ql/exec/vector/VectorGroupByOperator.java   | 40 ++++++++++------
 .../hive/ql/io/CombineHiveInputFormat.java      | 10 +++-
 .../hadoop/hive/ql/io/HiveInputFormat.java      |  9 ++++
 .../hadoop/hive/ql/io/NullRowsInputFormat.java  | 14 ++++--
 .../org/apache/hadoop/hive/ql/plan/MapWork.java | 49 ++++++++++++--------
 .../apache/hadoop/hive/ql/plan/ReduceWork.java  |  7 ---
 .../clientpositive/groupby_rollup_empty.q       |  7 +++
 .../clientpositive/groupby_rollup_empty.q.out   | 28 +++++++++++
 .../llap/groupby_rollup_empty.q.out             | 28 +++++++++++
 16 files changed, 245 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
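In short: when a grouping-set aggregation such as GROUP BY ROLLUP runs over a
table that yields no input splits, the execution engine used to skip the
operator tree entirely, so the mandatory summary row was never emitted. This
patch adds the IConfigureJobConf hook, through which GroupByOperator and
VectorGroupByOperator set the ENSURE_OPERATORS_EXECUTED flag on the JobConf;
the input formats then schedule a single dummy ZeroRows split for otherwise
empty inputs, so exactly one task runs the operator tree and emits the summary
row. Short illustrative sketches follow several of the hunks below.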


http://git-wip-us.apache.org/repos/asf/hive/blob/f942e72a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
index 6a0f0de..6de979e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
@@ -36,13 +36,13 @@ import org.apache.hadoop.hive.common.type.TimestampTZ;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.LlapDaemonInfo;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
-import org.apache.hadoop.hive.ql.exec.tez.TezContext;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.OpParseContext;
 import org.apache.hadoop.hive.ql.plan.AggregationDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc.Mode;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
@@ -67,13 +67,14 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
 
 import javolution.util.FastBitSet;
 
 /**
  * GroupBy operator implementation.
  */
-public class GroupByOperator extends Operator<GroupByDesc> {
+public class GroupByOperator extends Operator<GroupByDesc> implements IConfigureJobConf {
 
   private static final long serialVersionUID = 1L;
   private static final int NUMROWSESTIMATESIZE = 1000;
@@ -1164,14 +1165,15 @@ public class GroupByOperator extends Operator<GroupByDesc> {
   }
 
   public static boolean shouldEmitSummaryRow(GroupByDesc desc) {
-    // exactly one reducer should emit the summary row
-    if (!firstReducer()) {
-      return false;
-    }
     // empty keyset is basically ()
     if (desc.getKeys().size() == 0) {
       return true;
     }
+
+    if (desc.getMode() != Mode.HASH && desc.getMode() != Mode.COMPLETE && desc.getMode() != Mode.PARTIAL1) {
+      return false;
+    }
+
     int groupingSetPosition = desc.getGroupingSetPosition();
     List<Integer> listGroupingSets = desc.getListGroupingSets();
     // groupingSets are known at map/reducer side; but have to do real processing
@@ -1185,13 +1187,12 @@ public class GroupByOperator extends Operator<GroupByDesc> {
     return false;
   }
 
-  public static boolean firstReducer() {
-    MapredContext ctx = TezContext.get();
-    if (ctx != null && ctx instanceof TezContext) {
-      TezContext tezContext = (TezContext) ctx;
-      return tezContext.getTezProcessorContext().getTaskIndex() == 0;
+  @Override
+  public void configureJobConf(JobConf job) {
+    // only needed when grouping sets are present
+    if (conf.getGroupingSetPosition() > 0 && shouldEmitSummaryRow(conf)) {
+      job.setBoolean(Utilities.ENSURE_OPERATORS_EXECUTED, true);
     }
-    return true;
   }
 
 }

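Note the behavioral shift above: the removed firstReducer() check elected the
Tez task with index 0 to emit the summary row, which could not help when no
task ran at all. shouldEmitSummaryRow() now instead restricts emission to the
HASH, COMPLETE and PARTIAL1 modes, and configureJobConf() asks the engine, via
ENSURE_OPERATORS_EXECUTED, to schedule work even for empty inputs.
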
http://git-wip-us.apache.org/repos/asf/hive/blob/f942e72a/ql/src/java/org/apache/hadoop/hive/ql/exec/IConfigureJobConf.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/IConfigureJobConf.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/IConfigureJobConf.java
new file mode 100644
index 0000000..dd6da67
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/IConfigureJobConf.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * Lets an operator make changes to the {@link JobConf}.
+ *
+ * Invoked during the compilation phase only.
+ */
+public interface IConfigureJobConf {
+  void configureJobConf(JobConf job);
+}

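For context, a minimal sketch of how an operator opts in to this hook
(MySummaryOperator and MyDesc are hypothetical names; the real implementers in
this patch are GroupByOperator and VectorGroupByOperator):

    import org.apache.hadoop.hive.ql.exec.IConfigureJobConf;
    import org.apache.hadoop.hive.ql.exec.Operator;
    import org.apache.hadoop.hive.ql.exec.Utilities;
    import org.apache.hadoop.mapred.JobConf;

    // Hypothetical operator that must produce output even on empty input;
    // the remaining Operator plumbing (process, getName, getType) is omitted.
    public class MySummaryOperator extends Operator<MyDesc> implements IConfigureJobConf {
      @Override
      public void configureJobConf(JobConf job) {
        // Ask the engine to schedule at least one task, so the operator
        // tree is executed even when split generation finds no inputs.
        job.setBoolean(Utilities.ENSURE_OPERATORS_EXECUTED, true);
      }
    }
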
http://git-wip-us.apache.org/repos/asf/hive/blob/f942e72a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 941dd58..675ca12 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -52,7 +52,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Calendar;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -244,6 +243,7 @@ public final class Utilities {
   public static final String USE_VECTORIZED_INPUT_FILE_FORMAT = "USE_VECTORIZED_INPUT_FILE_FORMAT";
   public static final String MAPNAME = "Map ";
   public static final String REDUCENAME = "Reducer ";
+  public static final String ENSURE_OPERATORS_EXECUTED = "ENSURE_OPERATORS_EXECUTED";
 
   @Deprecated
   protected static final String DEPRECATED_MAPRED_DFSCLIENT_PARALLELISM_MAX = "mapred.dfsclient.parallelism.max";
@@ -361,7 +361,7 @@ public final class Utilities {
   }
 
   public static BaseWork getMergeWork(Configuration jconf) {
-    String currentMergePrefix = jconf.get(DagUtils.TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX); 
+    String currentMergePrefix = jconf.get(DagUtils.TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX);
     if (StringUtils.isEmpty(currentMergePrefix)) {
       return null;
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/f942e72a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
index 976b537..b436e80 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
@@ -256,6 +256,7 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop
     //See the javadoc on HiveOutputFormatImpl and HadoopShims.prepareJobOutput()
     job.setOutputFormat(HiveOutputFormatImpl.class);
 
+    job.setMapRunnerClass(ExecMapRunner.class);
     job.setMapperClass(ExecMapper.class);
 
     job.setMapOutputKeyClass(HiveKey.class);

http://git-wip-us.apache.org/repos/asf/hive/blob/f942e72a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapRunner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapRunner.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapRunner.java
new file mode 100644
index 0000000..d8f8c2a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapRunner.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.mr;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.MapRunner;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+public class ExecMapRunner<K1, V1, K2, V2> extends MapRunner<K1, V1, K2, V2> {
+  @Override
+  public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output, Reporter reporter) throws IOException {
+    Mapper<K1, V1, K2, V2> mapper = getMapper();
+    if (mapper instanceof ExecMapper) {
+      ExecMapper execMapper = (ExecMapper) mapper;
+      execMapper.ensureOutputInitialize(output, reporter);
+    }
+    super.run(input, output, reporter);
+  }
+
+}

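For an input split that yields zero records, the sequence this runner
guarantees is roughly the following (a control-flow sketch, not framework
code):

    // 1. ExecMapRunner.run() calls execMapper.ensureOutputInitialize(output, reporter)
    //    before any record is read, wiring up the collector and the reporter.
    // 2. super.run() finds no records, so ExecMapper.map() is never invoked.
    // 3. ExecMapper.close() can still flush the operator tree, and with it the
    //    summary row, because step 1 already initialized the output.

Since ensureOutputInitialize() is guarded by the oc == null check (see the
ExecMapper hunk below), the extra call from the runner is idempotent when
map() does run.
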
http://git-wip-us.apache.org/repos/asf/hive/blob/f942e72a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
index 150382a..99b33a3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
@@ -141,13 +141,7 @@ public class ExecMapper extends MapReduceBase implements Mapper {
   @Override
   public void map(Object key, Object value, OutputCollector output,
       Reporter reporter) throws IOException {
-    if (oc == null) {
-      oc = output;
-      rp = reporter;
-      OperatorUtils.setChildrenCollector(mo.getChildOperators(), output);
-      mo.setReporter(rp);
-      MapredContext.get().setReporter(reporter);
-    }
+    ensureOutputInitialize(output, reporter);
     // reset the execContext for each new row
     execContext.resetRow();
 
@@ -171,6 +165,16 @@ public class ExecMapper extends MapReduceBase implements Mapper {
     }
   }
 
+  public void ensureOutputInitialize(OutputCollector output, Reporter reporter) {
+    if (oc == null) {
+      oc = output;
+      rp = reporter;
+      OperatorUtils.setChildrenCollector(mo.getChildOperators(), output);
+      mo.setReporter(rp);
+      MapredContext.get().setReporter(reporter);
+    }
+  }
+
   @Override
   public void close() {
     // No row was processed

http://git-wip-us.apache.org/repos/asf/hive/blob/f942e72a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
index 98f4bc0..f3aa151 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
@@ -29,7 +29,6 @@ import java.util.Set;
 
 import com.google.common.base.Preconditions;
 
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.tez.common.counters.TezCounters;

http://git-wip-us.apache.org/repos/asf/hive/blob/f942e72a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
index 45d809a..e670409 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
@@ -35,8 +35,10 @@ import org.apache.hadoop.hive.common.type.DataTypePhysicalVariation;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.IConfigureJobConf;
 import org.apache.hadoop.hive.ql.exec.KeyWrapper;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.ConstantVectorExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
@@ -55,6 +57,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.mapred.JobConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,7 +72,7 @@ import com.google.common.base.Preconditions;
  *
  */
 public class VectorGroupByOperator extends Operator<GroupByDesc>
-    implements VectorizationOperator, VectorizationContextRegion {
+    implements VectorizationOperator, VectorizationContextRegion, IConfigureJobConf {
 
   private static final Logger LOG = LoggerFactory.getLogger(
       VectorGroupByOperator.class.getName());
@@ -447,6 +450,20 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
       if (!aborted) {
         flush(true);
       }
+      if (!aborted && sumBatchSize == 0 && GroupByOperator.shouldEmitSummaryRow(conf)) {
+        // in case the empty grouping set is present but no output has been produced,
+        // the "summary row" still needs to be emitted
+        VectorHashKeyWrapper kw = keyWrappersBatch.getVectorHashKeyWrappers()[0];
+        kw.setNull();
+        int pos = conf.getGroupingSetPosition();
+        if (pos >= 0) {
+          long val = (1 << pos) - 1;
+          keyWrappersBatch.setLongValue(kw, pos, val);
+        }
+        VectorAggregationBufferRow groupAggregators = allocateAggregationBuffer();
+        writeSingleRow(kw, groupAggregators);
+      }
+
+
     }
 
     /**
@@ -777,7 +794,6 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
 
     private boolean first;
     private boolean isLastGroupBatch;
-    private boolean hasOutput;
 
     /**
      * The group vector key helper.
@@ -820,7 +836,6 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
     @Override
     public void doProcessBatch(VectorizedRowBatch batch, boolean isFirstGroupingSet,
         boolean[] currentGroupingSetsOverrideIsNulls) throws HiveException {
-      hasOutput = true;
       if (first) {
         // Copy the group key to output batch now.  We'll copy in the aggregates at the end of the group.
         first = false;
@@ -849,16 +864,6 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
       if (!aborted && !first && !isLastGroupBatch) {
         writeGroupRow(groupAggregators, buffer);
       }
-      if (!hasOutput && GroupByOperator.shouldEmitSummaryRow(conf)) {
-        VectorHashKeyWrapper kw = keyWrappersBatch.getVectorHashKeyWrappers()[0];
-        kw.setNull();
-        int pos = conf.getGroupingSetPosition();
-        if (pos >= 0) {
-          long val = (1 << pos) - 1;
-          keyWrappersBatch.setLongValue(kw, pos, val);
-        }
-        writeSingleRow(kw , groupAggregators);
-      }
     }
   }
 
@@ -1216,4 +1221,13 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
   public VectorDesc getVectorDesc() {
     return vectorDesc;
   }
+
+  @Override
+  public void configureJobConf(JobConf job) {
+    // only needed when grouping sets are present
+    if (conf.getGroupingSetPosition() > 0 && GroupByOperator.shouldEmitSummaryRow(conf)) {
+      job.setBoolean(Utilities.ENSURE_OPERATORS_EXECUTED, true);
+    }
+  }
+
 }

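Two things are worth noting in the hunks above. First, summary-row emission
moves from doProcessBatch() (the removed hasOutput flag) to the operator's
close path, guarded by sumBatchSize == 0, so it also fires when no batch ever
arrives. Second, (1 << pos) - 1 sets all pos grouping-set bits, which under
Hive's grouping-id convention denotes the empty grouping set (). A worked
example, assuming two rollup key columns:

    // pos = 2 (two grouping-set key columns)
    // (1 << 2) - 1 == 0b11 == 3: both keys aggregated away, i.e. the ()
    // grouping set, which is exactly the summary row that GROUP BY ROLLUP
    // must still produce on empty input.
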
http://git-wip-us.apache.org/repos/asf/hive/blob/f942e72a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
index e4dfc00..b698987 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
@@ -464,9 +464,8 @@ public class CombineHiveInputFormat<K extends WritableComparable, V extends Writ
       CombineHiveInputSplit csplit = new CombineHiveInputSplit(job, is, pathToPartitionInfo);
       result.add(csplit);
     }
-
     LOG.info("number of splits " + result.size());
-    return result.toArray(new CombineHiveInputSplit[result.size()]);
+    return result.toArray(new InputSplit[result.size()]);
   }
 
   /**
@@ -578,6 +577,13 @@ public class CombineHiveInputFormat<K extends WritableComparable, V extends Writ
     // clear work from ThreadLocal after splits generated in case of thread is reused in pool.
     Utilities.clearWorkMapForConf(job);
 
+    if (result.isEmpty() && paths.length > 0 && job.getBoolean(Utilities.ENSURE_OPERATORS_EXECUTED, false)) {
+      // If there are no inputs, the execution engine skips the operator tree.
+      // To prevent that, an opaque ZeroRows input is added here when needed.
+      result.add(
+          new HiveInputSplit(new NullRowsInputFormat.DummyInputSplit(paths[0]), ZeroRowsInputFormat.class.getName()));
+    }
+
     LOG.info("Number of all splits " + result.size());
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
     return result.toArray(new InputSplit[result.size()]);

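A side note on the first hunk: the toArray element type is loosened from
CombineHiveInputSplit[] to InputSplit[], presumably so the split-generation
paths stay uniform now that a result list may also carry a plain
HiveInputSplit wrapping the dummy split.
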
http://git-wip-us.apache.org/repos/asf/hive/blob/f942e72a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index c3b846c..856b026 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -209,6 +209,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
     }
   }
 
+  @Override
   public void configure(JobConf job) {
     this.job = job;
   }
@@ -367,6 +368,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
     return instance;
   }
 
+  @Override
   public RecordReader getRecordReader(InputSplit split, JobConf job,
       Reporter reporter) throws IOException {
     HiveInputSplit hsplit = (HiveInputSplit) split;
@@ -500,6 +502,12 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
     for (InputSplit is : iss) {
       result.add(new HiveInputSplit(is, inputFormatClass.getName()));
     }
+    if (iss.length == 0 && finalDirs.length > 0 && conf.getBoolean(Utilities.ENSURE_OPERATORS_EXECUTED, false)) {
+      // If there are no inputs, the execution engine skips the operator tree.
+      // To prevent that, an opaque ZeroRows input is added here when needed.
+      result.add(new HiveInputSplit(new NullRowsInputFormat.DummyInputSplit(finalDirs[0].toString()),
+          ZeroRowsInputFormat.class.getName()));
+    }
   }
 
   public static Path[] processPathsForMmRead(List<Path> dirs, JobConf conf,
@@ -592,6 +600,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
     return dirs;
   }
 
+  @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
     PerfLogger perfLogger = SessionState.getPerfLogger();
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.GET_SPLITS);

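Both input formats apply the same guard; a condensed restatement (splits and
inputPaths are placeholder names for the result/paths and result/finalDirs
pairs in the hunks above):

    if (splits.isEmpty() && inputPaths.length > 0
        && job.getBoolean(Utilities.ENSURE_OPERATORS_EXECUTED, false)) {
      // One dummy ZeroRows split guarantees that a single task runs the
      // operator tree, so the group-by can emit its summary row.
      splits.add(new HiveInputSplit(
          new NullRowsInputFormat.DummyInputSplit(inputPaths[0].toString()),
          ZeroRowsInputFormat.class.getName()));
    }
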
http://git-wip-us.apache.org/repos/asf/hive/blob/f942e72a/ql/src/java/org/apache/hadoop/hive/ql/io/NullRowsInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/NullRowsInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/NullRowsInputFormat.java
index 6a372a3..e632d43 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/NullRowsInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/NullRowsInputFormat.java
@@ -57,7 +57,11 @@ public class NullRowsInputFormat implements InputFormat<NullWritable, NullWritab
     }
 
     public DummyInputSplit(String path) {
-      super(new Path(path, "null"), 0, 1, (String[])null);
+      this(new Path(path, "null"));
+    }
+
+    public DummyInputSplit(Path path) {
+      super(path, 0, 1, (String[]) null);
     }
   }
 
@@ -119,7 +123,9 @@ public class NullRowsInputFormat implements InputFormat<NullWritable, NullWritab
     @Override
     public boolean next(Object arg0, Object value) throws IOException {
       if (rbCtx != null) {
-        if (counter >= MAX_ROW) return false;
+        if (counter >= MAX_ROW) {
+          return false;
+        }
         makeNullVrb(value, MAX_ROW);
         counter = MAX_ROW;
         return true;
@@ -163,7 +169,9 @@ public class NullRowsInputFormat implements InputFormat<NullWritable, NullWritab
   public InputSplit[] getSplits(JobConf conf, int arg1) throws IOException {
     // It's important to read the correct nulls! (in truth, the path is needed for SplitGrouper).
     String[] paths = conf.getTrimmedStrings(FileInputFormat.INPUT_DIR, (String[])null);
-    if (paths == null) throw new IOException("Cannot find path in conf");
+    if (paths == null) {
+      throw new IOException("Cannot find path in conf");
+    }
     InputSplit[] result = new InputSplit[paths.length];
     for (int i = 0; i < paths.length; ++i) {
       result[i] = new DummyInputSplit(paths[i]);

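The new DummyInputSplit(Path) constructor lets CombineHiveInputFormat pass
paths[0] directly as a Path, while the pre-existing String constructor (still
used by HiveInputFormat with finalDirs[0].toString()) now delegates to it,
appending the "null" child path as before.
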
http://git-wip-us.apache.org/repos/asf/hive/blob/f942e72a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
index f2b2fc5..fa7a8a3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hive.ql.plan;
 
 import org.apache.hadoop.hive.common.StringInternUtils;
-import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 
 import java.util.ArrayList;
@@ -27,14 +26,12 @@ import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Properties;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
@@ -42,6 +39,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.IConfigureJobConf;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedSupport.Support;
@@ -54,9 +52,6 @@ import org.apache.hadoop.hive.ql.optimizer.physical.VectorizerReason;
 import org.apache.hadoop.hive.ql.parse.SplitSample;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
 import org.apache.hadoop.hive.ql.plan.Explain.Vectorization;
-import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.mapred.JobConf;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -184,7 +179,7 @@ public class MapWork extends BaseWork {
   public void addPathToAlias(Path path, ArrayList<String> aliases){
     pathToAliases.put(path, aliases);
   }
-  
+
   public void addPathToAlias(Path path, String newAlias){
     ArrayList<String> aliases = pathToAliases.get(path);
     if (aliases == null) {
@@ -194,11 +189,11 @@ public class MapWork extends BaseWork {
     aliases.add(newAlias.intern());
   }
 
-  
+
   public void removePathToAlias(Path path){
     pathToAliases.remove(path);
   }
-  
+
   /**
   * This is used to display and verify output of "Path -> Alias" in test framework.
    *
@@ -244,7 +239,7 @@ public class MapWork extends BaseWork {
   public void removePathToPartitionInfo(Path path) {
     pathToPartitionInfo.remove(path);
   }
-  
+
   /**
    * Derive additional attributes to be rendered by EXPLAIN.
   * TODO: this method is relied upon by custom input formats to set jobconf properties.
@@ -303,15 +298,28 @@ public class MapWork extends BaseWork {
   private static String deriveLlapIoDescString(boolean isLlapOn, boolean canWrapAny,
       boolean hasPathToPartInfo, boolean hasLlap, boolean hasNonLlap, boolean hasCacheOnly) {
       boolean hasCacheOnly) {
-    if (!isLlapOn) return null; // LLAP IO is off, don't output.
-    if (!canWrapAny && !hasCacheOnly) return "no inputs"; // Cannot use with input formats.
-    if (!hasPathToPartInfo) return "unknown"; // No information to judge.
-    int varieties = (hasAcid ? 1 : 0) + (hasLlap ? 1 : 0)
-        + (hasCacheOnly ? 1 : 0) + (hasNonLlap ? 1 : 0);
-    if (varieties > 1) return "some inputs"; // Will probably never actually happen.
-    if (hasAcid) return "may be used (ACID table)";
-    if (hasLlap) return "all inputs";
-    if (hasCacheOnly) return "all inputs (cache only)";
+    if (!isLlapOn) {
+      return null; // LLAP IO is off, don't output.
+    }
+    if (!canWrapAny && !hasCacheOnly) {
+      return "no inputs"; // Cannot use with input formats.
+    }
+    if (!hasPathToPartInfo) {
+      return "unknown"; // No information to judge.
+    }
+    int varieties = (hasAcid ? 1 : 0) + (hasLlap ? 1 : 0) + (hasCacheOnly ? 1 : 0) + (hasNonLlap ? 1 : 0);
+    if (varieties > 1) {
+      return "some inputs"; // Will probably never actually happen.
+    }
+    if (hasAcid) {
+      return "may be used (ACID table)";
+    }
+    if (hasLlap) {
+      return "all inputs";
+    }
+    if (hasCacheOnly) {
+      return "all inputs (cache only)";
+    }
     return "no inputs";
   }
 
@@ -641,6 +649,9 @@ public class MapWork extends BaseWork {
     for (FileSinkOperator fs : OperatorUtils.findOperators(mappers, FileSinkOperator.class)) {
       PlanUtils.configureJobConf(fs.getConf().getTableInfo(), job);
     }
+    for (IConfigureJobConf icjc : OperatorUtils.findOperators(mappers, IConfigureJobConf.class)) {
+      icjc.configureJobConf(job);
+    }
   }
 
   public void setDummyTableScan(boolean dummyTableScan) {

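This is the dispatch point for the new hook: alongside the existing
FileSinkOperator/PlanUtils handling, MapWork.configureJobConf() now walks the
mapper operator trees and gives every IConfigureJobConf implementer a chance
to adjust the JobConf during compilation.
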
http://git-wip-us.apache.org/repos/asf/hive/blob/f942e72a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
index ecfb118..ff5acbb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hive.ql.plan;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -29,18 +28,12 @@ import java.util.Set;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorUtils;
-import org.apache.hadoop.hive.ql.optimizer.physical.VectorizerReason;
-import org.apache.hadoop.hive.ql.plan.BaseWork.BaseExplainVectorization;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
 import org.apache.hadoop.hive.ql.plan.Explain.Vectorization;
-import org.apache.hadoop.hive.serde2.Deserializer;
-import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hive.common.util.ReflectionUtil;
 
 /**
  * ReduceWork represents all the information used to run a reduce task on the cluster.

http://git-wip-us.apache.org/repos/asf/hive/blob/f942e72a/ql/src/test/queries/clientpositive/groupby_rollup_empty.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/groupby_rollup_empty.q b/ql/src/test/queries/clientpositive/groupby_rollup_empty.q
index 432d8c4..b64eef9 100644
--- a/ql/src/test/queries/clientpositive/groupby_rollup_empty.q
+++ b/ql/src/test/queries/clientpositive/groupby_rollup_empty.q
@@ -22,6 +22,10 @@ from    tx1
 where  a<0
 group by rollup (b);
 
+select '2 rows expected',sum(c) from tx1 group by rollup (a)
+union all
+select '2 rows expected',sum(c) from tx1 group by rollup (a);
+
 -- non-empty table
 
 insert into tx1 values (1,1,1);
@@ -64,6 +68,9 @@ from    tx2
 where  a<0
 group by a,b,d grouping sets ((), b, a, d);
 
+select '2 rows expected',sum(c) from tx2 group by rollup (a)
+union all
+select '2 rows expected',sum(c) from tx2 group by rollup (a);
 
 insert into tx2 values
 (1,2,3,1.1,'x','b'),

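The added queries pin down the regression: each branch of the UNION ALL
aggregates an empty table under ROLLUP, so each branch must still emit its own
summary row, hence the two '2 rows expected' rows in the .q.out updates below.
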
http://git-wip-us.apache.org/repos/asf/hive/blob/f942e72a/ql/src/test/results/clientpositive/groupby_rollup_empty.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/groupby_rollup_empty.q.out b/ql/src/test/results/clientpositive/groupby_rollup_empty.q.out
index 7359140..2756d38 100644
--- a/ql/src/test/results/clientpositive/groupby_rollup_empty.q.out
+++ b/ql/src/test/results/clientpositive/groupby_rollup_empty.q.out
@@ -63,6 +63,20 @@ POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tx1
 #### A masked pattern was here ####
 NULL   1       NULL,1
+PREHOOK: query: select '2 rows expected',sum(c) from tx1 group by rollup (a)
+union all
+select '2 rows expected',sum(c) from tx1 group by rollup (a)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tx1
+#### A masked pattern was here ####
+POSTHOOK: query: select '2 rows expected',sum(c) from tx1 group by rollup (a)
+union all
+select '2 rows expected',sum(c) from tx1 group by rollup (a)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tx1
+#### A masked pattern was here ####
+2 rows expected        NULL
+2 rows expected        NULL
 PREHOOK: query: insert into tx1 values (1,1,1)
 PREHOOK: type: QUERY
 PREHOOK: Input: _dummy_database@_dummy_table
@@ -225,6 +239,20 @@ POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tx2
 #### A masked pattern was here ####
 NULL   NULL    asd     1       NULL,1
+PREHOOK: query: select '2 rows expected',sum(c) from tx2 group by rollup (a)
+union all
+select '2 rows expected',sum(c) from tx2 group by rollup (a)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tx2
+#### A masked pattern was here ####
+POSTHOOK: query: select '2 rows expected',sum(c) from tx2 group by rollup (a)
+union all
+select '2 rows expected',sum(c) from tx2 group by rollup (a)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tx2
+#### A masked pattern was here ####
+2 rows expected        NULL
+2 rows expected        NULL
 PREHOOK: query: insert into tx2 values
 (1,2,3,1.1,'x','b'),
 (3,2,3,1.1,'y','b')

http://git-wip-us.apache.org/repos/asf/hive/blob/f942e72a/ql/src/test/results/clientpositive/llap/groupby_rollup_empty.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/groupby_rollup_empty.q.out b/ql/src/test/results/clientpositive/llap/groupby_rollup_empty.q.out
index d2b5745..f2cda04 100644
--- a/ql/src/test/results/clientpositive/llap/groupby_rollup_empty.q.out
+++ b/ql/src/test/results/clientpositive/llap/groupby_rollup_empty.q.out
@@ -63,6 +63,20 @@ POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tx1
 #### A masked pattern was here ####
 NULL   1       NULL,1
+PREHOOK: query: select '2 rows expected',sum(c) from tx1 group by rollup (a)
+union all
+select '2 rows expected',sum(c) from tx1 group by rollup (a)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tx1
+#### A masked pattern was here ####
+POSTHOOK: query: select '2 rows expected',sum(c) from tx1 group by rollup (a)
+union all
+select '2 rows expected',sum(c) from tx1 group by rollup (a)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tx1
+#### A masked pattern was here ####
+2 rows expected        NULL
+2 rows expected        NULL
 PREHOOK: query: insert into tx1 values (1,1,1)
 PREHOOK: type: QUERY
 PREHOOK: Input: _dummy_database@_dummy_table
@@ -234,6 +248,20 @@ POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tx2
 #### A masked pattern was here ####
 NULL   NULL    asd     1       NULL,1
+PREHOOK: query: select '2 rows expected',sum(c) from tx2 group by rollup (a)
+union all
+select '2 rows expected',sum(c) from tx2 group by rollup (a)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tx2
+#### A masked pattern was here ####
+POSTHOOK: query: select '2 rows expected',sum(c) from tx2 group by rollup (a)
+union all
+select '2 rows expected',sum(c) from tx2 group by rollup (a)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tx2
+#### A masked pattern was here ####
+2 rows expected        NULL
+2 rows expected        NULL
 PREHOOK: query: insert into tx2 values
 (1,2,3,1.1,'x','b'),
 (3,2,3,1.1,'y','b')
