HIVE-17761: Deprecate hive.druid.select.distribute property for Druid (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a0c7e87a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a0c7e87a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a0c7e87a

Branch: refs/heads/hive-14535
Commit: a0c7e87a1128ddd85883178ee7afdf4dedb3cafc
Parents: 71b2a26
Author: Jesus Camacho Rodriguez <jcama...@apache.org>
Authored: Tue Oct 10 10:39:26 2017 -0700
Committer: Jesus Camacho Rodriguez <jcama...@apache.org>
Committed: Tue Oct 10 18:01:49 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   7 -
 .../druid/io/DruidQueryBasedInputFormat.java    | 181 +------------------
 .../TestHiveDruidQueryBasedInputFormat.java     | 104 -----------
 3 files changed, 1 insertion(+), 291 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/a0c7e87a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 3025692..ca28198 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2014,13 +2014,6 @@ public class HiveConf extends Configuration {
     HIVE_DRUID_COORDINATOR_DEFAULT_ADDRESS("hive.druid.coordinator.address.default", "localhost:8081",
             "Address of the Druid coordinator. It is used to check the load status of newly created segments"
     ),
-    HIVE_DRUID_SELECT_DISTRIBUTE("hive.druid.select.distribute", true,
-        "If it is set to true, we distribute the execution of Druid Select queries. Concretely, we retrieve\n" +
-        "the result for Select queries directly from the Druid nodes containing the segments data.\n" +
-        "In particular, first we contact the Druid broker node to obtain the nodes containing the segments\n" +
-        "for the given query, and then we contact those nodes to retrieve the results for the query.\n" +
-        "If it is set to false, we do not execute the Select queries in a distributed fashion. Instead, results\n" +
-        "for those queries are returned by the Druid broker node."),
     HIVE_DRUID_SELECT_THRESHOLD("hive.druid.select.threshold", 10000,
         "Takes only effect when hive.druid.select.distribute is set to false. \n" +
         "When we can split a Select query, this is the maximum number of rows that we try to retrieve\n" +

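Note: the removed property description above documents two execution paths for Druid Select queries: a distributed path (first ask the broker which nodes hold the segments for the query, then read results directly from those nodes) and a broker-only path. After this commit only the distributed path remains, as the change to DruidQueryBasedInputFormat below shows. The following sketch is purely illustrative of that flow and is not part of the commit; BrokerClient, SegmentLocation, Split and DistributedSelectSketch are hypothetical stand-ins, not Hive or Druid classes.

import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the distributed Select path that this commit keeps:
// ask the broker which servers host the segments for the query's intervals,
// then build one input split per segment, pinned to the hosts that serve it.
public class DistributedSelectSketch {

  // Illustrative stand-in for a segment location returned by the broker.
  static class SegmentLocation {
    final String segmentId;
    final List<String> hosts;
    SegmentLocation(String segmentId, List<String> hosts) {
      this.segmentId = segmentId;
      this.hosts = hosts;
    }
  }

  // Illustrative stand-in for an HTTP client talking to the Druid broker.
  interface BrokerClient {
    List<SegmentLocation> locateSegments(String dataSource, String intervalsJson);
  }

  // Illustrative stand-in for an input split handed to a reader task.
  static class Split {
    final String perSegmentQueryJson;
    final String[] preferredHosts;
    Split(String perSegmentQueryJson, String[] preferredHosts) {
      this.perSegmentQueryJson = perSegmentQueryJson;
      this.preferredHosts = preferredHosts;
    }
  }

  static List<Split> distributeSelect(BrokerClient broker, String dataSource, String intervalsJson) {
    List<Split> splits = new ArrayList<>();
    for (SegmentLocation loc : broker.locateSegments(dataSource, intervalsJson)) {
      // One split per segment: each split is read directly from a node holding
      // that segment, instead of funnelling every row through the broker.
      splits.add(new Split("{\"segment\":\"" + loc.segmentId + "\"}",
          loc.hosts.toArray(new String[0])));
    }
    return splits;
  }
}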
http://git-wip-us.apache.org/repos/asf/hive/blob/a0c7e87a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
index 209d60d..a81c2ab 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.URL;
 import java.net.URLEncoder;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -54,7 +53,6 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.joda.time.Interval;
-import org.joda.time.chrono.ISOChronology;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,21 +64,13 @@ import com.metamx.http.client.Request;
 
 import io.druid.query.BaseQuery;
 import io.druid.query.Druids;
-import io.druid.query.Druids.SegmentMetadataQueryBuilder;
 import io.druid.query.Druids.SelectQueryBuilder;
-import io.druid.query.Druids.TimeBoundaryQueryBuilder;
 import io.druid.query.LocatedSegmentDescriptor;
 import io.druid.query.Query;
-import io.druid.query.Result;
 import io.druid.query.SegmentDescriptor;
-import io.druid.query.metadata.metadata.SegmentAnalysis;
-import io.druid.query.metadata.metadata.SegmentMetadataQuery;
 import io.druid.query.select.PagingSpec;
 import io.druid.query.select.SelectQuery;
-import io.druid.query.spec.MultipleIntervalSegmentSpec;
 import io.druid.query.spec.MultipleSpecificSegmentSpec;
-import io.druid.query.timeboundary.TimeBoundaryQuery;
-import io.druid.query.timeboundary.TimeBoundaryResultValue;
 
 /**
  * Druid query based input format.
@@ -164,12 +154,7 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
       case Query.SELECT:
         SelectQuery selectQuery = DruidStorageHandlerUtils.JSON_MAPPER.readValue(
                 druidQuery, SelectQuery.class);
-        boolean distributed = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_DRUID_SELECT_DISTRIBUTE);
-        if (distributed) {
-          return distributeSelectQuery(conf, address, selectQuery, paths[0]);
-        } else {
-          return splitSelectQuery(conf, address, selectQuery, paths[0]);
-        }
+        return distributeSelectQuery(conf, address, selectQuery, paths[0]);
       default:
         throw new IOException("Druid query type not recognized");
     }
@@ -248,170 +233,6 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
     return splits;
   }
 
-  /* Method that splits Select query depending on the threshold so read can be
-   * parallelized. We will only contact the Druid broker to obtain all results. */
-  private static HiveDruidSplit[] splitSelectQuery(Configuration conf, String address,
-          SelectQuery query, Path dummyPath
-  ) throws IOException {
-    final int selectThreshold = HiveConf.getIntVar(
-            conf, HiveConf.ConfVars.HIVE_DRUID_SELECT_THRESHOLD);
-
-    final boolean isFetch = query.getContextBoolean(Constants.DRUID_QUERY_FETCH, false);
-    if (isFetch) {
-      // If it has a limit, we use it and we do not split the query
-      return new HiveDruidSplit[] { new HiveDruidSplit(
-              DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath,
-              new String[] {address} ) };
-    }
-
-    // We do not have the number of rows, thus we need to execute a
-    // Segment Metadata query to obtain number of rows
-    SegmentMetadataQueryBuilder metadataBuilder = new Druids.SegmentMetadataQueryBuilder();
-    metadataBuilder.dataSource(query.getDataSource());
-    metadataBuilder.intervals(query.getIntervals());
-    metadataBuilder.merge(true);
-    metadataBuilder.analysisTypes();
-    SegmentMetadataQuery metadataQuery = metadataBuilder.build();
-    InputStream response;
-    try {
-      response = DruidStorageHandlerUtils.submitRequest(DruidStorageHandler.getHttpClient(),
-              DruidStorageHandlerUtils.createSmileRequest(address, metadataQuery)
-      );
-    } catch (Exception e) {
-      throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
-    }
-
-    // Retrieve results
-    List<SegmentAnalysis> metadataList;
-    try {
-      metadataList = DruidStorageHandlerUtils.SMILE_MAPPER.readValue(response,
-              new TypeReference<List<SegmentAnalysis>>() {
-              }
-      );
-    } catch (Exception e) {
-      response.close();
-      throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
-    }
-    if (metadataList == null) {
-      throw new IOException("Connected to Druid but could not retrieve datasource information");
-    }
-    if (metadataList.isEmpty()) {
-      // There are no rows for that time range, we can submit query as it is
-      return new HiveDruidSplit[] { new HiveDruidSplit(
-              DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath,
-              new String[] {address} ) };
-    }
-    if (metadataList.size() != 1) {
-      throw new IOException("Information about segments should have been merged");
-    }
-
-    final long numRows = metadataList.get(0).getNumRows();
-
-    query = query.withPagingSpec(PagingSpec.newSpec(Integer.MAX_VALUE));
-    if (numRows <= selectThreshold) {
-      // We are not going to split it
-      return new HiveDruidSplit[] { new HiveDruidSplit(
-              DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath,
-              new String[] {address} ) };
-    }
-
-    // If the query does not specify a timestamp, we obtain the total time using
-    // a Time Boundary query. Then, we use the information to split the query
-    // following the Select threshold configuration property
-    final List<Interval> intervals = new ArrayList<>();
-    if (query.getIntervals().size() == 1 && query.getIntervals().get(0).withChronology(
-            ISOChronology.getInstanceUTC()).equals(DruidStorageHandlerUtils.DEFAULT_INTERVAL)) {
-      // Default max and min, we should execute a time boundary query to get a
-      // more precise range
-      TimeBoundaryQueryBuilder timeBuilder = new Druids.TimeBoundaryQueryBuilder();
-      timeBuilder.dataSource(query.getDataSource());
-      TimeBoundaryQuery timeQuery = timeBuilder.build();
-      try {
-        response = DruidStorageHandlerUtils.submitRequest(DruidStorageHandler.getHttpClient(),
-                DruidStorageHandlerUtils.createSmileRequest(address, timeQuery)
-        );
-      } catch (Exception e) {
-        throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
-      }
-
-      // Retrieve results
-      List<Result<TimeBoundaryResultValue>> timeList;
-      try {
-        timeList = DruidStorageHandlerUtils.SMILE_MAPPER.readValue(response,
-                new TypeReference<List<Result<TimeBoundaryResultValue>>>() {
-                }
-        );
-      } catch (Exception e) {
-        response.close();
-        throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
-      }
-      if (timeList == null || timeList.isEmpty()) {
-        throw new IOException(
-                "Connected to Druid but could not retrieve time boundary information");
-      }
-      if (timeList.size() != 1) {
-        throw new IOException("We should obtain a single time boundary");
-      }
-
-      intervals.add(new Interval(timeList.get(0).getValue().getMinTime().getMillis(),
-              timeList.get(0).getValue().getMaxTime().getMillis(), ISOChronology.getInstanceUTC()
-      ));
-    } else {
-      intervals.addAll(query.getIntervals());
-    }
-
-    // Create (numRows/default threshold) input splits
-    int numSplits = (int) Math.ceil((double) numRows / selectThreshold);
-    List<List<Interval>> newIntervals = createSplitsIntervals(intervals, numSplits);
-    HiveDruidSplit[] splits = new HiveDruidSplit[numSplits];
-    for (int i = 0; i < numSplits; i++) {
-      // Create partial Select query
-      final SelectQuery partialQuery = query.withQuerySegmentSpec(
-              new MultipleIntervalSegmentSpec(newIntervals.get(i)));
-      splits[i] = new HiveDruidSplit(
-              DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(partialQuery), dummyPath,
-              new String[] {address});
-    }
-    return splits;
-  }
-
-  private static List<List<Interval>> createSplitsIntervals(List<Interval> intervals, int numSplits
-  ) {
-
-    long startTime = intervals.get(0).getStartMillis();
-    long endTime = startTime;
-    long currTime = 0;
-    List<List<Interval>> newIntervals = new ArrayList<>();
-    long totalTime = 0;
-    for (Interval interval: intervals) {
-      totalTime += interval.getEndMillis() - interval.getStartMillis();
-    }
-    for (int i = 0, posIntervals = 0; i < numSplits; i++) {
-      final long rangeSize = Math.round((double) (totalTime * (i + 1)) / numSplits) -
-              Math.round((double) (totalTime * i) / numSplits);
-      // Create the new interval(s)
-      List<Interval> currentIntervals = new ArrayList<>();
-      while (posIntervals < intervals.size()) {
-        final Interval interval = intervals.get(posIntervals);
-        final long expectedRange = rangeSize - currTime;
-        if (interval.getEndMillis() - startTime >= expectedRange) {
-          endTime = startTime + expectedRange;
-          currentIntervals.add(new Interval(startTime, endTime, ISOChronology.getInstanceUTC()));
-          startTime = endTime;
-          currTime = 0;
-          break;
-        }
-        endTime = interval.getEndMillis();
-        currentIntervals.add(new Interval(startTime, endTime, ISOChronology.getInstanceUTC()));
-        currTime += (endTime - startTime);
-        startTime = intervals.get(++posIntervals).getStartMillis();
-      }
-      newIntervals.add(currentIntervals);
-    }
-    assert endTime == intervals.get(intervals.size() - 1).getEndMillis();
-    return newIntervals;
-  }
-
   private static String deserializeSerialize(String druidQuery)
           throws JsonParseException, JsonMappingException, IOException {
     BaseQuery<?> deserializedQuery = DruidStorageHandlerUtils.JSON_MAPPER.readValue(

http://git-wip-us.apache.org/repos/asf/hive/blob/a0c7e87a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java
index fb15830..37597d7 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java
@@ -18,9 +18,7 @@
 package org.apache.hadoop.hive.druid;
 
 import java.lang.reflect.Method;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.Constants;
@@ -28,8 +26,6 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat;
 import org.apache.hadoop.hive.druid.io.HiveDruidSplit;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.joda.time.Interval;
-import org.joda.time.chrono.ISOChronology;
 import org.junit.Test;
 
 import io.druid.query.Query;
@@ -37,105 +33,6 @@ import junit.framework.TestCase;
 
 public class TestHiveDruidQueryBasedInputFormat extends TestCase {
 
-  @SuppressWarnings("unchecked")
-  @Test
-  public void testCreateSplitsIntervals() throws Exception {
-    DruidQueryBasedInputFormat input = new DruidQueryBasedInputFormat();
-
-    Method method1 = DruidQueryBasedInputFormat.class.getDeclaredMethod("createSplitsIntervals",
-            List.class, int.class
-    );
-    method1.setAccessible(true);
-
-    List<Interval> intervals;
-    List<List<Interval>> resultList;
-    List<List<Interval>> expectedResultList;
-
-    // Test 1 : single split, create 4
-    intervals = new ArrayList<>();
-    intervals.add(new Interval(1262304000000L, 1293840000000L, ISOChronology.getInstanceUTC()));
-    resultList = (List<List<Interval>>) method1.invoke(input, intervals, 4);
-    expectedResultList = new ArrayList<>();
-    expectedResultList.add(Arrays
-            .asList(new Interval(1262304000000L, 1270188000000L, ISOChronology.getInstanceUTC())));
-    expectedResultList.add(Arrays
-            .asList(new Interval(1270188000000L, 1278072000000L, ISOChronology.getInstanceUTC())));
-    expectedResultList.add(Arrays
-            .asList(new Interval(1278072000000L, 1285956000000L, ISOChronology.getInstanceUTC())));
-    expectedResultList.add(Arrays
-            .asList(new Interval(1285956000000L, 1293840000000L, ISOChronology.getInstanceUTC())));
-    assertEquals(expectedResultList, resultList);
-
-    // Test 2 : two splits, create 4
-    intervals = new ArrayList<>();
-    intervals.add(new Interval(1262304000000L, 1293840000000L, ISOChronology.getInstanceUTC()));
-    intervals.add(new Interval(1325376000000L, 1356998400000L, ISOChronology.getInstanceUTC()));
-    resultList = (List<List<Interval>>) method1.invoke(input, intervals, 4);
-    expectedResultList = new ArrayList<>();
-    expectedResultList.add(Arrays
-            .asList(new Interval(1262304000000L, 1278093600000L, ISOChronology.getInstanceUTC())));
-    expectedResultList.add(Arrays
-            .asList(new Interval(1278093600000L, 1293840000000L, ISOChronology.getInstanceUTC()),
-                    new Interval(1325376000000L, 1325419200000L, ISOChronology.getInstanceUTC())
-            ));
-    expectedResultList.add(Arrays
-            .asList(new Interval(1325419200000L, 1341208800000L, ISOChronology.getInstanceUTC())));
-    expectedResultList.add(Arrays
-            .asList(new Interval(1341208800000L, 1356998400000L, ISOChronology.getInstanceUTC())));
-    assertEquals(expectedResultList, resultList);
-
-    // Test 3 : two splits, create 5
-    intervals = new ArrayList<>();
-    intervals.add(new Interval(1262304000000L, 1293840000000L, ISOChronology.getInstanceUTC()));
-    intervals.add(new Interval(1325376000000L, 1356998400000L, ISOChronology.getInstanceUTC()));
-    resultList = (List<List<Interval>>) method1.invoke(input, intervals, 5);
-    expectedResultList = new ArrayList<>();
-    expectedResultList.add(Arrays
-            .asList(new Interval(1262304000000L, 1274935680000L, ISOChronology.getInstanceUTC())));
-    expectedResultList.add(Arrays
-            .asList(new Interval(1274935680000L, 1287567360000L, ISOChronology.getInstanceUTC())));
-    expectedResultList.add(Arrays
-            .asList(new Interval(1287567360000L, 1293840000000L, ISOChronology.getInstanceUTC()),
-                    new Interval(1325376000000L, 1331735040000L, ISOChronology.getInstanceUTC())
-            ));
-    expectedResultList.add(Arrays
-            .asList(new Interval(1331735040000L, 1344366720000L, ISOChronology.getInstanceUTC())));
-    expectedResultList.add(Arrays
-            .asList(new Interval(1344366720000L, 1356998400000L, ISOChronology.getInstanceUTC())));
-    assertEquals(expectedResultList, resultList);
-
-    // Test 4 : three splits, different ranges, create 6
-    intervals = new ArrayList<>();
-    intervals.add(new Interval(1199145600000L, 1201824000000L,
-            ISOChronology.getInstanceUTC()
-    )); // one month
-    intervals.add(new Interval(1325376000000L, 1356998400000L,
-            ISOChronology.getInstanceUTC()
-    )); // one year
-    intervals.add(new Interval(1407283200000L, 1407888000000L,
-            ISOChronology.getInstanceUTC()
-    )); // 7 days
-    resultList = (List<List<Interval>>) method1.invoke(input, intervals, 6);
-    expectedResultList = new ArrayList<>();
-    expectedResultList.add(Arrays
-            .asList(new Interval(1199145600000L, 1201824000000L, ISOChronology.getInstanceUTC()),
-                    new Interval(1325376000000L, 1328515200000L, ISOChronology.getInstanceUTC())
-            ));
-    expectedResultList.add(Arrays
-            .asList(new Interval(1328515200000L, 1334332800000L, ISOChronology.getInstanceUTC())));
-    expectedResultList.add(Arrays
-            .asList(new Interval(1334332800000L, 1340150400000L, ISOChronology.getInstanceUTC())));
-    expectedResultList.add(Arrays
-            .asList(new Interval(1340150400000L, 1345968000000L, ISOChronology.getInstanceUTC())));
-    expectedResultList.add(Arrays
-            .asList(new Interval(1345968000000L, 1351785600000L, ISOChronology.getInstanceUTC())));
-    expectedResultList.add(Arrays
-            .asList(new Interval(1351785600000L, 1356998400000L, ISOChronology.getInstanceUTC()),
-                    new Interval(1407283200000L, 1407888000000L, ISOChronology.getInstanceUTC())
-            ));
-    assertEquals(expectedResultList, resultList);
-  }
-
   private static final String TIMESERIES_QUERY =
       "{  \"queryType\": \"timeseries\", "
           + " \"dataSource\": \"sample_datasource\", "
@@ -289,7 +186,6 @@ public class TestHiveDruidQueryBasedInputFormat extends TestCase {
     conf.set(Constants.DRUID_DATA_SOURCE, dataSource);
     conf.set(Constants.DRUID_QUERY_JSON, jsonQuery);
     conf.set(Constants.DRUID_QUERY_TYPE, queryType);
-    conf.setBoolean(HiveConf.ConfVars.HIVE_DRUID_SELECT_DISTRIBUTE.varname, false);
     return conf;
   }
 
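For reference, the interval boundaries asserted in the removed testCreateSplitsIntervals (Test 1 above) follow directly from the proportional rule in the removed createSplitsIntervals helper: each of the numSplits chunks gets an equal share of the total duration. A minimal, self-contained check of that arithmetic (illustrative only, not part of the commit; the class name is made up):

// Quick arithmetic check of the expected boundaries in the removed "Test 1":
// one interval split into 4 equal-duration ranges.
public class SplitBoundaryCheck {
  public static void main(String[] args) {
    long start = 1262304000000L;   // 2010-01-01T00:00:00Z
    long end   = 1293840000000L;   // 2011-01-01T00:00:00Z
    int numSplits = 4;
    long total = end - start;      // 31,536,000,000 ms in total
    long cursor = start;
    for (int i = 0; i < numSplits; i++) {
      // Same proportional rule the removed helper used: split i ends at an
      // equal (i+1)/numSplits share of the total time range.
      long next = start + Math.round((double) (total * (i + 1)) / numSplits);
      System.out.println("[" + cursor + ", " + next + ")");
      cursor = next;
    }
    // Prints boundaries 1270188000000, 1278072000000, 1285956000000 and
    // 1293840000000, matching the intervals asserted by the removed test.
  }
}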
