This is an automated email from the ASF dual-hosted git repository.
luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ae3c3dfa11 [INLONG-12117][SDK] Support
concat_struct/extract_struct/extract_binary function (#12118)
ae3c3dfa11 is described below
commit ae3c3dfa11eacb61fe135016e0580e977559b966
Author: ChunLiang Lu <[email protected]>
AuthorDate: Mon May 18 16:48:34 2026 +0800
[INLONG-12117][SDK] Support concat_struct/extract_struct/extract_binary
function (#12118)
* [INLONG-12117][SDK] Support concat_struct/extract_struct/extract_binary
function
* fix AI comments
* fix comments
* For enum types, return the integer index.
* Allow extract_struct to accept the output of extract_binary as a
parameter, and support extracting array nodes in extract_struct.
* Add support for array nodes in the extract_struct function.
* New function concat_struct(node1, node2, ...) — picks values from
multiple PB nodes and assembles them into one RowData.
New function extract_struct_excluding(structPath, excludeChild1,
excludeChild2, ...) — returns a copy of the struct field with the given
sub-nodes removed.
The composition extract_binary(extract_struct_excluding(...)) re-encodes the
trimmed message as a byte[], so bulky sub-nodes can be stripped before the
message is written to a binary sink column.
Multi-level protobuf path lookups now reuse cached intermediate node
values, avoiding repeated tree descents and improving transform throughput.
---
.../sdk/transform/decode/AbstractSourceData.java | 4 +-
.../apache/inlong/sdk/transform/decode/PbNode.java | 196 ++++--
.../inlong/sdk/transform/decode/PbSourceData.java | 734 +++++++++++++++++----
.../sdk/transform/decode/PbSourceDecoder.java | 4 +-
.../process/function/FunctionConstant.java | 2 +
.../process/function/pb/ConcatStructFunction.java | 72 ++
.../process/function/pb/ExtractBinaryFunction.java | 217 ++++++
.../pb/ExtractStructExcludingFunction.java | 182 +++++
.../process/function/pb/ExtractStructFunction.java | 179 +++++
.../sdk/transform/process/parser/ColumnParser.java | 7 +
.../process/function/TestFunctionDoc.java | 2 +-
.../process/processor/TestPb2RowDataProcessor.java | 137 ++++
12 files changed, 1566 insertions(+), 170 deletions(-)
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AbstractSourceData.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AbstractSourceData.java
index c7e40282ae..0caf9619eb 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AbstractSourceData.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AbstractSourceData.java
@@ -29,11 +29,11 @@ public abstract class AbstractSourceData implements
SourceData {
protected Context context;
- protected boolean isContextField(String fieldName) {
+ public boolean isContextField(String fieldName) {
return fieldName.startsWith(CTX_KEY);
}
- protected String getContextField(String fieldName) {
+ public String getContextField(String fieldName) {
if (context == null) {
return "";
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbNode.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbNode.java
index 310f6b1abb..7351958216 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbNode.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbNode.java
@@ -23,6 +23,8 @@ import
com.google.protobuf.Descriptors.FieldDescriptor.JavaType;
import lombok.Data;
import org.apache.commons.lang.math.NumberUtils;
import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
@@ -34,63 +36,118 @@ import java.util.List;
@Data
public class PbNode {
+ public static final Logger LOG = LoggerFactory.getLogger(PbNode.class);
+
private String name;
private FieldDescriptor fieldDesc;
- private Descriptors.Descriptor messageType;
private boolean isLastNode = false;
- private boolean isArray = false;
- private int arrayIndex = -1;
- private boolean isMap = false;
+ // primitive
+ private boolean isPrimitiveType = false;
+ // array
+ private boolean isArrayType = false;
+ private boolean hasArrayIndex = false;
+ private Integer arrayIndex;
+ // struct
+ private boolean isStructType = false;
+ // map
private boolean isMapType = false;
- private String mapKey = "";
+ private boolean hasMapKey = false;
+ private Object mapKey;
private FieldDescriptor mapKeyDesc;
private FieldDescriptor mapValueDesc;
+ // parent path
+ private String parentPath;
+ private String currentPath;
+ private String currentIndexPath;
- public PbNode(Descriptors.Descriptor messageDesc, String nodeString,
boolean isLastNode) {
- int beginIndex = nodeString.indexOf('(');
- if (beginIndex < 0) {
- this.name = nodeString;
- if (isMapDescriptor(messageDesc)) {
- FieldDescriptor valueFieldDesc =
messageDesc.getFields().get(1);
- Descriptors.Descriptor valueTypeDesc =
valueFieldDesc.getMessageType();
- this.fieldDesc = valueTypeDesc.findFieldByName(name);
- if (this.fieldDesc.getJavaType() == JavaType.MESSAGE) {
- this.messageType = this.fieldDesc.getMessageType();
+ public PbNode(Descriptors.Descriptor parentDesc, String parentPath, String
nodeString, boolean isLastNode) {
+ try {
+ if (parentDesc == null) {
+ return;
+ }
+ this.isLastNode = isLastNode;
+ // parse name & index
+ int beginIndex = nodeString.indexOf('(');
+ String indexString = null;
+ if (beginIndex < 0) {
+ this.name = nodeString;
+ } else {
+ this.name = StringUtils.trim(nodeString.substring(0,
beginIndex));
+ int endIndex = nodeString.lastIndexOf(')');
+ if (endIndex >= 0) {
+ indexString = nodeString.substring(beginIndex + 1,
endIndex);
}
+ }
+ // cache path key
+ this.parentPath = parentPath;
+ if (this.parentPath == null) {
+ this.currentPath = this.name;
+ this.currentIndexPath = nodeString;
} else {
- this.fieldDesc = messageDesc.findFieldByName(name);
- if (this.fieldDesc.getJavaType() == JavaType.MESSAGE) {
- this.messageType = this.fieldDesc.getMessageType();
- if (isMapDescriptor(messageType)) {
- this.isMapType = true;
- }
+ this.currentPath = this.parentPath + "." + this.name;
+ this.currentIndexPath = this.parentPath + "." + nodeString;
+ }
+ // field desc
+ this.fieldDesc = parentDesc.findFieldByName(name);
+ if (this.fieldDesc == null) {
+ return;
+ }
+ // map
+ if (this.fieldDesc.getJavaType() == JavaType.MESSAGE
+ && isMapDescriptor(this.fieldDesc.getMessageType())) {
+ this.isMapType = true;
+ this.mapKeyDesc =
this.fieldDesc.getMessageType().getFields().get(0);
+ this.mapValueDesc =
this.fieldDesc.getMessageType().getFields().get(1);
+ if (indexString != null) {
+ this.hasMapKey = true;
+ this.mapKey = parseMapKey(indexString, mapKeyDesc);
}
+ return;
}
- } else {
- this.name = StringUtils.trim(nodeString.substring(0, beginIndex));
- this.fieldDesc = messageDesc.findFieldByName(name);
- if (this.fieldDesc.getJavaType() == JavaType.MESSAGE) {
- this.messageType = this.fieldDesc.getMessageType();
- int endIndex = nodeString.lastIndexOf(')');
- if (isMapDescriptor(messageType)) {
- this.isMap = true;
- if (endIndex >= 0) {
- this.mapKey = nodeString.substring(beginIndex + 1,
endIndex);
- this.mapKeyDesc = messageType.getFields().get(0);
- this.mapValueDesc = messageType.getFields().get(1);
- }
- } else {
- this.isArray = true;
- if (endIndex >= 0) {
- this.arrayIndex =
NumberUtils.toInt(nodeString.substring(beginIndex + 1, endIndex), -1);
- if (this.arrayIndex < 0) {
- this.arrayIndex = 0;
- }
- }
+ // array
+ if (this.fieldDesc.isRepeated()) {
+ this.isArrayType = true;
+ this.arrayIndex = NumberUtils.toInt(indexString, -1);
+ if (arrayIndex >= 0) {
+ this.hasArrayIndex = true;
}
+ return;
+ }
+ // struct
+ if (this.fieldDesc.getJavaType() == JavaType.MESSAGE) {
+ this.isStructType = true;
+ return;
}
+ // primitive
+ this.isPrimitiveType = true;
+ } catch (RuntimeException t) {
+ LOG.error("Fail to
PbNode,error:{},fullName:{},nodePath:{},isLastNode:{}",
+ t.getMessage(), parentDesc.getName(), nodeString,
isLastNode, t);
+ throw t;
+ }
+ }
+
+ private static Object parseMapKey(String indexString, FieldDescriptor
mapKeyDesc) {
+ switch (mapKeyDesc.getJavaType()) {
+ case STRING:
+ return indexString;
+ case INT:
+ return NumberUtils.toInt(indexString, 0);
+ case LONG:
+ return NumberUtils.toLong(indexString, 0);
+ case FLOAT:
+ return NumberUtils.toFloat(indexString, 0);
+ case DOUBLE:
+ return NumberUtils.toDouble(indexString, 0);
+ case BOOLEAN:
+ return Boolean.TRUE.toString().equals(indexString);
+ case ENUM:
+ return mapKeyDesc.getEnumType().findValueByName(indexString);
+ case BYTE_STRING:
+ case MESSAGE:
+ default:
+ return indexString;
}
- this.isLastNode = isLastNode;
}
/**
@@ -105,16 +162,61 @@ public class PbNode {
}
List<PbNode> nodes = new ArrayList<>();
String[] nodeStrings = nodePath.split("\\.");
- int lastIndex = nodeStrings.length - 1;
+ final int lastIndex = nodeStrings.length - 1;
+ String parentPath = null;
+ StringBuilder currentPathBuilder = new StringBuilder();
Descriptors.Descriptor current = rootDesc;
for (int i = 0; i <= lastIndex; i++) {
if (current == null) {
return null;
}
+ // pbNode
String nodeString = nodeStrings[i];
- PbNode pbNode = new PbNode(current, nodeString, (i == lastIndex));
- current = pbNode.getMessageType();
- nodes.add(pbNode);
+ PbNode pbNode = new PbNode(current, parentPath, nodeString, (i ==
lastIndex));
+ if (parentPath == null) {
+ currentPathBuilder.append(nodeString);
+ } else {
+ currentPathBuilder.append(".").append(nodeString);
+ }
+ parentPath = currentPathBuilder.toString();
+ if (pbNode.getFieldDesc() == null) {
+ return null;
+ }
+ // primitive
+ if (pbNode.isPrimitiveType()) {
+ current = null;
+ nodes.add(pbNode);
+ continue;
+ } else if (pbNode.isArrayType()) {
+ // array
+ if (pbNode.getFieldDesc().getJavaType() == JavaType.MESSAGE) {
+ current = pbNode.getFieldDesc().getMessageType();
+ } else {
+ current = null;
+ }
+ nodes.add(pbNode);
+ continue;
+ } else if (pbNode.isMapType()) {
+ // map
+ if (pbNode.isHasMapKey()) {
+ if (pbNode.getMapValueDesc().getJavaType() ==
JavaType.MESSAGE) {
+ current = pbNode.getMapValueDesc().getMessageType();
+ } else {
+ current = null;
+ }
+ } else {
+ current = null;
+ }
+ nodes.add(pbNode);
+ continue;
+ } else if (pbNode.isStructType()) {
+ // struct
+ current = pbNode.getFieldDesc().getMessageType();
+ nodes.add(pbNode);
+ continue;
+ } else {
+ return null;
+ }
}
return nodes;
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceData.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceData.java
index e46fb47d8d..ca63769c84 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceData.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceData.java
@@ -21,12 +21,16 @@ import org.apache.inlong.sdk.transform.process.Context;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.EnumValueDescriptor;
import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.Descriptors.FieldDescriptor.JavaType;
import com.google.protobuf.DynamicMessage;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.binary.BinaryStringData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,6 +39,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
/**
@@ -45,6 +50,8 @@ public class PbSourceData extends AbstractSourceData {
private static final Logger LOG =
LoggerFactory.getLogger(PbSourceData.class);
+ public static final String ROOT = "$root";
+
public static final String ROOT_KEY = "$root.";
public static final String CHILD_KEY = "$child.";
@@ -61,6 +68,9 @@ public class PbSourceData extends AbstractSourceData {
protected Charset srcCharset;
+ private Map<DynamicMessage, Map<String, Object>> nodeValueCache = new
HashMap<>();
+ private Map<DynamicMessage, Map<String, Map<Object, Object>>> mapNodeCache
= new HashMap<>();
+
/**
* Constructor
*/
@@ -110,18 +120,216 @@ public class PbSourceData extends AbstractSourceData {
* @return
*/
@Override
+ @SuppressWarnings({"rawtypes", "unchecked"})
public Object getField(int rowNum, String fieldName) {
- Object fieldValue = "";
try {
+ // check(root);
if (isContextField(fieldName)) {
return getContextField(fieldName);
}
+ Object fieldValue = findFieldNode(rowNum, fieldName);
+ List<PbNode> childNodes = this.columnNodeMap.get(fieldName);
+ if (childNodes == null || childNodes.size() == 0) {
+ return null;
+ }
+ PbNode lastNode = childNodes.get(childNodes.size() - 1);
+ // primitive
+ if (lastNode.isPrimitiveType()) {
+ if (fieldValue instanceof ByteString) {
+ ByteString byteString = (ByteString) fieldValue;
+ return byteString.toByteArray();
+ } else {
+ return fieldValue;
+ }
+ }
+ // struct
+ if (lastNode.isStructType()) {
+ if (!(fieldValue instanceof DynamicMessage)) {
+ return null;
+ }
+ return
buildStructData(lastNode.getFieldDesc().getMessageType(), (DynamicMessage)
fieldValue);
+ }
+ // array
+ if (lastNode.isArrayType()) {
+ if (!lastNode.isHasArrayIndex()) {
+ if (!(fieldValue instanceof List)) {
+ return null;
+ }
+ List<Object> valueList = (List) fieldValue;
+ List<Object> result = new ArrayList<>(valueList.size());
+ for (Object value : valueList) {
+
result.add(this.buildFieldValue(lastNode.getFieldDesc(), value));
+ }
+ return new GenericArrayData(result.toArray());
+ }
+ return this.buildFieldValue(lastNode.getFieldDesc(),
fieldValue);
+ }
+ // map
+ if (lastNode.isMapType()) {
+ if (!lastNode.isHasMapKey()) {
+ return
buildMapData(lastNode.getFieldDesc().getMessageType(), fieldValue);
+ }
+ return this.buildFieldValue(lastNode.getMapValueDesc(),
fieldValue);
+ }
+ return null;
+ } catch (Exception e) {
+ LOG.error("fail to getField,error:{},rowNum:{},fieldName:{}",
e.getMessage(), rowNum, fieldName, e);
+ return null;
+ }
+ }
+
+ public Object buildFieldValue(FieldDescriptor fieldDesc, Object nodeValue)
{
+ if (fieldDesc == null || nodeValue == null) {
+ return null;
+ }
+ switch (fieldDesc.getJavaType()) {
+ case STRING:
+ case INT:
+ case LONG:
+ case FLOAT:
+ case DOUBLE:
+ case BOOLEAN:
+ return nodeValue;
+ case ENUM:
+ if (nodeValue instanceof EnumValueDescriptor) {
+ EnumValueDescriptor enumDesc = (EnumValueDescriptor)
nodeValue;
+ return enumDesc.getIndex();
+ }
+ return null;
+ case BYTE_STRING:
+ if (nodeValue instanceof ByteString) {
+ return ((ByteString) nodeValue).toByteArray();
+ } else {
+ return nodeValue;
+ }
+ case MESSAGE:
+ return this.buildStructData(fieldDesc.getMessageType(),
nodeValue);
+ default:
+ return String.valueOf(nodeValue);
+ }
+ }
+
+ public Object buildStructData(Descriptors.Descriptor messageType, Object
nodeValue) {
+ // map
+ if (PbNode.isMapDescriptor(messageType)) {
+ return this.buildMapData(messageType, nodeValue);
+ }
+ // struct
+ if (!(nodeValue instanceof DynamicMessage)) {
+ return null;
+ }
+ DynamicMessage msgObj = (DynamicMessage) nodeValue;
+ GenericRowData result = new
GenericRowData(messageType.getFields().size());
+ int index = 0;
+ for (FieldDescriptor fieldDesc : messageType.getFields()) {
+ Object fieldValue = msgObj.getField(fieldDesc);
+ if (fieldValue == null) {
+ result.setField(index++, null);
+ continue;
+ }
+ // field
+ if (!fieldDesc.isRepeated()) {
+ Object fieldResult = this.buildFieldValue(fieldDesc,
fieldValue);
+ result.setField(index++, fieldResult);
+ continue;
+ }
+ // array
+ if (!(fieldValue instanceof List)) {
+ result.setField(index++, null);
+ continue;
+ }
+ // map
+ if (fieldDesc.getJavaType().equals(JavaType.MESSAGE)
+ && PbNode.isMapDescriptor(fieldDesc.getMessageType())) {
+ result.setField(index++,
buildMapData(fieldDesc.getMessageType(), fieldValue));
+ } else {
+ List<?> valueList = (List<?>) fieldValue;
+ List<Object> fieldResult = new ArrayList<>(valueList.size());
+ for (Object value : valueList) {
+ fieldResult.add(this.buildFieldValue(fieldDesc, value));
+ }
+ result.setField(index++, new
GenericArrayData(fieldResult.toArray()));
+ }
+ }
+ return result;
+ }
+
+ protected Object buildMapData(Descriptors.Descriptor messageType, Object
nodeValue) {
+ if (!(nodeValue instanceof List)) {
+ return null;
+ }
+ Descriptors.FieldDescriptor keyField =
messageType.findFieldByNumber(1);
+ Descriptors.FieldDescriptor valueField =
messageType.findFieldByNumber(2);
+ List<?> subNodeValueList = (List<?>) nodeValue;
+ Map<Object, Object> result = new HashMap<>();
+ for (Object value : subNodeValueList) {
+ if (!(value instanceof DynamicMessage)) {
+ continue;
+ }
+ DynamicMessage subnodeValue = (DynamicMessage) value;
+ Object keyValue = buildFieldValue(keyField,
subnodeValue.getField(keyField));
+ Object valueValue = buildFieldValue(valueField,
subnodeValue.getField(valueField));
+ result.put(keyValue, valueValue);
+ }
+ return new GenericMapData(result);
+ }
+
+ /**
+ * get rootDesc
+ * @return the rootDesc
+ */
+ public Descriptors.Descriptor getRootDesc() {
+ return rootDesc;
+ }
+
+ /**
+ * get childDesc
+ * @return the childDesc
+ */
+ public Descriptors.Descriptor getChildDesc() {
+ return childDesc;
+ }
+
+ /**
+ * get root
+ * @return the root
+ */
+ public DynamicMessage getRoot() {
+ return root;
+ }
+
+ /**
+ * get childRoot
+ * @return the childRoot
+ */
+ public List<DynamicMessage> getChildRoot() {
+ return childRoot;
+ }
+
+ public Object findFieldNode(int rowNum, String fieldName) {
+ Object fieldValue = "";
+ try {
if (StringUtils.startsWith(fieldName, ROOT_KEY)) {
- fieldValue = this.getRootField(fieldName);
+ fieldValue = this.findRootField(fieldName);
} else if (StringUtils.startsWith(fieldName, CHILD_KEY)) {
if (childRoot != null && rowNum < childRoot.size()) {
- fieldValue = this.getChildField(rowNum, fieldName);
+ fieldValue = this.findChildField(rowNum, fieldName);
+ }
+ } else {
+ List<PbNode> childNodes = this.columnNodeMap.get(fieldName);
+ if (childNodes == null) {
+ childNodes = PbNode.parseNodePath(rootDesc, fieldName);
+ if (childNodes == null) {
+ childNodes = new ArrayList<>();
+ }
+ this.columnNodeMap.put(fieldName, childNodes);
+ }
+ // error config
+ if (childNodes.size() == 0) {
+ return "";
}
+ // parse other node
+ fieldValue = this.findNodeValueByCache(childNodes, root);
}
return fieldValue;
} catch (Exception e) {
@@ -130,12 +338,25 @@ public class PbSourceData extends AbstractSourceData {
return fieldValue;
}
- /**
- * getRootField
- * @param fieldName
- * @return
- */
- private Object getRootField(String srcFieldName) {
+ public List<PbNode> parseStructNodeList(String srcFieldName, Descriptor
currentDesc) {
+ List<PbNode> childNodes = this.columnNodeMap.get(srcFieldName);
+ if (childNodes == null) {
+ String fieldName = srcFieldName;
+ if (StringUtils.startsWith(fieldName, ROOT_KEY)) {
+ fieldName = srcFieldName.substring(ROOT_KEY.length());
+ } else if (StringUtils.startsWith(fieldName, CHILD_KEY)) {
+ fieldName = srcFieldName.substring(CHILD_KEY.length());
+ }
+ childNodes = PbNode.parseNodePath(currentDesc, fieldName);
+ if (childNodes == null) {
+ childNodes = new ArrayList<>();
+ }
+ this.columnNodeMap.put(srcFieldName, childNodes);
+ }
+ return childNodes;
+ }
+
+ private Object findRootField(String srcFieldName) {
List<PbNode> childNodes = this.columnNodeMap.get(srcFieldName);
if (childNodes == null) {
String fieldName = srcFieldName.substring(ROOT_KEY.length());
@@ -147,20 +368,14 @@ public class PbSourceData extends AbstractSourceData {
}
// error config
if (childNodes.size() == 0) {
- return "";
+ return null;
}
// parse other node
- Object fieldValue = this.getNodeValue(childNodes, root);
+ Object fieldValue = this.findNodeValueByCache(childNodes, root);
return fieldValue;
}
- /**
- * getChildField
- * @param rowNum
- * @param srcFieldName
- * @return
- */
- private Object getChildField(int rowNum, String srcFieldName) {
+ private Object findChildField(int rowNum, String srcFieldName) {
if (this.childRoot == null || this.childDesc == null) {
return "";
}
@@ -179,18 +394,111 @@ public class PbSourceData extends AbstractSourceData {
}
// parse other node
DynamicMessage child = childRoot.get(rowNum);
- Object fieldValue = this.getNodeValue(childNodes, child);
+ Object fieldValue = this.findNodeValueByCache(childNodes, child);
return fieldValue;
}
- /**
- * getNodeValue
- * @param childNodes
- * @param root
- * @return
- */
- @SuppressWarnings({"rawtypes", "unchecked"})
- private Object getNodeValue(List<PbNode> childNodes, DynamicMessage root) {
+ public Object findNodeValueByCache(List<PbNode> childNodes, DynamicMessage
root) {
+ Map<String, Object> subNodeValueCache =
this.nodeValueCache.computeIfAbsent(root,
+ k -> new HashMap<>());
+ Map<String, Map<Object, Object>> subMapNodeCache =
this.mapNodeCache.computeIfAbsent(root,
+ k -> new HashMap<>());
+ for (int i = childNodes.size() - 1; i >= 0; i--) {
+ PbNode node = childNodes.get(i);
+ // index path
+ Object subNodeValue =
subNodeValueCache.get(node.getCurrentIndexPath());
+ if (subNodeValue != null) {
+ if (i == childNodes.size() - 1) {
+ return subNodeValue;
+ } else {
+ if (subNodeValue instanceof DynamicMessage) {
+ List<PbNode> subChildNodes = childNodes.subList(i + 1,
childNodes.size());
+ return this.findNodeValue(subChildNodes,
(DynamicMessage) subNodeValue);
+ } else {
+ return null;
+ }
+ }
+ }
+ // path
+ subNodeValue = subNodeValueCache.get(node.getCurrentPath());
+ if (subNodeValue != null) {
+ if (i == childNodes.size() - 1) {
+ return subNodeValue;
+ } else {
+ // primitive
+ if (node.isPrimitiveType()) {
+ return null;
+ }
+ // struct
+ if (node.isStructType()) {
+ List<PbNode> subChildNodes = childNodes.subList(i + 1,
childNodes.size());
+ return this.findNodeValue(subChildNodes,
(DynamicMessage) subNodeValue);
+ }
+ // array
+ if (node.isArrayType()) {
+ if (!node.isHasArrayIndex()) {
+ return null;
+ }
+ if (!(subNodeValue instanceof List)) {
+ return null;
+ }
+ List<?> nodeValueList = (List<?>) subNodeValue;
+ if (node.getArrayIndex() >= nodeValueList.size()) {
+ return null;
+ }
+ Object newNode =
nodeValueList.get(node.getArrayIndex());
+ if (!(newNode instanceof DynamicMessage)) {
+ return null;
+ }
+ List<PbNode> subChildNodes = childNodes.subList(i + 1,
childNodes.size());
+ return this.findNodeValue(subChildNodes,
(DynamicMessage) newNode);
+ }
+ // map
+ if (node.isMapType()) {
+ if (!node.isHasMapKey()) {
+ return null;
+ }
+ final Object mapNodeValue = subNodeValue;
+ Map<Object, Object> mapCache =
subMapNodeCache.computeIfAbsent(node.getCurrentPath(),
+ k -> parseMapNode(mapNodeValue, node));
+ Object fieldValue = mapCache.get(node.getMapKey());
+ if (fieldValue == null || !(fieldValue instanceof
DynamicMessage)) {
+ return null;
+ }
+ List<PbNode> subChildNodes = childNodes.subList(i + 1,
childNodes.size());
+ return this.findNodeValue(subChildNodes,
(DynamicMessage) fieldValue);
+ }
+ return null;
+ }
+ }
+ }
+ return this.findNodeValue(childNodes, root);
+ }
+
+ private static Map<Object, Object> parseMapNode(Object nodeValue, PbNode
node) {
+ if (!(nodeValue instanceof List)) {
+ return new HashMap<>();
+ }
+ List<?> nodeValueList = (List<?>) nodeValue;
+ Map<Object, Object> mapCache = new HashMap<>();
+ for (Object value : nodeValueList) {
+ if (!(value instanceof DynamicMessage)) {
+ continue;
+ }
+ DynamicMessage msg = (DynamicMessage) value;
+ Object keyValue = msg.getField(node.getMapKeyDesc());
+ Object valueValue = msg.getField(node.getMapValueDesc());
+ mapCache.put(keyValue, valueValue);
+ }
+ return mapCache;
+ }
+
+ // @SuppressWarnings({"rawtypes", "unchecked"})
+ public Object findNodeValue(List<PbNode> childNodes, DynamicMessage root) {
+ Map<String, Object> subNodeValueCache =
this.nodeValueCache.computeIfAbsent(root,
+ k -> new HashMap<>());
+ Map<String, Map<Object, Object>> subMapNodeCache =
this.mapNodeCache.computeIfAbsent(root,
+ k -> new HashMap<>());
DynamicMessage current = root;
for (int i = 0; i < childNodes.size(); i++) {
PbNode node = childNodes.get(i);
@@ -199,120 +507,310 @@ public class PbSourceData extends AbstractSourceData {
// error data
break;
}
- if (!node.isLastNode()) {
- if (node.isArray()) {
- current = (DynamicMessage) ((List)
nodeValue).get(node.getArrayIndex());
- } else if (node.isMap()) {
- List<DynamicMessage> nodeValueList =
(List<DynamicMessage>) nodeValue;
- DynamicMessage newCurrent = null;
- for (DynamicMessage subnodeValue : nodeValueList) {
- String keyValue =
String.valueOf(subnodeValue.getField(node.getMapKeyDesc()));
- if (StringUtils.equals(keyValue, node.getMapKey())) {
- newCurrent = (DynamicMessage)
subnodeValue.getField(node.getMapValueDesc());
- break;
- }
+ if (node.isLastNode()) {
+ // primitive
+ if (node.isPrimitiveType()) {
+ if (nodeValue instanceof ByteString) {
+ ByteString byteString = (ByteString) nodeValue;
+ return byteString.toByteArray();
+ } else if
(node.getFieldDesc().getJavaType().equals(JavaType.STRING)) {
+ return new BinaryStringData(String.valueOf(nodeValue));
+ } else {
+ return nodeValue;
}
- if (newCurrent == null) {
+ }
+ // struct
+ if (node.isStructType()) {
+ subNodeValueCache.put(node.getCurrentPath(), nodeValue);
+ return nodeValue;
+ }
+ // array
+ if (node.isArrayType()) {
+ subNodeValueCache.put(node.getCurrentPath(), nodeValue);
+ if (!node.isHasArrayIndex()) {
+ return nodeValue;
+ }
+ if (!(nodeValue instanceof List)) {
+ return null;
+ }
+ List<?> nodeValueList = (List<?>) nodeValue;
+ if (node.getArrayIndex() >= nodeValueList.size()) {
+ return null;
+ }
+ Object arrayIndexNodeValue =
nodeValueList.get(node.getArrayIndex());
+ subNodeValueCache.put(node.getCurrentIndexPath(),
arrayIndexNodeValue);
+ return arrayIndexNodeValue;
+ }
+ // map
+ if (node.isMapType()) {
+ subNodeValueCache.put(node.getCurrentPath(), nodeValue);
+ if (!node.isHasMapKey()) {
+ return nodeValue;
+ }
+ final Object mapNodeValue = nodeValue;
+ Map<Object, Object> mapCache =
subMapNodeCache.computeIfAbsent(node.getCurrentPath(),
+ k -> parseMapNode(mapNodeValue, node));
+ Object fieldValue = mapCache.get(node.getMapKey());
+ subNodeValueCache.put(node.getCurrentIndexPath(),
fieldValue);
+ return fieldValue;
+ }
+ return null;
+ } else {
+ // primitive
+ if (node.isPrimitiveType()) {
+ return null;
+ }
+ // struct
+ if (node.isStructType()) {
+ subNodeValueCache.put(node.getCurrentPath(), nodeValue);
+ if (!(nodeValue instanceof DynamicMessage)) {
return null;
}
- current = newCurrent;
- } else {
current = (DynamicMessage) nodeValue;
+ continue;
}
- continue;
- }
- // last node
- if (node.isArray()) {
- return buildStructData(node.getMessageType(), ((List)
nodeValue).get(node.getArrayIndex()));
- } else if (node.isMap()) {
- List<DynamicMessage> nodeValueList = (List<DynamicMessage>)
nodeValue;
- Object fieldValue = null;
- for (DynamicMessage subnodeValue : nodeValueList) {
- String keyValue =
String.valueOf(subnodeValue.getField(node.getMapKeyDesc()));
- if (StringUtils.equals(keyValue, node.getMapKey())) {
- fieldValue =
subnodeValue.getField(node.getMapValueDesc());
- break;
+ // array
+ if (node.isArrayType()) {
+ subNodeValueCache.put(node.getCurrentPath(), nodeValue);
+ if (!node.isHasArrayIndex()) {
+ return null;
}
+ if (!(nodeValue instanceof List)) {
+ return null;
+ }
+ List<?> nodeValueList = (List<?>) nodeValue;
+ if (node.getArrayIndex() >= nodeValueList.size()) {
+ return null;
+ }
+ Object newNode = nodeValueList.get(node.getArrayIndex());
+ subNodeValueCache.put(node.getCurrentIndexPath(), newNode);
+ if (!(newNode instanceof DynamicMessage)) {
+ return null;
+ }
+ current = (DynamicMessage) newNode;
+ continue;
}
- return this.buildFieldValue(node.getFieldDesc(), fieldValue,
false);
- } else if (node.isMapType()) {
- return this.buildStructData(node.getMessageType(), nodeValue);
- } else if (node.getFieldDesc().isRepeated()) {
- List<Object> valueList = (List) nodeValue;
- List<Object> result = new ArrayList<>(valueList.size());
- for (Object value : valueList) {
- result.add(this.buildFieldValue(node.getFieldDesc(),
value, false));
+ // map
+ if (node.isMapType()) {
+ subNodeValueCache.put(node.getCurrentPath(), nodeValue);
+ if (!node.isHasMapKey()) {
+ return null;
+ }
+ final Object mapNodeValue = nodeValue;
+ Map<Object, Object> mapCache =
subMapNodeCache.computeIfAbsent(node.getCurrentPath(),
+ k -> parseMapNode(mapNodeValue, node));
+ Object fieldValue = mapCache.get(node.getMapKey());
+ subNodeValueCache.put(node.getCurrentIndexPath(),
fieldValue);
+ if (fieldValue == null || !(fieldValue instanceof
DynamicMessage)) {
+ return null;
+ }
+ current = (DynamicMessage) fieldValue;
+ continue;
}
- return new GenericArrayData(result.toArray());
- } else {
- return this.buildFieldValue(node.getFieldDesc(), nodeValue,
false);
+ return null;
}
}
return null;
}
- @SuppressWarnings("unchecked")
- private Object buildFieldValue(FieldDescriptor fieldDesc, Object
nodeValue, boolean isRepeated) {
- if (nodeValue == null) {
- return null;
+ /**
+ * Clear the leaf field referenced by {@code childNodes} on a copy of
{@code root}.
+ * <p>
+ * Implementation notes (important):
+ * <ul>
+ * <li>Intermediate nodes are descended by reading the value out of the
parent
+ * builder, creating a sub-builder via {@link
DynamicMessage#toBuilder()},
+ * recursing into it, and then writing the rebuilt sub-message back
via
+ * {@code setField} / {@code setRepeatedField}. We never rely on
automatic
+ * reverse propagation from {@code getFieldBuilder}, which is not
consistent
+ * across protobuf-java versions for {@code
DynamicMessage.Builder}.</li>
+ * <li>Repeated and map entries are NEVER mutated through the list
returned by
+ * {@code getField} (it is an unmodifiable view in many protobuf
versions).
+ * Instead the field is cleared and the kept entries are re-added via
+ * {@code addRepeatedField}, which is the portable way to "remove"
an entry
+ * from a {@code DynamicMessage.Builder}.</li>
+ * </ul>
+ *
+ * @param childNodes path to the leaf node to clear
+ * @param root the top-level builder; modifications are applied to it
+ */
+ public void clearNodeValue(List<PbNode> childNodes, DynamicMessage.Builder
root) {
+ if (childNodes == null || childNodes.isEmpty() || root == null) {
+ return;
}
- switch (fieldDesc.getJavaType()) {
- case STRING:
- case INT:
- case LONG:
- case FLOAT:
- case DOUBLE:
- case BOOLEAN:
- case ENUM:
- return nodeValue;
- case BYTE_STRING:
- return ((ByteString) nodeValue).toByteArray();
- case MESSAGE: {
- if (!isRepeated) {
- return this.buildStructData(fieldDesc.getMessageType(),
nodeValue);
- } else if (PbNode.isMapDescriptor(fieldDesc.getMessageType()))
{
- return this.buildStructData(fieldDesc.getMessageType(),
nodeValue);
+ clearNodeValueRec(root, childNodes, 0);
+ }
+
/**
 * Recursive helper. Modifies {@code builder} in place; for nested levels, every
 * change is written back to {@code builder} via setField/setRepeatedField as we
 * unwind, so the topmost caller sees the modification.
 *
 * <p>Any path segment that cannot be followed (missing field descriptor, unset struct,
 * index out of range, non-message element, unmatched map key, primitive used as an
 * intermediate node) makes the call return without changing anything further.
 *
 * @param builder the builder of the message that owns {@code nodes.get(from)}
 * @param nodes the full path being cleared
 * @param from index into {@code nodes} of the segment handled by this call
 */
private void clearNodeValueRec(DynamicMessage.Builder builder, List<PbNode> nodes, int from) {
    PbNode node = nodes.get(from);
    FieldDescriptor fd = node.getFieldDesc();
    if (fd == null) {
        return;
    }

    boolean isLast = (from == nodes.size() - 1);

    // ============================== LEAF ==============================
    if (isLast) {
        // primitive / struct / repeated-without-index / map-without-key: clearField wipes
        // the whole field. This is the safe single-call PB API.
        if (node.isPrimitiveType()
                || node.isStructType()
                || (node.isArrayType() && !node.isHasArrayIndex())
                || (node.isMapType() && !node.isHasMapKey())) {
            builder.clearField(fd);
            return;
        }
        // repeated with explicit index: rebuild the list without the target element.
        if (node.isArrayType() && node.isHasArrayIndex()) {
            removeRepeatedAt(builder, fd, node.getArrayIndex());
            return;
        }
        // map with explicit key: rebuild the entries dropping the matching key.
        if (node.isMapType() && node.isHasMapKey()) {
            removeMapEntryByKey(builder, fd, node.getMapKeyDesc(), node.getMapKey());
            return;
        }
        return;
    }

    // ============================ INTERMEDIATE ============================
    // primitive intermediate: invalid path, abort
    if (node.isPrimitiveType()) {
        return;
    }

    // struct intermediate: descend into the message field, mutate, then setField back
    // (the child builder is a copy, so the write-back is what makes the change visible).
    if (node.isStructType()) {
        if (!builder.hasField(fd)) {
            return;
        }
        Object child = builder.getField(fd);
        if (!(child instanceof DynamicMessage)) {
            return;
        }
        DynamicMessage.Builder childBuilder = ((DynamicMessage) child).toBuilder();
        clearNodeValueRec(childBuilder, nodes, from + 1);
        builder.setField(fd, childBuilder.build());
        return;
    }

    // array intermediate (only meaningful with an explicit index on a message-typed
    // repeated field): descend into that element and replace it in place.
    if (node.isArrayType()) {
        if (!node.isHasArrayIndex() || fd.getJavaType() != JavaType.MESSAGE) {
            return;
        }
        int idx = node.getArrayIndex();
        int count = builder.getRepeatedFieldCount(fd);
        if (idx < 0 || idx >= count) {
            return;
        }
        Object element = builder.getRepeatedField(fd, idx);
        if (!(element instanceof DynamicMessage)) {
            return;
        }
        DynamicMessage.Builder eb = ((DynamicMessage) element).toBuilder();
        clearNodeValueRec(eb, nodes, from + 1);
        builder.setRepeatedField(fd, idx, eb.build());
        return;
    }

    // map intermediate: only meaningful with an explicit key and a message-typed value.
    // Descend into the value of the matching entry, mutate it, and replace the entry.
    if (node.isMapType()) {
        if (!node.isHasMapKey()) {
            return;
        }
        FieldDescriptor mapKeyDesc = node.getMapKeyDesc();
        FieldDescriptor mapValueDesc = node.getMapValueDesc();
        if (mapKeyDesc == null || mapValueDesc == null
                || mapValueDesc.getJavaType() != JavaType.MESSAGE) {
            return;
        }
        int count = builder.getRepeatedFieldCount(fd);
        for (int i = 0; i < count; i++) {
            Object entry = builder.getRepeatedField(fd, i);
            if (!(entry instanceof DynamicMessage)) {
                continue;
            }
            DynamicMessage entryMsg = (DynamicMessage) entry;
            Object keyVal = entryMsg.getField(mapKeyDesc);
            // NOTE(review): Objects.equals does no type coercion, so this assumes
            // node.getMapKey() holds the same Java type as the key field value
            // (e.g. Integer vs Long vs String) — confirm in PbNode.
            if (keyVal == null || !Objects.equals(node.getMapKey(), keyVal)) {
                continue;
            }
            Object valObj = entryMsg.getField(mapValueDesc);
            if (!(valObj instanceof DynamicMessage)) {
                return;
            }
            DynamicMessage.Builder valBuilder = ((DynamicMessage) valObj).toBuilder();
            clearNodeValueRec(valBuilder, nodes, from + 1);
            DynamicMessage.Builder entryBuilder = entryMsg.toBuilder();
            entryBuilder.setField(mapValueDesc, valBuilder.build());
            builder.setRepeatedField(fd, i, entryBuilder.build());
            // NOTE(review): only the FIRST entry with a matching key is rewritten, whereas
            // removeMapEntryByKey drops every matching entry — confirm duplicate keys
            // cannot reach this point.
            return;
        }
    }
}
- @SuppressWarnings("unchecked")
- protected Object buildStructData(Descriptors.Descriptor messageType,
Object nodeValue) {
- // map
- if (PbNode.isMapDescriptor(messageType)) {
- Descriptors.FieldDescriptor keyField =
messageType.findFieldByNumber(1);
- Descriptors.FieldDescriptor valueField =
messageType.findFieldByNumber(2);
- List<DynamicMessage> subNodeValueList = (List<DynamicMessage>)
nodeValue;
- Map<Object, Object> result = new HashMap<>();
- for (DynamicMessage subnodeValue : subNodeValueList) {
- Object keyValue = buildFieldValue(keyField,
subnodeValue.getField(keyField), false);
- Object valueValue = buildFieldValue(valueField,
subnodeValue.getField(valueField), false);
- result.put(keyValue, valueValue);
- }
- return new GenericMapData(result);
+ /**
+ * Remove the {@code targetIndex}-th element from the given repeated field
on
+ * {@code builder}. {@code DynamicMessage.Builder} does not expose
+ * {@code removeRepeatedField} portably across protobuf-java versions, so
we
+ * rebuild the field via clearField + addRepeatedField.
+ */
+ private static void removeRepeatedAt(DynamicMessage.Builder builder,
+ FieldDescriptor fd, int targetIndex) {
+ int count = builder.getRepeatedFieldCount(fd);
+ if (targetIndex < 0 || targetIndex >= count) {
+ return;
}
- // struct
- DynamicMessage msgObj = (DynamicMessage) nodeValue;
- GenericRowData result = new
GenericRowData(messageType.getFields().size());
- int index = 0;
- for (FieldDescriptor fieldDesc : messageType.getFields()) {
- Object fieldValue = msgObj.getField(fieldDesc);
- if (fieldValue == null) {
- result.setField(index++, null);
+ List<Object> kept = new ArrayList<>(count - 1);
+ for (int i = 0; i < count; i++) {
+ if (i == targetIndex) {
continue;
}
- Object fieldResult = this.buildFieldValue(fieldDesc, fieldValue,
false);
- result.setField(index++, fieldResult);
+ kept.add(builder.getRepeatedField(fd, i));
+ }
+ builder.clearField(fd);
+ for (Object v : kept) {
+ builder.addRepeatedField(fd, v);
+ }
+ }
+
+ /**
+ * Remove the map entry whose key equals {@code targetKey} from the given
map field
+ * on {@code builder}. Uses the portable clearField + addRepeatedField
approach.
+ */
+ private static void removeMapEntryByKey(DynamicMessage.Builder builder,
+ FieldDescriptor fd, FieldDescriptor mapKeyDesc, Object targetKey) {
+ if (mapKeyDesc == null || targetKey == null) {
+ return;
+ }
+ int count = builder.getRepeatedFieldCount(fd);
+ List<Object> kept = new ArrayList<>(count);
+ boolean removed = false;
+ for (int i = 0; i < count; i++) {
+ Object entry = builder.getRepeatedField(fd, i);
+ if (entry instanceof DynamicMessage) {
+ Object keyVal = ((DynamicMessage) entry).getField(mapKeyDesc);
+ if (Objects.equals(targetKey, keyVal)) {
+ removed = true;
+ continue;
+ }
+ }
+ kept.add(entry);
+ }
+ if (!removed) {
+ return;
+ }
+ builder.clearField(fd);
+ for (Object e : kept) {
+ builder.addRepeatedField(fd, e);
}
- return result;
}
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
index cc4a6a8e99..46ce0215f1 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
@@ -86,7 +86,7 @@ public class PbSourceDecoder extends SourceDecoder<String> {
this.rowsNodePath = sourceInfo.getRowsNodePath();
this.childNodes = PbNode.parseNodePath(rootDesc, rowsNodePath);
if (this.childNodes != null && this.childNodes.size() > 0) {
- this.childDesc = this.childNodes.get(this.childNodes.size() -
1).getMessageType();
+ this.childDesc = this.childNodes.get(this.childNodes.size() -
1).getFieldDesc().getMessageType();
}
} catch (Exception e) {
LOG.error(e.getMessage(), e);
@@ -127,7 +127,7 @@ public class PbSourceDecoder extends SourceDecoder<String> {
break;
}
}
- if (!node.isArray()) {
+ if (!node.isArrayType()) {
if (!(nodeValue instanceof DynamicMessage)) {
// error data
return new PbSourceData(root, rootDesc,
columnNodeMap, srcCharset);
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/FunctionConstant.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/FunctionConstant.java
index 1abe442f84..bca3ec36dd 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/FunctionConstant.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/FunctionConstant.java
@@ -35,4 +35,6 @@ public class FunctionConstant {
public static final String TEMPORAL_TYPE = "temporal";
+ public static final String PB_TYPE = "pb";
+
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/pb/ConcatStructFunction.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/pb/ConcatStructFunction.java
new file mode 100644
index 0000000000..6d23644ab1
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/pb/ConcatStructFunction.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function.pb;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.function.FunctionConstant;
+import org.apache.inlong.sdk.transform.process.function.TransformFunction;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+import org.apache.flink.table.data.GenericRowData;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * ConcatStructFunction -> concat_struct(field1, field2, field3...)
+ * description:
+ * - Always returns a GenericRowData whose arity equals the number of input
parameters.
+ * - If any parameter evaluates to NULL, the corresponding position in the
returned
+ * GenericRowData is set to NULL while the other positions are populated
normally.
+ * - Each field value is taken from the protobuf source data based on its path.
+ */
+@TransformFunction(type = FunctionConstant.PB_TYPE, names = {
+ "concat_struct"}, parameter = "(field1,field2,field3...)",
descriptions = {
+ "- Always returns a GenericRowData whose arity equals the
number of input parameters;",
+ "- If any parameter is NULL, the corresponding position in the
returned "
+ + "GenericRowData is set to NULL while the other
positions are populated normally;",
+ "- Each field value is taken from the protobuf source data
based on its 'path'."
+ }, examples = {
+ "concat_struct($root.name,$root.age) = +I(\"Alice\",11)"
+ })
+public class ConcatStructFunction implements ValueParser {
+
+ private final List<ValueParser> fieldParsers;
+
+ public ConcatStructFunction(Function expr) {
+ List<Expression> expressions = expr.getParameters().getExpressions();
+ this.fieldParsers = new ArrayList<>();
+ for (int i = 0; i < expressions.size(); i++) {
+
this.fieldParsers.add(OperatorTools.buildParser(expressions.get(i)));
+ }
+ }
+
+ @Override
+ public Object parse(SourceData sourceData, int rowIndex, Context context) {
+ GenericRowData result = new GenericRowData(fieldParsers.size());
+ int index = 0;
+ for (ValueParser parser : fieldParsers) {
+ result.setField(index++, parser.parse(sourceData, rowIndex,
context));
+ }
+ return result;
+ }
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/pb/ExtractBinaryFunction.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/pb/ExtractBinaryFunction.java
new file mode 100644
index 0000000000..102031f82f
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/pb/ExtractBinaryFunction.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function.pb;
+
+import org.apache.inlong.sdk.transform.decode.PbNode;
+import org.apache.inlong.sdk.transform.decode.PbSourceData;
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.function.FunctionConstant;
+import org.apache.inlong.sdk.transform.process.function.TransformFunction;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ColumnParser;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.MessageLite;
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.table.data.GenericArrayData;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * ExtractBinaryFunction -> extract_binary(path)
+ * description:
+ * - Only works on protobuf source data; returns NULL if the source is not a
PbSourceData.
+ * - Returns NULL if 'path' is missing/invalid, or the path cannot be resolved
to a value
+ * in the protobuf message.
+ * - For primitive / struct / map nodes and array nodes with an explicit array
index,
+ * returns the matched value serialized as a {@code byte[]}.
+ * - For array nodes without an array index, returns a {@link
GenericArrayData} whose
+ * elements are the {@code byte[]} representation of each list value.
+ */
+@TransformFunction(type = FunctionConstant.PB_TYPE, names = {
+ "extract_binary"}, parameter = "(path)", descriptions = {
+ "- Only works on protobuf source data; returns NULL if the
source is not a PbSourceData;",
+ "- Returns NULL if 'path' is missing/invalid, or the path
cannot be resolved "
+ + "to a value in the protobuf message;",
+ "- For primitive / struct / map nodes and array nodes with an
explicit array index, "
+ + "returns the matched value serialized as a byte[];",
+ "- For array nodes without an array index, returns a
GenericArrayData whose elements "
+ + "are the byte[] representation of each list value."
+ }, examples = {
+ "extract_binary($root.feature) = [62,111]"
+ })
+public class ExtractBinaryFunction implements ValueParser {
+
+ private final ValueParser pathParser;
+ private Descriptor parentDesc;
+ private DynamicMessage parentRoot;
+
+ public ExtractBinaryFunction(Function expr) {
+ List<Expression> expressions = expr.getParameters().getExpressions();
+ this.pathParser = OperatorTools.buildParser(expressions.get(0));
+ }
+
+ @Override
+ public Object parse(SourceData sourceData, int rowIndex, Context context) {
+ // data
+ if (!(sourceData instanceof PbSourceData)) {
+ return null;
+ }
+ if (pathParser instanceof ColumnParser) {
+ return this.parseByColumnParser(sourceData, rowIndex, context);
+ }
+ if (pathParser instanceof ExtractStructExcludingFunction) {
+ ExtractStructExcludingFunction excluding =
(ExtractStructExcludingFunction) pathParser;
+ excluding.setKeepMessage(true);
+ Object result = excluding.parse(sourceData, rowIndex, context);
+ return toByteArray(result);
+ }
+ Object result = this.pathParser.parse(sourceData, rowIndex, context);
+ return toByteArray(result);
+ }
+
+ public Object parseByColumnParser(SourceData sourceData, int rowIndex,
Context context) {
+ String path = ((ColumnParser) pathParser).getFieldName();
+ if (path == null) {
+ return null;
+ }
+ PbSourceData pbData = (PbSourceData) sourceData;
+ if (StringUtils.equals(PbSourceData.ROOT, path)) {
+ return pbData.getRoot().toByteArray();
+ }
+ // node list
+ List<PbNode> childNodes = null;
+ boolean isParentData = false;
+ if (StringUtils.startsWith(path, PbSourceData.ROOT_KEY)) {
+ childNodes = pbData.parseStructNodeList(path,
pbData.getRootDesc());
+ } else if (StringUtils.startsWith(path, PbSourceData.CHILD_KEY)) {
+ if (pbData.getChildDesc() == null) {
+ return null;
+ }
+ childNodes = pbData.parseStructNodeList(path,
pbData.getChildDesc());
+ } else if (parentDesc != null) {
+ childNodes = pbData.parseStructNodeList(path, parentDesc);
+ isParentData = true;
+ }
+ if (childNodes == null || childNodes.size() <= 0) {
+ return null;
+ }
+ // value
+ Object currentNode = null;
+ if (isParentData) {
+ currentNode = pbData.findNodeValueByCache(childNodes, parentRoot);
+ } else {
+ currentNode = pbData.findFieldNode(rowIndex, path);
+ }
+ if (currentNode == null) {
+ return null;
+ }
+ // array
+ PbNode lastNode = childNodes.get(childNodes.size() - 1);
+ // primitive
+ if (lastNode.isPrimitiveType()) {
+ return toByteArray(currentNode);
+ }
+ // struct
+ if (lastNode.isStructType()) {
+ return toByteArray(currentNode);
+ }
+ // array
+ if (lastNode.isArrayType()) {
+ if (!lastNode.isHasArrayIndex()) {
+ if (!(currentNode instanceof List)) {
+ return null;
+ }
+ List<?> valueList = (List<?>) currentNode;
+ List<Object> fieldResult = new ArrayList<>(valueList.size());
+ for (Object value : valueList) {
+ fieldResult.add(toByteArray(value));
+ }
+ return new GenericArrayData(fieldResult.toArray());
+ }
+ return toByteArray(currentNode);
+ }
+ // map
+ if (lastNode.isMapType()) {
+ return toByteArray(currentNode);
+ }
+ return null;
+ }
+
+ private Object toByteArray(Object currentNode) {
+ if (currentNode == null) {
+ return null;
+ }
+ if (currentNode instanceof MessageLite) {
+ return ((MessageLite) currentNode).toByteArray();
+ }
+ if (currentNode instanceof ByteString) {
+ return ((ByteString) currentNode).toByteArray();
+ }
+ if (currentNode instanceof List) {
+ List<?> valueList = (List<?>) currentNode;
+ List<Object> fieldResult = new ArrayList<>(valueList.size());
+ for (Object value : valueList) {
+ fieldResult.add(toByteArray(value));
+ }
+ return new GenericArrayData(fieldResult.toArray());
+ }
+ return
String.valueOf(currentNode).getBytes(StandardCharsets.ISO_8859_1);
+ }
+
+ /**
+ * get parentDesc
+ * @return the parentDesc
+ */
+ public Descriptor getParentDesc() {
+ return parentDesc;
+ }
+
+ /**
+ * set parentDesc
+ * @param parentDesc the parentDesc to set
+ */
+ public void setParentDesc(Descriptor parentDesc) {
+ this.parentDesc = parentDesc;
+ }
+
+ /**
+ * get parentRoot
+ * @return the parentRoot
+ */
+ public DynamicMessage getParentRoot() {
+ return parentRoot;
+ }
+
+ /**
+ * set parentRoot
+ * @param parentRoot the parentRoot to set
+ */
+ public void setParentRoot(DynamicMessage parentRoot) {
+ this.parentRoot = parentRoot;
+ }
+
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/pb/ExtractStructExcludingFunction.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/pb/ExtractStructExcludingFunction.java
new file mode 100644
index 0000000000..589f943388
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/pb/ExtractStructExcludingFunction.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function.pb;
+
+import org.apache.inlong.sdk.transform.decode.PbNode;
+import org.apache.inlong.sdk.transform.decode.PbSourceData;
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.function.FunctionConstant;
+import org.apache.inlong.sdk.transform.process.function.TransformFunction;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ColumnParser;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.FieldDescriptor.JavaType;
+import com.google.protobuf.DynamicMessage;
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+import org.apache.flink.table.data.GenericArrayData;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * ExtractStructExcludingFunction -> extract_struct_excluding(path,
excludeField1, excludeField2, ...)
+ * description:
+ * - Only works on protobuf source data; returns NULL if the source is not a
PbSourceData.
+ * - Returns NULL if 'path' is missing/invalid, or the path cannot be resolved
to a
+ * message-typed node in the protobuf source data.
+ * - Each {@code excludeFieldN} is the name (relative to 'path') of a
sub-field that
+ * should be REMOVED from a copy of the located message before returning.
Parameters
+ * that cannot be resolved on the located message, or are not plain column
references,
+ * are silently ignored. The original record is never mutated.
+ * - When 'path' resolves to:
+ * - a single message: returns a GenericRowData built from a trimmed copy
of that
+ * message (the excluded fields are cleared);
+ * - a repeated (array) field of messages: returns a GenericArrayData
whose elements
+ * are the trimmed GenericRowData for every array element.
+ */
+@TransformFunction(type = FunctionConstant.PB_TYPE, names = {
+ "extract_struct_excluding"}, parameter = "(path, excludeField1,
excludeField2, ...)", descriptions = {
+ "- Only works on protobuf source data; returns NULL if the
source is not a PbSourceData;",
+ "- Returns NULL if 'path' is missing/invalid, or the path
cannot be resolved "
+ + "to a message-typed node;",
+ "- Each excludeFieldN is the name (relative to 'path') of a
sub-field to REMOVE "
+ + "from a copy of the located message; unknown /
non-column-reference "
+ + "parameters are silently ignored;",
+ "- When 'path' resolves to a single message, returns a trimmed
GenericRowData. "
+ + "When 'path' resolves to a repeated message field,
returns a "
+ + "GenericArrayData whose elements are the trimmed
GenericRowData for "
+ + "every array element. The original record is never
mutated."
+ }, examples = {
+ "extract_struct_excluding($root.person,address,phone) "
+ + "= <GenericRowData of person without address and
phone>"
+ })
+public class ExtractStructExcludingFunction implements ValueParser {
+
+ private final ValueParser pathParser;
+ private final List<ValueParser> fieldParsers;
+ private String path;
+ private boolean isKeepMessage = false;
+
+ public ExtractStructExcludingFunction(Function expr) {
+ List<Expression> expressions = expr.getParameters().getExpressions();
+ this.pathParser = OperatorTools.buildParser(expressions.get(0));
+ if (pathParser instanceof ColumnParser) {
+ this.path = ((ColumnParser) pathParser).getFieldName();
+ }
+ this.fieldParsers = new ArrayList<>();
+ for (int i = 1; i < expressions.size(); i++) {
+
this.fieldParsers.add(OperatorTools.buildParser(expressions.get(i)));
+ }
+ }
+
+ @Override
+ public Object parse(SourceData sourceData, int rowIndex, Context context) {
+ if (!(sourceData instanceof PbSourceData)) {
+ return null;
+ }
+ if (path == null) {
+ return null;
+ }
+ PbSourceData pbData = (PbSourceData) sourceData;
+ if (PbSourceData.ROOT.equals(path)) {
+ return buildStruct(pbData, rowIndex, context, pbData.getRoot());
+ }
+ // parse path
+ List<PbNode> pathChildNodes = pbData.parseStructNodeList(path,
pbData.getRootDesc());
+ if (pathChildNodes == null || pathChildNodes.size() == 0) {
+ return null;
+ }
+ // check message type
+ PbNode lastNode = pathChildNodes.get(pathChildNodes.size() - 1);
+ if (!lastNode.getFieldDesc().getJavaType().equals(JavaType.MESSAGE)) {
+ return null;
+ }
+ // get data
+ Object currentNode = pbData.findFieldNode(rowIndex, path);
+ if (currentNode == null) {
+ return null;
+ }
+ // array node
+ if (lastNode.isArrayType() && !lastNode.isHasArrayIndex()) {
+ if (!(currentNode instanceof List)) {
+ return null;
+ }
+ List<?> currentNodeList = (List<?>) currentNode;
+ List<Object> valueResult = new ArrayList<>(currentNodeList.size());
+ for (Object nodeValue : currentNodeList) {
+ if (!(nodeValue instanceof DynamicMessage)) {
+ continue;
+ }
+ DynamicMessage currentValue = (DynamicMessage) nodeValue;
+ Object item = buildStruct(pbData, rowIndex, context,
currentValue);
+ valueResult.add(item);
+ }
+ GenericArrayData result = new
GenericArrayData(valueResult.toArray());
+ return result;
+ } else {
+ // struct node
+ if (!(currentNode instanceof DynamicMessage)) {
+ return null;
+ }
+ DynamicMessage currentValue = (DynamicMessage) currentNode;
+ return buildStruct(pbData, rowIndex, context, currentValue);
+ }
+ }
+
+ private Object buildStruct(PbSourceData pbData, int rowIndex, Context
context,
+ DynamicMessage rawValue) {
+ DynamicMessage.Builder currentValue = rawValue.toBuilder();
+ Descriptor currentDesc = currentValue.getDescriptorForType();
+ for (ValueParser parser : fieldParsers) {
+ if (parser instanceof ColumnParser) {
+ ColumnParser columnParser = (ColumnParser) parser;
+ String fieldName = columnParser.getFieldName();
+ List<PbNode> childNodes =
pbData.parseStructNodeList(fieldName, currentDesc);
+ if (childNodes == null || childNodes.size() == 0) {
+ continue;
+ }
+ pbData.clearNodeValue(childNodes, currentValue);
+ }
+ }
+ if (isKeepMessage()) {
+ return currentValue.build();
+ }
+ Object result = pbData.buildStructData(currentDesc,
currentValue.build());
+ return result;
+ }
+
+ /**
+ * get isKeepMessage
+ * @return the isKeepMessage
+ */
+ public boolean isKeepMessage() {
+ return isKeepMessage;
+ }
+
+ /**
+ * set isKeepMessage
+ * @param isKeepMessage the isKeepMessage to set
+ */
+ public void setKeepMessage(boolean isKeepMessage) {
+ this.isKeepMessage = isKeepMessage;
+ }
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/pb/ExtractStructFunction.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/pb/ExtractStructFunction.java
new file mode 100644
index 0000000000..d2c0c2c70f
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/pb/ExtractStructFunction.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function.pb;
+
+import org.apache.inlong.sdk.transform.decode.PbNode;
+import org.apache.inlong.sdk.transform.decode.PbSourceData;
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.function.FunctionConstant;
+import org.apache.inlong.sdk.transform.process.function.TransformFunction;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ColumnParser;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.FieldDescriptor.JavaType;
+import com.google.protobuf.DynamicMessage;
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericRowData;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * ExtractStructFunction -> extract_struct(path, field1, field2, field3...)
+ * description:
+ * - Only works on protobuf source data; returns NULL if the source is not a PbSourceData.
+ * - Returns NULL if 'path' is missing, is not a column reference, does not resolve to a
+ *   message-typed protobuf field, or has no value in the current row.
+ * - If 'path' resolves to a single message (a plain message field, or a repeated field
+ *   addressed with an index such as $root.msgs(0)), returns a GenericRowData whose arity
+ *   equals the number of declared fields (field1, field2, ...). For each declared field:
+ *   - a column reference that resolves on the located message is filled with its value
+ *     (a GenericArrayData when that field is repeated and has no index);
+ *   - an extract_binary(...) argument is evaluated with the located message set as its
+ *     parent root and descriptor;
+ *   - anything else (unresolvable field, other expression types) is set to NULL.
+ * - If 'path' resolves to a repeated message field without an index, returns a
+ *   GenericArrayData holding one such row per message element.
+ */
+@TransformFunction(type = FunctionConstant.PB_TYPE, names = {
+        "extract_struct"}, parameter = "(path, field1,field2,field3...)", descriptions = {
+                "- Only works on protobuf source data; returns NULL if the source is not a PbSourceData;",
+                "- Returns NULL if 'path' is missing/invalid, or the path cannot be resolved "
+                        + "to a DynamicMessage in the protobuf source data;",
+                "- Otherwise, returns a GenericRowData whose arity equals the number of declared fields. "
+                        + "Each position is filled with the resolved field value, or NULL if the field "
+                        + "cannot be resolved on the located message.",
+                "- If 'path' is a repeated message field without an index, returns an array "
+                        + "with one such row per element."
+        }, examples = {
+                "extract_struct($root.person,name,age) = +I(\"Alice\",11)"
+        })
+public class ExtractStructFunction implements ValueParser {
+
+    /** Parser for the first argument; null when the function was called without arguments. */
+    private final ValueParser pathParser;
+    /** Parsers for field1, field2, ...; each one fills one position of the result row. */
+    private final List<ValueParser> fieldParsers;
+    /** Column path of the first argument, or null if it is absent or not a column reference. */
+    private final String path;
+
+    public ExtractStructFunction(Function expr) {
+        this.fieldParsers = new ArrayList<>();
+        // getParameters() can be null for a call without arguments, e.g. extract_struct()
+        List<Expression> expressions = expr.getParameters() == null
+                ? null
+                : expr.getParameters().getExpressions();
+        if (expressions == null || expressions.isEmpty()) {
+            // no path to resolve: parse() will always return NULL
+            this.pathParser = null;
+            this.path = null;
+            return;
+        }
+        this.pathParser = OperatorTools.buildParser(expressions.get(0));
+        this.path = pathParser instanceof ColumnParser
+                ? ((ColumnParser) pathParser).getFieldName()
+                : null;
+        for (int i = 1; i < expressions.size(); i++) {
+            this.fieldParsers.add(OperatorTools.buildParser(expressions.get(i)));
+        }
+    }
+
+    @Override
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        if (!(sourceData instanceof PbSourceData) || path == null) {
+            return null;
+        }
+        PbSourceData pbData = (PbSourceData) sourceData;
+        // resolve the path against the root descriptor and require a message-typed leaf
+        List<PbNode> pathChildNodes = pbData.parseStructNodeList(path, pbData.getRootDesc());
+        if (pathChildNodes == null || pathChildNodes.isEmpty()) {
+            return null;
+        }
+        PbNode lastNode = pathChildNodes.get(pathChildNodes.size() - 1);
+        if (lastNode.getFieldDesc().getJavaType() != JavaType.MESSAGE) {
+            return null;
+        }
+        Object currentNode = pbData.findFieldNode(rowIndex, path);
+        if (currentNode == null) {
+            return null;
+        }
+        if (lastNode.isArrayType() && !lastNode.isHasArrayIndex()) {
+            // repeated message field without an index: one row per element
+            return buildStructArray(pbData, rowIndex, context, currentNode);
+        }
+        // single message: plain message field or an indexed repeated element
+        if (!(currentNode instanceof DynamicMessage)) {
+            return null;
+        }
+        return buildStruct(pbData, rowIndex, context, (DynamicMessage) currentNode);
+    }
+
+    /**
+     * Builds one row per message element of a repeated field value.
+     * Elements that are not DynamicMessage instances are skipped, so the returned array
+     * may be shorter than the source list.
+     *
+     * @return the array of rows, or null if the value is not a List
+     */
+    private GenericArrayData buildStructArray(PbSourceData pbData, int rowIndex, Context context,
+            Object currentNode) {
+        if (!(currentNode instanceof List)) {
+            return null;
+        }
+        List<?> currentNodeList = (List<?>) currentNode;
+        List<GenericRowData> valueResult = new ArrayList<>(currentNodeList.size());
+        for (Object nodeValue : currentNodeList) {
+            if (nodeValue instanceof DynamicMessage) {
+                valueResult.add(buildStruct(pbData, rowIndex, context, (DynamicMessage) nodeValue));
+            }
+        }
+        return new GenericArrayData(valueResult.toArray());
+    }
+
+    /**
+     * Builds the result row for one located message, with one position per declared field.
+     */
+    private GenericRowData buildStruct(PbSourceData pbData, int rowIndex, Context context,
+            DynamicMessage currentValue) {
+        Descriptor currentDesc = currentValue.getDescriptorForType();
+        GenericRowData result = new GenericRowData(fieldParsers.size());
+        for (int index = 0; index < fieldParsers.size(); index++) {
+            ValueParser parser = fieldParsers.get(index);
+            Object fieldValue = null;
+            if (parser instanceof ColumnParser) {
+                String fieldName = ((ColumnParser) parser).getFieldName();
+                fieldValue = extractColumnValue(pbData, fieldName, currentDesc, currentValue);
+            } else if (parser instanceof ExtractBinaryFunction) {
+                // NOTE(review): this mutates the shared ExtractBinaryFunction instance before
+                // parsing; unsafe if one instance is evaluated concurrently - confirm threading model
+                ExtractBinaryFunction extractBinaryFunc = (ExtractBinaryFunction) parser;
+                extractBinaryFunc.setParentRoot(currentValue);
+                extractBinaryFunc.setParentDesc(currentDesc);
+                fieldValue = extractBinaryFunc.parse(pbData, rowIndex, context);
+            }
+            // any other parser type leaves NULL at this position
+            result.setField(index, fieldValue);
+        }
+        return result;
+    }
+
+    /**
+     * Resolves a column path relative to the located message and converts its value.
+     *
+     * @return the converted value; a GenericArrayData for a repeated field without an index;
+     *         null if the path does not resolve or a repeated value is not a List
+     */
+    private Object extractColumnValue(PbSourceData pbData, String fieldName, Descriptor parentDesc,
+            DynamicMessage parentValue) {
+        List<PbNode> childNodes = pbData.parseStructNodeList(fieldName, parentDesc);
+        if (childNodes == null || childNodes.isEmpty()) {
+            return null;
+        }
+        Object fieldValue = pbData.findNodeValueByCache(childNodes, parentValue);
+        PbNode lastNode = childNodes.get(childNodes.size() - 1);
+        if (!lastNode.isArrayType() || lastNode.isHasArrayIndex()) {
+            return pbData.buildFieldValue(lastNode.getFieldDesc(), fieldValue);
+        }
+        if (!(fieldValue instanceof List)) {
+            return null;
+        }
+        List<?> valueList = (List<?>) fieldValue;
+        Object[] converted = new Object[valueList.size()];
+        int i = 0;
+        for (Object value : valueList) {
+            converted[i++] = pbData.buildFieldValue(lastNode.getFieldDesc(), value);
+        }
+        return new GenericArrayData(converted);
+    }
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ColumnParser.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ColumnParser.java
index ebec767375..7102b3c969 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ColumnParser.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ColumnParser.java
@@ -51,4 +51,11 @@ public class ColumnParser implements ValueParser {
return sourceData.getField(rowIndex, fieldName);
}
+    /**
+     * Returns the name of the source column this parser reads.
+     * @return the column name passed to SourceData#getField
+     */
+    public String getFieldName() {
+        return this.fieldName;
+    }
}
diff --git
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/TestFunctionDoc.java
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/TestFunctionDoc.java
index 53ca1a4979..4cb272a23d 100644
---
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/TestFunctionDoc.java
+++
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/TestFunctionDoc.java
@@ -32,7 +32,7 @@ public class TestFunctionDoc extends
AbstractFunctionStringTestBase {
@Test
public void TestFunctionDoc() {
Map<String, Set<FunctionInfo>> functionDocMap =
FunctionTools.getFunctionDoc();
- Assert.assertEquals(8, functionDocMap.size());
+ Assert.assertEquals(9, functionDocMap.size());
System.out.println(new Gson().toJson(functionDocMap));
}
}
diff --git
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestPb2RowDataProcessor.java
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestPb2RowDataProcessor.java
index a126f8d52d..e6dcb6e44e 100644
---
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestPb2RowDataProcessor.java
+++
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestPb2RowDataProcessor.java
@@ -32,6 +32,7 @@ import org.apache.inlong.sdk.transform.pojo.RowDataSinkInfo;
import org.apache.inlong.sdk.transform.pojo.TransformConfig;
import org.apache.inlong.sdk.transform.process.TransformProcessor;
+import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
@@ -115,4 +116,140 @@ public class TestPb2RowDataProcessor extends
AbstractProcessorTestBase {
Assert.assertEquals(((GenericRowData) output.get(1).getRow(5,
3)).getMap(2).size(), 1);
Assert.assertEquals(output.get(1).getArray(6).size(), 2);
}
+
+    /**
+     * Checks concat_struct, extract_struct and extract_binary (on $root.sid, $root.msgs and
+     * $child.extinfo) against the shared protobuf test payload.
+     */
+    @Test
+    public void testPb2RowData4Struct() throws Exception {
+        String transformBase64 = this.getPbTestDescription();
+        PbSourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, "SdkDataRequest", "msgs");
+        List<FieldInfo> sinkFields = this.getTestFieldList("sid", "packageID", "msgTime");
+        // concat_struct
+        FieldInfo concatStructField = new FieldInfo("concatStruct");
+        String[] concatStructFields = new String[]{"attaID", "packageID"};
+        FormatInfo[] concatStructFieldFormats = new FormatInfo[]{
+                new StringFormatInfo(),
+                new LongFormatInfo()
+        };
+        RowFormatInfo concatStructFormat = new RowFormatInfo(concatStructFields, concatStructFieldFormats);
+        concatStructField.setFormatInfo(concatStructFormat);
+        sinkFields.add(concatStructField);
+        // extract_struct
+        FieldInfo extractStructField = new FieldInfo("extractStruct");
+        String[] extractStructFields = new String[]{"msg", "msgTime"};
+        FormatInfo[] extractStructFieldFormats = new FormatInfo[]{
+                new BinaryFormatInfo(Integer.MAX_VALUE),
+                new LongFormatInfo()
+        };
+        RowFormatInfo extractStructFormat = new RowFormatInfo(extractStructFields, extractStructFieldFormats);
+        extractStructField.setFormatInfo(extractStructFormat);
+        sinkFields.add(extractStructField);
+        // extract_binary_string
+        FieldInfo extractBinaryStringField = new FieldInfo("extractBinaryString");
+        extractBinaryStringField.setFormatInfo(new BinaryFormatInfo(Integer.MAX_VALUE));
+        sinkFields.add(extractBinaryStringField);
+        // extract_binary_array_binary
+        FieldInfo extractBinaryArrayBinaryField = new FieldInfo("extractBinaryArrayBinary");
+        ArrayFormatInfo extractBinaryArrayBinaryFormat =
+                new ArrayFormatInfo(new BinaryFormatInfo(Integer.MAX_VALUE));
+        extractBinaryArrayBinaryField.setFormatInfo(extractBinaryArrayBinaryFormat);
+        sinkFields.add(extractBinaryArrayBinaryField);
+        // extract_binary_map
+        FieldInfo extractBinaryMapField = new FieldInfo("extractBinaryMap");
+        extractBinaryMapField.setFormatInfo(new BinaryFormatInfo(Integer.MAX_VALUE));
+        sinkFields.add(extractBinaryMapField);
+        // sink
+        RowDataSinkInfo rowSink = new RowDataSinkInfo("UTF-8", sinkFields);
+        // sql
+        String transformSql = "select $root.sid,$root.packageID,$child.msgTime"
+                + ",concat_struct($root.sid,$root.packageID) as concatStruct"
+                + ",extract_struct($root.msgs(0),msg,msgTime) as extractStruct"
+                + ",extract_binary($root.sid) as extractBinaryString"
+                + ",extract_binary($root.msgs) as extractBinaryArrayBinary"
+                + ",extract_binary($child.extinfo) as extractBinaryMap "
+                + "from source";
+        TransformConfig config = new TransformConfig(transformSql);
+        // case1
+        TransformProcessor<String, RowData> processor = TransformProcessor
+                .create(config, SourceDecoderFactory.createPbDecoder(pbSource),
+                        SinkEncoderFactory.createRowEncoder(rowSink));
+        byte[] srcBytes = this.getPbTestData();
+        List<RowData> output = processor.transformForBytes(srcBytes, new HashMap<>());
+        Assert.assertEquals(2, output.size());
+        // JUnit expects (expected, actual) so failure messages read correctly
+        RowData first = output.get(0);
+        Assert.assertEquals("sid", first.getString(0).toString());
+        Assert.assertEquals("1", first.getString(1).toString());
+        Assert.assertEquals("1713243918000", first.getString(2).toString());
+        // concat_struct($root.sid,$root.packageID)
+        GenericRowData concatStruct = (GenericRowData) first.getRow(3, 2);
+        Assert.assertEquals("sid", concatStruct.getString(0).toString());
+        Assert.assertEquals(1L, concatStruct.getLong(1));
+        // extract_struct($root.msgs(0),msg,msgTime)
+        GenericRowData extractStruct = (GenericRowData) first.getRow(4, 2);
+        Assert.assertEquals(9, extractStruct.getBinary(0).length);
+        Assert.assertEquals(1713243918000L, extractStruct.getLong(1));
+        // extract_binary($root.sid)
+        Assert.assertEquals(3, first.getBinary(5).length);
+        // extract_binary($root.msgs)
+        GenericArrayData binaryArray = (GenericArrayData) first.getArray(6);
+        Assert.assertEquals(2, binaryArray.size());
+        Assert.assertEquals(32, binaryArray.getBinary(0).length);
+        Assert.assertEquals(35, binaryArray.getBinary(1).length);
+        // extract_binary($child.extinfo)
+        Assert.assertEquals(53, first.getBinary(7).length);
+    }
+
+    /**
+     * Checks extract_struct_excluding, its composition with extract_binary, and
+     * extract_binary over the whole root message and over an extract_struct result.
+     */
+    @Test
+    public void testPb2RowData4ExtractStructExcluding() throws Exception {
+        String transformBase64 = this.getPbTestDescription();
+        PbSourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, "SdkDataRequest", "msgs");
+        List<FieldInfo> sinkFields = this.getTestFieldList("sid", "packageID", "msgTime");
+        // extract_struct_excluding
+        FieldInfo extractStructExcludingField = new FieldInfo("extractStructExcluding");
+        String[] extractStructExcludingFields = new String[]{"msg", "msgTime", "extinfo"};
+        FormatInfo[] extractStructExcludingFieldFormats = new FormatInfo[]{
+                new BinaryFormatInfo(Integer.MAX_VALUE),
+                new LongFormatInfo(),
+                new MapFormatInfo(new StringFormatInfo(), new StringFormatInfo())
+        };
+        RowFormatInfo extractStructExcludingFormat = new RowFormatInfo(extractStructExcludingFields,
+                extractStructExcludingFieldFormats);
+        extractStructExcludingField.setFormatInfo(extractStructExcludingFormat);
+        sinkFields.add(extractStructExcludingField);
+        // rootBinary
+        FieldInfo rootBinary = new FieldInfo("rootBinary");
+        rootBinary.setFormatInfo(new BinaryFormatInfo(Integer.MAX_VALUE));
+        sinkFields.add(rootBinary);
+        // extractStructExcludingBinary
+        FieldInfo extractStructExcludingBinary = new FieldInfo("extractStructExcludingBinary");
+        extractStructExcludingBinary.setFormatInfo(new BinaryFormatInfo(Integer.MAX_VALUE));
+        sinkFields.add(extractStructExcludingBinary);
+        // extractStructBinary
+        FieldInfo extractStructBinary = new FieldInfo("extractStructBinary");
+        extractStructBinary.setFormatInfo(new BinaryFormatInfo(Integer.MAX_VALUE));
+        sinkFields.add(extractStructBinary);
+        // sink
+        RowDataSinkInfo rowSink = new RowDataSinkInfo("UTF-8", sinkFields);
+        // sql
+        String transformSql = "select $root.sid,$root.packageID,$child.msgTime"
+                + ",extract_struct_excluding($root.msgs(0),msg,extinfo) as extractStructExcluding "
+                + ",extract_binary($root) as rootBinary "
+                + ",extract_binary(extract_struct_excluding($root.msgs(0),msg,extinfo))"
+                + " as extractStructExcludingBinary "
+                + ",extract_binary(extract_struct($root.msgs(0),msg,msgTime)) as extractStructBinary "
+                + " from source";
+        TransformConfig config = new TransformConfig(transformSql);
+        // case1
+        TransformProcessor<String, RowData> processor = TransformProcessor
+                .create(config, SourceDecoderFactory.createPbDecoder(pbSource),
+                        SinkEncoderFactory.createRowEncoder(rowSink));
+        byte[] srcBytes = this.getPbTestData();
+        List<RowData> output = processor.transformForBytes(srcBytes, new HashMap<>());
+        Assert.assertEquals(2, output.size());
+        // JUnit expects (expected, actual) so failure messages read correctly
+        RowData first = output.get(0);
+        Assert.assertEquals("sid", first.getString(0).toString());
+        Assert.assertEquals("1", first.getString(1).toString());
+        Assert.assertEquals("1713243918000", first.getString(2).toString());
+        // extract_struct_excluding: excluded children (msg, extinfo) come back empty, msgTime is kept
+        GenericRowData excluded = (GenericRowData) first.getRow(3, 3);
+        Assert.assertEquals(0, excluded.getBinary(0).length);
+        Assert.assertEquals(1713243918000L, excluded.getLong(1));
+        Assert.assertEquals(0, excluded.getMap(2).size());
+        // rootBinary, extractStructExcludingBinary, extractStructBinary
+        Assert.assertEquals(78, first.getBinary(4).length);
+        Assert.assertEquals(7, first.getBinary(5).length);
+        Assert.assertEquals(60, first.getBinary(6).length);
+    }
}