This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new c87fb8ab7e [INLONG-12108][SDK] TransformSDK supports decoding and 
transformation of ProtoBuf description files with multiple Proto files and 
multi-level NestedType (#12110)
c87fb8ab7e is described below

commit c87fb8ab7e7bbb8137dc7c8c2b759208d8f5abcb
Author: ChunLiang Lu <[email protected]>
AuthorDate: Fri Apr 3 17:16:58 2026 +0800

    [INLONG-12108][SDK] TransformSDK supports decoding and transformation of 
ProtoBuf description files with multiple Proto files and multi-level NestedType 
(#12110)
    
    * [INLONG-12108][SDK] TransformSDK supports decoding and transformation of 
ProtoBuf description files with multiple Proto files and multi-level NestedType
    
    * support a root message type that is declared in a package
---
 .../sdk/transform/decode/PbSourceDecoder.java      | 147 ++++++++++++++++++++-
 1 file changed, 145 insertions(+), 2 deletions(-)

diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
index 43db490232..cc4a6a8e99 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
@@ -21,6 +21,7 @@ import org.apache.inlong.sdk.transform.pojo.PbSourceInfo;
 import org.apache.inlong.sdk.transform.process.Context;
 
 import com.google.protobuf.DescriptorProtos;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.Descriptors.DescriptorValidationException;
 import com.google.protobuf.DynamicMessage;
@@ -29,10 +30,13 @@ import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.Base64;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -71,10 +75,13 @@ public class PbSourceDecoder extends SourceDecoder<String> {
             // parse description
             byte[] protoBytes = Base64.getDecoder().decode(protoDescription);
             DescriptorProtos.FileDescriptorSet descriptorSet = 
DescriptorProtos.FileDescriptorSet.parseFrom(protoBytes);
-            DescriptorProtos.FileDescriptorProto fileDesc = 
descriptorSet.getFile(0);
+            // format proto
+            DescriptorProtos.FileDescriptorProto fileDesc = 
formatFileDescription(descriptorSet, rootMessageType);
             Descriptors.FileDescriptor fileDescriptor = 
Descriptors.FileDescriptor.buildFrom(fileDesc,
                     new Descriptors.FileDescriptor[]{});
-            this.rootDesc = 
fileDescriptor.findMessageTypeByName(rootMessageType);
+            // find root
+            String fullRootMessageType = 
formatTypeName(findRootType(descriptorSet, rootMessageType));
+            this.rootDesc = 
fileDescriptor.findMessageTypeByName(fullRootMessageType);
             // child
             this.rowsNodePath = sourceInfo.getRowsNodePath();
             this.childNodes = PbNode.parseNodePath(rootDesc, rowsNodePath);
@@ -156,4 +163,140 @@ public class PbSourceDecoder extends 
SourceDecoder<String> {
         byte[] srcBytes = Base64.getDecoder().decode(input);
         return decode(srcBytes, context);
     }
+
+    /**
+     * Builds one flattened file description containing the root message type and the
+     * message/enum types reachable from it through its fields.
+     * <p>
+     * Types are collected from every file of the descriptor set, then copied into a new file
+     * description under names produced by {@code formatTypeName}.
+     *
+     * @param descriptorSet   the parsed descriptor set, possibly holding several proto files
+     * @param rootMessageType the name of the root message type to look up
+     * @return the newly built file description
+     * @throws IllegalArgumentException if the root message type is not found in the descriptor set
+     * @throws DescriptorValidationException declared for callers; not thrown by the code visible here
+     * @throws IOException declared for callers; not thrown by the code visible here
+     */
+    private static DescriptorProtos.FileDescriptorProto formatFileDescription(
+            DescriptorProtos.FileDescriptorSet descriptorSet,
+            String rootMessageType) throws DescriptorValidationException, IOException {
+        // load
+        Map<String, DescriptorProtos.DescriptorProto> messageTypeMap = new ConcurrentHashMap<>();
+        Map<String, DescriptorProtos.EnumDescriptorProto> enumTypeMap = new ConcurrentHashMap<>();
+        loadTypeFromFileSet(descriptorSet, messageTypeMap, enumTypeMap);
+        // find root type
+        String rootTypeName = findRootType(descriptorSet, rootMessageType);
+        if (rootTypeName == null) {
+            // fail with a readable message instead of a NullPointerException from the map lookup
+            throw new IllegalArgumentException(
+                    "Can not find root message type in proto description, rootMessageType:" + rootMessageType);
+        }
+        // build root type
+        DescriptorProtos.FileDescriptorProto.Builder newFileBuilder = DescriptorProtos.FileDescriptorProto.newBuilder();
+        Set<String> addedTypeNames = new HashSet<>();
+        // register the root itself, otherwise a field that refers back to the root
+        // (directly or through a cycle) would add the root message a second time
+        addedTypeNames.add(formatTypeName(rootTypeName));
+        formatMessageTypeDescription(rootTypeName, newFileBuilder, messageTypeMap, enumTypeMap, addedTypeNames);
+        return newFileBuilder.build();
+    }
+
+    /**
+     * Copies the message type {@code typeName} into {@code newFileBuilder} as a top-level message
+     * and recursively copies every message/enum type referenced by its fields.
+     * <p>
+     * The copy carries the formatted name, the oneof declarations and the fields of the original;
+     * message- and enum-typed fields are re-pointed to the formatted name of their type.
+     * Nothing is added when {@code typeName} is not present in {@code messageTypeMap}.
+     *
+     * @param typeName        key of the message type in {@code messageTypeMap}
+     * @param newFileBuilder  builder of the file description that receives the copied types
+     * @param messageTypeMap  all known message types by name
+     * @param enumTypeMap     all known enum types by name
+     * @param addedTypeNames  formatted names of types already scheduled for copying; updated here
+     */
+    private static void formatMessageTypeDescription(String typeName,
+            DescriptorProtos.FileDescriptorProto.Builder newFileBuilder,
+            Map<String, DescriptorProtos.DescriptorProto> messageTypeMap,
+            Map<String, DescriptorProtos.EnumDescriptorProto> enumTypeMap,
+            Set<String> addedTypeNames) {
+        DescriptorProtos.DescriptorProto typeDesc = messageTypeMap.get(typeName);
+        if (typeDesc == null) {
+            return;
+        }
+        DescriptorProtos.DescriptorProto.Builder typeBuilder = DescriptorProtos.DescriptorProto.newBuilder();
+        typeBuilder.setName(formatTypeName(typeName));
+        // copied fields keep their oneof_index, so the oneof declarations must be kept as well;
+        // without them the index points outside the (empty) declaration list
+        typeBuilder.addAllOneofDecl(typeDesc.getOneofDeclList());
+        for (DescriptorProtos.FieldDescriptorProto fieldDesc : typeDesc.getFieldList()) {
+            DescriptorProtos.FieldDescriptorProto.Builder fieldBuilder = fieldDesc.toBuilder();
+            FieldDescriptorProto.Type fieldType = fieldDesc.getType();
+            boolean isMessage = FieldDescriptorProto.Type.TYPE_MESSAGE.equals(fieldType);
+            boolean isEnum = FieldDescriptorProto.Type.TYPE_ENUM.equals(fieldType);
+            if (isMessage || isEnum) {
+                String fieldTypeName = fieldDesc.getTypeName();
+                String newFieldTypeName = formatTypeName(fieldTypeName);
+                fieldBuilder.setTypeName(newFieldTypeName);
+                // Set.add returns true only the first time, so each referenced type is copied once
+                if (addedTypeNames.add(newFieldTypeName)) {
+                    if (isMessage) {
+                        formatMessageTypeDescription(fieldTypeName, newFileBuilder, messageTypeMap, enumTypeMap,
+                                addedTypeNames);
+                    } else {
+                        formatEnumTypeDescription(fieldTypeName, newFileBuilder, enumTypeMap, addedTypeNames);
+                    }
+                }
+            }
+            typeBuilder.addField(fieldBuilder);
+        }
+        newFileBuilder.addMessageType(typeBuilder.build());
+    }
+
+    /**
+     * Copies the enum type {@code typeName} into {@code newFileBuilder} as a top-level enum whose
+     * name is the formatted form of {@code typeName}. Does nothing when the enum is unknown.
+     *
+     * @param typeName        key of the enum type in {@code enumTypeMap}
+     * @param newFileBuilder  builder of the file description that receives the copied enum
+     * @param enumTypeMap     all known enum types by name
+     * @param addedTypeNames  not read or modified by this method
+     */
+    private static void formatEnumTypeDescription(String typeName,
+            DescriptorProtos.FileDescriptorProto.Builder newFileBuilder,
+            Map<String, DescriptorProtos.EnumDescriptorProto> enumTypeMap,
+            Set<String> addedTypeNames) {
+        DescriptorProtos.EnumDescriptorProto sourceEnum = enumTypeMap.get(typeName);
+        if (sourceEnum != null) {
+            // same content as the source enum, only the name is replaced
+            DescriptorProtos.EnumDescriptorProto renamedEnum = sourceEnum.toBuilder()
+                    .setName(formatTypeName(typeName))
+                    .build();
+            newFileBuilder.addEnumType(renamedEnum);
+        }
+    }
+
+    /**
+     * Converts a type name such as {@code .pkg.Outer.Inner} into a flat identifier such as
+     * {@code pkg_Outer_Inner}: one leading dot is dropped and every remaining dot becomes an
+     * underscore.
+     *
+     * @param typeName the type name, with or without a leading dot
+     * @return the flattened name
+     * @throws IllegalArgumentException if {@code typeName} is {@code null}
+     */
+    private static String formatTypeName(String typeName) {
+        if (typeName == null) {
+            throw new IllegalArgumentException("Type name is null, the message type may not exist in description");
+        }
+        // only strip the first character when it really is the leading dot
+        String relativeName = typeName.startsWith(".") ? typeName.substring(1) : typeName;
+        return relativeName.replace('.', '_');
+    }
+
+    /**
+     * Looks up the root message among the top-level message types of every file in the set.
+     * <p>
+     * {@code rootMessageType} may be the simple message name ({@code Root}) or the name qualified
+     * with the package of its file, with or without a leading dot ({@code pkg.Root},
+     * {@code .pkg.Root}). Files are scanned in order and the first match wins.
+     *
+     * @param descriptorSet   the descriptor set to search
+     * @param rootMessageType the simple or package-qualified root message name
+     * @return the dot-prefixed full name of the match ({@code .pkg.Root}, or {@code .Root} when the
+     *         file has no package), or {@code null} when no top-level message matches
+     */
+    private static String findRootType(DescriptorProtos.FileDescriptorSet descriptorSet,
+            String rootMessageType) {
+        for (DescriptorProtos.FileDescriptorProto fileDesc : descriptorSet.getFileList()) {
+            String packageName = fileDesc.getPackage();
+            String packagePrefix = "";
+            if (!StringUtils.isBlank(packageName)) {
+                packagePrefix = "." + packageName;
+            }
+            // message type
+            for (DescriptorProtos.DescriptorProto typeDesc : fileDesc.getMessageTypeList()) {
+                String fullTypeName = packagePrefix + "." + typeDesc.getName();
+                // simple name, ".pkg.Name" and "pkg.Name" are all accepted
+                if (StringUtils.equals(typeDesc.getName(), rootMessageType)
+                        || StringUtils.equals(fullTypeName, rootMessageType)
+                        || StringUtils.equals(fullTypeName.substring(1), rootMessageType)) {
+                    return fullTypeName;
+                }
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Registers the message and enum types of every file in the descriptor set, in file order.
+     *
+     * @param descriptorSet   the descriptor set whose files are scanned
+     * @param messageTypeMap  receives the message types
+     * @param enumTypeMap     receives the enum types
+     */
+    private static void loadTypeFromFileSet(DescriptorProtos.FileDescriptorSet descriptorSet,
+            Map<String, DescriptorProtos.DescriptorProto> messageTypeMap,
+            Map<String, DescriptorProtos.EnumDescriptorProto> enumTypeMap) {
+        int fileCount = descriptorSet.getFileCount();
+        for (int index = 0; index < fileCount; index++) {
+            loadTypeFromFile(descriptorSet.getFile(index), messageTypeMap, enumTypeMap);
+        }
+    }
+
+    /**
+     * Registers the types declared in one proto file.
+     * <p>
+     * Top-level messages are handed to {@code loadTypeFromMessage}; top-level enums are stored
+     * under {@code .<package>.<name>}, or {@code .<name>} when the package is blank. An entry
+     * that already exists in a map is left untouched.
+     *
+     * @param fileDesc        the proto file description to scan
+     * @param messageTypeMap  receives the message types
+     * @param enumTypeMap     receives the enum types
+     */
+    private static void loadTypeFromFile(DescriptorProtos.FileDescriptorProto fileDesc,
+            Map<String, DescriptorProtos.DescriptorProto> messageTypeMap,
+            Map<String, DescriptorProtos.EnumDescriptorProto> enumTypeMap) {
+        String pkg = fileDesc.getPackage();
+        String scopePrefix = StringUtils.isBlank(pkg) ? "" : "." + pkg;
+        for (DescriptorProtos.DescriptorProto topLevelMessage : fileDesc.getMessageTypeList()) {
+            loadTypeFromMessage(fileDesc, scopePrefix, topLevelMessage, messageTypeMap, enumTypeMap);
+        }
+        for (DescriptorProtos.EnumDescriptorProto topLevelEnum : fileDesc.getEnumTypeList()) {
+            enumTypeMap.putIfAbsent(scopePrefix + "." + topLevelEnum.getName(), topLevelEnum);
+        }
+    }
+
+    private static void 
loadTypeFromMessage(DescriptorProtos.FileDescriptorProto fileDesc,
+            String parentTypeName,
+            DescriptorProtos.DescriptorProto typeDesc,
+            Map<String, DescriptorProtos.DescriptorProto> messageTypeMap,
+            Map<String, DescriptorProtos.EnumDescriptorProto> enumTypeMap) {
+        String fullTypeName = parentTypeName + "." + typeDesc.getName();
+        messageTypeMap.putIfAbsent(fullTypeName, typeDesc);
+        for (DescriptorProtos.DescriptorProto nestedTypeDesc : 
typeDesc.getNestedTypeList()) {
+            loadTypeFromMessage(fileDesc, fullTypeName, nestedTypeDesc, 
messageTypeMap, enumTypeMap);
+        }
+        for (DescriptorProtos.EnumDescriptorProto enumTypeDesc : 
typeDesc.getEnumTypeList()) {
+            String enumTypeName = fullTypeName + "." + enumTypeDesc.getName();
+            enumTypeMap.putIfAbsent(enumTypeName, enumTypeDesc);
+        }
+    }
 }

Reply via email to