This is an automated email from the ASF dual-hosted git repository.

austin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new aebd8780724 [YAML] - Pick file descriptor based on messageName (#30314)
aebd8780724 is described below

commit aebd878072455d40d1bd1b302e0fc60d2f9df573
Author: Ferran Fernández Garrido <ffernandez....@gmail.com>
AuthorDate: Thu Feb 15 16:58:44 2024 +0100

    [YAML] - Pick file descriptor based on messageName (#30314)
---
 .../sdk/extensions/protobuf/ProtoByteUtils.java    | 35 ++++++++++++++++++----
 1 file changed, 29 insertions(+), 6 deletions(-)

diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoByteUtils.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoByteUtils.java
index dd73739246d..6d048a088b7 100644
--- a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoByteUtils.java
+++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoByteUtils.java
@@ -36,6 +36,7 @@ import java.io.InputStream;
 import java.io.Serializable;
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
+import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.fs.MatchResult;
@@ -72,7 +73,7 @@ public class ProtoByteUtils {
    * @return The Beam Schema representing the Protocol Buffer message.
    */
   public static Schema getBeamSchemaFromProto(String fileDescriptorPath, 
String messageName) {
-    ProtoSchemaInfo dpd = getProtoDomain(fileDescriptorPath);
+    ProtoSchemaInfo dpd = getProtoDomain(fileDescriptorPath, messageName);
     ProtoDomain protoDomain = dpd.getProtoDomain();
     return ProtoDynamicMessageSchema.forDescriptor(protoDomain, 
messageName).getSchema();
   }
@@ -146,7 +147,7 @@ public class ProtoByteUtils {
   public static SerializableFunction<byte[], Row> getProtoBytesToRowFunction(
       String fileDescriptorPath, String messageName) {
 
-    ProtoSchemaInfo dynamicProtoDomain = getProtoDomain(fileDescriptorPath);
+    ProtoSchemaInfo dynamicProtoDomain = getProtoDomain(fileDescriptorPath, 
messageName);
     ProtoDomain protoDomain = dynamicProtoDomain.getProtoDomain();
     @SuppressWarnings("unchecked")
     ProtoDynamicMessageSchema<DynamicMessage> protoDynamicMessageSchema =
@@ -192,7 +193,7 @@ public class ProtoByteUtils {
 
   public static SerializableFunction<Row, byte[]> getRowToProtoBytes(
       String fileDescriptorPath, String messageName) {
-    ProtoSchemaInfo dynamicProtoDomain = getProtoDomain(fileDescriptorPath);
+    ProtoSchemaInfo dynamicProtoDomain = getProtoDomain(fileDescriptorPath, 
messageName);
     ProtoDomain protoDomain = dynamicProtoDomain.getProtoDomain();
     @SuppressWarnings("unchecked")
     ProtoDynamicMessageSchema<DynamicMessage> protoDynamicMessageSchema =
@@ -213,16 +214,38 @@ public class ProtoByteUtils {
    * file.
    *
    * @param fileDescriptorPath The path to the File Descriptor Set file.
+   * @param messageName The name of the message type for which the descriptor 
is desired.
    * @return ProtoSchemaInfo containing the associated ProtoDomain and File 
Name.
    * @throws RuntimeException if an error occurs during schema retrieval.
    */
-  private static ProtoSchemaInfo getProtoDomain(String fileDescriptorPath) {
+  private static ProtoSchemaInfo getProtoDomain(String fileDescriptorPath, 
String messageName) {
     byte[] from = getFileAsBytes(fileDescriptorPath);
     try {
+      List<String> messageElements = Splitter.on('.').splitToList(messageName);
+      String messageTypeByName = messageElements.get(messageElements.size() - 
1);
+
       DescriptorProtos.FileDescriptorSet descriptorSet =
           DescriptorProtos.FileDescriptorSet.parseFrom(from);
-      return new ProtoSchemaInfo(
-          descriptorSet.getFile(0).getName(), 
ProtoDomain.buildFrom(descriptorSet));
+
+      ProtoDomain protoDomain = ProtoDomain.buildFrom(descriptorSet);
+      List<String> fileProtoNames = new ArrayList<>();
+
+      descriptorSet
+          .getFileList()
+          .forEach(fileDescriptorProto -> 
fileProtoNames.add(fileDescriptorProto.getName()));
+
+      String fullName =
+          fileProtoNames.stream()
+              .filter(
+                  name ->
+                      
protoDomain.getFileDescriptor(name).findMessageTypeByName(messageTypeByName)
+                          != null)
+              .findFirst()
+              .orElseThrow(
+                  () ->
+                      new NullPointerException("Couldn't locate the proto for 
that message name"));
+
+      return new ProtoSchemaInfo(fullName, protoDomain);
     } catch (InvalidProtocolBufferException e) {
       throw new RuntimeException(e);
     }

Reply via email to