Repository: hive
Updated Branches:
  refs/heads/master d813b487a -> 240da8403


HIVE-19026: Add support for more ingestion formats - Druid Kafka Indexing 
(Nishant Bangarwa reviewed by Ashutosh Chauhan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/240da840
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/240da840
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/240da840

Branch: refs/heads/master
Commit: 240da840334c282ebc62e1208a4b4822b0823079
Parents: d813b48
Author: Nishant <nishant.mon...@gmail.com>
Authored: Wed Nov 14 18:09:48 2018 +0530
Committer: Nishant <nishant.mon...@gmail.com>
Committed: Wed Nov 14 18:10:01 2018 +0530

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/Constants.java  |   1 +
 .../hadoop/hive/druid/DruidKafkaUtils.java      | 123 +++++++++++++++----
 .../hadoop/hive/druid/DruidStorageHandler.java  |  45 ++++---
 .../hive/druid/DruidStorageHandlerUtils.java    |  60 +++++----
 .../hadoop/hive/druid/io/DruidOutputFormat.java |  13 +-
 .../druid/io/DruidQueryBasedInputFormat.java    |   3 +-
 .../hadoop/hive/druid/io/DruidRecordWriter.java |   3 +-
 .../serde/DruidGroupByQueryRecordReader.java    |   5 +-
 .../serde/DruidSelectQueryRecordReader.java     |   5 +-
 .../hadoop/hive/druid/serde/DruidSerDe.java     |   7 +-
 .../serde/DruidTimeseriesQueryRecordReader.java |   5 +-
 .../hive/ql/io/TestDruidRecordWriter.java       |  15 +--
 itests/qtest-druid/pom.xml                      |  11 ++
 .../org/apache/hive/druid/MiniDruidCluster.java |   3 +-
 .../test/resources/testconfiguration.properties |   3 +
 .../org/apache/hadoop/hive/ql/QTestUtil.java    |   4 +
 16 files changed, 214 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/240da840/common/src/java/org/apache/hadoop/hive/conf/Constants.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/Constants.java 
b/common/src/java/org/apache/hadoop/hive/conf/Constants.java
index 44d0717..ee954d9 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/Constants.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/Constants.java
@@ -32,6 +32,7 @@ public class Constants {
           "org.apache.hadoop.hive.druid.io.DruidOutputFormat";
   public static final String DRUID_DATA_SOURCE = "druid.datasource";
   public static final String DRUID_SEGMENT_GRANULARITY = 
"druid.segment.granularity";
+
   public static final String DRUID_TARGET_SHARDS_PER_GRANULARITY =
       "druid.segment.targetShardsPerGranularity";
   public static final String DRUID_TIMESTAMP_GRANULARITY_COL_NAME = 
"__time_granularity";

http://git-wip-us.apache.org/repos/asf/hive/blob/240da840/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidKafkaUtils.java
----------------------------------------------------------------------
diff --git 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidKafkaUtils.java 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidKafkaUtils.java
index b99c333..e0e29a3 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidKafkaUtils.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidKafkaUtils.java
@@ -18,12 +18,25 @@
 
 package org.apache.hadoop.hive.druid;
 
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
+import io.druid.data.input.impl.CSVParseSpec;
+import io.druid.data.input.impl.DelimitedParseSpec;
+import io.druid.data.input.impl.DimensionsSpec;
+import io.druid.data.input.impl.InputRowParser;
+import io.druid.data.input.impl.JSONParseSpec;
+import io.druid.data.input.impl.StringInputRowParser;
+import io.druid.data.input.impl.TimestampSpec;
 import io.druid.java.util.http.client.Request;
 import io.druid.java.util.http.client.response.FullResponseHandler;
 import io.druid.java.util.http.client.response.FullResponseHolder;
 import io.druid.segment.IndexSpec;
 import io.druid.segment.indexing.DataSchema;
+import org.apache.hadoop.hive.druid.conf.DruidConstants;
+import org.apache.hadoop.hive.druid.json.AvroParseSpec;
+import org.apache.hadoop.hive.druid.json.AvroStreamInputRowParser;
+import org.apache.hadoop.hive.druid.json.InlineSchemaAvroBytesDecoder;
 import org.apache.hadoop.hive.druid.json.KafkaSupervisorIOConfig;
 import org.apache.hadoop.hive.druid.json.KafkaSupervisorSpec;
 import org.apache.hadoop.hive.druid.json.KafkaSupervisorTuningConfig;
@@ -59,60 +72,60 @@ final class DruidKafkaUtils {
       IndexSpec indexSpec) {
     return new KafkaSupervisorSpec(dataSchema,
         new 
KafkaSupervisorTuningConfig(DruidStorageHandlerUtils.getIntegerProperty(table,
-            DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + 
"maxRowsInMemory"),
+            DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + 
"maxRowsInMemory"),
             DruidStorageHandlerUtils.getIntegerProperty(table,
-                DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX 
+ "maxRowsPerSegment"),
+                DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + 
"maxRowsPerSegment"),
             DruidStorageHandlerUtils.getPeriodProperty(table,
-                DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX 
+ "intermediatePersistPeriod"),
+                DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + 
"intermediatePersistPeriod"),
             null,
             // basePersistDirectory - use druid default, no need to be 
configured by user
             DruidStorageHandlerUtils.getIntegerProperty(table,
-                DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX 
+ "maxPendingPersists"),
+                DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + 
"maxPendingPersists"),
             indexSpec,
             null,
             // buildV9Directly - use druid default, no need to be configured 
by user
             DruidStorageHandlerUtils.getBooleanProperty(table,
-                DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX 
+ "reportParseExceptions"),
+                DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + 
"reportParseExceptions"),
             DruidStorageHandlerUtils.getLongProperty(table,
-                DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX 
+ "handoffConditionTimeout"),
+                DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + 
"handoffConditionTimeout"),
             DruidStorageHandlerUtils.getBooleanProperty(table,
-                DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX 
+ "resetOffsetAutomatically"),
+                DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + 
"resetOffsetAutomatically"),
             DruidStorageHandlerUtils.getIntegerProperty(table,
-                DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX 
+ "workerThreads"),
+                DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + 
"workerThreads"),
             DruidStorageHandlerUtils.getIntegerProperty(table,
-                DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX 
+ "chatThreads"),
+                DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + 
"chatThreads"),
             DruidStorageHandlerUtils.getLongProperty(table,
-                DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX 
+ "chatRetries"),
+                DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + 
"chatRetries"),
             DruidStorageHandlerUtils.getPeriodProperty(table,
-                DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX 
+ "httpTimeout"),
+                DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + 
"httpTimeout"),
             DruidStorageHandlerUtils.getPeriodProperty(table,
-                DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX 
+ "shutdownTimeout"),
+                DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + 
"shutdownTimeout"),
             DruidStorageHandlerUtils.getPeriodProperty(table,
-                DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX 
+ "offsetFetchPeriod")),
+                DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + 
"offsetFetchPeriod")),
         new KafkaSupervisorIOConfig(kafkaTopic,
             // Mandatory Property
             DruidStorageHandlerUtils.getIntegerProperty(table,
-                DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX 
+ "replicas"),
+                DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + 
"replicas"),
             DruidStorageHandlerUtils.getIntegerProperty(table,
-                DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX 
+ "taskCount"),
+                DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + 
"taskCount"),
             DruidStorageHandlerUtils.getPeriodProperty(table,
-                DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX 
+ "taskDuration"),
+                DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + 
"taskDuration"),
             getKafkaConsumerProperties(table, kafkaServers),
             // Mandatory Property
             DruidStorageHandlerUtils.getPeriodProperty(table,
-                DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX 
+ "startDelay"),
+                DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + 
"startDelay"),
             DruidStorageHandlerUtils.getPeriodProperty(table,
-                DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX 
+ "period"),
+                DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + 
"period"),
             DruidStorageHandlerUtils.getBooleanProperty(table,
-                DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX 
+ "useEarliestOffset"),
+                DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + 
"useEarliestOffset"),
             DruidStorageHandlerUtils.getPeriodProperty(table,
-                DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX 
+ "completionTimeout"),
+                DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + 
"completionTimeout"),
             DruidStorageHandlerUtils.getPeriodProperty(table,
-                DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX 
+ "lateMessageRejectionPeriod"),
+                DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + 
"lateMessageRejectionPeriod"),
             DruidStorageHandlerUtils.getPeriodProperty(table,
-                DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX 
+ "earlyMessageRejectionPeriod"),
+                DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + 
"earlyMessageRejectionPeriod"),
             DruidStorageHandlerUtils.getBooleanProperty(table,
-                DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX 
+ "skipOffsetGaps")),
+                DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + 
"skipOffsetGaps")),
         new HashMap<>());
   }
 
@@ -120,10 +133,10 @@ final class DruidKafkaUtils {
     ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
     builder.put(KafkaSupervisorIOConfig.BOOTSTRAP_SERVERS_KEY, kafkaServers);
     for (Map.Entry<String, String> entry : table.getParameters().entrySet()) {
-      if 
(entry.getKey().startsWith(DruidStorageHandlerUtils.DRUID_KAFKA_CONSUMER_PROPERTY_PREFIX))
 {
+      if 
(entry.getKey().startsWith(DruidConstants.DRUID_KAFKA_CONSUMER_PROPERTY_PREFIX))
 {
         String
             propertyName =
-            
entry.getKey().substring(DruidStorageHandlerUtils.DRUID_KAFKA_CONSUMER_PROPERTY_PREFIX.length());
+            
entry.getKey().substring(DruidConstants.DRUID_KAFKA_CONSUMER_PROPERTY_PREFIX.length());
         builder.put(propertyName, entry.getValue());
       }
     }
@@ -162,6 +175,64 @@ final class DruidKafkaUtils {
 
   static boolean isKafkaStreamingTable(Table table) {
     // For kafka Streaming tables it is mandatory to set a kafka topic.
-    return DruidStorageHandlerUtils.getTableProperty(table, 
DruidStorageHandlerUtils.KAFKA_TOPIC) != null;
+    return DruidStorageHandlerUtils.getTableProperty(table, 
DruidConstants.KAFKA_TOPIC) != null;
+  }
+
+  static InputRowParser getInputRowParser(Table table,
+          TimestampSpec timestampSpec,
+          DimensionsSpec dimensionsSpec
+  ) {
+    String parseSpecFormat = DruidStorageHandlerUtils.getTableProperty(table, 
DruidConstants.DRUID_PARSE_SPEC_FORMAT);
+
+    // Default case JSON
+    if(parseSpecFormat == null || parseSpecFormat.equalsIgnoreCase("json")) {
+      return new StringInputRowParser(
+              new JSONParseSpec(timestampSpec,
+                      dimensionsSpec,
+                      null,
+                      null
+              ), "UTF-8");
+    } else if(parseSpecFormat.equalsIgnoreCase("csv")){
+      return new StringInputRowParser(
+              new CSVParseSpec(
+                      timestampSpec,
+                      dimensionsSpec,
+                      DruidStorageHandlerUtils.getTableProperty(table, 
DruidConstants.DRUID_PARSE_SPEC_LIST_DELIMITER),
+                      DruidStorageHandlerUtils.getListProperty(table, 
DruidConstants.DRUID_PARSE_SPEC_COLUMNS),
+                      DruidStorageHandlerUtils.getBooleanProperty(table, 
DruidConstants.DRUID_PARSE_SPEC_HAS_HEADER_ROWS, false),
+                      DruidStorageHandlerUtils.getIntegerProperty(table, 
DruidConstants.DRUID_PARSE_SPEC_SKIP_HEADER_ROWS, 0)
+              ), "UTF-8");
+    } else if (parseSpecFormat.equalsIgnoreCase("delimited")){
+      return new StringInputRowParser(
+              new DelimitedParseSpec(
+                      timestampSpec,
+                      dimensionsSpec,
+                      DruidStorageHandlerUtils.getTableProperty(table, 
DruidConstants.DRUID_PARSE_SPEC_DELIMITER),
+                      DruidStorageHandlerUtils.getTableProperty(table, 
DruidConstants.DRUID_PARSE_SPEC_LIST_DELIMITER),
+                      DruidStorageHandlerUtils.getListProperty(table, 
DruidConstants.DRUID_PARSE_SPEC_COLUMNS),
+                      DruidStorageHandlerUtils.getBooleanProperty(table, 
DruidConstants.DRUID_PARSE_SPEC_HAS_HEADER_ROWS, false),
+                      DruidStorageHandlerUtils.getIntegerProperty(table, 
DruidConstants.DRUID_PARSE_SPEC_SKIP_HEADER_ROWS, 0)
+              ), "UTF-8");
+    } else if(parseSpecFormat.equalsIgnoreCase("avro")) {
+      try {
+        String avroSchemaLiteral = 
DruidStorageHandlerUtils.getTableProperty(table, 
DruidConstants.AVRO_SCHEMA_LITERAL);
+        Preconditions.checkNotNull(avroSchemaLiteral,
+                "Please specify avro schema literal when using avro parser"
+        );
+        Map<String, Object> avroSchema = JSON_MAPPER
+                .readValue(avroSchemaLiteral, new TypeReference<Map<String, 
Object>>() {
+                });
+        return new AvroStreamInputRowParser(new AvroParseSpec(
+                timestampSpec,
+                dimensionsSpec,
+                null
+        ), new InlineSchemaAvroBytesDecoder(avroSchema));
+      } catch (Exception e) {
+        throw new IllegalStateException("Exception while creating avro 
schema", e);
+      }
+    }
+
+    throw new IllegalArgumentException("Invalid parse spec format [" + 
parseSpecFormat+"]. "
+            + "Supported types are : json, csv, tsv, avro");
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/240da840/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
----------------------------------------------------------------------
diff --git 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
index 1c52ae6..7434559 100644
--- 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
+++ 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -30,8 +30,6 @@ import com.google.common.collect.Sets;
 import io.druid.data.input.impl.DimensionSchema;
 import io.druid.data.input.impl.DimensionsSpec;
 import io.druid.data.input.impl.InputRowParser;
-import io.druid.data.input.impl.JSONParseSpec;
-import io.druid.data.input.impl.StringInputRowParser;
 import io.druid.data.input.impl.TimestampSpec;
 import io.druid.java.util.common.Pair;
 import io.druid.java.util.common.RetryUtils;
@@ -66,6 +64,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.TableName;
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.druid.conf.DruidConstants;
 import org.apache.hadoop.hive.druid.io.DruidOutputFormat;
 import org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat;
 import org.apache.hadoop.hive.druid.io.DruidRecordWriter;
@@ -259,11 +258,11 @@ import static 
org.apache.hadoop.hive.druid.DruidStorageHandlerUtils.JSON_MAPPER;
     final String
         kafkaTopic =
         
Preconditions.checkNotNull(DruidStorageHandlerUtils.getTableProperty(table,
-            DruidStorageHandlerUtils.KAFKA_TOPIC), "kafka topic is null");
+            DruidConstants.KAFKA_TOPIC), "kafka topic is null");
     final String
         kafkaServers =
         
Preconditions.checkNotNull(DruidStorageHandlerUtils.getTableProperty(table,
-            DruidStorageHandlerUtils.KAFKA_BOOTSTRAP_SERVERS), "kafka connect 
string is null");
+            DruidConstants.KAFKA_BOOTSTRAP_SERVERS), "kafka connect string is 
null");
 
     Properties tableProperties = new Properties();
     tableProperties.putAll(table.getParameters());
@@ -282,18 +281,26 @@ import static 
org.apache.hadoop.hive.druid.DruidStorageHandlerUtils.JSON_MAPPER;
     Pair<List<DimensionSchema>, AggregatorFactory[]>
         dimensionsAndAggregates =
         DruidStorageHandlerUtils.getDimensionsAndAggregates(columnNames, 
columnTypes);
-    if 
(!columnNames.contains(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN)) {
+    if (!columnNames.contains(DruidConstants.DEFAULT_TIMESTAMP_COLUMN)) {
       throw new IllegalStateException("Timestamp column (' "
-          + DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN
+          + DruidConstants.DEFAULT_TIMESTAMP_COLUMN
           + "') not specified in create table; list of columns is : "
           + columnNames);
     }
 
-    final InputRowParser
-        inputRowParser =
-        new StringInputRowParser(new JSONParseSpec(new 
TimestampSpec(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN,
-            "auto",
-            null), new DimensionsSpec(dimensionsAndAggregates.lhs, null, 
null), null, null), "UTF-8");
+    DimensionsSpec dimensionsSpec = new 
DimensionsSpec(dimensionsAndAggregates.lhs, null, null);
+    String timestampFormat = DruidStorageHandlerUtils
+            .getTableProperty(table, DruidConstants.DRUID_TIMESTAMP_FORMAT);
+    String timestampColumnName = DruidStorageHandlerUtils
+            .getTableProperty(table, DruidConstants.DRUID_TIMESTAMP_COLUMN);
+    if(timestampColumnName == null) {
+      timestampColumnName = DruidConstants.DEFAULT_TIMESTAMP_COLUMN;
+    }
+    final TimestampSpec timestampSpec = new TimestampSpec(timestampColumnName, 
timestampFormat,
+            null
+    );
+    final InputRowParser inputRowParser = DruidKafkaUtils
+            .getInputRowParser(table, timestampSpec, dimensionsSpec);
 
     final Map<String, Object>
         inputParser =
@@ -318,7 +325,7 @@ import static 
org.apache.hadoop.hive.druid.DruidStorageHandlerUtils.JSON_MAPPER;
     KafkaSupervisorSpec existingSpec = fetchKafkaIngestionSpec(table);
     String
         targetState =
-        DruidStorageHandlerUtils.getTableProperty(table, 
DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION);
+        DruidStorageHandlerUtils.getTableProperty(table, 
DruidConstants.DRUID_KAFKA_INGESTION);
     if (targetState == null) {
       // Case when user has not specified any ingestion state in the current 
command
       // if there is a kafka supervisor running then keep it last known state 
is START otherwise STOP.
@@ -342,10 +349,10 @@ import static 
org.apache.hadoop.hive.druid.DruidStorageHandlerUtils.JSON_MAPPER;
     } else {
       throw new IllegalArgumentException(String.format(
           "Invalid value for property [%s], Valid values are [START, STOP, 
RESET]",
-          DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION));
+          DruidConstants.DRUID_KAFKA_INGESTION));
     }
     // We do not want to keep state in two separate places so remove from hive 
table properties.
-    
table.getParameters().remove(DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION);
+    table.getParameters().remove(DruidConstants.DRUID_KAFKA_INGESTION);
   }
 
   private void resetKafkaIngestion(String overlordAddress, String 
dataSourceName) {
@@ -490,8 +497,8 @@ import static 
org.apache.hadoop.hive.druid.DruidStorageHandlerUtils.JSON_MAPPER;
     final String dataSourceName = 
table.getParameters().get(Constants.DRUID_DATA_SOURCE);
     final String
         segmentDirectory =
-        
table.getParameters().get(DruidStorageHandlerUtils.DRUID_SEGMENT_DIRECTORY) != 
null ?
-            
table.getParameters().get(DruidStorageHandlerUtils.DRUID_SEGMENT_DIRECTORY) :
+        table.getParameters().get(DruidConstants.DRUID_SEGMENT_DIRECTORY) != 
null ?
+            table.getParameters().get(DruidConstants.DRUID_SEGMENT_DIRECTORY) :
             HiveConf.getVar(getConf(), 
HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY);
 
     final HdfsDataSegmentPusherConfig hdfsSegmentPusherConfig = new 
HdfsDataSegmentPusherConfig();
@@ -745,10 +752,10 @@ import static 
org.apache.hadoop.hive.druid.DruidStorageHandlerUtils.JSON_MAPPER;
 
   @Override public void configureOutputJobProperties(TableDesc tableDesc, 
Map<String, String> jobProperties) {
     jobProperties.put(Constants.DRUID_DATA_SOURCE, tableDesc.getTableName());
-    jobProperties.put(DruidStorageHandlerUtils.DRUID_SEGMENT_VERSION, new 
DateTime().toString());
-    jobProperties.put(DruidStorageHandlerUtils.DRUID_JOB_WORKING_DIRECTORY, 
getStagingWorkingDir().toString());
+    jobProperties.put(DruidConstants.DRUID_SEGMENT_VERSION, new 
DateTime().toString());
+    jobProperties.put(DruidConstants.DRUID_JOB_WORKING_DIRECTORY, 
getStagingWorkingDir().toString());
     // DruidOutputFormat will write segments in an intermediate directory
-    
jobProperties.put(DruidStorageHandlerUtils.DRUID_SEGMENT_INTERMEDIATE_DIRECTORY,
+    jobProperties.put(DruidConstants.DRUID_SEGMENT_INTERMEDIATE_DIRECTORY,
         getIntermediateSegmentDir().toString());
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/240da840/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
----------------------------------------------------------------------
diff --git 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
index 8fcadea..6dc97d5 100644
--- 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
+++ 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
@@ -92,6 +92,9 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.druid.conf.DruidConstants;
+import org.apache.hadoop.hive.druid.json.AvroParseSpec;
+import org.apache.hadoop.hive.druid.json.AvroStreamInputRowParser;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
@@ -152,28 +155,12 @@ public final class DruidStorageHandlerUtils {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(DruidStorageHandlerUtils.class);
 
-  private static final String DRUID_ROLLUP = "druid.rollup";
-  private static final String DRUID_QUERY_GRANULARITY = 
"druid.query.granularity";
-  public static final String DRUID_QUERY_FETCH = "druid.query.fetch";
-  static final String DRUID_SEGMENT_DIRECTORY = 
"druid.storage.storageDirectory";
-  public static final String DRUID_SEGMENT_INTERMEDIATE_DIRECTORY = 
"druid.storage.storageDirectory.intermediate";
-  public static final String DRUID_SEGMENT_VERSION = "druid.segment.version";
-  public static final String DRUID_JOB_WORKING_DIRECTORY = 
"druid.job.workingDirectory";
-  static final String KAFKA_TOPIC = "kafka.topic";
-  static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
-  static final String DRUID_KAFKA_INGESTION_PROPERTY_PREFIX = 
"druid.kafka.ingestion.";
-  static final String DRUID_KAFKA_CONSUMER_PROPERTY_PREFIX = 
DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "consumer.";
-  /* Kafka Ingestion state - valid values - START/STOP/RESET */
-  static final String DRUID_KAFKA_INGESTION = "druid.kafka.ingestion";
   private static final int NUM_RETRIES = 8;
   private static final int SECONDS_BETWEEN_RETRIES = 2;
   private static final int DEFAULT_FS_BUFFER_SIZE = 1 << 18; // 256KB
   private static final int DEFAULT_STREAMING_RESULT_SIZE = 100;
   private static final String SMILE_CONTENT_TYPE = 
"application/x-jackson-smile";
-  //Druid storage timestamp column name
-  public static final String DEFAULT_TIMESTAMP_COLUMN = "__time";
-  //Druid Json timestamp column name
-  public static final String EVENT_TIMESTAMP_COLUMN = "timestamp";
+
   static final String INDEX_ZIP = "index.zip";
   private static final String DESCRIPTOR_JSON = "descriptor.json";
   private static final Interval
@@ -218,6 +205,10 @@ public final class DruidStorageHandlerUtils {
     // Register the shard sub type to be used by the mapper
     JSON_MAPPER.registerSubtypes(new NamedType(LinearShardSpec.class, 
"linear"));
     JSON_MAPPER.registerSubtypes(new NamedType(NumberedShardSpec.class, 
"numbered"));
+    JSON_MAPPER.registerSubtypes(new NamedType(AvroParseSpec.class, "avro"));
+    SMILE_MAPPER.registerSubtypes(new NamedType(AvroParseSpec.class, "avro"));
+    JSON_MAPPER.registerSubtypes(new NamedType(AvroStreamInputRowParser.class, 
"avro_stream"));
+    SMILE_MAPPER.registerSubtypes(new 
NamedType(AvroStreamInputRowParser.class, "avro_stream"));
     // set the timezone of the object mapper
     // THIS IS NOT WORKING workaround is to set it as part of java opts 
-Duser.timezone="UTC"
     JSON_MAPPER.setTimeZone(TimeZone.getTimeZone("UTC"));
@@ -628,6 +619,11 @@ public final class DruidStorageHandlerUtils {
     return Boolean.parseBoolean(val);
   }
 
+  static boolean getBooleanProperty(Table table, String propertyName, boolean 
defaultVal) {
+    Boolean val = getBooleanProperty(table, propertyName);
+    return val == null ? defaultVal : val;
+  }
+
   @Nullable static Integer getIntegerProperty(Table table, String 
propertyName) {
     String val = getTableProperty(table, propertyName);
     if (val == null) {
@@ -642,6 +638,11 @@ public final class DruidStorageHandlerUtils {
     }
   }
 
+  static int getIntegerProperty(Table table, String propertyName, int 
defaultVal) {
+    Integer val = getIntegerProperty(table, propertyName);
+    return val == null ? defaultVal : val;
+  }
+
   @Nullable static Long getLongProperty(Table table, String propertyName) {
     String val = getTableProperty(table, propertyName);
     if (val == null) {
@@ -670,6 +671,21 @@ public final class DruidStorageHandlerUtils {
     }
   }
 
+  @Nullable public static List<String> getListProperty(Table table, String 
propertyName) {
+    List<String> rv = new ArrayList<String>();
+    String values = getTableProperty(table, propertyName);
+    if(values == null) {
+      return null;
+    }
+    String[] vals = values.trim().split(",");
+    for(String val : vals) {
+      if(org.apache.commons.lang.StringUtils.isNotBlank(val)) {
+        rv.add(val);
+      }
+    }
+    return rv;
+  }
+
   static String getTableProperty(Table table, String propertyName) {
     return table.getParameters().get(propertyName);
   }
@@ -799,13 +815,13 @@ public final class DruidStorageHandlerUtils {
             HiveConf.getVar(configuration, 
HiveConf.ConfVars.HIVE_DRUID_INDEXING_GRANULARITY);
     final boolean
         rollup =
-        tableProperties.getProperty(DRUID_ROLLUP) != null ?
+        tableProperties.getProperty(DruidConstants.DRUID_ROLLUP) != null ?
             
Boolean.parseBoolean(tableProperties.getProperty(Constants.DRUID_SEGMENT_GRANULARITY))
 :
             HiveConf.getBoolVar(configuration, 
HiveConf.ConfVars.HIVE_DRUID_ROLLUP);
     return new 
UniformGranularitySpec(Granularity.fromString(segmentGranularity),
-        
Granularity.fromString(tableProperties.getProperty(DRUID_QUERY_GRANULARITY) == 
null ?
+        
Granularity.fromString(tableProperties.getProperty(DruidConstants.DRUID_QUERY_GRANULARITY)
 == null ?
             "NONE" :
-            tableProperties.getProperty(DRUID_QUERY_GRANULARITY)),
+            
tableProperties.getProperty(DruidConstants.DRUID_QUERY_GRANULARITY)),
         rollup,
         null);
   }
@@ -853,7 +869,7 @@ public final class DruidStorageHandlerUtils {
         // Granularity column
         String tColumnName = columnNames.get(i);
         if 
(!tColumnName.equals(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME) && 
!tColumnName.equals(
-            DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN)) {
+            DruidConstants.DEFAULT_TIMESTAMP_COLUMN)) {
           throw new IllegalArgumentException("Dimension "
               + tColumnName
               + " does not have STRING type: "
@@ -863,7 +879,7 @@ public final class DruidStorageHandlerUtils {
       case TIMESTAMPLOCALTZ:
         // Druid timestamp column
         String tLocalTZColumnName = columnNames.get(i);
-        if 
(!tLocalTZColumnName.equals(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN)) 
{
+        if 
(!tLocalTZColumnName.equals(DruidConstants.DEFAULT_TIMESTAMP_COLUMN)) {
           throw new IllegalArgumentException("Dimension "
               + tLocalTZColumnName
               + " does not have STRING type: "

http://git-wip-us.apache.org/repos/asf/hive/blob/240da840/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
index 862d7ca..f0f039a 100644
--- 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
+++ 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
+import org.apache.hadoop.hive.druid.conf.DruidConstants;
 import org.apache.hadoop.hive.druid.serde.DruidWritable;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
@@ -95,7 +96,7 @@ public class DruidOutputFormat implements 
HiveOutputFormat<NullWritable, DruidWr
     final String dataSource = 
tableProperties.getProperty(Constants.DRUID_DATA_SOURCE) == null
         ? jc.get(Constants.DRUID_DATA_SOURCE)
         : tableProperties.getProperty(Constants.DRUID_DATA_SOURCE);
-    final String segmentDirectory = 
jc.get(DruidStorageHandlerUtils.DRUID_SEGMENT_INTERMEDIATE_DIRECTORY);
+    final String segmentDirectory = 
jc.get(DruidConstants.DRUID_SEGMENT_INTERMEDIATE_DIRECTORY);
 
     final GranularitySpec granularitySpec = 
DruidStorageHandlerUtils.getGranularitySpec(jc, tableProperties);
 
@@ -109,8 +110,8 @@ public class DruidOutputFormat implements 
HiveOutputFormat<NullWritable, DruidWr
               ));
     }
     ArrayList<String> columnNames = 
Lists.newArrayList(columnNameProperty.split(","));
-    if 
(!columnNames.contains(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN)) {
-      throw new IllegalStateException("Timestamp column (' " + 
DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN +
+    if (!columnNames.contains(DruidConstants.DEFAULT_TIMESTAMP_COLUMN)) {
+      throw new IllegalStateException("Timestamp column (' " + 
DruidConstants.DEFAULT_TIMESTAMP_COLUMN +
               "') not specified in create table; list of columns is : " +
               tableProperties.getProperty(serdeConstants.LIST_COLUMNS));
     }
@@ -119,7 +120,7 @@ public class DruidOutputFormat implements 
HiveOutputFormat<NullWritable, DruidWr
     Pair<List<DimensionSchema>, AggregatorFactory[]> dimensionsAndAggregates = 
DruidStorageHandlerUtils
         .getDimensionsAndAggregates(columnNames, columnTypes);
     final InputRowParser inputRowParser = new MapInputRowParser(new 
TimeAndDimsParseSpec(
-            new 
TimestampSpec(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, "auto", null),
+            new TimestampSpec(DruidConstants.DEFAULT_TIMESTAMP_COLUMN, "auto", 
null),
             new DimensionsSpec(dimensionsAndAggregates.lhs, Lists
                 .newArrayList(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME,
                     Constants.DRUID_SHARD_KEY_COL_NAME
@@ -141,8 +142,8 @@ public class DruidOutputFormat implements 
HiveOutputFormat<NullWritable, DruidWr
             DruidStorageHandlerUtils.JSON_MAPPER
     );
 
-    final String workingPath = 
jc.get(DruidStorageHandlerUtils.DRUID_JOB_WORKING_DIRECTORY);
-    final String version = 
jc.get(DruidStorageHandlerUtils.DRUID_SEGMENT_VERSION);
+    final String workingPath = 
jc.get(DruidConstants.DRUID_JOB_WORKING_DIRECTORY);
+    final String version = jc.get(DruidConstants.DRUID_SEGMENT_VERSION);
     String basePersistDirectory = HiveConf
             .getVar(jc, HiveConf.ConfVars.HIVE_DRUID_BASE_PERSIST_DIRECTORY);
     if (Strings.isNullOrEmpty(basePersistDirectory)) {

http://git-wip-us.apache.org/repos/asf/hive/blob/240da840/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
----------------------------------------------------------------------
diff --git 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
index ff766c4..c1e0e75 100644
--- 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
+++ 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.druid.DruidStorageHandler;
 import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
+import org.apache.hadoop.hive.druid.conf.DruidConstants;
 import org.apache.hadoop.hive.druid.serde.DruidGroupByQueryRecordReader;
 import org.apache.hadoop.hive.druid.serde.DruidQueryRecordReader;
 import org.apache.hadoop.hive.druid.serde.DruidScanQueryRecordReader;
@@ -173,7 +174,7 @@ public class DruidQueryBasedInputFormat extends 
InputFormat<NullWritable, DruidW
   private static HiveDruidSplit[] distributeSelectQuery(String address, 
SelectQuery query, Path dummyPath)
       throws IOException {
     // If it has a limit, we use it and we do not distribute the query
-    final boolean isFetch = 
query.getContextBoolean(DruidStorageHandlerUtils.DRUID_QUERY_FETCH, false);
+    final boolean isFetch = 
query.getContextBoolean(DruidConstants.DRUID_QUERY_FETCH, false);
     if (isFetch) {
       return new HiveDruidSplit[] {new 
HiveDruidSplit(DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query),
           dummyPath,

http://git-wip-us.apache.org/repos/asf/hive/blob/240da840/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
index 671b8cf..65edc66 100644
--- 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
+++ 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
+import org.apache.hadoop.hive.druid.conf.DruidConstants;
 import org.apache.hadoop.hive.druid.serde.DruidWritable;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.io.NullWritable;
@@ -215,7 +216,7 @@ public class DruidRecordWriter implements 
RecordWriter<NullWritable, DruidWritab
 
   @Override public void write(Writable w) throws IOException {
     DruidWritable record = (DruidWritable) w;
-    final long timestamp = (long) 
record.getValue().get(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN);
+    final long timestamp = (long) 
record.getValue().get(DruidConstants.DEFAULT_TIMESTAMP_COLUMN);
     final int
         partitionNumber =
         Math.toIntExact((long) 
record.getValue().getOrDefault(Constants.DRUID_SHARD_KEY_COL_NAME, -1L));

http://git-wip-us.apache.org/repos/asf/hive/blob/240da840/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java
----------------------------------------------------------------------
diff --git 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java
 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java
index 521973e..9efa6f6 100644
--- 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java
+++ 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.JavaType;
 import io.druid.data.input.MapBasedRow;
 import io.druid.data.input.Row;
 import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
+import org.apache.hadoop.hive.druid.conf.DruidConstants;
 import org.apache.hadoop.io.NullWritable;
 
 import java.io.IOException;
@@ -62,7 +63,7 @@ public class DruidGroupByQueryRecordReader extends 
DruidQueryRecordReader<Row> {
     // Create new value
     DruidWritable value = new DruidWritable(false);
     // 1) The timestamp column
-    value.getValue().put(DruidStorageHandlerUtils.EVENT_TIMESTAMP_COLUMN,
+    value.getValue().put(DruidConstants.EVENT_TIMESTAMP_COLUMN,
         currentRow.getTimestamp() == null ? null : 
currentRow.getTimestamp().getMillis()
     );
     // 2) The dimension columns
@@ -75,7 +76,7 @@ public class DruidGroupByQueryRecordReader extends 
DruidQueryRecordReader<Row> {
       // Update value
       value.getValue().clear();
       // 1) The timestamp column
-      value.getValue().put(DruidStorageHandlerUtils.EVENT_TIMESTAMP_COLUMN,
+      value.getValue().put(DruidConstants.EVENT_TIMESTAMP_COLUMN,
           currentRow.getTimestamp() == null ? null : 
currentRow.getTimestamp().getMillis()
       );
       // 2) The dimension columns

http://git-wip-us.apache.org/repos/asf/hive/blob/240da840/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java
----------------------------------------------------------------------
diff --git 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java
 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java
index 744f4d1..2c4c8f9 100644
--- 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java
+++ 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
 
 import com.fasterxml.jackson.databind.JavaType;
 import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
+import org.apache.hadoop.hive.druid.conf.DruidConstants;
 import org.apache.hadoop.io.NullWritable;
 
 import com.fasterxml.jackson.core.type.TypeReference;
@@ -67,7 +68,7 @@ public class DruidSelectQueryRecordReader extends 
DruidQueryRecordReader<Result<
     // Create new value
     DruidWritable value = new DruidWritable(false);
     EventHolder e = values.next();
-    value.getValue().put(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, 
e.getTimestamp().getMillis());
+    value.getValue().put(DruidConstants.DEFAULT_TIMESTAMP_COLUMN, 
e.getTimestamp().getMillis());
     value.getValue().putAll(e.getEvent());
     return value;
   }
@@ -77,7 +78,7 @@ public class DruidSelectQueryRecordReader extends 
DruidQueryRecordReader<Result<
       // Update value
       value.getValue().clear();
       EventHolder e = values.next();
-      value.getValue().put(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, 
e.getTimestamp().getMillis());
+      value.getValue().put(DruidConstants.DEFAULT_TIMESTAMP_COLUMN, 
e.getTimestamp().getMillis());
       value.getValue().putAll(e.getEvent());
       return true;
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/240da840/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
----------------------------------------------------------------------
diff --git 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
index 1d87262..516faf0 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.druid.DruidStorageHandler;
 import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
+import org.apache.hadoop.hive.druid.conf.DruidConstants;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.AbstractSerDe;
@@ -169,7 +170,7 @@ import static 
org.joda.time.format.ISODateTimeFormat.dateOptionalTimeParser;
       throw new SerDeException(e);
     }
     for (Entry<String, ColumnAnalysis> columnInfo : 
schemaInfo.getColumns().entrySet()) {
-      if 
(columnInfo.getKey().equals(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN)) 
{
+      if (columnInfo.getKey().equals(DruidConstants.DEFAULT_TIMESTAMP_COLUMN)) 
{
         // Special handling for timestamp column
         columnNames.add(columnInfo.getKey()); // field name
         PrimitiveTypeInfo type = tsTZTypeInfo; // field type
@@ -190,9 +191,9 @@ import static 
org.joda.time.format.ISODateTimeFormat.dateOptionalTimeParser;
   private void initFromProperties(final Properties properties) throws 
SerDeException {
 
     final List<String> columnNames = new 
ArrayList<>(Utilities.getColumnNames(properties));
-    if 
(!columnNames.contains(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN)) {
+    if (!columnNames.contains(DruidConstants.DEFAULT_TIMESTAMP_COLUMN)) {
       throw new SerDeException("Timestamp column (' "
-          + DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN
+          + DruidConstants.DEFAULT_TIMESTAMP_COLUMN
           + "') not specified in create table; list of columns is : "
           + properties.getProperty(serdeConstants.LIST_COLUMNS));
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/240da840/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java
----------------------------------------------------------------------
diff --git 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java
 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java
index 41fd341..beb342b 100644
--- 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java
+++ 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.JavaType;
 import io.druid.query.Result;
 import io.druid.query.timeseries.TimeseriesResultValue;
 import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
+import org.apache.hadoop.hive.druid.conf.DruidConstants;
 import org.apache.hadoop.io.NullWritable;
 
 import java.io.IOException;
@@ -59,7 +60,7 @@ public class DruidTimeseriesQueryRecordReader
   public DruidWritable getCurrentValue() throws IOException, 
InterruptedException {
     // Create new value
     DruidWritable value = new DruidWritable(false);
-    value.getValue().put(DruidStorageHandlerUtils.EVENT_TIMESTAMP_COLUMN,
+    value.getValue().put(DruidConstants.EVENT_TIMESTAMP_COLUMN,
         current.getTimestamp() == null ? null : 
current.getTimestamp().getMillis()
     );
     value.getValue().putAll(current.getValue().getBaseObject());
@@ -71,7 +72,7 @@ public class DruidTimeseriesQueryRecordReader
     if (nextKeyValue()) {
       // Update value
       value.getValue().clear();
-      value.getValue().put(DruidStorageHandlerUtils.EVENT_TIMESTAMP_COLUMN,
+      value.getValue().put(DruidConstants.EVENT_TIMESTAMP_COLUMN,
           current.getTimestamp() == null ? null : 
current.getTimestamp().getMillis()
       );
       value.getValue().putAll(current.getValue().getBaseObject());

http://git-wip-us.apache.org/repos/asf/hive/blob/240da840/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
 
b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
index 111f047..63efdc4 100644
--- 
a/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
+++ 
b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.druid.DruidStorageHandler;
 import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
+import org.apache.hadoop.hive.druid.conf.DruidConstants;
 import org.apache.hadoop.hive.druid.io.DruidRecordWriter;
 import org.apache.hadoop.hive.druid.serde.DruidWritable;
 import org.joda.time.DateTime;
@@ -86,7 +87,7 @@ import java.util.stream.Collectors;
 
   final List<ImmutableMap<String, Object>>
       expectedRows =
-      
ImmutableList.of(ImmutableMap.of(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN,
+      ImmutableList.of(ImmutableMap.of(DruidConstants.DEFAULT_TIMESTAMP_COLUMN,
           DateTime.parse("2014-10-22T00:00:00.000Z").getMillis(),
           "host",
           ImmutableList.of("a.example.com"),
@@ -94,7 +95,7 @@ import java.util.stream.Collectors;
           190L,
           "unique_hosts",
           1.0d),
-          ImmutableMap.of(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN,
+          ImmutableMap.of(DruidConstants.DEFAULT_TIMESTAMP_COLUMN,
               DateTime.parse("2014-10-22T01:00:00.000Z").getMillis(),
               "host",
               ImmutableList.of("b.example.com"),
@@ -102,7 +103,7 @@ import java.util.stream.Collectors;
               175L,
               "unique_hosts",
               1.0d),
-          ImmutableMap.of(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN,
+          ImmutableMap.of(DruidConstants.DEFAULT_TIMESTAMP_COLUMN,
               DateTime.parse("2014-10-22T02:00:00.000Z").getMillis(),
               "host",
               ImmutableList.of("c.example.com"),
@@ -113,7 +114,7 @@ import java.util.stream.Collectors;
 
   @Test public void testTimeStampColumnName() {
     Assert.assertEquals("Time column name need to match to ensure serdeser 
compatibility",
-        DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN,
+        DruidConstants.DEFAULT_TIMESTAMP_COLUMN,
         DruidTable.DEFAULT_TIMESTAMP_COLUMN);
   }
 
@@ -127,7 +128,7 @@ import java.util.stream.Collectors;
 
     final InputRowParser
         inputRowParser =
-        new MapInputRowParser(new TimeAndDimsParseSpec(new 
TimestampSpec(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN,
+        new MapInputRowParser(new TimeAndDimsParseSpec(new 
TimestampSpec(DruidConstants.DEFAULT_TIMESTAMP_COLUMN,
             "auto",
             null), new DimensionsSpec(ImmutableList.of(new 
StringDimensionSchema("host")), null, null)));
     final Map<String, Object>
@@ -183,7 +184,7 @@ import java.util.stream.Collectors;
         expectedRows.stream()
             .map(input -> new DruidWritable(ImmutableMap.<String, 
Object>builder().putAll(input)
                 .put(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME,
-                    Granularities.DAY.bucketStart(new DateTime((long) 
input.get(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN)))
+                    Granularities.DAY.bucketStart(new DateTime((long) 
input.get(DruidConstants.DEFAULT_TIMESTAMP_COLUMN)))
                         .getMillis())
                 .build()))
             .collect(Collectors.toList());
@@ -226,7 +227,7 @@ import java.util.stream.Collectors;
 
       Assert.assertEquals(ImmutableList.of("host"), actual.getDimensions());
 
-      
Assert.assertEquals(expected.get(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN),
+      
Assert.assertEquals(expected.get(DruidConstants.DEFAULT_TIMESTAMP_COLUMN),
           actual.getTimestamp().getMillis());
       Assert.assertEquals(expected.get("host"), actual.getDimension("host"));
       Assert.assertEquals(expected.get("visited_sum"), 
actual.getMetric("visited_sum"));

http://git-wip-us.apache.org/repos/asf/hive/blob/240da840/itests/qtest-druid/pom.xml
----------------------------------------------------------------------
diff --git a/itests/qtest-druid/pom.xml b/itests/qtest-druid/pom.xml
index 19cdf91..10ddfaa 100644
--- a/itests/qtest-druid/pom.xml
+++ b/itests/qtest-druid/pom.xml
@@ -110,6 +110,17 @@
           <version>${druid.version}</version>
         </dependency>
         <dependency>
+          <groupId>io.druid.extensions</groupId>
+          <artifactId>druid-avro-extensions</artifactId>
+          <version>${druid.version}</version>
+          <exclusions>
+            <exclusion>
+              <groupId>org.mortbay.jetty</groupId>
+              <artifactId>servlet-api</artifactId>
+            </exclusion>
+          </exclusions>
+        </dependency>
+        <dependency>
           <groupId>org.apache.logging.log4j</groupId>
           <artifactId>log4j-api</artifactId>
           <version>${log4j2.version}</version>

http://git-wip-us.apache.org/repos/asf/hive/blob/240da840/itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java
----------------------------------------------------------------------
diff --git 
a/itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java 
b/itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java
index 2a31952..a9d381f 100644
--- 
a/itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java
+++ 
b/itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java
@@ -55,7 +55,8 @@ public class MiniDruidCluster extends AbstractService {
           "druid.metadata.storage.type", "derby",
           "druid.storage.type", "hdfs",
           "druid.processing.buffer.sizeBytes", "213870912",
-          "druid.processing.numThreads", "2"
+          "druid.processing.numThreads", "2",
+          "druid.worker.capacity", "4"
   );
 
   private static final Map<String, String> COMMON_DRUID_HISTORICAL = 
ImmutableMap.of(

http://git-wip-us.apache.org/repos/asf/hive/blob/240da840/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties 
b/itests/src/test/resources/testconfiguration.properties
index ab60213..fb50588 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1849,6 +1849,9 @@ druid.query.files=druidmini_test1.q,\
   druidmini_floorTime.q, \
   druidmini_masking.q, \
   druidkafkamini_basic.q, \
+  druidkafkamini_avro.q, \
+  druidkafkamini_csv.q, \
+  druidkafkamini_delimited.q, \
   kafka_storage_handler.q
 
 druid.llap.local.query.files=druidmini_noop.q

http://git-wip-us.apache.org/repos/asf/hive/blob/240da840/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java 
b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
index b5d2386..59c7ac4 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
@@ -640,6 +640,10 @@ public class QTestUtil {
           "test-topic",
           new File(getScriptsDir(), "kafka_init_data.json")
       );
+      kafkaCluster.createTopicWithData(
+              "wiki_kafka_csv",
+              new File(getScriptsDir(), "kafka_init_data.csv")
+      );
       kafkaCluster.createTopicWithData("wiki_kafka_avro_table", getAvroRows());
     }
 

Reply via email to