Repository: hive
Updated Branches:
  refs/heads/master e0bf12d98 -> e05e0fa19


HIVE-16125 : Split work between reducers. (Slim Bouguerra via Ashutosh Chauhan)

Signed-off-by: Ashutosh Chauhan <hashut...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e05e0fa1
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e05e0fa1
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e05e0fa1

Branch: refs/heads/master
Commit: e05e0fa19d7fd7c48617c4a770fa579b7f01f40e
Parents: e0bf12d
Author: Slim Bouguerra <slim.bougue...@gmail.com>
Authored: Thu Feb 8 20:46:00 2018 -0800
Committer: Ashutosh Chauhan <hashut...@apache.org>
Committed: Tue Feb 20 10:34:11 2018 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/Constants.java  |   3 +
 .../hadoop/hive/druid/io/DruidOutputFormat.java |  12 +-
 .../hadoop/hive/druid/io/DruidRecordWriter.java |  72 ++-
 .../hadoop/hive/druid/serde/DruidSerDe.java     |  26 +-
 .../test/resources/testconfiguration.properties |   3 +-
 ...tedDynPartitionTimeGranularityOptimizer.java | 237 ++++---
 .../druidmini_dynamic_partition.q               | 170 +++++
 .../druid/druidmini_dynamic_partition.q.out     | 625 +++++++++++++++++++
 8 files changed, 1038 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/e05e0fa1/common/src/java/org/apache/hadoop/hive/conf/Constants.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/Constants.java b/common/src/java/org/apache/hadoop/hive/conf/Constants.java
index 51408b1..10aaee1 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/Constants.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/Constants.java
@@ -33,7 +33,10 @@ public class Constants {
   public static final String DRUID_DATA_SOURCE = "druid.datasource";
   public static final String DRUID_SEGMENT_GRANULARITY = "druid.segment.granularity";
   public static final String DRUID_QUERY_GRANULARITY = "druid.query.granularity";
+  public static final String DRUID_TARGET_SHARDS_PER_GRANULARITY =
+      "druid.segment.targetShardsPerGranularity";
   public static final String DRUID_TIMESTAMP_GRANULARITY_COL_NAME = "__time_granularity";
+  public static final String DRUID_SHARD_KEY_COL_NAME = "__druid_extra_partition_key";
   public static final String DRUID_QUERY_JSON = "druid.query.json";
   public static final String DRUID_QUERY_TYPE = "druid.query.type";
   public static final String DRUID_QUERY_FETCH = "druid.query.fetch";

http://git-wip-us.apache.org/repos/asf/hive/blob/e05e0fa1/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
index 8c25d62..b758efd 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
@@ -92,6 +92,10 @@ public class DruidOutputFormat<K, V> implements HiveOutputFormat<K, DruidWritabl
             tableProperties.getProperty(Constants.DRUID_SEGMENT_GRANULARITY) != null ?
                     tableProperties.getProperty(Constants.DRUID_SEGMENT_GRANULARITY) :
                     HiveConf.getVar(jc, HiveConf.ConfVars.HIVE_DRUID_INDEXING_GRANULARITY);
+    final int targetNumShardsPerGranularity = Integer.parseUnsignedInt(
+        tableProperties.getProperty(Constants.DRUID_TARGET_SHARDS_PER_GRANULARITY, "0"));
+    final int maxPartitionSize = targetNumShardsPerGranularity > 0 ? -1 : HiveConf
+        .getIntVar(jc, HiveConf.ConfVars.HIVE_DRUID_MAX_PARTITION_SIZE);
     // If datasource is in the table properties, it is an INSERT/INSERT OVERWRITE as the datasource
     // name was already persisted. Otherwise, it is a CT/CTAS and we need to get the name from the
     // job properties that are set by configureOutputJobProperties in the DruidStorageHandler
@@ -191,8 +195,10 @@ public class DruidOutputFormat<K, V> implements HiveOutputFormat<K, DruidWritabl
     List<AggregatorFactory> aggregatorFactories = aggregatorFactoryBuilder.build();
     final InputRowParser inputRowParser = new MapInputRowParser(new TimeAndDimsParseSpec(
             new TimestampSpec(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, "auto", null),
-            new DimensionsSpec(dimensions,
-                    Lists.newArrayList(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME), null
+            new DimensionsSpec(dimensions, Lists
+                .newArrayList(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME,
+                    Constants.DRUID_SHARD_KEY_COL_NAME
+                ), null
             )
     ));
 
@@ -209,8 +215,6 @@ public class DruidOutputFormat<K, V> implements HiveOutputFormat<K, DruidWritabl
 
     final String workingPath = jc.get(Constants.DRUID_JOB_WORKING_DIRECTORY);
     final String version = jc.get(Constants.DRUID_SEGMENT_VERSION);
-    Integer maxPartitionSize = HiveConf
-            .getIntVar(jc, HiveConf.ConfVars.HIVE_DRUID_MAX_PARTITION_SIZE);
     String basePersistDirectory = HiveConf
             .getVar(jc, HiveConf.ConfVars.HIVE_DRUID_BASE_PERSIST_DIRECTORY);
     if (Strings.isNullOrEmpty(basePersistDirectory)) {

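For context on the hunk above: a positive druid.segment.targetShardsPerGranularity switches the writer into fixed-shard mode and disables the row-count based segment rolling by forcing maxPartitionSize to -1. The following is a minimal, self-contained Java sketch of that selection only; the class and method names are illustrative and are not part of the patch or of Hive's API.

import java.util.Properties;

// Sketch only: mirrors the decision made in the hunk above. A positive target
// shard count disables the row-count based segment rolling (maxPartitionSize = -1).
public final class ShardingModeSketch {
  static final String TARGET_SHARDS_KEY = "druid.segment.targetShardsPerGranularity";

  static int effectiveMaxPartitionSize(Properties tableProperties, int configuredMaxPartitionSize) {
    final int targetShards =
        Integer.parseUnsignedInt(tableProperties.getProperty(TARGET_SHARDS_KEY, "0"));
    return targetShards > 0 ? -1 : configuredMaxPartitionSize;
  }

  public static void main(String[] args) {
    final Properties props = new Properties();
    props.setProperty(TARGET_SHARDS_KEY, "6");
    System.out.println(effectiveMaxPartitionSize(props, 5000000));           // prints -1
    System.out.println(effectiveMaxPartitionSize(new Properties(), 5000000)); // prints 5000000
  }
}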
http://git-wip-us.apache.org/repos/asf/hive/blob/e05e0fa1/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
index cf4dad6..7d2bb91 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
@@ -24,6 +24,7 @@ import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
 import com.google.common.base.Throwables;
 import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import io.druid.data.input.Committer;
 import io.druid.data.input.InputRow;
@@ -78,12 +79,14 @@ public class DruidRecordWriter implements RecordWriter<NullWritable, DruidWritab
 
   private SegmentIdentifier currentOpenSegment = null;
 
-  private final Integer maxPartitionSize;
+  private final int maxPartitionSize;
 
   private final FileSystem fileSystem;
 
   private final Supplier<Committer> committerSupplier;
 
+  private final Granularity segmentGranularity;
+
   public DruidRecordWriter(
           DataSchema dataSchema,
           RealtimeTuningConfig realtimeTuningConfig,
@@ -106,12 +109,13 @@ public class DruidRecordWriter implements RecordWriter<NullWritable, DruidWritab
                     dataSegmentPusher, DruidStorageHandlerUtils.JSON_MAPPER,
                     DruidStorageHandlerUtils.INDEX_IO, DruidStorageHandlerUtils.INDEX_MERGER_V9
             );
-    Preconditions.checkArgument(maxPartitionSize > 0, "maxPartitionSize need to be greater than 0");
     this.maxPartitionSize = maxPartitionSize;
     appenderator.startJob(); // maybe we need to move this out of the constructor
     this.segmentsDescriptorDir = Preconditions
             .checkNotNull(segmentsDescriptorsDir, "segmentsDescriptorsDir is null");
     this.fileSystem = Preconditions.checkNotNull(fileSystem, "file system is null");
+    this.segmentGranularity = this.dataSchema.getGranularitySpec()
+        .getSegmentGranularity();
     committerSupplier = Suppliers.ofInstance(Committers.nil());
   }
 
@@ -125,10 +129,6 @@ public class DruidRecordWriter implements RecordWriter<NullWritable, DruidWritab
   * @return segmentIdentifier with of the truncatedTime and maybe push the current open segment.
   */
  private SegmentIdentifier getSegmentIdentifierAndMaybePush(long truncatedTime) {
-
-    final Granularity segmentGranularity = dataSchema.getGranularitySpec()
-            .getSegmentGranularity();
-
    final Interval interval = new Interval(
            new DateTime(truncatedTime),
            segmentGranularity.increment(new DateTime(truncatedTime))
@@ -136,14 +136,13 @@ public class DruidRecordWriter implements RecordWriter<NullWritable, DruidWritab
 
     SegmentIdentifier retVal;
     if (currentOpenSegment == null) {
-      retVal = new SegmentIdentifier(
+      currentOpenSegment = new SegmentIdentifier(
               dataSchema.getDataSource(),
               interval,
               tuningConfig.getVersioningPolicy().getVersion(interval),
               new LinearShardSpec(0)
       );
-      currentOpenSegment = retVal;
-      return retVal;
+      return currentOpenSegment;
     } else if (currentOpenSegment.getInterval().equals(interval)) {
       retVal = currentOpenSegment;
       int rowCount = appenderator.getRowCount(retVal);
@@ -238,22 +237,51 @@ public class DruidRecordWriter implements RecordWriter<NullWritable, DruidWritab
   @Override
   public void write(Writable w) throws IOException {
     DruidWritable record = (DruidWritable) w;
-    final long timestamp = (long) record.getValue().get(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN);
-    final long truncatedTime = (long) record.getValue()
-            .get(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME);
-
-    InputRow inputRow = new MapBasedInputRow(
-            timestamp,
-            dataSchema.getParser()
-                    .getParseSpec()
-                    .getDimensionsSpec()
-                    .getDimensionNames(),
-            record.getValue()
+    final long timestamp =
+        (long) record.getValue().get(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN);
+    final long truncatedTime =
+        (long) record.getValue().get(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME);
+    final int partitionNumber = Math.toIntExact(
+        (long) record.getValue().getOrDefault(Constants.DRUID_SHARD_KEY_COL_NAME, -1l));
+    final InputRow inputRow = new MapBasedInputRow(timestamp,
+        dataSchema.getParser().getParseSpec().getDimensionsSpec().getDimensionNames(),
+        record.getValue()
     );
 
     try {
-      appenderator
-              .add(getSegmentIdentifierAndMaybePush(truncatedTime), inputRow, committerSupplier);
+      if (partitionNumber != -1 && maxPartitionSize == -1) {
+        final Interval interval = new Interval(new DateTime(truncatedTime),
+            segmentGranularity.increment(new DateTime(truncatedTime))
+        );
+
+        if (currentOpenSegment != null) {
+          if (currentOpenSegment.getShardSpec().getPartitionNum() != partitionNumber
+              || !currentOpenSegment.getInterval().equals(interval)) {
+            pushSegments(ImmutableList.of(currentOpenSegment));
+            currentOpenSegment = new SegmentIdentifier(dataSchema.getDataSource(), interval,
+                tuningConfig.getVersioningPolicy().getVersion(interval),
+                new LinearShardSpec(partitionNumber)
+            );
+          }
+        } else if (currentOpenSegment == null) {
+          currentOpenSegment = new SegmentIdentifier(dataSchema.getDataSource(), interval,
+              tuningConfig.getVersioningPolicy().getVersion(interval),
+              new LinearShardSpec(partitionNumber)
+          );
+
+        }
+        appenderator.add(currentOpenSegment, inputRow, committerSupplier);
+
+      } else if (partitionNumber == -1 && maxPartitionSize != -1) {
+        appenderator
+            .add(getSegmentIdentifierAndMaybePush(truncatedTime), inputRow, committerSupplier);
+      } else {
+        throw new IllegalArgumentException(String.format(
+            "partitionNumber and  maxPartitionSize should be mutually exclusive got partitionNum [%s] and maxPartitionSize [%s]",
+            partitionNumber, maxPartitionSize
+        ));
+      }
+
     } catch (SegmentNotWritableException e) {
       throw new IOException(e);
     }

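To summarize the new write() logic above: a row either carries a __druid_extra_partition_key (fixed-shard mode, where maxPartitionSize is -1) or it does not (the existing mode, where segments are rolled by row count); any other combination is rejected. Below is a standalone sketch of just that routing rule, with assumed names and no Druid types, not the patched method itself.

// Sketch of the mutual-exclusion rule enforced in DruidRecordWriter.write().
// "route" stands in for the appenderator calls; it is not part of the patch.
public final class WriteRoutingSketch {
  static String route(int partitionNumber, int maxPartitionSize) {
    if (partitionNumber != -1 && maxPartitionSize == -1) {
      // Fixed-shard mode: the reducer key already decided the shard number.
      return "append to segment shard " + partitionNumber + " of the row's time interval";
    } else if (partitionNumber == -1 && maxPartitionSize != -1) {
      // Row-count mode: keep one open segment and roll it by row count.
      return "append to current segment, push it once it holds " + maxPartitionSize + " rows";
    }
    throw new IllegalArgumentException(
        "partitionNumber and maxPartitionSize are mutually exclusive: got "
            + partitionNumber + " and " + maxPartitionSize);
  }

  public static void main(String[] args) {
    System.out.println(route(3, -1));    // fixed-shard mode
    System.out.println(route(-1, 5000)); // row-count mode
  }
}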
http://git-wip-us.apache.org/repos/asf/hive/blob/e05e0fa1/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
index 3696b0f..914954d 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
@@ -85,6 +85,7 @@ import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 
@@ -561,10 +562,31 @@ public class DruidSerDe extends AbstractSerDe {
       }
       value.put(columns[i], res);
     }
+    //Extract the partitions keys segments granularity and partition key if any
+    // First Segment Granularity has to be here.
+    final int granularityFieldIndex = columns.length;
+    assert values.size() > granularityFieldIndex;
+    Preconditions.checkArgument(fields.get(granularityFieldIndex).getFieldName()
+        .equals(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME));
     value.put(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME,
-            ((TimestampObjectInspector) fields.get(columns.length).getFieldObjectInspector())
-                    .getPrimitiveJavaObject(values.get(columns.length)).getTime()
+            ((TimestampObjectInspector) fields.get(granularityFieldIndex).getFieldObjectInspector())
+                    .getPrimitiveJavaObject(values.get(granularityFieldIndex)).getTime()
     );
+    if (values.size() == columns.length + 2) {
+      // Then partition number if any.
+      final int partitionNumPos = granularityFieldIndex + 1;
+      Preconditions.checkArgument(
+          fields.get(partitionNumPos).getFieldName().equals(Constants.DRUID_SHARD_KEY_COL_NAME),
+          String.format("expecting to encounter %s but was %s", Constants.DRUID_SHARD_KEY_COL_NAME,
+              fields.get(partitionNumPos).getFieldName()
+          )
+      );
+      value.put(Constants.DRUID_SHARD_KEY_COL_NAME,
+          ((LongObjectInspector) fields.get(partitionNumPos).getFieldObjectInspector())
+              .get(values.get(partitionNumPos))
+      );
+    }
+
     return new DruidWritable(value);
   }
 

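The serializer change above relies on a fixed column layout produced by the optimizer: the user columns first, then __time_granularity, then (only when targetShardsPerGranularity > 0) __druid_extra_partition_key. The sketch below shows that positional extraction using plain Java objects instead of ObjectInspectors; the class and method names are illustrative assumptions, not DruidSerDe's code.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the row layout DruidSerDe now expects; the real code goes through
// StructField/ObjectInspector lookups instead of raw List indexing.
public final class RowLayoutSketch {
  static Map<String, Object> toDruidRow(String[] columns, List<Object> values) {
    final Map<String, Object> row = new HashMap<>();
    for (int i = 0; i < columns.length; i++) {
      row.put(columns[i], values.get(i));
    }
    // The granularity column is always appended right after the user columns.
    row.put("__time_granularity", values.get(columns.length));
    // The shard key is present only when druid.segment.targetShardsPerGranularity > 0.
    if (values.size() == columns.length + 2) {
      row.put("__druid_extra_partition_key", values.get(columns.length + 1));
    }
    return row;
  }
}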
http://git-wip-us.apache.org/repos/asf/hive/blob/e05e0fa1/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index d4f2e53..4a52eb5 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1656,5 +1656,6 @@ spark.perf.disabled.query.files=query14.q,\
 druid.query.files=druidmini_test1.q,\
   druidmini_test_insert.q,\
   druidmini_mv.q,\
-  druid_timestamptz.q
+  druid_timestamptz.q, \
+  druidmini_dynamic_partition.q
 

http://git-wip-us.apache.org/repos/asf/hive/blob/e05e0fa1/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionTimeGranularityOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionTimeGranularityOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionTimeGranularityOptimizer.java
index e3dee93..0e995d7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionTimeGranularityOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionTimeGranularityOptimizer.java
@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hive.ql.optimizer;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -46,6 +44,7 @@ import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
@@ -61,9 +60,13 @@ import org.apache.hadoop.hive.ql.udf.UDFDateFloorMonth;
 import org.apache.hadoop.hive.ql.udf.UDFDateFloorSecond;
 import org.apache.hadoop.hive.ql.udf.UDFDateFloorWeek;
 import org.apache.hadoop.hive.ql.udf.UDFDateFloorYear;
+import org.apache.hadoop.hive.ql.udf.UDFRand;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFEpochMilli;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFTimestamp;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloor;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPDivide;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPMod;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import 
org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
@@ -72,12 +75,17 @@ import org.apache.parquet.Strings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Stack;
+import java.util.stream.Collectors;
 
 /**
  * Introduces a RS before FS to partition data by configuration specified
@@ -113,6 +121,9 @@ public class SortedDynPartitionTimeGranularityOptimizer extends Transform {
 
     private final Logger LOG = 
LoggerFactory.getLogger(SortedDynPartitionTimeGranularityOptimizer.class);
     protected ParseContext parseCtx;
+    private int targetShardsPerGranularity = 0;
+    private int granularityKeyPos = -1;
+    private int partitionKeyPos = -1;
 
     public SortedDynamicPartitionProc(ParseContext pCtx) {
       this.parseCtx = pCtx;
@@ -130,66 +141,92 @@ public class SortedDynPartitionTimeGranularityOptimizer extends Transform {
         // Bail out, nothing to do
         return null;
       }
-      String segmentGranularity = null;
+      String segmentGranularity;
+      final String targetShardsProperty;
       final Table table = fsOp.getConf().getTable();
       if (table != null) {
         // case the statement is an INSERT
         segmentGranularity = 
table.getParameters().get(Constants.DRUID_SEGMENT_GRANULARITY);
+        targetShardsProperty =
+            
table.getParameters().getOrDefault(Constants.DRUID_TARGET_SHARDS_PER_GRANULARITY,
 "0");
+
       } else if (parseCtx.getCreateViewDesc() != null) {
         // case the statement is a CREATE MATERIALIZED VIEW AS
         segmentGranularity = parseCtx.getCreateViewDesc().getTblProps()
                 .get(Constants.DRUID_SEGMENT_GRANULARITY);
+        targetShardsProperty = parseCtx.getCreateViewDesc().getTblProps()
+            .getOrDefault(Constants.DRUID_TARGET_SHARDS_PER_GRANULARITY, "0");
       } else if (parseCtx.getCreateTable() != null) {
         // case the statement is a CREATE TABLE AS
         segmentGranularity = parseCtx.getCreateTable().getTblProps()
                 .get(Constants.DRUID_SEGMENT_GRANULARITY);
+        targetShardsProperty = parseCtx.getCreateTable().getTblProps()
+            .getOrDefault(Constants.DRUID_TARGET_SHARDS_PER_GRANULARITY, "0");
       } else {
         throw new SemanticException("Druid storage handler used but not an 
INSERT, "
                 + "CMVAS or CTAS statement");
       }
-      segmentGranularity = !Strings.isNullOrEmpty(segmentGranularity)
-              ? segmentGranularity
-              : HiveConf.getVar(parseCtx.getConf(),
-                      HiveConf.ConfVars.HIVE_DRUID_INDEXING_GRANULARITY
-              );
+      segmentGranularity = Strings.isNullOrEmpty(segmentGranularity) ? HiveConf
+          .getVar(parseCtx.getConf(),
+              HiveConf.ConfVars.HIVE_DRUID_INDEXING_GRANULARITY
+          ) : segmentGranularity;
+      targetShardsPerGranularity = Integer.parseInt(targetShardsProperty);
+
       LOG.info("Sorted dynamic partitioning on time granularity optimization 
kicked in...");
 
       // unlink connection between FS and its parent
-      Operator<? extends OperatorDesc> fsParent = 
fsOp.getParentOperators().get(0);
-      fsParent = fsOp.getParentOperators().get(0);
+      final Operator<? extends OperatorDesc> fsParent = 
fsOp.getParentOperators().get(0);
       fsParent.getChildOperators().clear();
 
+      if (targetShardsPerGranularity > 0) {
+        partitionKeyPos = fsParent.getSchema().getSignature().size() + 1;
+      }
+      granularityKeyPos = fsParent.getSchema().getSignature().size();
       // Create SelectOp with granularity column
-      Operator<? extends OperatorDesc> granularitySelOp = 
getGranularitySelOp(fsParent, segmentGranularity);
+      final Operator<? extends OperatorDesc> granularitySelOp = 
getGranularitySelOp(fsParent,
+              segmentGranularity
+      );
 
       // Create ReduceSinkOp operator
-      ArrayList<ColumnInfo> parentCols = 
Lists.newArrayList(granularitySelOp.getSchema().getSignature());
-      ArrayList<ExprNodeDesc> allRSCols = Lists.newArrayList();
+      final ArrayList<ColumnInfo> parentCols =
+          Lists.newArrayList(granularitySelOp.getSchema().getSignature());
+      final ArrayList<ExprNodeDesc> allRSCols = Lists.newArrayList();
       for (ColumnInfo ci : parentCols) {
         allRSCols.add(new ExprNodeColumnDesc(ci));
       }
       // Get the key positions
-      List<Integer> keyPositions = new ArrayList<>();
-      keyPositions.add(allRSCols.size() - 1);
-      List<Integer> sortOrder = new ArrayList<Integer>(1);
-      sortOrder.add(1); // asc
-      List<Integer> sortNullOrder = new ArrayList<Integer>(1);
-      sortNullOrder.add(0); // nulls first
+      final List<Integer> keyPositions;
+      final List<Integer> sortOrder;
+      final List<Integer> sortNullOrder;
+      //Order matters, assuming later that __time_granularity comes first then 
__druidPartitionKey
+      if (targetShardsPerGranularity > 0) {
+        keyPositions = Lists.newArrayList(granularityKeyPos, partitionKeyPos);
+        sortOrder = Lists.newArrayList(1, 1); // asc
+        sortNullOrder = Lists.newArrayList(0, 0); // nulls first
+      } else {
+        keyPositions = Lists.newArrayList(granularityKeyPos);
+        sortOrder = Lists.newArrayList(1); // asc
+        sortNullOrder = Lists.newArrayList(0); // nulls first
+      }
       ReduceSinkOperator rsOp = getReduceSinkOp(keyPositions, sortOrder,
           sortNullOrder, allRSCols, granularitySelOp);
 
       // Create backtrack SelectOp
-      List<ExprNodeDesc> descs = new ArrayList<ExprNodeDesc>(allRSCols.size());
-      List<String> colNames = new ArrayList<String>();
-      String colName;
+      final List<ExprNodeDesc> descs = new ArrayList<>(allRSCols.size());
+      final List<String> colNames = new ArrayList<>();
       for (int i = 0; i < allRSCols.size(); i++) {
         ExprNodeDesc col = allRSCols.get(i);
-        colName = col.getExprString();
+        final String colName = col.getExprString();
         colNames.add(colName);
         if (keyPositions.contains(i)) {
-          descs.add(new ExprNodeColumnDesc(col.getTypeInfo(), 
ReduceField.KEY.toString()+"."+colName, null, false));
+          descs.add(
+              new ExprNodeColumnDesc(col.getTypeInfo(), 
ReduceField.KEY.toString() + "." + colName,
+                  null, false
+              ));
         } else {
-          descs.add(new ExprNodeColumnDesc(col.getTypeInfo(), 
ReduceField.VALUE.toString()+"."+colName, null, false));
+          descs.add(new ExprNodeColumnDesc(col.getTypeInfo(),
+              ReduceField.VALUE.toString() + "." + colName, null, false
+          ));
         }
       }
       RowSchema selRS = new RowSchema(granularitySelOp.getSchema());
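As the hunk above shows, the reduce-sink key is either the granularity column alone or the pair (granularity column, shard key), both ascending with nulls first, so each reducer receives whole (time interval, shard) groups. A small sketch of the key-position bookkeeping follows; the helper name is an assumption and the real optimizer works on operator schemas rather than plain counts.

import java.util.Arrays;
import java.util.List;

// Sketch: __time_granularity is appended directly after the parent columns,
// and the synthetic shard key (when enabled) right after it.
public final class ReduceKeySketch {
  static List<Integer> keyPositions(int parentColumnCount, int targetShardsPerGranularity) {
    final int granularityKeyPos = parentColumnCount;
    if (targetShardsPerGranularity > 0) {
      final int partitionKeyPos = parentColumnCount + 1;
      return Arrays.asList(granularityKeyPos, partitionKeyPos);
    }
    return Arrays.asList(granularityKeyPos);
  }

  public static void main(String[] args) {
    System.out.println(keyPositions(11, 6)); // [11, 12]
    System.out.println(keyPositions(11, 0)); // [11]
  }
}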
@@ -205,24 +242,30 @@ public class SortedDynPartitionTimeGranularityOptimizer extends Transform {
       // Update file sink descriptor
       fsOp.getConf().setDpSortState(FileSinkDesc.DPSortState.PARTITION_SORTED);
       fsOp.getConf().setPartitionCols(rsOp.getConf().getPartitionCols());
-      ColumnInfo ci = new 
ColumnInfo(granularitySelOp.getSchema().getSignature().get(
-              granularitySelOp.getSchema().getSignature().size() - 1)); // 
granularity column
-      fsOp.getSchema().getSignature().add(ci);
+      final ColumnInfo granularityColumnInfo =
+          new 
ColumnInfo(granularitySelOp.getSchema().getSignature().get(granularityKeyPos));
+      fsOp.getSchema().getSignature().add(granularityColumnInfo);
+      if (targetShardsPerGranularity > 0) {
+        final ColumnInfo partitionKeyColumnInfo =
+            new 
ColumnInfo(granularitySelOp.getSchema().getSignature().get(partitionKeyPos));
+        fsOp.getSchema().getSignature().add(partitionKeyColumnInfo);
+      }
 
       LOG.info("Inserted " + granularitySelOp.getOperatorId() + ", " + 
rsOp.getOperatorId() + " and "
           + backtrackSelOp.getOperatorId() + " as parent of " + 
fsOp.getOperatorId()
           + " and child of " + fsParent.getOperatorId());
-
       parseCtx.setReduceSinkAddedBySortedDynPartition(true);
       return null;
     }
 
     private Operator<? extends OperatorDesc> getGranularitySelOp(
-            Operator<? extends OperatorDesc> fsParent, String 
segmentGranularity
+        Operator<? extends OperatorDesc> fsParent,
+        String segmentGranularity
     ) throws SemanticException {
-      ArrayList<ColumnInfo> parentCols = 
Lists.newArrayList(fsParent.getSchema().getSignature());
-      ArrayList<ExprNodeDesc> descs = Lists.newArrayList();
-      List<String> colNames = Lists.newArrayList();
+      final ArrayList<ColumnInfo> parentCols =
+          Lists.newArrayList(fsParent.getSchema().getSignature());
+      final ArrayList<ExprNodeDesc> descs = Lists.newArrayList();
+      final List<String> colNames = Lists.newArrayList();
       int timestampPos = -1;
       for (int i = 0; i < parentCols.size(); i++) {
         ColumnInfo ci = parentCols.get(i);
@@ -242,10 +285,9 @@ public class SortedDynPartitionTimeGranularityOptimizer extends Transform {
         throw new SemanticException("No column with timestamp with local 
time-zone type on query result; "
                 + "one column should be of timestamp with local time-zone 
type");
       }
-      RowSchema selRS = new RowSchema(fsParent.getSchema());
+      final RowSchema selRS = new RowSchema(fsParent.getSchema());
       // Granularity (partition) column
-      String udfName;
-
+      final String udfName;
       Class<? extends UDF> udfClass;
       switch (segmentGranularity) {
         case "YEAR":
@@ -277,9 +319,13 @@ public class SortedDynPartitionTimeGranularityOptimizer extends Transform {
           udfClass = UDFDateFloorSecond.class;
           break;
         default:
-          throw new SemanticException("Granularity for Druid segment not 
recognized");
+          throw new SemanticException(String.format(Locale.ENGLISH,
+              "Unknown Druid Granularity [%s], Accepted values are [YEAR, 
MONTH, WEEK, DAY, HOUR, MINUTE, SECOND]",
+              segmentGranularity
+          ));
       }
 
+
       // Timestamp column type in Druid is timestamp with local time-zone, as 
it represents
       // a specific instant in time. Thus, we have this value and we need to 
extract the
       // granularity to split the data when we are storing it in Druid. 
However, Druid stores
@@ -308,15 +354,39 @@ public class SortedDynPartitionTimeGranularityOptimizer extends Transform {
       descs.add(f3);
       colNames.add(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME);
       // Add granularity to the row schema
-      ColumnInfo ci = new 
ColumnInfo(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME, 
TypeInfoFactory.timestampTypeInfo,
+      final ColumnInfo ci = new 
ColumnInfo(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME, 
TypeInfoFactory.timestampTypeInfo,
               selRS.getSignature().get(0).getTabAlias(), false, false);
       selRS.getSignature().add(ci);
-
+      if (targetShardsPerGranularity > 0 ) {
+        // add another partitioning key based on floor(1/rand) % 
targetShardsPerGranularity
+        final ColumnInfo partitionKeyCi =
+            new ColumnInfo(Constants.DRUID_SHARD_KEY_COL_NAME, 
TypeInfoFactory.longTypeInfo,
+                selRS.getSignature().get(0).getTabAlias(), false, false
+            );
+        final ExprNodeDesc targetNumShardDescNode =
+            new ExprNodeConstantDesc(TypeInfoFactory.intTypeInfo, 
targetShardsPerGranularity);
+        final ExprNodeGenericFuncDesc randomFn = ExprNodeGenericFuncDesc
+            .newInstance(new GenericUDFBridge("rand", false, 
UDFRand.class.getName()),
+                Lists.newArrayList()
+            );
+
+        final ExprNodeGenericFuncDesc random = 
ExprNodeGenericFuncDesc.newInstance(
+            new GenericUDFFloor(), Lists.newArrayList(ExprNodeGenericFuncDesc
+                .newInstance(new GenericUDFOPDivide(),
+                    Lists.newArrayList(new 
ExprNodeConstantDesc(TypeInfoFactory.doubleTypeInfo, 1.0), randomFn)
+                )));
+        final ExprNodeGenericFuncDesc randModMax = ExprNodeGenericFuncDesc
+            .newInstance(new GenericUDFOPMod(),
+                Lists.newArrayList(random, targetNumShardDescNode)
+            );
+        descs.add(randModMax);
+        colNames.add(Constants.DRUID_SHARD_KEY_COL_NAME);
+        selRS.getSignature().add(partitionKeyCi);
+      }
       // Create SelectDesc
-      SelectDesc selConf = new SelectDesc(descs, colNames);
-
+      final SelectDesc selConf = new SelectDesc(descs, colNames);
       // Create Select Operator
-      SelectOperator selOp = (SelectOperator) OperatorFactory.getAndMakeChild(
+      final SelectOperator selOp = (SelectOperator) 
OperatorFactory.getAndMakeChild(
               selConf, selRS, fsParent);
 
       return selOp;
@@ -324,72 +394,77 @@ public class SortedDynPartitionTimeGranularityOptimizer extends Transform {
 
     private ReduceSinkOperator getReduceSinkOp(List<Integer> keyPositions, 
List<Integer> sortOrder,
         List<Integer> sortNullOrder, ArrayList<ExprNodeDesc> allCols, 
Operator<? extends OperatorDesc> parent
-    ) throws SemanticException {
-
-      ArrayList<ExprNodeDesc> keyCols = Lists.newArrayList();
+    ) {
       // we will clone here as RS will update bucket column key with its
       // corresponding with bucket number and hence their OIs
-      for (Integer idx : keyPositions) {
-        keyCols.add(allCols.get(idx).clone());
-      }
-
+      final ArrayList<ExprNodeDesc> keyCols = keyPositions.stream()
+          .map(id -> allCols.get(id).clone())
+          .collect(Collectors.toCollection(ArrayList::new));
       ArrayList<ExprNodeDesc> valCols = Lists.newArrayList();
       for (int i = 0; i < allCols.size(); i++) {
-        if (!keyPositions.contains(i)) {
+        if (i != granularityKeyPos && i != partitionKeyPos) {
           valCols.add(allCols.get(i).clone());
         }
       }
 
-      ArrayList<ExprNodeDesc> partCols = Lists.newArrayList();
-      for (Integer idx : keyPositions) {
-        partCols.add(allCols.get(idx).clone());
-      }
+      final ArrayList<ExprNodeDesc> partCols =
+          keyPositions.stream().map(id -> allCols.get(id).clone())
+              .collect(Collectors.toCollection(ArrayList::new));
 
       // map _col0 to KEY._col0, etc
       Map<String, ExprNodeDesc> colExprMap = Maps.newHashMap();
       Map<String, String> nameMapping = new HashMap<>();
-      ArrayList<String> keyColNames = Lists.newArrayList();
-      for (ExprNodeDesc keyCol : keyCols) {
-        String keyColName = keyCol.getExprString();
-        keyColNames.add(keyColName);
-        colExprMap.put(Utilities.ReduceField.KEY + "." +keyColName, keyCol);
-        nameMapping.put(keyColName, Utilities.ReduceField.KEY + "." + 
keyColName);
-      }
-      ArrayList<String> valColNames = Lists.newArrayList();
-      for (ExprNodeDesc valCol : valCols) {
-        String colName = valCol.getExprString();
-        valColNames.add(colName);
-        colExprMap.put(Utilities.ReduceField.VALUE + "." + colName, valCol);
-        nameMapping.put(colName, Utilities.ReduceField.VALUE + "." + colName);
-      }
+      final ArrayList<String> keyColNames = Lists.newArrayList();
+      final ArrayList<String> valColNames = Lists.newArrayList();
+      keyCols.stream().forEach(exprNodeDesc -> {
+        keyColNames.add(exprNodeDesc.getExprString());
+        colExprMap
+            .put(Utilities.ReduceField.KEY + "." + 
exprNodeDesc.getExprString(), exprNodeDesc);
+        nameMapping.put(exprNodeDesc.getExprString(),
+            Utilities.ReduceField.KEY + "." + exprNodeDesc.getName()
+        );
+      });
+      valCols.stream().forEach(exprNodeDesc -> {
+        valColNames.add(exprNodeDesc.getExprString());
+        colExprMap
+            .put(Utilities.ReduceField.VALUE + "." + 
exprNodeDesc.getExprString(), exprNodeDesc);
+        nameMapping.put(exprNodeDesc.getExprString(),
+            Utilities.ReduceField.VALUE + "." + exprNodeDesc.getName()
+        );
+      });
+
 
       // order and null order
-      String orderStr = StringUtils.repeat("+", sortOrder.size());
-      String nullOrderStr = StringUtils.repeat("a", sortNullOrder.size());
+      final String orderStr = StringUtils.repeat("+", sortOrder.size());
+      final String nullOrderStr = StringUtils.repeat("a", 
sortNullOrder.size());
 
       // Create Key/Value TableDesc. When the operator plan is split into MR 
tasks,
       // the reduce operator will initialize Extract operator with information
       // from Key and Value TableDesc
-      List<FieldSchema> fields = 
PlanUtils.getFieldSchemasFromColumnList(keyCols,
+      final List<FieldSchema> fields = 
PlanUtils.getFieldSchemasFromColumnList(keyCols,
           keyColNames, 0, "");
-      TableDesc keyTable = PlanUtils.getReduceKeyTableDesc(fields, orderStr, 
nullOrderStr);
+      final TableDesc keyTable = PlanUtils.getReduceKeyTableDesc(fields, 
orderStr, nullOrderStr);
       List<FieldSchema> valFields = 
PlanUtils.getFieldSchemasFromColumnList(valCols,
           valColNames, 0, "");
-      TableDesc valueTable = PlanUtils.getReduceValueTableDesc(valFields);
+      final TableDesc valueTable = 
PlanUtils.getReduceValueTableDesc(valFields);
       List<List<Integer>> distinctColumnIndices = Lists.newArrayList();
 
       // Number of reducers is set to default (-1)
-      ReduceSinkDesc rsConf = new ReduceSinkDesc(keyCols, keyCols.size(), 
valCols,
+      final ReduceSinkDesc rsConf = new ReduceSinkDesc(keyCols, 
keyCols.size(), valCols,
           keyColNames, distinctColumnIndices, valColNames, -1, partCols, -1, 
keyTable,
           valueTable);
 
-      ArrayList<ColumnInfo> signature = new ArrayList<>();
-      for (int index = 0; index < parent.getSchema().getSignature().size(); 
index++) {
-        ColumnInfo colInfo = new 
ColumnInfo(parent.getSchema().getSignature().get(index));
-        colInfo.setInternalName(nameMapping.get(colInfo.getInternalName()));
-        signature.add(colInfo);
-      }
-      ReduceSinkOperator op = (ReduceSinkOperator) 
OperatorFactory.getAndMakeChild(
+      final ArrayList<ColumnInfo> signature =
+          parent.getSchema().getSignature()
+              .stream()
+              .map(e -> new ColumnInfo(e))
+              .map(columnInfo ->
+                {
+                  
columnInfo.setInternalName(nameMapping.get(columnInfo.getInternalName()));
+                  return columnInfo;
+               })
+              .collect(Collectors.toCollection(ArrayList::new));
+      final ReduceSinkOperator op = (ReduceSinkOperator) 
OperatorFactory.getAndMakeChild(
           rsConf, new RowSchema(signature), parent);
       op.setColumnExprMap(colExprMap);
       return op;

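The expression the optimizer builds above for __druid_extra_partition_key is floor(1.0 / rand()) % targetShardsPerGranularity, i.e. a pseudo-random shard number in the range [0, targetShardsPerGranularity). The snippet below is only a plain-Java illustration of that arithmetic, not the Hive operator tree.

import java.util.Random;

// Illustration of the shard-key expression: floor(1.0 / rand()) % N.
public final class ShardKeySketch {
  public static void main(String[] args) {
    final int targetShardsPerGranularity = 6; // druid.segment.targetShardsPerGranularity
    final Random rand = new Random();
    for (int row = 0; row < 5; row++) {
      final long shardKey =
          (long) Math.floor(1.0 / rand.nextDouble()) % targetShardsPerGranularity;
      System.out.println("row " + row + " -> shard " + shardKey);
    }
  }
}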
http://git-wip-us.apache.org/repos/asf/hive/blob/e05e0fa1/ql/src/test/queries/clientpositive/druidmini_dynamic_partition.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/druidmini_dynamic_partition.q b/ql/src/test/queries/clientpositive/druidmini_dynamic_partition.q
new file mode 100644
index 0000000..2552717
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/druidmini_dynamic_partition.q
@@ -0,0 +1,170 @@
+CREATE TABLE druid_partitioned_table_0
+        STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
+        TBLPROPERTIES (
+        "druid.segment.granularity" = "HOUR",
+        "druid.query.granularity" = "MINUTE",
+        "druid.segment.targetShardsPerGranularity" = "0"
+        )
+        AS
+        SELECT cast (`ctimestamp1` as timestamp with local time zone) as `__time`,
+          cstring1,
+          cstring2,
+          cdouble,
+          cfloat,
+          ctinyint,
+          csmallint,
+          cint,
+          cbigint,
+          cboolean1,
+          cboolean2
+          FROM alltypesorc where ctimestamp1 IS NOT NULL;
+
+EXPLAIN CREATE TABLE druid_partitioned_table
+        STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
+        TBLPROPERTIES (
+        "druid.segment.granularity" = "HOUR",
+        "druid.query.granularity" = "MINUTE",
+        "druid.segment.targetShardsPerGranularity" = "6"
+        )
+        AS
+        SELECT cast (`ctimestamp1` as timestamp with local time zone) as `__time`,
+          cstring1,
+          cstring2,
+          cdouble,
+          cfloat,
+          ctinyint,
+          csmallint,
+          cint,
+          cbigint,
+          cboolean1,
+          cboolean2
+          FROM alltypesorc where ctimestamp1 IS NOT NULL;
+
+
+
+CREATE TABLE druid_partitioned_table
+STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
+TBLPROPERTIES (
+"druid.segment.granularity" = "HOUR",
+"druid.query.granularity" = "MINUTE",
+"druid.segment.targetShardsPerGranularity" = "6"
+)
+AS
+SELECT cast (`ctimestamp1` as timestamp with local time zone) as `__time`,
+  cstring1,
+  cstring2,
+  cdouble,
+  cfloat,
+  ctinyint,
+  csmallint,
+  cint,
+  cbigint,
+  cboolean1,
+  cboolean2
+  FROM alltypesorc where ctimestamp1 IS NOT NULL;
+
+SELECT sum(cfloat)  FROM druid_partitioned_table ;
+
+SELECT floor_hour(cast(`ctimestamp1` as timestamp with local time zone)) as `__time`,
+          cstring1,
+          cstring2,
+          cdouble,
+          cfloat,
+          ctinyint,
+          csmallint,
+          cint,
+          cbigint,
+          cboolean1,
+          cboolean2
+          FROM alltypesorc where ctimestamp1 IS NOT NULL order by `__time`, cstring2 DESC NULLS LAST, cstring1 DESC NULLS LAST LIMIT 10 ;
+
+
+EXPLAIN INSERT INTO TABLE druid_partitioned_table
+SELECT cast (`ctimestamp2` as timestamp with local time zone) as `__time`,
+  cstring1,
+  cstring2,
+  cdouble,
+  cfloat,
+  ctinyint,
+  csmallint,
+  cint,
+  cbigint,
+  cboolean1,
+  cboolean2
+  FROM alltypesorc where ctimestamp2 IS NOT NULL;
+
+INSERT INTO TABLE druid_partitioned_table
+SELECT cast (`ctimestamp2` as timestamp with local time zone) as `__time`,
+  cstring1,
+  cstring2,
+  cdouble,
+  cfloat,
+  ctinyint,
+  csmallint,
+  cint,
+  cbigint,
+  cboolean1,
+  cboolean2
+  FROM alltypesorc where ctimestamp2 IS NOT NULL;
+
+SELECT  sum(cfloat)  FROM druid_partitioned_table ;
+
+EXPLAIN INSERT OVERWRITE TABLE druid_partitioned_table
+  SELECT cast (`ctimestamp1` as timestamp with local time zone) as `__time`,
+    cstring1,
+    cstring2,
+    cdouble,
+    cfloat,
+    ctinyint,
+    csmallint,
+    cint,
+    cbigint,
+    cboolean1,
+    cboolean2
+    FROM alltypesorc where ctimestamp1 IS NOT NULL;
+
+
+INSERT OVERWRITE TABLE druid_partitioned_table
+  SELECT cast (`ctimestamp1` as timestamp with local time zone) as `__time`,
+    cstring1,
+    cstring2,
+    cdouble,
+    cfloat,
+    ctinyint,
+    csmallint,
+    cint,
+    cbigint,
+    cboolean1,
+    cboolean2
+    FROM alltypesorc where ctimestamp1 IS NOT NULL;
+
+  SELECT  sum(cfloat)  FROM druid_partitioned_table ;
+
+
+set hive.druid.indexer.partition.size.max=10;
+
+CREATE TABLE druid_max_size_partition
+        STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
+        TBLPROPERTIES (
+        "druid.segment.granularity" = "HOUR",
+        "druid.query.granularity" = "MINUTE"
+        )
+        AS
+        SELECT cast (`ctimestamp1` as timestamp with local time zone) as `__time`,
+          cstring1,
+          cstring2,
+          cdouble,
+          cfloat,
+          ctinyint,
+          csmallint,
+          cint,
+          cbigint,
+          cboolean1,
+          cboolean2
+          FROM alltypesorc where ctimestamp1 IS NOT NULL;
+
+SELECT  sum(cfloat)  FROM druid_max_size_partition ;
+
+  DROP TABLE druid_partitioned_table_0;
+  DROP TABLE druid_partitioned_table;
+  DROP TABLE druid_max_size_partition;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/e05e0fa1/ql/src/test/results/clientpositive/druid/druidmini_dynamic_partition.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/druid/druidmini_dynamic_partition.q.out b/ql/src/test/results/clientpositive/druid/druidmini_dynamic_partition.q.out
new file mode 100644
index 0000000..941b760
--- /dev/null
+++ b/ql/src/test/results/clientpositive/druid/druidmini_dynamic_partition.q.out
@@ -0,0 +1,625 @@
+PREHOOK: query: CREATE TABLE druid_partitioned_table_0
+        STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
+        TBLPROPERTIES (
+        "druid.segment.granularity" = "HOUR",
+        "druid.query.granularity" = "MINUTE",
+        "druid.segment.targetShardsPerGranularity" = "0"
+        )
+        AS
+        SELECT cast (`ctimestamp1` as timestamp with local time zone) as 
`__time`,
+          cstring1,
+          cstring2,
+          cdouble,
+          cfloat,
+          ctinyint,
+          csmallint,
+          cint,
+          cbigint,
+          cboolean1,
+          cboolean2
+          FROM alltypesorc where ctimestamp1 IS NOT NULL
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@alltypesorc
+PREHOOK: Output: database:default
+PREHOOK: Output: default@druid_partitioned_table_0
+POSTHOOK: query: CREATE TABLE druid_partitioned_table_0
+        STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
+        TBLPROPERTIES (
+        "druid.segment.granularity" = "HOUR",
+        "druid.query.granularity" = "MINUTE",
+        "druid.segment.targetShardsPerGranularity" = "0"
+        )
+        AS
+        SELECT cast (`ctimestamp1` as timestamp with local time zone) as 
`__time`,
+          cstring1,
+          cstring2,
+          cdouble,
+          cfloat,
+          ctinyint,
+          csmallint,
+          cint,
+          cbigint,
+          cboolean1,
+          cboolean2
+          FROM alltypesorc where ctimestamp1 IS NOT NULL
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Input: default@alltypesorc
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@druid_partitioned_table_0
+POSTHOOK: Lineage: druid_partitioned_table_0.__time EXPRESSION 
[(alltypesorc)alltypesorc.FieldSchema(name:ctimestamp1, type:timestamp, 
comment:null), ]
+POSTHOOK: Lineage: druid_partitioned_table_0.cbigint SIMPLE 
[(alltypesorc)alltypesorc.FieldSchema(name:cbigint, type:bigint, comment:null), 
]
+POSTHOOK: Lineage: druid_partitioned_table_0.cboolean1 SIMPLE 
[(alltypesorc)alltypesorc.FieldSchema(name:cboolean1, type:boolean, 
comment:null), ]
+POSTHOOK: Lineage: druid_partitioned_table_0.cboolean2 SIMPLE 
[(alltypesorc)alltypesorc.FieldSchema(name:cboolean2, type:boolean, 
comment:null), ]
+POSTHOOK: Lineage: druid_partitioned_table_0.cdouble SIMPLE 
[(alltypesorc)alltypesorc.FieldSchema(name:cdouble, type:double, comment:null), 
]
+POSTHOOK: Lineage: druid_partitioned_table_0.cfloat SIMPLE 
[(alltypesorc)alltypesorc.FieldSchema(name:cfloat, type:float, comment:null), ]
+POSTHOOK: Lineage: druid_partitioned_table_0.cint SIMPLE 
[(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ]
+POSTHOOK: Lineage: druid_partitioned_table_0.csmallint SIMPLE 
[(alltypesorc)alltypesorc.FieldSchema(name:csmallint, type:smallint, 
comment:null), ]
+POSTHOOK: Lineage: druid_partitioned_table_0.cstring1 SIMPLE 
[(alltypesorc)alltypesorc.FieldSchema(name:cstring1, type:string, 
comment:null), ]
+POSTHOOK: Lineage: druid_partitioned_table_0.cstring2 SIMPLE 
[(alltypesorc)alltypesorc.FieldSchema(name:cstring2, type:string, 
comment:null), ]
+POSTHOOK: Lineage: druid_partitioned_table_0.ctinyint SIMPLE 
[(alltypesorc)alltypesorc.FieldSchema(name:ctinyint, type:tinyint, 
comment:null), ]
+PREHOOK: query: EXPLAIN CREATE TABLE druid_partitioned_table
+        STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
+        TBLPROPERTIES (
+        "druid.segment.granularity" = "HOUR",
+        "druid.query.granularity" = "MINUTE",
+        "druid.segment.targetShardsPerGranularity" = "6"
+        )
+        AS
+        SELECT cast (`ctimestamp1` as timestamp with local time zone) as 
`__time`,
+          cstring1,
+          cstring2,
+          cdouble,
+          cfloat,
+          ctinyint,
+          csmallint,
+          cint,
+          cbigint,
+          cboolean1,
+          cboolean2
+          FROM alltypesorc where ctimestamp1 IS NOT NULL
+PREHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: query: EXPLAIN CREATE TABLE druid_partitioned_table
+        STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
+        TBLPROPERTIES (
+        "druid.segment.granularity" = "HOUR",
+        "druid.query.granularity" = "MINUTE",
+        "druid.segment.targetShardsPerGranularity" = "6"
+        )
+        AS
+        SELECT cast (`ctimestamp1` as timestamp with local time zone) as 
`__time`,
+          cstring1,
+          cstring2,
+          cdouble,
+          cfloat,
+          ctinyint,
+          csmallint,
+          cint,
+          cbigint,
+          cboolean1,
+          cboolean2
+          FROM alltypesorc where ctimestamp1 IS NOT NULL
+POSTHOOK: type: CREATETABLE_AS_SELECT
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+  Stage-3 depends on stages: Stage-0
+  Stage-2 depends on stages: Stage-3
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: alltypesorc
+            Statistics: Num rows: 12288 Data size: 2641964 Basic stats: 
COMPLETE Column stats: NONE
+            Filter Operator
+              predicate: ctimestamp1 is not null (type: boolean)
+              Statistics: Num rows: 12288 Data size: 2641964 Basic stats: 
COMPLETE Column stats: NONE
+              Select Operator
+                expressions: CAST( ctimestamp1 AS timestamp with local time 
zone) (type: timestamp with local time zone), cstring1 (type: string), cstring2 
(type: string), cdouble (type: double), cfloat (type: float), ctinyint (type: 
tinyint), csmallint (type: smallint), cint (type: int), cbigint (type: bigint), 
cboolean1 (type: boolean), cboolean2 (type: boolean)
+                outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, 
_col6, _col7, _col8, _col9, _col10
+                Statistics: Num rows: 12288 Data size: 2641964 Basic stats: 
COMPLETE Column stats: NONE
+                Select Operator
+                  expressions: _col0 (type: timestamp with local time zone), 
_col1 (type: string), _col2 (type: string), _col3 (type: double), _col4 (type: 
float), _col5 (type: tinyint), _col6 (type: smallint), _col7 (type: int), _col8 
(type: bigint), _col9 (type: boolean), _col10 (type: boolean), floor_hour(CAST( 
GenericUDFEpochMilli(_col0) AS TIMESTAMP)) (type: timestamp), (floor((1.0 / 
rand())) % 6) (type: bigint)
+                  outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, 
_col6, _col7, _col8, _col9, _col10, __time_granularity, 
__druid_extra_partition_key
+                  Statistics: Num rows: 12288 Data size: 2641964 Basic stats: 
COMPLETE Column stats: NONE
+                  Reduce Output Operator
+                    key expressions: __time_granularity (type: timestamp), 
__druid_extra_partition_key (type: bigint)
+                    sort order: ++
+                    Map-reduce partition columns: __time_granularity (type: 
timestamp), __druid_extra_partition_key (type: bigint)
+                    Statistics: Num rows: 12288 Data size: 2641964 Basic 
stats: COMPLETE Column stats: NONE
+                    value expressions: _col0 (type: timestamp with local time 
zone), _col1 (type: string), _col2 (type: string), _col3 (type: double), _col4 
(type: float), _col5 (type: tinyint), _col6 (type: smallint), _col7 (type: 
int), _col8 (type: bigint), _col9 (type: boolean), _col10 (type: boolean)
+      Reduce Operator Tree:
+        Select Operator
+          expressions: VALUE._col0 (type: timestamp with local time zone), 
VALUE._col1 (type: string), VALUE._col2 (type: string), VALUE._col3 (type: 
double), VALUE._col4 (type: float), VALUE._col5 (type: tinyint), VALUE._col6 
(type: smallint), VALUE._col7 (type: int), VALUE._col8 (type: bigint), 
VALUE._col9 (type: boolean), VALUE._col10 (type: boolean), 
KEY.__time_granularity (type: timestamp), KEY.__druid_extra_partition_key 
(type: bigint)
+          outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, 
_col7, _col8, _col9, _col10, __time_granularity, __druid_extra_partition_key
+          Statistics: Num rows: 12288 Data size: 2641964 Basic stats: COMPLETE 
Column stats: NONE
+          File Output Operator
+            compressed: false
+            Dp Sort State: PARTITION_SORTED
+            Statistics: Num rows: 12288 Data size: 2641964 Basic stats: 
COMPLETE Column stats: NONE
+            table:
+                input format: 
org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat
+                output format: 
org.apache.hadoop.hive.druid.io.DruidOutputFormat
+                serde: org.apache.hadoop.hive.druid.serde.DruidSerDe
+                name: default.druid_partitioned_table
+
+  Stage: Stage-0
+    Move Operator
+      files:
+          hdfs directory: true
+#### A masked pattern was here ####
+
+  Stage: Stage-3
+      Create Table Operator:
+        Create Table
+          columns: __time timestamp with local time zone, cstring1 string, 
cstring2 string, cdouble double, cfloat float, ctinyint tinyint, csmallint 
smallint, cint int, cbigint bigint, cboolean1 boolean, cboolean2 boolean
+          storage handler: org.apache.hadoop.hive.druid.DruidStorageHandler
+          name: default.druid_partitioned_table
+          table properties:
+            druid.query.granularity MINUTE
+            druid.segment.granularity HOUR
+            druid.segment.targetShardsPerGranularity 6
+
+  Stage: Stage-2
+    Stats Work
+      Basic Stats Work:
+
+PREHOOK: query: CREATE TABLE druid_partitioned_table
+STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
+TBLPROPERTIES (
+"druid.segment.granularity" = "HOUR",
+"druid.query.granularity" = "MINUTE",
+"druid.segment.targetShardsPerGranularity" = "6"
+)
+AS
+SELECT cast (`ctimestamp1` as timestamp with local time zone) as `__time`,
+  cstring1,
+  cstring2,
+  cdouble,
+  cfloat,
+  ctinyint,
+  csmallint,
+  cint,
+  cbigint,
+  cboolean1,
+  cboolean2
+  FROM alltypesorc where ctimestamp1 IS NOT NULL
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@alltypesorc
+PREHOOK: Output: database:default
+PREHOOK: Output: default@druid_partitioned_table
+POSTHOOK: query: CREATE TABLE druid_partitioned_table
+STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
+TBLPROPERTIES (
+"druid.segment.granularity" = "HOUR",
+"druid.query.granularity" = "MINUTE",
+"druid.segment.targetShardsPerGranularity" = "6"
+)
+AS
+SELECT cast (`ctimestamp1` as timestamp with local time zone) as `__time`,
+  cstring1,
+  cstring2,
+  cdouble,
+  cfloat,
+  ctinyint,
+  csmallint,
+  cint,
+  cbigint,
+  cboolean1,
+  cboolean2
+  FROM alltypesorc where ctimestamp1 IS NOT NULL
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Input: default@alltypesorc
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@druid_partitioned_table
+POSTHOOK: Lineage: druid_partitioned_table.__time EXPRESSION 
[(alltypesorc)alltypesorc.FieldSchema(name:ctimestamp1, type:timestamp, 
comment:null), ]
+POSTHOOK: Lineage: druid_partitioned_table.cbigint SIMPLE 
[(alltypesorc)alltypesorc.FieldSchema(name:cbigint, type:bigint, comment:null), 
]
+POSTHOOK: Lineage: druid_partitioned_table.cboolean1 SIMPLE 
[(alltypesorc)alltypesorc.FieldSchema(name:cboolean1, type:boolean, 
comment:null), ]
+POSTHOOK: Lineage: druid_partitioned_table.cboolean2 SIMPLE 
[(alltypesorc)alltypesorc.FieldSchema(name:cboolean2, type:boolean, 
comment:null), ]
+POSTHOOK: Lineage: druid_partitioned_table.cdouble SIMPLE 
[(alltypesorc)alltypesorc.FieldSchema(name:cdouble, type:double, comment:null), 
]
+POSTHOOK: Lineage: druid_partitioned_table.cfloat SIMPLE 
[(alltypesorc)alltypesorc.FieldSchema(name:cfloat, type:float, comment:null), ]
+POSTHOOK: Lineage: druid_partitioned_table.cint SIMPLE 
[(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ]
+POSTHOOK: Lineage: druid_partitioned_table.csmallint SIMPLE 
[(alltypesorc)alltypesorc.FieldSchema(name:csmallint, type:smallint, 
comment:null), ]
+POSTHOOK: Lineage: druid_partitioned_table.cstring1 SIMPLE 
[(alltypesorc)alltypesorc.FieldSchema(name:cstring1, type:string, 
comment:null), ]
+POSTHOOK: Lineage: druid_partitioned_table.cstring2 SIMPLE 
[(alltypesorc)alltypesorc.FieldSchema(name:cstring2, type:string, 
comment:null), ]
+POSTHOOK: Lineage: druid_partitioned_table.ctinyint SIMPLE 
[(alltypesorc)alltypesorc.FieldSchema(name:ctinyint, type:tinyint, 
comment:null), ]
+PREHOOK: query: SELECT sum(cfloat)  FROM druid_partitioned_table
+PREHOOK: type: QUERY
+PREHOOK: Input: default@druid_partitioned_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT sum(cfloat)  FROM druid_partitioned_table
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@druid_partitioned_table
+#### A masked pattern was here ####
+-39590.246
+PREHOOK: query: SELECT floor_hour(cast(`ctimestamp1` as timestamp with local 
time zone)) as `__time`,
+          cstring1,
+          cstring2,
+          cdouble,
+          cfloat,
+          ctinyint,
+          csmallint,
+          cint,
+          cbigint,
+          cboolean1,
+          cboolean2
+          FROM alltypesorc where ctimestamp1 IS NOT NULL order by `__time`, 
cstring2 DESC NULLS LAST, cstring1 DESC NULLS LAST LIMIT 10
+PREHOOK: type: QUERY
+PREHOOK: Input: default@alltypesorc
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT floor_hour(cast(`ctimestamp1` as timestamp with local 
time zone)) as `__time`,
+          cstring1,
+          cstring2,
+          cdouble,
+          cfloat,
+          ctinyint,
+          csmallint,
+          cint,
+          cbigint,
+          cboolean1,
+          cboolean2
+          FROM alltypesorc where ctimestamp1 IS NOT NULL order by `__time`, 
cstring2 DESC NULLS LAST, cstring1 DESC NULLS LAST LIMIT 10
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@alltypesorc
+#### A masked pattern was here ####
+1969-12-31 15:00:00.0 US/Pacific       NULL    yx36UAT823Cm    -200.0  52.0    52      -200    NULL    2029007949      NULL    true
+1969-12-31 15:00:00.0 US/Pacific       NULL    yvcx4HYTT8tvAm6CNbXHaH  -7196.0 40.0    40      -7196   NULL    437984126       NULL    false
+1969-12-31 15:00:00.0 US/Pacific       NULL    ysho54gMb       15601.0 -22.0   -22     15601   NULL    1553802956      NULL    false
+1969-12-31 15:00:00.0 US/Pacific       NULL    yqXw7J7 15601.0 44.0    44      15601   NULL    1265051089      NULL    true
+1969-12-31 15:00:00.0 US/Pacific       NULL    yfpFFQ0 15601.0 11.0    11      15601   NULL    11070716        NULL    false
+1969-12-31 15:00:00.0 US/Pacific       NULL    ySl6tu66m72erf1 -200.0  -50.0   -50     -200    NULL    824529348       NULL    false
+1969-12-31 15:00:00.0 US/Pacific       NULL    yPUM0wC54vq     15601.0 -8.0    -8      15601   NULL    1821279179      NULL    true
+1969-12-31 15:00:00.0 US/Pacific       NULL    yOgijYXi753GboFW7L1x65l -7196.0 -34.0   -34     -7196   NULL    -1022931985     NULL    false
+1969-12-31 15:00:00.0 US/Pacific       NULL    yO7xKhbtrsBU147xuT0CF7Q 15601.0 47.0    47      15601   NULL    661404907       NULL    true
+1969-12-31 15:00:00.0 US/Pacific       NULL    yLB85lr145622oTRo       -200.0  -41.0   -41     -200    NULL    -260138227      NULL    true
+PREHOOK: query: EXPLAIN INSERT INTO TABLE druid_partitioned_table
+SELECT cast (`ctimestamp2` as timestamp with local time zone) as `__time`,
+  cstring1,
+  cstring2,
+  cdouble,
+  cfloat,
+  ctinyint,
+  csmallint,
+  cint,
+  cbigint,
+  cboolean1,
+  cboolean2
+  FROM alltypesorc where ctimestamp2 IS NOT NULL
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN INSERT INTO TABLE druid_partitioned_table
+SELECT cast (`ctimestamp2` as timestamp with local time zone) as `__time`,
+  cstring1,
+  cstring2,
+  cdouble,
+  cfloat,
+  ctinyint,
+  csmallint,
+  cint,
+  cbigint,
+  cboolean1,
+  cboolean2
+  FROM alltypesorc where ctimestamp2 IS NOT NULL
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+  Stage-2
+  Stage-1 is a root stage
+  Stage-3 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+      Alter Table Operator:
+        Alter Table
+          type: drop props
+          old name: default.druid_partitioned_table
+          properties:
+            COLUMN_STATS_ACCURATE 
+
+  Stage: Stage-2
+      Insert operator:
+        Insert
+
+  Stage: Stage-1
+      Pre Insert operator:
+        Pre-Insert task
+
+  Stage: Stage-3
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: alltypesorc
+            Statistics: Num rows: 12288 Data size: 2641964 Basic stats: COMPLETE Column stats: NONE
+            Filter Operator
+              predicate: ctimestamp2 is not null (type: boolean)
+              Statistics: Num rows: 12288 Data size: 2641964 Basic stats: COMPLETE Column stats: NONE
+              Select Operator
+                expressions: CAST( ctimestamp2 AS timestamp with local time zone) (type: timestamp with local time zone), cstring1 (type: string), cstring2 (type: string), cdouble (type: double), cfloat (type: float), ctinyint (type: tinyint), csmallint (type: smallint), cint (type: int), cbigint (type: bigint), cboolean1 (type: boolean), cboolean2 (type: boolean)
+                outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10
+                Statistics: Num rows: 12288 Data size: 2641964 Basic stats: COMPLETE Column stats: NONE
+                Select Operator
+                  expressions: _col0 (type: timestamp with local time zone), _col1 (type: string), _col2 (type: string), _col3 (type: double), _col4 (type: float), _col5 (type: tinyint), _col6 (type: smallint), _col7 (type: int), _col8 (type: bigint), _col9 (type: boolean), _col10 (type: boolean), floor_hour(CAST( GenericUDFEpochMilli(_col0) AS TIMESTAMP)) (type: timestamp), (floor((1.0 / rand())) % 6) (type: bigint)
+                  outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, __time_granularity, __druid_extra_partition_key
+                  Statistics: Num rows: 12288 Data size: 2641964 Basic stats: COMPLETE Column stats: NONE
+                  Reduce Output Operator
+                    key expressions: __time_granularity (type: timestamp), __druid_extra_partition_key (type: bigint)
+                    sort order: ++
+                    Map-reduce partition columns: __time_granularity (type: timestamp), __druid_extra_partition_key (type: bigint)
+                    Statistics: Num rows: 12288 Data size: 2641964 Basic stats: COMPLETE Column stats: NONE
+                    value expressions: _col0 (type: timestamp with local time zone), _col1 (type: string), _col2 (type: string), _col3 (type: double), _col4 (type: float), _col5 (type: tinyint), _col6 (type: smallint), _col7 (type: int), _col8 (type: bigint), _col9 (type: boolean), _col10 (type: boolean)
+      Reduce Operator Tree:
+        Select Operator
+          expressions: VALUE._col0 (type: timestamp with local time zone), VALUE._col1 (type: string), VALUE._col2 (type: string), VALUE._col3 (type: double), VALUE._col4 (type: float), VALUE._col5 (type: tinyint), VALUE._col6 (type: smallint), VALUE._col7 (type: int), VALUE._col8 (type: bigint), VALUE._col9 (type: boolean), VALUE._col10 (type: boolean), KEY.__time_granularity (type: timestamp), KEY.__druid_extra_partition_key (type: bigint)
+          outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, __time_granularity, __druid_extra_partition_key
+          Statistics: Num rows: 12288 Data size: 2641964 Basic stats: COMPLETE Column stats: NONE
+          File Output Operator
+            compressed: false
+            Dp Sort State: PARTITION_SORTED
+            Statistics: Num rows: 12288 Data size: 2641964 Basic stats: COMPLETE Column stats: NONE
+            table:
+                input format: org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat
+                output format: org.apache.hadoop.hive.druid.io.DruidOutputFormat
+                serde: org.apache.hadoop.hive.druid.serde.DruidSerDe
+                name: default.druid_partitioned_table
+
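The plan above is where the work split shows up: alongside __time_granularity (floor_hour over the row timestamp), the optimizer emits a synthetic __druid_extra_partition_key computed as (floor((1.0 / rand())) % 6), and both columns become the reduce sink's sort and partition keys, so the rows of a single hour fan out over up to six reducers instead of piling onto one. The modulus of 6 presumably reflects the target shard count set on druid_partitioned_table earlier in the test (the druid.segment.targetShardsPerGranularity table property). The same plan shape appears again below for the INSERT OVERWRITE case. As a rough, self-contained Java illustration of that keying scheme, not the optimizer's actual code, and with class and method names invented for the example:

// A minimal sketch, assuming 6 target shards per granule as in the plan above.
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;

public class DruidShardKeySketch {

  private static final int TARGET_SHARDS_PER_GRANULE = 6; // assumed from the "% 6" in the plan
  private static final Random RAND = new Random();

  // HOUR segment granularity: truncate an epoch-millis timestamp to the start of its hour;
  // this plays the role of __time_granularity.
  static long floorHour(long epochMillis) {
    return epochMillis - (epochMillis % TimeUnit.HOURS.toMillis(1));
  }

  // Mirrors the plan's expression floor(1.0 / rand()) % 6, playing the role of
  // __druid_extra_partition_key. The result is not uniformly distributed, but it is
  // enough to spread one hour's rows over several reducers.
  static long extraPartitionKey() {
    double r = Math.max(RAND.nextDouble(), Double.MIN_VALUE); // guard the (unlikely) 0.0 draw
    return (long) Math.floor(1.0 / r) % TARGET_SHARDS_PER_GRANULE;
  }

  public static void main(String[] args) {
    // Simulate 12288 rows that all fall into the same hour and count how many land on each
    // (granule, shard) reduce key; keyed on __time_granularity alone they would all share
    // a single key and therefore a single reducer.
    Map<String, Integer> rowsPerReduceKey = new HashMap<>();
    long hour = floorHour(System.currentTimeMillis());
    for (int i = 0; i < 12288; i++) {
      rowsPerReduceKey.merge(hour + "/" + extraPartitionKey(), 1, Integer::sum);
    }
    rowsPerReduceKey.forEach((key, rows) -> System.out.println(key + " -> " + rows + " rows"));
  }
}

Because the key is drawn from rand() per row, the assignment needs no map-side state; the trade-off is a skewed but still effective spread across the target shards within each granule.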
+PREHOOK: query: INSERT INTO TABLE druid_partitioned_table
+SELECT cast (`ctimestamp2` as timestamp with local time zone) as `__time`,
+  cstring1,
+  cstring2,
+  cdouble,
+  cfloat,
+  ctinyint,
+  csmallint,
+  cint,
+  cbigint,
+  cboolean1,
+  cboolean2
+  FROM alltypesorc where ctimestamp2 IS NOT NULL
+PREHOOK: type: QUERY
+PREHOOK: Input: default@alltypesorc
+PREHOOK: Output: default@druid_partitioned_table
+POSTHOOK: query: INSERT INTO TABLE druid_partitioned_table
+SELECT cast (`ctimestamp2` as timestamp with local time zone) as `__time`,
+  cstring1,
+  cstring2,
+  cdouble,
+  cfloat,
+  ctinyint,
+  csmallint,
+  cint,
+  cbigint,
+  cboolean1,
+  cboolean2
+  FROM alltypesorc where ctimestamp2 IS NOT NULL
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@alltypesorc
+POSTHOOK: Output: default@druid_partitioned_table
+PREHOOK: query: SELECT  sum(cfloat)  FROM druid_partitioned_table
+PREHOOK: type: QUERY
+PREHOOK: Input: default@druid_partitioned_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT  sum(cfloat)  FROM druid_partitioned_table
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@druid_partitioned_table
+#### A masked pattern was here ####
+-46301.883
+PREHOOK: query: EXPLAIN INSERT OVERWRITE TABLE druid_partitioned_table
+  SELECT cast (`ctimestamp1` as timestamp with local time zone) as `__time`,
+    cstring1,
+    cstring2,
+    cdouble,
+    cfloat,
+    ctinyint,
+    csmallint,
+    cint,
+    cbigint,
+    cboolean1,
+    cboolean2
+    FROM alltypesorc where ctimestamp1 IS NOT NULL
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN INSERT OVERWRITE TABLE druid_partitioned_table
+  SELECT cast (`ctimestamp1` as timestamp with local time zone) as `__time`,
+    cstring1,
+    cstring2,
+    cdouble,
+    cfloat,
+    ctinyint,
+    csmallint,
+    cint,
+    cbigint,
+    cboolean1,
+    cboolean2
+    FROM alltypesorc where ctimestamp1 IS NOT NULL
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+  Stage-2
+  Stage-1 is a root stage
+  Stage-3 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+      Alter Table Operator:
+        Alter Table
+          type: drop props
+          old name: default.druid_partitioned_table
+          properties:
+            COLUMN_STATS_ACCURATE 
+
+  Stage: Stage-2
+      Insert operator:
+        Insert
+
+  Stage: Stage-1
+      Pre Insert operator:
+        Pre-Insert task
+
+  Stage: Stage-3
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: alltypesorc
+            Statistics: Num rows: 12288 Data size: 2641964 Basic stats: COMPLETE Column stats: NONE
+            Filter Operator
+              predicate: ctimestamp1 is not null (type: boolean)
+              Statistics: Num rows: 12288 Data size: 2641964 Basic stats: COMPLETE Column stats: NONE
+              Select Operator
+                expressions: CAST( ctimestamp1 AS timestamp with local time zone) (type: timestamp with local time zone), cstring1 (type: string), cstring2 (type: string), cdouble (type: double), cfloat (type: float), ctinyint (type: tinyint), csmallint (type: smallint), cint (type: int), cbigint (type: bigint), cboolean1 (type: boolean), cboolean2 (type: boolean)
+                outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10
+                Statistics: Num rows: 12288 Data size: 2641964 Basic stats: COMPLETE Column stats: NONE
+                Select Operator
+                  expressions: _col0 (type: timestamp with local time zone), _col1 (type: string), _col2 (type: string), _col3 (type: double), _col4 (type: float), _col5 (type: tinyint), _col6 (type: smallint), _col7 (type: int), _col8 (type: bigint), _col9 (type: boolean), _col10 (type: boolean), floor_hour(CAST( GenericUDFEpochMilli(_col0) AS TIMESTAMP)) (type: timestamp), (floor((1.0 / rand())) % 6) (type: bigint)
+                  outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, __time_granularity, __druid_extra_partition_key
+                  Statistics: Num rows: 12288 Data size: 2641964 Basic stats: COMPLETE Column stats: NONE
+                  Reduce Output Operator
+                    key expressions: __time_granularity (type: timestamp), __druid_extra_partition_key (type: bigint)
+                    sort order: ++
+                    Map-reduce partition columns: __time_granularity (type: timestamp), __druid_extra_partition_key (type: bigint)
+                    Statistics: Num rows: 12288 Data size: 2641964 Basic stats: COMPLETE Column stats: NONE
+                    value expressions: _col0 (type: timestamp with local time zone), _col1 (type: string), _col2 (type: string), _col3 (type: double), _col4 (type: float), _col5 (type: tinyint), _col6 (type: smallint), _col7 (type: int), _col8 (type: bigint), _col9 (type: boolean), _col10 (type: boolean)
+      Reduce Operator Tree:
+        Select Operator
+          expressions: VALUE._col0 (type: timestamp with local time zone), VALUE._col1 (type: string), VALUE._col2 (type: string), VALUE._col3 (type: double), VALUE._col4 (type: float), VALUE._col5 (type: tinyint), VALUE._col6 (type: smallint), VALUE._col7 (type: int), VALUE._col8 (type: bigint), VALUE._col9 (type: boolean), VALUE._col10 (type: boolean), KEY.__time_granularity (type: timestamp), KEY.__druid_extra_partition_key (type: bigint)
+          outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, __time_granularity, __druid_extra_partition_key
+          Statistics: Num rows: 12288 Data size: 2641964 Basic stats: COMPLETE Column stats: NONE
+          File Output Operator
+            compressed: false
+            Dp Sort State: PARTITION_SORTED
+            Statistics: Num rows: 12288 Data size: 2641964 Basic stats: COMPLETE Column stats: NONE
+            table:
+                input format: org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat
+                output format: org.apache.hadoop.hive.druid.io.DruidOutputFormat
+                serde: org.apache.hadoop.hive.druid.serde.DruidSerDe
+                name: default.druid_partitioned_table
+
+PREHOOK: query: INSERT OVERWRITE TABLE druid_partitioned_table
+  SELECT cast (`ctimestamp1` as timestamp with local time zone) as `__time`,
+    cstring1,
+    cstring2,
+    cdouble,
+    cfloat,
+    ctinyint,
+    csmallint,
+    cint,
+    cbigint,
+    cboolean1,
+    cboolean2
+    FROM alltypesorc where ctimestamp1 IS NOT NULL
+PREHOOK: type: QUERY
+PREHOOK: Input: default@alltypesorc
+PREHOOK: Output: default@druid_partitioned_table
+POSTHOOK: query: INSERT OVERWRITE TABLE druid_partitioned_table
+  SELECT cast (`ctimestamp1` as timestamp with local time zone) as `__time`,
+    cstring1,
+    cstring2,
+    cdouble,
+    cfloat,
+    ctinyint,
+    csmallint,
+    cint,
+    cbigint,
+    cboolean1,
+    cboolean2
+    FROM alltypesorc where ctimestamp1 IS NOT NULL
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@alltypesorc
+POSTHOOK: Output: default@druid_partitioned_table
+PREHOOK: query: SELECT  sum(cfloat)  FROM druid_partitioned_table
+PREHOOK: type: QUERY
+PREHOOK: Input: default@druid_partitioned_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT  sum(cfloat)  FROM druid_partitioned_table
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@druid_partitioned_table
+#### A masked pattern was here ####
+-39590.246
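The three aggregate checks line up with append-versus-replace behavior: the ctimestamp1-based data sums to -39590.246, the INSERT INTO of the ctimestamp2 rows moves the total to -46301.883 (so the appended rows contribute roughly -46301.883 - (-39590.246) = -6711.64), and the INSERT OVERWRITE of the ctimestamp1 rows brings the total back to exactly -39590.246, which is what a full replacement of the table's contents, rather than another append, should produce.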
+PREHOOK: query: CREATE TABLE druid_max_size_partition
+        STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
+        TBLPROPERTIES (
+        "druid.segment.granularity" = "HOUR",
+        "druid.query.granularity" = "MINUTE"
+        )
+        AS
+        SELECT cast (`ctimestamp1` as timestamp with local time zone) as `__time`,
+          cstring1,
+          cstring2,
+          cdouble,
+          cfloat,
+          ctinyint,
+          csmallint,
+          cint,
+          cbigint,
+          cboolean1,
+          cboolean2
+          FROM alltypesorc where ctimestamp1 IS NOT NULL
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@alltypesorc
+PREHOOK: Output: database:default
+PREHOOK: Output: default@druid_max_size_partition
+POSTHOOK: query: CREATE TABLE druid_max_size_partition
+        STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
+        TBLPROPERTIES (
+        "druid.segment.granularity" = "HOUR",
+        "druid.query.granularity" = "MINUTE"
+        )
+        AS
+        SELECT cast (`ctimestamp1` as timestamp with local time zone) as `__time`,
+          cstring1,
+          cstring2,
+          cdouble,
+          cfloat,
+          ctinyint,
+          csmallint,
+          cint,
+          cbigint,
+          cboolean1,
+          cboolean2
+          FROM alltypesorc where ctimestamp1 IS NOT NULL
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Input: default@alltypesorc
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@druid_max_size_partition
+POSTHOOK: Lineage: druid_max_size_partition.__time EXPRESSION [(alltypesorc)alltypesorc.FieldSchema(name:ctimestamp1, type:timestamp, comment:null), ]
+POSTHOOK: Lineage: druid_max_size_partition.cbigint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cbigint, type:bigint, comment:null), ]
+POSTHOOK: Lineage: druid_max_size_partition.cboolean1 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cboolean1, type:boolean, comment:null), ]
+POSTHOOK: Lineage: druid_max_size_partition.cboolean2 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cboolean2, type:boolean, comment:null), ]
+POSTHOOK: Lineage: druid_max_size_partition.cdouble SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cdouble, type:double, comment:null), ]
+POSTHOOK: Lineage: druid_max_size_partition.cfloat SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cfloat, type:float, comment:null), ]
+POSTHOOK: Lineage: druid_max_size_partition.cint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ]
+POSTHOOK: Lineage: druid_max_size_partition.csmallint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:csmallint, type:smallint, comment:null), ]
+POSTHOOK: Lineage: druid_max_size_partition.cstring1 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cstring1, type:string, comment:null), ]
+POSTHOOK: Lineage: druid_max_size_partition.cstring2 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cstring2, type:string, comment:null), ]
+POSTHOOK: Lineage: druid_max_size_partition.ctinyint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:ctinyint, type:tinyint, comment:null), ]
+PREHOOK: query: SELECT  sum(cfloat)  FROM druid_max_size_partition
+PREHOOK: type: QUERY
+PREHOOK: Input: default@druid_max_size_partition
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT  sum(cfloat)  FROM druid_max_size_partition
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@druid_max_size_partition
+#### A masked pattern was here ####
+-39590.246
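druid_max_size_partition sets only the segment and query granularities and no target shard count, and no EXPLAIN is captured for it here; with no fixed modulus available, the optimizer has nothing to split on up front, and presumably the record writer instead rolls over to a new shard within a time granule once a maximum row count is reached (the DruidRecordWriter changes in this patch point that way). The sketch below illustrates only that roll-over idea under that assumption; the class, the method names, and the threshold are invented for the example and are not the storage handler's actual API.

// Illustrative roll-over writer, not Hive code: starts a new shard within the current
// time granule once a row-count threshold is hit.
import java.util.ArrayList;
import java.util.List;

public class MaxRowsRollOverSketch {

  private final int maxRowsPerSegment;       // assumed threshold, e.g. from a handler config
  private final List<String> buffer = new ArrayList<>();
  private long currentGranule = Long.MIN_VALUE;
  private int shardNum = 0;

  MaxRowsRollOverSketch(int maxRowsPerSegment) {
    this.maxRowsPerSegment = maxRowsPerSegment;
  }

  // granule: the floor_hour bucket of the row; row: an opaque serialized record.
  // Callers should invoke flush() once more after the last row.
  void write(long granule, String row) {
    if (granule != currentGranule) {
      flush();                               // close out the previous granule's open shard
      currentGranule = granule;
      shardNum = 0;                          // shard numbering restarts per granule
    } else if (buffer.size() >= maxRowsPerSegment) {
      flush();                               // same granule, shard is full: roll over
      shardNum++;
    }
    buffer.add(row);
  }

  void flush() {
    if (!buffer.isEmpty()) {
      System.out.println("publish segment granule=" + currentGranule
          + " shard=" + shardNum + " rows=" + buffer.size());
      buffer.clear();
    }
  }
}

With a target shard count set, as for druid_partitioned_table above, the split is instead decided in the plan itself, which is what lets several reducers work on the same granule in parallel.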
+PREHOOK: query: DROP TABLE druid_partitioned_table_0
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@druid_partitioned_table_0
+PREHOOK: Output: default@druid_partitioned_table_0
+POSTHOOK: query: DROP TABLE druid_partitioned_table_0
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@druid_partitioned_table_0
+POSTHOOK: Output: default@druid_partitioned_table_0
+PREHOOK: query: DROP TABLE druid_partitioned_table
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@druid_partitioned_table
+PREHOOK: Output: default@druid_partitioned_table
+POSTHOOK: query: DROP TABLE druid_partitioned_table
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@druid_partitioned_table
+POSTHOOK: Output: default@druid_partitioned_table
+PREHOOK: query: DROP TABLE druid_max_size_partition
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@druid_max_size_partition
+PREHOOK: Output: default@druid_max_size_partition
+POSTHOOK: query: DROP TABLE druid_max_size_partition
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@druid_max_size_partition
+POSTHOOK: Output: default@druid_max_size_partition
