This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a79697b07 [INLONG-12019][SDK] Transformation supports a caching mechanism for processing identical function parameters (#12020)
9a79697b07 is described below

commit 9a79697b07cd62c48b83b010358e23235308fecf
Author: ChunLiang Lu <[email protected]>
AuthorDate: Fri Oct 17 10:52:52 2025 +0800

    [INLONG-12019][SDK] Transformation supports a caching mechanism for processing identical function parameters (#12020)
---
 .../inlong/sdk/transform/process/Context.java      |  3 ++
 .../sdk/transform/process/TransformProcessor.java  |  1 +
 .../process/function/string/ParseUrlFunction.java  | 40 ++++++++++++---
 .../process/function/string/UrlDecodeFunction.java | 20 +++++++-
 .../process/processor/TestCsv2KvProcessor.java     | 59 ++++++++++++++++++++++
 5 files changed, 114 insertions(+), 9 deletions(-)

diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/Context.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/Context.java
index 6cfdd4f160..02e82ffbea 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/Context.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/Context.java
@@ -89,4 +89,7 @@ public class Context {
         return null;
     }
 
+    public Map<String, Object> getRuntimeParams() {
+        return runtimeParams;
+    }
 }
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
index 13cc17010c..e6392bbe2d 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
@@ -180,6 +180,7 @@ public class TransformProcessor<I, O> {
                         sinkData.addField(fieldName, "");
                     } else {
                         sinkData.addField(fieldName, fieldValue.toString());
+                        context.put(fieldName, fieldValue);
                     }
                 } catch (Throwable t) {
                     sinkData.addField(fieldName, "");
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/string/ParseUrlFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/string/ParseUrlFunction.java
index e853c59271..501522d694 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/string/ParseUrlFunction.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/string/ParseUrlFunction.java
@@ -57,17 +57,24 @@ public class ParseUrlFunction implements ValueParser {
     private ValueParser urlParser;
     private ValueParser partParser;
     private ValueParser keyParser;
+    private final String exprKey;
 
     public ParseUrlFunction(Function expr) {
         List<Expression> params = expr.getParameters().getExpressions();
         urlParser = OperatorTools.buildParser(params.get(0));
         partParser = params.size() > 1 ? 
OperatorTools.buildParser(params.get(1)) : null;
         keyParser = params.size() > 2 ? 
OperatorTools.buildParser(params.get(2)) : null;
+        exprKey = expr.toString();
     }
 
     @Override
     public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        Map<String, Object> runtimeParams = context.getRuntimeParams();
+        if (runtimeParams.containsKey(exprKey)) {
+            return runtimeParams.get(exprKey);
+        }
         if (urlParser == null || partParser == null) {
+            runtimeParams.put(exprKey, null);
             return null;
         }
         Object urlObj = urlParser.parse(sourceData, rowIndex, context);
@@ -75,6 +82,7 @@ public class ParseUrlFunction implements ValueParser {
         Object keyObj = keyParser != null ? keyParser.parse(sourceData, 
rowIndex, context) : null;
 
         if (urlObj == null || partObj == null) {
+            runtimeParams.put(exprKey, null);
             return null;
         }
 
@@ -82,6 +90,7 @@ public class ParseUrlFunction implements ValueParser {
         String part = OperatorTools.parseString(partObj);
         String key = keyObj != null ? OperatorTools.parseString(keyObj) : null;
         if (keyParser != null && key == null) {
+            runtimeParams.put(exprKey, null);
             return null;
         }
 
@@ -95,6 +104,7 @@ public class ParseUrlFunction implements ValueParser {
             }
             Map<String, String> queryPairs = splitQuery(strQuery);
             if (key == null) {
+                runtimeParams.put(exprKey, strQuery);
                 return strQuery;
             }
             return queryPairs.getOrDefault(key, "");
@@ -103,23 +113,39 @@ public class ParseUrlFunction implements ValueParser {
                 URL netUrl = new URL(url);
                 switch (part) {
                     case "HOST":
-                        return netUrl.getHost();
+                        String exprValue = netUrl.getHost();
+                        runtimeParams.put(exprKey, exprValue);
+                        return exprValue;
                     case "PATH":
-                        return netUrl.getPath();
+                        exprValue = netUrl.getPath();
+                        runtimeParams.put(exprKey, exprValue);
+                        return exprValue;
                     case "REF":
-                        return netUrl.getRef();
+                        exprValue = netUrl.getRef();
+                        runtimeParams.put(exprKey, exprValue);
+                        return exprValue;
                     case "PROTOCOL":
-                        return netUrl.getProtocol();
+                        exprValue = netUrl.getProtocol();
+                        runtimeParams.put(exprKey, exprValue);
+                        return exprValue;
                     case "AUTHORITY":
-                        return netUrl.getAuthority();
+                        exprValue = netUrl.getAuthority();
+                        runtimeParams.put(exprKey, exprValue);
+                        return exprValue;
                     case "FILE":
-                        return netUrl.getFile();
+                        exprValue = netUrl.getFile();
+                        runtimeParams.put(exprKey, exprValue);
+                        return exprValue;
                     case "USERINFO":
-                        return netUrl.getUserInfo();
+                        exprValue = netUrl.getUserInfo();
+                        runtimeParams.put(exprKey, exprValue);
+                        return exprValue;
                     default:
+                        runtimeParams.put(exprKey, null);
                         return null;
                 }
             } catch (MalformedURLException e) {
+                runtimeParams.put(exprKey, null);
                 return null;
             }
         }
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/string/UrlDecodeFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/string/UrlDecodeFunction.java
index 9daf27d02f..3be28bad92 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/string/UrlDecodeFunction.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/string/UrlDecodeFunction.java
@@ -30,6 +30,7 @@ import net.sf.jsqlparser.expression.Function;
 import java.net.URLDecoder;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
+import java.util.Map;
 
 /**
  * UrlDecodeFunction  ->  url_decode(str[, charset])
@@ -53,39 +54,54 @@ public class UrlDecodeFunction implements ValueParser {
 
     private final ValueParser stringParser;
     private final ValueParser charsetParser;
+    private final String exprKey;
 
     public UrlDecodeFunction(Function expr) {
         List<Expression> params = expr.getParameters().getExpressions();
         stringParser = OperatorTools.buildParser(params.get(0));
         charsetParser = params.size() > 1 ? 
OperatorTools.buildParser(params.get(1)) : null;
+        exprKey = expr.toString();
     }
 
     @Override
     public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        Map<String, Object> runtimeParams = context.getRuntimeParams();
+        if (runtimeParams.containsKey(exprKey)) {
+            return runtimeParams.get(exprKey);
+        }
         Object stringObj = stringParser.parse(sourceData, rowIndex, context);
         if (stringObj == null) {
+            runtimeParams.put(exprKey, null);
             return null;
         }
         String string = OperatorTools.parseString(stringObj);
         if (string == null) {
+            runtimeParams.put(exprKey, null);
             return null;
         }
 
         try {
             if (charsetParser == null) {
-                return URLDecoder.decode(string, 
StandardCharsets.UTF_8.toString());
+                String exprValue = URLDecoder.decode(string, 
StandardCharsets.UTF_8.toString());
+                runtimeParams.put(exprKey, exprValue);
+                return exprValue;
             } else {
                 Object charsetObj = charsetParser.parse(sourceData, rowIndex, 
context);
                 if (charsetObj == null) {
+                    runtimeParams.put(exprKey, null);
                     return null;
                 }
                 String charset = OperatorTools.parseString(charsetObj);
                 if (charset == null) {
+                    runtimeParams.put(exprKey, null);
                     return null;
                 }
-                return URLDecoder.decode(string, charset);
+                String exprValue = URLDecoder.decode(string, charset);
+                runtimeParams.put(exprKey, exprValue);
+                return exprValue;
             }
         } catch (Exception e) {
+            runtimeParams.put(exprKey, null);
             return null;
         }
     }
diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestCsv2KvProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestCsv2KvProcessor.java
index ad2ee5f3bc..7598edf497 100644
--- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestCsv2KvProcessor.java
+++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestCsv2KvProcessor.java
@@ -139,4 +139,63 @@ public class TestCsv2KvProcessor extends AbstractProcessorTestBase {
         Assert.assertEquals(output1.get(0),
                 "20250101|2025-01-01 01:01:01.001|dt_imp|2025-01-01 
01:01:01.001|12345678|123456|android|PJV110;Android 15,level 
35|15|OPPO|PJV110|china|guangdong|shenzhen|wifi|12345678|1.2.0.12345|mobileapp|12345678|pg_sgrp_test||search|||MNJT|{\"A88\":\"12345678\",\"A89\":\"12345678\",\"A48\":\"\",\"dt_wxopenid\":\"\",\"dt_seqtime\":\"12345678\",\"app_bld\":\"12345678\",\"A100\":\"12345678\",\"dt_fchlid\":\"\",\"A1\":\"12345678\",\"os_vrsn\":\"Android
 15\",\"A3\":\"12345678\",\"dt_mchl [...]
     }
+
+    @Test
+    public void testCsv2CsvRuntimesMap() throws Exception {
+        List<FieldInfo> sourceFields = this.getTestFieldList("ftime", 
"extinfo", "country", "province", "operator",
+                "apn", "gw", "src_ip_head", "info_str", "product_id", 
"app_version", "sdk_id", "sdk_version",
+                "hardware_os", "qua", "upload_ip", "client_ip", "upload_apn", 
"event_code", "event_result",
+                "package_size", "consume_time", "event_value", "event_time", 
"upload_time");
+        List<FieldInfo> sinkFields = this.getTestFieldList("imp_hour", 
"ftime", "event_code", "event_time", "log_id",
+                "qimei36", "platform", "hardware_os", "os_version", "brand", 
"model", "country", "province", "city",
+                "network_type", "dt_qq", "app_version", "boundle_id", 
"dt_usid", "dt_pgid", "dt_ref_pgid", "dt_eid",
+                "dt_element_lvtm", "dt_lvtm", "product_id", "biz_pub_params", 
"udf_kv", "sdk_type", "app_version_num");
+        CsvSourceInfo csvSource = new CsvSourceInfo("UTF-8", '|', '\\', 
sourceFields);
+        CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", '|', '\\', sinkFields);
+        String transformSql = "select replace(substr(ftime,1,10),'-','') as 
imp_hour,"
+                + "url_decode(event_value,'GBK') as decode_event_value,"
+                + "url_decode(hardware_os,'GBK') as decode_hardvalue_os,"
+                + "lower($ctx.decode_hardvalue_os) as lower_hardvalue_os,"
+                + "ftime as ftime,event_code as event_code,"
+                + "event_time as event_time,"
+                + "parse_url($ctx.decode_event_value,'QUERY','A100') as 
log_id,"
+                + "parse_url($ctx.decode_event_value,'QUERY','A153') as 
qimei36,"
+                + "case when $ctx.lower_hardvalue_os like '%android%' then 
'android' when $ctx.lower_hardvalue_os like '%ipad%' then 'ipad' when 
$ctx.lower_hardvalue_os like '%iphone%' then 'iphone' when 
$ctx.lower_hardvalue_os like '%harmony%' then 'harmony' when 
$ctx.lower_hardvalue_os like '%windows%' then 'windows' when 
$ctx.lower_hardvalue_os like '%mac%' then 'mac' when $ctx.lower_hardvalue_os 
like '%linux%' then 'linux' else 'unknown' end as platform,"
+                + "$ctx.decode_hardvalue_os as hardware_os,"
+                + "trim(case when hardware_os LIKE '%Android%' then 
regexp_extract($ctx.decode_hardvalue_os, 'Android(.+),level', 1) when 
hardware_os LIKE '%iPhone%' then regexp_extract($ctx.decode_hardvalue_os, 
'OS(.+)\\\\(', 1) when hardware_os LIKE '%Harmony%' then 
regexp_extract($ctx.decode_hardvalue_os, 
'Harmony\\\\s+[^\\\\s]+\\\\s+([^\\\\s]+)\\\\(', 1) else 'unknown' end) as 
os_version,"
+                + "parse_url($ctx.decode_event_value,'QUERY','A9') as brand,"
+                + "parse_url($ctx.decode_event_value,'QUERY','A10') as model,"
+                + "country as country,"
+                + "province as province,"
+                + "parse_url($ctx.decode_event_value,'QUERY','A160') as city,"
+                + "parse_url($ctx.decode_event_value,'QUERY','A19') as 
network_type,"
+                + "parse_url($ctx.decode_event_value,'QUERY','dt_qq') as 
dt_qq,"
+                + "url_decode(app_version,'GBK') as app_version,"
+                + "parse_url($ctx.decode_event_value,'QUERY','A67') as 
boundle_id,"
+                + "parse_url($ctx.decode_event_value,'QUERY','dt_usid') as 
dt_usid,"
+                + "parse_url($ctx.decode_event_value,'QUERY','dt_pgid') as 
dt_pgid,"
+                + "parse_url($ctx.decode_event_value,'QUERY','dt_ref_pgid') as 
dt_ref_pgid,"
+                + "parse_url($ctx.decode_event_value,'QUERY','dt_eid') as 
dt_eid,"
+                + 
"parse_url($ctx.decode_event_value,'QUERY','dt_element_lvtm') as 
dt_element_lvtm,"
+                + "parse_url($ctx.decode_event_value,'QUERY','dt_lvtm') as 
dt_lvtm,"
+                + "product_id as product_id,"
+                + 
"json_remove(str_to_json($ctx.decode_event_value,'&','='),'udf_kv') as 
biz_pub_params,"
+                + "parse_url($ctx.decode_event_value,'QUERY','udf_kv') as 
udf_kv,"
+                + "case when sdk_id='js' then 1 when sdk_id='weapp' then 2 
else 0 end as sdk_type,"
+                + 
"split_index(app_version,'\\.',0)*1000+split_index(app_version,'\\.',1)*100+split_index(split_index(app_version,'\\.',2),'\\(',0)
 as app_version_num "
+                + "from source where 
parse_url(url_decode(event_value,'GBK'),'QUERY','dt_pgid') like 'pg_sgrp_%'";
+        System.out.println(transformSql);
+        TransformConfig config = new TransformConfig(transformSql, new 
HashMap<>(), false, true);
+        // case1
+        TransformProcessor<String, String> processor1 = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createCsvEncoder(csvSink));
+        String sourceData =
+                "2025-01-01 
01:01:01.001|extinfo=127.0.0.1|china|guangdong|unite|unknown|unknown|127.0.0.1 
2025-01-01 
01:01:01.001|INFO|MNJT|1.2.0.12345|js|1.2.3.4-qqvideo6|PJV110%3BAndroid+15%2Clevel+35||127.0.0.1|127.0.0.1|wifi|dt_imp|true|0|0|A9%3DOPPO%26A89%3D12345678%26A76%3D1.2.3.4%26A58%3DN%26A52%3D480%26A17%3D1080*2244%26A12%3Dzh%26A10%3DPJV110%26A158%3D12345678%26A67%3Dmobileapp%26A159%3DN%26A31%3D%2C%2C%26A160%3Dshenzhen%26ui_vrsn%3DPJV%28CN01%29%26udf_kv%3D%7B%22eid%22%3A%22se
 [...]
+        List<String> output1 = processor1.transform(sourceData, new 
HashMap<>());
+        Assert.assertEquals(1, output1.size());
+        System.out.println(output1.get(0));
+        Assert.assertEquals(output1.get(0),
+                "20250101|2025-01-01 01:01:01.001|dt_imp|2025-01-01 
01:01:01.001|12345678|123456|android|PJV110;Android 15,level 
35|15|OPPO|PJV110|china|guangdong|shenzhen|wifi|12345678|1.2.0.12345|mobileapp|12345678|pg_sgrp_test||search|||MNJT|{\"A88\":\"12345678\",\"A89\":\"12345678\",\"A48\":\"\",\"dt_wxopenid\":\"\",\"dt_seqtime\":\"12345678\",\"app_bld\":\"12345678\",\"A100\":\"12345678\",\"dt_fchlid\":\"\",\"A1\":\"12345678\",\"os_vrsn\":\"Android
 15\",\"A3\":\"12345678\",\"dt_mchl [...]
+    }
 }

Reply via email to