This is an automated email from the ASF dual-hosted git repository.
luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 965f9da3e9 [INLONG-12125][SDK] Enhance Transform SDK protobuf
processing and SQL alias parsing (#12126)
965f9da3e9 is described below
commit 965f9da3e96b7f3319d3559e7193532db3e53fa6
Author: ChunLiang Lu <[email protected]>
AuthorDate: Wed May 27 17:22:57 2026 +0800
[INLONG-12125][SDK] Enhance Transform SDK protobuf processing and SQL alias
parsing (#12126)
---
.../inlong/sdk/transform/decode/PbSourceData.java | 102 ++++++++++++++++++---
.../sdk/transform/encode/DefaultSinkData.java | 2 +-
.../sdk/transform/process/TransformProcessor.java | 5 +
.../sdk/transform/utils/FieldToRowDataUtils.java | 24 ++---
.../TestCsv2CsvForErrorOrderProcessor.java | 2 +-
.../process/processor/TestPb2RowDataProcessor.java | 11 ++-
6 files changed, 116 insertions(+), 30 deletions(-)
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceData.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceData.java
index ca63769c84..f5d3569c2c 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceData.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceData.java
@@ -27,6 +27,7 @@ import com.google.protobuf.Descriptors.FieldDescriptor;
import com.google.protobuf.Descriptors.FieldDescriptor.JavaType;
import com.google.protobuf.DynamicMessage;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.GenericRowData;
@@ -35,8 +36,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -68,8 +71,8 @@ public class PbSourceData extends AbstractSourceData {
protected Charset srcCharset;
- private Map<DynamicMessage, Map<String, Object>> nodeValueCache = new
HashMap<>();
- private Map<DynamicMessage, Map<String, Map<Object, Object>>> mapNodeCache
= new HashMap<>();
+ private Map<DynamicMessage, Map<String, Object>> nodeValueCache = new
IdentityHashMap<>();
+ private Map<DynamicMessage, Map<String, Map<Object, Object>>> mapNodeCache
= new IdentityHashMap<>();
/**
* Constructor
@@ -158,7 +161,10 @@ public class PbSourceData extends AbstractSourceData {
List<Object> valueList = (List) fieldValue;
List<Object> result = new ArrayList<>(valueList.size());
for (Object value : valueList) {
-
result.add(this.buildFieldValue(lastNode.getFieldDesc(), value));
+ Object itemValue =
this.buildFieldValue(lastNode.getFieldDesc(), value);
+ if (itemValue != null) {
+ result.add(itemValue);
+ }
}
return new GenericArrayData(result.toArray());
}
@@ -184,12 +190,38 @@ public class PbSourceData extends AbstractSourceData {
}
switch (fieldDesc.getJavaType()) {
case STRING:
+ if (nodeValue instanceof BinaryStringData) {
+ return nodeValue;
+ }
+ if (nodeValue instanceof String) {
+ return new BinaryStringData((String) nodeValue);
+ }
+ return new BinaryStringData(String.valueOf(nodeValue));
case INT:
+ if (nodeValue instanceof Integer) {
+ return nodeValue;
+ }
+ return NumberUtils.toInt(String.valueOf(nodeValue), 0);
case LONG:
+ if (nodeValue instanceof Long) {
+ return nodeValue;
+ }
+ return NumberUtils.toLong(String.valueOf(nodeValue), 0);
case FLOAT:
+ if (nodeValue instanceof Float) {
+ return nodeValue;
+ }
+ return NumberUtils.toFloat(String.valueOf(nodeValue), 0);
case DOUBLE:
+ if (nodeValue instanceof Double) {
+ return nodeValue;
+ }
+ return NumberUtils.toDouble(String.valueOf(nodeValue), 0);
case BOOLEAN:
- return nodeValue;
+ if (nodeValue instanceof Boolean) {
+ return nodeValue;
+ }
+ return
Boolean.TRUE.toString().equals(String.valueOf(nodeValue));
case ENUM:
if (nodeValue instanceof EnumValueDescriptor) {
EnumValueDescriptor enumDesc = (EnumValueDescriptor)
nodeValue;
@@ -197,15 +229,17 @@ public class PbSourceData extends AbstractSourceData {
}
return null;
case BYTE_STRING:
- if (nodeValue instanceof ByteString) {
+ if (nodeValue instanceof byte[]) {
+ return nodeValue;
+ } else if (nodeValue instanceof ByteString) {
return ((ByteString) nodeValue).toByteArray();
} else {
- return nodeValue;
+ return
String.valueOf(nodeValue).getBytes(StandardCharsets.ISO_8859_1);
}
case MESSAGE:
return this.buildStructData(fieldDesc.getMessageType(),
nodeValue);
default:
- return String.valueOf(nodeValue);
+ return nodeValue;
}
}
@@ -269,7 +303,9 @@ public class PbSourceData extends AbstractSourceData {
DynamicMessage subnodeValue = (DynamicMessage) value;
Object keyValue = buildFieldValue(keyField,
subnodeValue.getField(keyField));
Object valueValue = buildFieldValue(valueField,
subnodeValue.getField(valueField));
- result.put(keyValue, valueValue);
+ if (keyValue != null && valueValue != null) {
+ result.put(keyValue, valueValue);
+ }
}
return new GenericMapData(result);
}
@@ -307,7 +343,7 @@ public class PbSourceData extends AbstractSourceData {
}
public Object findFieldNode(int rowNum, String fieldName) {
- Object fieldValue = "";
+ Object fieldValue = null;
try {
if (StringUtils.startsWith(fieldName, ROOT_KEY)) {
fieldValue = this.findRootField(fieldName);
@@ -326,7 +362,7 @@ public class PbSourceData extends AbstractSourceData {
}
// error config
if (childNodes.size() == 0) {
- return "";
+ return null;
}
// parse other node
fieldValue = this.findNodeValueByCache(childNodes, root);
@@ -377,7 +413,7 @@ public class PbSourceData extends AbstractSourceData {
private Object findChildField(int rowNum, String srcFieldName) {
if (this.childRoot == null || this.childDesc == null) {
- return "";
+ return null;
}
List<PbNode> childNodes = this.columnNodeMap.get(srcFieldName);
if (childNodes == null) {
@@ -390,7 +426,7 @@ public class PbSourceData extends AbstractSourceData {
}
// error config
if (childNodes.size() == 0) {
- return "";
+ return null;
}
// parse other node
DynamicMessage child = childRoot.get(rowNum);
@@ -423,7 +459,47 @@ public class PbSourceData extends AbstractSourceData {
subNodeValue = subNodeValueCache.get(node.getCurrentPath());
if (subNodeValue != null) {
if (i == childNodes.size() - 1) {
- return subNodeValue;
+ // primitive
+ if (node.isPrimitiveType()) {
+ return subNodeValue;
+ }
+ // struct
+ if (node.isStructType()) {
+ return subNodeValue;
+ }
+ // array
+ if (node.isArrayType()) {
+ if (!node.isHasArrayIndex()) {
+ return subNodeValue;
+ }
+ if (!(subNodeValue instanceof List)) {
+ return null;
+ }
+ List<?> nodeValueList = (List<?>) subNodeValue;
+ if (node.getArrayIndex() >= nodeValueList.size()) {
+ return null;
+ }
+ Object newNode =
nodeValueList.get(node.getArrayIndex());
+ if (!(newNode instanceof DynamicMessage)) {
+ return null;
+ }
+ return newNode;
+ }
+ // map
+ if (node.isMapType()) {
+ if (!node.isHasMapKey()) {
+ return subNodeValue;
+ }
+ final Object mapNodeValue = subNodeValue;
+ Map<Object, Object> mapCache =
subMapNodeCache.computeIfAbsent(node.getCurrentPath(),
+ k -> parseMapNode(mapNodeValue, node));
+ Object fieldValue = mapCache.get(node.getMapKey());
+ if (fieldValue == null || !(fieldValue instanceof
DynamicMessage)) {
+ return null;
+ }
+ return fieldValue;
+ }
+ return null;
} else {
// primitive
if (node.isPrimitiveType()) {
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/DefaultSinkData.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/DefaultSinkData.java
index dcc5d84e20..908641fbee 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/DefaultSinkData.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/DefaultSinkData.java
@@ -51,7 +51,7 @@ public class DefaultSinkData implements SinkData {
*/
@Override
public Object getField(String fieldName) {
- return this.currentRow.getOrDefault(fieldName, "");
+ return this.currentRow.get(fieldName);
}
/**
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
index 7346a205d1..cc70ee3f95 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
@@ -116,6 +116,11 @@ public class TransformProcessor<I, O> {
fieldName = exprItem.toString();
} else {
fieldName = exprItem.getAlias().getName();
+ // Strip surrounding backticks if present
+ if (fieldName != null && fieldName.length() >= 2
+ && fieldName.startsWith("`") &&
fieldName.endsWith("`")) {
+ fieldName = fieldName.substring(1,
fieldName.length() - 1);
+ }
}
if (!this.checkSelectField(fieldName)) {
throw new JSQLParserException(
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/utils/FieldToRowDataUtils.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/utils/FieldToRowDataUtils.java
index 2743921c58..01d2263020 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/utils/FieldToRowDataUtils.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/utils/FieldToRowDataUtils.java
@@ -23,8 +23,8 @@ import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.binary.BinaryStringData;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
@@ -139,7 +139,7 @@ public class FieldToRowDataUtils {
FieldToRowDataConverter keyConverter =
createFieldRowConverter(((MapType) fieldType).getKeyType());
FieldToRowDataConverter valueConverter =
createFieldRowConverter(
((MapType) fieldType).getValueType());
- Map map = (Map) obj;
+ Map<?, ?> map = (Map<?, ?>) obj;
Map<Object, Object> internalMap = new HashMap<>();
for (Object k : map.keySet()) {
internalMap.put(keyConverter.convert(k),
@@ -279,10 +279,16 @@ public class FieldToRowDataUtils {
if (obj == null) {
return null;
}
+ if (obj instanceof BinaryStringData) {
+ return obj;
+ }
+ if (obj instanceof String) {
+ return new BinaryStringData((String) obj);
+ }
if (obj instanceof byte[]) {
- return StringData.fromString(new String((byte[]) obj));
+ return new BinaryStringData(new String((byte[]) obj));
}
- return StringData.fromString(String.valueOf(obj));
+ return new BinaryStringData(String.valueOf(obj));
} catch (RuntimeException e) {
if (isIgnoreError()) {
return null;
@@ -428,7 +434,7 @@ public class FieldToRowDataUtils {
if (obj instanceof List<?>) {
return new GenericArrayData(((List<?>) obj).toArray());
}
- return new GenericArrayData(new Object[]{obj});
+ return null;
} catch (RuntimeException e) {
if (isIgnoreError()) {
return null;
@@ -448,9 +454,7 @@ public class FieldToRowDataUtils {
if (obj instanceof Map<?, ?>) {
return new GenericMapData((Map<?, ?>) obj);
}
- Map<Object, Object> mapObj = new HashMap<>();
- mapObj.put(obj, obj);
- return new GenericMapData(mapObj);
+ return null;
} catch (RuntimeException e) {
if (isIgnoreError()) {
return null;
@@ -467,9 +471,7 @@ public class FieldToRowDataUtils {
if (obj instanceof GenericRowData) {
return obj;
}
- GenericRowData result = new GenericRowData(1);
- result.setField(0, obj);
- return result;
+ return null;
} catch (RuntimeException e) {
if (isIgnoreError()) {
return null;
diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestCsv2CsvForErrorOrderProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestCsv2CsvForErrorOrderProcessor.java
index 300b5e3c9c..20c159f9ee 100644
--- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestCsv2CsvForErrorOrderProcessor.java
+++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestCsv2CsvForErrorOrderProcessor.java
@@ -74,7 +74,7 @@ public class TestCsv2CsvForErrorOrderProcessor extends
AbstractProcessorTestBase
List<String> output1 =
processor1.transform("key1=string11&key2=string12&key3=number11&key4=number12",
new HashMap<>());
Assert.assertEquals(1, output1.size());
- Assert.assertEquals("field1=string11&field2=&field3=number12",
output1.get(0));
+ Assert.assertEquals("field1=string11&field2=null&field3=number12",
output1.get(0));
}
@Test
diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestPb2RowDataProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestPb2RowDataProcessor.java
index e6dcb6e44e..88d023bf38 100644
--- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestPb2RowDataProcessor.java
+++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestPb2RowDataProcessor.java
@@ -36,6 +36,7 @@ import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryStringData;
import org.junit.Assert;
import org.junit.Test;
@@ -98,8 +99,9 @@ public class TestPb2RowDataProcessor extends
AbstractProcessorTestBase {
Assert.assertEquals(output.get(0).getString(1).toString(), "1");
Assert.assertEquals(output.get(0).getString(2).toString(),
"1713243918000");
Assert.assertEquals(new String(output.get(0).getBinary(3)),
"msgValue4");
- Assert.assertEquals(((GenericMapData)
output.get(0).getMap(4)).get("key"), "value");
- Assert.assertEquals(((GenericMapData)
output.get(0).getMap(4)).get("value"), null);
+ Assert.assertEquals(((GenericMapData) output.get(0).getMap(4)).get(new
BinaryStringData("key")).toString(),
+ "value");
+ Assert.assertEquals(((GenericMapData) output.get(0).getMap(4)).get(new
BinaryStringData("value")), null);
Assert.assertEquals(new String(((GenericRowData)
output.get(0).getRow(5, 3)).getBinary(0)), "msgValue42");
Assert.assertEquals(((GenericRowData) output.get(0).getRow(5,
3)).getLong(1), 1713243918002L);
Assert.assertEquals(((GenericRowData) output.get(0).getRow(5,
3)).getMap(2).size(), 1);
@@ -109,8 +111,9 @@ public class TestPb2RowDataProcessor extends
AbstractProcessorTestBase {
Assert.assertEquals(output.get(1).getString(1).toString(), "1");
Assert.assertEquals(output.get(1).getString(2).toString(),
"1713243918002");
Assert.assertEquals(new String(output.get(1).getBinary(3)),
"msgValue42");
- Assert.assertEquals(((GenericMapData)
output.get(1).getMap(4)).get("key2"), "value2");
- Assert.assertEquals(((GenericMapData)
output.get(1).getMap(4)).get("value"), null);
+ Assert.assertEquals(((GenericMapData) output.get(1).getMap(4)).get(new
BinaryStringData("key2")).toString(),
+ "value2");
+ Assert.assertEquals(((GenericMapData) output.get(1).getMap(4)).get(new
BinaryStringData("value")), null);
Assert.assertEquals(new String(((GenericRowData)
output.get(1).getRow(5, 3)).getBinary(0)), "msgValue42");
Assert.assertEquals(((GenericRowData) output.get(1).getRow(5,
3)).getLong(1), 1713243918002L);
Assert.assertEquals(((GenericRowData) output.get(1).getRow(5,
3)).getMap(2).size(), 1);