This is an automated email from the ASF dual-hosted git repository.
luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 373c3eb846 [INLONG-11961][SDK] Transform supports array index access,
the WHERE clause supports the LIKE operator, and the str_to_json function
converts KV-format data into JSON format (#11962)
373c3eb846 is described below
commit 373c3eb846ffa1f2e3c13329cd79b5f2bf65e5e4
Author: ChunLiang Lu <[email protected]>
AuthorDate: Fri Aug 8 19:18:26 2025 +0800
[INLONG-11961][SDK] Transform supports array index access, the WHERE clause
supports the LIKE operator, and the str_to_json function converts KV-format
data into JSON format (#11962)
---
.../process/function/string/StrToJsonFunction.java | 116 ++++++++++++++++++++
.../transform/process/operator/LikeOperator.java | 117 +++++++++++++++++++++
.../sdk/transform/process/parser/ArrayParser.java | 56 ++++++++++
.../process/processor/TestCsv2KvProcessor.java | 57 ++++++++++
4 files changed, 346 insertions(+)
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/string/StrToJsonFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/string/StrToJsonFunction.java
new file mode 100644
index 0000000000..26c8d5df1a
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/string/StrToJsonFunction.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function.string;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.function.FunctionConstant;
+import org.apache.inlong.sdk.transform.process.function.TransformFunction;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import com.google.gson.JsonObject;
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * StrToJsonFunction -> str_to_json(str, pairDelimiter, keyValueDelimiter)
+ * description:
+ * - Return NULL if 'str' is NULL
+ * - Return a JSON object after splitting the 'str' into key/value pairs using 'pairDelimiter' (default is ',')
+ *   and 'keyValueDelimiter' (default is '='); a NULL or empty delimiter falls back to its default
+ * - Each pair is split at the first occurrence of 'keyValueDelimiter'; pairs that do not contain it are
+ *   skipped, and a later pair replaces an earlier one with the same key
+ * Note: Both 'pairDelimiter' and 'keyValueDelimiter' are matched literally, so special characters
+ *       (e.g. <([{^-=$!|]})?*+.>) can be used as delimiters without escaping.
+ */
+@TransformFunction(type = FunctionConstant.STRING_TYPE, names = {
+        "str_to_json"}, parameter = "(String s1, String pairDelimiter, String keyValueDelimiter)", descriptions = {
+                "- Return NULL if 'str' is NULL;",
+                "- Return a json object after splitting the 'str' into key/value pairs using 'pairDelimiter'(default is ',') "
+                        + "and 'keyValueDelimiter'(default is '=');",
+                "Note: Both 'pairDelimiter' and 'keyValueDelimiter' are matched literally, so special "
+                        + "characters(e.g. <([{^-=$!|]})?*+.>) can be used as delimiters without escaping."
+        }, examples = {
+                "str_to_json('key1=value1,key2=value2,key3=value3') = {\"key1\":\"value1\",\"key2\":\"value2\",\"key3\":\"value3\"}",
+                "str_to_json(\"name->John!age->30!city->China\" , \"!\" , \"->\") = {\"name\":\"John\",\"age\":\"30\",\"city\":\"China\"}"
+        })
+public class StrToJsonFunction implements ValueParser {
+
+    private static final String DEFAULT_PAIR_DELIMITER = ",";
+
+    private static final String DEFAULT_KV_DELIMITER = "=";
+
+    private ValueParser inputParser;
+
+    private ValueParser pairDelimiterParser;
+
+    private ValueParser kvDelimiterParser;
+
+    /**
+     * Builds parsers for up to three arguments: the input string, the pair delimiter and the
+     * key/value delimiter. Missing arguments leave the corresponding parser null.
+     *
+     * @param expr the str_to_json function call
+     */
+    public StrToJsonFunction(Function expr) {
+        if (expr.getParameters() == null) {
+            // str_to_json() without arguments: parse() returns NULL.
+            return;
+        }
+        List<Expression> expressions = expr.getParameters().getExpressions();
+        if (!expressions.isEmpty()) {
+            inputParser = OperatorTools.buildParser(expressions.get(0));
+            if (expressions.size() >= 2) {
+                pairDelimiterParser = OperatorTools.buildParser(expressions.get(1));
+                if (expressions.size() >= 3) {
+                    kvDelimiterParser = OperatorTools.buildParser(expressions.get(2));
+                }
+            }
+        }
+    }
+
+    /**
+     * Evaluates str_to_json for one row.
+     *
+     * @return a {@link JsonObject} of the parsed key/value pairs, or null when the input is NULL
+     *         or no input argument was given
+     */
+    @Override
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        if (inputParser == null) {
+            return null;
+        }
+        Object inputStringObj = inputParser.parse(sourceData, rowIndex, context);
+        String inputString = inputStringObj == null ? null : OperatorTools.parseString(inputStringObj);
+        if (inputString == null) {
+            return null;
+        }
+        String pairDelimiterString = parseDelimiter(pairDelimiterParser, sourceData, rowIndex, context);
+        String kvDelimiterString = parseDelimiter(kvDelimiterParser, sourceData, rowIndex, context);
+        return parse2Json(pairDelimiterString, kvDelimiterString, inputString);
+    }
+
+    /** Evaluates an optional delimiter argument; returns null when the argument is absent. */
+    private static String parseDelimiter(ValueParser parser, SourceData sourceData, int rowIndex, Context context) {
+        return parser == null ? null : OperatorTools.parseString(parser.parse(sourceData, rowIndex, context));
+    }
+
+    /**
+     * Splits {@code inputString} into pairs and each pair into key and value, matching both
+     * delimiters literally.
+     */
+    private JsonObject parse2Json(String pairDelimiterString, String kvDelimiterString, String inputString) {
+        String pairDelimiter = defaultIfEmpty(pairDelimiterString, DEFAULT_PAIR_DELIMITER);
+        String keyValueDelimiter = defaultIfEmpty(kvDelimiterString, DEFAULT_KV_DELIMITER);
+
+        JsonObject json = new JsonObject();
+        // Pattern.quote makes String.split treat the delimiter as plain text rather than a regex.
+        for (String pair : inputString.split(Pattern.quote(pairDelimiter))) {
+            int separator = pair.indexOf(keyValueDelimiter);
+            if (separator >= 0) {
+                json.addProperty(pair.substring(0, separator),
+                        pair.substring(separator + keyValueDelimiter.length()));
+            }
+        }
+        return json;
+    }
+
+    /** Returns {@code defaultDelimiter} when {@code delimiter} is null or empty. */
+    private static String defaultIfEmpty(String delimiter, String defaultDelimiter) {
+        return (delimiter == null || delimiter.isEmpty()) ? defaultDelimiter : delimiter;
+    }
+}
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/LikeOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/LikeOperator.java
new file mode 100644
index 0000000000..d48ee38869
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/LikeOperator.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.operator;
+
+import org.apache.inlong.common.util.StringUtil;
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import lombok.extern.slf4j.Slf4j;
+import net.sf.jsqlparser.expression.operators.relational.LikeExpression;
+
+import java.util.regex.Pattern;
+
+/**
+ * LikeOperator evaluates the SQL predicate {@code dest [NOT] LIKE pattern [ESCAPE escape]}.
+ * <p>
+ * {@code _} matches exactly one character and {@code %} matches any sequence of characters (including an
+ * empty one); both also match line terminators. The escape character (the first character of the ESCAPE
+ * value, default backslash) makes a following {@code _}, {@code %} or escape character literal. An escape
+ * character at the end of the pattern matches itself, and escaping any other character makes the pattern
+ * illegal. All other characters match literally, and matching is case-insensitive.
+ * <p>
+ * If {@code dest} or {@code pattern} is NULL, or the pattern is illegal, both LIKE and NOT LIKE evaluate
+ * to false.
+ */
+@Slf4j
+@TransformOperator(values = LikeExpression.class)
+public class LikeOperator implements ExpressionOperator {
+
+    // Characters that must be backslash-escaped to be matched literally by java.util.regex.
+    private static final String REGEX_SPECIAL_CHAR = "[]()|^-+*?{}$\\.";
+    private static final char DEFAULT_ESCAPE_CHAR = '\\';
+    // DOTALL lets '_' and '%' match line terminators; the case flags make matching case-insensitive
+    // without depending on the JVM default locale.
+    private static final int REGEX_FLAGS = Pattern.CASE_INSENSITIVE | Pattern.UNICODE_CASE | Pattern.DOTALL;
+
+    private final ValueParser destParser;
+    private final ValueParser patternParser;
+    private final ValueParser escapeParser;
+    private final boolean isNot;
+
+    // LIKE patterns are usually constant, so the most recently compiled one is reused across rows.
+    private volatile CompiledPattern lastCompiled;
+
+    public LikeOperator(LikeExpression expr) {
+        destParser = OperatorTools.buildParser(expr.getLeftExpression());
+        patternParser = OperatorTools.buildParser(expr.getRightExpression());
+        escapeParser = OperatorTools.buildParser(expr.getEscape());
+        isNot = expr.isNot();
+    }
+
+    /**
+     * Evaluates the predicate for one row.
+     *
+     * @param sourceData the decoded source data
+     * @param rowIndex   the row to evaluate
+     * @param context    the transform context
+     * @return true if the row satisfies {@code [NOT] LIKE}; false for NULL operands or an illegal pattern
+     */
+    @Override
+    public boolean check(SourceData sourceData, int rowIndex, Context context) {
+        Object destObj = destParser.parse(sourceData, rowIndex, context);
+        Object patternObj = patternParser.parse(sourceData, rowIndex, context);
+        if (destObj == null || patternObj == null) {
+            return false;
+        }
+        char escapeChr = DEFAULT_ESCAPE_CHAR;
+        if (escapeParser != null) {
+            Object escapeObj = escapeParser.parse(sourceData, rowIndex, context);
+            if (!StringUtil.isEmpty(escapeObj)) {
+                escapeChr = escapeObj.toString().charAt(0);
+            }
+        }
+        String destStr = destObj.toString();
+        String pattern = patternObj.toString();
+        try {
+            boolean isMatch = compile(pattern, escapeChr).matcher(destStr).matches();
+            return isNot ? !isMatch : isMatch;
+        } catch (IllegalArgumentException e) {
+            // Covers illegal escape sequences and PatternSyntaxException (a subclass).
+            log.error("Illegal LIKE pattern: {}", pattern, e);
+            return false;
+        }
+    }
+
+    /** Returns the compiled regex for a LIKE pattern, reusing the previous result when the inputs repeat. */
+    private Pattern compile(String pattern, char escapeChar) {
+        CompiledPattern cached = lastCompiled;
+        if (cached != null && cached.escapeChar == escapeChar && cached.likePattern.equals(pattern)) {
+            return cached.regex;
+        }
+        Pattern regex = Pattern.compile(buildLikeRegex(pattern, escapeChar), REGEX_FLAGS);
+        lastCompiled = new CompiledPattern(pattern, escapeChar, regex);
+        return regex;
+    }
+
+    /**
+     * Translates a LIKE pattern into an equivalent java.util.regex expression.
+     *
+     * @throws IllegalArgumentException if the escape character precedes anything other than
+     *         '_', '%' or itself
+     */
+    private static String buildLikeRegex(String pattern, char escapeChar) {
+        int len = pattern.length();
+        StringBuilder regexPattern = new StringBuilder(len + len);
+        for (int i = 0; i < len; i++) {
+            char c = pattern.charAt(i);
+            if (c == escapeChar) {
+                if (i == len - 1) {
+                    // At the end of a string, the escape character represents itself
+                    appendLiteral(regexPattern, c);
+                    continue;
+                }
+                char nextChar = pattern.charAt(i + 1);
+                if (nextChar != '_' && nextChar != '%' && nextChar != escapeChar) {
+                    throw new IllegalArgumentException("Illegal pattern string: " + pattern);
+                }
+                appendLiteral(regexPattern, nextChar);
+                i++;
+            } else if (c == '_') {
+                regexPattern.append('.');
+            } else if (c == '%') {
+                regexPattern.append(".*");
+            } else {
+                appendLiteral(regexPattern, c);
+            }
+        }
+        return regexPattern.toString();
+    }
+
+    /** Appends {@code c} so that the regex matches it literally. */
+    private static void appendLiteral(StringBuilder regexPattern, char c) {
+        if (REGEX_SPECIAL_CHAR.indexOf(c) >= 0) {
+            regexPattern.append('\\');
+        }
+        regexPattern.append(c);
+    }
+
+    /** Immutable cache entry pairing a LIKE pattern and escape character with its compiled regex. */
+    private static final class CompiledPattern {
+
+        private final String likePattern;
+        private final char escapeChar;
+        private final Pattern regex;
+
+        private CompiledPattern(String likePattern, char escapeChar, Pattern regex) {
+            this.likePattern = likePattern;
+            this.escapeChar = escapeChar;
+            this.regex = regex;
+        }
+    }
+}
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ArrayParser.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ArrayParser.java
new file mode 100644
index 0000000000..9571b0a16a
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ArrayParser.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.parser;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+
+import net.sf.jsqlparser.expression.ArrayExpression;
+
+import java.util.List;
+
+/**
+ * ArrayParser
+ * Description: Support to get the value from array
+ */
+@TransformParser(values = ArrayExpression.class)
+public class ArrayParser implements ValueParser {
+
+ private final ValueParser left;
+
+ private final ValueParser right;
+
+ public ArrayParser(ArrayExpression expr) {
+ this.left = OperatorTools.buildParser(expr.getObjExpression());
+ this.right = OperatorTools.buildParser(expr.getIndexExpression());
+ }
+
+ @Override
+ public Object parse(SourceData sourceData, int rowIndex, Context context) {
+ Object leftValue = this.left.parse(sourceData, rowIndex, context);
+ Object rightValue = this.right.parse(sourceData, rowIndex, context);
+
+ if (leftValue instanceof List<?> && rightValue instanceof Number) {
+ List<?> leftObj = (List<?>) leftValue;
+ Number rightObj = (Number) rightValue;
+ return leftObj.get(rightObj.intValue());
+ }
+ return null;
+ }
+}
diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestCsv2KvProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestCsv2KvProcessor.java
index 2b57409714..ad2ee5f3bc 100644
--- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestCsv2KvProcessor.java
+++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestCsv2KvProcessor.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sdk.transform.process.processor;
import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
+import org.apache.inlong.sdk.transform.pojo.CsvSinkInfo;
import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo;
import org.apache.inlong.sdk.transform.pojo.FieldInfo;
import org.apache.inlong.sdk.transform.pojo.KvSinkInfo;
@@ -82,4 +83,60 @@ public class TestCsv2KvProcessor extends AbstractProcessorTestBase {
List<String> output2 = processor2.transform("2024-04-28 00:00:00|ok",
new HashMap<>());
Assert.assertEquals(0, output2.size());
}
+
+ /**
+ * End-to-end check of a '|'-separated CSV source transformed into a '|'-separated CSV sink.
+ * The SQL exercises url_decode, parse_url, LIKE inside CASE and WHERE, regexp_extract,
+ * str_to_json combined with json_remove, and split_index. The WHERE clause keeps the record
+ * only when the decoded dt_pgid query parameter matches 'pg_sgrp_%'.
+ * NOTE(review): the method name says Csv2Csv although it lives in TestCsv2KvProcessor.
+ */
+ @Test
+ public void testCsv2CsvSplit() throws Exception {
+ List<FieldInfo> sourceFields = this.getTestFieldList("ftime",
"extinfo", "country", "province", "operator",
+ "apn", "gw", "src_ip_head", "info_str", "product_id",
"app_version", "sdk_id", "sdk_version",
+ "hardware_os", "qua", "upload_ip", "client_ip", "upload_apn",
"event_code", "event_result",
+ "package_size", "consume_time", "event_value", "event_time",
"upload_time");
+ List<FieldInfo> sinkFields = this.getTestFieldList("imp_hour",
"ftime", "event_code", "event_time", "log_id",
+ "qimei36", "platform", "hardware_os", "os_version", "brand",
"model", "country", "province", "city",
+ "network_type", "dt_qq", "app_version", "boundle_id",
"dt_usid", "dt_pgid", "dt_ref_pgid", "dt_eid",
+ "dt_element_lvtm", "dt_lvtm", "product_id", "biz_pub_params",
"udf_kv", "sdk_type", "app_version_num");
+ CsvSourceInfo csvSource = new CsvSourceInfo("UTF-8", '|', '\\',
sourceFields);
+ CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", '|', '\\', sinkFields);
+ String transformSql = "select replace(substr(ftime,1,10),'-','') as
imp_hour,"
+ + "ftime as ftime,event_code as event_code,"
+ + "event_time as event_time,"
+ + "parse_url(url_decode(event_value,'GBK'),'QUERY','A100') as
log_id,"
+ + "parse_url(url_decode(event_value,'GBK'),'QUERY','A153') as
qimei36,"
+ + "case when lower(url_decode(hardware_os,'GBK')) like
'%android%' then 'android' when lower(url_decode(hardware_os,'GBK')) like
'%ipad%' then 'ipad' when lower(url_decode(hardware_os,'GBK')) like '%iphone%'
then 'iphone' when lower(url_decode(hardware_os,'GBK')) like '%harmony%' then
'harmony' when lower(url_decode(hardware_os,'GBK')) like '%windows%' then
'windows' when lower(url_decode(hardware_os,'GBK')) like '%mac%' then 'mac'
when lower(url_decode(hardware_os,'GBK') [...]
+ + "url_decode(hardware_os,'GBK') as hardware_os,"
+ + "trim(case when hardware_os LIKE '%Android%' then
regexp_extract(url_decode(hardware_os,'GBK'), 'Android(.+),level', 1) when
hardware_os LIKE '%iPhone%' then regexp_extract(url_decode(hardware_os,'GBK'),
'OS(.+)\\\\(', 1) when hardware_os LIKE '%Harmony%' then
regexp_extract(url_decode(hardware_os,'GBK'),
'Harmony\\\\s+[^\\\\s]+\\\\s+([^\\\\s]+)\\\\(', 1) else 'unknown' end) as
os_version,"
+ + "parse_url(url_decode(event_value,'GBK'),'QUERY','A9') as
brand,"
+ + "parse_url(url_decode(event_value,'GBK'),'QUERY','A10') as
model,"
+ + "country as country,"
+ + "province as province,"
+ + "parse_url(url_decode(event_value,'GBK'),'QUERY','A160') as
city,"
+ + "parse_url(url_decode(event_value,'GBK'),'QUERY','A19') as
network_type,"
+ + "parse_url(url_decode(event_value,'GBK'),'QUERY','dt_qq') as
dt_qq,"
+ + "url_decode(app_version,'GBK') as app_version,"
+ + "parse_url(url_decode(event_value,'GBK'),'QUERY','A67') as
boundle_id,"
+ + "parse_url(url_decode(event_value,'GBK'),'QUERY','dt_usid')
as dt_usid,"
+ + "parse_url(url_decode(event_value,'GBK'),'QUERY','dt_pgid')
as dt_pgid,"
+ +
"parse_url(url_decode(event_value,'GBK'),'QUERY','dt_ref_pgid') as dt_ref_pgid,"
+ + "parse_url(url_decode(event_value,'GBK'),'QUERY','dt_eid')
as dt_eid,"
+ +
"parse_url(url_decode(event_value,'GBK'),'QUERY','dt_element_lvtm') as
dt_element_lvtm,"
+ + "parse_url(url_decode(event_value,'GBK'),'QUERY','dt_lvtm')
as dt_lvtm,"
+ + "product_id as product_id,"
+ +
"json_remove(str_to_json(url_decode(event_value,'GBK'),'&','='),'udf_kv') as
biz_pub_params,"
+ + "parse_url(url_decode(event_value,'GBK'),'QUERY','udf_kv')
as udf_kv,"
+ + "case when sdk_id='js' then 1 when sdk_id='weapp' then 2
else 0 end as sdk_type,"
+ +
"split_index(app_version,'\\.',0)*1000+split_index(app_version,'\\.',1)*100+split_index(split_index(app_version,'\\.',2),'\\(',0)
as app_version_num "
+ + "from source where
parse_url(url_decode(event_value,'GBK'),'QUERY','dt_pgid') like 'pg_sgrp_%'";
+ // NOTE(review): console output in a unit test; consider removing it once the test is stable.
+ System.out.println(transformSql);
+ TransformConfig config = new TransformConfig(transformSql);
+ // case1
+ TransformProcessor<String, String> processor1 = TransformProcessor
+ .create(config,
SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createCsvEncoder(csvSink));
+ String sourceData =
+ "2025-01-01
01:01:01.001|extinfo=127.0.0.1|china|guangdong|unite|unknown|unknown|127.0.0.1
2025-01-01
01:01:01.001|INFO|MNJT|1.2.0.12345|js|1.2.3.4-qqvideo6|PJV110%3BAndroid+15%2Clevel+35||127.0.0.1|127.0.0.1|wifi|dt_imp|true|0|0|A9%3DOPPO%26A89%3D12345678%26A76%3D1.2.3.4%26A58%3DN%26A52%3D480%26A17%3D1080*2244%26A12%3Dzh%26A10%3DPJV110%26A158%3D12345678%26A67%3Dmobileapp%26A159%3DN%26A31%3D%2C%2C%26A160%3Dshenzhen%26ui_vrsn%3DPJV%28CN01%29%26udf_kv%3D%7B%22eid%22%3A%22se
[...]
+ // The record's dt_pgid ("pg_sgrp_test" in the expected output) matches LIKE 'pg_sgrp_%', so exactly one row is expected.
+ List<String> output1 = processor1.transform(sourceData, new
HashMap<>());
+ Assert.assertEquals(1, output1.size());
+ System.out.println(output1.get(0));
+ // NOTE(review): JUnit's assertEquals signature is (expected, actual); the arguments are swapped here,
+ // which only affects the failure message.
+ Assert.assertEquals(output1.get(0),
+ "20250101|2025-01-01 01:01:01.001|dt_imp|2025-01-01
01:01:01.001|12345678|123456|android|PJV110;Android 15,level
35|15|OPPO|PJV110|china|guangdong|shenzhen|wifi|12345678|1.2.0.12345|mobileapp|12345678|pg_sgrp_test||search|||MNJT|{\"A88\":\"12345678\",\"A89\":\"12345678\",\"A48\":\"\",\"dt_wxopenid\":\"\",\"dt_seqtime\":\"12345678\",\"app_bld\":\"12345678\",\"A100\":\"12345678\",\"dt_fchlid\":\"\",\"A1\":\"12345678\",\"os_vrsn\":\"Android
15\",\"A3\":\"12345678\",\"dt_mchl [...]
+ }
}