This is an automated email from the ASF dual-hosted git repository.
luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 95c9a8204c [INLONG-12113][SDK] TransformSDK supports encoding of List,
Struct, and Binary type fields for RowData (#12114)
95c9a8204c is described below
commit 95c9a8204c67c4f2cfe7b6447feeced27f3b6720
Author: ChunLiang Lu <[email protected]>
AuthorDate: Tue Apr 21 15:09:26 2026 +0800
[INLONG-12113][SDK] TransformSDK supports encoding of List, Struct, and
Binary type fields for RowData (#12114)
* [INLONG-12113][SDK] TransformSDK supports encoding of List, Struct, and
Binary type fields for RowData
* fix UT case
* add UT case
---
.../apache/inlong/sdk/transform/decode/PbNode.java | 4 +
.../inlong/sdk/transform/decode/PbSourceData.java | 164 +++++++++++++------
.../sdk/transform/encode/CsvSinkEncoder.java | 4 +-
.../sdk/transform/encode/DefaultSinkData.java | 7 +-
.../inlong/sdk/transform/encode/KvSinkEncoder.java | 4 +-
.../sdk/transform/encode/MapSinkEncoder.java | 2 +-
.../sdk/transform/encode/ParquetSinkEncoder.java | 2 +-
.../inlong/sdk/transform/encode/PbSinkEncoder.java | 2 +-
.../sdk/transform/encode/RowDataSinkEncoder.java | 3 +-
.../inlong/sdk/transform/encode/SinkData.java | 4 +-
.../inlong/sdk/transform/encode/SinkEncoder.java | 9 ++
.../sdk/transform/process/TransformProcessor.java | 2 +-
.../process/function/string/ConcatFunction.java | 12 +-
.../sdk/transform/utils/FieldToRowDataUtils.java | 175 +++++++++++++++++----
.../process/processor/TestPb2RowDataProcessor.java | 118 ++++++++++++++
15 files changed, 416 insertions(+), 96 deletions(-)
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbNode.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbNode.java
index 4fe166832b..310f6b1abb 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbNode.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbNode.java
@@ -41,6 +41,7 @@ public class PbNode {
private boolean isArray = false;
private int arrayIndex = -1;
private boolean isMap = false;
+ private boolean isMapType = false;
private String mapKey = "";
private FieldDescriptor mapKeyDesc;
private FieldDescriptor mapValueDesc;
@@ -60,6 +61,9 @@ public class PbNode {
this.fieldDesc = messageDesc.findFieldByName(name);
if (this.fieldDesc.getJavaType() == JavaType.MESSAGE) {
this.messageType = this.fieldDesc.getMessageType();
+ if (isMapDescriptor(messageType)) {
+ this.isMapType = true;
+ }
}
}
} else {
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceData.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceData.java
index dbeeade6f3..e46fb47d8d 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceData.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceData.java
@@ -21,13 +21,18 @@ import org.apache.inlong.sdk.transform.process.Context;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors;
+import com.google.protobuf.Descriptors.FieldDescriptor;
import com.google.protobuf.DynamicMessage;
import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.Charset;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -54,7 +59,7 @@ public class PbSourceData extends AbstractSourceData {
private List<DynamicMessage> childRoot;
- private Charset srcCharset;
+ protected Charset srcCharset;
/**
* Constructor
@@ -105,8 +110,8 @@ public class PbSourceData extends AbstractSourceData {
* @return
*/
@Override
- public String getField(int rowNum, String fieldName) {
- String fieldValue = "";
+ public Object getField(int rowNum, String fieldName) {
+ Object fieldValue = "";
try {
if (isContextField(fieldName)) {
return getContextField(fieldName);
@@ -130,7 +135,7 @@ public class PbSourceData extends AbstractSourceData {
* @param fieldName
* @return
*/
- private String getRootField(String srcFieldName) {
+ private Object getRootField(String srcFieldName) {
List<PbNode> childNodes = this.columnNodeMap.get(srcFieldName);
if (childNodes == null) {
String fieldName = srcFieldName.substring(ROOT_KEY.length());
@@ -145,7 +150,7 @@ public class PbSourceData extends AbstractSourceData {
return "";
}
// parse other node
- String fieldValue = this.getNodeValue(childNodes, root);
+ Object fieldValue = this.getNodeValue(childNodes, root);
return fieldValue;
}
@@ -155,7 +160,7 @@ public class PbSourceData extends AbstractSourceData {
* @param srcFieldName
* @return
*/
- private String getChildField(int rowNum, String srcFieldName) {
+ private Object getChildField(int rowNum, String srcFieldName) {
if (this.childRoot == null || this.childDesc == null) {
return "";
}
@@ -174,7 +179,7 @@ public class PbSourceData extends AbstractSourceData {
}
// parse other node
DynamicMessage child = childRoot.get(rowNum);
- String fieldValue = this.getNodeValue(childNodes, child);
+ Object fieldValue = this.getNodeValue(childNodes, child);
return fieldValue;
}
@@ -184,9 +189,8 @@ public class PbSourceData extends AbstractSourceData {
* @param root
* @return
*/
- @SuppressWarnings("rawtypes")
- private String getNodeValue(List<PbNode> childNodes, DynamicMessage root) {
- String fieldValue = "";
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ private Object getNodeValue(List<PbNode> childNodes, DynamicMessage root) {
DynamicMessage current = root;
for (int i = 0; i < childNodes.size(); i++) {
PbNode node = childNodes.get(i);
@@ -195,62 +199,120 @@ public class PbSourceData extends AbstractSourceData {
// error data
break;
}
- if (node.isLastNode()) {
- switch (node.getFieldDesc().getJavaType()) {
- case STRING:
- case INT:
- case LONG:
- case FLOAT:
- case DOUBLE:
- case BOOLEAN:
- fieldValue = String.valueOf(nodeValue);
- break;
- case BYTE_STRING:
- ByteString byteString = (ByteString) nodeValue;
- fieldValue = new String(byteString.toByteArray(),
srcCharset);
- break;
- case ENUM:
- fieldValue = String.valueOf(nodeValue);
- break;
- case MESSAGE:
- if (node.isArray()) {
- fieldValue = String.valueOf(((List)
nodeValue).get(node.getArrayIndex()));
- } else if (node.isMap()) {
- List<DynamicMessage> nodeValueList =
(List<DynamicMessage>) nodeValue;
- for (DynamicMessage subnodeValue : nodeValueList) {
- String keyValue =
String.valueOf(subnodeValue.getField(node.getMapKeyDesc()));
- if (StringUtils.equals(keyValue,
node.getMapKey())) {
- fieldValue =
String.valueOf(subnodeValue.getField(node.getMapValueDesc()));
- break;
- }
- }
- } else {
- fieldValue = String.valueOf(nodeValue);
+ if (!node.isLastNode()) {
+ if (node.isArray()) {
+ current = (DynamicMessage) ((List)
nodeValue).get(node.getArrayIndex());
+ } else if (node.isMap()) {
+ List<DynamicMessage> nodeValueList =
(List<DynamicMessage>) nodeValue;
+ DynamicMessage newCurrent = null;
+ for (DynamicMessage subnodeValue : nodeValueList) {
+ String keyValue =
String.valueOf(subnodeValue.getField(node.getMapKeyDesc()));
+ if (StringUtils.equals(keyValue, node.getMapKey())) {
+ newCurrent = (DynamicMessage)
subnodeValue.getField(node.getMapValueDesc());
+ break;
}
- break;
+ }
+ if (newCurrent == null) {
+ return null;
+ }
+ current = newCurrent;
+ } else {
+ current = (DynamicMessage) nodeValue;
}
- break;
+ continue;
}
+ // last node
if (node.isArray()) {
- current = (DynamicMessage) ((List)
nodeValue).get(node.getArrayIndex());
+ // convert the indexed element with the repeated field's own descriptor, so scalar
+ // elements (string, number, bytes, enum) work as well as message elements
+ return this.buildFieldValue(node.getFieldDesc(), ((List) nodeValue).get(node.getArrayIndex()), false);
} else if (node.isMap()) {
List<DynamicMessage> nodeValueList = (List<DynamicMessage>)
nodeValue;
- DynamicMessage newCurrent = null;
+ Object fieldValue = null;
for (DynamicMessage subnodeValue : nodeValueList) {
String keyValue =
String.valueOf(subnodeValue.getField(node.getMapKeyDesc()));
if (StringUtils.equals(keyValue, node.getMapKey())) {
- newCurrent = (DynamicMessage)
subnodeValue.getField(node.getMapValueDesc());
+ fieldValue =
subnodeValue.getField(node.getMapValueDesc());
break;
}
}
- if (newCurrent == null) {
- return fieldValue;
+ // the looked-up value has the map's value type, so it must be converted with the map-entry
+ // value descriptor; the map field's own descriptor would send it to buildStructData, which
+ // casts its input to a List of map entries
+ return this.buildFieldValue(node.getMapValueDesc(), fieldValue, false);
+ } else if (node.isMapType()) {
+ return this.buildStructData(node.getMessageType(), nodeValue);
+ } else if (node.getFieldDesc().isRepeated()) {
+ List<Object> valueList = (List) nodeValue;
+ List<Object> result = new ArrayList<>(valueList.size());
+ for (Object value : valueList) {
+ result.add(this.buildFieldValue(node.getFieldDesc(),
value, false));
}
- current = newCurrent;
+ return new GenericArrayData(result.toArray());
} else {
- current = (DynamicMessage) nodeValue;
+ return this.buildFieldValue(node.getFieldDesc(), nodeValue,
false);
}
}
- return fieldValue;
+ return null;
+ }
+
+ /**
+ * Converts a value read from a protobuf field into the representation used for RowData.
+ *
+ * <p>Scalars are returned unchanged, bytes become byte[], messages become GenericRowData and
+ * map fields become GenericMapData (both via buildStructData). When isRepeated is true,
+ * nodeValue is the whole List of a repeated field and every element is converted, giving a
+ * GenericArrayData.
+ *
+ * @param fieldDesc descriptor of the field the value was read from
+ * @param nodeValue value from DynamicMessage#getField, or a single element of a repeated field
+ * @param isRepeated true when nodeValue is the full List of a repeated field
+ * @return the converted value, or null when nodeValue is null
+ */
+ @SuppressWarnings("unchecked")
+ private Object buildFieldValue(FieldDescriptor fieldDesc, Object nodeValue, boolean isRepeated) {
+ if (nodeValue == null) {
+ return null;
+ }
+ if (isRepeated) {
+ // protobuf map fields are repeated map-entry messages; they become one GenericMapData
+ if (fieldDesc.getJavaType() == FieldDescriptor.JavaType.MESSAGE
+ && PbNode.isMapDescriptor(fieldDesc.getMessageType())) {
+ return this.buildStructData(fieldDesc.getMessageType(), nodeValue);
+ }
+ // convert each element with the same descriptor, so repeated scalars and bytes are
+ // handled as well as repeated messages (a raw List or a ByteString cast would fail)
+ List<Object> valueList = (List<Object>) nodeValue;
+ List<Object> result = new ArrayList<>(valueList.size());
+ for (Object value : valueList) {
+ result.add(this.buildFieldValue(fieldDesc, value, false));
+ }
+ return new GenericArrayData(result.toArray());
+ }
+ switch (fieldDesc.getJavaType()) {
+ case STRING:
+ case INT:
+ case LONG:
+ case FLOAT:
+ case DOUBLE:
+ case BOOLEAN:
+ case ENUM:
+ return nodeValue;
+ case BYTE_STRING:
+ return ((ByteString) nodeValue).toByteArray();
+ case MESSAGE:
+ return this.buildStructData(fieldDesc.getMessageType(), nodeValue);
+ default:
+ return String.valueOf(nodeValue);
+ }
+ }
+
+ /**
+ * Converts a protobuf message value into Flink internal data.
+ *
+ * <p>For a map-entry descriptor, nodeValue is the List of entry messages of a map field and a
+ * GenericMapData is built from their key/value fields (field numbers 1 and 2). For any other
+ * message type, nodeValue is a single DynamicMessage and a GenericRowData is built with one
+ * column per declared field, in descriptor order.
+ *
+ * @param messageType descriptor of the message or map-entry type
+ * @param nodeValue a DynamicMessage, or a List of DynamicMessage entries for a map field
+ * @return GenericMapData for map fields, otherwise GenericRowData
+ */
+ @SuppressWarnings("unchecked")
+ protected Object buildStructData(Descriptors.Descriptor messageType, Object nodeValue) {
+ // map
+ if (PbNode.isMapDescriptor(messageType)) {
+ Descriptors.FieldDescriptor keyField = messageType.findFieldByNumber(1);
+ Descriptors.FieldDescriptor valueField = messageType.findFieldByNumber(2);
+ List<DynamicMessage> subNodeValueList = (List<DynamicMessage>) nodeValue;
+ Map<Object, Object> result = new HashMap<>();
+ for (DynamicMessage subnodeValue : subNodeValueList) {
+ Object keyValue = buildFieldValue(keyField, subnodeValue.getField(keyField), false);
+ Object valueValue = buildFieldValue(valueField, subnodeValue.getField(valueField), false);
+ result.put(keyValue, valueValue);
+ }
+ return new GenericMapData(result);
+ }
+ // struct: one column per declared field
+ // NOTE(review): STRING values stay java.lang.String and ENUM values stay protobuf enum
+ // descriptors inside this row; Flink RowData internals normally expect StringData for
+ // character columns - confirm the downstream converters handle nested values.
+ DynamicMessage msgObj = (DynamicMessage) nodeValue;
+ List<FieldDescriptor> fields = messageType.getFields();
+ GenericRowData result = new GenericRowData(fields.size());
+ for (int index = 0; index < fields.size(); index++) {
+ FieldDescriptor fieldDesc = fields.get(index);
+ // repeated fields (including nested map fields) are returned by getField as a List, so the
+ // repeated flag must be passed on; treating the List as one message would throw a
+ // ClassCastException in the (DynamicMessage) cast above
+ result.setField(index, this.buildFieldValue(fieldDesc, msgObj.getField(fieldDesc), fieldDesc.isRepeated()));
+ }
+ return result;
+ }
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
index 97b43c02b9..6c9c99c9dd 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
@@ -63,7 +63,7 @@ public class CsvSinkEncoder extends SinkEncoder<String> {
sinkData.keyList().forEach(k ->
builder.append(sinkData.getField(k)).append(delimiter));
} else {
for (String fieldName : sinkData.keyList()) {
- String fieldValue = sinkData.getField(fieldName);
+ String fieldValue =
formatFieldValue(sinkData.getField(fieldName));
if (StringUtils.equals(fieldName, ALL_SOURCE_FIELD_SIGN)) {
builder.append(fieldValue);
} else {
@@ -78,7 +78,7 @@ public class CsvSinkEncoder extends SinkEncoder<String> {
} else {
for (FieldInfo field : fields) {
String fieldName = field.getName();
- String fieldValue = sinkData.getField(fieldName);
+ String fieldValue =
formatFieldValue(sinkData.getField(fieldName));
EscapeUtils.escapeContent(builder, delimiter, escapeChar,
fieldValue);
builder.append(delimiter);
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/DefaultSinkData.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/DefaultSinkData.java
index 2b470cdc24..dcc5d84e20 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/DefaultSinkData.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/DefaultSinkData.java
@@ -26,13 +26,12 @@ import java.util.Map;
/**
* DefaultSinkData
- *
*/
@Data
public class DefaultSinkData implements SinkData {
private List<String> keyList = new ArrayList<>();
- private Map<String, String> currentRow = new HashMap<>();
+ private Map<String, Object> currentRow = new HashMap<>();
/**
* addField
@@ -40,7 +39,7 @@ public class DefaultSinkData implements SinkData {
* @param fieldValue
*/
@Override
- public void addField(String fieldName, String fieldValue) {
+ public void addField(String fieldName, Object fieldValue) {
this.keyList.add(fieldName);
this.currentRow.put(fieldName, fieldValue);
}
@@ -51,7 +50,7 @@ public class DefaultSinkData implements SinkData {
* @return
*/
@Override
- public String getField(String fieldName) {
+ public Object getField(String fieldName) {
return this.currentRow.getOrDefault(fieldName, "");
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
index 094f4d6884..444c73ff6c 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
@@ -60,7 +60,7 @@ public class KvSinkEncoder extends SinkEncoder<String> {
builder.delete(0, builder.length());
if (fields == null || fields.size() == 0) {
for (String fieldName : sinkData.keyList()) {
- String fieldValue = sinkData.getField(fieldName);
+ String fieldValue =
formatFieldValue(sinkData.getField(fieldName));
if (StringUtils.equals(fieldName, ALL_SOURCE_FIELD_SIGN)) {
builder.append(fieldValue).append(entryDelimiter);
} else {
@@ -70,7 +70,7 @@ public class KvSinkEncoder extends SinkEncoder<String> {
} else {
for (FieldInfo field : fields) {
String fieldName = field.getName();
- String fieldValue = sinkData.getField(fieldName);
+ String fieldValue =
formatFieldValue(sinkData.getField(fieldName));
builder.append(fieldName).append(kvDelimiter).append(fieldValue).append(entryDelimiter);
}
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/MapSinkEncoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/MapSinkEncoder.java
index c76c4e80ff..cdcc8d3fa7 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/MapSinkEncoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/MapSinkEncoder.java
@@ -49,7 +49,7 @@ public class MapSinkEncoder extends SinkEncoder<Map<String,
Object>> {
Map<String, Object> esMap = new HashMap<>();
for (FieldInfo fieldInfo : fields) {
String fieldName = fieldInfo.getName();
- String strValue = sinkData.getField(fieldName);
+ String strValue = formatFieldValue(sinkData.getField(fieldName));
TypeConverter converter = converters.get(fieldName);
if (converter == null) {
esMap.put(fieldName, strValue);
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetSinkEncoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetSinkEncoder.java
index 6d377b061c..bbe2d01863 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetSinkEncoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetSinkEncoder.java
@@ -70,7 +70,7 @@ public class ParquetSinkEncoder extends
SinkEncoder<ByteArrayOutputStream> {
Object[] rowsInfo = new Object[size];
Arrays.fill(rowsInfo, "");
for (int i = 0; i < size; i++) {
- String fieldData = sinkData.getField(this.fields.get(i).getName());
+ String fieldData =
formatFieldValue(sinkData.getField(this.fields.get(i).getName()));
if (fieldData == null) {
continue;
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/PbSinkEncoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/PbSinkEncoder.java
index 226405c515..e0263558ca 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/PbSinkEncoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/PbSinkEncoder.java
@@ -58,7 +58,7 @@ public class PbSinkEncoder extends SinkEncoder<byte[]> {
for (String key : sinkData.keyList()) {
Descriptors.FieldDescriptor fieldDescriptor =
dynamicDescriptor.findFieldByName(key);
if (fieldDescriptor != null) {
- String fieldValue = sinkData.getField(key);
+ String fieldValue =
formatFieldValue(sinkData.getField(key));
if (fieldValue != null) {
Object value = convertValue(fieldDescriptor,
fieldValue);
dynamicBuilder.setField(fieldDescriptor, value);
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/RowDataSinkEncoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/RowDataSinkEncoder.java
index 1cab9f6fe3..507741d5c3 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/RowDataSinkEncoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/RowDataSinkEncoder.java
@@ -42,10 +42,9 @@ public class RowDataSinkEncoder extends SinkEncoder<RowData>
{
@Override
public RowData encode(SinkData sinkData, Context context) {
GenericRowData rowData = new
GenericRowData(fieldToRowDataConverters.length);
-
for (int i = 0; i < fields.size(); i++) {
String fieldName = fields.get(i).getName();
- String fieldValue = sinkData.getField(fieldName);
+ Object fieldValue = sinkData.getField(fieldName);
rowData.setField(i,
fieldToRowDataConverters[i].convert(fieldValue));
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkData.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkData.java
index 1ad0c38c68..6137eb1522 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkData.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkData.java
@@ -25,9 +25,9 @@ import java.util.List;
*/
public interface SinkData {
- void addField(String fieldName, String fieldValue);
+ /**
+ * Adds a field value to the output row.
+ *
+ * @param fieldName name of the output field
+ * @param fieldValue field value; no longer restricted to String - it may be, for example, a
+ * byte[] or a Flink GenericArrayData / GenericMapData / GenericRowData produced by the
+ * protobuf source decoder
+ */
+ void addField(String fieldName, Object fieldValue);
- String getField(String fieldName);
+ /**
+ * Returns the value stored for a field.
+ *
+ * <p>Text-based encoders (CSV, KV, Map, Parquet, PB) convert it with
+ * SinkEncoder#formatFieldValue; RowDataSinkEncoder passes it unchanged to its field converters.
+ *
+ * @param fieldName name of the output field
+ * @return the stored value; the result for an unknown field is implementation-specific
+ * (DefaultSinkData returns an empty string)
+ */
+ Object getField(String fieldName);
+ /**
+ * Returns the names of the fields added so far.
+ * NOTE(review): DefaultSinkData keeps them in insertion order; confirm other implementations do too.
+ */
List<String> keyList();
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
index bd804cd771..05d2e6a9ad 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
@@ -44,4 +44,13 @@ public abstract class SinkEncoder<Output> {
}
public abstract Output encode(SinkData sinkData, Context context);
+
+ /**
+ * Formats a sink field value as text for the string-based encoders.
+ *
+ * @param fieldValue value taken from SinkData; may be null
+ * @return null for null input, byte arrays decoded as UTF-8, otherwise String.valueOf(fieldValue)
+ */
+ protected String formatFieldValue(Object fieldValue) {
+ if (fieldValue == null) {
+ return null;
+ }
+ if (fieldValue instanceof byte[]) {
+ // decode with an explicit charset: new String(byte[]) depends on the JVM default charset
+ return new String((byte[]) fieldValue, java.nio.charset.StandardCharsets.UTF_8);
+ }
+ // NOTE(review): GenericArrayData / GenericMapData values for repeated and map fields also
+ // end up here; confirm their String.valueOf output is acceptable for text sinks.
+ return String.valueOf(fieldValue);
+ }
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
index e6392bbe2d..7346a205d1 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
@@ -227,7 +227,7 @@ public class TransformProcessor<I, O> {
if (fieldValue == null) {
sinkData.addField(fieldName, "");
} else {
- sinkData.addField(fieldName, fieldValue.toString());
+ sinkData.addField(fieldName, fieldValue);
}
} catch (Throwable t) {
sinkData.addField(fieldName, "");
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/string/ConcatFunction.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/string/ConcatFunction.java
index 85eba0567f..3f5faa5710 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/string/ConcatFunction.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/string/ConcatFunction.java
@@ -27,6 +27,7 @@ import
org.apache.inlong.sdk.transform.process.parser.ValueParser;
import lombok.extern.slf4j.Slf4j;
import net.sf.jsqlparser.expression.Expression;
import net.sf.jsqlparser.expression.Function;
+import org.apache.flink.table.data.StringData;
import java.util.ArrayList;
import java.util.List;
@@ -64,7 +65,16 @@ public class ConcatFunction implements ValueParser {
public Object parse(SourceData sourceData, int rowIndex, Context context) {
StringBuilder builder = new StringBuilder();
for (ValueParser node : nodeList) {
- builder.append(node.parse(sourceData, rowIndex, context));
+ Object itemValue = node.parse(sourceData, rowIndex, context);
+ if (itemValue == null) {
+ // null operands contribute nothing to the concatenation
+ continue;
+ }
+ if (itemValue instanceof byte[]) {
+ // decode with an explicit charset instead of the JVM default charset
+ builder.append(new String((byte[]) itemValue, java.nio.charset.StandardCharsets.UTF_8));
+ } else if (itemValue instanceof StringData) {
+ builder.append(((StringData) itemValue).toString());
+ } else {
+ builder.append(String.valueOf(itemValue));
+ }
}
return builder.toString();
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/utils/FieldToRowDataUtils.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/utils/FieldToRowDataUtils.java
index ff61d912b3..2743921c58 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/utils/FieldToRowDataUtils.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/utils/FieldToRowDataUtils.java
@@ -22,6 +22,7 @@ import
org.apache.inlong.sdk.transform.decode.TransformException;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.ArrayType;
@@ -39,6 +40,7 @@ import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -72,7 +74,6 @@ public class FieldToRowDataUtils {
converterMap.put(LogicalTypeRoot.BIGINT, (obj) -> parseLong(obj));
converterMap.put(LogicalTypeRoot.FLOAT, (obj) -> parseFloat(obj));
converterMap.put(LogicalTypeRoot.DOUBLE, (obj) -> parseDouble(obj));
- converterMap.put(LogicalTypeRoot.BINARY, (obj) -> parseBinary(obj));
converterMap.put(LogicalTypeRoot.VARBINARY, (obj) -> parseBinary(obj));
converterMap.put(LogicalTypeRoot.CHAR, (obj) -> parseVarchar(obj));
converterMap.put(LogicalTypeRoot.VARCHAR, (obj) -> parseVarchar(obj));
@@ -82,6 +83,10 @@ public class FieldToRowDataUtils {
converterMap.put(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, (obj) ->
parseTimestampWithLocalTimeZone(obj));
converterMap.put(LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE, (obj) ->
parseTimestampWithLocalTimeZone(obj));
converterMap.put(LogicalTypeRoot.DECIMAL, (obj) -> parseDecimal(obj));
+ converterMap.put(LogicalTypeRoot.BINARY, (obj) -> parseBinary(obj));
+ converterMap.put(LogicalTypeRoot.ARRAY, (obj) -> parseArray(obj));
+ converterMap.put(LogicalTypeRoot.MAP, (obj) -> parseMap(obj));
+ converterMap.put(LogicalTypeRoot.ROW, (obj) -> parseRow(obj));
}
private static final ThreadLocal<Map<String, SimpleDateFormat>>
formatLocal = new ThreadLocal<>();
@@ -122,8 +127,8 @@ public class FieldToRowDataUtils {
case ARRAY:
return obj -> {
final Object[] array = (Object[]) obj;
- FieldToRowDataConverter elementConverter =
- createFieldRowConverter(((ArrayType)
fieldType).getElementType());
+ FieldToRowDataConverter elementConverter =
createFieldRowConverter(
+ ((ArrayType) fieldType).getElementType());
Object[] converted = Arrays.stream(array)
.map(elementConverter::convert)
.toArray();
@@ -131,10 +136,9 @@ public class FieldToRowDataUtils {
};
case MAP:
return obj -> {
- FieldToRowDataConverter keyConverter =
- createFieldRowConverter(((MapType)
fieldType).getKeyType());
- FieldToRowDataConverter valueConverter =
- createFieldRowConverter(((MapType)
fieldType).getValueType());
+ FieldToRowDataConverter keyConverter =
createFieldRowConverter(((MapType) fieldType).getKeyType());
+ FieldToRowDataConverter valueConverter =
createFieldRowConverter(
+ ((MapType) fieldType).getValueType());
Map map = (Map) obj;
Map<Object, Object> internalMap = new HashMap<>();
for (Object k : map.keySet()) {
@@ -153,7 +157,13 @@ public class FieldToRowDataUtils {
private static Object parseBoolean(Object obj) {
try {
- return Boolean.parseBoolean(obj.toString());
+ if (obj == null) {
+ return null;
+ }
+ if (obj instanceof Boolean) {
+ return obj;
+ }
+ return Boolean.parseBoolean(String.valueOf(obj));
} catch (RuntimeException e) {
if (isIgnoreError()) {
return null;
@@ -164,7 +174,13 @@ public class FieldToRowDataUtils {
private static Object parseTinyint(Object obj) {
try {
- return Byte.parseByte(obj.toString());
+ if (obj == null) {
+ return null;
+ }
+ if (obj instanceof Byte) {
+ return obj;
+ }
+ return Byte.parseByte(String.valueOf(obj));
} catch (RuntimeException e) {
if (isIgnoreError()) {
return null;
@@ -175,7 +191,13 @@ public class FieldToRowDataUtils {
private static Object parseSmallint(Object obj) {
try {
- return Short.parseShort(obj.toString());
+ if (obj == null) {
+ return null;
+ }
+ if (obj instanceof Short) {
+ return obj;
+ }
+ return Short.parseShort(String.valueOf(obj));
} catch (RuntimeException e) {
if (isIgnoreError()) {
return null;
@@ -186,7 +208,13 @@ public class FieldToRowDataUtils {
private static Object parseInteger(Object obj) {
try {
- return Integer.parseInt(obj.toString());
+ if (obj == null) {
+ return null;
+ }
+ if (obj instanceof Integer) {
+ return obj;
+ }
+ return Integer.parseInt(String.valueOf(obj));
} catch (RuntimeException e) {
if (isIgnoreError()) {
return null;
@@ -197,7 +225,13 @@ public class FieldToRowDataUtils {
private static Object parseLong(Object obj) {
try {
- return Long.parseLong(obj.toString());
+ if (obj == null) {
+ return null;
+ }
+ if (obj instanceof Long) {
+ return obj;
+ }
+ return Long.parseLong(String.valueOf(obj));
} catch (RuntimeException e) {
if (isIgnoreError()) {
return null;
@@ -208,7 +242,13 @@ public class FieldToRowDataUtils {
private static Object parseFloat(Object obj) {
try {
- return Float.parseFloat(obj.toString());
+ if (obj == null) {
+ return null;
+ }
+ if (obj instanceof Float) {
+ return obj;
+ }
+ return Float.parseFloat(String.valueOf(obj));
} catch (RuntimeException e) {
if (isIgnoreError()) {
return null;
@@ -219,18 +259,13 @@ public class FieldToRowDataUtils {
private static Object parseDouble(Object obj) {
try {
- return Double.parseDouble(obj.toString());
- } catch (RuntimeException e) {
- if (isIgnoreError()) {
+ if (obj == null) {
return null;
}
- throw e;
- }
- }
-
- private static Object parseBinary(Object obj) {
- try {
- return obj.toString().getBytes();
+ if (obj instanceof Double) {
+ return obj;
+ }
+ return Double.parseDouble(String.valueOf(obj));
} catch (RuntimeException e) {
if (isIgnoreError()) {
return null;
@@ -241,7 +276,13 @@ public class FieldToRowDataUtils {
private static Object parseVarchar(Object obj) {
try {
- return StringData.fromString((String) obj);
+ if (obj == null) {
+ return null;
+ }
+ if (obj instanceof byte[]) {
+ return StringData.fromString(new String((byte[]) obj));
+ }
+ return StringData.fromString(String.valueOf(obj));
} catch (RuntimeException e) {
if (isIgnoreError()) {
return null;
@@ -258,7 +299,7 @@ public class FieldToRowDataUtils {
if (obj instanceof Date) {
return ((Date) obj).toLocalDate().toEpochDay();
}
- String strObj = obj.toString();
+ String strObj = String.valueOf(obj);
Date date = parseDateTime(strObj);
return date.toLocalDate().toEpochDay();
} catch (RuntimeException e) {
@@ -305,7 +346,7 @@ public class FieldToRowDataUtils {
if (obj instanceof Time) {
return ((Time) obj).toLocalTime().toSecondOfDay() * 1000;
}
- String strObj = obj.toString();
+ String strObj = String.valueOf(obj);
Date date = parseDateTime(strObj);
return new Time(date.getTime()).toLocalTime().toSecondOfDay() *
1000;
} catch (RuntimeException e) {
@@ -324,7 +365,7 @@ public class FieldToRowDataUtils {
if (obj instanceof Timestamp) {
return TimestampData.fromTimestamp((Timestamp) obj);
}
- String strObj = obj.toString();
+ String strObj = String.valueOf(obj);
Date date = parseDateTime(strObj);
return TimestampData.fromTimestamp(new Timestamp(date.getTime()));
} catch (RuntimeException e) {
@@ -346,7 +387,7 @@ public class FieldToRowDataUtils {
DecimalType.DEFAULT_PRECISION,
DecimalType.DEFAULT_SCALE);
}
- String strObj = obj.toString();
+ String strObj = String.valueOf(obj);
return DecimalData.fromBigDecimal(
new BigDecimal(strObj),
DecimalType.DEFAULT_PRECISION,
@@ -358,4 +399,82 @@ public class FieldToRowDataUtils {
throw e;
}
}
+
+ /**
+ * Converts a value for a BINARY / VARBINARY column into a byte array.
+ *
+ * @param obj byte[] (returned as-is) or any other value, whose String.valueOf text is encoded
+ * as UTF-8; may be null
+ * @return the bytes, or null for null input or when a conversion error is ignored
+ */
+ private static Object parseBinary(Object obj) {
+ try {
+ if (obj == null) {
+ return null;
+ }
+ if (obj instanceof byte[]) {
+ return obj;
+ }
+ // encode with an explicit charset: getBytes() depends on the JVM default charset
+ return String.valueOf(obj).getBytes(java.nio.charset.StandardCharsets.UTF_8);
+ } catch (RuntimeException e) {
+ if (isIgnoreError()) {
+ return null;
+ }
+ throw e;
+ }
+ }
+
+ /**
+ * Converts a value for an ARRAY column into GenericArrayData.
+ *
+ * @param obj GenericArrayData (returned as-is), a List or Object[] (its elements become the
+ * array elements), or any other value (wrapped as a one-element array); may be null
+ * @return GenericArrayData, or null for null input or when a conversion error is ignored
+ */
+ private static Object parseArray(Object obj) {
+ try {
+ if (obj == null) {
+ return null;
+ }
+ if (obj instanceof GenericArrayData) {
+ return obj;
+ }
+ if (obj instanceof List<?>) {
+ return new GenericArrayData(((List<?>) obj).toArray());
+ }
+ if (obj instanceof Object[]) {
+ // an object array already holds the elements; wrapping it would nest it one level too deep
+ return new GenericArrayData((Object[]) obj);
+ }
+ return new GenericArrayData(new Object[]{obj});
+ } catch (RuntimeException e) {
+ if (isIgnoreError()) {
+ return null;
+ }
+ throw e;
+ }
+ }
+
+ /**
+ * Converts a value for a MAP column into GenericMapData.
+ *
+ * <p>null and GenericMapData pass through untouched, a java.util.Map is wrapped, and any other
+ * value becomes a single-entry map that uses the value as its own key.
+ */
+ private static Object parseMap(Object obj) {
+ try {
+ if (obj == null || obj instanceof GenericMapData) {
+ return obj;
+ }
+ if (!(obj instanceof Map<?, ?>)) {
+ Map<Object, Object> singleEntry = new HashMap<>();
+ singleEntry.put(obj, obj);
+ return new GenericMapData(singleEntry);
+ }
+ return new GenericMapData((Map<?, ?>) obj);
+ } catch (RuntimeException e) {
+ if (isIgnoreError()) {
+ return null;
+ }
+ throw e;
+ }
+ }
+
+ /**
+ * Converts a value for a ROW column into GenericRowData.
+ *
+ * <p>null and GenericRowData pass through untouched; any other value becomes the only field
+ * of a one-column row.
+ */
+ private static Object parseRow(Object obj) {
+ try {
+ if (obj == null || obj instanceof GenericRowData) {
+ return obj;
+ }
+ GenericRowData singleColumnRow = new GenericRowData(1);
+ singleColumnRow.setField(0, obj);
+ return singleColumnRow;
+ } catch (RuntimeException e) {
+ if (isIgnoreError()) {
+ return null;
+ }
+ throw e;
+ }
+ }
}
diff --git
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestPb2RowDataProcessor.java
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestPb2RowDataProcessor.java
new file mode 100644
index 0000000000..a126f8d52d
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestPb2RowDataProcessor.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.processor;
+
+import org.apache.inlong.common.pojo.sort.dataflow.field.format.ArrayFormatInfo;
+import org.apache.inlong.common.pojo.sort.dataflow.field.format.BinaryFormatInfo;
+import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
+import org.apache.inlong.common.pojo.sort.dataflow.field.format.LongFormatInfo;
+import org.apache.inlong.common.pojo.sort.dataflow.field.format.MapFormatInfo;
+import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
+import org.apache.inlong.common.pojo.sort.dataflow.field.format.StringFormatInfo;
+import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
+import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.pojo.PbSourceInfo;
+import org.apache.inlong.sdk.transform.pojo.RowDataSinkInfo;
+import org.apache.inlong.sdk.transform.pojo.TransformConfig;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
+
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+
+public class TestPb2RowDataProcessor extends AbstractProcessorTestBase {
+
+    /**
+     * Runs the PB test payload through a PB-to-RowData transform and checks the BINARY, MAP,
+     * ROW (struct) and ARRAY sink columns of both produced rows.
+     */
+    @Test
+    public void testPb2RowData() throws Exception {
+        String transformBase64 = this.getPbTestDescription();
+        PbSourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, "SdkDataRequest", "msgs");
+        List<FieldInfo> sinkFields = this.getTestFieldList("sid", "packageID", "msgTime");
+        // binaryMsg
+        FieldInfo binaryMsg = new FieldInfo("binaryMsg");
+        binaryMsg.setFormatInfo(new BinaryFormatInfo(Integer.MAX_VALUE));
+        sinkFields.add(binaryMsg);
+        // mapExtinfo
+        FieldInfo mapExtinfo = new FieldInfo("mapExtinfo");
+        mapExtinfo.setFormatInfo(new MapFormatInfo(new StringFormatInfo(), new StringFormatInfo()));
+        sinkFields.add(mapExtinfo);
+        // structMsgItem: a row with columns msg (binary), msgTime (long), extinfo (map)
+        FieldInfo structMsgItem = new FieldInfo("structMsgItem");
+        String[] structMsgItemFields = new String[]{"msg", "msgTime", "extinfo"};
+        FormatInfo[] structMsgItemFormats = new FormatInfo[]{
+                new BinaryFormatInfo(Integer.MAX_VALUE),
+                new LongFormatInfo(),
+                new MapFormatInfo(new StringFormatInfo(), new StringFormatInfo())
+        };
+        RowFormatInfo structMsgItemFormat = new RowFormatInfo(structMsgItemFields, structMsgItemFormats);
+        structMsgItem.setFormatInfo(structMsgItemFormat);
+        sinkFields.add(structMsgItem);
+        // listMsgs: an array whose elements use the structMsgItem row format
+        FieldInfo listMsgs = new FieldInfo("listMsgs");
+        listMsgs.setFormatInfo(new ArrayFormatInfo(structMsgItemFormat));
+        sinkFields.add(listMsgs);
+        // sink
+        RowDataSinkInfo rowSink = new RowDataSinkInfo("UTF-8", sinkFields);
+        // sql
+        String transformSql = "select $root.sid,$root.packageID,$child.msgTime"
+                + ",$child.msg as binaryMsg,"
+                + "$child.extinfo as mapExtinfo,"
+                + "$root.msgs(1) as structMsgItem,"
+                + "$root.msgs as listMsgs from source";
+        TransformConfig config = new TransformConfig(transformSql);
+        // case1
+        TransformProcessor<String, RowData> processor = TransformProcessor
+                .create(config, SourceDecoderFactory.createPbDecoder(pbSource),
+                        SinkEncoderFactory.createRowEncoder(rowSink));
+        byte[] srcBytes = this.getPbTestData();
+        List<RowData> output = processor.transformForBytes(srcBytes, new HashMap<>());
+        Assert.assertEquals(2, output.size());
+        // row 0
+        RowData first = output.get(0);
+        assertSharedColumns(first);
+        Assert.assertEquals("1713243918000", first.getString(2).toString());
+        Assert.assertEquals("msgValue4", new String(first.getBinary(3), StandardCharsets.UTF_8));
+        GenericMapData firstExtinfo = (GenericMapData) first.getMap(4);
+        Assert.assertEquals("value", firstExtinfo.get("key"));
+        Assert.assertNull(firstExtinfo.get("value"));
+        // row 1
+        RowData second = output.get(1);
+        assertSharedColumns(second);
+        Assert.assertEquals("1713243918002", second.getString(2).toString());
+        Assert.assertEquals("msgValue42", new String(second.getBinary(3), StandardCharsets.UTF_8));
+        GenericMapData secondExtinfo = (GenericMapData) second.getMap(4);
+        Assert.assertEquals("value2", secondExtinfo.get("key2"));
+        Assert.assertNull(secondExtinfo.get("value"));
+    }
+
+    /**
+     * Asserts the columns whose expected values are identical in every output row: sid,
+     * packageID, the {@code $root.msgs(1)} struct item and the size of the {@code $root.msgs} list.
+     */
+    private static void assertSharedColumns(RowData row) {
+        Assert.assertEquals("sid", row.getString(0).toString());
+        Assert.assertEquals("1", row.getString(1).toString());
+        GenericRowData structMsgItem = (GenericRowData) row.getRow(5, 3);
+        Assert.assertEquals("msgValue42", new String(structMsgItem.getBinary(0), StandardCharsets.UTF_8));
+        Assert.assertEquals(1713243918002L, structMsgItem.getLong(1));
+        Assert.assertEquals(1, structMsgItem.getMap(2).size());
+        Assert.assertEquals(2, row.getArray(6).size());
+    }
+}