This is an automated email from the ASF dual-hosted git repository.
luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new c87fb8ab7e [INLONG-12108][SDK] TransformSDK supports decoding and
transformation of ProtoBuf description files with multiple Proto files and
multi-level NestedType (#12110)
c87fb8ab7e is described below
commit c87fb8ab7e7bbb8137dc7c8c2b759208d8f5abcb
Author: ChunLiang Lu <[email protected]>
AuthorDate: Fri Apr 3 17:16:58 2026 +0800
[INLONG-12108][SDK] TransformSDK supports decoding and transformation of
ProtoBuf description files with multiple Proto files and multi-level NestedType
(#12110)
* [INLONG-12108][SDK] TransformSDK supports decoding and transformation of
ProtoBuf description files with multiple Proto files and multi-level NestedType
* support a root message type that has a package name
---
.../sdk/transform/decode/PbSourceDecoder.java | 147 ++++++++++++++++++++-
1 file changed, 145 insertions(+), 2 deletions(-)
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
index 43db490232..cc4a6a8e99 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
@@ -21,6 +21,7 @@ import org.apache.inlong.sdk.transform.pojo.PbSourceInfo;
import org.apache.inlong.sdk.transform.process.Context;
import com.google.protobuf.DescriptorProtos;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.google.protobuf.DynamicMessage;
@@ -29,10 +30,13 @@ import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Base64;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
@@ -71,10 +75,13 @@ public class PbSourceDecoder extends SourceDecoder<String> {
// parse description
byte[] protoBytes = Base64.getDecoder().decode(protoDescription);
DescriptorProtos.FileDescriptorSet descriptorSet =
DescriptorProtos.FileDescriptorSet.parseFrom(protoBytes);
- DescriptorProtos.FileDescriptorProto fileDesc =
descriptorSet.getFile(0);
+ // format proto
+ DescriptorProtos.FileDescriptorProto fileDesc =
formatFileDescription(descriptorSet, rootMessageType);
Descriptors.FileDescriptor fileDescriptor =
Descriptors.FileDescriptor.buildFrom(fileDesc,
new Descriptors.FileDescriptor[]{});
- this.rootDesc =
fileDescriptor.findMessageTypeByName(rootMessageType);
+ // find root
+ String fullRootMessageType =
formatTypeName(findRootType(descriptorSet, rootMessageType));
+ this.rootDesc =
fileDescriptor.findMessageTypeByName(fullRootMessageType);
// child
this.rowsNodePath = sourceInfo.getRowsNodePath();
this.childNodes = PbNode.parseNodePath(rootDesc, rowsNodePath);
@@ -156,4 +163,140 @@ public class PbSourceDecoder extends
SourceDecoder<String> {
byte[] srcBytes = Base64.getDecoder().decode(input);
return decode(srcBytes, context);
}
+
+    /**
+     * Builds a single, package-less file description that contains the root message type and
+     * every message or enum type reachable from it through its fields.
+     *
+     * <p>All types found in {@code descriptorSet} are first indexed by their fully-qualified name
+     * (leading dot included); the root type is then copied, together with the types it refers to,
+     * as top-level types renamed by {@code formatTypeName}.
+     *
+     * @param descriptorSet   the parsed description, possibly made of several proto files
+     * @param rootMessageType the name of the root message type as accepted by {@code findRootType}
+     * @return the flattened file description
+     * @throws IllegalArgumentException if the root message type is not defined in {@code descriptorSet}
+     */
+    private static DescriptorProtos.FileDescriptorProto formatFileDescription(
+            DescriptorProtos.FileDescriptorSet descriptorSet,
+            String rootMessageType) throws DescriptorValidationException, IOException {
+        // index every message and enum type of the set by its fully-qualified name
+        Map<String, DescriptorProtos.DescriptorProto> messageTypeMap = new ConcurrentHashMap<>();
+        Map<String, DescriptorProtos.EnumDescriptorProto> enumTypeMap = new ConcurrentHashMap<>();
+        loadTypeFromFileSet(descriptorSet, messageTypeMap, enumTypeMap);
+        // find root type; fail with an explicit message instead of a bare NullPointerException
+        String rootTypeName = findRootType(descriptorSet, rootMessageType);
+        if (rootTypeName == null) {
+            throw new IllegalArgumentException(
+                    "Root message type '" + rootMessageType + "' is not defined in the proto description");
+        }
+        // build root type
+        DescriptorProtos.FileDescriptorProto.Builder newFileBuilder =
+                DescriptorProtos.FileDescriptorProto.newBuilder();
+        Set<String> addedTypeNames = new HashSet<>();
+        // The root counts as already added: a field that refers back to the root (directly or
+        // through other messages) must not make the root type be emitted a second time.
+        addedTypeNames.add(formatTypeName(rootTypeName));
+        formatMessageTypeDescription(rootTypeName, newFileBuilder, messageTypeMap, enumTypeMap,
+                addedTypeNames);
+        return newFileBuilder.build();
+    }
+
+    /**
+     * Copies the message type registered under {@code typeName} into {@code newFileBuilder} as a
+     * top-level type renamed by {@code formatTypeName}, and does the same for every message or
+     * enum type referenced by its fields that has not been added yet.
+     *
+     * <p>The copy carries the name, the oneof declarations and the fields of the original type;
+     * message and enum field type names are rewritten to the renamed types. Nothing is added when
+     * {@code messageTypeMap} has no entry for {@code typeName}.
+     *
+     * @param typeName       fully-qualified name of the message type to copy
+     * @param newFileBuilder builder of the file description that receives the copied types
+     * @param messageTypeMap message types indexed by fully-qualified name
+     * @param enumTypeMap    enum types indexed by fully-qualified name
+     * @param addedTypeNames renamed types that were already scheduled for copying; updated in place
+     */
+    private static void formatMessageTypeDescription(String typeName,
+            DescriptorProtos.FileDescriptorProto.Builder newFileBuilder,
+            Map<String, DescriptorProtos.DescriptorProto> messageTypeMap,
+            Map<String, DescriptorProtos.EnumDescriptorProto> enumTypeMap,
+            Set<String> addedTypeNames) {
+        DescriptorProtos.DescriptorProto typeDesc = messageTypeMap.get(typeName);
+        if (typeDesc == null) {
+            return;
+        }
+        String newTypeName = formatTypeName(typeName);
+        // Mark this type before walking its fields, so that a field referring back to it
+        // (self-reference or a longer cycle) does not emit the same message type twice.
+        addedTypeNames.add(newTypeName);
+        DescriptorProtos.DescriptorProto.Builder typeBuilder =
+                DescriptorProtos.DescriptorProto.newBuilder();
+        typeBuilder.setName(newTypeName);
+        // Fields are copied together with their oneof_index, so the oneof declarations they
+        // index into have to be copied as well; otherwise the index is out of range.
+        typeBuilder.addAllOneofDecl(typeDesc.getOneofDeclList());
+        for (DescriptorProtos.FieldDescriptorProto fieldDesc : typeDesc.getFieldList()) {
+            DescriptorProtos.FieldDescriptorProto.Builder fieldBuilder =
+                    DescriptorProtos.FieldDescriptorProto.newBuilder().mergeFrom(fieldDesc);
+            FieldDescriptorProto.Type fieldType = fieldDesc.getType();
+            boolean isMessage = fieldType == FieldDescriptorProto.Type.TYPE_MESSAGE;
+            boolean isEnum = fieldType == FieldDescriptorProto.Type.TYPE_ENUM;
+            if (isMessage || isEnum) {
+                String fieldTypeName = fieldDesc.getTypeName();
+                String newFieldTypeName = formatTypeName(fieldTypeName);
+                fieldBuilder.setTypeName(newFieldTypeName);
+                // Set.add returns true only the first time the name is seen
+                if (addedTypeNames.add(newFieldTypeName)) {
+                    if (isMessage) {
+                        formatMessageTypeDescription(fieldTypeName, newFileBuilder, messageTypeMap,
+                                enumTypeMap, addedTypeNames);
+                    } else {
+                        formatEnumTypeDescription(fieldTypeName, newFileBuilder, enumTypeMap,
+                                addedTypeNames);
+                    }
+                }
+            }
+            typeBuilder.addField(fieldBuilder);
+        }
+        newFileBuilder.addMessageType(typeBuilder.build());
+    }
+
+    /**
+     * Copies the enum type registered under {@code typeName} into {@code newFileBuilder} as a
+     * top-level enum renamed by {@code formatTypeName}. Nothing is added when {@code enumTypeMap}
+     * has no entry for {@code typeName}.
+     *
+     * @param typeName       fully-qualified name of the enum type to copy
+     * @param newFileBuilder builder of the file description that receives the copied enum
+     * @param enumTypeMap    enum types indexed by fully-qualified name
+     * @param addedTypeNames not read or modified by this method
+     */
+    private static void formatEnumTypeDescription(String typeName,
+            DescriptorProtos.FileDescriptorProto.Builder newFileBuilder,
+            Map<String, DescriptorProtos.EnumDescriptorProto> enumTypeMap,
+            Set<String> addedTypeNames) {
+        DescriptorProtos.EnumDescriptorProto sourceEnum = enumTypeMap.get(typeName);
+        if (sourceEnum != null) {
+            // full copy of the enum; only its name changes
+            DescriptorProtos.EnumDescriptorProto.Builder renamedEnum =
+                    DescriptorProtos.EnumDescriptorProto.newBuilder().mergeFrom(sourceEnum);
+            renamedEnum.setName(formatTypeName(typeName));
+            newFileBuilder.addEnumType(renamedEnum.build());
+        }
+    }
+
+    /**
+     * Turns a protobuf type name into a flat identifier by dropping the leading dot, if any, and
+     * replacing every remaining dot with an underscore, e.g. {@code .pkg.Outer.Inner} becomes
+     * {@code pkg_Outer_Inner}.
+     *
+     * @param typeName the type name to convert; must not be null
+     * @return the flat identifier
+     */
+    private static String formatTypeName(String typeName) {
+        // Strip only a real leading dot: a name without one must keep its first character,
+        // and an empty name must not raise StringIndexOutOfBoundsException.
+        String relativeName = typeName.startsWith(".") ? typeName.substring(1) : typeName;
+        return relativeName.replace('.', '_');
+    }
+
+    /**
+     * Looks up a top-level message type of {@code descriptorSet} and returns its fully-qualified
+     * name with a leading dot, e.g. {@code .pkg.Msg}, or {@code .Msg} for a file without package.
+     *
+     * <p>{@code rootMessageType} may be the simple message name ({@code Msg}) or the
+     * package-qualified name, with or without the leading dot ({@code pkg.Msg}, {@code .pkg.Msg}).
+     * Files and messages are scanned in declaration order and the first match wins, so a qualified
+     * name is needed to pick between equally named messages of different packages.
+     *
+     * @param descriptorSet   the parsed description to search
+     * @param rootMessageType the simple or package-qualified name of the root message type
+     * @return the fully-qualified name of the matching type, or {@code null} if none matches
+     */
+    private static String findRootType(DescriptorProtos.FileDescriptorSet descriptorSet,
+            String rootMessageType) {
+        if (rootMessageType == null) {
+            return null;
+        }
+        for (DescriptorProtos.FileDescriptorProto fileDesc : descriptorSet.getFileList()) {
+            String packageName = fileDesc.getPackage();
+            String packagePrefix = StringUtils.isBlank(packageName) ? "" : "." + packageName;
+            // message type
+            for (DescriptorProtos.DescriptorProto typeDesc : fileDesc.getMessageTypeList()) {
+                String fullTypeName = packagePrefix + "." + typeDesc.getName();
+                boolean matched = rootMessageType.equals(typeDesc.getName())
+                        || rootMessageType.equals(fullTypeName)
+                        || fullTypeName.equals("." + rootMessageType);
+                if (matched) {
+                    return fullTypeName;
+                }
+            }
+        }
+        return null;
+    }
+
+ private static void loadTypeFromFileSet(DescriptorProtos.FileDescriptorSet
descriptorSet,
+ Map<String, DescriptorProtos.DescriptorProto> messageTypeMap,
+ Map<String, DescriptorProtos.EnumDescriptorProto> enumTypeMap) {
+ for (DescriptorProtos.FileDescriptorProto fileDesc :
descriptorSet.getFileList()) {
+ loadTypeFromFile(fileDesc, messageTypeMap, enumTypeMap);
+ }
+ }
+
+    /**
+     * Registers the top-level types of one proto file under their fully-qualified names.
+     *
+     * <p>Names are prefixed with a dot and the file's package when the package is not blank, and
+     * with a dot only otherwise. Message types are handed to {@code loadTypeFromMessage}; enum
+     * types are stored directly, keeping an existing entry when the name is already registered.
+     *
+     * @param fileDesc       the proto file to index
+     * @param messageTypeMap receives the message types
+     * @param enumTypeMap    receives the enum types
+     */
+    private static void loadTypeFromFile(DescriptorProtos.FileDescriptorProto fileDesc,
+            Map<String, DescriptorProtos.DescriptorProto> messageTypeMap,
+            Map<String, DescriptorProtos.EnumDescriptorProto> enumTypeMap) {
+        String pkg = fileDesc.getPackage();
+        String scope = StringUtils.isBlank(pkg) ? "" : "." + pkg;
+        // messages first, then file-level enums
+        for (DescriptorProtos.DescriptorProto message : fileDesc.getMessageTypeList()) {
+            loadTypeFromMessage(fileDesc, scope, message, messageTypeMap, enumTypeMap);
+        }
+        for (DescriptorProtos.EnumDescriptorProto topLevelEnum : fileDesc.getEnumTypeList()) {
+            enumTypeMap.putIfAbsent(scope + "." + topLevelEnum.getName(), topLevelEnum);
+        }
+    }
+
+ private static void
loadTypeFromMessage(DescriptorProtos.FileDescriptorProto fileDesc,
+ String parentTypeName,
+ DescriptorProtos.DescriptorProto typeDesc,
+ Map<String, DescriptorProtos.DescriptorProto> messageTypeMap,
+ Map<String, DescriptorProtos.EnumDescriptorProto> enumTypeMap) {
+ String fullTypeName = parentTypeName + "." + typeDesc.getName();
+ messageTypeMap.putIfAbsent(fullTypeName, typeDesc);
+ for (DescriptorProtos.DescriptorProto nestedTypeDesc :
typeDesc.getNestedTypeList()) {
+ loadTypeFromMessage(fileDesc, fullTypeName, nestedTypeDesc,
messageTypeMap, enumTypeMap);
+ }
+ for (DescriptorProtos.EnumDescriptorProto enumTypeDesc :
typeDesc.getEnumTypeList()) {
+ String enumTypeName = fullTypeName + "." + enumTypeDesc.getName();
+ enumTypeMap.putIfAbsent(enumTypeName, enumTypeDesc);
+ }
+ }
}