HIVE-16123: Let user pick the granularity of bucketing and max in row memory (Slim Bouguerra, reviewed by Jesus Camacho Rodriguez)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4a041428
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4a041428
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4a041428

Branch: refs/heads/hive-14535
Commit: 4a041428b14cd195c5d89c93e07ef518652fdcd3
Parents: 33e7d60
Author: Slim Bouguerra <[email protected]>
Authored: Wed Mar 8 15:11:52 2017 +0000
Committer: Jesus Camacho Rodriguez <[email protected]>
Committed: Wed Mar 8 15:11:52 2017 +0000

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/Constants.java  |  1 +
 .../hadoop/hive/druid/io/DruidOutputFormat.java | 27 ++++++++++++++++----
 2 files changed, 23 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/4a041428/common/src/java/org/apache/hadoop/hive/conf/Constants.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/Constants.java b/common/src/java/org/apache/hadoop/hive/conf/Constants.java
index ea7864a..7695e02 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/Constants.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/Constants.java
@@ -30,6 +30,7 @@ public class Constants {
           "org.apache.hadoop.hive.druid.io.DruidOutputFormat";
   public static final String DRUID_DATA_SOURCE = "druid.datasource";
   public static final String DRUID_SEGMENT_GRANULARITY = "druid.segment.granularity";
+  public static final String DRUID_QUERY_GRANULARITY = "druid.query.granularity";
   public static final String DRUID_TIMESTAMP_GRANULARITY_COL_NAME = "__time_granularity";
   public static final String DRUID_QUERY_JSON = "druid.query.json";
   public static final String DRUID_QUERY_TYPE = "druid.query.type";

http://git-wip-us.apache.org/repos/asf/hive/blob/4a041428/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
index 6c08ca5..c74bce6 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
@@ -28,6 +28,7 @@ import io.druid.data.input.impl.MapInputRowParser;
 import io.druid.data.input.impl.StringDimensionSchema;
 import io.druid.data.input.impl.TimeAndDimsParseSpec;
 import io.druid.data.input.impl.TimestampSpec;
+import io.druid.granularity.QueryGranularity;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.DoubleSumAggregatorFactory;
 import io.druid.query.aggregation.LongSumAggregatorFactory;
@@ -102,7 +103,10 @@ public class DruidOutputFormat<K, V> implements HiveOutputFormat<K, DruidWritabl
     final GranularitySpec granularitySpec = new UniformGranularitySpec(
             Granularity.valueOf(segmentGranularity),
-            null,
+            QueryGranularity.fromString(
+                    tableProperties.getProperty(Constants.DRUID_QUERY_GRANULARITY) == null
+                            ? "NONE"
+                            : tableProperties.getProperty(Constants.DRUID_QUERY_GRANULARITY)),
             null
     );
@@ -190,10 +194,23 @@ public class DruidOutputFormat<K, V> implements HiveOutputFormat<K, DruidWritabl
             .getIntVar(jc, HiveConf.ConfVars.HIVE_DRUID_MAX_PARTITION_SIZE);
     String basePersistDirectory = HiveConf
             .getVar(jc, HiveConf.ConfVars.HIVE_DRUID_BASE_PERSIST_DIRECTORY);
-    final RealtimeTuningConfig realtimeTuningConfig = RealtimeTuningConfig
-            .makeDefaultTuningConfig(new File(
-                    basePersistDirectory))
-            .withVersioningPolicy(new CustomVersioningPolicy(version));
+    Integer maxRowInMemory = HiveConf.getIntVar(jc, HiveConf.ConfVars.HIVE_DRUID_MAX_ROW_IN_MEMORY);
+
+    RealtimeTuningConfig realtimeTuningConfig = new RealtimeTuningConfig(maxRowInMemory,
+            null,
+            null,
+            new File(basePersistDirectory),
+            new CustomVersioningPolicy(version),
+            null,
+            null,
+            null,
+            null,
+            true,
+            0,
+            0,
+            true,
+            null
+    );
     LOG.debug(String.format("running with Data schema [%s] ", dataSchema));
     return new DruidRecordWriter(dataSchema, realtimeTuningConfig, hdfsDataSegmentPusher,
