This is an automated email from the ASF dual-hosted git repository.

jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b8e69c99a Add inline descriptor Protobuf bytes decoder (#13192)
9b8e69c99a is described below

commit 9b8e69c99a410ba10496e375fc8cbb9c84f6d59b
Author: Jonathan Wei <jon-...@users.noreply.github.com>
AuthorDate: Tue Oct 11 13:37:28 2022 -0500

    Add inline descriptor Protobuf bytes decoder (#13192)
    
    * Add inline descriptor Protobuf bytes decoder
    
    * PR comments
    
    * Update tests, check for IllegalArgumentException
    
    * Fix license, add equals test
    
    * Update 
extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/InlineDescriptorProtobufBytesDecoder.java
    
    Co-authored-by: Frank Chen <frankc...@apache.org>
    
    Co-authored-by: Frank Chen <frankc...@apache.org>
---
 docs/ingestion/data-formats.md                     |  20 ++++
 extensions-core/protobuf-extensions/pom.xml        |   5 +
 ...va => DescriptorBasedProtobufBytesDecoder.java} |  80 +++-----------
 .../protobuf/FileBasedProtobufBytesDecoder.java    |  80 +++-----------
 .../InlineDescriptorProtobufBytesDecoder.java      |  95 ++++++++++++++++
 .../data/input/protobuf/ProtobufBytesDecoder.java  |   3 +-
 .../FileBasedProtobufBytesDecoderTest.java         |  20 ++++
 .../InlineDescriptorProtobufBytesDecoderTest.java  | 123 +++++++++++++++++++++
 .../input/protobuf/ProtobufInputFormatTest.java    |  31 +++++-
 website/.spelling                                  |   1 +
 10 files changed, 327 insertions(+), 131 deletions(-)

diff --git a/docs/ingestion/data-formats.md b/docs/ingestion/data-formats.md
index 22c1027647..db4e7f062f 100644
--- a/docs/ingestion/data-formats.md
+++ b/docs/ingestion/data-formats.md
@@ -1308,6 +1308,26 @@ Sample spec:
 }
 ```
 
+#### Inline Descriptor Protobuf Bytes Decoder
+
+This Protobuf bytes decoder allows the user to provide the contents of a 
Protobuf descriptor file inline, encoded as a Base64 string, and then parse it 
to get schema used to decode the Protobuf record from bytes.
+
+| Field | Type | Description | Required |
+|-------|------|-------------|----------|
+| type | String | Set value to `inline`. | yes |
+| descriptorString | String | A compiled Protobuf descriptor, encoded as a 
Base64 string. | yes |
+| protoMessageType | String | Protobuf message type in the descriptor.  Both 
short name and fully qualified name are accepted. The parser uses the first 
message type found in the descriptor if not specified. | no |
+
+Sample spec:
+
+```json
+"protoBytesDecoder": {
+  "type": "inline",
+  "descriptorString": <Contents of a Protobuf descriptor file encoded as 
Base64 string>,
+  "protoMessageType": "Metrics"
+}
+```
+
 ##### Confluent Schema Registry-based Protobuf Bytes Decoder
 
 This Protobuf bytes decoder first extracts a unique `id` from input message 
bytes, and then uses it to look up the schema in the Schema Registry used to 
decode the Avro record from bytes.
diff --git a/extensions-core/protobuf-extensions/pom.xml 
b/extensions-core/protobuf-extensions/pom.xml
index e2e8c4a116..c7b7fc6e8b 100644
--- a/extensions-core/protobuf-extensions/pom.xml
+++ b/extensions-core/protobuf-extensions/pom.xml
@@ -163,6 +163,11 @@
       <version>${project.parent.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>nl.jqno.equalsverifier</groupId>
+      <artifactId>equalsverifier</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git 
a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoder.java
 
b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/DescriptorBasedProtobufBytesDecoder.java
similarity index 58%
copy from 
extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoder.java
copy to 
extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/DescriptorBasedProtobufBytesDecoder.java
index ed52f7443b..d4c65c6f99 100644
--- 
a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoder.java
+++ 
b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/DescriptorBasedProtobufBytesDecoder.java
@@ -19,7 +19,6 @@
 
 package org.apache.druid.data.input.protobuf;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.github.os72.protobuf.dynamic.DynamicSchema;
 import com.google.common.annotations.VisibleForTesting;
@@ -29,52 +28,44 @@ import com.google.protobuf.DynamicMessage;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.parsers.ParseException;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.MalformedURLException;
-import java.net.URL;
 import java.nio.ByteBuffer;
 import java.util.Objects;
 import java.util.Set;
 
-public class FileBasedProtobufBytesDecoder implements ProtobufBytesDecoder
+public abstract class DescriptorBasedProtobufBytesDecoder implements 
ProtobufBytesDecoder
 {
-  private final String descriptorFilePath;
-  private final String protoMessageType;
   private Descriptors.Descriptor descriptor;
+  private final String protoMessageType;
 
-
-  @JsonCreator
-  public FileBasedProtobufBytesDecoder(
-      @JsonProperty("descriptor") String descriptorFilePath,
-      @JsonProperty("protoMessageType") String protoMessageType
+  public DescriptorBasedProtobufBytesDecoder(
+      final String protoMessageType
   )
   {
-    this.descriptorFilePath = descriptorFilePath;
     this.protoMessageType = protoMessageType;
-    initDescriptor();
   }
 
   @JsonProperty
-  public String getDescriptor()
+  public String getProtoMessageType()
   {
-    return descriptorFilePath;
+    return protoMessageType;
   }
 
-  @JsonProperty
-  public String getProtoMessageType()
+  public Descriptors.Descriptor getDescriptor()
   {
-    return protoMessageType;
+    return descriptor;
   }
 
   @VisibleForTesting
   void initDescriptor()
   {
     if (this.descriptor == null) {
-      this.descriptor = getDescriptor(descriptorFilePath);
+      final DynamicSchema dynamicSchema = generateDynamicSchema();
+      this.descriptor = generateDescriptor(dynamicSchema);
     }
   }
 
+  protected abstract DynamicSchema generateDynamicSchema();
+
   @Override
   public DynamicMessage parse(ByteBuffer bytes)
   {
@@ -87,44 +78,11 @@ public class FileBasedProtobufBytesDecoder implements 
ProtobufBytesDecoder
     }
   }
 
-  private Descriptors.Descriptor getDescriptor(String descriptorFilePath)
+  private Descriptors.Descriptor generateDescriptor(DynamicSchema 
dynamicSchema)
   {
-    InputStream fin;
-
-    fin = 
this.getClass().getClassLoader().getResourceAsStream(descriptorFilePath);
-    if (fin == null) {
-      URL url;
-      try {
-        url = new URL(descriptorFilePath);
-      }
-      catch (MalformedURLException e) {
-        throw new ParseException(
-            descriptorFilePath,
-            e,
-            "Descriptor not found in class path or malformed URL:" + 
descriptorFilePath
-        );
-      }
-      try {
-        fin = url.openConnection().getInputStream();
-      }
-      catch (IOException e) {
-        throw new ParseException(url.toString(), e, "Cannot read descriptor 
file: " + url);
-      }
-    }
-    DynamicSchema dynamicSchema;
-    try {
-      dynamicSchema = DynamicSchema.parseFrom(fin);
-    }
-    catch (Descriptors.DescriptorValidationException e) {
-      throw new ParseException(null, e, "Invalid descriptor file: " + 
descriptorFilePath);
-    }
-    catch (IOException e) {
-      throw new ParseException(null, e, "Cannot read descriptor file: " + 
descriptorFilePath);
-    }
-
     Set<String> messageTypes = dynamicSchema.getMessageTypes();
     if (messageTypes.size() == 0) {
-      throw new ParseException(null, "No message types found in the 
descriptor: " + descriptorFilePath);
+      throw new ParseException(null, "No message types found in the 
descriptor.");
     }
 
     String messageType = protoMessageType == null ? (String) 
messageTypes.toArray()[0] : protoMessageType;
@@ -151,17 +109,13 @@ public class FileBasedProtobufBytesDecoder implements 
ProtobufBytesDecoder
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
-
-    FileBasedProtobufBytesDecoder that = (FileBasedProtobufBytesDecoder) o;
-
-    return Objects.equals(descriptorFilePath, that.descriptorFilePath) &&
-        Objects.equals(protoMessageType, that.protoMessageType);
+    DescriptorBasedProtobufBytesDecoder that = 
(DescriptorBasedProtobufBytesDecoder) o;
+    return Objects.equals(getProtoMessageType(), that.getProtoMessageType());
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(descriptorFilePath, protoMessageType);
+    return Objects.hash(getProtoMessageType());
   }
-
 }
diff --git 
a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoder.java
 
b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoder.java
index ed52f7443b..d49037ec7f 100644
--- 
a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoder.java
+++ 
b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoder.java
@@ -22,27 +22,19 @@ package org.apache.druid.data.input.protobuf;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.github.os72.protobuf.dynamic.DynamicSchema;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.ByteString;
+import com.google.common.base.Preconditions;
 import com.google.protobuf.Descriptors;
-import com.google.protobuf.DynamicMessage;
-import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.parsers.ParseException;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.MalformedURLException;
 import java.net.URL;
-import java.nio.ByteBuffer;
 import java.util.Objects;
-import java.util.Set;
 
-public class FileBasedProtobufBytesDecoder implements ProtobufBytesDecoder
+public class FileBasedProtobufBytesDecoder extends 
DescriptorBasedProtobufBytesDecoder
 {
   private final String descriptorFilePath;
-  private final String protoMessageType;
-  private Descriptors.Descriptor descriptor;
-
 
   @JsonCreator
   public FileBasedProtobufBytesDecoder(
@@ -50,44 +42,20 @@ public class FileBasedProtobufBytesDecoder implements 
ProtobufBytesDecoder
       @JsonProperty("protoMessageType") String protoMessageType
   )
   {
+    super(protoMessageType);
+    Preconditions.checkNotNull(descriptorFilePath);
     this.descriptorFilePath = descriptorFilePath;
-    this.protoMessageType = protoMessageType;
     initDescriptor();
   }
 
-  @JsonProperty
-  public String getDescriptor()
+  @JsonProperty("descriptor")
+  public String getDescriptorFilePath()
   {
     return descriptorFilePath;
   }
 
-  @JsonProperty
-  public String getProtoMessageType()
-  {
-    return protoMessageType;
-  }
-
-  @VisibleForTesting
-  void initDescriptor()
-  {
-    if (this.descriptor == null) {
-      this.descriptor = getDescriptor(descriptorFilePath);
-    }
-  }
-
   @Override
-  public DynamicMessage parse(ByteBuffer bytes)
-  {
-    try {
-      DynamicMessage message = DynamicMessage.parseFrom(descriptor, 
ByteString.copyFrom(bytes));
-      return message;
-    }
-    catch (Exception e) {
-      throw new ParseException(null, e, "Fail to decode protobuf message!");
-    }
-  }
-
-  private Descriptors.Descriptor getDescriptor(String descriptorFilePath)
+  protected DynamicSchema generateDynamicSchema()
   {
     InputStream fin;
 
@@ -111,9 +79,9 @@ public class FileBasedProtobufBytesDecoder implements 
ProtobufBytesDecoder
         throw new ParseException(url.toString(), e, "Cannot read descriptor 
file: " + url);
       }
     }
-    DynamicSchema dynamicSchema;
+
     try {
-      dynamicSchema = DynamicSchema.parseFrom(fin);
+      return DynamicSchema.parseFrom(fin);
     }
     catch (Descriptors.DescriptorValidationException e) {
       throw new ParseException(null, e, "Invalid descriptor file: " + 
descriptorFilePath);
@@ -121,25 +89,6 @@ public class FileBasedProtobufBytesDecoder implements 
ProtobufBytesDecoder
     catch (IOException e) {
       throw new ParseException(null, e, "Cannot read descriptor file: " + 
descriptorFilePath);
     }
-
-    Set<String> messageTypes = dynamicSchema.getMessageTypes();
-    if (messageTypes.size() == 0) {
-      throw new ParseException(null, "No message types found in the 
descriptor: " + descriptorFilePath);
-    }
-
-    String messageType = protoMessageType == null ? (String) 
messageTypes.toArray()[0] : protoMessageType;
-    Descriptors.Descriptor desc = 
dynamicSchema.getMessageDescriptor(messageType);
-    if (desc == null) {
-      throw new ParseException(
-          null,
-          StringUtils.format(
-              "Protobuf message type %s not found in the specified descriptor. 
 Available messages types are %s",
-              protoMessageType,
-              messageTypes
-          )
-      );
-    }
-    return desc;
   }
 
   @Override
@@ -151,17 +100,16 @@ public class FileBasedProtobufBytesDecoder implements 
ProtobufBytesDecoder
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
-
+    if (!super.equals(o)) {
+      return false;
+    }
     FileBasedProtobufBytesDecoder that = (FileBasedProtobufBytesDecoder) o;
-
-    return Objects.equals(descriptorFilePath, that.descriptorFilePath) &&
-        Objects.equals(protoMessageType, that.protoMessageType);
+    return Objects.equals(descriptorFilePath, that.descriptorFilePath);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(descriptorFilePath, protoMessageType);
+    return Objects.hash(super.hashCode(), descriptorFilePath);
   }
-
 }
diff --git 
a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/InlineDescriptorProtobufBytesDecoder.java
 
b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/InlineDescriptorProtobufBytesDecoder.java
new file mode 100644
index 0000000000..9eb7051b68
--- /dev/null
+++ 
b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/InlineDescriptorProtobufBytesDecoder.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.protobuf;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.github.os72.protobuf.dynamic.DynamicSchema;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.Descriptors;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.parsers.ParseException;
+
+import java.io.IOException;
+import java.util.Objects;
+
+public class InlineDescriptorProtobufBytesDecoder extends 
DescriptorBasedProtobufBytesDecoder
+{
+  private final String descriptorString;
+
+  @JsonCreator
+  public InlineDescriptorProtobufBytesDecoder(
+      @JsonProperty("descriptorString") String descriptorString,
+      @JsonProperty("protoMessageType") String protoMessageType
+  )
+  {
+    super(protoMessageType);
+    Preconditions.checkNotNull(descriptorString);
+    this.descriptorString = descriptorString;
+    initDescriptor();
+  }
+
+  @JsonProperty
+  public String getDescriptorString()
+  {
+    return descriptorString;
+  }
+
+  @Override
+  protected DynamicSchema generateDynamicSchema()
+  {
+    try {
+      byte[] decodedDesc = StringUtils.decodeBase64String(descriptorString);
+      return DynamicSchema.parseFrom(decodedDesc);
+    }
+    catch (IllegalArgumentException e) {
+      throw new IAE("Descriptor string does not have valid Base64 encoding.");
+    }
+    catch (Descriptors.DescriptorValidationException e) {
+      throw new ParseException(null, e, "Invalid descriptor string: " + 
descriptorString);
+    }
+    catch (IOException e) {
+      throw new ParseException(null, e, "Cannot read descriptor string: " + 
descriptorString);
+    }
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
+    InlineDescriptorProtobufBytesDecoder that = 
(InlineDescriptorProtobufBytesDecoder) o;
+    return Objects.equals(getDescriptorString(), that.getDescriptorString());
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(super.hashCode(), getDescriptorString());
+  }
+}
diff --git 
a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufBytesDecoder.java
 
b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufBytesDecoder.java
index 1defa705ae..42a11755f7 100644
--- 
a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufBytesDecoder.java
+++ 
b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufBytesDecoder.java
@@ -28,7 +28,8 @@ import java.nio.ByteBuffer;
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = 
SchemaRegistryBasedProtobufBytesDecoder.class)
 @JsonSubTypes(value = {
     @JsonSubTypes.Type(name = "file", value = 
FileBasedProtobufBytesDecoder.class),
-    @JsonSubTypes.Type(name = "schema_registry", value = 
SchemaRegistryBasedProtobufBytesDecoder.class)
+    @JsonSubTypes.Type(name = "schema_registry", value = 
SchemaRegistryBasedProtobufBytesDecoder.class),
+    @JsonSubTypes.Type(name = "inline", value = 
InlineDescriptorProtobufBytesDecoder.class)
 })
 public interface ProtobufBytesDecoder
 {
diff --git 
a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoderTest.java
 
b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoderTest.java
index 18db5f12a3..c01575300c 100644
--- 
a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoderTest.java
+++ 
b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoderTest.java
@@ -19,6 +19,8 @@
 
 package org.apache.druid.data.input.protobuf;
 
+import com.google.protobuf.Descriptors;
+import nl.jqno.equalsverifier.EqualsVerifier;
 import org.apache.druid.java.util.common.parsers.ParseException;
 import org.junit.Rule;
 import org.junit.Test;
@@ -73,4 +75,22 @@ public class FileBasedProtobufBytesDecoderTest
     FileBasedProtobufBytesDecoder decoder = new 
FileBasedProtobufBytesDecoder("prototest.desc", null);
     decoder.initDescriptor();
   }
+
+  @Test
+  public void testEquals()
+  {
+    FileBasedProtobufBytesDecoder decoder = new 
FileBasedProtobufBytesDecoder("prototest.desc", "ProtoTestEvent");
+    decoder.initDescriptor();
+    Descriptors.Descriptor descriptorA = decoder.getDescriptor();
+
+    decoder = new FileBasedProtobufBytesDecoder("prototest.desc", 
"ProtoTestEvent.Foo");
+    decoder.initDescriptor();
+    Descriptors.Descriptor descriptorB = decoder.getDescriptor();
+
+    EqualsVerifier.forClass(FileBasedProtobufBytesDecoder.class)
+                  .usingGetClass()
+                  .withIgnoredFields("descriptor")
+                  .withPrefabValues(Descriptors.Descriptor.class, descriptorA, 
descriptorB)
+                  .verify();
+  }
 }
diff --git 
a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/InlineDescriptorProtobufBytesDecoderTest.java
 
b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/InlineDescriptorProtobufBytesDecoderTest.java
new file mode 100644
index 0000000000..a6fd8fffa5
--- /dev/null
+++ 
b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/InlineDescriptorProtobufBytesDecoderTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.protobuf;
+
+import com.google.common.io.Files;
+import com.google.protobuf.Descriptors;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.parsers.ParseException;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+
+public class InlineDescriptorProtobufBytesDecoderTest
+{
+  private String descString;
+
+  @Before
+  public void initDescriptorString() throws Exception
+  {
+    File descFile = new File(this.getClass()
+                                 .getClassLoader()
+                                 .getResource("prototest.desc")
+                                 .toURI());
+    descString = StringUtils.encodeBase64String(Files.toByteArray(descFile));
+  }
+
+  @Test
+  public void testShortMessageType()
+  {
+    @SuppressWarnings("unused") // expected to create parser without exception
+    InlineDescriptorProtobufBytesDecoder decoder = new 
InlineDescriptorProtobufBytesDecoder(
+        descString,
+        "ProtoTestEvent"
+    );
+    decoder.initDescriptor();
+  }
+
+  @Test
+  public void testLongMessageType()
+  {
+    @SuppressWarnings("unused") // expected to create parser without exception
+    InlineDescriptorProtobufBytesDecoder decoder = new 
InlineDescriptorProtobufBytesDecoder(
+        descString,
+        "prototest.ProtoTestEvent"
+    );
+    decoder.initDescriptor();
+  }
+
+  @Test(expected = ParseException.class)
+  public void testBadProto()
+  {
+    @SuppressWarnings("unused") // expected exception
+    InlineDescriptorProtobufBytesDecoder decoder = new 
InlineDescriptorProtobufBytesDecoder(descString, "BadName");
+    decoder.initDescriptor();
+  }
+
+  @Test(expected = IAE.class)
+  public void testMalformedDescriptorBase64()
+  {
+    @SuppressWarnings("unused") // expected exception
+    InlineDescriptorProtobufBytesDecoder decoder = new 
InlineDescriptorProtobufBytesDecoder("invalidString", "BadName");
+    decoder.initDescriptor();
+  }
+
+  @Test(expected = ParseException.class)
+  public void testMalformedDescriptorValidBase64InvalidDescriptor()
+  {
+    @SuppressWarnings("unused") // expected exception
+    InlineDescriptorProtobufBytesDecoder decoder = new 
InlineDescriptorProtobufBytesDecoder(
+        "aGVsbG8gd29ybGQ=",
+        "BadName"
+    );
+    decoder.initDescriptor();
+  }
+
+  @Test
+  public void testSingleDescriptorNoMessageType()
+  {
+    // For the backward compatibility, protoMessageType allows null when the 
desc file has only one message type.
+    @SuppressWarnings("unused") // expected to create parser without exception
+    InlineDescriptorProtobufBytesDecoder decoder = new 
InlineDescriptorProtobufBytesDecoder(descString, null);
+    decoder.initDescriptor();
+  }
+
+  @Test
+  public void testEquals()
+  {
+    InlineDescriptorProtobufBytesDecoder decoder = new 
InlineDescriptorProtobufBytesDecoder(descString, "ProtoTestEvent");
+    decoder.initDescriptor();
+    Descriptors.Descriptor descriptorA = decoder.getDescriptor();
+
+    decoder = new InlineDescriptorProtobufBytesDecoder(descString, 
"ProtoTestEvent.Foo");
+    decoder.initDescriptor();
+    Descriptors.Descriptor descriptorB = decoder.getDescriptor();
+
+    EqualsVerifier.forClass(InlineDescriptorProtobufBytesDecoder.class)
+                  .usingGetClass()
+                  .withIgnoredFields("descriptor")
+                  .withPrefabValues(Descriptors.Descriptor.class, descriptorA, 
descriptorB)
+                  .verify();
+  }
+
+}
diff --git 
a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java
 
b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java
index 44e9af9845..219fb23aac 100644
--- 
a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java
+++ 
b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.InjectableValues;
 import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
+import com.google.common.io.Files;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.impl.ByteEntity;
@@ -31,6 +32,7 @@ import org.apache.druid.data.input.impl.NestedInputFormat;
 import org.apache.druid.data.input.impl.StringDimensionSchema;
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
 import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
@@ -42,6 +44,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
+import java.io.File;
 import java.io.IOException;
 
 public class ProtobufInputFormatTest
@@ -53,11 +56,12 @@ public class ProtobufInputFormatTest
   private DimensionsSpec dimensionsSpec;
   private JSONPathSpec flattenSpec;
   private FileBasedProtobufBytesDecoder decoder;
+  private InlineDescriptorProtobufBytesDecoder inlineSchemaDecoder;
 
   private final ObjectMapper jsonMapper = new DefaultObjectMapper();
 
   @Before
-  public void setUp()
+  public void setUp() throws Exception
   {
     timestampSpec = new TimestampSpec("timestamp", "iso", null);
     dimensionsSpec = new DimensionsSpec(Lists.newArrayList(
@@ -75,6 +79,14 @@ public class ProtobufInputFormatTest
         )
     );
     decoder = new FileBasedProtobufBytesDecoder("prototest.desc", 
"ProtoTestEvent");
+
+    File descFile = new File(this.getClass()
+                                 .getClassLoader()
+                                 .getResource("prototest.desc")
+                                 .toURI());
+    String descString = 
StringUtils.encodeBase64String(Files.toByteArray(descFile));
+    inlineSchemaDecoder = new InlineDescriptorProtobufBytesDecoder(descString, 
"ProtoTestEvent");
+
     for (Module jacksonModule : new 
ProtobufExtensionsModule().getJacksonModules()) {
       jsonMapper.registerModule(jacksonModule);
     }
@@ -145,4 +157,21 @@ public class ProtobufInputFormatTest
 
     ProtobufInputRowParserTest.verifyFlatData(row, dateTime);
   }
+
+  @Test
+  public void testParseNestedDataWithInlineSchema() throws Exception
+  {
+    //configure parser with inline schema decoder
+    ProtobufInputFormat protobufInputFormat = new 
ProtobufInputFormat(flattenSpec, inlineSchemaDecoder);
+
+    //create binary of proto test event
+    DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, 
ISOChronology.getInstanceUTC());
+    ProtoTestEventWrapper.ProtoTestEvent event = 
ProtobufInputRowParserTest.buildNestedData(dateTime);
+
+    final ByteEntity entity = new 
ByteEntity(ProtobufInputRowParserTest.toByteBuffer(event));
+
+    InputRow row = protobufInputFormat.createReader(new 
InputRowSchema(timestampSpec, dimensionsSpec, null), entity, 
null).read().next();
+
+    ProtobufInputRowParserTest.verifyNestedData(row, dateTime);
+  }
 }
diff --git a/website/.spelling b/website/.spelling
index 0b13916f03..afa6c881b2 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -1233,6 +1233,7 @@ column_1
 column_n
 com.opencsv
 ctrl
+descriptorString
 headerFormat
 headerLabelPrefix
 jsonLowercase


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to