This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 965f9da3e9 [INLONG-12125][SDK] Enhance Transform SDK protobuf processing and SQL alias parsing (#12126)
965f9da3e9 is described below

commit 965f9da3e96b7f3319d3559e7193532db3e53fa6
Author: ChunLiang Lu <[email protected]>
AuthorDate: Wed May 27 17:22:57 2026 +0800

    [INLONG-12125][SDK] Enhance Transform SDK protobuf processing and SQL alias parsing (#12126)
---
 .../inlong/sdk/transform/decode/PbSourceData.java  | 102 ++++++++++++++++++---
 .../sdk/transform/encode/DefaultSinkData.java      |   2 +-
 .../sdk/transform/process/TransformProcessor.java  |   5 +
 .../sdk/transform/utils/FieldToRowDataUtils.java   |  24 ++---
 .../TestCsv2CsvForErrorOrderProcessor.java         |   2 +-
 .../process/processor/TestPb2RowDataProcessor.java |  11 ++-
 6 files changed, 116 insertions(+), 30 deletions(-)

diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceData.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceData.java
index ca63769c84..f5d3569c2c 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceData.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceData.java
@@ -27,6 +27,7 @@ import com.google.protobuf.Descriptors.FieldDescriptor;
 import com.google.protobuf.Descriptors.FieldDescriptor.JavaType;
 import com.google.protobuf.DynamicMessage;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.flink.table.data.GenericArrayData;
 import org.apache.flink.table.data.GenericMapData;
 import org.apache.flink.table.data.GenericRowData;
@@ -35,8 +36,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -68,8 +71,8 @@ public class PbSourceData extends AbstractSourceData {
 
     protected Charset srcCharset;
 
-    private Map<DynamicMessage, Map<String, Object>> nodeValueCache = new 
HashMap<>();
-    private Map<DynamicMessage, Map<String, Map<Object, Object>>> mapNodeCache 
= new HashMap<>();
+    private Map<DynamicMessage, Map<String, Object>> nodeValueCache = new 
IdentityHashMap<>();
+    private Map<DynamicMessage, Map<String, Map<Object, Object>>> mapNodeCache 
= new IdentityHashMap<>();
 
     /**
      * Constructor
@@ -158,7 +161,10 @@ public class PbSourceData extends AbstractSourceData {
                     List<Object> valueList = (List) fieldValue;
                     List<Object> result = new ArrayList<>(valueList.size());
                     for (Object value : valueList) {
-                        
result.add(this.buildFieldValue(lastNode.getFieldDesc(), value));
+                        Object itemValue = 
this.buildFieldValue(lastNode.getFieldDesc(), value);
+                        if (itemValue != null) {
+                            result.add(itemValue);
+                        }
                     }
                     return new GenericArrayData(result.toArray());
                 }
@@ -184,12 +190,38 @@ public class PbSourceData extends AbstractSourceData {
         }
         switch (fieldDesc.getJavaType()) {
             case STRING:
+                if (nodeValue instanceof BinaryStringData) {
+                    return nodeValue;
+                }
+                if (nodeValue instanceof String) {
+                    return new BinaryStringData((String) nodeValue);
+                }
+                return new BinaryStringData(String.valueOf(nodeValue));
             case INT:
+                if (nodeValue instanceof Integer) {
+                    return nodeValue;
+                }
+                return NumberUtils.toInt(String.valueOf(nodeValue), 0);
             case LONG:
+                if (nodeValue instanceof Long) {
+                    return nodeValue;
+                }
+                return NumberUtils.toLong(String.valueOf(nodeValue), 0);
             case FLOAT:
+                if (nodeValue instanceof Float) {
+                    return nodeValue;
+                }
+                return NumberUtils.toFloat(String.valueOf(nodeValue), 0);
             case DOUBLE:
+                if (nodeValue instanceof Double) {
+                    return nodeValue;
+                }
+                return NumberUtils.toDouble(String.valueOf(nodeValue), 0);
             case BOOLEAN:
-                return nodeValue;
+                if (nodeValue instanceof Boolean) {
+                    return nodeValue;
+                }
+                return 
Boolean.TRUE.toString().equals(String.valueOf(nodeValue));
             case ENUM:
                 if (nodeValue instanceof EnumValueDescriptor) {
                     EnumValueDescriptor enumDesc = (EnumValueDescriptor) 
nodeValue;
@@ -197,15 +229,17 @@ public class PbSourceData extends AbstractSourceData {
                 }
                 return null;
             case BYTE_STRING:
-                if (nodeValue instanceof ByteString) {
+                if (nodeValue instanceof byte[]) {
+                    return nodeValue;
+                } else if (nodeValue instanceof ByteString) {
                     return ((ByteString) nodeValue).toByteArray();
                 } else {
-                    return nodeValue;
+                    return 
String.valueOf(nodeValue).getBytes(StandardCharsets.ISO_8859_1);
                 }
             case MESSAGE:
                 return this.buildStructData(fieldDesc.getMessageType(), 
nodeValue);
             default:
-                return String.valueOf(nodeValue);
+                return nodeValue;
         }
     }
 
@@ -269,7 +303,9 @@ public class PbSourceData extends AbstractSourceData {
             DynamicMessage subnodeValue = (DynamicMessage) value;
             Object keyValue = buildFieldValue(keyField, 
subnodeValue.getField(keyField));
             Object valueValue = buildFieldValue(valueField, 
subnodeValue.getField(valueField));
-            result.put(keyValue, valueValue);
+            if (keyValue != null && valueValue != null) {
+                result.put(keyValue, valueValue);
+            }
         }
         return new GenericMapData(result);
     }
@@ -307,7 +343,7 @@ public class PbSourceData extends AbstractSourceData {
     }
 
     public Object findFieldNode(int rowNum, String fieldName) {
-        Object fieldValue = "";
+        Object fieldValue = null;
         try {
             if (StringUtils.startsWith(fieldName, ROOT_KEY)) {
                 fieldValue = this.findRootField(fieldName);
@@ -326,7 +362,7 @@ public class PbSourceData extends AbstractSourceData {
                 }
                 // error config
                 if (childNodes.size() == 0) {
-                    return "";
+                    return null;
                 }
                 // parse other node
                 fieldValue = this.findNodeValueByCache(childNodes, root);
@@ -377,7 +413,7 @@ public class PbSourceData extends AbstractSourceData {
 
     private Object findChildField(int rowNum, String srcFieldName) {
         if (this.childRoot == null || this.childDesc == null) {
-            return "";
+            return null;
         }
         List<PbNode> childNodes = this.columnNodeMap.get(srcFieldName);
         if (childNodes == null) {
@@ -390,7 +426,7 @@ public class PbSourceData extends AbstractSourceData {
         }
         // error config
         if (childNodes.size() == 0) {
-            return "";
+            return null;
         }
         // parse other node
         DynamicMessage child = childRoot.get(rowNum);
@@ -423,7 +459,47 @@ public class PbSourceData extends AbstractSourceData {
             subNodeValue = subNodeValueCache.get(node.getCurrentPath());
             if (subNodeValue != null) {
                 if (i == childNodes.size() - 1) {
-                    return subNodeValue;
+                    // primitive
+                    if (node.isPrimitiveType()) {
+                        return subNodeValue;
+                    }
+                    // struct
+                    if (node.isStructType()) {
+                        return subNodeValue;
+                    }
+                    // array
+                    if (node.isArrayType()) {
+                        if (!node.isHasArrayIndex()) {
+                            return subNodeValue;
+                        }
+                        if (!(subNodeValue instanceof List)) {
+                            return null;
+                        }
+                        List<?> nodeValueList = (List<?>) subNodeValue;
+                        if (node.getArrayIndex() >= nodeValueList.size()) {
+                            return null;
+                        }
+                        Object newNode = 
nodeValueList.get(node.getArrayIndex());
+                        if (!(newNode instanceof DynamicMessage)) {
+                            return null;
+                        }
+                        return newNode;
+                    }
+                    // map
+                    if (node.isMapType()) {
+                        if (!node.isHasMapKey()) {
+                            return subNodeValue;
+                        }
+                        final Object mapNodeValue = subNodeValue;
+                        Map<Object, Object> mapCache = 
subMapNodeCache.computeIfAbsent(node.getCurrentPath(),
+                                k -> parseMapNode(mapNodeValue, node));
+                        Object fieldValue = mapCache.get(node.getMapKey());
+                        if (fieldValue == null || !(fieldValue instanceof 
DynamicMessage)) {
+                            return null;
+                        }
+                        return fieldValue;
+                    }
+                    return null;
                 } else {
                     // primitive
                     if (node.isPrimitiveType()) {
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/DefaultSinkData.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/DefaultSinkData.java
index dcc5d84e20..908641fbee 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/DefaultSinkData.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/DefaultSinkData.java
@@ -51,7 +51,7 @@ public class DefaultSinkData implements SinkData {
      */
     @Override
     public Object getField(String fieldName) {
-        return this.currentRow.getOrDefault(fieldName, "");
+        return this.currentRow.get(fieldName);
     }
 
     /**
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
index 7346a205d1..cc70ee3f95 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
@@ -116,6 +116,11 @@ public class TransformProcessor<I, O> {
                         fieldName = exprItem.toString();
                     } else {
                         fieldName = exprItem.getAlias().getName();
+                        // Strip surrounding backticks if present
+                        if (fieldName != null && fieldName.length() >= 2
+                                && fieldName.startsWith("`") && 
fieldName.endsWith("`")) {
+                            fieldName = fieldName.substring(1, 
fieldName.length() - 1);
+                        }
                     }
                     if (!this.checkSelectField(fieldName)) {
                         throw new JSQLParserException(
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/utils/FieldToRowDataUtils.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/utils/FieldToRowDataUtils.java
index 2743921c58..01d2263020 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/utils/FieldToRowDataUtils.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/utils/FieldToRowDataUtils.java
@@ -23,8 +23,8 @@ import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.GenericArrayData;
 import org.apache.flink.table.data.GenericMapData;
 import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.binary.BinaryStringData;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -139,7 +139,7 @@ public class FieldToRowDataUtils {
                     FieldToRowDataConverter keyConverter = 
createFieldRowConverter(((MapType) fieldType).getKeyType());
                     FieldToRowDataConverter valueConverter = 
createFieldRowConverter(
                             ((MapType) fieldType).getValueType());
-                    Map map = (Map) obj;
+                    Map<?, ?> map = (Map<?, ?>) obj;
                     Map<Object, Object> internalMap = new HashMap<>();
                     for (Object k : map.keySet()) {
                         internalMap.put(keyConverter.convert(k),
@@ -279,10 +279,16 @@ public class FieldToRowDataUtils {
             if (obj == null) {
                 return null;
             }
+            if (obj instanceof BinaryStringData) {
+                return obj;
+            }
+            if (obj instanceof String) {
+                return new BinaryStringData((String) obj);
+            }
             if (obj instanceof byte[]) {
-                return StringData.fromString(new String((byte[]) obj));
+                return new BinaryStringData(new String((byte[]) obj));
             }
-            return StringData.fromString(String.valueOf(obj));
+            return new BinaryStringData(String.valueOf(obj));
         } catch (RuntimeException e) {
             if (isIgnoreError()) {
                 return null;
@@ -428,7 +434,7 @@ public class FieldToRowDataUtils {
             if (obj instanceof List<?>) {
                 return new GenericArrayData(((List<?>) obj).toArray());
             }
-            return new GenericArrayData(new Object[]{obj});
+            return null;
         } catch (RuntimeException e) {
             if (isIgnoreError()) {
                 return null;
@@ -448,9 +454,7 @@ public class FieldToRowDataUtils {
             if (obj instanceof Map<?, ?>) {
                 return new GenericMapData((Map<?, ?>) obj);
             }
-            Map<Object, Object> mapObj = new HashMap<>();
-            mapObj.put(obj, obj);
-            return new GenericMapData(mapObj);
+            return null;
         } catch (RuntimeException e) {
             if (isIgnoreError()) {
                 return null;
@@ -467,9 +471,7 @@ public class FieldToRowDataUtils {
             if (obj instanceof GenericRowData) {
                 return obj;
             }
-            GenericRowData result = new GenericRowData(1);
-            result.setField(0, obj);
-            return result;
+            return null;
         } catch (RuntimeException e) {
             if (isIgnoreError()) {
                 return null;
diff --git 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestCsv2CsvForErrorOrderProcessor.java
 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestCsv2CsvForErrorOrderProcessor.java
index 300b5e3c9c..20c159f9ee 100644
--- 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestCsv2CsvForErrorOrderProcessor.java
+++ 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestCsv2CsvForErrorOrderProcessor.java
@@ -74,7 +74,7 @@ public class TestCsv2CsvForErrorOrderProcessor extends 
AbstractProcessorTestBase
         List<String> output1 = 
processor1.transform("key1=string11&key2=string12&key3=number11&key4=number12",
                 new HashMap<>());
         Assert.assertEquals(1, output1.size());
-        Assert.assertEquals("field1=string11&field2=&field3=number12", 
output1.get(0));
+        Assert.assertEquals("field1=string11&field2=null&field3=number12", 
output1.get(0));
     }
 
     @Test
diff --git 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestPb2RowDataProcessor.java
 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestPb2RowDataProcessor.java
index e6dcb6e44e..88d023bf38 100644
--- 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestPb2RowDataProcessor.java
+++ 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestPb2RowDataProcessor.java
@@ -36,6 +36,7 @@ import org.apache.flink.table.data.GenericArrayData;
 import org.apache.flink.table.data.GenericMapData;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryStringData;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -98,8 +99,9 @@ public class TestPb2RowDataProcessor extends 
AbstractProcessorTestBase {
         Assert.assertEquals(output.get(0).getString(1).toString(), "1");
         Assert.assertEquals(output.get(0).getString(2).toString(), 
"1713243918000");
         Assert.assertEquals(new String(output.get(0).getBinary(3)), 
"msgValue4");
-        Assert.assertEquals(((GenericMapData) 
output.get(0).getMap(4)).get("key"), "value");
-        Assert.assertEquals(((GenericMapData) 
output.get(0).getMap(4)).get("value"), null);
+        Assert.assertEquals(((GenericMapData) output.get(0).getMap(4)).get(new 
BinaryStringData("key")).toString(),
+                "value");
+        Assert.assertEquals(((GenericMapData) output.get(0).getMap(4)).get(new 
BinaryStringData("value")), null);
         Assert.assertEquals(new String(((GenericRowData) 
output.get(0).getRow(5, 3)).getBinary(0)), "msgValue42");
         Assert.assertEquals(((GenericRowData) output.get(0).getRow(5, 
3)).getLong(1), 1713243918002L);
         Assert.assertEquals(((GenericRowData) output.get(0).getRow(5, 
3)).getMap(2).size(), 1);
@@ -109,8 +111,9 @@ public class TestPb2RowDataProcessor extends 
AbstractProcessorTestBase {
         Assert.assertEquals(output.get(1).getString(1).toString(), "1");
         Assert.assertEquals(output.get(1).getString(2).toString(), 
"1713243918002");
         Assert.assertEquals(new String(output.get(1).getBinary(3)), 
"msgValue42");
-        Assert.assertEquals(((GenericMapData) 
output.get(1).getMap(4)).get("key2"), "value2");
-        Assert.assertEquals(((GenericMapData) 
output.get(1).getMap(4)).get("value"), null);
+        Assert.assertEquals(((GenericMapData) output.get(1).getMap(4)).get(new 
BinaryStringData("key2")).toString(),
+                "value2");
+        Assert.assertEquals(((GenericMapData) output.get(1).getMap(4)).get(new 
BinaryStringData("value")), null);
         Assert.assertEquals(new String(((GenericRowData) 
output.get(1).getRow(5, 3)).getBinary(0)), "msgValue42");
         Assert.assertEquals(((GenericRowData) output.get(1).getRow(5, 
3)).getLong(1), 1713243918002L);
         Assert.assertEquals(((GenericRowData) output.get(1).getRow(5, 
3)).getMap(2).size(), 1);

Reply via email to