asdf2014 closed pull request #6590: optimize input row parsers — URL: https://github.com/apache/incubator-druid/pull/6590
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java b/core/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java index 133e574f567..80db2c43965 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java @@ -36,6 +36,7 @@ public class MapInputRowParser implements InputRowParser<Map<String, Object>> { private final ParseSpec parseSpec; + private final List<String> dimensions; @JsonCreator public MapInputRowParser( @@ -43,20 +44,20 @@ public MapInputRowParser( ) { this.parseSpec = parseSpec; + this.dimensions = parseSpec.getDimensionsSpec().getDimensionNames(); } @Override public List<InputRow> parseBatch(Map<String, Object> theMap) { - final List<String> dimensions = parseSpec.getDimensionsSpec().hasCustomDimensions() - ? parseSpec.getDimensionsSpec().getDimensionNames() - : Lists.newArrayList( - Sets.difference( - theMap.keySet(), - parseSpec.getDimensionsSpec() - .getDimensionExclusions() - ) - ); + final List<String> dimensions; + if (!this.dimensions.isEmpty()) { + dimensions = this.dimensions; + } else { + dimensions = Lists.newArrayList( + Sets.difference(theMap.keySet(), parseSpec.getDimensionsSpec().getDimensionExclusions()) + ); + } final DateTime timestamp; try { @@ -75,7 +76,7 @@ public MapInputRowParser( throw new ParseException(e, "Unparseable timestamp found! 
Event: %s", theMap); } - return ImmutableList.of(new MapBasedInputRow(timestamp.getMillis(), dimensions, theMap)); + return ImmutableList.of(new MapBasedInputRow(timestamp, dimensions, theMap)); } @JsonProperty diff --git a/extensions-contrib/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParser.java b/extensions-contrib/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParser.java index 96a7e0bcfdc..980f3694ebe 100644 --- a/extensions-contrib/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParser.java +++ b/extensions-contrib/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParser.java @@ -25,6 +25,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.MapBasedInputRow; import org.apache.druid.data.input.impl.InputRowParser; @@ -129,6 +130,14 @@ public OrcHadoopInputRowParser( TimestampSpec timestampSpec = parseSpec.getTimestampSpec(); DateTime dateTime = timestampSpec.extractTimestamp(map); + final List<String> dimensions; + if (!this.dimensions.isEmpty()) { + dimensions = this.dimensions; + } else { + dimensions = Lists.newArrayList( + Sets.difference(map.keySet(), parseSpec.getDimensionsSpec().getDimensionExclusions()) + ); + } return ImmutableList.of(new MapBasedInputRow(dateTime, dimensions, map)); } diff --git a/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftInputRowParser.java b/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftInputRowParser.java index 740b888ee78..8e148d207d9 100644 --- a/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftInputRowParser.java +++ 
b/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftInputRowParser.java @@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import com.twitter.elephantbird.mapreduce.io.ThriftWritable; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.MapBasedInputRow; @@ -55,6 +57,7 @@ private Parser<String, Object> parser; private volatile Class<TBase> thriftClass = null; + private final List<String> dimensions; @JsonCreator public ThriftInputRowParser( @@ -68,6 +71,7 @@ public ThriftInputRowParser( Preconditions.checkNotNull(thriftClassName, "thrift class name"); this.parseSpec = parseSpec; + this.dimensions = parseSpec.getDimensionsSpec().getDimensionNames(); } public Class<TBase> getThriftClass() @@ -139,10 +143,17 @@ public ThriftInputRowParser( } Map<String, Object> record = parser.parseToMap(json); - + final List<String> dimensions; + if (!this.dimensions.isEmpty()) { + dimensions = this.dimensions; + } else { + dimensions = Lists.newArrayList( + Sets.difference(record.keySet(), parseSpec.getDimensionsSpec().getDimensionExclusions()) + ); + } return ImmutableList.of(new MapBasedInputRow( parseSpec.getTimestampSpec().extractTimestamp(record), - parseSpec.getDimensionsSpec().getDimensionNames(), + dimensions, record )); } diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java index 19de982b974..83b27349b18 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java @@ -23,6 +23,7 @@ 
import org.apache.avro.generic.GenericRecord; import org.apache.druid.data.input.avro.AvroParsers; import org.apache.druid.data.input.impl.InputRowParser; +import org.apache.druid.data.input.impl.MapInputRowParser; import org.apache.druid.data.input.impl.ParseSpec; import org.apache.druid.java.util.common.parsers.ObjectFlattener; @@ -33,6 +34,7 @@ private final ParseSpec parseSpec; private final boolean fromPigAvroStorage; private final ObjectFlattener<GenericRecord> avroFlattener; + private final MapInputRowParser mapParser; @JsonCreator public AvroHadoopInputRowParser( @@ -43,12 +45,13 @@ public AvroHadoopInputRowParser( this.parseSpec = parseSpec; this.fromPigAvroStorage = fromPigAvroStorage == null ? false : fromPigAvroStorage; this.avroFlattener = AvroParsers.makeFlattener(parseSpec, this.fromPigAvroStorage, false); + this.mapParser = new MapInputRowParser(parseSpec); } @Override public List<InputRow> parseBatch(GenericRecord record) { - return AvroParsers.parseGenericRecord(record, parseSpec, avroFlattener); + return AvroParsers.parseGenericRecord(record, mapParser, avroFlattener); } @JsonProperty diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java index ab5c2d36547..749970f6505 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java @@ -24,6 +24,7 @@ import org.apache.avro.generic.GenericRecord; import org.apache.druid.data.input.avro.AvroBytesDecoder; import org.apache.druid.data.input.avro.AvroParsers; +import org.apache.druid.data.input.impl.MapInputRowParser; import org.apache.druid.data.input.impl.ParseSpec; import org.apache.druid.java.util.common.parsers.ObjectFlattener; @@ -36,6 +37,7 @@ private final ParseSpec 
parseSpec; private final AvroBytesDecoder avroBytesDecoder; private final ObjectFlattener<GenericRecord> avroFlattener; + private final MapInputRowParser mapParser; @JsonCreator public AvroStreamInputRowParser( @@ -46,12 +48,13 @@ public AvroStreamInputRowParser( this.parseSpec = Preconditions.checkNotNull(parseSpec, "parseSpec"); this.avroBytesDecoder = Preconditions.checkNotNull(avroBytesDecoder, "avroBytesDecoder"); this.avroFlattener = AvroParsers.makeFlattener(parseSpec, false, false); + this.mapParser = new MapInputRowParser(parseSpec); } @Override public List<InputRow> parseBatch(ByteBuffer input) { - return AvroParsers.parseGenericRecord(avroBytesDecoder.parse(input), parseSpec, avroFlattener); + return AvroParsers.parseGenericRecord(avroBytesDecoder.parse(input), mapParser, avroFlattener); } @JsonProperty diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java index 1adb4bf827e..92ea3ae1bda 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java @@ -54,10 +54,10 @@ private AvroParsers() public static List<InputRow> parseGenericRecord( GenericRecord record, - ParseSpec parseSpec, + MapInputRowParser mapParser, ObjectFlattener<GenericRecord> avroFlattener ) { - return new MapInputRowParser(parseSpec).parseBatch(avroFlattener.flatten(record)); + return mapParser.parseBatch(avroFlattener.flatten(record)); } } diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java index cdf1f85c9a5..330d9e11f37 100755 --- 
a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java +++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.apache.avro.LogicalType; import org.apache.avro.LogicalTypes; @@ -40,7 +41,6 @@ import org.joda.time.DateTime; import javax.annotation.Nullable; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -51,7 +51,7 @@ private final boolean binaryAsString; private final TimestampSpec timestampSpec; private final ObjectFlattener<GenericRecord> recordFlattener; - + private final List<String> dimensions; @JsonCreator public ParquetAvroHadoopInputRowParser( @@ -61,6 +61,7 @@ public ParquetAvroHadoopInputRowParser( { this.parseSpec = parseSpec; this.timestampSpec = parseSpec.getTimestampSpec(); + this.dimensions = parseSpec.getDimensionsSpec().getDimensionNames(); this.binaryAsString = binaryAsString == null ? false : binaryAsString; final JSONPathSpec flattenSpec; @@ -95,15 +96,14 @@ private LogicalType determineTimestampSpecLogicalType(Schema schema, String time { Map<String, Object> row = recordFlattener.flatten(record); - final List<String> dimensions = parseSpec.getDimensionsSpec().hasCustomDimensions() - ? 
parseSpec.getDimensionsSpec().getDimensionNames() - : new ArrayList( - Sets.difference( - row.keySet(), - parseSpec.getDimensionsSpec() - .getDimensionExclusions() - ) - ); + final List<String> dimensions; + if (!this.dimensions.isEmpty()) { + dimensions = this.dimensions; + } else { + dimensions = Lists.newArrayList( + Sets.difference(row.keySet(), parseSpec.getDimensionsSpec().getDimensionExclusions()) + ); + } // check for parquet Date // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#date LogicalType logicalType = determineTimestampSpecLogicalType(record.getSchema(), timestampSpec.getTimestampColumn()); diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParser.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParser.java index e47adeeeaa8..6fb2d5a11a0 100644 --- a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParser.java +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParser.java @@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.github.os72.protobuf.dynamic.DynamicSchema; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import com.google.protobuf.ByteString; import com.google.protobuf.Descriptors; import com.google.protobuf.Descriptors.Descriptor; @@ -53,7 +55,7 @@ private final String protoMessageType; private final Descriptor descriptor; private Parser<String, Object> parser; - + private final List<String> dimensions; @JsonCreator public ProtobufInputRowParser( @@ -66,6 +68,7 @@ public ProtobufInputRowParser( this.descriptorFilePath = descriptorFilePath; this.protoMessageType = protoMessageType; this.descriptor = getDescriptor(descriptorFilePath); + this.dimensions = 
parseSpec.getDimensionsSpec().getDimensionNames(); } @Override @@ -98,9 +101,17 @@ public ProtobufInputRowParser withParseSpec(ParseSpec parseSpec) } Map<String, Object> record = parser.parseToMap(json); + final List<String> dimensions; + if (!this.dimensions.isEmpty()) { + dimensions = this.dimensions; + } else { + dimensions = Lists.newArrayList( + Sets.difference(record.keySet(), parseSpec.getDimensionsSpec().getDimensionExclusions()) + ); + } return ImmutableList.of(new MapBasedInputRow( parseSpec.getTimestampSpec().extractTimestamp(record), - parseSpec.getDimensionsSpec().getDimensionNames(), + dimensions, record )); } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org