HIVE-17623 : Fix Select query, fix Double column serde, and some refactoring (Slim Bouguerra via Ashutosh Chauhan)
Signed-off-by: Ashutosh Chauhan <hashut...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e0484b78 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e0484b78 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e0484b78 Branch: refs/heads/hive-14535 Commit: e0484b789cde46a62349493cfee697eaa258d6ad Parents: 4142c98 Author: Slim Bouguerra <slim.bougue...@gmail.com> Authored: Thu Sep 28 14:54:07 2017 -0700 Committer: Ashutosh Chauhan <hashut...@apache.org> Committed: Thu Sep 28 14:54:07 2017 -0700 ---------------------------------------------------------------------- .../hive/druid/DruidStorageHandlerUtils.java | 17 +- .../druid/io/DruidQueryBasedInputFormat.java | 66 +- .../serde/DruidGroupByQueryRecordReader.java | 225 ++--- .../druid/serde/DruidQueryRecordReader.java | 215 +++- .../serde/DruidSelectQueryRecordReader.java | 26 +- .../hadoop/hive/druid/serde/DruidSerDe.java | 18 +- .../hive/druid/serde/DruidSerDeUtils.java | 27 +- .../serde/DruidTimeseriesQueryRecordReader.java | 22 +- .../druid/serde/DruidTopNQueryRecordReader.java | 38 +- .../hadoop/hive/druid/TestDruidSerDe.java | 918 ----------------- .../hadoop/hive/druid/serde/TestDruidSerDe.java | 991 +++++++++++++++++++ 11 files changed, 1324 insertions(+), 1239 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/e0484b78/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java ---------------------------------------------------------------------- diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java index 4852ff1..35ea94f 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java @@ -17,6 +17,18 @@ */ package org.apache.hadoop.hive.druid; +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.dataformat.smile.SmileFactory; +import com.google.common.base.Throwables; +import com.google.common.collect.Interner; +import com.google.common.collect.Interners; +import com.metamx.emitter.EmittingLogger; +import com.metamx.emitter.core.NoopEmitter; +import com.metamx.emitter.service.ServiceEmitter; +import com.metamx.http.client.HttpClient; +import com.metamx.http.client.response.InputStreamResponseHandler; import io.druid.common.utils.JodaUtils; import io.druid.jackson.DefaultObjectMapper; import io.druid.math.expr.ExprMacroTable; @@ -109,6 +121,7 @@ import java.util.Map; import java.util.Set; import java.util.TimeZone; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -191,7 +204,7 @@ public final class DruidStorageHandlerUtils { public static final Interner<DataSegment> DATA_SEGMENT_INTERNER = Interners.newWeakInterner(); /** - * Method that creates a request for Druid JSON query (using SMILE). + * Method that creates a request for Druid query using SMILE format. 
* * @param address * @param query @@ -200,7 +213,7 @@ public final class DruidStorageHandlerUtils { * * @throws IOException */ - public static Request createRequest(String address, BaseQuery<?> query) + public static Request createSmileRequest(String address, io.druid.query.Query query) throws IOException { return new Request(HttpMethod.POST, new URL(String.format("%s/druid/v2/", "http://" + address))) .setContent(SMILE_MAPPER.writeValueAsBytes(query)) http://git-wip-us.apache.org/repos/asf/hive/blob/e0484b78/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java ---------------------------------------------------------------------- diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java index 3711595..209d60d 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java @@ -93,6 +93,20 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW protected static final Logger LOG = LoggerFactory.getLogger(DruidQueryBasedInputFormat.class); + public static DruidQueryRecordReader getDruidQueryReader(String druidQueryType) { + switch (druidQueryType) { + case Query.TIMESERIES: + return new DruidTimeseriesQueryRecordReader(); + case Query.TOPN: + return new DruidTopNQueryRecordReader(); + case Query.GROUP_BY: + return new DruidGroupByQueryRecordReader(); + case Query.SELECT: + return new DruidSelectQueryRecordReader(); + } + return null; + } + @Override public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { @@ -192,6 +206,7 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW final String request = String.format( "http://%s/druid/v2/datasources/%s/candidates?intervals=%s", address, query.getDataSource().getNames().get(0), URLEncoder.encode(intervals, "UTF-8")); + LOG.debug("sending request {} to query for segments", request); final InputStream response; try { response = DruidStorageHandlerUtils.submitRequest(DruidStorageHandler.getHttpClient(), new Request(HttpMethod.GET, new URL(request))); @@ -221,8 +236,12 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW // Create partial Select query final SegmentDescriptor newSD = new SegmentDescriptor( locatedSD.getInterval(), locatedSD.getVersion(), locatedSD.getPartitionNumber()); - final SelectQuery partialQuery = query.withQuerySegmentSpec( - new MultipleSpecificSegmentSpec(Lists.newArrayList(newSD))); + //@TODO This is fetching all the rows at once from broker or multiple historical nodes + // Move to use scan query to avoid GC back pressure on the nodes + // https://issues.apache.org/jira/browse/HIVE-17627 + final SelectQuery partialQuery = query + .withQuerySegmentSpec(new MultipleSpecificSegmentSpec(Lists.newArrayList(newSD))) + .withPagingSpec(PagingSpec.newSpec(Integer.MAX_VALUE)); splits[i] = new HiveDruidSplit(DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(partialQuery), dummyPath, hosts); } @@ -256,7 +275,7 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW InputStream response; try { response = DruidStorageHandlerUtils.submitRequest(DruidStorageHandler.getHttpClient(), - DruidStorageHandlerUtils.createRequest(address, metadataQuery) + 
DruidStorageHandlerUtils.createSmileRequest(address, metadataQuery) ); } catch (Exception e) { throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e)); @@ -309,7 +328,7 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW TimeBoundaryQuery timeQuery = timeBuilder.build(); try { response = DruidStorageHandlerUtils.submitRequest(DruidStorageHandler.getHttpClient(), - DruidStorageHandlerUtils.createRequest(address, timeQuery) + DruidStorageHandlerUtils.createSmileRequest(address, timeQuery) ); } catch (Exception e) { throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e)); @@ -414,21 +433,10 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW reader.initialize((HiveDruidSplit) split, job); return reader; } - switch (druidQueryType) { - case Query.TIMESERIES: - reader = new DruidTimeseriesQueryRecordReader(); - break; - case Query.TOPN: - reader = new DruidTopNQueryRecordReader(); - break; - case Query.GROUP_BY: - reader = new DruidGroupByQueryRecordReader(); - break; - case Query.SELECT: - reader = new DruidSelectQueryRecordReader(); - break; - default: - throw new IOException("Druid query type not recognized"); + + reader = getDruidQueryReader(druidQueryType); + if (reader == null) { + throw new IOException("Druid query type " + druidQueryType + " not recognized"); } reader.initialize((HiveDruidSplit) split, job); return reader; @@ -444,22 +452,10 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW if (druidQueryType == null) { return new DruidSelectQueryRecordReader(); // By default } - final DruidQueryRecordReader<?, ?> reader; - switch (druidQueryType) { - case Query.TIMESERIES: - reader = new DruidTimeseriesQueryRecordReader(); - break; - case Query.TOPN: - reader = new DruidTopNQueryRecordReader(); - break; - case Query.GROUP_BY: - reader = new DruidGroupByQueryRecordReader(); - break; - case Query.SELECT: - reader = new DruidSelectQueryRecordReader(); - break; - default: - throw new IOException("Druid query type not recognized"); + final DruidQueryRecordReader<?, ?> reader = + getDruidQueryReader(druidQueryType); + if (reader == null) { + throw new IOException("Druid query type " + druidQueryType + " not recognized"); } return reader; } http://git-wip-us.apache.org/repos/asf/hive/blob/e0484b78/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java ---------------------------------------------------------------------- diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java index b5b254a..359ed36 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java @@ -18,113 +18,105 @@ package org.apache.hadoop.hive.druid.serde; import java.io.IOException; -import java.io.InputStream; import java.util.List; - +import java.util.Map; +import java.util.stream.Collectors; + +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.metamx.http.client.HttpClient; +import io.druid.data.input.MapBasedRow; +import io.druid.query.dimension.DimensionSpec; +import 
io.druid.query.dimension.ExtractionDimensionSpec; +import io.druid.query.extraction.TimeFormatExtractionFn; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils; -import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.InputSplit; -import org.joda.time.format.ISODateTimeFormat; import com.fasterxml.jackson.core.type.TypeReference; import io.druid.data.input.Row; -import io.druid.query.aggregation.AggregatorFactory; -import io.druid.query.aggregation.PostAggregator; -import io.druid.query.dimension.DimensionSpec; import io.druid.query.groupby.GroupByQuery; +import org.joda.time.format.ISODateTimeFormat; + +import static org.apache.hadoop.hive.druid.serde.DruidSerDeUtils.ISO_TIME_FORMAT; /** * Record reader for results for Druid GroupByQuery. */ public class DruidGroupByQueryRecordReader extends DruidQueryRecordReader<GroupByQuery, Row> { + private final static TypeReference<Row> TYPE_REFERENCE = new TypeReference<Row>() { + }; - private Row current; - - private int[] indexes = new int[0]; + private MapBasedRow currentRow; + private Map<String, Object> currentEvent; - // Grouping dimensions can have different types if we are grouping using an - // extraction function - private PrimitiveTypeInfo[] dimensionTypes; - - // Row objects returned by GroupByQuery have different access paths depending on - // whether the result for the metric is a Float or a Long, thus we keep track - // using these converters - private Extract[] extractors; + private List<String> timeExtractionFields = Lists.newArrayList(); + private List<String> intFormattedTimeExtractionFields = Lists.newArrayList(); @Override public void initialize(InputSplit split, Configuration conf) throws IOException { super.initialize(split, conf); initDimensionTypes(); - initExtractors(); } @Override - protected GroupByQuery createQuery(String content) throws IOException { - return DruidStorageHandlerUtils.JSON_MAPPER.readValue(content, GroupByQuery.class); + public void initialize(InputSplit split, Configuration conf, ObjectMapper mapper, + ObjectMapper smileMapper, HttpClient httpClient + ) throws IOException { + super.initialize(split, conf, mapper, smileMapper, httpClient); + initDimensionTypes(); } @Override - protected List<Row> createResultsList(InputStream content) throws IOException { - return DruidStorageHandlerUtils.SMILE_MAPPER.readValue(content, - new TypeReference<List<Row>>() { - } - ); + protected JavaType getResultTypeDef() { + return DruidStorageHandlerUtils.JSON_MAPPER.getTypeFactory().constructType(TYPE_REFERENCE); } private void initDimensionTypes() throws IOException { - dimensionTypes = new PrimitiveTypeInfo[query.getDimensions().size()]; - for (int i = 0; i < query.getDimensions().size(); i++) { - dimensionTypes[i] = DruidSerDeUtils.extractTypeFromDimension(query.getDimensions().get(i)); - } - } - - private void initExtractors() throws IOException { - extractors = new Extract[query.getAggregatorSpecs().size() + query.getPostAggregatorSpecs() - .size()]; - int counter = 0; - for (int i = 0; i < query.getAggregatorSpecs().size(); i++, counter++) { - AggregatorFactory af = query.getAggregatorSpecs().get(i); - switch (af.getTypeName().toUpperCase()) { - case DruidSerDeUtils.FLOAT_TYPE: - extractors[counter] = Extract.FLOAT; - break; - case DruidSerDeUtils.LONG_TYPE: - extractors[counter] = Extract.LONG; - break; - default: - throw new IOException("Type not supported"); + //@TODO move 
this out of here to org.apache.hadoop.hive.druid.serde.DruidSerDe + List<DimensionSpec> dimensionSpecList = ((GroupByQuery) query).getDimensions(); + List<DimensionSpec> extractionDimensionSpecList = dimensionSpecList.stream() + .filter(dimensionSpecs -> dimensionSpecs instanceof ExtractionDimensionSpec) + .collect(Collectors.toList()); + extractionDimensionSpecList.stream().forEach(dimensionSpec -> { + ExtractionDimensionSpec extractionDimensionSpec = (ExtractionDimensionSpec) dimensionSpec; + if (extractionDimensionSpec.getExtractionFn() instanceof TimeFormatExtractionFn) { + final TimeFormatExtractionFn timeFormatExtractionFn = (TimeFormatExtractionFn) extractionDimensionSpec + .getExtractionFn(); + if (timeFormatExtractionFn == null || timeFormatExtractionFn.getFormat().equals(ISO_TIME_FORMAT)) { + timeExtractionFields.add(extractionDimensionSpec.getOutputName()); + } else { + intFormattedTimeExtractionFields.add(extractionDimensionSpec.getOutputName()); + } } - } - for (int i = 0; i < query.getPostAggregatorSpecs().size(); i++, counter++) { - extractors[counter] = Extract.FLOAT; - } + }); } @Override public boolean nextKeyValue() { - // Refresh indexes - for (int i = indexes.length - 1; i >= 0; i--) { - if (indexes[i] > 0) { - indexes[i]--; - for (int j = i + 1; j < indexes.length; j++) { - indexes[j] = current.getDimension( - query.getDimensions().get(j).getOutputName()).size() - 1; - } - return true; - } - } // Results - if (results.hasNext()) { - current = results.next(); - indexes = new int[query.getDimensions().size()]; - for (int i = 0; i < query.getDimensions().size(); i++) { - DimensionSpec ds = query.getDimensions().get(i); - indexes[i] = current.getDimension(ds.getOutputName()).size() - 1; - } + + if (queryResultsIterator.hasNext()) { + final Row row = queryResultsIterator.next(); + // currently Druid supports only MapBasedRow as Jackson SerDe, so it should be safe to cast without a check + currentRow = (MapBasedRow) row; + //@TODO move this out of here to org.apache.hadoop.hive.druid.serde.DruidSerDe + currentEvent = Maps.transformEntries(currentRow.getEvent(), + (key, value1) -> { + if (timeExtractionFields.contains(key)) { + return ISODateTimeFormat.dateTimeParser().parseMillis((String) value1); + } + if (intFormattedTimeExtractionFields.contains(key)) { + return Integer.valueOf((String) value1); + } + return value1; + } + ); return true; } return false; @@ -140,49 +132,9 @@ public class DruidGroupByQueryRecordReader // Create new value DruidWritable value = new DruidWritable(); // 1) The timestamp column - value.getValue().put(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, current.getTimestamp().getMillis()); + value.getValue().put(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, currentRow.getTimestamp().getMillis()); // 2) The dimension columns - for (int i = 0; i < query.getDimensions().size(); i++) { - DimensionSpec ds = query.getDimensions().get(i); - List<String> dims = current.getDimension(ds.getOutputName()); - if (dims.size() == 0) { - // NULL value for dimension - value.getValue().put(ds.getOutputName(), null); - } else { - int pos = dims.size() - indexes[i] - 1; - Object val; - switch (dimensionTypes[i].getPrimitiveCategory()) { - case TIMESTAMP: - // FLOOR extraction function - val = ISODateTimeFormat.dateTimeParser().parseMillis((String) dims.get(pos)); - break; - case INT: - // EXTRACT extraction function - val = Integer.valueOf((String) dims.get(pos)); - break; - default: - val = dims.get(pos); - } - value.getValue().put(ds.getOutputName(), val); - } - } -
int counter = 0; - // 3) The aggregation columns - for (AggregatorFactory af : query.getAggregatorSpecs()) { - switch (extractors[counter++]) { - case FLOAT: - value.getValue().put(af.getName(), current.getFloatMetric(af.getName())); - break; - case LONG: - value.getValue().put(af.getName(), current.getLongMetric(af.getName())); - break; - } - } - // 4) The post-aggregation columns - for (PostAggregator pa : query.getPostAggregatorSpecs()) { - assert extractors[counter++] == Extract.FLOAT; - value.getValue().put(pa.getName(), current.getFloatMetric(pa.getName())); - } + value.getValue().putAll(currentEvent); return value; } @@ -192,49 +144,9 @@ public class DruidGroupByQueryRecordReader // Update value value.getValue().clear(); // 1) The timestamp column - value.getValue().put(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, current.getTimestamp().getMillis()); + value.getValue().put(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, currentRow.getTimestamp().getMillis()); // 2) The dimension columns - for (int i = 0; i < query.getDimensions().size(); i++) { - DimensionSpec ds = query.getDimensions().get(i); - List<String> dims = current.getDimension(ds.getOutputName()); - if (dims.size() == 0) { - // NULL value for dimension - value.getValue().put(ds.getOutputName(), null); - } else { - int pos = dims.size() - indexes[i] - 1; - Object val; - switch (dimensionTypes[i].getPrimitiveCategory()) { - case TIMESTAMP: - // FLOOR extraction function - val = ISODateTimeFormat.dateTimeParser().parseMillis((String) dims.get(pos)); - break; - case INT: - // EXTRACT extraction function - val = Integer.valueOf((String) dims.get(pos)); - break; - default: - val = dims.get(pos); - } - value.getValue().put(ds.getOutputName(), val); - } - } - int counter = 0; - // 3) The aggregation columns - for (AggregatorFactory af : query.getAggregatorSpecs()) { - switch (extractors[counter++]) { - case FLOAT: - value.getValue().put(af.getName(), current.getFloatMetric(af.getName())); - break; - case LONG: - value.getValue().put(af.getName(), current.getLongMetric(af.getName())); - break; - } - } - // 4) The post-aggregation columns - for (PostAggregator pa : query.getPostAggregatorSpecs()) { - assert extractors[counter++] == Extract.FLOAT; - value.getValue().put(pa.getName(), current.getFloatMetric(pa.getName())); - } + value.getValue().putAll(currentEvent); return true; } return false; @@ -242,12 +154,7 @@ public class DruidGroupByQueryRecordReader @Override public float getProgress() throws IOException { - return results.hasNext() ? 0 : 1; - } - - private enum Extract { - FLOAT, - LONG + return queryResultsIterator.hasNext() ? 
0 : 1; } } http://git-wip-us.apache.org/repos/asf/hive/blob/e0484b78/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java ---------------------------------------------------------------------- diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java index 103591d..de06533 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java @@ -17,14 +17,23 @@ */ package org.apache.hadoop.hive.druid.serde; -import com.google.common.collect.Iterators; -import com.metamx.common.lifecycle.Lifecycle; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.core.ObjectCodec; +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; import com.metamx.http.client.HttpClient; -import com.metamx.http.client.HttpClientConfig; -import com.metamx.http.client.HttpClientInit; +import com.metamx.http.client.Request; +import com.metamx.http.client.response.InputStreamResponseHandler; +import io.druid.java.util.common.IAE; +import io.druid.java.util.common.RE; +import io.druid.java.util.common.guava.CloseQuietly; import io.druid.query.BaseQuery; +import io.druid.query.Query; +import io.druid.query.QueryInterruptedException; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.druid.DruidStorageHandler; import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils; import org.apache.hadoop.hive.druid.io.HiveDruidSplit; @@ -32,14 +41,16 @@ import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.joda.time.Period; +import org.apache.parquet.Strings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.util.Iterator; -import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; /** * Base record reader for a given Druid query. This class contains the logic to @@ -56,57 +67,65 @@ public abstract class DruidQueryRecordReader<T extends BaseQuery<R>, R extends C private static final Logger LOG = LoggerFactory.getLogger(DruidQueryRecordReader.class); + private HttpClient httpClient; + private ObjectMapper mapper; + // Smile mapper is used to read query results that are serialized as binary instead of json + private ObjectMapper smileMapper; + /** * Query that Druid executes. */ - protected T query; + protected Query query; /** - * Query results. + * Query results as a streaming iterator. */ - protected Iterator<R> results = Iterators.emptyIterator(); + protected JsonParserIterator<R> queryResultsIterator = null; + + /** + * Result type definition used to read the rows; this is query dependent. 
+ */ + protected JavaType resultsType = null; @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException { initialize(split, context.getConfiguration()); } - public void initialize(InputSplit split, Configuration conf) throws IOException { + public void initialize(InputSplit split, Configuration conf, ObjectMapper mapper, + ObjectMapper smileMapper, HttpClient httpClient + ) throws IOException { HiveDruidSplit hiveDruidSplit = (HiveDruidSplit) split; - + Preconditions.checkNotNull(hiveDruidSplit, "input split is null ???"); + this.mapper = Preconditions.checkNotNull(mapper, "object mapper cannot be null"); + // Smile mapper is used to read query results that are serialized as binary instead of json + this.smileMapper = Preconditions.checkNotNull(smileMapper, "Smile mapper cannot be null"); // Create query - query = createQuery(hiveDruidSplit.getDruidQuery()); - + this.query = this.mapper.readValue(Preconditions.checkNotNull(hiveDruidSplit.getDruidQuery()), Query.class); + Preconditions.checkNotNull(query); + this.resultsType = getResultTypeDef(); + this.httpClient = Preconditions.checkNotNull(httpClient, "need Http Client"); // Execute query - if (LOG.isInfoEnabled()) { - LOG.info("Retrieving from druid using query:\n " + query); - } - - InputStream response; - try { - response = DruidStorageHandlerUtils.submitRequest(DruidStorageHandler.getHttpClient(), - DruidStorageHandlerUtils.createRequest(hiveDruidSplit.getLocations()[0], query)); - } catch (Exception e) { - throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e)); + LOG.debug("Retrieving data from druid using query:\n " + query); + final String address = hiveDruidSplit.getLocations()[0]; + if (Strings.isNullOrEmpty(address)) { + throw new IOException("cannot fetch results from an empty or null host value"); } - - // Retrieve results - List<R> resultsList; - try { - resultsList = createResultsList(response); - } catch (IOException e) { - response.close(); - throw e; - } - if (resultsList == null || resultsList.isEmpty()) { - return; - } - results = resultsList.iterator(); + Request request = DruidStorageHandlerUtils.createSmileRequest(address, query); + Future<InputStream> inputStreamFuture = this.httpClient + .go(request, new InputStreamResponseHandler()); + queryResultsIterator = new JsonParserIterator(this.smileMapper, resultsType, inputStreamFuture, + request.getUrl().toString(), query + ); } - protected abstract T createQuery(String content) throws IOException; + public void initialize(InputSplit split, Configuration conf) throws IOException { + initialize(split, conf, DruidStorageHandlerUtils.JSON_MAPPER, + DruidStorageHandlerUtils.SMILE_MAPPER, DruidStorageHandler.getHttpClient() + ); + } - protected abstract List<R> createResultsList(InputStream content) throws IOException; + protected abstract JavaType getResultTypeDef(); @Override public NullWritable createKey() { @@ -141,7 +160,123 @@ public abstract class DruidQueryRecordReader<T extends BaseQuery<R>, R extends C @Override public void close() { - // Nothing to do + CloseQuietly.close(queryResultsIterator); + } + + /** + * This is a helper wrapper class used to create an iterator of druid rows out of InputStream. 
+ * The type of the rows is defined by org.apache.hadoop.hive.druid.serde.DruidQueryRecordReader.JsonParserIterator#typeRef + * + * @param <R> druid Row type returned as result + */ + protected class JsonParserIterator<R extends Comparable<R>> implements Iterator<R>, Closeable + { + private JsonParser jp; + private ObjectCodec objectCodec; + private final ObjectMapper mapper; + private final JavaType typeRef; + private final Future<InputStream> future; + private final Query query; + private final String url; + + /** + * @param mapper mapper used to deserialize the stream of data (we use smile factory) + * @param typeRef Type definition of the results objects + * @param future Future holding the input stream (the input stream is not owned but it will be closed when org.apache.hadoop.hive.druid.serde.DruidQueryRecordReader.JsonParserIterator#close() is called or the end of the stream is reached) + * @param url URL used to fetch the data, used mostly in exception messages to identify the faulty stream, thus this can be an empty string. + * @param query Query used to fetch the data, used mostly in exception messages, thus it can be an empty string. + */ + public JsonParserIterator(ObjectMapper mapper, + JavaType typeRef, + Future<InputStream> future, + String url, + Query query + ) + { + this.typeRef = typeRef; + this.future = future; + this.url = url; + this.query = query; + this.mapper = mapper; + jp = null; + } + + @Override + public boolean hasNext() + { + init(); + + if (jp.isClosed()) { + return false; + } + if (jp.getCurrentToken() == JsonToken.END_ARRAY) { + CloseQuietly.close(jp); + return false; + } + + return true; + } + + @Override + public R next() + { + init(); + + try { + final R retVal = objectCodec.readValue(jp, typeRef); + jp.nextToken(); + return retVal; + } + catch (IOException e) { + throw Throwables.propagate(e); + } + } + + @Override + public void remove() + { + throw new UnsupportedOperationException(); + } + + private void init() + { + if (jp == null) { + try { + InputStream is = future.get(); + if (is == null) { + throw new IOException(String.format("query[%s] url[%s] timed out", query, url)); + } else { + jp = mapper.getFactory().createParser(is).configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, true); + } + final JsonToken nextToken = jp.nextToken(); + if (nextToken == JsonToken.START_OBJECT) { + QueryInterruptedException cause = jp.getCodec().readValue(jp, QueryInterruptedException.class); + throw new QueryInterruptedException(cause); + } else if (nextToken != JsonToken.START_ARRAY) { + throw new IAE("Next token wasn't a START_ARRAY, was[%s] from url [%s]", jp.getCurrentToken(), url); + } else { + jp.nextToken(); + objectCodec = jp.getCodec(); + } + } + catch (IOException | InterruptedException | ExecutionException e) { + throw new RE( + e, + "Failure getting results for query[%s] url[%s] because of [%s]", + query, + url, + e.getMessage() + ); + } + } + } + + @Override + public void close() throws IOException + { + CloseQuietly.close(jp); + } } + } http://git-wip-us.apache.org/repos/asf/hive/blob/e0484b78/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java ---------------------------------------------------------------------- diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java index 82eec5d..c0744b5 100644 --- 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java @@ -22,6 +22,8 @@ import java.io.InputStream; import java.util.Iterator; import java.util.List; +import com.fasterxml.jackson.databind.JavaType; +import io.druid.query.select.SelectQueryQueryToolChest; import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils; import org.apache.hadoop.io.NullWritable; @@ -39,22 +41,18 @@ import io.druid.query.select.SelectResultValue; public class DruidSelectQueryRecordReader extends DruidQueryRecordReader<SelectQuery, Result<SelectResultValue>> { + private static final TypeReference<Result<SelectResultValue>> TYPE_REFERENCE = + new TypeReference<Result<SelectResultValue>>() + { + }; + private Result<SelectResultValue> current; private Iterator<EventHolder> values = Iterators.emptyIterator(); @Override - protected SelectQuery createQuery(String content) throws IOException { - return DruidStorageHandlerUtils.JSON_MAPPER.readValue(content, SelectQuery.class); - } - - @Override - protected List<Result<SelectResultValue>> createResultsList(InputStream content) - throws IOException { - return DruidStorageHandlerUtils.SMILE_MAPPER.readValue(content, - new TypeReference<List<Result<SelectResultValue>>>() { - } - ); + protected JavaType getResultTypeDef() { + return DruidStorageHandlerUtils.JSON_MAPPER.getTypeFactory().constructType(TYPE_REFERENCE); } @Override @@ -62,8 +60,8 @@ public class DruidSelectQueryRecordReader if (values.hasNext()) { return true; } - if (results.hasNext()) { - current = results.next(); + if (queryResultsIterator.hasNext()) { + current = queryResultsIterator.next(); values = current.getValue().getEvents().iterator(); return nextKeyValue(); } @@ -100,7 +98,7 @@ public class DruidSelectQueryRecordReader @Override public float getProgress() { - return results.hasNext() || values.hasNext() ? 0 : 1; + return queryResultsIterator.hasNext() || values.hasNext() ? 
0 : 1; } } http://git-wip-us.apache.org/repos/asf/hive/blob/e0484b78/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java ---------------------------------------------------------------------- diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java index 8750285..e6e01d1 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java @@ -132,21 +132,11 @@ public class DruidSerDe extends AbstractSerDe { properties.getProperty(serdeConstants.LIST_COLUMNS)); } columnTypes.addAll(Lists.transform(Utilities.getColumnTypes(properties), - new Function<String, PrimitiveTypeInfo>() { - @Override - public PrimitiveTypeInfo apply(String type) { - return TypeInfoFactory.getPrimitiveTypeInfo(type); - } - } + type -> TypeInfoFactory.getPrimitiveTypeInfo(type) )); inspectors.addAll(Lists.transform(columnTypes, - new Function<PrimitiveTypeInfo, ObjectInspector>() { - @Override - public ObjectInspector apply(PrimitiveTypeInfo type) { - return PrimitiveObjectInspectorFactory - .getPrimitiveWritableObjectInspector(type); - } - } + (Function<PrimitiveTypeInfo, ObjectInspector>) type -> PrimitiveObjectInspectorFactory + .getPrimitiveWritableObjectInspector(type) )); columns = columnNames.toArray(new String[columnNames.size()]); types = columnTypes.toArray(new PrimitiveTypeInfo[columnTypes.size()]); @@ -273,7 +263,7 @@ public class DruidSerDe extends AbstractSerDe { InputStream response; try { response = DruidStorageHandlerUtils.submitRequest(DruidStorageHandler.getHttpClient(), - DruidStorageHandlerUtils.createRequest(address, query) + DruidStorageHandlerUtils.createSmileRequest(address, query) ); } catch (Exception e) { throw new SerDeException(StringUtils.stringifyException(e)); http://git-wip-us.apache.org/repos/asf/hive/blob/e0484b78/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java ---------------------------------------------------------------------- diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java index c8a63ab..86c325b 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hive.druid.serde; -import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.slf4j.Logger; @@ -37,6 +36,7 @@ public final class DruidSerDeUtils { protected static final String ISO_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; protected static final String FLOAT_TYPE = "FLOAT"; + protected static final String DOUBLE_TYPE = "DOUBLE"; protected static final String LONG_TYPE = "LONG"; protected static final String STRING_TYPE = "STRING"; @@ -47,6 +47,8 @@ public final class DruidSerDeUtils { switch (typeName) { case FLOAT_TYPE: return TypeInfoFactory.floatTypeInfo; + case DOUBLE_TYPE: + return TypeInfoFactory.doubleTypeInfo; case LONG_TYPE: return TypeInfoFactory.longTypeInfo; case STRING_TYPE: @@ -63,29 +65,6 @@ public final class DruidSerDeUtils { } } - /* This method converts from the String representation of Druid type - * to the String representation of the 
corresponding Hive type */ - public static String convertDruidToHiveTypeString(String typeName) { - typeName = typeName.toUpperCase(); - switch (typeName) { - case FLOAT_TYPE: - return serdeConstants.FLOAT_TYPE_NAME; - case LONG_TYPE: - return serdeConstants.BIGINT_TYPE_NAME; - case STRING_TYPE: - return serdeConstants.STRING_TYPE_NAME; - default: - // This is a guard for special Druid types e.g. hyperUnique - // (http://druid.io/docs/0.9.1.1/querying/aggregations.html#hyperunique-aggregator). - // Currently, we do not support doing anything special with them in Hive. - // However, those columns are there, and they can be actually read as normal - // dimensions e.g. with a select query. Thus, we print the warning and just read them - // as String. - LOG.warn("Transformation to STRING for unknown type " + typeName); - return serdeConstants.STRING_TYPE_NAME; - } - } - /* Extract type from dimension spec. It returns TIMESTAMP if it is a FLOOR, * INTEGER if it is a EXTRACT, or STRING otherwise. */ public static PrimitiveTypeInfo extractTypeFromDimension(DimensionSpec ds) { http://git-wip-us.apache.org/repos/asf/hive/blob/e0484b78/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java ---------------------------------------------------------------------- diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java index a1c8488..971af82 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.io.InputStream; import java.util.List; +import com.fasterxml.jackson.databind.JavaType; import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils; import org.apache.hadoop.io.NullWritable; @@ -36,26 +37,19 @@ import io.druid.query.timeseries.TimeseriesResultValue; public class DruidTimeseriesQueryRecordReader extends DruidQueryRecordReader<TimeseriesQuery, Result<TimeseriesResultValue>> { + private static final TypeReference TYPE_REFERENCE = new TypeReference<Result<TimeseriesResultValue>>() { + }; private Result<TimeseriesResultValue> current; @Override - protected TimeseriesQuery createQuery(String content) throws IOException { - return DruidStorageHandlerUtils.JSON_MAPPER.readValue(content, TimeseriesQuery.class); - } - - @Override - protected List<Result<TimeseriesResultValue>> createResultsList(InputStream content) - throws IOException { - return DruidStorageHandlerUtils.SMILE_MAPPER.readValue(content, - new TypeReference<List<Result<TimeseriesResultValue>>>() { - } - ); + protected JavaType getResultTypeDef() { + return DruidStorageHandlerUtils.JSON_MAPPER.getTypeFactory().constructType(TYPE_REFERENCE); } @Override public boolean nextKeyValue() { - if (results.hasNext()) { - current = results.next(); + if (queryResultsIterator.hasNext()) { + current = queryResultsIterator.next(); return true; } return false; @@ -89,7 +83,7 @@ public class DruidTimeseriesQueryRecordReader @Override public float getProgress() throws IOException { - return results.hasNext() ? 0 : 1; + return queryResultsIterator.hasNext() ? 
0 : 1; } } http://git-wip-us.apache.org/repos/asf/hive/blob/e0484b78/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java ---------------------------------------------------------------------- diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java index afdf670..6e1fffe 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -22,6 +22,7 @@ import java.io.InputStream; import java.util.Iterator; import java.util.List; +import com.fasterxml.jackson.databind.JavaType; import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils; import org.apache.hadoop.io.NullWritable; @@ -39,22 +40,17 @@ import io.druid.query.topn.TopNResultValue; public class DruidTopNQueryRecordReader extends DruidQueryRecordReader<TopNQuery, Result<TopNResultValue>> { + private static final TypeReference<Result<TopNResultValue>> TYPE_REFERENCE = + new TypeReference<Result<TopNResultValue>>() { + }; + private Result<TopNResultValue> current; private Iterator<DimensionAndMetricValueExtractor> values = Iterators.emptyIterator(); @Override - protected TopNQuery createQuery(String content) throws IOException { - return DruidStorageHandlerUtils.JSON_MAPPER.readValue(content, TopNQuery.class); - } - - @Override - protected List<Result<TopNResultValue>> createResultsList(InputStream content) - throws IOException { - return DruidStorageHandlerUtils.SMILE_MAPPER.readValue(content, - new TypeReference<List<Result<TopNResultValue>>>() { - } - ); + protected JavaType getResultTypeDef() { + return DruidStorageHandlerUtils.JSON_MAPPER.getTypeFactory().constructType(TYPE_REFERENCE); } @Override @@ -62,8 +58,8 @@ public class DruidTopNQueryRecordReader if (values.hasNext()) { return true; } - if (results.hasNext()) { - current = results.next(); + if (queryResultsIterator.hasNext()) { + current = queryResultsIterator.next(); values = current.getValue().getValue().iterator(); return nextKeyValue(); } @@ -79,7 +75,9 @@ public class DruidTopNQueryRecordReader public DruidWritable getCurrentValue() throws IOException, InterruptedException { // Create new value DruidWritable value = new DruidWritable(); - value.getValue().put(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, current.getTimestamp().getMillis()); + value.getValue().put(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, + current.getTimestamp().getMillis() + ); if (values.hasNext()) { value.getValue().putAll(values.next().getBaseObject()); return value; @@ -92,7 +90,9 @@ public class DruidTopNQueryRecordReader if (nextKeyValue()) { // Update value value.getValue().clear(); - value.getValue().put(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, current.getTimestamp().getMillis()); + 
value.getValue().put(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, + current.getTimestamp().getMillis() + ); if (values.hasNext()) { value.getValue().putAll(values.next().getBaseObject()); } @@ -103,7 +103,7 @@ public class DruidTopNQueryRecordReader @Override public float getProgress() { - return results.hasNext() || values.hasNext() ? 0 : 1; + return queryResultsIterator.hasNext() || values.hasNext() ? 0 : 1; } }
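----------------------------------------------------------------------
The core change in DruidQueryRecordReader is that results are no longer buffered into a List by createResultsList; JsonParserIterator instead walks the response with Jackson's streaming API, deserializing one row per token step. Below is a minimal, self-contained sketch of that streaming pattern, assuming a plain JSON payload and a generic Map row type; the class name and sample data are illustrative stand-ins, not the handler's own types:

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class StreamingArrayReadSketch {
  public static void main(String[] args) throws IOException {
    // Stand-in for the HTTP response body; the real reader consumes a SMILE stream.
    InputStream response = new ByteArrayInputStream(
        "[{\"rows\":1},{\"rows\":2}]".getBytes(StandardCharsets.UTF_8));
    ObjectMapper mapper = new ObjectMapper();
    JavaType rowType = mapper.getTypeFactory()
        .constructMapType(Map.class, String.class, Object.class);
    JsonParser jp = mapper.getFactory().createParser(response)
        .configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, true);
    // Druid returns results as a top-level array; anything else is an error.
    if (jp.nextToken() != JsonToken.START_ARRAY) {
      throw new IOException("expected a top-level array, got " + jp.getCurrentToken());
    }
    // Deserialize one element per step; only the current row is held in memory.
    while (jp.nextToken() != JsonToken.END_ARRAY) {
      Map<String, Object> row = mapper.readValue(jp, rowType);
      System.out.println(row);
    }
    jp.close();
  }
}

This also explains why the readers' getProgress() can only report 0 or 1: with a streaming iterator there is no total result count to measure progress against.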
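----------------------------------------------------------------------
The rename of createRequest to createSmileRequest makes explicit that both directions of the exchange use Jackson's binary SMILE encoding: the query is serialized with SMILE_MAPPER into the POST body, and JsonParserIterator reads the response back with the same smile mapper. A short round-trip sketch under those assumptions; the Map-based query below is a hypothetical stand-in for a real io.druid.query.Query object:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import java.util.HashMap;
import java.util.Map;

public class SmileRoundTripSketch {
  public static void main(String[] args) throws Exception {
    // A mapper built on SmileFactory reads and writes binary SMILE instead of JSON text.
    ObjectMapper smileMapper = new ObjectMapper(new SmileFactory());
    Map<String, Object> query = new HashMap<>();  // hypothetical stand-in for a Druid query
    query.put("queryType", "select");
    query.put("dataSource", "wikipedia");
    byte[] payload = smileMapper.writeValueAsBytes(query);  // what the POST body carries
    // The response must be decoded with a SMILE-aware mapper as well;
    // a plain JSON ObjectMapper cannot parse these bytes.
    Map<?, ?> decoded = smileMapper.readValue(payload, Map.class);
    System.out.println(decoded.equals(query));  // true: lossless round trip
  }
}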