This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 95c9a8204c [INLONG-12113][SDK] TransformSDK supports encoding of List, Struct, and Binary type fields for RowData (#12114)
95c9a8204c is described below

commit 95c9a8204c67c4f2cfe7b6447feeced27f3b6720
Author: ChunLiang Lu <[email protected]>
AuthorDate: Tue Apr 21 15:09:26 2026 +0800

    [INLONG-12113][SDK] TransformSDK supports encoding of List, Struct, and Binary type fields for RowData (#12114)
    
    * [INLONG-12113][SDK] TransformSDK supports encoding of List, Struct, and Binary type fields for RowData
    
    * fix UT case
    
    * add UT case
---
 .../apache/inlong/sdk/transform/decode/PbNode.java |   4 +
 .../inlong/sdk/transform/decode/PbSourceData.java  | 164 +++++++++++++------
 .../sdk/transform/encode/CsvSinkEncoder.java       |   4 +-
 .../sdk/transform/encode/DefaultSinkData.java      |   7 +-
 .../inlong/sdk/transform/encode/KvSinkEncoder.java |   4 +-
 .../sdk/transform/encode/MapSinkEncoder.java       |   2 +-
 .../sdk/transform/encode/ParquetSinkEncoder.java   |   2 +-
 .../inlong/sdk/transform/encode/PbSinkEncoder.java |   2 +-
 .../sdk/transform/encode/RowDataSinkEncoder.java   |   3 +-
 .../inlong/sdk/transform/encode/SinkData.java      |   4 +-
 .../inlong/sdk/transform/encode/SinkEncoder.java   |   9 ++
 .../sdk/transform/process/TransformProcessor.java  |   2 +-
 .../process/function/string/ConcatFunction.java    |  12 +-
 .../sdk/transform/utils/FieldToRowDataUtils.java   | 175 +++++++++++++++++----
 .../process/processor/TestPb2RowDataProcessor.java | 118 ++++++++++++++
 15 files changed, 416 insertions(+), 96 deletions(-)

diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbNode.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbNode.java
index 4fe166832b..310f6b1abb 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbNode.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbNode.java
@@ -41,6 +41,7 @@ public class PbNode {
     private boolean isArray = false;
     private int arrayIndex = -1;
     private boolean isMap = false;
+    private boolean isMapType = false;
     private String mapKey = "";
     private FieldDescriptor mapKeyDesc;
     private FieldDescriptor mapValueDesc;
@@ -60,6 +61,9 @@ public class PbNode {
                 this.fieldDesc = messageDesc.findFieldByName(name);
                 if (this.fieldDesc.getJavaType() == JavaType.MESSAGE) {
                     this.messageType = this.fieldDesc.getMessageType();
+                    if (isMapDescriptor(messageType)) {
+                        this.isMapType = true;
+                    }
                 }
             }
         } else {
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceData.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceData.java
index dbeeade6f3..e46fb47d8d 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceData.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceData.java
@@ -21,13 +21,18 @@ import org.apache.inlong.sdk.transform.process.Context;
 
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Descriptors;
+import com.google.protobuf.Descriptors.FieldDescriptor;
 import com.google.protobuf.DynamicMessage;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.charset.Charset;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -54,7 +59,7 @@ public class PbSourceData extends AbstractSourceData {
 
     private List<DynamicMessage> childRoot;
 
-    private Charset srcCharset;
+    protected Charset srcCharset;
 
     /**
      * Constructor
@@ -105,8 +110,8 @@ public class PbSourceData extends AbstractSourceData {
      * @return
      */
     @Override
-    public String getField(int rowNum, String fieldName) {
-        String fieldValue = "";
+    public Object getField(int rowNum, String fieldName) {
+        Object fieldValue = "";
         try {
             if (isContextField(fieldName)) {
                 return getContextField(fieldName);
@@ -130,7 +135,7 @@ public class PbSourceData extends AbstractSourceData {
      * @param fieldName
      * @return
      */
-    private String getRootField(String srcFieldName) {
+    private Object getRootField(String srcFieldName) {
         List<PbNode> childNodes = this.columnNodeMap.get(srcFieldName);
         if (childNodes == null) {
             String fieldName = srcFieldName.substring(ROOT_KEY.length());
@@ -145,7 +150,7 @@ public class PbSourceData extends AbstractSourceData {
             return "";
         }
         // parse other node
-        String fieldValue = this.getNodeValue(childNodes, root);
+        Object fieldValue = this.getNodeValue(childNodes, root);
         return fieldValue;
     }
 
@@ -155,7 +160,7 @@ public class PbSourceData extends AbstractSourceData {
      * @param srcFieldName
      * @return
      */
-    private String getChildField(int rowNum, String srcFieldName) {
+    private Object getChildField(int rowNum, String srcFieldName) {
         if (this.childRoot == null || this.childDesc == null) {
             return "";
         }
@@ -174,7 +179,7 @@ public class PbSourceData extends AbstractSourceData {
         }
         // parse other node
         DynamicMessage child = childRoot.get(rowNum);
-        String fieldValue = this.getNodeValue(childNodes, child);
+        Object fieldValue = this.getNodeValue(childNodes, child);
         return fieldValue;
     }
 
@@ -184,9 +189,8 @@ public class PbSourceData extends AbstractSourceData {
      * @param root
      * @return
      */
-    @SuppressWarnings("rawtypes")
-    private String getNodeValue(List<PbNode> childNodes, DynamicMessage root) {
-        String fieldValue = "";
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    private Object getNodeValue(List<PbNode> childNodes, DynamicMessage root) {
         DynamicMessage current = root;
         for (int i = 0; i < childNodes.size(); i++) {
             PbNode node = childNodes.get(i);
@@ -195,62 +199,120 @@ public class PbSourceData extends AbstractSourceData {
                 // error data
                 break;
             }
-            if (node.isLastNode()) {
-                switch (node.getFieldDesc().getJavaType()) {
-                    case STRING:
-                    case INT:
-                    case LONG:
-                    case FLOAT:
-                    case DOUBLE:
-                    case BOOLEAN:
-                        fieldValue = String.valueOf(nodeValue);
-                        break;
-                    case BYTE_STRING:
-                        ByteString byteString = (ByteString) nodeValue;
-                        fieldValue = new String(byteString.toByteArray(), 
srcCharset);
-                        break;
-                    case ENUM:
-                        fieldValue = String.valueOf(nodeValue);
-                        break;
-                    case MESSAGE:
-                        if (node.isArray()) {
-                            fieldValue = String.valueOf(((List) 
nodeValue).get(node.getArrayIndex()));
-                        } else if (node.isMap()) {
-                            List<DynamicMessage> nodeValueList = 
(List<DynamicMessage>) nodeValue;
-                            for (DynamicMessage subnodeValue : nodeValueList) {
-                                String keyValue = 
String.valueOf(subnodeValue.getField(node.getMapKeyDesc()));
-                                if (StringUtils.equals(keyValue, 
node.getMapKey())) {
-                                    fieldValue = 
String.valueOf(subnodeValue.getField(node.getMapValueDesc()));
-                                    break;
-                                }
-                            }
-                        } else {
-                            fieldValue = String.valueOf(nodeValue);
+            if (!node.isLastNode()) {
+                if (node.isArray()) {
+                    current = (DynamicMessage) ((List) 
nodeValue).get(node.getArrayIndex());
+                } else if (node.isMap()) {
+                    List<DynamicMessage> nodeValueList = 
(List<DynamicMessage>) nodeValue;
+                    DynamicMessage newCurrent = null;
+                    for (DynamicMessage subnodeValue : nodeValueList) {
+                        String keyValue = 
String.valueOf(subnodeValue.getField(node.getMapKeyDesc()));
+                        if (StringUtils.equals(keyValue, node.getMapKey())) {
+                            newCurrent = (DynamicMessage) 
subnodeValue.getField(node.getMapValueDesc());
+                            break;
                         }
-                        break;
+                    }
+                    if (newCurrent == null) {
+                        return null;
+                    }
+                    current = newCurrent;
+                } else {
+                    current = (DynamicMessage) nodeValue;
                 }
-                break;
+                continue;
             }
+            // last node
             if (node.isArray()) {
-                current = (DynamicMessage) ((List) 
nodeValue).get(node.getArrayIndex());
+                return buildStructData(node.getMessageType(), ((List) 
nodeValue).get(node.getArrayIndex()));
             } else if (node.isMap()) {
                 List<DynamicMessage> nodeValueList = (List<DynamicMessage>) 
nodeValue;
-                DynamicMessage newCurrent = null;
+                Object fieldValue = null;
                 for (DynamicMessage subnodeValue : nodeValueList) {
                     String keyValue = 
String.valueOf(subnodeValue.getField(node.getMapKeyDesc()));
                     if (StringUtils.equals(keyValue, node.getMapKey())) {
-                        newCurrent = (DynamicMessage) 
subnodeValue.getField(node.getMapValueDesc());
+                        fieldValue = 
subnodeValue.getField(node.getMapValueDesc());
                         break;
                     }
                 }
-                if (newCurrent == null) {
-                    return fieldValue;
+                return this.buildFieldValue(node.getFieldDesc(), fieldValue, 
false);
+            } else if (node.isMapType()) {
+                return this.buildStructData(node.getMessageType(), nodeValue);
+            } else if (node.getFieldDesc().isRepeated()) {
+                List<Object> valueList = (List) nodeValue;
+                List<Object> result = new ArrayList<>(valueList.size());
+                for (Object value : valueList) {
+                    result.add(this.buildFieldValue(node.getFieldDesc(), 
value, false));
                 }
-                current = newCurrent;
+                return new GenericArrayData(result.toArray());
             } else {
-                current = (DynamicMessage) nodeValue;
+                return this.buildFieldValue(node.getFieldDesc(), nodeValue, 
false);
             }
         }
-        return fieldValue;
+        return null;
+    }
+
+    @SuppressWarnings("unchecked")
+    private Object buildFieldValue(FieldDescriptor fieldDesc, Object 
nodeValue, boolean isRepeated) {
+        if (nodeValue == null) {
+            return null;
+        }
+        switch (fieldDesc.getJavaType()) {
+            case STRING:
+            case INT:
+            case LONG:
+            case FLOAT:
+            case DOUBLE:
+            case BOOLEAN:
+            case ENUM:
+                return nodeValue;
+            case BYTE_STRING:
+                return ((ByteString) nodeValue).toByteArray();
+            case MESSAGE: {
+                if (!isRepeated) {
+                    return this.buildStructData(fieldDesc.getMessageType(), 
nodeValue);
+                } else if (PbNode.isMapDescriptor(fieldDesc.getMessageType())) 
{
+                    return this.buildStructData(fieldDesc.getMessageType(), 
nodeValue);
+                }
+                List<DynamicMessage> valueList = (List<DynamicMessage>) 
nodeValue;
+                List<Object> result = new ArrayList<>(valueList.size());
+                for (DynamicMessage value : valueList) {
+                    
result.add(this.buildStructData(fieldDesc.getMessageType(), value));
+                }
+                return new GenericArrayData(result.toArray());
+            }
+            default:
+                return String.valueOf(nodeValue);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    protected Object buildStructData(Descriptors.Descriptor messageType, 
Object nodeValue) {
+        // map
+        if (PbNode.isMapDescriptor(messageType)) {
+            Descriptors.FieldDescriptor keyField = 
messageType.findFieldByNumber(1);
+            Descriptors.FieldDescriptor valueField = 
messageType.findFieldByNumber(2);
+            List<DynamicMessage> subNodeValueList = (List<DynamicMessage>) 
nodeValue;
+            Map<Object, Object> result = new HashMap<>();
+            for (DynamicMessage subnodeValue : subNodeValueList) {
+                Object keyValue = buildFieldValue(keyField, 
subnodeValue.getField(keyField), false);
+                Object valueValue = buildFieldValue(valueField, 
subnodeValue.getField(valueField), false);
+                result.put(keyValue, valueValue);
+            }
+            return new GenericMapData(result);
+        }
+        // struct
+        DynamicMessage msgObj = (DynamicMessage) nodeValue;
+        GenericRowData result = new 
GenericRowData(messageType.getFields().size());
+        int index = 0;
+        for (FieldDescriptor fieldDesc : messageType.getFields()) {
+            Object fieldValue = msgObj.getField(fieldDesc);
+            if (fieldValue == null) {
+                result.setField(index++, null);
+                continue;
+            }
+            Object fieldResult = this.buildFieldValue(fieldDesc, fieldValue, 
false);
+            result.setField(index++, fieldResult);
+        }
+        return result;
     }
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
index 97b43c02b9..6c9c99c9dd 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
@@ -63,7 +63,7 @@ public class CsvSinkEncoder extends SinkEncoder<String> {
                 sinkData.keyList().forEach(k -> 
builder.append(sinkData.getField(k)).append(delimiter));
             } else {
                 for (String fieldName : sinkData.keyList()) {
-                    String fieldValue = sinkData.getField(fieldName);
+                    String fieldValue = 
formatFieldValue(sinkData.getField(fieldName));
                     if (StringUtils.equals(fieldName, ALL_SOURCE_FIELD_SIGN)) {
                         builder.append(fieldValue);
                     } else {
@@ -78,7 +78,7 @@ public class CsvSinkEncoder extends SinkEncoder<String> {
             } else {
                 for (FieldInfo field : fields) {
                     String fieldName = field.getName();
-                    String fieldValue = sinkData.getField(fieldName);
+                    String fieldValue = 
formatFieldValue(sinkData.getField(fieldName));
                     EscapeUtils.escapeContent(builder, delimiter, escapeChar, 
fieldValue);
                     builder.append(delimiter);
                 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/DefaultSinkData.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/DefaultSinkData.java
index 2b470cdc24..dcc5d84e20 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/DefaultSinkData.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/DefaultSinkData.java
@@ -26,13 +26,12 @@ import java.util.Map;
 
 /**
  * DefaultSinkData
- * 
  */
 @Data
 public class DefaultSinkData implements SinkData {
 
     private List<String> keyList = new ArrayList<>();
-    private Map<String, String> currentRow = new HashMap<>();
+    private Map<String, Object> currentRow = new HashMap<>();
 
     /**
      * addField
@@ -40,7 +39,7 @@ public class DefaultSinkData implements SinkData {
      * @param fieldValue
      */
     @Override
-    public void addField(String fieldName, String fieldValue) {
+    public void addField(String fieldName, Object fieldValue) {
         this.keyList.add(fieldName);
         this.currentRow.put(fieldName, fieldValue);
     }
@@ -51,7 +50,7 @@ public class DefaultSinkData implements SinkData {
      * @return
      */
     @Override
-    public String getField(String fieldName) {
+    public Object getField(String fieldName) {
         return this.currentRow.getOrDefault(fieldName, "");
     }
 
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
index 094f4d6884..444c73ff6c 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
@@ -60,7 +60,7 @@ public class KvSinkEncoder extends SinkEncoder<String> {
         builder.delete(0, builder.length());
         if (fields == null || fields.size() == 0) {
             for (String fieldName : sinkData.keyList()) {
-                String fieldValue = sinkData.getField(fieldName);
+                String fieldValue = 
formatFieldValue(sinkData.getField(fieldName));
                 if (StringUtils.equals(fieldName, ALL_SOURCE_FIELD_SIGN)) {
                     builder.append(fieldValue).append(entryDelimiter);
                 } else {
@@ -70,7 +70,7 @@ public class KvSinkEncoder extends SinkEncoder<String> {
         } else {
             for (FieldInfo field : fields) {
                 String fieldName = field.getName();
-                String fieldValue = sinkData.getField(fieldName);
+                String fieldValue = 
formatFieldValue(sinkData.getField(fieldName));
                 
builder.append(fieldName).append(kvDelimiter).append(fieldValue).append(entryDelimiter);
             }
         }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/MapSinkEncoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/MapSinkEncoder.java
index c76c4e80ff..cdcc8d3fa7 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/MapSinkEncoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/MapSinkEncoder.java
@@ -49,7 +49,7 @@ public class MapSinkEncoder extends SinkEncoder<Map<String, 
Object>> {
         Map<String, Object> esMap = new HashMap<>();
         for (FieldInfo fieldInfo : fields) {
             String fieldName = fieldInfo.getName();
-            String strValue = sinkData.getField(fieldName);
+            String strValue = formatFieldValue(sinkData.getField(fieldName));
             TypeConverter converter = converters.get(fieldName);
             if (converter == null) {
                 esMap.put(fieldName, strValue);
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetSinkEncoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetSinkEncoder.java
index 6d377b061c..bbe2d01863 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetSinkEncoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetSinkEncoder.java
@@ -70,7 +70,7 @@ public class ParquetSinkEncoder extends 
SinkEncoder<ByteArrayOutputStream> {
         Object[] rowsInfo = new Object[size];
         Arrays.fill(rowsInfo, "");
         for (int i = 0; i < size; i++) {
-            String fieldData = sinkData.getField(this.fields.get(i).getName());
+            String fieldData = 
formatFieldValue(sinkData.getField(this.fields.get(i).getName()));
             if (fieldData == null) {
                 continue;
             }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/PbSinkEncoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/PbSinkEncoder.java
index 226405c515..e0263558ca 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/PbSinkEncoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/PbSinkEncoder.java
@@ -58,7 +58,7 @@ public class PbSinkEncoder extends SinkEncoder<byte[]> {
             for (String key : sinkData.keyList()) {
                 Descriptors.FieldDescriptor fieldDescriptor = 
dynamicDescriptor.findFieldByName(key);
                 if (fieldDescriptor != null) {
-                    String fieldValue = sinkData.getField(key);
+                    String fieldValue = 
formatFieldValue(sinkData.getField(key));
                     if (fieldValue != null) {
                         Object value = convertValue(fieldDescriptor, 
fieldValue);
                         dynamicBuilder.setField(fieldDescriptor, value);
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/RowDataSinkEncoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/RowDataSinkEncoder.java
index 1cab9f6fe3..507741d5c3 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/RowDataSinkEncoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/RowDataSinkEncoder.java
@@ -42,10 +42,9 @@ public class RowDataSinkEncoder extends SinkEncoder<RowData> 
{
     @Override
     public RowData encode(SinkData sinkData, Context context) {
         GenericRowData rowData = new 
GenericRowData(fieldToRowDataConverters.length);
-
         for (int i = 0; i < fields.size(); i++) {
             String fieldName = fields.get(i).getName();
-            String fieldValue = sinkData.getField(fieldName);
+            Object fieldValue = sinkData.getField(fieldName);
             rowData.setField(i, 
fieldToRowDataConverters[i].convert(fieldValue));
         }
 
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkData.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkData.java
index 1ad0c38c68..6137eb1522 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkData.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkData.java
@@ -25,9 +25,9 @@ import java.util.List;
  */
 public interface SinkData {
 
-    void addField(String fieldName, String fieldValue);
+    void addField(String fieldName, Object fieldValue);
 
-    String getField(String fieldName);
+    Object getField(String fieldName);
 
     List<String> keyList();
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
index bd804cd771..05d2e6a9ad 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
@@ -44,4 +44,13 @@ public abstract class SinkEncoder<Output> {
     }
 
     public abstract Output encode(SinkData sinkData, Context context);
+
+    protected String formatFieldValue(Object fieldValue) {
+        if (fieldValue == null) {
+            return null;
+        } else if (fieldValue instanceof byte[]) {
+            return new String((byte[]) fieldValue);
+        }
+        return String.valueOf(fieldValue);
+    }
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
index e6392bbe2d..7346a205d1 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
@@ -227,7 +227,7 @@ public class TransformProcessor<I, O> {
                     if (fieldValue == null) {
                         sinkData.addField(fieldName, "");
                     } else {
-                        sinkData.addField(fieldName, fieldValue.toString());
+                        sinkData.addField(fieldName, fieldValue);
                     }
                 } catch (Throwable t) {
                     sinkData.addField(fieldName, "");
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/string/ConcatFunction.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/string/ConcatFunction.java
index 85eba0567f..3f5faa5710 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/string/ConcatFunction.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/string/ConcatFunction.java
@@ -27,6 +27,7 @@ import 
org.apache.inlong.sdk.transform.process.parser.ValueParser;
 import lombok.extern.slf4j.Slf4j;
 import net.sf.jsqlparser.expression.Expression;
 import net.sf.jsqlparser.expression.Function;
+import org.apache.flink.table.data.StringData;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -64,7 +65,16 @@ public class ConcatFunction implements ValueParser {
     public Object parse(SourceData sourceData, int rowIndex, Context context) {
         StringBuilder builder = new StringBuilder();
         for (ValueParser node : nodeList) {
-            builder.append(node.parse(sourceData, rowIndex, context));
+            Object itemValue = node.parse(sourceData, rowIndex, context);
+            if (itemValue == null) {
+                continue;
+            } else if (itemValue instanceof byte[]) {
+                builder.append(new String((byte[]) itemValue));
+            } else if (itemValue instanceof StringData) {
+                builder.append(((StringData) itemValue).toString());
+            } else {
+                builder.append(String.valueOf(itemValue));
+            }
         }
         return builder.toString();
     }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/utils/FieldToRowDataUtils.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/utils/FieldToRowDataUtils.java
index ff61d912b3..2743921c58 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/utils/FieldToRowDataUtils.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/utils/FieldToRowDataUtils.java
@@ -22,6 +22,7 @@ import 
org.apache.inlong.sdk.transform.decode.TransformException;
 import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.GenericArrayData;
 import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.logical.ArrayType;
@@ -39,6 +40,7 @@ import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -72,7 +74,6 @@ public class FieldToRowDataUtils {
         converterMap.put(LogicalTypeRoot.BIGINT, (obj) -> parseLong(obj));
         converterMap.put(LogicalTypeRoot.FLOAT, (obj) -> parseFloat(obj));
         converterMap.put(LogicalTypeRoot.DOUBLE, (obj) -> parseDouble(obj));
-        converterMap.put(LogicalTypeRoot.BINARY, (obj) -> parseBinary(obj));
         converterMap.put(LogicalTypeRoot.VARBINARY, (obj) -> parseBinary(obj));
         converterMap.put(LogicalTypeRoot.CHAR, (obj) -> parseVarchar(obj));
         converterMap.put(LogicalTypeRoot.VARCHAR, (obj) -> parseVarchar(obj));
@@ -82,6 +83,10 @@ public class FieldToRowDataUtils {
         converterMap.put(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, (obj) -> 
parseTimestampWithLocalTimeZone(obj));
         converterMap.put(LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE, (obj) -> 
parseTimestampWithLocalTimeZone(obj));
         converterMap.put(LogicalTypeRoot.DECIMAL, (obj) -> parseDecimal(obj));
+        converterMap.put(LogicalTypeRoot.BINARY, (obj) -> parseBinary(obj));
+        converterMap.put(LogicalTypeRoot.ARRAY, (obj) -> parseArray(obj));
+        converterMap.put(LogicalTypeRoot.MAP, (obj) -> parseMap(obj));
+        converterMap.put(LogicalTypeRoot.ROW, (obj) -> parseRow(obj));
     }
 
     private static final ThreadLocal<Map<String, SimpleDateFormat>> 
formatLocal = new ThreadLocal<>();
@@ -122,8 +127,8 @@ public class FieldToRowDataUtils {
             case ARRAY:
                 return obj -> {
                     final Object[] array = (Object[]) obj;
-                    FieldToRowDataConverter elementConverter =
-                            createFieldRowConverter(((ArrayType) 
fieldType).getElementType());
+                    FieldToRowDataConverter elementConverter = 
createFieldRowConverter(
+                            ((ArrayType) fieldType).getElementType());
                     Object[] converted = Arrays.stream(array)
                             .map(elementConverter::convert)
                             .toArray();
@@ -131,10 +136,9 @@ public class FieldToRowDataUtils {
                 };
             case MAP:
                 return obj -> {
-                    FieldToRowDataConverter keyConverter =
-                            createFieldRowConverter(((MapType) 
fieldType).getKeyType());
-                    FieldToRowDataConverter valueConverter =
-                            createFieldRowConverter(((MapType) 
fieldType).getValueType());
+                    FieldToRowDataConverter keyConverter = 
createFieldRowConverter(((MapType) fieldType).getKeyType());
+                    FieldToRowDataConverter valueConverter = 
createFieldRowConverter(
+                            ((MapType) fieldType).getValueType());
                     Map map = (Map) obj;
                     Map<Object, Object> internalMap = new HashMap<>();
                     for (Object k : map.keySet()) {
@@ -153,7 +157,13 @@ public class FieldToRowDataUtils {
 
     private static Object parseBoolean(Object obj) {
         try {
-            return Boolean.parseBoolean(obj.toString());
+            if (obj == null) {
+                return null;
+            }
+            if (obj instanceof Boolean) {
+                return obj;
+            }
+            return Boolean.parseBoolean(String.valueOf(obj));
         } catch (RuntimeException e) {
             if (isIgnoreError()) {
                 return null;
@@ -164,7 +174,13 @@ public class FieldToRowDataUtils {
 
     private static Object parseTinyint(Object obj) {
         try {
-            return Byte.parseByte(obj.toString());
+            if (obj == null) {
+                return null;
+            }
+            if (obj instanceof Byte) {
+                return obj;
+            }
+            return Byte.parseByte(String.valueOf(obj));
         } catch (RuntimeException e) {
             if (isIgnoreError()) {
                 return null;
@@ -175,7 +191,13 @@ public class FieldToRowDataUtils {
 
     private static Object parseSmallint(Object obj) {
         try {
-            return Short.parseShort(obj.toString());
+            if (obj == null) {
+                return null;
+            }
+            if (obj instanceof Short) {
+                return obj;
+            }
+            return Short.parseShort(String.valueOf(obj));
         } catch (RuntimeException e) {
             if (isIgnoreError()) {
                 return null;
@@ -186,7 +208,13 @@ public class FieldToRowDataUtils {
 
     private static Object parseInteger(Object obj) {
         try {
-            return Integer.parseInt(obj.toString());
+            if (obj == null) {
+                return null;
+            }
+            if (obj instanceof Integer) {
+                return obj;
+            }
+            return Integer.parseInt(String.valueOf(obj));
         } catch (RuntimeException e) {
             if (isIgnoreError()) {
                 return null;
@@ -197,7 +225,13 @@ public class FieldToRowDataUtils {
 
     private static Object parseLong(Object obj) {
         try {
-            return Long.parseLong(obj.toString());
+            if (obj == null) {
+                return null;
+            }
+            if (obj instanceof Long) {
+                return obj;
+            }
+            return Long.parseLong(String.valueOf(obj));
         } catch (RuntimeException e) {
             if (isIgnoreError()) {
                 return null;
@@ -208,7 +242,13 @@ public class FieldToRowDataUtils {
 
     private static Object parseFloat(Object obj) {
         try {
-            return Float.parseFloat(obj.toString());
+            if (obj == null) {
+                return null;
+            }
+            if (obj instanceof Float) {
+                return obj;
+            }
+            return Float.parseFloat(String.valueOf(obj));
         } catch (RuntimeException e) {
             if (isIgnoreError()) {
                 return null;
@@ -219,18 +259,13 @@ public class FieldToRowDataUtils {
 
     private static Object parseDouble(Object obj) {
         try {
-            return Double.parseDouble(obj.toString());
-        } catch (RuntimeException e) {
-            if (isIgnoreError()) {
+            if (obj == null) {
                 return null;
             }
-            throw e;
-        }
-    }
-
-    private static Object parseBinary(Object obj) {
-        try {
-            return obj.toString().getBytes();
+            if (obj instanceof Double) {
+                return obj;
+            }
+            return Double.parseDouble(String.valueOf(obj));
         } catch (RuntimeException e) {
             if (isIgnoreError()) {
                 return null;
@@ -241,7 +276,13 @@ public class FieldToRowDataUtils {
 
     private static Object parseVarchar(Object obj) {
         try {
-            return StringData.fromString((String) obj);
+            if (obj == null) {
+                return null;
+            }
+            if (obj instanceof byte[]) {
+                return StringData.fromString(new String((byte[]) obj));
+            }
+            return StringData.fromString(String.valueOf(obj));
         } catch (RuntimeException e) {
             if (isIgnoreError()) {
                 return null;
@@ -258,7 +299,7 @@ public class FieldToRowDataUtils {
             if (obj instanceof Date) {
                 return ((Date) obj).toLocalDate().toEpochDay();
             }
-            String strObj = obj.toString();
+            String strObj = String.valueOf(obj);
             Date date = parseDateTime(strObj);
             return date.toLocalDate().toEpochDay();
         } catch (RuntimeException e) {
@@ -305,7 +346,7 @@ public class FieldToRowDataUtils {
             if (obj instanceof Time) {
                 return ((Time) obj).toLocalTime().toSecondOfDay() * 1000;
             }
-            String strObj = obj.toString();
+            String strObj = String.valueOf(obj);
             Date date = parseDateTime(strObj);
             return new Time(date.getTime()).toLocalTime().toSecondOfDay() * 
1000;
         } catch (RuntimeException e) {
@@ -324,7 +365,7 @@ public class FieldToRowDataUtils {
             if (obj instanceof Timestamp) {
                 return TimestampData.fromTimestamp((Timestamp) obj);
             }
-            String strObj = obj.toString();
+            String strObj = String.valueOf(obj);
             Date date = parseDateTime(strObj);
             return TimestampData.fromTimestamp(new Timestamp(date.getTime()));
         } catch (RuntimeException e) {
@@ -346,7 +387,7 @@ public class FieldToRowDataUtils {
                         DecimalType.DEFAULT_PRECISION,
                         DecimalType.DEFAULT_SCALE);
             }
-            String strObj = obj.toString();
+            String strObj = String.valueOf(obj);
             return DecimalData.fromBigDecimal(
                     new BigDecimal(strObj),
                     DecimalType.DEFAULT_PRECISION,
@@ -358,4 +399,82 @@ public class FieldToRowDataUtils {
             throw e;
         }
     }
+
+    /**
+     * Converts a field value into the {@code byte[]} representation used for BINARY columns.
+     *
+     * <p>{@code null} is passed through, an existing {@code byte[]} is returned unchanged, and any
+     * other value is rendered with {@link String#valueOf(Object)} and encoded as UTF-8.
+     *
+     * @param obj the raw field value, may be {@code null}
+     * @return the value as {@code byte[]}, or {@code null} when the input is {@code null} or when a
+     *         runtime failure occurs while {@code isIgnoreError()} is true
+     * @throws RuntimeException rethrown when conversion fails and {@code isIgnoreError()} is false
+     */
+    private static Object parseBinary(Object obj) {
+        try {
+            if (obj == null) {
+                return null;
+            }
+            if (obj instanceof byte[]) {
+                return obj;
+            }
+            // Encode with an explicit charset so the produced bytes do not depend on the JVM default.
+            return String.valueOf(obj).getBytes(java.nio.charset.StandardCharsets.UTF_8);
+        } catch (RuntimeException e) {
+            if (isIgnoreError()) {
+                return null;
+            }
+            throw e;
+        }
+    }
+
+    /**
+     * Wraps a field value as {@link GenericArrayData} for ARRAY columns.
+     *
+     * <p>{@code null} is passed through and an existing {@link GenericArrayData} is returned
+     * unchanged. A {@link List} or an {@code Object[]} supplies the elements directly; any other
+     * value becomes a single-element array.
+     *
+     * @param obj the raw field value, may be {@code null}
+     * @return the wrapped array, or {@code null} when the input is {@code null} or when a runtime
+     *         failure occurs while {@code isIgnoreError()} is true
+     * @throws RuntimeException rethrown when conversion fails and {@code isIgnoreError()} is false
+     */
+    private static Object parseArray(Object obj) {
+        try {
+            if (obj == null) {
+                return null;
+            }
+            if (obj instanceof GenericArrayData) {
+                return obj;
+            }
+            if (obj instanceof List<?>) {
+                return new GenericArrayData(((List<?>) obj).toArray());
+            }
+            if (obj instanceof Object[]) {
+                // An object array already holds the elements; wrap it as-is instead of nesting it
+                // as the sole element of a new array.
+                return new GenericArrayData((Object[]) obj);
+            }
+            // Scalar fallback: expose the value as a one-element array.
+            return new GenericArrayData(new Object[]{obj});
+        } catch (RuntimeException e) {
+            if (isIgnoreError()) {
+                return null;
+            }
+            throw e;
+        }
+    }
+
+    private static Object parseMap(Object obj) {
+        try {
+            if (obj == null) {
+                return null;
+            }
+            if (obj instanceof GenericMapData) {
+                return obj;
+            }
+            if (obj instanceof Map<?, ?>) {
+                return new GenericMapData((Map<?, ?>) obj);
+            }
+            Map<Object, Object> mapObj = new HashMap<>();
+            mapObj.put(obj, obj);
+            return new GenericMapData(mapObj);
+        } catch (RuntimeException e) {
+            if (isIgnoreError()) {
+                return null;
+            }
+            throw e;
+        }
+    }
+
+    /**
+     * Wraps a field value as {@link GenericRowData} for ROW columns.
+     *
+     * <p>{@code null} and existing {@link GenericRowData} instances are returned as they are; any
+     * other value is stored as field 0 of a new one-field row.
+     *
+     * @param obj the raw field value, may be {@code null}
+     * @return the row, or {@code null} when the input is {@code null} or when a runtime failure
+     *         occurs while {@code isIgnoreError()} is true
+     */
+    private static Object parseRow(Object obj) {
+        try {
+            if (obj == null || obj instanceof GenericRowData) {
+                return obj;
+            }
+            GenericRowData singleFieldRow = new GenericRowData(1);
+            singleFieldRow.setField(0, obj);
+            return singleFieldRow;
+        } catch (RuntimeException e) {
+            if (!isIgnoreError()) {
+                throw e;
+            }
+            return null;
+        }
+    }
 }
diff --git 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestPb2RowDataProcessor.java
 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestPb2RowDataProcessor.java
new file mode 100644
index 0000000000..a126f8d52d
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestPb2RowDataProcessor.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.processor;
+
+import 
org.apache.inlong.common.pojo.sort.dataflow.field.format.ArrayFormatInfo;
+import 
org.apache.inlong.common.pojo.sort.dataflow.field.format.BinaryFormatInfo;
+import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
+import org.apache.inlong.common.pojo.sort.dataflow.field.format.LongFormatInfo;
+import org.apache.inlong.common.pojo.sort.dataflow.field.format.MapFormatInfo;
+import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
+import 
org.apache.inlong.common.pojo.sort.dataflow.field.format.StringFormatInfo;
+import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
+import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.pojo.PbSourceInfo;
+import org.apache.inlong.sdk.transform.pojo.RowDataSinkInfo;
+import org.apache.inlong.sdk.transform.pojo.TransformConfig;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
+
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+
+public class TestPb2RowDataProcessor extends AbstractProcessorTestBase {
+
+    @Test
+    public void testPb2RowData() throws Exception {
+        String transformBase64 = this.getPbTestDescription();
+        PbSourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, 
"SdkDataRequest", "msgs");
+        String[] fieldNames = new String[]{"sid", "packageID", "msgTime",
+                "binaryMsg", "mapExtinfo", "structMsgItem", "listMsgs"};
+        List<FieldInfo> sinkFields = this.getTestFieldList("sid", "packageID", 
"msgTime");
+        // binaryMsg
+        FieldInfo binaryMsg = new FieldInfo("binaryMsg");
+        BinaryFormatInfo binaryMsgFormat = new 
BinaryFormatInfo(Integer.MAX_VALUE);
+        binaryMsg.setFormatInfo(binaryMsgFormat);
+        sinkFields.add(binaryMsg);
+        // mapExtinfo
+        FieldInfo mapExtinfo = new FieldInfo("mapExtinfo");
+        MapFormatInfo mapExtinfoFormat = new MapFormatInfo(new 
StringFormatInfo(), new StringFormatInfo());
+        mapExtinfo.setFormatInfo(mapExtinfoFormat);
+        sinkFields.add(mapExtinfo);
+        // structMsgItem
+        FieldInfo structMsgItem = new FieldInfo("structMsgItem");
+        String[] structMsgItemFields = new String[]{"msg", "msgTime", 
"extinfo"};
+        FormatInfo[] structMsgItemFormats = new FormatInfo[]{
+                new BinaryFormatInfo(Integer.MAX_VALUE),
+                new LongFormatInfo(),
+                new MapFormatInfo(new StringFormatInfo(), new 
StringFormatInfo())
+        };
+        RowFormatInfo structMsgItemFormat = new 
RowFormatInfo(structMsgItemFields, structMsgItemFormats);
+        structMsgItem.setFormatInfo(structMsgItemFormat);
+        sinkFields.add(structMsgItem);
+        // listMsgs
+        FieldInfo listMsgs = new FieldInfo("listMsgs");
+        ArrayFormatInfo listMsgsFormat = new 
ArrayFormatInfo(structMsgItemFormat);
+        listMsgs.setFormatInfo(listMsgsFormat);
+        sinkFields.add(listMsgs);
+        // sink
+        RowDataSinkInfo rowSink = new RowDataSinkInfo("UTF-8", sinkFields);
+        // sql
+        String transformSql = "select $root.sid,$root.packageID,$child.msgTime"
+                + ",$child.msg as binaryMsg,"
+                + "$child.extinfo as mapExtinfo,"
+                + "$root.msgs(1) as structMsgItem,"
+                + "$root.msgs as listMsgs from source";
+        TransformConfig config = new TransformConfig(transformSql);
+        // case1
+        TransformProcessor<String, RowData> processor = TransformProcessor
+                .create(config, SourceDecoderFactory.createPbDecoder(pbSource),
+                        SinkEncoderFactory.createRowEncoder(rowSink));
+        byte[] srcBytes = this.getPbTestData();
+        List<RowData> output = processor.transformForBytes(srcBytes, new 
HashMap<>());
+        Assert.assertEquals(2, output.size());
+        // 0
+        Assert.assertEquals(output.get(0).getString(0).toString(), "sid");
+        Assert.assertEquals(output.get(0).getString(1).toString(), "1");
+        Assert.assertEquals(output.get(0).getString(2).toString(), 
"1713243918000");
+        Assert.assertEquals(new String(output.get(0).getBinary(3)), 
"msgValue4");
+        Assert.assertEquals(((GenericMapData) 
output.get(0).getMap(4)).get("key"), "value");
+        Assert.assertEquals(((GenericMapData) 
output.get(0).getMap(4)).get("value"), null);
+        Assert.assertEquals(new String(((GenericRowData) 
output.get(0).getRow(5, 3)).getBinary(0)), "msgValue42");
+        Assert.assertEquals(((GenericRowData) output.get(0).getRow(5, 
3)).getLong(1), 1713243918002L);
+        Assert.assertEquals(((GenericRowData) output.get(0).getRow(5, 
3)).getMap(2).size(), 1);
+        Assert.assertEquals(output.get(0).getArray(6).size(), 2);
+        // 1
+        Assert.assertEquals(output.get(1).getString(0).toString(), "sid");
+        Assert.assertEquals(output.get(1).getString(1).toString(), "1");
+        Assert.assertEquals(output.get(1).getString(2).toString(), 
"1713243918002");
+        Assert.assertEquals(new String(output.get(1).getBinary(3)), 
"msgValue42");
+        Assert.assertEquals(((GenericMapData) 
output.get(1).getMap(4)).get("key2"), "value2");
+        Assert.assertEquals(((GenericMapData) 
output.get(1).getMap(4)).get("value"), null);
+        Assert.assertEquals(new String(((GenericRowData) 
output.get(1).getRow(5, 3)).getBinary(0)), "msgValue42");
+        Assert.assertEquals(((GenericRowData) output.get(1).getRow(5, 
3)).getLong(1), 1713243918002L);
+        Assert.assertEquals(((GenericRowData) output.get(1).getRow(5, 
3)).getMap(2).size(), 1);
+        Assert.assertEquals(output.get(1).getArray(6).size(), 2);
+    }
+}


Reply via email to