HIVE-17761: Deprecate hive.druid.select.distribute property for Druid (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a0c7e87a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a0c7e87a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a0c7e87a

Branch: refs/heads/hive-14535
Commit: a0c7e87a1128ddd85883178ee7afdf4dedb3cafc
Parents: 71b2a26
Author: Jesus Camacho Rodriguez <jcama...@apache.org>
Authored: Tue Oct 10 10:39:26 2017 -0700
Committer: Jesus Camacho Rodriguez <jcama...@apache.org>
Committed: Tue Oct 10 18:01:49 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   7 -
 .../druid/io/DruidQueryBasedInputFormat.java    | 181 +------------------
 .../TestHiveDruidQueryBasedInputFormat.java     | 104 -----------
 3 files changed, 1 insertion(+), 291 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/a0c7e87a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 3025692..ca28198 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2014,13 +2014,6 @@ public class HiveConf extends Configuration {
     HIVE_DRUID_COORDINATOR_DEFAULT_ADDRESS("hive.druid.coordinator.address.default", "localhost:8081",
         "Address of the Druid coordinator. It is used to check the load status of newly created segments"
     ),
-    HIVE_DRUID_SELECT_DISTRIBUTE("hive.druid.select.distribute", true,
-        "If it is set to true, we distribute the execution of Druid Select queries. Concretely, we retrieve\n" +
-        "the result for Select queries directly from the Druid nodes containing the segments data.\n" +
-        "In particular, first we contact the Druid broker node to obtain the nodes containing the segments\n" +
-        "for the given query, and then we contact those nodes to retrieve the results for the query.\n" +
-        "If it is set to false, we do not execute the Select queries in a distributed fashion. Instead, results\n" +
-        "for those queries are returned by the Druid broker node."),
     HIVE_DRUID_SELECT_THRESHOLD("hive.druid.select.threshold", 10000,
         "Takes only effect when hive.druid.select.distribute is set to false. \n" +
         "When we can split a Select query, this is the maximum number of rows that we try to retrieve\n" +
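Note on semantics: with HIVE_DRUID_SELECT_DISTRIBUTE removed, the handler always takes the distributed path (see the getInputSplits change below). A hypothetical before/after snippet using the plain Hadoop Configuration API; the class name is illustrative and not part of this commit:

import org.apache.hadoop.conf.Configuration;

// Hypothetical pre-HIVE-17761 code that disabled distributed Select reads.
// After this commit the Druid handler never consults the key, so the call
// becomes inert and distributeSelectQuery() is used unconditionally.
public class SelectDistributeToggle {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setBoolean("hive.druid.select.distribute", false); // ignored after HIVE-17761
    System.out.println(conf.getBoolean("hive.druid.select.distribute", true)); // still false, but unused
  }
}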
\n" + "When we can split a Select query, this is the maximum number of rows that we try to retrieve\n" + http://git-wip-us.apache.org/repos/asf/hive/blob/a0c7e87a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java ---------------------------------------------------------------------- diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java index 209d60d..a81c2ab 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.io.InputStream; import java.net.URL; import java.net.URLEncoder; -import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -54,7 +53,6 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.jboss.netty.handler.codec.http.HttpMethod; import org.joda.time.Interval; -import org.joda.time.chrono.ISOChronology; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,21 +64,13 @@ import com.metamx.http.client.Request; import io.druid.query.BaseQuery; import io.druid.query.Druids; -import io.druid.query.Druids.SegmentMetadataQueryBuilder; import io.druid.query.Druids.SelectQueryBuilder; -import io.druid.query.Druids.TimeBoundaryQueryBuilder; import io.druid.query.LocatedSegmentDescriptor; import io.druid.query.Query; -import io.druid.query.Result; import io.druid.query.SegmentDescriptor; -import io.druid.query.metadata.metadata.SegmentAnalysis; -import io.druid.query.metadata.metadata.SegmentMetadataQuery; import io.druid.query.select.PagingSpec; import io.druid.query.select.SelectQuery; -import io.druid.query.spec.MultipleIntervalSegmentSpec; import io.druid.query.spec.MultipleSpecificSegmentSpec; -import io.druid.query.timeboundary.TimeBoundaryQuery; -import io.druid.query.timeboundary.TimeBoundaryResultValue; /** * Druid query based input format. @@ -164,12 +154,7 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW case Query.SELECT: SelectQuery selectQuery = DruidStorageHandlerUtils.JSON_MAPPER.readValue( druidQuery, SelectQuery.class); - boolean distributed = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_DRUID_SELECT_DISTRIBUTE); - if (distributed) { - return distributeSelectQuery(conf, address, selectQuery, paths[0]); - } else { - return splitSelectQuery(conf, address, selectQuery, paths[0]); - } + return distributeSelectQuery(conf, address, selectQuery, paths[0]); default: throw new IOException("Druid query type not recognized"); } @@ -248,170 +233,6 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW return splits; } - /* Method that splits Select query depending on the threshold so read can be - * parallelized. We will only contact the Druid broker to obtain all results. 
*/ - private static HiveDruidSplit[] splitSelectQuery(Configuration conf, String address, - SelectQuery query, Path dummyPath - ) throws IOException { - final int selectThreshold = HiveConf.getIntVar( - conf, HiveConf.ConfVars.HIVE_DRUID_SELECT_THRESHOLD); - - final boolean isFetch = query.getContextBoolean(Constants.DRUID_QUERY_FETCH, false); - if (isFetch) { - // If it has a limit, we use it and we do not split the query - return new HiveDruidSplit[] { new HiveDruidSplit( - DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath, - new String[] {address} ) }; - } - - // We do not have the number of rows, thus we need to execute a - // Segment Metadata query to obtain number of rows - SegmentMetadataQueryBuilder metadataBuilder = new Druids.SegmentMetadataQueryBuilder(); - metadataBuilder.dataSource(query.getDataSource()); - metadataBuilder.intervals(query.getIntervals()); - metadataBuilder.merge(true); - metadataBuilder.analysisTypes(); - SegmentMetadataQuery metadataQuery = metadataBuilder.build(); - InputStream response; - try { - response = DruidStorageHandlerUtils.submitRequest(DruidStorageHandler.getHttpClient(), - DruidStorageHandlerUtils.createSmileRequest(address, metadataQuery) - ); - } catch (Exception e) { - throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e)); - } - - // Retrieve results - List<SegmentAnalysis> metadataList; - try { - metadataList = DruidStorageHandlerUtils.SMILE_MAPPER.readValue(response, - new TypeReference<List<SegmentAnalysis>>() { - } - ); - } catch (Exception e) { - response.close(); - throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e)); - } - if (metadataList == null) { - throw new IOException("Connected to Druid but could not retrieve datasource information"); - } - if (metadataList.isEmpty()) { - // There are no rows for that time range, we can submit query as it is - return new HiveDruidSplit[] { new HiveDruidSplit( - DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath, - new String[] {address} ) }; - } - if (metadataList.size() != 1) { - throw new IOException("Information about segments should have been merged"); - } - - final long numRows = metadataList.get(0).getNumRows(); - - query = query.withPagingSpec(PagingSpec.newSpec(Integer.MAX_VALUE)); - if (numRows <= selectThreshold) { - // We are not going to split it - return new HiveDruidSplit[] { new HiveDruidSplit( - DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath, - new String[] {address} ) }; - } - - // If the query does not specify a timestamp, we obtain the total time using - // a Time Boundary query. 
Then, we use the information to split the query - // following the Select threshold configuration property - final List<Interval> intervals = new ArrayList<>(); - if (query.getIntervals().size() == 1 && query.getIntervals().get(0).withChronology( - ISOChronology.getInstanceUTC()).equals(DruidStorageHandlerUtils.DEFAULT_INTERVAL)) { - // Default max and min, we should execute a time boundary query to get a - // more precise range - TimeBoundaryQueryBuilder timeBuilder = new Druids.TimeBoundaryQueryBuilder(); - timeBuilder.dataSource(query.getDataSource()); - TimeBoundaryQuery timeQuery = timeBuilder.build(); - try { - response = DruidStorageHandlerUtils.submitRequest(DruidStorageHandler.getHttpClient(), - DruidStorageHandlerUtils.createSmileRequest(address, timeQuery) - ); - } catch (Exception e) { - throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e)); - } - - // Retrieve results - List<Result<TimeBoundaryResultValue>> timeList; - try { - timeList = DruidStorageHandlerUtils.SMILE_MAPPER.readValue(response, - new TypeReference<List<Result<TimeBoundaryResultValue>>>() { - } - ); - } catch (Exception e) { - response.close(); - throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e)); - } - if (timeList == null || timeList.isEmpty()) { - throw new IOException( - "Connected to Druid but could not retrieve time boundary information"); - } - if (timeList.size() != 1) { - throw new IOException("We should obtain a single time boundary"); - } - - intervals.add(new Interval(timeList.get(0).getValue().getMinTime().getMillis(), - timeList.get(0).getValue().getMaxTime().getMillis(), ISOChronology.getInstanceUTC() - )); - } else { - intervals.addAll(query.getIntervals()); - } - - // Create (numRows/default threshold) input splits - int numSplits = (int) Math.ceil((double) numRows / selectThreshold); - List<List<Interval>> newIntervals = createSplitsIntervals(intervals, numSplits); - HiveDruidSplit[] splits = new HiveDruidSplit[numSplits]; - for (int i = 0; i < numSplits; i++) { - // Create partial Select query - final SelectQuery partialQuery = query.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(newIntervals.get(i))); - splits[i] = new HiveDruidSplit( - DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(partialQuery), dummyPath, - new String[] {address}); - } - return splits; - } - - private static List<List<Interval>> createSplitsIntervals(List<Interval> intervals, int numSplits - ) { - - long startTime = intervals.get(0).getStartMillis(); - long endTime = startTime; - long currTime = 0; - List<List<Interval>> newIntervals = new ArrayList<>(); - long totalTime = 0; - for (Interval interval: intervals) { - totalTime += interval.getEndMillis() - interval.getStartMillis(); - } - for (int i = 0, posIntervals = 0; i < numSplits; i++) { - final long rangeSize = Math.round((double) (totalTime * (i + 1)) / numSplits) - - Math.round((double) (totalTime * i) / numSplits); - // Create the new interval(s) - List<Interval> currentIntervals = new ArrayList<>(); - while (posIntervals < intervals.size()) { - final Interval interval = intervals.get(posIntervals); - final long expectedRange = rangeSize - currTime; - if (interval.getEndMillis() - startTime >= expectedRange) { - endTime = startTime + expectedRange; - currentIntervals.add(new Interval(startTime, endTime, ISOChronology.getInstanceUTC())); - startTime = endTime; - currTime = 0; - break; - } - endTime = interval.getEndMillis(); - currentIntervals.add(new Interval(startTime, endTime, 
ISOChronology.getInstanceUTC())); - currTime += (endTime - startTime); - startTime = intervals.get(++posIntervals).getStartMillis(); - } - newIntervals.add(currentIntervals); - } - assert endTime == intervals.get(intervals.size() - 1).getEndMillis(); - return newIntervals; - } - private static String deserializeSerialize(String druidQuery) throws JsonParseException, JsonMappingException, IOException { BaseQuery<?> deserializedQuery = DruidStorageHandlerUtils.JSON_MAPPER.readValue( http://git-wip-us.apache.org/repos/asf/hive/blob/a0c7e87a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java ---------------------------------------------------------------------- diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java index fb15830..37597d7 100644 --- a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java +++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java @@ -18,9 +18,7 @@ package org.apache.hadoop.hive.druid; import java.lang.reflect.Method; -import java.util.ArrayList; import java.util.Arrays; -import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.Constants; @@ -28,8 +26,6 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat; import org.apache.hadoop.hive.druid.io.HiveDruidSplit; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.joda.time.Interval; -import org.joda.time.chrono.ISOChronology; import org.junit.Test; import io.druid.query.Query; @@ -37,105 +33,6 @@ import junit.framework.TestCase; public class TestHiveDruidQueryBasedInputFormat extends TestCase { - @SuppressWarnings("unchecked") - @Test - public void testCreateSplitsIntervals() throws Exception { - DruidQueryBasedInputFormat input = new DruidQueryBasedInputFormat(); - - Method method1 = DruidQueryBasedInputFormat.class.getDeclaredMethod("createSplitsIntervals", - List.class, int.class - ); - method1.setAccessible(true); - - List<Interval> intervals; - List<List<Interval>> resultList; - List<List<Interval>> expectedResultList; - - // Test 1 : single split, create 4 - intervals = new ArrayList<>(); - intervals.add(new Interval(1262304000000L, 1293840000000L, ISOChronology.getInstanceUTC())); - resultList = (List<List<Interval>>) method1.invoke(input, intervals, 4); - expectedResultList = new ArrayList<>(); - expectedResultList.add(Arrays - .asList(new Interval(1262304000000L, 1270188000000L, ISOChronology.getInstanceUTC()))); - expectedResultList.add(Arrays - .asList(new Interval(1270188000000L, 1278072000000L, ISOChronology.getInstanceUTC()))); - expectedResultList.add(Arrays - .asList(new Interval(1278072000000L, 1285956000000L, ISOChronology.getInstanceUTC()))); - expectedResultList.add(Arrays - .asList(new Interval(1285956000000L, 1293840000000L, ISOChronology.getInstanceUTC()))); - assertEquals(expectedResultList, resultList); - - // Test 2 : two splits, create 4 - intervals = new ArrayList<>(); - intervals.add(new Interval(1262304000000L, 1293840000000L, ISOChronology.getInstanceUTC())); - intervals.add(new Interval(1325376000000L, 1356998400000L, ISOChronology.getInstanceUTC())); - resultList = (List<List<Interval>>) method1.invoke(input, intervals, 4); - expectedResultList = new ArrayList<>(); - expectedResultList.add(Arrays - 
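For reference, the core of the removed logic is the proportional arithmetic in createSplitsIntervals: the total covered time is cut into numSplits slices whose sizes are differences of consecutive proportional round-offs, so rounding never drifts and the slices always sum exactly to the total. A self-contained sketch of that arithmetic, reduced to the single-interval case; the constants reproduce the boundaries asserted in "Test 1" of the removed test below:

// Illustrative re-derivation of the removed createSplitsIntervals rounding,
// single-interval case only. Prints the four slice boundaries checked by
// the removed "Test 1" below.
public class SplitIntervalsSketch {
  public static void main(String[] args) {
    final long start = 1262304000000L; // 2010-01-01T00:00:00Z, in millis
    final long end = 1293840000000L;   // 2011-01-01T00:00:00Z, in millis
    final int numSplits = 4;
    final long totalTime = end - start;
    long cursor = start;
    for (int i = 0; i < numSplits; i++) {
      // Slice i spans round(total*(i+1)/n) - round(total*i/n) millis; the
      // differences of the round-offs sum exactly to totalTime.
      final long rangeSize = Math.round((double) (totalTime * (i + 1)) / numSplits)
          - Math.round((double) (totalTime * i) / numSplits);
      System.out.println("[" + cursor + ", " + (cursor + rangeSize) + ")");
      cursor += rangeSize;
    }
  }
}

Run as-is, this prints the interior boundaries 1270188000000, 1278072000000 and 1285956000000 between the two endpoints, matching the expected intervals in the removed test.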
http://git-wip-us.apache.org/repos/asf/hive/blob/a0c7e87a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java
index fb15830..37597d7 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java
@@ -18,9 +18,7 @@ package org.apache.hadoop.hive.druid;
 
 import java.lang.reflect.Method;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.Constants;
@@ -28,8 +26,6 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat;
 import org.apache.hadoop.hive.druid.io.HiveDruidSplit;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.joda.time.Interval;
-import org.joda.time.chrono.ISOChronology;
 import org.junit.Test;
 
 import io.druid.query.Query;
@@ -37,105 +33,6 @@ import junit.framework.TestCase;
 
 public class TestHiveDruidQueryBasedInputFormat extends TestCase {
 
-  @SuppressWarnings("unchecked")
-  @Test
-  public void testCreateSplitsIntervals() throws Exception {
-    DruidQueryBasedInputFormat input = new DruidQueryBasedInputFormat();
-
-    Method method1 = DruidQueryBasedInputFormat.class.getDeclaredMethod("createSplitsIntervals",
-        List.class, int.class
-    );
-    method1.setAccessible(true);
-
-    List<Interval> intervals;
-    List<List<Interval>> resultList;
-    List<List<Interval>> expectedResultList;
-
-    // Test 1 : single split, create 4
-    intervals = new ArrayList<>();
-    intervals.add(new Interval(1262304000000L, 1293840000000L, ISOChronology.getInstanceUTC()));
-    resultList = (List<List<Interval>>) method1.invoke(input, intervals, 4);
-    expectedResultList = new ArrayList<>();
-    expectedResultList.add(Arrays
-        .asList(new Interval(1262304000000L, 1270188000000L, ISOChronology.getInstanceUTC())));
-    expectedResultList.add(Arrays
-        .asList(new Interval(1270188000000L, 1278072000000L, ISOChronology.getInstanceUTC())));
-    expectedResultList.add(Arrays
-        .asList(new Interval(1278072000000L, 1285956000000L, ISOChronology.getInstanceUTC())));
-    expectedResultList.add(Arrays
-        .asList(new Interval(1285956000000L, 1293840000000L, ISOChronology.getInstanceUTC())));
-    assertEquals(expectedResultList, resultList);
-
-    // Test 2 : two splits, create 4
-    intervals = new ArrayList<>();
-    intervals.add(new Interval(1262304000000L, 1293840000000L, ISOChronology.getInstanceUTC()));
-    intervals.add(new Interval(1325376000000L, 1356998400000L, ISOChronology.getInstanceUTC()));
-    resultList = (List<List<Interval>>) method1.invoke(input, intervals, 4);
-    expectedResultList = new ArrayList<>();
-    expectedResultList.add(Arrays
-        .asList(new Interval(1262304000000L, 1278093600000L, ISOChronology.getInstanceUTC())));
-    expectedResultList.add(Arrays
-        .asList(new Interval(1278093600000L, 1293840000000L, ISOChronology.getInstanceUTC()),
-            new Interval(1325376000000L, 1325419200000L, ISOChronology.getInstanceUTC())
-        ));
-    expectedResultList.add(Arrays
-        .asList(new Interval(1325419200000L, 1341208800000L, ISOChronology.getInstanceUTC())));
-    expectedResultList.add(Arrays
-        .asList(new Interval(1341208800000L, 1356998400000L, ISOChronology.getInstanceUTC())));
-    assertEquals(expectedResultList, resultList);
-
-    // Test 3 : two splits, create 5
-    intervals = new ArrayList<>();
-    intervals.add(new Interval(1262304000000L, 1293840000000L, ISOChronology.getInstanceUTC()));
-    intervals.add(new Interval(1325376000000L, 1356998400000L, ISOChronology.getInstanceUTC()));
-    resultList = (List<List<Interval>>) method1.invoke(input, intervals, 5);
-    expectedResultList = new ArrayList<>();
-    expectedResultList.add(Arrays
-        .asList(new Interval(1262304000000L, 1274935680000L, ISOChronology.getInstanceUTC())));
-    expectedResultList.add(Arrays
-        .asList(new Interval(1274935680000L, 1287567360000L, ISOChronology.getInstanceUTC())));
-    expectedResultList.add(Arrays
-        .asList(new Interval(1287567360000L, 1293840000000L, ISOChronology.getInstanceUTC()),
-            new Interval(1325376000000L, 1331735040000L, ISOChronology.getInstanceUTC())
-        ));
-    expectedResultList.add(Arrays
-        .asList(new Interval(1331735040000L, 1344366720000L, ISOChronology.getInstanceUTC())));
-    expectedResultList.add(Arrays
-        .asList(new Interval(1344366720000L, 1356998400000L, ISOChronology.getInstanceUTC())));
-    assertEquals(expectedResultList, resultList);
-
-    // Test 4 : three splits, different ranges, create 6
-    intervals = new ArrayList<>();
-    intervals.add(new Interval(1199145600000L, 1201824000000L,
-        ISOChronology.getInstanceUTC()
-    )); // one month
-    intervals.add(new Interval(1325376000000L, 1356998400000L,
-        ISOChronology.getInstanceUTC()
-    )); // one year
-    intervals.add(new Interval(1407283200000L, 1407888000000L,
-        ISOChronology.getInstanceUTC()
-    )); // 7 days
-    resultList = (List<List<Interval>>) method1.invoke(input, intervals, 6);
-    expectedResultList = new ArrayList<>();
-    expectedResultList.add(Arrays
-        .asList(new Interval(1199145600000L, 1201824000000L, ISOChronology.getInstanceUTC()),
-            new Interval(1325376000000L, 1328515200000L, ISOChronology.getInstanceUTC())
-        ));
-    expectedResultList.add(Arrays
-        .asList(new Interval(1328515200000L, 1334332800000L, ISOChronology.getInstanceUTC())));
-    expectedResultList.add(Arrays
-        .asList(new Interval(1334332800000L, 1340150400000L, ISOChronology.getInstanceUTC())));
-    expectedResultList.add(Arrays
-        .asList(new Interval(1340150400000L, 1345968000000L, ISOChronology.getInstanceUTC())));
-    expectedResultList.add(Arrays
-        .asList(new Interval(1345968000000L, 1351785600000L, ISOChronology.getInstanceUTC())));
-    expectedResultList.add(Arrays
-        .asList(new Interval(1351785600000L, 1356998400000L, ISOChronology.getInstanceUTC()),
-            new Interval(1407283200000L, 1407888000000L, ISOChronology.getInstanceUTC())
-        ));
-    assertEquals(expectedResultList, resultList);
-  }
-
   private static final String TIMESERIES_QUERY =
       "{ \"queryType\": \"timeseries\", "
           + " \"dataSource\": \"sample_datasource\", "
@@ -289,7 +186,6 @@ public class TestHiveDruidQueryBasedInputFormat extends TestCase {
     conf.set(Constants.DRUID_DATA_SOURCE, dataSource);
     conf.set(Constants.DRUID_QUERY_JSON, jsonQuery);
     conf.set(Constants.DRUID_QUERY_TYPE, queryType);
-    conf.setBoolean(HiveConf.ConfVars.HIVE_DRUID_SELECT_DISTRIBUTE.varname, false);
     return conf;
   }
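With the flag removed from the fixture above, a Select-query Configuration for this input format reduces to the three Druid properties. A minimal sketch; the helper name is made up for illustration, and "select" is assumed to be the query type string passed in for Select queries:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.Constants;

// Minimal sketch of the trimmed fixture: only data source, query JSON and
// query type remain. Helper name is illustrative, not the test's own code.
public class DruidSelectConfSketch {
  public static Configuration selectQueryConf(String dataSource, String jsonQuery) {
    Configuration conf = new Configuration();
    conf.set(Constants.DRUID_DATA_SOURCE, dataSource);
    conf.set(Constants.DRUID_QUERY_JSON, jsonQuery);
    conf.set(Constants.DRUID_QUERY_TYPE, "select"); // assumed type string for Select
    return conf;
  }
}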