Repository: hive
Updated Branches:
  refs/heads/master 240da8403 -> e55ccd291


HIVE-19026: Add support for more ingestion formats - Druid Kafka Indexing (Nishant Bangarwa reviewed by Ashutosh Chauhan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e55ccd29
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e55ccd29
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e55ccd29

Branch: refs/heads/master
Commit: e55ccd29108f400c17f2d80d81278f81be128f9e
Parents: 240da84
Author: Nishant <nishant.mon...@gmail.com>
Authored: Wed Nov 14 18:13:09 2018 +0530
Committer: Nishant <nishant.mon...@gmail.com>
Committed: Wed Nov 14 18:13:09 2018 +0530

----------------------------------------------------------------------
 data/scripts/kafka_init_data.csv                |  10 +
 .../hadoop/hive/druid/conf/DruidConstants.java  |  76 ++++++
 .../hive/druid/json/AvroBytesDecoder.java       |  37 +++
 .../hadoop/hive/druid/json/AvroParseSpec.java   | 104 ++++++++
 .../druid/json/AvroStreamInputRowParser.java    |  98 +++++++
 .../json/InlineSchemaAvroBytesDecoder.java      |  52 ++++
 .../clientpositive/druidkafkamini_avro.q        |  99 +++++++
 .../queries/clientpositive/druidkafkamini_csv.q |  37 +++
 .../clientpositive/druidkafkamini_delimited.q   |  38 +++
 .../druid/druidkafkamini_avro.q.out             | 263 +++++++++++++++++++
 .../druid/druidkafkamini_csv.q.out              | 138 ++++++++++
 .../druid/druidkafkamini_delimited.q.out        | 140 ++++++++++
 12 files changed, 1092 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/e55ccd29/data/scripts/kafka_init_data.csv
----------------------------------------------------------------------
diff --git a/data/scripts/kafka_init_data.csv b/data/scripts/kafka_init_data.csv
new file mode 100644
index 0000000..5dc094e
--- /dev/null
+++ b/data/scripts/kafka_init_data.csv
@@ -0,0 +1,10 @@
+"2013-08-31T01:02:33Z", "Gypsy 
Danger","en","nuclear","true","true","false","false","article","North 
America","United States","Bay Area","San Francisco",57,200,-143
+"2013-08-31T03:32:45Z","Striker 
Eureka","en","speed","false","true","true","false","wikipedia","Australia","Australia","Cantebury","Syndey",459,129,330
+"2013-08-31T07:11:21Z","Cherno 
Alpha","ru","masterYi","false","true","true","false","article","Asia","Russia","Oblast","Moscow",123,12,111
+"2013-08-31T11:58:39Z","Crimson 
Typhoon","zh","triplets","true","false","true","false","wikipedia","Asia","China","Shanxi","Taiyuan",905,5,900
+"2013-08-31T12:41:27Z","Coyote 
Tango","ja","stringer","true","false","true","false","wikipedia","Asia","Japan","Kanto","Tokyo",1,10,-9
+"2013-09-01T01:02:33Z","Gypsy 
Danger","en","nuclear","true","true","false","false","article","North 
America","United States","Bay Area","San Francisco",57,200,-143
+"2013-09-01T03:32:45Z","Striker 
Eureka","en","speed","false","true","true","false","wikipedia","Australia","Australia","Cantebury","Syndey",459,129,330
+"2013-09-01T07:11:21Z","Cherno 
Alpha","ru","masterYi","false","true","true","false","article","Asia","Russia","Oblast","Moscow",123,12,111
+"2013-09-01T11:58:39Z","Crimson 
Typhoon","zh","triplets","true","false","true","false","wikipedia","Asia","China","Shanxi","Taiyuan",905,5,900
+"2013-09-01T12:41:27Z","Coyote 
Tango","ja","stringer","true","false","true","false","wikipedia","Asia","Japan","Kanto","Tokyo",1,10,-9
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/e55ccd29/druid-handler/src/java/org/apache/hadoop/hive/druid/conf/DruidConstants.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/conf/DruidConstants.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/conf/DruidConstants.java
new file mode 100644
index 0000000..242f7be
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/conf/DruidConstants.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid.conf;
+
+public class DruidConstants {
+
+  public static final String DRUID_QUERY_FETCH = "druid.query.fetch";
+
+  public static final String DRUID_ROLLUP = "druid.rollup";
+
+  public static final String DRUID_QUERY_GRANULARITY = "druid.query.granularity";
+
+  public static final String DRUID_SEGMENT_DIRECTORY = "druid.storage.storageDirectory";
+
+  public static final String DRUID_SEGMENT_INTERMEDIATE_DIRECTORY = "druid.storage.storageDirectory.intermediate";
+
+  public static final String DRUID_SEGMENT_VERSION = "druid.segment.version";
+
+  public static final String DRUID_JOB_WORKING_DIRECTORY = "druid.job.workingDirectory";
+
+  public static final String KAFKA_TOPIC = "kafka.topic";
+
+  public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
+
+  public static final String DRUID_KAFKA_INGESTION_PROPERTY_PREFIX = "druid.kafka.ingestion.";
+
+  public static final String DRUID_KAFKA_CONSUMER_PROPERTY_PREFIX = DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "consumer.";
+
+  /* Kafka Ingestion state - valid values - START/STOP/RESET */
+  public static final String DRUID_KAFKA_INGESTION = "druid.kafka.ingestion";
+
+  //Druid storage timestamp column name
+  public static final String DEFAULT_TIMESTAMP_COLUMN = "__time";
+
+  public static final String DRUID_TIMESTAMP_FORMAT = "druid.timestamp.format";
+
+  // Used when the field name in ingested data via streaming ingestion does not match
+  // the druid default timestamp column, i.e. `__time`.
+  public static final String DRUID_TIMESTAMP_COLUMN = "druid.timestamp.column";
+
+  //Druid Json timestamp column name for GroupBy results
+  public static final String EVENT_TIMESTAMP_COLUMN = "timestamp";
+
+  // Druid ParseSpec Type - JSON/CSV/TSV/AVRO
+  public static final String DRUID_PARSE_SPEC_FORMAT = "druid.parseSpec.format";
+
+  public static final String AVRO_SCHEMA_LITERAL = "avro.schema.literal";
+
+  // value delimiter for druid columns
+  public static final String DRUID_PARSE_SPEC_DELIMITER = "druid.parseSpec.delimiter";
+
+  // list delimiter for multi-valued columns
+  public static final String DRUID_PARSE_SPEC_LIST_DELIMITER = "druid.parseSpec.listDelimiter";
+
+  // order of columns for the delimited and csv parse specs.
+  public static final String DRUID_PARSE_SPEC_COLUMNS = "druid.parseSpec.columns";
+
+  public static final String DRUID_PARSE_SPEC_SKIP_HEADER_ROWS = "druid.parseSpec.skipHeaderRows";
+
+  public static final String DRUID_PARSE_SPEC_HAS_HEADER_ROWS = "druid.parseSpec.hasHeaderRows";
+}
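
For illustration only (not part of this patch): the two prefixes above suggest that table properties under "druid.kafka.ingestion.consumer." feed the Kafka consumer, while the remaining "druid.kafka.ingestion." keys tune the ingestion itself. A rough sketch of that split, using sample values from the q-files below; the splitting code itself is not in this hunk, so the helper class is hypothetical.

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical illustration: split prefixed table properties into consumer vs. ingestion settings.
public class KafkaIngestionPropertySplitExample {
  public static void main(String[] args) {
    Properties tblProps = new Properties();
    tblProps.setProperty("druid.kafka.ingestion.taskDuration", "PT30S");
    tblProps.setProperty("druid.kafka.ingestion.consumer.retries", "2");

    String ingestionPrefix = "druid.kafka.ingestion.";
    String consumerPrefix = ingestionPrefix + "consumer.";
    Map<String, String> consumerProps = new HashMap<>();
    Map<String, String> ingestionProps = new HashMap<>();

    for (String key : tblProps.stringPropertyNames()) {
      if (key.startsWith(consumerPrefix)) {
        consumerProps.put(key.substring(consumerPrefix.length()), tblProps.getProperty(key));
      } else if (key.startsWith(ingestionPrefix)) {
        ingestionProps.put(key.substring(ingestionPrefix.length()), tblProps.getProperty(key));
      }
    }

    System.out.println("consumer: " + consumerProps);   // {retries=2}
    System.out.println("ingestion: " + ingestionProps); // {taskDuration=PT30S}
  }
}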

http://git-wip-us.apache.org/repos/asf/hive/blob/e55ccd29/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroBytesDecoder.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroBytesDecoder.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroBytesDecoder.java
new file mode 100644
index 0000000..3a1dbf7
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroBytesDecoder.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hive.druid.json;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.avro.generic.GenericRecord;
+
+import java.nio.ByteBuffer;
+
+/**
+ * This class is copied from druid source code
+ * in order to avoid adding additional dependencies on druid-indexing-service.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonSubTypes(value = {
+    @JsonSubTypes.Type(name = "schema_inline", value = InlineSchemaAvroBytesDecoder.class)
+})
+public interface AvroBytesDecoder
+{
+}
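
For illustration only (not part of this patch): the @JsonTypeInfo/@JsonSubTypes annotations above make a serialized decoder carry a "type" discriminator, with "schema_inline" resolving to InlineSchemaAvroBytesDecoder. A minimal Jackson round-trip under that assumption:

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.hive.druid.json.AvroBytesDecoder;
import org.apache.hadoop.hive.druid.json.InlineSchemaAvroBytesDecoder;

public class AvroBytesDecoderJsonExample {
  public static void main(String[] args) throws Exception {
    // JSON shape implied by the annotations: a "type" field selects the concrete subtype.
    String json = "{\"type\": \"schema_inline\", \"schema\": {\"type\": \"record\", \"name\": \"Wikipedia\"}}";

    ObjectMapper mapper = new ObjectMapper();
    AvroBytesDecoder decoder = mapper.readValue(json, AvroBytesDecoder.class);

    // Expected to be the inline-schema implementation registered above.
    System.out.println(decoder instanceof InlineSchemaAvroBytesDecoder);
  }
}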

http://git-wip-us.apache.org/repos/asf/hive/blob/e55ccd29/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroParseSpec.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroParseSpec.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroParseSpec.java
new file mode 100644
index 0000000..af71f9a
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroParseSpec.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.druid.json;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.druid.data.input.impl.DimensionsSpec;
+import io.druid.data.input.impl.ParseSpec;
+import io.druid.data.input.impl.TimestampSpec;
+import io.druid.java.util.common.parsers.JSONPathSpec;
+import io.druid.java.util.common.parsers.Parser;
+
+import java.util.Objects;
+
+/**
+ * This class is copied from druid source code
+ * in order to avoid adding additional dependencies on druid-indexing-service.
+ */
+public class AvroParseSpec extends ParseSpec
+{
+
+  @JsonIgnore
+  private final JSONPathSpec flattenSpec;
+
+  @JsonCreator
+  public AvroParseSpec(
+      @JsonProperty("timestampSpec") TimestampSpec timestampSpec,
+      @JsonProperty("dimensionsSpec") DimensionsSpec dimensionsSpec,
+      @JsonProperty("flattenSpec") JSONPathSpec flattenSpec
+  )
+  {
+    super(
+        timestampSpec != null ? timestampSpec : new TimestampSpec(null, null, null),
+        dimensionsSpec != null ? dimensionsSpec : new DimensionsSpec(null, null, null)
+    );
+
+    this.flattenSpec = flattenSpec != null ? flattenSpec : JSONPathSpec.DEFAULT;
+  }
+
+  @JsonProperty
+  public JSONPathSpec getFlattenSpec()
+  {
+    return flattenSpec;
+  }
+
+  @Override
+  public Parser<String, Object> makeParser()
+  {
+    // makeParser is only used by StringInputRowParser, which cannot parse avro anyway.
+    throw new UnsupportedOperationException("makeParser not supported");
+  }
+
+  @Override
+  public ParseSpec withTimestampSpec(TimestampSpec spec)
+  {
+    return new AvroParseSpec(spec, getDimensionsSpec(), flattenSpec);
+  }
+
+  @Override
+  public ParseSpec withDimensionsSpec(DimensionsSpec spec)
+  {
+    return new AvroParseSpec(getTimestampSpec(), spec, flattenSpec);
+  }
+
+  @Override
+  public boolean equals(final Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
+    final AvroParseSpec that = (AvroParseSpec) o;
+    return Objects.equals(flattenSpec, that.flattenSpec);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(super.hashCode(), flattenSpec);
+  }
+}
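
Usage note (illustration only, not from the patch): when timestampSpec, dimensionsSpec or flattenSpec are omitted, the constructor above falls back to empty defaults, and makeParser stays unsupported because Avro rows never go through the string parser path. Assuming the Druid classes imported above are on the classpath:

import io.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.hadoop.hive.druid.json.AvroParseSpec;

public class AvroParseSpecDefaultsExample {
  public static void main(String[] args) {
    // All-null arguments exercise the default branches in the constructor.
    AvroParseSpec spec = new AvroParseSpec(null, null, null);

    System.out.println(spec.getFlattenSpec() == JSONPathSpec.DEFAULT); // true
    try {
      spec.makeParser();
    } catch (UnsupportedOperationException expected) {
      // Avro rows are not parsed through the string-based parser path.
      System.out.println("makeParser not supported, as documented above");
    }
  }
}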

http://git-wip-us.apache.org/repos/asf/hive/blob/e55ccd29/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroStreamInputRowParser.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroStreamInputRowParser.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroStreamInputRowParser.java
new file mode 100644
index 0000000..d6e6624
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroStreamInputRowParser.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hive.druid.json;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import io.druid.data.input.ByteBufferInputRowParser;
+import io.druid.data.input.InputRow;
+import io.druid.data.input.impl.ParseSpec;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * This class is copied from druid source code
+ * in order to avoid adding additional dependencies on druid-indexing-service.
+ */
+public class AvroStreamInputRowParser implements ByteBufferInputRowParser
+{
+  private final ParseSpec parseSpec;
+  private final AvroBytesDecoder avroBytesDecoder;
+
+  @JsonCreator
+  public AvroStreamInputRowParser(
+      @JsonProperty("parseSpec") ParseSpec parseSpec,
+      @JsonProperty("avroBytesDecoder") AvroBytesDecoder avroBytesDecoder
+  )
+  {
+    this.parseSpec = Preconditions.checkNotNull(parseSpec, "parseSpec");
+    this.avroBytesDecoder = Preconditions.checkNotNull(avroBytesDecoder, "avroBytesDecoder");
+  }
+
+  @Override
+  public List<InputRow> parseBatch(ByteBuffer input)
+  {
+    throw new UnsupportedOperationException("This class is only used for JSON serde");
+  }
+
+  @JsonProperty
+  @Override
+  public ParseSpec getParseSpec()
+  {
+    return parseSpec;
+  }
+
+  @JsonProperty
+  public AvroBytesDecoder getAvroBytesDecoder()
+  {
+    return avroBytesDecoder;
+  }
+
+  @Override
+  public ByteBufferInputRowParser withParseSpec(ParseSpec parseSpec)
+  {
+    return new AvroStreamInputRowParser(
+        parseSpec,
+        avroBytesDecoder
+    );
+  }
+
+  @Override
+  public boolean equals(final Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    final AvroStreamInputRowParser that = (AvroStreamInputRowParser) o;
+    return Objects.equals(parseSpec, that.parseSpec) &&
+           Objects.equals(avroBytesDecoder, that.avroBytesDecoder);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(parseSpec, avroBytesDecoder);
+  }
+}
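
Similarly, a small sketch (not part of the diff) showing that this parser is only a JSON-serde carrier for the spec; parseBatch is never exercised on the Hive side:

import java.nio.ByteBuffer;
import java.util.Collections;
import org.apache.hadoop.hive.druid.json.AvroParseSpec;
import org.apache.hadoop.hive.druid.json.AvroStreamInputRowParser;
import org.apache.hadoop.hive.druid.json.InlineSchemaAvroBytesDecoder;

public class AvroStreamInputRowParserExample {
  public static void main(String[] args) {
    AvroStreamInputRowParser parser = new AvroStreamInputRowParser(
        new AvroParseSpec(null, null, null),
        new InlineSchemaAvroBytesDecoder(Collections.<String, Object>singletonMap("type", "record")));

    try {
      parser.parseBatch(ByteBuffer.allocate(0));
    } catch (UnsupportedOperationException expected) {
      // Per the class comment, actual Avro decoding happens in the Druid indexing tasks, not in Hive.
      System.out.println("parseBatch is not used on the Hive side");
    }
  }
}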

http://git-wip-us.apache.org/repos/asf/hive/blob/e55ccd29/druid-handler/src/java/org/apache/hadoop/hive/druid/json/InlineSchemaAvroBytesDecoder.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/InlineSchemaAvroBytesDecoder.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/InlineSchemaAvroBytesDecoder.java
new file mode 100644
index 0000000..72d6cbb
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/InlineSchemaAvroBytesDecoder.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.druid.json;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+
+import java.util.Map;
+
+/**
+ * This class is copied from druid source code
+ * in order to avoid adding additional dependencies on druid-indexing-service.
+ */
+public class InlineSchemaAvroBytesDecoder implements AvroBytesDecoder
+{
+  private final Map<String, Object> schema;
+
+  @JsonCreator
+  public InlineSchemaAvroBytesDecoder(
+      @JsonProperty("schema") Map<String, Object> schema
+  )
+  {
+    Preconditions.checkArgument(schema != null, "schema must be provided");
+
+    this.schema = schema;
+  }
+
+  @JsonProperty
+  public Map<String, Object> getSchema()
+  {
+    return schema;
+  }
+
+}
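
For context (the actual wiring lives in the storage handler, outside this hunk): the table's 'avro.schema.literal' value is a JSON string, so it can be turned into the Map<String, Object> this decoder expects. The shortened schema below is a stand-in for the full literal used in druidkafkamini_avro.q:

import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.hive.druid.json.InlineSchemaAvroBytesDecoder;

public class InlineSchemaFromLiteralExample {
  @SuppressWarnings("unchecked")
  public static void main(String[] args) throws Exception {
    // Shortened stand-in for the table's 'avro.schema.literal' value.
    String avroSchemaLiteral =
        "{\"type\": \"record\", \"name\": \"Wikipedia\", \"namespace\": \"org.apache.hive.kafka\", \"fields\": []}";

    Map<String, Object> schema = new ObjectMapper().readValue(avroSchemaLiteral, Map.class);
    InlineSchemaAvroBytesDecoder decoder = new InlineSchemaAvroBytesDecoder(schema);

    System.out.println(decoder.getSchema().get("name")); // Wikipedia
  }
}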

http://git-wip-us.apache.org/repos/asf/hive/blob/e55ccd29/ql/src/test/queries/clientpositive/druidkafkamini_avro.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/druidkafkamini_avro.q b/ql/src/test/queries/clientpositive/druidkafkamini_avro.q
new file mode 100644
index 0000000..183491c
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/druidkafkamini_avro.q
@@ -0,0 +1,99 @@
+SET hive.vectorized.execution.enabled=false;
+
+CREATE EXTERNAL TABLE druid_kafka_test_avro(`__time` timestamp , `page` string, `user` string, `language` string,
+                                            `country` string,`continent` string, `namespace` string, `newPage` boolean, `unpatrolled` boolean,
+                                            `anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint)
+        STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
+        TBLPROPERTIES (
+        "druid.segment.granularity" = "MONTH",
+        "druid.query.granularity" = "MINUTE",
+        "kafka.bootstrap.servers" = "localhost:9092",
+        "kafka.topic" = "wiki_kafka_avro_table",
+        "druid.kafka.ingestion.useEarliestOffset" = "true",
+        "druid.kafka.ingestion.maxRowsInMemory" = "5",
+        "druid.kafka.ingestion.startDelay" = "PT1S",
+        "druid.kafka.ingestion.taskDuration" = "PT30S",
+        "druid.kafka.ingestion.period" = "PT5S",
+        "druid.kafka.ingestion.consumer.retries" = "2",
+        "druid.kafka.ingestion.reportParseExceptions" = "true",
+        "druid.timestamp.column" = "timestamp",
+        "druid.timestamp.format" = "MM/dd/yyyy HH:mm:ss",
+        "druid.parseSpec.format" = "avro",
+        'avro.schema.literal'='{
+          "type" : "record",
+          "name" : "Wikipedia",
+          "namespace" : "org.apache.hive.kafka",
+          "version": "1",
+          "fields" : [ {
+            "name" : "isrobot",
+            "type" : "boolean"
+          }, {
+            "name" : "channel",
+            "type" : "string"
+          }, {
+            "name" : "timestamp",
+            "type" : "string"
+          }, {
+            "name" : "flags",
+            "type" : "string"
+          }, {
+            "name" : "isunpatrolled",
+            "type" : "boolean"
+          }, {
+            "name" : "page",
+            "type" : "string"
+          }, {
+            "name" : "diffurl",
+            "type" : "string"
+          }, {
+            "name" : "added",
+            "type" : "long"
+          }, {
+            "name" : "comment",
+            "type" : "string"
+          }, {
+            "name" : "commentlength",
+            "type" : "long"
+          }, {
+            "name" : "isnew",
+            "type" : "boolean"
+          }, {
+            "name" : "isminor",
+            "type" : "boolean"
+          }, {
+            "name" : "delta",
+            "type" : "long"
+          }, {
+            "name" : "isanonymous",
+            "type" : "boolean"
+          }, {
+            "name" : "user",
+            "type" : "string"
+          }, {
+            "name" : "deltabucket",
+            "type" : "double"
+          }, {
+            "name" : "deleted",
+            "type" : "long"
+          }, {
+            "name" : "namespace",
+            "type" : "string"
+          } ]
+        }'
+        );
+
+ALTER TABLE druid_kafka_test_avro SET TBLPROPERTIES('druid.kafka.ingestion' = 'START');
+
+!curl --noproxy * -ss http://localhost:8081/druid/indexer/v1/supervisor;
+
+-- Sleep for some time for ingestion tasks to ingest events
+!sleep 60;
+
+DESCRIBE druid_kafka_test_avro;
+DESCRIBE EXTENDED druid_kafka_test_avro;
+
+Select count(*) FROM druid_kafka_test_avro;
+
+Select page FROM druid_kafka_test_avro;
+
+DROP TABLE druid_kafka_test_avro;
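
The q-file assumes the test driver has already published Avro records to the wiki_kafka_avro_table topic; that producer is not part of this diff. A hypothetical sketch of such a producer, using a trimmed-down version of the schema literal above (topic and broker are taken from the table properties; field values are modeled on the expected q.out output):

import java.io.ByteArrayOutputStream;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class WikipediaAvroProducerSketch {
  public static void main(String[] args) throws Exception {
    // Subset of the record schema from 'avro.schema.literal' in the q-file above.
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Wikipedia\",\"namespace\":\"org.apache.hive.kafka\","
            + "\"fields\":[{\"name\":\"timestamp\",\"type\":\"string\"},"
            + "{\"name\":\"page\",\"type\":\"string\"},"
            + "{\"name\":\"added\",\"type\":\"long\"}]}");

    GenericData.Record record = new GenericData.Record(schema);
    record.put("timestamp", "08/31/2013 01:02:33"); // matches druid.timestamp.format MM/dd/yyyy HH:mm:ss
    record.put("page", "page is 0");
    record.put("added", 57L);

    // Plain binary Avro, no schema-registry header, matching the inline-schema decoder.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericData.Record>(schema).write(record, encoder);
    encoder.flush();

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

    try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
      producer.send(new ProducerRecord<>("wiki_kafka_avro_table", out.toByteArray())).get();
    }
  }
}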

http://git-wip-us.apache.org/repos/asf/hive/blob/e55ccd29/ql/src/test/queries/clientpositive/druidkafkamini_csv.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/druidkafkamini_csv.q b/ql/src/test/queries/clientpositive/druidkafkamini_csv.q
new file mode 100644
index 0000000..34be462
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/druidkafkamini_csv.q
@@ -0,0 +1,37 @@
+SET hive.vectorized.execution.enabled=false;
+
+CREATE EXTERNAL TABLE druid_kafka_test_csv(`__time` timestamp , `page` string, `user` string, `language` string,
+                                            `country` string,`continent` string, `namespace` string, `newpage` boolean, `unpatrolled` boolean,
+                                            `anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint)
+        STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
+        TBLPROPERTIES (
+        "druid.segment.granularity" = "MONTH",
+        "druid.query.granularity" = "MINUTE",
+        "kafka.bootstrap.servers" = "localhost:9092",
+        "kafka.topic" = "wiki_kafka_csv",
+        "druid.kafka.ingestion.useEarliestOffset" = "true",
+        "druid.kafka.ingestion.maxRowsInMemory" = "5",
+        "druid.kafka.ingestion.startDelay" = "PT1S",
+        "druid.kafka.ingestion.taskDuration" = "PT30S",
+        "druid.kafka.ingestion.period" = "PT5S",
+        "druid.kafka.ingestion.consumer.retries" = "2",
+        "druid.kafka.ingestion.reportParseExceptions" = "true",
+        "druid.parseSpec.format" = "csv",
+        "druid.parseSpec.columns" = 
"__time,page,language,user,unpatrolled,newpage,robot,anonymous,namespace,continent,country,region,city,added,deleted,delta"
+        );
+
+ALTER TABLE druid_kafka_test_csv SET TBLPROPERTIES('druid.kafka.ingestion' = 'START');
+
+!curl --noproxy * -ss http://localhost:8081/druid/indexer/v1/supervisor;
+
+-- Sleep for some time for ingestion tasks to ingest events
+!sleep 60;
+
+DESCRIBE druid_kafka_test_csv;
+DESCRIBE EXTENDED druid_kafka_test_csv;
+
+Select count(*) FROM druid_kafka_test_csv;
+
+Select page FROM druid_kafka_test_csv;
+
+DROP TABLE druid_kafka_test_csv;

http://git-wip-us.apache.org/repos/asf/hive/blob/e55ccd29/ql/src/test/queries/clientpositive/druidkafkamini_delimited.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/druidkafkamini_delimited.q b/ql/src/test/queries/clientpositive/druidkafkamini_delimited.q
new file mode 100644
index 0000000..91e279d
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/druidkafkamini_delimited.q
@@ -0,0 +1,38 @@
+SET hive.vectorized.execution.enabled=false;
+
+CREATE EXTERNAL TABLE druid_kafka_test_delimited(`__time` timestamp , `page` string, `user` string, `language` string,
+                                            `country` string,`continent` string, `namespace` string, `newpage` boolean, `unpatrolled` boolean,
+                                            `anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint)
+        STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
+        TBLPROPERTIES (
+        "druid.segment.granularity" = "MONTH",
+        "druid.query.granularity" = "MINUTE",
+        "kafka.bootstrap.servers" = "localhost:9092",
+        "kafka.topic" = "wiki_kafka_csv",
+        "druid.kafka.ingestion.useEarliestOffset" = "true",
+        "druid.kafka.ingestion.maxRowsInMemory" = "5",
+        "druid.kafka.ingestion.startDelay" = "PT1S",
+        "druid.kafka.ingestion.taskDuration" = "PT30S",
+        "druid.kafka.ingestion.period" = "PT5S",
+        "druid.kafka.ingestion.consumer.retries" = "2",
+        "druid.kafka.ingestion.reportParseExceptions" = "true",
+        "druid.parseSpec.format" = "delimited",
+        "druid.parseSpec.columns" = 
"__time,page,language,user,unpatrolled,newpage,robot,anonymous,namespace,continent,country,region,city,added,deleted,delta",
+        "druid.parseSpec.delimiter"=","
+        );
+
+ALTER TABLE druid_kafka_test_delimited SET TBLPROPERTIES('druid.kafka.ingestion' = 'START');
+
+!curl --noproxy * -ss http://localhost:8081/druid/indexer/v1/supervisor;
+
+-- Sleep for some time for ingestion tasks to ingest events
+!sleep 60;
+
+DESCRIBE druid_kafka_test_delimited;
+DESCRIBE EXTENDED druid_kafka_test_delimited;
+
+Select count(*) FROM druid_kafka_test_delimited;
+
+Select page FROM druid_kafka_test_delimited;
+
+DROP TABLE druid_kafka_test_delimited;

http://git-wip-us.apache.org/repos/asf/hive/blob/e55ccd29/ql/src/test/results/clientpositive/druid/druidkafkamini_avro.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/druid/druidkafkamini_avro.q.out b/ql/src/test/results/clientpositive/druid/druidkafkamini_avro.q.out
new file mode 100644
index 0000000..d33dd4c
--- /dev/null
+++ b/ql/src/test/results/clientpositive/druid/druidkafkamini_avro.q.out
@@ -0,0 +1,263 @@
+PREHOOK: query: CREATE EXTERNAL TABLE druid_kafka_test_avro(`__time` timestamp , `page` string, `user` string, `language` string,
+                                            `country` string,`continent` string, `namespace` string, `newPage` boolean, `unpatrolled` boolean,
+                                            `anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint)
+        STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
+        TBLPROPERTIES (
+        "druid.segment.granularity" = "MONTH",
+        "druid.query.granularity" = "MINUTE",
+        "kafka.bootstrap.servers" = "localhost:9092",
+        "kafka.topic" = "wiki_kafka_avro_table",
+        "druid.kafka.ingestion.useEarliestOffset" = "true",
+        "druid.kafka.ingestion.maxRowsInMemory" = "5",
+        "druid.kafka.ingestion.startDelay" = "PT1S",
+        "druid.kafka.ingestion.taskDuration" = "PT30S",
+        "druid.kafka.ingestion.period" = "PT5S",
+        "druid.kafka.ingestion.consumer.retries" = "2",
+        "druid.kafka.ingestion.reportParseExceptions" = "true",
+        "druid.timestamp.column" = "timestamp",
+        "druid.timestamp.format" = "MM/dd/yyyy HH:mm:ss",
+        "druid.parseSpec.format" = "avro",
+        'avro.schema.literal'='{
+          "type" : "record",
+          "name" : "Wikipedia",
+          "namespace" : "org.apache.hive.kafka",
+          "version": "1",
+          "fields" : [ {
+            "name" : "isrobot",
+            "type" : "boolean"
+          }, {
+            "name" : "channel",
+            "type" : "string"
+          }, {
+            "name" : "timestamp",
+            "type" : "string"
+          }, {
+            "name" : "flags",
+            "type" : "string"
+          }, {
+            "name" : "isunpatrolled",
+            "type" : "boolean"
+          }, {
+            "name" : "page",
+            "type" : "string"
+          }, {
+            "name" : "diffurl",
+            "type" : "string"
+          }, {
+            "name" : "added",
+            "type" : "long"
+          }, {
+            "name" : "comment",
+            "type" : "string"
+          }, {
+            "name" : "commentlength",
+            "type" : "long"
+          }, {
+            "name" : "isnew",
+            "type" : "boolean"
+          }, {
+            "name" : "isminor",
+            "type" : "boolean"
+          }, {
+            "name" : "delta",
+            "type" : "long"
+          }, {
+            "name" : "isanonymous",
+            "type" : "boolean"
+          }, {
+            "name" : "user",
+            "type" : "string"
+          }, {
+            "name" : "deltabucket",
+            "type" : "double"
+          }, {
+            "name" : "deleted",
+            "type" : "long"
+          }, {
+            "name" : "namespace",
+            "type" : "string"
+          } ]
+        }'
+        )
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@druid_kafka_test_avro
+POSTHOOK: query: CREATE EXTERNAL TABLE druid_kafka_test_avro(`__time` timestamp , `page` string, `user` string, `language` string,
+                                            `country` string,`continent` string, `namespace` string, `newPage` boolean, `unpatrolled` boolean,
+                                            `anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint)
+        STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
+        TBLPROPERTIES (
+        "druid.segment.granularity" = "MONTH",
+        "druid.query.granularity" = "MINUTE",
+        "kafka.bootstrap.servers" = "localhost:9092",
+        "kafka.topic" = "wiki_kafka_avro_table",
+        "druid.kafka.ingestion.useEarliestOffset" = "true",
+        "druid.kafka.ingestion.maxRowsInMemory" = "5",
+        "druid.kafka.ingestion.startDelay" = "PT1S",
+        "druid.kafka.ingestion.taskDuration" = "PT30S",
+        "druid.kafka.ingestion.period" = "PT5S",
+        "druid.kafka.ingestion.consumer.retries" = "2",
+        "druid.kafka.ingestion.reportParseExceptions" = "true",
+        "druid.timestamp.column" = "timestamp",
+        "druid.timestamp.format" = "MM/dd/yyyy HH:mm:ss",
+        "druid.parseSpec.format" = "avro",
+        'avro.schema.literal'='{
+          "type" : "record",
+          "name" : "Wikipedia",
+          "namespace" : "org.apache.hive.kafka",
+          "version": "1",
+          "fields" : [ {
+            "name" : "isrobot",
+            "type" : "boolean"
+          }, {
+            "name" : "channel",
+            "type" : "string"
+          }, {
+            "name" : "timestamp",
+            "type" : "string"
+          }, {
+            "name" : "flags",
+            "type" : "string"
+          }, {
+            "name" : "isunpatrolled",
+            "type" : "boolean"
+          }, {
+            "name" : "page",
+            "type" : "string"
+          }, {
+            "name" : "diffurl",
+            "type" : "string"
+          }, {
+            "name" : "added",
+            "type" : "long"
+          }, {
+            "name" : "comment",
+            "type" : "string"
+          }, {
+            "name" : "commentlength",
+            "type" : "long"
+          }, {
+            "name" : "isnew",
+            "type" : "boolean"
+          }, {
+            "name" : "isminor",
+            "type" : "boolean"
+          }, {
+            "name" : "delta",
+            "type" : "long"
+          }, {
+            "name" : "isanonymous",
+            "type" : "boolean"
+          }, {
+            "name" : "user",
+            "type" : "string"
+          }, {
+            "name" : "deltabucket",
+            "type" : "double"
+          }, {
+            "name" : "deleted",
+            "type" : "long"
+          }, {
+            "name" : "namespace",
+            "type" : "string"
+          } ]
+        }'
+        )
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@druid_kafka_test_avro
+PREHOOK: query: ALTER TABLE druid_kafka_test_avro SET TBLPROPERTIES('druid.kafka.ingestion' = 'START')
+PREHOOK: type: ALTERTABLE_PROPERTIES
+PREHOOK: Input: default@druid_kafka_test_avro
+PREHOOK: Output: default@druid_kafka_test_avro
+POSTHOOK: query: ALTER TABLE druid_kafka_test_avro SET TBLPROPERTIES('druid.kafka.ingestion' = 'START')
+POSTHOOK: type: ALTERTABLE_PROPERTIES
+POSTHOOK: Input: default@druid_kafka_test_avro
+POSTHOOK: Output: default@druid_kafka_test_avro
+["default.druid_kafka_test_avro"]
+PREHOOK: query: DESCRIBE druid_kafka_test_avro
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@druid_kafka_test_avro
+POSTHOOK: query: DESCRIBE druid_kafka_test_avro
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@druid_kafka_test_avro
+__time                 timestamp               from deserializer   
+page                   string                  from deserializer   
+user                   string                  from deserializer   
+language               string                  from deserializer   
+country                string                  from deserializer   
+continent              string                  from deserializer   
+namespace              string                  from deserializer   
+newpage                boolean                 from deserializer   
+unpatrolled            boolean                 from deserializer   
+anonymous              boolean                 from deserializer   
+robot                  boolean                 from deserializer   
+added                  int                     from deserializer   
+deleted                int                     from deserializer   
+delta                  bigint                  from deserializer   
+PREHOOK: query: DESCRIBE EXTENDED druid_kafka_test_avro
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@druid_kafka_test_avro
+POSTHOOK: query: DESCRIBE EXTENDED druid_kafka_test_avro
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@druid_kafka_test_avro
+__time                 timestamp               from deserializer   
+page                   string                  from deserializer   
+user                   string                  from deserializer   
+language               string                  from deserializer   
+country                string                  from deserializer   
+continent              string                  from deserializer   
+namespace              string                  from deserializer   
+newpage                boolean                 from deserializer   
+unpatrolled            boolean                 from deserializer   
+anonymous              boolean                 from deserializer   
+robot                  boolean                 from deserializer   
+added                  int                     from deserializer   
+deleted                int                     from deserializer   
+delta                  bigint                  from deserializer   
+                
+#### A masked pattern was here ####
+StorageHandlerInfo              
+Druid Storage Handler Runtime Status for default.druid_kafka_test_avro
+kafkaPartitions=1               
+activeTasks=[]          
+publishingTasks=[]              
+#### A masked pattern was here ####
+aggregateLag=0          
+#### A masked pattern was here ####
+PREHOOK: query: Select count(*) FROM druid_kafka_test_avro
+PREHOOK: type: QUERY
+PREHOOK: Input: default@druid_kafka_test_avro
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: Select count(*) FROM druid_kafka_test_avro
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@druid_kafka_test_avro
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+11
+PREHOOK: query: Select page FROM druid_kafka_test_avro
+PREHOOK: type: QUERY
+PREHOOK: Input: default@druid_kafka_test_avro
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: Select page FROM druid_kafka_test_avro
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@druid_kafka_test_avro
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+page is 0
+page is 100
+page is 200
+page is 300
+page is 400
+page is 500
+page is 600
+page is 700
+page is 800
+page is 900
+page is 1000
+PREHOOK: query: DROP TABLE druid_kafka_test_avro
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@druid_kafka_test_avro
+PREHOOK: Output: default@druid_kafka_test_avro
+POSTHOOK: query: DROP TABLE druid_kafka_test_avro
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@druid_kafka_test_avro
+POSTHOOK: Output: default@druid_kafka_test_avro

http://git-wip-us.apache.org/repos/asf/hive/blob/e55ccd29/ql/src/test/results/clientpositive/druid/druidkafkamini_csv.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/druid/druidkafkamini_csv.q.out b/ql/src/test/results/clientpositive/druid/druidkafkamini_csv.q.out
new file mode 100644
index 0000000..2f5817a
--- /dev/null
+++ b/ql/src/test/results/clientpositive/druid/druidkafkamini_csv.q.out
@@ -0,0 +1,138 @@
+PREHOOK: query: CREATE EXTERNAL TABLE druid_kafka_test_csv(`__time` timestamp , `page` string, `user` string, `language` string,
+                                            `country` string,`continent` string, `namespace` string, `newpage` boolean, `unpatrolled` boolean,
+                                            `anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint)
+        STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
+        TBLPROPERTIES (
+        "druid.segment.granularity" = "MONTH",
+        "druid.query.granularity" = "MINUTE",
+        "kafka.bootstrap.servers" = "localhost:9092",
+        "kafka.topic" = "wiki_kafka_csv",
+        "druid.kafka.ingestion.useEarliestOffset" = "true",
+        "druid.kafka.ingestion.maxRowsInMemory" = "5",
+        "druid.kafka.ingestion.startDelay" = "PT1S",
+        "druid.kafka.ingestion.taskDuration" = "PT30S",
+        "druid.kafka.ingestion.period" = "PT5S",
+        "druid.kafka.ingestion.consumer.retries" = "2",
+        "druid.kafka.ingestion.reportParseExceptions" = "true",
+        "druid.parseSpec.format" = "csv",
+        "druid.parseSpec.columns" = 
"__time,page,language,user,unpatrolled,newpage,robot,anonymous,namespace,continent,country,region,city,added,deleted,delta"
+        )
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@druid_kafka_test_csv
+POSTHOOK: query: CREATE EXTERNAL TABLE druid_kafka_test_csv(`__time` timestamp , `page` string, `user` string, `language` string,
+                                            `country` string,`continent` string, `namespace` string, `newpage` boolean, `unpatrolled` boolean,
+                                            `anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint)
+        STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
+        TBLPROPERTIES (
+        "druid.segment.granularity" = "MONTH",
+        "druid.query.granularity" = "MINUTE",
+        "kafka.bootstrap.servers" = "localhost:9092",
+        "kafka.topic" = "wiki_kafka_csv",
+        "druid.kafka.ingestion.useEarliestOffset" = "true",
+        "druid.kafka.ingestion.maxRowsInMemory" = "5",
+        "druid.kafka.ingestion.startDelay" = "PT1S",
+        "druid.kafka.ingestion.taskDuration" = "PT30S",
+        "druid.kafka.ingestion.period" = "PT5S",
+        "druid.kafka.ingestion.consumer.retries" = "2",
+        "druid.kafka.ingestion.reportParseExceptions" = "true",
+        "druid.parseSpec.format" = "csv",
+        "druid.parseSpec.columns" = 
"__time,page,language,user,unpatrolled,newpage,robot,anonymous,namespace,continent,country,region,city,added,deleted,delta"
+        )
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@druid_kafka_test_csv
+PREHOOK: query: ALTER TABLE druid_kafka_test_csv SET TBLPROPERTIES('druid.kafka.ingestion' = 'START')
+PREHOOK: type: ALTERTABLE_PROPERTIES
+PREHOOK: Input: default@druid_kafka_test_csv
+PREHOOK: Output: default@druid_kafka_test_csv
+POSTHOOK: query: ALTER TABLE druid_kafka_test_csv SET TBLPROPERTIES('druid.kafka.ingestion' = 'START')
+POSTHOOK: type: ALTERTABLE_PROPERTIES
+POSTHOOK: Input: default@druid_kafka_test_csv
+POSTHOOK: Output: default@druid_kafka_test_csv
+["default.druid_kafka_test_csv"]
+PREHOOK: query: DESCRIBE druid_kafka_test_csv
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@druid_kafka_test_csv
+POSTHOOK: query: DESCRIBE druid_kafka_test_csv
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@druid_kafka_test_csv
+__time                 timestamp               from deserializer   
+page                   string                  from deserializer   
+user                   string                  from deserializer   
+language               string                  from deserializer   
+country                string                  from deserializer   
+continent              string                  from deserializer   
+namespace              string                  from deserializer   
+newpage                boolean                 from deserializer   
+unpatrolled            boolean                 from deserializer   
+anonymous              boolean                 from deserializer   
+robot                  boolean                 from deserializer   
+added                  int                     from deserializer   
+deleted                int                     from deserializer   
+delta                  bigint                  from deserializer   
+PREHOOK: query: DESCRIBE EXTENDED druid_kafka_test_csv
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@druid_kafka_test_csv
+POSTHOOK: query: DESCRIBE EXTENDED druid_kafka_test_csv
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@druid_kafka_test_csv
+__time                 timestamp               from deserializer   
+page                   string                  from deserializer   
+user                   string                  from deserializer   
+language               string                  from deserializer   
+country                string                  from deserializer   
+continent              string                  from deserializer   
+namespace              string                  from deserializer   
+newpage                boolean                 from deserializer   
+unpatrolled            boolean                 from deserializer   
+anonymous              boolean                 from deserializer   
+robot                  boolean                 from deserializer   
+added                  int                     from deserializer   
+deleted                int                     from deserializer   
+delta                  bigint                  from deserializer   
+                
+#### A masked pattern was here ####
+StorageHandlerInfo              
+Druid Storage Handler Runtime Status for default.druid_kafka_test_csv
+kafkaPartitions=1               
+activeTasks=[]          
+publishingTasks=[]              
+#### A masked pattern was here ####
+aggregateLag=0          
+#### A masked pattern was here ####
+PREHOOK: query: Select count(*) FROM druid_kafka_test_csv
+PREHOOK: type: QUERY
+PREHOOK: Input: default@druid_kafka_test_csv
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: Select count(*) FROM druid_kafka_test_csv
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@druid_kafka_test_csv
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+10
+PREHOOK: query: Select page FROM druid_kafka_test_csv
+PREHOOK: type: QUERY
+PREHOOK: Input: default@druid_kafka_test_csv
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: Select page FROM druid_kafka_test_csv
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@druid_kafka_test_csv
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+Gypsy Danger
+Striker Eureka
+Cherno Alpha
+Crimson Typhoon
+Coyote Tango
+Gypsy Danger
+Striker Eureka
+Cherno Alpha
+Crimson Typhoon
+Coyote Tango
+PREHOOK: query: DROP TABLE druid_kafka_test_csv
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@druid_kafka_test_csv
+PREHOOK: Output: default@druid_kafka_test_csv
+POSTHOOK: query: DROP TABLE druid_kafka_test_csv
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@druid_kafka_test_csv
+POSTHOOK: Output: default@druid_kafka_test_csv

http://git-wip-us.apache.org/repos/asf/hive/blob/e55ccd29/ql/src/test/results/clientpositive/druid/druidkafkamini_delimited.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/druid/druidkafkamini_delimited.q.out b/ql/src/test/results/clientpositive/druid/druidkafkamini_delimited.q.out
new file mode 100644
index 0000000..f6a417b
--- /dev/null
+++ b/ql/src/test/results/clientpositive/druid/druidkafkamini_delimited.q.out
@@ -0,0 +1,140 @@
+PREHOOK: query: CREATE EXTERNAL TABLE druid_kafka_test_delimited(`__time` timestamp , `page` string, `user` string, `language` string,
+                                            `country` string,`continent` string, `namespace` string, `newpage` boolean, `unpatrolled` boolean,
+                                            `anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint)
+        STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
+        TBLPROPERTIES (
+        "druid.segment.granularity" = "MONTH",
+        "druid.query.granularity" = "MINUTE",
+        "kafka.bootstrap.servers" = "localhost:9092",
+        "kafka.topic" = "wiki_kafka_csv",
+        "druid.kafka.ingestion.useEarliestOffset" = "true",
+        "druid.kafka.ingestion.maxRowsInMemory" = "5",
+        "druid.kafka.ingestion.startDelay" = "PT1S",
+        "druid.kafka.ingestion.taskDuration" = "PT30S",
+        "druid.kafka.ingestion.period" = "PT5S",
+        "druid.kafka.ingestion.consumer.retries" = "2",
+        "druid.kafka.ingestion.reportParseExceptions" = "true",
+        "druid.parseSpec.format" = "delimited",
+        "druid.parseSpec.columns" = 
"__time,page,language,user,unpatrolled,newpage,robot,anonymous,namespace,continent,country,region,city,added,deleted,delta",
+        "druid.parseSpec.delimiter"=","
+        )
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@druid_kafka_test_delimited
+POSTHOOK: query: CREATE EXTERNAL TABLE druid_kafka_test_delimited(`__time` timestamp , `page` string, `user` string, `language` string,
+                                            `country` string,`continent` string, `namespace` string, `newpage` boolean, `unpatrolled` boolean,
+                                            `anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint)
+        STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
+        TBLPROPERTIES (
+        "druid.segment.granularity" = "MONTH",
+        "druid.query.granularity" = "MINUTE",
+        "kafka.bootstrap.servers" = "localhost:9092",
+        "kafka.topic" = "wiki_kafka_csv",
+        "druid.kafka.ingestion.useEarliestOffset" = "true",
+        "druid.kafka.ingestion.maxRowsInMemory" = "5",
+        "druid.kafka.ingestion.startDelay" = "PT1S",
+        "druid.kafka.ingestion.taskDuration" = "PT30S",
+        "druid.kafka.ingestion.period" = "PT5S",
+        "druid.kafka.ingestion.consumer.retries" = "2",
+        "druid.kafka.ingestion.reportParseExceptions" = "true",
+        "druid.parseSpec.format" = "delimited",
+        "druid.parseSpec.columns" = 
"__time,page,language,user,unpatrolled,newpage,robot,anonymous,namespace,continent,country,region,city,added,deleted,delta",
+        "druid.parseSpec.delimiter"=","
+        )
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@druid_kafka_test_delimited
+PREHOOK: query: ALTER TABLE druid_kafka_test_delimited SET TBLPROPERTIES('druid.kafka.ingestion' = 'START')
+PREHOOK: type: ALTERTABLE_PROPERTIES
+PREHOOK: Input: default@druid_kafka_test_delimited
+PREHOOK: Output: default@druid_kafka_test_delimited
+POSTHOOK: query: ALTER TABLE druid_kafka_test_delimited SET TBLPROPERTIES('druid.kafka.ingestion' = 'START')
+POSTHOOK: type: ALTERTABLE_PROPERTIES
+POSTHOOK: Input: default@druid_kafka_test_delimited
+POSTHOOK: Output: default@druid_kafka_test_delimited
+["default.druid_kafka_test_delimited"]
+PREHOOK: query: DESCRIBE druid_kafka_test_delimited
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@druid_kafka_test_delimited
+POSTHOOK: query: DESCRIBE druid_kafka_test_delimited
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@druid_kafka_test_delimited
+__time                 timestamp               from deserializer   
+page                   string                  from deserializer   
+user                   string                  from deserializer   
+language               string                  from deserializer   
+country                string                  from deserializer   
+continent              string                  from deserializer   
+namespace              string                  from deserializer   
+newpage                boolean                 from deserializer   
+unpatrolled            boolean                 from deserializer   
+anonymous              boolean                 from deserializer   
+robot                  boolean                 from deserializer   
+added                  int                     from deserializer   
+deleted                int                     from deserializer   
+delta                  bigint                  from deserializer   
+PREHOOK: query: DESCRIBE EXTENDED druid_kafka_test_delimited
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@druid_kafka_test_delimited
+POSTHOOK: query: DESCRIBE EXTENDED druid_kafka_test_delimited
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@druid_kafka_test_delimited
+__time                 timestamp               from deserializer   
+page                   string                  from deserializer   
+user                   string                  from deserializer   
+language               string                  from deserializer   
+country                string                  from deserializer   
+continent              string                  from deserializer   
+namespace              string                  from deserializer   
+newpage                boolean                 from deserializer   
+unpatrolled            boolean                 from deserializer   
+anonymous              boolean                 from deserializer   
+robot                  boolean                 from deserializer   
+added                  int                     from deserializer   
+deleted                int                     from deserializer   
+delta                  bigint                  from deserializer   
+                
+#### A masked pattern was here ####
+StorageHandlerInfo              
+Druid Storage Handler Runtime Status for default.druid_kafka_test_delimited
+kafkaPartitions=1               
+activeTasks=[]          
+publishingTasks=[]              
+#### A masked pattern was here ####
+aggregateLag=0          
+#### A masked pattern was here ####
+PREHOOK: query: Select count(*) FROM druid_kafka_test_delimited
+PREHOOK: type: QUERY
+PREHOOK: Input: default@druid_kafka_test_delimited
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: Select count(*) FROM druid_kafka_test_delimited
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@druid_kafka_test_delimited
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+10
+PREHOOK: query: Select page FROM druid_kafka_test_delimited
+PREHOOK: type: QUERY
+PREHOOK: Input: default@druid_kafka_test_delimited
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: Select page FROM druid_kafka_test_delimited
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@druid_kafka_test_delimited
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+ "Gypsy Danger"
+"Striker Eureka"
+"Cherno Alpha"
+"Crimson Typhoon"
+"Coyote Tango"
+"Gypsy Danger"
+"Striker Eureka"
+"Cherno Alpha"
+"Crimson Typhoon"
+"Coyote Tango"
+PREHOOK: query: DROP TABLE druid_kafka_test_delimited
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@druid_kafka_test_delimited
+PREHOOK: Output: default@druid_kafka_test_delimited
+POSTHOOK: query: DROP TABLE druid_kafka_test_delimited
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@druid_kafka_test_delimited
+POSTHOOK: Output: default@druid_kafka_test_delimited
