asdf2014 closed pull request #6590: optimize input row parsers
URL: https://github.com/apache/incubator-druid/pull/6590
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/core/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java 
b/core/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java
index 133e574f567..80db2c43965 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java
@@ -36,6 +36,7 @@
 public class MapInputRowParser implements InputRowParser<Map<String, Object>>
 {
   private final ParseSpec parseSpec;
+  private final List<String> dimensions;
 
   @JsonCreator
   public MapInputRowParser(
@@ -43,20 +44,20 @@ public MapInputRowParser(
   )
   {
     this.parseSpec = parseSpec;
+    this.dimensions = parseSpec.getDimensionsSpec().getDimensionNames();
   }
 
   @Override
   public List<InputRow> parseBatch(Map<String, Object> theMap)
   {
-    final List<String> dimensions = 
parseSpec.getDimensionsSpec().hasCustomDimensions()
-                                    ? 
parseSpec.getDimensionsSpec().getDimensionNames()
-                                    : Lists.newArrayList(
-                                        Sets.difference(
-                                            theMap.keySet(),
-                                            parseSpec.getDimensionsSpec()
-                                                     .getDimensionExclusions()
-                                        )
-                                    );
+    final List<String> dimensions;
+    if (!this.dimensions.isEmpty()) {
+      dimensions = this.dimensions;
+    } else {
+      dimensions = Lists.newArrayList(
+          Sets.difference(theMap.keySet(), 
parseSpec.getDimensionsSpec().getDimensionExclusions())
+      );
+    }
 
     final DateTime timestamp;
     try {
@@ -75,7 +76,7 @@ public MapInputRowParser(
       throw new ParseException(e, "Unparseable timestamp found! Event: %s", 
theMap);
     }
 
-    return ImmutableList.of(new MapBasedInputRow(timestamp.getMillis(), 
dimensions, theMap));
+    return ImmutableList.of(new MapBasedInputRow(timestamp, dimensions, 
theMap));
   }
 
   @JsonProperty
diff --git 
a/extensions-contrib/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParser.java
 
b/extensions-contrib/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParser.java
index 96a7e0bcfdc..980f3694ebe 100644
--- 
a/extensions-contrib/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParser.java
+++ 
b/extensions-contrib/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParser.java
@@ -25,6 +25,7 @@
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.MapBasedInputRow;
 import org.apache.druid.data.input.impl.InputRowParser;
@@ -129,6 +130,14 @@ public OrcHadoopInputRowParser(
     TimestampSpec timestampSpec = parseSpec.getTimestampSpec();
     DateTime dateTime = timestampSpec.extractTimestamp(map);
 
+    final List<String> dimensions;
+    if (!this.dimensions.isEmpty()) {
+      dimensions = this.dimensions;
+    } else {
+      dimensions = Lists.newArrayList(
+          Sets.difference(map.keySet(), 
parseSpec.getDimensionsSpec().getDimensionExclusions())
+      );
+    }
     return ImmutableList.of(new MapBasedInputRow(dateTime, dimensions, map));
   }
 
diff --git 
a/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftInputRowParser.java
 
b/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftInputRowParser.java
index 740b888ee78..8e148d207d9 100644
--- 
a/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftInputRowParser.java
+++ 
b/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftInputRowParser.java
@@ -23,6 +23,8 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import com.twitter.elephantbird.mapreduce.io.ThriftWritable;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.MapBasedInputRow;
@@ -55,6 +57,7 @@
 
   private Parser<String, Object> parser;
   private volatile Class<TBase> thriftClass = null;
+  private final List<String> dimensions;
 
   @JsonCreator
   public ThriftInputRowParser(
@@ -68,6 +71,7 @@ public ThriftInputRowParser(
     Preconditions.checkNotNull(thriftClassName, "thrift class name");
 
     this.parseSpec = parseSpec;
+    this.dimensions = parseSpec.getDimensionsSpec().getDimensionNames();
   }
 
   public Class<TBase> getThriftClass()
@@ -139,10 +143,17 @@ public ThriftInputRowParser(
     }
 
     Map<String, Object> record = parser.parseToMap(json);
-
+    final List<String> dimensions;
+    if (!this.dimensions.isEmpty()) {
+      dimensions = this.dimensions;
+    } else {
+      dimensions = Lists.newArrayList(
+          Sets.difference(record.keySet(), 
parseSpec.getDimensionsSpec().getDimensionExclusions())
+      );
+    }
     return ImmutableList.of(new MapBasedInputRow(
         parseSpec.getTimestampSpec().extractTimestamp(record),
-        parseSpec.getDimensionsSpec().getDimensionNames(),
+        dimensions,
         record
     ));
   }
diff --git 
a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java
 
b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java
index 19de982b974..83b27349b18 100644
--- 
a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java
+++ 
b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java
@@ -23,6 +23,7 @@
 import org.apache.avro.generic.GenericRecord;
 import org.apache.druid.data.input.avro.AvroParsers;
 import org.apache.druid.data.input.impl.InputRowParser;
+import org.apache.druid.data.input.impl.MapInputRowParser;
 import org.apache.druid.data.input.impl.ParseSpec;
 import org.apache.druid.java.util.common.parsers.ObjectFlattener;
 
@@ -33,6 +34,7 @@
   private final ParseSpec parseSpec;
   private final boolean fromPigAvroStorage;
   private final ObjectFlattener<GenericRecord> avroFlattener;
+  private final MapInputRowParser mapParser;
 
   @JsonCreator
   public AvroHadoopInputRowParser(
@@ -43,12 +45,13 @@ public AvroHadoopInputRowParser(
     this.parseSpec = parseSpec;
     this.fromPigAvroStorage = fromPigAvroStorage == null ? false : 
fromPigAvroStorage;
     this.avroFlattener = AvroParsers.makeFlattener(parseSpec, 
this.fromPigAvroStorage, false);
+    this.mapParser = new MapInputRowParser(parseSpec);
   }
 
   @Override
   public List<InputRow> parseBatch(GenericRecord record)
   {
-    return AvroParsers.parseGenericRecord(record, parseSpec, avroFlattener);
+    return AvroParsers.parseGenericRecord(record, mapParser, avroFlattener);
   }
 
   @JsonProperty
diff --git 
a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java
 
b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java
index ab5c2d36547..749970f6505 100644
--- 
a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java
+++ 
b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java
@@ -24,6 +24,7 @@
 import org.apache.avro.generic.GenericRecord;
 import org.apache.druid.data.input.avro.AvroBytesDecoder;
 import org.apache.druid.data.input.avro.AvroParsers;
+import org.apache.druid.data.input.impl.MapInputRowParser;
 import org.apache.druid.data.input.impl.ParseSpec;
 import org.apache.druid.java.util.common.parsers.ObjectFlattener;
 
@@ -36,6 +37,7 @@
   private final ParseSpec parseSpec;
   private final AvroBytesDecoder avroBytesDecoder;
   private final ObjectFlattener<GenericRecord> avroFlattener;
+  private final MapInputRowParser mapParser;
 
   @JsonCreator
   public AvroStreamInputRowParser(
@@ -46,12 +48,13 @@ public AvroStreamInputRowParser(
     this.parseSpec = Preconditions.checkNotNull(parseSpec, "parseSpec");
     this.avroBytesDecoder = Preconditions.checkNotNull(avroBytesDecoder, 
"avroBytesDecoder");
     this.avroFlattener = AvroParsers.makeFlattener(parseSpec, false, false);
+    this.mapParser = new MapInputRowParser(parseSpec);
   }
 
   @Override
   public List<InputRow> parseBatch(ByteBuffer input)
   {
-    return AvroParsers.parseGenericRecord(avroBytesDecoder.parse(input), 
parseSpec, avroFlattener);
+    return AvroParsers.parseGenericRecord(avroBytesDecoder.parse(input), 
mapParser, avroFlattener);
   }
 
   @JsonProperty
diff --git 
a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java
 
b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java
index 1adb4bf827e..92ea3ae1bda 100644
--- 
a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java
+++ 
b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java
@@ -54,10 +54,10 @@ private AvroParsers()
 
   public static List<InputRow> parseGenericRecord(
       GenericRecord record,
-      ParseSpec parseSpec,
+      MapInputRowParser mapParser,
       ObjectFlattener<GenericRecord> avroFlattener
   )
   {
-    return new 
MapInputRowParser(parseSpec).parseBatch(avroFlattener.flatten(record));
+    return mapParser.parseBatch(avroFlattener.flatten(record));
   }
 }
diff --git 
a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java
 
b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java
index cdf1f85c9a5..330d9e11f37 100755
--- 
a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java
+++ 
b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java
@@ -21,6 +21,7 @@
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.apache.avro.LogicalType;
 import org.apache.avro.LogicalTypes;
@@ -40,7 +41,6 @@
 import org.joda.time.DateTime;
 
 import javax.annotation.Nullable;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -51,7 +51,7 @@
   private final boolean binaryAsString;
   private final TimestampSpec timestampSpec;
   private final ObjectFlattener<GenericRecord> recordFlattener;
-
+  private final List<String> dimensions;
 
   @JsonCreator
   public ParquetAvroHadoopInputRowParser(
@@ -61,6 +61,7 @@ public ParquetAvroHadoopInputRowParser(
   {
     this.parseSpec = parseSpec;
     this.timestampSpec = parseSpec.getTimestampSpec();
+    this.dimensions = parseSpec.getDimensionsSpec().getDimensionNames();
     this.binaryAsString = binaryAsString == null ? false : binaryAsString;
 
     final JSONPathSpec flattenSpec;
@@ -95,15 +96,14 @@ private LogicalType 
determineTimestampSpecLogicalType(Schema schema, String time
   {
     Map<String, Object> row = recordFlattener.flatten(record);
 
-    final List<String> dimensions = 
parseSpec.getDimensionsSpec().hasCustomDimensions()
-                                    ? 
parseSpec.getDimensionsSpec().getDimensionNames()
-                                    : new ArrayList(
-                                        Sets.difference(
-                                            row.keySet(),
-                                            parseSpec.getDimensionsSpec()
-                                                     .getDimensionExclusions()
-                                        )
-                                    );
+    final List<String> dimensions;
+    if (!this.dimensions.isEmpty()) {
+      dimensions = this.dimensions;
+    } else {
+      dimensions = Lists.newArrayList(
+          Sets.difference(row.keySet(), 
parseSpec.getDimensionsSpec().getDimensionExclusions())
+      );
+    }
     // check for parquet Date
     // 
https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#date
     LogicalType logicalType = 
determineTimestampSpecLogicalType(record.getSchema(), 
timestampSpec.getTimestampColumn());
diff --git 
a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParser.java
 
b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParser.java
index e47adeeeaa8..6fb2d5a11a0 100644
--- 
a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParser.java
+++ 
b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParser.java
@@ -23,6 +23,8 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.github.os72.protobuf.dynamic.DynamicSchema;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.Descriptors.Descriptor;
@@ -53,7 +55,7 @@
   private final String protoMessageType;
   private final Descriptor descriptor;
   private Parser<String, Object> parser;
-
+  private final List<String> dimensions;
 
   @JsonCreator
   public ProtobufInputRowParser(
@@ -66,6 +68,7 @@ public ProtobufInputRowParser(
     this.descriptorFilePath = descriptorFilePath;
     this.protoMessageType = protoMessageType;
     this.descriptor = getDescriptor(descriptorFilePath);
+    this.dimensions = parseSpec.getDimensionsSpec().getDimensionNames();
   }
 
   @Override
@@ -98,9 +101,17 @@ public ProtobufInputRowParser withParseSpec(ParseSpec 
parseSpec)
     }
 
     Map<String, Object> record = parser.parseToMap(json);
+    final List<String> dimensions;
+    if (!this.dimensions.isEmpty()) {
+      dimensions = this.dimensions;
+    } else {
+      dimensions = Lists.newArrayList(
+          Sets.difference(record.keySet(), 
parseSpec.getDimensionsSpec().getDimensionExclusions())
+      );
+    }
     return ImmutableList.of(new MapBasedInputRow(
         parseSpec.getTimestampSpec().extractTimestamp(record),
-        parseSpec.getDimensionsSpec().getDimensionNames(),
+        dimensions,
         record
     ));
   }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to