This is an automated email from the ASF dual-hosted git repository.

austin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new aebd8780724 [YAML] - Pick file descriptor based on messageName (#30314) aebd8780724 is described below commit aebd878072455d40d1bd1b302e0fc60d2f9df573 Author: Ferran Fernández Garrido <ffernandez....@gmail.com> AuthorDate: Thu Feb 15 16:58:44 2024 +0100 [YAML] - Pick file descriptor based on messageName (#30314) --- .../sdk/extensions/protobuf/ProtoByteUtils.java | 35 ++++++++++++++++++---- 1 file changed, 29 insertions(+), 6 deletions(-) diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoByteUtils.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoByteUtils.java index dd73739246d..6d048a088b7 100644 --- a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoByteUtils.java +++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoByteUtils.java @@ -36,6 +36,7 @@ import java.io.InputStream; import java.io.Serializable; import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; +import java.util.ArrayList; import java.util.List; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.fs.MatchResult; @@ -72,7 +73,7 @@ public class ProtoByteUtils { * @return The Beam Schema representing the Protocol Buffer message. 
*/ public static Schema getBeamSchemaFromProto(String fileDescriptorPath, String messageName) { - ProtoSchemaInfo dpd = getProtoDomain(fileDescriptorPath); + ProtoSchemaInfo dpd = getProtoDomain(fileDescriptorPath, messageName); ProtoDomain protoDomain = dpd.getProtoDomain(); return ProtoDynamicMessageSchema.forDescriptor(protoDomain, messageName).getSchema(); } @@ -146,7 +147,7 @@ public class ProtoByteUtils { public static SerializableFunction<byte[], Row> getProtoBytesToRowFunction( String fileDescriptorPath, String messageName) { - ProtoSchemaInfo dynamicProtoDomain = getProtoDomain(fileDescriptorPath); + ProtoSchemaInfo dynamicProtoDomain = getProtoDomain(fileDescriptorPath, messageName); ProtoDomain protoDomain = dynamicProtoDomain.getProtoDomain(); @SuppressWarnings("unchecked") ProtoDynamicMessageSchema<DynamicMessage> protoDynamicMessageSchema = @@ -192,7 +193,7 @@ public class ProtoByteUtils { public static SerializableFunction<Row, byte[]> getRowToProtoBytes( String fileDescriptorPath, String messageName) { - ProtoSchemaInfo dynamicProtoDomain = getProtoDomain(fileDescriptorPath); + ProtoSchemaInfo dynamicProtoDomain = getProtoDomain(fileDescriptorPath, messageName); ProtoDomain protoDomain = dynamicProtoDomain.getProtoDomain(); @SuppressWarnings("unchecked") ProtoDynamicMessageSchema<DynamicMessage> protoDynamicMessageSchema = @@ -213,16 +214,38 @@ public class ProtoByteUtils { * file. * * @param fileDescriptorPath The path to the File Descriptor Set file. + * @param messageName The name of the message type for which the descriptor is desired. * @return ProtoSchemaInfo containing the associated ProtoDomain and File Name. * @throws RuntimeException if an error occurs during schema retrieval. 
*/ - private static ProtoSchemaInfo getProtoDomain(String fileDescriptorPath) { + private static ProtoSchemaInfo getProtoDomain(String fileDescriptorPath, String messageName) { byte[] from = getFileAsBytes(fileDescriptorPath); try { + List<String> messageElements = Splitter.on('.').splitToList(messageName); + String messageTypeByName = messageElements.get(messageElements.size() - 1); + DescriptorProtos.FileDescriptorSet descriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(from); - return new ProtoSchemaInfo( - descriptorSet.getFile(0).getName(), ProtoDomain.buildFrom(descriptorSet)); + + ProtoDomain protoDomain = ProtoDomain.buildFrom(descriptorSet); + List<String> fileProtoNames = new ArrayList<>(); + + descriptorSet + .getFileList() + .forEach(fileDescriptorProto -> fileProtoNames.add(fileDescriptorProto.getName())); + + String fullName = + fileProtoNames.stream() + .filter( + name -> + protoDomain.getFileDescriptor(name).findMessageTypeByName(messageTypeByName) + != null) + .findFirst() + .orElseThrow( + () -> + new NullPointerException("Couldn't locate the proto for that message name")); + + return new ProtoSchemaInfo(fullName, protoDomain); } catch (InvalidProtocolBufferException e) { throw new RuntimeException(e); }