HIVE-17623: Fix Select query, fix Double column serde, and some refactoring 
(Slim Bouguerra via Ashutosh Chauhan)

Signed-off-by: Ashutosh Chauhan <hashut...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e0484b78
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e0484b78
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e0484b78

Branch: refs/heads/hive-14535
Commit: e0484b789cde46a62349493cfee697eaa258d6ad
Parents: 4142c98
Author: Slim Bouguerra <slim.bougue...@gmail.com>
Authored: Thu Sep 28 14:54:07 2017 -0700
Committer: Ashutosh Chauhan <hashut...@apache.org>
Committed: Thu Sep 28 14:54:07 2017 -0700

----------------------------------------------------------------------
 .../hive/druid/DruidStorageHandlerUtils.java    |  17 +-
 .../druid/io/DruidQueryBasedInputFormat.java    |  66 +-
 .../serde/DruidGroupByQueryRecordReader.java    | 225 ++---
 .../druid/serde/DruidQueryRecordReader.java     | 215 +++-
 .../serde/DruidSelectQueryRecordReader.java     |  26 +-
 .../hadoop/hive/druid/serde/DruidSerDe.java     |  18 +-
 .../hive/druid/serde/DruidSerDeUtils.java       |  27 +-
 .../serde/DruidTimeseriesQueryRecordReader.java |  22 +-
 .../druid/serde/DruidTopNQueryRecordReader.java |  38 +-
 .../hadoop/hive/druid/TestDruidSerDe.java       | 918 -----------------
 .../hadoop/hive/druid/serde/TestDruidSerDe.java | 991 +++++++++++++++++++
 11 files changed, 1324 insertions(+), 1239 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/e0484b78/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
----------------------------------------------------------------------
diff --git 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
index 4852ff1..35ea94f 100644
--- 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
+++ 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
@@ -17,6 +17,18 @@
  */
 package org.apache.hadoop.hive.druid;
 
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.dataformat.smile.SmileFactory;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Interner;
+import com.google.common.collect.Interners;
+import com.metamx.emitter.EmittingLogger;
+import com.metamx.emitter.core.NoopEmitter;
+import com.metamx.emitter.service.ServiceEmitter;
+import com.metamx.http.client.HttpClient;
+import com.metamx.http.client.response.InputStreamResponseHandler;
 import io.druid.common.utils.JodaUtils;
 import io.druid.jackson.DefaultObjectMapper;
 import io.druid.math.expr.ExprMacroTable;
@@ -109,6 +121,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TimeZone;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 
@@ -191,7 +204,7 @@ public final class DruidStorageHandlerUtils {
   public static final Interner<DataSegment> DATA_SEGMENT_INTERNER = 
Interners.newWeakInterner();
 
   /**
-   * Method that creates a request for Druid JSON query (using SMILE).
+   * Method that creates a request for a Druid query using the SMILE format.
    *
    * @param address
    * @param query
@@ -200,7 +213,7 @@ public final class DruidStorageHandlerUtils {
    *
    * @throws IOException
    */
-  public static Request createRequest(String address, BaseQuery<?> query)
+  public static Request createSmileRequest(String address, 
io.druid.query.Query query)
           throws IOException {
     return new Request(HttpMethod.POST, new URL(String.format("%s/druid/v2/", 
"http://"; + address)))
             .setContent(SMILE_MAPPER.writeValueAsBytes(query))
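
For illustration only (not part of the patch): the renamed createSmileRequest is used
together with submitRequest roughly as in the call sites updated further down; the data
source name and broker address in this sketch are made up.

    import io.druid.query.Druids;
    import io.druid.query.timeboundary.TimeBoundaryQuery;
    import org.apache.hadoop.hive.druid.DruidStorageHandler;
    import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;

    import java.io.InputStream;

    public class SmileRequestExample {
      public static void main(String[] args) throws Exception {
        // Hypothetical query; any io.druid.query.Query works with createSmileRequest.
        TimeBoundaryQuery timeQuery = Druids.newTimeBoundaryQueryBuilder()
                .dataSource("wikipedia")          // made-up data source
                .build();
        // POSTs the SMILE-serialized query to http://<address>/druid/v2/
        InputStream response = DruidStorageHandlerUtils.submitRequest(
                DruidStorageHandler.getHttpClient(),
                DruidStorageHandlerUtils.createSmileRequest("localhost:8082", timeQuery));
        response.close();
      }
    }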

http://git-wip-us.apache.org/repos/asf/hive/blob/e0484b78/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
----------------------------------------------------------------------
diff --git 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
index 3711595..209d60d 100644
--- 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
+++ 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
@@ -93,6 +93,20 @@ public class DruidQueryBasedInputFormat extends 
InputFormat<NullWritable, DruidW
 
   protected static final Logger LOG = 
LoggerFactory.getLogger(DruidQueryBasedInputFormat.class);
 
+  public static DruidQueryRecordReader getDruidQueryReader(String 
druidQueryType) {
+    switch (druidQueryType) {
+    case Query.TIMESERIES:
+      return new DruidTimeseriesQueryRecordReader();
+    case Query.TOPN:
+      return new DruidTopNQueryRecordReader();
+    case Query.GROUP_BY:
+      return new DruidGroupByQueryRecordReader();
+    case Query.SELECT:
+      return new DruidSelectQueryRecordReader();
+    }
+    return null;
+  }
+
   @Override
   public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf job, int 
numSplits)
           throws IOException {
@@ -192,6 +206,7 @@ public class DruidQueryBasedInputFormat extends 
InputFormat<NullWritable, DruidW
     final String request = String.format(
             "http://%s/druid/v2/datasources/%s/candidates?intervals=%s";,
             address, query.getDataSource().getNames().get(0), 
URLEncoder.encode(intervals, "UTF-8"));
+    LOG.debug("sending request {} to query for segments", request);
     final InputStream response;
     try {
       response = 
DruidStorageHandlerUtils.submitRequest(DruidStorageHandler.getHttpClient(), new 
Request(HttpMethod.GET, new URL(request)));
@@ -221,8 +236,12 @@ public class DruidQueryBasedInputFormat extends 
InputFormat<NullWritable, DruidW
       // Create partial Select query
       final SegmentDescriptor newSD = new SegmentDescriptor(
               locatedSD.getInterval(), locatedSD.getVersion(), 
locatedSD.getPartitionNumber());
-      final SelectQuery partialQuery = query.withQuerySegmentSpec(
-              new MultipleSpecificSegmentSpec(Lists.newArrayList(newSD)));
+      // @TODO This fetches all the rows at once from the broker or multiple historical nodes.
+      // Move to the scan query to avoid GC back pressure on the nodes.
+      // https://issues.apache.org/jira/browse/HIVE-17627
+      final SelectQuery partialQuery = query
+              .withQuerySegmentSpec(new 
MultipleSpecificSegmentSpec(Lists.newArrayList(newSD)))
+              .withPagingSpec(PagingSpec.newSpec(Integer.MAX_VALUE));
       splits[i] = new 
HiveDruidSplit(DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(partialQuery),
               dummyPath, hosts);
     }
@@ -256,7 +275,7 @@ public class DruidQueryBasedInputFormat extends 
InputFormat<NullWritable, DruidW
     InputStream response;
     try {
       response = 
DruidStorageHandlerUtils.submitRequest(DruidStorageHandler.getHttpClient(),
-              DruidStorageHandlerUtils.createRequest(address, metadataQuery)
+              DruidStorageHandlerUtils.createSmileRequest(address, 
metadataQuery)
       );
     } catch (Exception e) {
       throw new 
IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
@@ -309,7 +328,7 @@ public class DruidQueryBasedInputFormat extends 
InputFormat<NullWritable, DruidW
       TimeBoundaryQuery timeQuery = timeBuilder.build();
       try {
         response = 
DruidStorageHandlerUtils.submitRequest(DruidStorageHandler.getHttpClient(),
-                DruidStorageHandlerUtils.createRequest(address, timeQuery)
+                DruidStorageHandlerUtils.createSmileRequest(address, timeQuery)
         );
       } catch (Exception e) {
         throw new 
IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
@@ -414,21 +433,10 @@ public class DruidQueryBasedInputFormat extends 
InputFormat<NullWritable, DruidW
       reader.initialize((HiveDruidSplit) split, job);
       return reader;
     }
-    switch (druidQueryType) {
-      case Query.TIMESERIES:
-        reader = new DruidTimeseriesQueryRecordReader();
-        break;
-      case Query.TOPN:
-        reader = new DruidTopNQueryRecordReader();
-        break;
-      case Query.GROUP_BY:
-        reader = new DruidGroupByQueryRecordReader();
-        break;
-      case Query.SELECT:
-        reader = new DruidSelectQueryRecordReader();
-        break;
-      default:
-        throw new IOException("Druid query type not recognized");
+
+    reader = getDruidQueryReader(druidQueryType);
+    if (reader == null) {
+      throw new IOException("Druid query type " + druidQueryType + " not 
recognized");
     }
     reader.initialize((HiveDruidSplit) split, job);
     return reader;
@@ -444,22 +452,10 @@ public class DruidQueryBasedInputFormat extends 
InputFormat<NullWritable, DruidW
     if (druidQueryType == null) {
       return new DruidSelectQueryRecordReader(); // By default
     }
-    final DruidQueryRecordReader<?, ?> reader;
-    switch (druidQueryType) {
-      case Query.TIMESERIES:
-        reader = new DruidTimeseriesQueryRecordReader();
-        break;
-      case Query.TOPN:
-        reader = new DruidTopNQueryRecordReader();
-        break;
-      case Query.GROUP_BY:
-        reader = new DruidGroupByQueryRecordReader();
-        break;
-      case Query.SELECT:
-        reader = new DruidSelectQueryRecordReader();
-        break;
-      default:
-        throw new IOException("Druid query type not recognized");
+    final DruidQueryRecordReader<?, ?> reader =
+            getDruidQueryReader(druidQueryType);
+    if (reader == null) {
+      throw new IOException("Druid query type " + druidQueryType + " not 
recognized");
     }
     return reader;
   }
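
For illustration only (not part of the patch): the new getDruidQueryReader factory
replaces the two duplicated switch blocks; an unrecognized query type returns null and
the callers turn that into an IOException. A minimal, hypothetical check:

    import io.druid.query.Query;
    import org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat;
    import org.apache.hadoop.hive.druid.serde.DruidGroupByQueryRecordReader;
    import org.apache.hadoop.hive.druid.serde.DruidSelectQueryRecordReader;
    import org.apache.hadoop.hive.druid.serde.DruidTimeseriesQueryRecordReader;
    import org.apache.hadoop.hive.druid.serde.DruidTopNQueryRecordReader;

    public class DruidQueryReaderFactoryCheck {
      public static void main(String[] args) {
        // Each supported Druid query type maps to its dedicated record reader.
        assert DruidQueryBasedInputFormat.getDruidQueryReader(Query.TIMESERIES)
                instanceof DruidTimeseriesQueryRecordReader;
        assert DruidQueryBasedInputFormat.getDruidQueryReader(Query.TOPN)
                instanceof DruidTopNQueryRecordReader;
        assert DruidQueryBasedInputFormat.getDruidQueryReader(Query.GROUP_BY)
                instanceof DruidGroupByQueryRecordReader;
        assert DruidQueryBasedInputFormat.getDruidQueryReader(Query.SELECT)
                instanceof DruidSelectQueryRecordReader;
        // Unknown types (e.g. "scan") yield null; callers wrap that in an IOException.
        assert DruidQueryBasedInputFormat.getDruidQueryReader("scan") == null;
      }
    }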

http://git-wip-us.apache.org/repos/asf/hive/blob/e0484b78/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java
----------------------------------------------------------------------
diff --git 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java
 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java
index b5b254a..359ed36 100644
--- 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java
+++ 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java
@@ -18,113 +18,105 @@
 package org.apache.hadoop.hive.druid.serde;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.util.List;
-
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.metamx.http.client.HttpClient;
+import io.druid.data.input.MapBasedRow;
+import io.druid.query.dimension.DimensionSpec;
+import io.druid.query.dimension.ExtractionDimensionSpec;
+import io.druid.query.extraction.TimeFormatExtractionFn;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.InputSplit;
-import org.joda.time.format.ISODateTimeFormat;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 
 import io.druid.data.input.Row;
-import io.druid.query.aggregation.AggregatorFactory;
-import io.druid.query.aggregation.PostAggregator;
-import io.druid.query.dimension.DimensionSpec;
 import io.druid.query.groupby.GroupByQuery;
+import org.joda.time.format.ISODateTimeFormat;
+
+import static 
org.apache.hadoop.hive.druid.serde.DruidSerDeUtils.ISO_TIME_FORMAT;
 
 /**
  * Record reader for results for Druid GroupByQuery.
  */
 public class DruidGroupByQueryRecordReader
         extends DruidQueryRecordReader<GroupByQuery, Row> {
+  private final static TypeReference<Row> TYPE_REFERENCE = new 
TypeReference<Row>() {
+  };
 
-  private Row current;
-
-  private int[] indexes = new int[0];
+  private MapBasedRow currentRow;
+  private Map<String, Object> currentEvent;
 
-  // Grouping dimensions can have different types if we are grouping using an
-  // extraction function
-  private PrimitiveTypeInfo[] dimensionTypes;
-
-  // Row objects returned by GroupByQuery have different access paths 
depending on
-  // whether the result for the metric is a Float or a Long, thus we keep track
-  // using these converters
-  private Extract[] extractors;
+  private List<String> timeExtractionFields = Lists.newArrayList();
+  private List<String> intFormattedTimeExtractionFields = Lists.newArrayList();
 
   @Override
   public void initialize(InputSplit split, Configuration conf) throws 
IOException {
     super.initialize(split, conf);
     initDimensionTypes();
-    initExtractors();
   }
 
   @Override
-  protected GroupByQuery createQuery(String content) throws IOException {
-    return DruidStorageHandlerUtils.JSON_MAPPER.readValue(content, 
GroupByQuery.class);
+  public void initialize(InputSplit split, Configuration conf, ObjectMapper 
mapper,
+          ObjectMapper smileMapper, HttpClient httpClient
+  ) throws IOException {
+    super.initialize(split, conf, mapper, smileMapper, httpClient);
+    initDimensionTypes();
   }
 
   @Override
-  protected List<Row> createResultsList(InputStream content) throws 
IOException {
-    return DruidStorageHandlerUtils.SMILE_MAPPER.readValue(content,
-            new TypeReference<List<Row>>() {
-            }
-    );
+  protected JavaType getResultTypeDef() {
+    return 
DruidStorageHandlerUtils.JSON_MAPPER.getTypeFactory().constructType(TYPE_REFERENCE);
   }
 
   private void initDimensionTypes() throws IOException {
-    dimensionTypes = new PrimitiveTypeInfo[query.getDimensions().size()];
-    for (int i = 0; i < query.getDimensions().size(); i++) {
-      dimensionTypes[i] = 
DruidSerDeUtils.extractTypeFromDimension(query.getDimensions().get(i));
-    }
-  }
-
-  private void initExtractors() throws IOException {
-    extractors = new Extract[query.getAggregatorSpecs().size() + 
query.getPostAggregatorSpecs()
-            .size()];
-    int counter = 0;
-    for (int i = 0; i < query.getAggregatorSpecs().size(); i++, counter++) {
-      AggregatorFactory af = query.getAggregatorSpecs().get(i);
-      switch (af.getTypeName().toUpperCase()) {
-        case DruidSerDeUtils.FLOAT_TYPE:
-          extractors[counter] = Extract.FLOAT;
-          break;
-        case DruidSerDeUtils.LONG_TYPE:
-          extractors[counter] = Extract.LONG;
-          break;
-        default:
-          throw new IOException("Type not supported");
+    //@TODO move this out of here to 
org.apache.hadoop.hive.druid.serde.DruidSerDe
+    List<DimensionSpec> dimensionSpecList = ((GroupByQuery) 
query).getDimensions();
+    List<DimensionSpec> extractionDimensionSpecList = 
dimensionSpecList.stream()
+            .filter(dimensionSpecs -> dimensionSpecs instanceof 
ExtractionDimensionSpec)
+            .collect(Collectors.toList());
+    extractionDimensionSpecList.stream().forEach(dimensionSpec -> {
+      ExtractionDimensionSpec extractionDimensionSpec = 
(ExtractionDimensionSpec) dimensionSpec;
+      if (extractionDimensionSpec.getExtractionFn() instanceof 
TimeFormatExtractionFn) {
+        final TimeFormatExtractionFn timeFormatExtractionFn = 
(TimeFormatExtractionFn) extractionDimensionSpec
+                .getExtractionFn();
+        if (timeFormatExtractionFn  == null || 
timeFormatExtractionFn.getFormat().equals(ISO_TIME_FORMAT)) {
+          timeExtractionFields.add(extractionDimensionSpec.getOutputName());
+        } else {
+          
intFormattedTimeExtractionFields.add(extractionDimensionSpec.getOutputName());
+        }
       }
-    }
-    for (int i = 0; i < query.getPostAggregatorSpecs().size(); i++, counter++) 
{
-      extractors[counter] = Extract.FLOAT;
-    }
+    });
   }
 
   @Override
   public boolean nextKeyValue() {
-    // Refresh indexes
-    for (int i = indexes.length - 1; i >= 0; i--) {
-      if (indexes[i] > 0) {
-        indexes[i]--;
-        for (int j = i + 1; j < indexes.length; j++) {
-          indexes[j] = current.getDimension(
-                  query.getDimensions().get(j).getOutputName()).size() - 1;
-        }
-        return true;
-      }
-    }
     // Results
-    if (results.hasNext()) {
-      current = results.next();
-      indexes = new int[query.getDimensions().size()];
-      for (int i = 0; i < query.getDimensions().size(); i++) {
-        DimensionSpec ds = query.getDimensions().get(i);
-        indexes[i] = current.getDimension(ds.getOutputName()).size() - 1;
-      }
+
+    if (queryResultsIterator.hasNext()) {
+      final Row row = queryResultsIterator.next();
+      // currently Druid supports only MapBasedRow for Jackson SerDe, so it should be safe to cast without a check
+      currentRow = (MapBasedRow) row;
+      //@TODO move this out of here to 
org.apache.hadoop.hive.druid.serde.DruidSerDe
+      currentEvent = Maps.transformEntries(currentRow.getEvent(),
+              (key, value1) -> {
+                if (timeExtractionFields.contains(key)) {
+                  return 
ISODateTimeFormat.dateTimeParser().parseMillis((String) value1);
+                }
+                if (intFormattedTimeExtractionFields.contains(key)) {
+                  return Integer.valueOf((String) value1);
+                }
+                return value1;
+              }
+      );
       return true;
     }
     return false;
@@ -140,49 +132,9 @@ public class DruidGroupByQueryRecordReader
     // Create new value
     DruidWritable value = new DruidWritable();
     // 1) The timestamp column
-    value.getValue().put(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, 
current.getTimestamp().getMillis());
+    value.getValue().put(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, 
currentRow.getTimestamp().getMillis());
     // 2) The dimension columns
-    for (int i = 0; i < query.getDimensions().size(); i++) {
-      DimensionSpec ds = query.getDimensions().get(i);
-      List<String> dims = current.getDimension(ds.getOutputName());
-      if (dims.size() == 0) {
-        // NULL value for dimension
-        value.getValue().put(ds.getOutputName(), null);
-      } else {
-        int pos = dims.size() - indexes[i] - 1;
-        Object val;
-        switch (dimensionTypes[i].getPrimitiveCategory()) {
-          case TIMESTAMP:
-            // FLOOR extraction function
-            val = ISODateTimeFormat.dateTimeParser().parseMillis((String) 
dims.get(pos));
-            break;
-          case INT:
-            // EXTRACT extraction function
-            val = Integer.valueOf((String) dims.get(pos));
-            break;
-          default:
-            val = dims.get(pos);
-        }
-        value.getValue().put(ds.getOutputName(), val);
-      }
-    }
-    int counter = 0;
-    // 3) The aggregation columns
-    for (AggregatorFactory af : query.getAggregatorSpecs()) {
-      switch (extractors[counter++]) {
-        case FLOAT:
-          value.getValue().put(af.getName(), 
current.getFloatMetric(af.getName()));
-          break;
-        case LONG:
-          value.getValue().put(af.getName(), 
current.getLongMetric(af.getName()));
-          break;
-      }
-    }
-    // 4) The post-aggregation columns
-    for (PostAggregator pa : query.getPostAggregatorSpecs()) {
-      assert extractors[counter++] == Extract.FLOAT;
-      value.getValue().put(pa.getName(), current.getFloatMetric(pa.getName()));
-    }
+    value.getValue().putAll(currentEvent);
     return value;
   }
 
@@ -192,49 +144,9 @@ public class DruidGroupByQueryRecordReader
       // Update value
       value.getValue().clear();
       // 1) The timestamp column
-      value.getValue().put(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, 
current.getTimestamp().getMillis());
+      value.getValue().put(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, 
currentRow.getTimestamp().getMillis());
       // 2) The dimension columns
-      for (int i = 0; i < query.getDimensions().size(); i++) {
-        DimensionSpec ds = query.getDimensions().get(i);
-        List<String> dims = current.getDimension(ds.getOutputName());
-        if (dims.size() == 0) {
-          // NULL value for dimension
-          value.getValue().put(ds.getOutputName(), null);
-        } else {
-          int pos = dims.size() - indexes[i] - 1;
-          Object val;
-          switch (dimensionTypes[i].getPrimitiveCategory()) {
-            case TIMESTAMP:
-              // FLOOR extraction function
-              val = ISODateTimeFormat.dateTimeParser().parseMillis((String) 
dims.get(pos));
-              break;
-            case INT:
-              // EXTRACT extraction function
-              val = Integer.valueOf((String) dims.get(pos));
-              break;
-            default:
-              val = dims.get(pos);
-          }
-          value.getValue().put(ds.getOutputName(), val);
-        }
-      }
-      int counter = 0;
-      // 3) The aggregation columns
-      for (AggregatorFactory af : query.getAggregatorSpecs()) {
-        switch (extractors[counter++]) {
-          case FLOAT:
-            value.getValue().put(af.getName(), 
current.getFloatMetric(af.getName()));
-            break;
-          case LONG:
-            value.getValue().put(af.getName(), 
current.getLongMetric(af.getName()));
-            break;
-        }
-      }
-      // 4) The post-aggregation columns
-      for (PostAggregator pa : query.getPostAggregatorSpecs()) {
-        assert extractors[counter++] == Extract.FLOAT;
-        value.getValue().put(pa.getName(), 
current.getFloatMetric(pa.getName()));
-      }
+      value.getValue().putAll(currentEvent);
       return true;
     }
     return false;
@@ -242,12 +154,7 @@ public class DruidGroupByQueryRecordReader
 
   @Override
   public float getProgress() throws IOException {
-    return results.hasNext() ? 0 : 1;
-  }
-
-  private enum Extract {
-    FLOAT,
-    LONG
+    return queryResultsIterator.hasNext() ? 0 : 1;
   }
 
 }
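
For illustration only (not part of the patch): the GroupBy reader now post-processes
the event map of each MapBasedRow, converting values produced by a
TimeFormatExtractionFn either to epoch millis (ISO format) or to an int. Reduced to a
standalone sketch with made-up values:

    import org.joda.time.format.ISODateTimeFormat;

    public class TimeExtractionValueSketch {
      public static void main(String[] args) {
        // Hypothetical dimension values as Druid returns them (always strings).
        String isoValue = "2017-09-28T14:54:07.000Z";  // e.g. output of a FLOOR(__time TO DAY) extraction
        String extractValue = "9";                     // e.g. output of an EXTRACT(MONTH ...) extraction
        long millis = ISODateTimeFormat.dateTimeParser().parseMillis(isoValue);
        int month = Integer.valueOf(extractValue);
        System.out.println(millis + " / " + month);
      }
    }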

http://git-wip-us.apache.org/repos/asf/hive/blob/e0484b78/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
----------------------------------------------------------------------
diff --git 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
index 103591d..de06533 100644
--- 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
+++ 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
@@ -17,14 +17,23 @@
  */
 package org.apache.hadoop.hive.druid.serde;
 
-import com.google.common.collect.Iterators;
-import com.metamx.common.lifecycle.Lifecycle;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.core.ObjectCodec;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
 import com.metamx.http.client.HttpClient;
-import com.metamx.http.client.HttpClientConfig;
-import com.metamx.http.client.HttpClientInit;
+import com.metamx.http.client.Request;
+import com.metamx.http.client.response.InputStreamResponseHandler;
+import io.druid.java.util.common.IAE;
+import io.druid.java.util.common.RE;
+import io.druid.java.util.common.guava.CloseQuietly;
 import io.druid.query.BaseQuery;
+import io.druid.query.Query;
+import io.druid.query.QueryInterruptedException;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.druid.DruidStorageHandler;
 import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
 import org.apache.hadoop.hive.druid.io.HiveDruidSplit;
@@ -32,14 +41,16 @@ import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.joda.time.Period;
+import org.apache.parquet.Strings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Iterator;
-import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
 /**
  * Base record reader for given a Druid query. This class contains the logic to
@@ -56,57 +67,65 @@ public abstract class DruidQueryRecordReader<T extends 
BaseQuery<R>, R extends C
 
   private static final Logger LOG = 
LoggerFactory.getLogger(DruidQueryRecordReader.class);
 
+  private HttpClient httpClient;
+  private ObjectMapper mapper;
+  // Smile mapper is used to read query results that are serialized as binary 
instead of json
+  private ObjectMapper smileMapper;
+
   /**
    * Query that Druid executes.
    */
-  protected T query;
+  protected Query query;
 
   /**
-   * Query results.
+   * Query results as a streaming iterator.
    */
-  protected Iterator<R> results = Iterators.emptyIterator();
+  protected JsonParserIterator<R> queryResultsIterator =  null;
+
+  /**
+   * Result type definition used to read the rows; this is query-dependent.
+   */
+  protected JavaType resultsType = null;
 
   @Override
   public void initialize(InputSplit split, TaskAttemptContext context) throws 
IOException {
     initialize(split, context.getConfiguration());
   }
 
-  public void initialize(InputSplit split, Configuration conf) throws 
IOException {
+  public void initialize(InputSplit split, Configuration conf, ObjectMapper 
mapper,
+          ObjectMapper smileMapper, HttpClient httpClient
+  ) throws IOException {
     HiveDruidSplit hiveDruidSplit = (HiveDruidSplit) split;
-
+    Preconditions.checkNotNull(hiveDruidSplit, "input split is null");
+    this.mapper = Preconditions.checkNotNull(mapper, "object mapper cannot be null");
+    // Smile mapper is used to read query results that are serialized as binary instead of JSON
+    this.smileMapper = Preconditions.checkNotNull(smileMapper, "Smile mapper cannot be null");
     // Create query
-    query = createQuery(hiveDruidSplit.getDruidQuery());
-
+    this.query = 
this.mapper.readValue(Preconditions.checkNotNull(hiveDruidSplit.getDruidQuery()),
 Query.class);
+    Preconditions.checkNotNull(query);
+    this.resultsType = getResultTypeDef();
+    this.httpClient = Preconditions.checkNotNull(httpClient, "HTTP client cannot be null");
     // Execute query
-    if (LOG.isInfoEnabled()) {
-      LOG.info("Retrieving from druid using query:\n " + query);
-    }
-
-    InputStream response;
-    try {
-      response = 
DruidStorageHandlerUtils.submitRequest(DruidStorageHandler.getHttpClient(),
-              
DruidStorageHandlerUtils.createRequest(hiveDruidSplit.getLocations()[0], 
query));
-    } catch (Exception e) {
-      throw new 
IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+    LOG.debug("Retrieving data from druid using query:\n " + query);
+    final String address = hiveDruidSplit.getLocations()[0];
+    if (Strings.isNullOrEmpty(address)) {
+      throw new IOException("cannot fetch results from an empty or null host value");
     }
-
-    // Retrieve results
-    List<R> resultsList;
-    try {
-      resultsList = createResultsList(response);
-    } catch (IOException e) {
-      response.close();
-      throw e;
-    }
-    if (resultsList == null || resultsList.isEmpty()) {
-      return;
-    }
-    results = resultsList.iterator();
+    Request request = DruidStorageHandlerUtils.createSmileRequest(address, 
query);
+    Future<InputStream> inputStreamFuture = this.httpClient
+            .go(request, new InputStreamResponseHandler());
+    queryResultsIterator = new JsonParserIterator(this.smileMapper, 
resultsType, inputStreamFuture,
+            request.getUrl().toString(), query
+    );
   }
 
-  protected abstract T createQuery(String content) throws IOException;
+  public void initialize(InputSplit split, Configuration conf) throws 
IOException {
+    initialize(split, conf, DruidStorageHandlerUtils.JSON_MAPPER,
+            DruidStorageHandlerUtils.SMILE_MAPPER, 
DruidStorageHandler.getHttpClient()
+    );
+  }
 
-  protected abstract List<R> createResultsList(InputStream content) throws 
IOException;
+  protected abstract JavaType getResultTypeDef();
 
   @Override
   public NullWritable createKey() {
@@ -141,7 +160,123 @@ public abstract class DruidQueryRecordReader<T extends 
BaseQuery<R>, R extends C
 
   @Override
   public void close() {
-    // Nothing to do
+    CloseQuietly.close(queryResultsIterator);
+  }
+
+  /**
+   * Helper wrapper class that exposes an iterator of Druid rows over an InputStream.
+   * The row type is defined by
+   * org.apache.hadoop.hive.druid.serde.DruidQueryRecordReader.JsonParserIterator#typeRef.
+   *
+   * @param <R> Druid row type returned as result
+   */
+  protected class JsonParserIterator<R extends Comparable<R>> implements 
Iterator<R>, Closeable
+  {
+    private JsonParser jp;
+    private ObjectCodec objectCodec;
+    private final ObjectMapper mapper;
+    private final JavaType typeRef;
+    private final Future<InputStream> future;
+    private final Query query;
+    private final String url;
+
+    /**
+     * @param mapper mapper used to deserialize the stream of data (we use the smile factory)
+     * @param typeRef type definition of the result objects
+     * @param future Future holding the input stream; the stream is not owned here, but it is closed
+     *               when org.apache.hadoop.hive.druid.serde.DruidQueryRecordReader.JsonParserIterator#close()
+     *               is called or when the end of the stream is reached
+     * @param url URL used to fetch the data; used mostly in exception messages to identify the faulty stream, thus it can be an empty string
+     * @param query query used to fetch the data; used mostly in exception messages, thus it can be an empty string
+     */
+    public JsonParserIterator(ObjectMapper mapper,
+            JavaType typeRef,
+            Future<InputStream> future,
+            String url,
+            Query query
+    )
+    {
+      this.typeRef = typeRef;
+      this.future = future;
+      this.url = url;
+      this.query = query;
+      this.mapper = mapper;
+      jp = null;
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+      init();
+
+      if (jp.isClosed()) {
+        return false;
+      }
+      if (jp.getCurrentToken() == JsonToken.END_ARRAY) {
+        CloseQuietly.close(jp);
+        return false;
+      }
+
+      return true;
+    }
+
+    @Override
+    public R next()
+    {
+      init();
+
+      try {
+        final R retVal = objectCodec.readValue(jp, typeRef);
+        jp.nextToken();
+        return retVal;
+      }
+      catch (IOException e) {
+        throw Throwables.propagate(e);
+      }
+    }
+
+    @Override
+    public void remove()
+    {
+      throw new UnsupportedOperationException();
+    }
+
+    private void init()
+    {
+      if (jp == null) {
+        try {
+          InputStream is = future.get();
+          if (is == null) {
+            throw new IOException(String.format("query[%s] url[%s] timed out", query, url));
+          } else {
+            jp = 
mapper.getFactory().createParser(is).configure(JsonParser.Feature.AUTO_CLOSE_SOURCE,
 true);
+          }
+          final JsonToken nextToken = jp.nextToken();
+          if (nextToken == JsonToken.START_OBJECT) {
+            QueryInterruptedException cause = jp.getCodec().readValue(jp, 
QueryInterruptedException.class);
+            throw new QueryInterruptedException(cause);
+          } else if (nextToken != JsonToken.START_ARRAY) {
+            throw new IAE("Next token wasn't a START_ARRAY, was[%s] from url 
[%s]", jp.getCurrentToken(), url);
+          } else {
+            jp.nextToken();
+            objectCodec = jp.getCodec();
+          }
+        }
+        catch (IOException | InterruptedException | ExecutionException e) {
+          throw new RE(
+                  e,
+                  "Failure getting results for query[%s] url[%s] because of 
[%s]",
+                  query,
+                  url,
+                  e.getMessage()
+          );
+        }
+      }
+    }
+
+    @Override
+    public void close() throws IOException
+    {
+      CloseQuietly.close(jp);
+    }
   }
 
+
 }
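
For illustration only (not part of the patch): JsonParserIterator streams the query
response row by row instead of materializing a List, which is what the removed
createResultsList did. The underlying Jackson pattern, reduced to a self-contained
sketch over plain JSON (the reader itself uses the SMILE mapper and a response Future),
looks roughly like this:

    import com.fasterxml.jackson.core.JsonParser;
    import com.fasterxml.jackson.core.JsonToken;
    import com.fasterxml.jackson.databind.ObjectMapper;

    import java.io.ByteArrayInputStream;
    import java.io.InputStream;
    import java.nio.charset.StandardCharsets;
    import java.util.Map;

    public class StreamingArrayReadSketch {
      public static void main(String[] args) throws Exception {
        // Stand-in for the HTTP response stream returned by the broker.
        InputStream is = new ByteArrayInputStream(
                "[{\"x\":1},{\"x\":2}]".getBytes(StandardCharsets.UTF_8));
        ObjectMapper mapper = new ObjectMapper();
        JsonParser jp = mapper.getFactory().createParser(is);
        if (jp.nextToken() != JsonToken.START_ARRAY) {
          throw new IllegalStateException("expected a JSON array of rows");
        }
        while (jp.nextToken() != JsonToken.END_ARRAY) {  // read one row at a time
          Map<?, ?> row = mapper.readValue(jp, Map.class);
          System.out.println(row);
        }
        jp.close();
      }
    }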

http://git-wip-us.apache.org/repos/asf/hive/blob/e0484b78/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java
----------------------------------------------------------------------
diff --git 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java
 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java
index 82eec5d..c0744b5 100644
--- 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java
+++ 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java
@@ -22,6 +22,8 @@ import java.io.InputStream;
 import java.util.Iterator;
 import java.util.List;
 
+import com.fasterxml.jackson.databind.JavaType;
+import io.druid.query.select.SelectQueryQueryToolChest;
 import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
 import org.apache.hadoop.io.NullWritable;
 
@@ -39,22 +41,18 @@ import io.druid.query.select.SelectResultValue;
 public class DruidSelectQueryRecordReader
         extends DruidQueryRecordReader<SelectQuery, Result<SelectResultValue>> 
{
 
+  private static final TypeReference<Result<SelectResultValue>> TYPE_REFERENCE 
=
+          new TypeReference<Result<SelectResultValue>>()
+          {
+          };
+
   private Result<SelectResultValue> current;
 
   private Iterator<EventHolder> values = Iterators.emptyIterator();
 
   @Override
-  protected SelectQuery createQuery(String content) throws IOException {
-    return DruidStorageHandlerUtils.JSON_MAPPER.readValue(content, 
SelectQuery.class);
-  }
-
-  @Override
-  protected List<Result<SelectResultValue>> createResultsList(InputStream 
content)
-          throws IOException {
-    return DruidStorageHandlerUtils.SMILE_MAPPER.readValue(content,
-            new TypeReference<List<Result<SelectResultValue>>>() {
-            }
-    );
+  protected JavaType getResultTypeDef() {
+    return 
DruidStorageHandlerUtils.JSON_MAPPER.getTypeFactory().constructType(TYPE_REFERENCE);
   }
 
   @Override
@@ -62,8 +60,8 @@ public class DruidSelectQueryRecordReader
     if (values.hasNext()) {
       return true;
     }
-    if (results.hasNext()) {
-      current = results.next();
+    if (queryResultsIterator.hasNext()) {
+      current = queryResultsIterator.next();
       values = current.getValue().getEvents().iterator();
       return nextKeyValue();
     }
@@ -100,7 +98,7 @@ public class DruidSelectQueryRecordReader
 
   @Override
   public float getProgress() {
-    return results.hasNext() || values.hasNext() ? 0 : 1;
+    return queryResultsIterator.hasNext() || values.hasNext() ? 0 : 1;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/e0484b78/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
----------------------------------------------------------------------
diff --git 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
index 8750285..e6e01d1 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
@@ -132,21 +132,11 @@ public class DruidSerDe extends AbstractSerDe {
                   properties.getProperty(serdeConstants.LIST_COLUMNS));
         }
         
columnTypes.addAll(Lists.transform(Utilities.getColumnTypes(properties),
-                new Function<String, PrimitiveTypeInfo>() {
-                  @Override
-                  public PrimitiveTypeInfo apply(String type) {
-                    return TypeInfoFactory.getPrimitiveTypeInfo(type);
-                  }
-                }
+                type -> TypeInfoFactory.getPrimitiveTypeInfo(type)
         ));
         inspectors.addAll(Lists.transform(columnTypes,
-                new Function<PrimitiveTypeInfo, ObjectInspector>() {
-                  @Override
-                  public ObjectInspector apply(PrimitiveTypeInfo type) {
-                    return PrimitiveObjectInspectorFactory
-                            .getPrimitiveWritableObjectInspector(type);
-                  }
-                }
+                (Function<PrimitiveTypeInfo, ObjectInspector>) type -> 
PrimitiveObjectInspectorFactory
+                        .getPrimitiveWritableObjectInspector(type)
         ));
         columns = columnNames.toArray(new String[columnNames.size()]);
         types = columnTypes.toArray(new PrimitiveTypeInfo[columnTypes.size()]);
@@ -273,7 +263,7 @@ public class DruidSerDe extends AbstractSerDe {
     InputStream response;
     try {
       response = 
DruidStorageHandlerUtils.submitRequest(DruidStorageHandler.getHttpClient(),
-              DruidStorageHandlerUtils.createRequest(address, query)
+              DruidStorageHandlerUtils.createSmileRequest(address, query)
       );
     } catch (Exception e) {
       throw new SerDeException(StringUtils.stringifyException(e));

http://git-wip-us.apache.org/repos/asf/hive/blob/e0484b78/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java
----------------------------------------------------------------------
diff --git 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java
 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java
index c8a63ab..86c325b 100644
--- 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java
+++ 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hive.druid.serde;
 
-import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.slf4j.Logger;
@@ -37,6 +36,7 @@ public final class DruidSerDeUtils {
   protected static final String ISO_TIME_FORMAT = 
"yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
 
   protected static final String FLOAT_TYPE = "FLOAT";
+  protected static final String DOUBLE_TYPE = "DOUBLE";
   protected static final String LONG_TYPE = "LONG";
   protected static final String STRING_TYPE = "STRING";
 
@@ -47,6 +47,8 @@ public final class DruidSerDeUtils {
     switch (typeName) {
       case FLOAT_TYPE:
         return TypeInfoFactory.floatTypeInfo;
+      case DOUBLE_TYPE:
+        return TypeInfoFactory.doubleTypeInfo;
       case LONG_TYPE:
         return TypeInfoFactory.longTypeInfo;
       case STRING_TYPE:
@@ -63,29 +65,6 @@ public final class DruidSerDeUtils {
     }
   }
 
-  /* This method converts from the String representation of Druid type
-   * to the String representation of the corresponding Hive type */
-  public static String convertDruidToHiveTypeString(String typeName) {
-    typeName = typeName.toUpperCase();
-    switch (typeName) {
-      case FLOAT_TYPE:
-        return serdeConstants.FLOAT_TYPE_NAME;
-      case LONG_TYPE:
-        return serdeConstants.BIGINT_TYPE_NAME;
-      case STRING_TYPE:
-        return serdeConstants.STRING_TYPE_NAME;
-      default:
-        // This is a guard for special Druid types e.g. hyperUnique
-        // 
(http://druid.io/docs/0.9.1.1/querying/aggregations.html#hyperunique-aggregator).
-        // Currently, we do not support doing anything special with them in 
Hive.
-        // However, those columns are there, and they can be actually read as 
normal
-        // dimensions e.g. with a select query. Thus, we print the warning and 
just read them
-        // as String.
-        LOG.warn("Transformation to STRING for unknown type " + typeName);
-        return serdeConstants.STRING_TYPE_NAME;
-    }
-  }
-
   /* Extract type from dimension spec. It returns TIMESTAMP if it is a FLOOR,
    * INTEGER if it is a EXTRACT, or STRING otherwise. */
   public static PrimitiveTypeInfo extractTypeFromDimension(DimensionSpec ds) {
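
For illustration only (not part of the patch): with the new DOUBLE case, a Druid metric
reported as DOUBLE now maps to Hive's double type. A made-up standalone helper along the
lines of the switch above; the method name and the default branch here are assumptions.

    import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

    public class DruidTypeMappingSketch {
      // Made-up helper; the real mapping lives in DruidSerDeUtils.
      static PrimitiveTypeInfo toHiveType(String druidTypeName) {
        switch (druidTypeName.toUpperCase()) {
          case "FLOAT":
            return TypeInfoFactory.floatTypeInfo;
          case "DOUBLE":                      // the case added by this patch
            return TypeInfoFactory.doubleTypeInfo;
          case "LONG":
            return TypeInfoFactory.longTypeInfo;
          default:                            // assumption: STRING and special types fall back to string
            return TypeInfoFactory.stringTypeInfo;
        }
      }

      public static void main(String[] args) {
        System.out.println(toHiveType("double"));  // prints "double"
      }
    }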

http://git-wip-us.apache.org/repos/asf/hive/blob/e0484b78/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java
----------------------------------------------------------------------
diff --git 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java
 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java
index a1c8488..971af82 100644
--- 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java
+++ 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.List;
 
+import com.fasterxml.jackson.databind.JavaType;
 import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
 import org.apache.hadoop.io.NullWritable;
 
@@ -36,26 +37,19 @@ import io.druid.query.timeseries.TimeseriesResultValue;
 public class DruidTimeseriesQueryRecordReader
         extends DruidQueryRecordReader<TimeseriesQuery, 
Result<TimeseriesResultValue>> {
 
+  private static final TypeReference<Result<TimeseriesResultValue>> TYPE_REFERENCE =
+          new TypeReference<Result<TimeseriesResultValue>>() {
+          };
   private Result<TimeseriesResultValue> current;
 
   @Override
-  protected TimeseriesQuery createQuery(String content) throws IOException {
-    return DruidStorageHandlerUtils.JSON_MAPPER.readValue(content, 
TimeseriesQuery.class);
-  }
-
-  @Override
-  protected List<Result<TimeseriesResultValue>> createResultsList(InputStream 
content)
-          throws IOException {
-    return DruidStorageHandlerUtils.SMILE_MAPPER.readValue(content,
-            new TypeReference<List<Result<TimeseriesResultValue>>>() {
-            }
-    );
+  protected JavaType getResultTypeDef() {
+    return 
DruidStorageHandlerUtils.JSON_MAPPER.getTypeFactory().constructType(TYPE_REFERENCE);
   }
 
   @Override
   public boolean nextKeyValue() {
-    if (results.hasNext()) {
-      current = results.next();
+    if (queryResultsIterator.hasNext()) {
+      current = queryResultsIterator.next();
       return true;
     }
     return false;
@@ -89,7 +83,7 @@ public class DruidTimeseriesQueryRecordReader
 
   @Override
   public float getProgress() throws IOException {
-    return results.hasNext() ? 0 : 1;
+    return queryResultsIterator.hasNext() ? 0 : 1;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/e0484b78/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java
----------------------------------------------------------------------
diff --git 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java
 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java
index afdf670..6e1fffe 100644
--- 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java
+++ 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,6 +22,7 @@ import java.io.InputStream;
 import java.util.Iterator;
 import java.util.List;
 
+import com.fasterxml.jackson.databind.JavaType;
 import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
 import org.apache.hadoop.io.NullWritable;
 
@@ -39,22 +40,17 @@ import io.druid.query.topn.TopNResultValue;
 public class DruidTopNQueryRecordReader
         extends DruidQueryRecordReader<TopNQuery, Result<TopNResultValue>> {
 
+  private static final TypeReference<Result<TopNResultValue>> TYPE_REFERENCE =
+          new TypeReference<Result<TopNResultValue>>() {
+          };
+
   private Result<TopNResultValue> current;
 
   private Iterator<DimensionAndMetricValueExtractor> values = 
Iterators.emptyIterator();
 
   @Override
-  protected TopNQuery createQuery(String content) throws IOException {
-    return DruidStorageHandlerUtils.JSON_MAPPER.readValue(content, 
TopNQuery.class);
-  }
-
-  @Override
-  protected List<Result<TopNResultValue>> createResultsList(InputStream 
content)
-          throws IOException {
-    return DruidStorageHandlerUtils.SMILE_MAPPER.readValue(content,
-            new TypeReference<List<Result<TopNResultValue>>>() {
-            }
-    );
+  protected JavaType getResultTypeDef() {
+    return 
DruidStorageHandlerUtils.JSON_MAPPER.getTypeFactory().constructType(TYPE_REFERENCE);
   }
 
   @Override
@@ -62,8 +58,8 @@ public class DruidTopNQueryRecordReader
     if (values.hasNext()) {
       return true;
     }
-    if (results.hasNext()) {
-      current = results.next();
+    if (queryResultsIterator.hasNext()) {
+      current = queryResultsIterator.next();
       values = current.getValue().getValue().iterator();
       return nextKeyValue();
     }
@@ -79,7 +75,9 @@ public class DruidTopNQueryRecordReader
   public DruidWritable getCurrentValue() throws IOException, 
InterruptedException {
     // Create new value
     DruidWritable value = new DruidWritable();
-    value.getValue().put(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, 
current.getTimestamp().getMillis());
+    value.getValue().put(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN,
+            current.getTimestamp().getMillis()
+    );
     if (values.hasNext()) {
       value.getValue().putAll(values.next().getBaseObject());
       return value;
@@ -92,7 +90,9 @@ public class DruidTopNQueryRecordReader
     if (nextKeyValue()) {
       // Update value
       value.getValue().clear();
-      value.getValue().put(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, 
current.getTimestamp().getMillis());
+      value.getValue().put(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN,
+              current.getTimestamp().getMillis()
+      );
       if (values.hasNext()) {
         value.getValue().putAll(values.next().getBaseObject());
       }
@@ -103,7 +103,7 @@ public class DruidTopNQueryRecordReader
 
   @Override
   public float getProgress() {
-    return results.hasNext() || values.hasNext() ? 0 : 1;
+    return queryResultsIterator.hasNext() || values.hasNext() ? 0 : 1;
   }
 
 }
