This is an automated email from the ASF dual-hosted git repository.

lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 24ffaeb14a [Improve] Support JSON multi-level nested parsing (#10000)
24ffaeb14a is described below

commit 24ffaeb14ae47cd0cd555815461fb5ecb538002e
Author: misi <[email protected]>
AuthorDate: Tue Nov 18 21:13:42 2025 +0800

    [Improve] Support JSON multi-level nested parsing (#10000)
    
    Co-authored-by: misi <[email protected]>
---
 .../apache/seatunnel/e2e/transform/TestSQLIT.java  |   4 +
 .../resources/sql_transform/func_null_return.conf  | 120 +++++++++++
 .../transform/sql/zeta/ZetaSQLFunction.java        |   3 +
 .../transform/sql/zeta/CastFunctionTest.java       | 229 +++++++++++++++++++++
 4 files changed, 356 insertions(+)

diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
index 5b04022a67..2e0a2a0851 100644
--- 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
@@ -84,6 +84,10 @@ public class TestSQLIT extends TestSuiteBase {
 
         Container.ExecResult multiIfSql = 
container.executeJob("/sql_transform/func_multi_if.conf");
         Assertions.assertEquals(0, multiIfSql.getExitCode());
+
+        Container.ExecResult nullReturnSql =
+                container.executeJob("/sql_transform/func_null_return.conf");
+        Assertions.assertEquals(0, nullReturnSql.getExitCode());
     }
 
     @TestTemplate
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_null_return.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_null_return.conf
new file mode 100644
index 0000000000..c219e2b5ba
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_null_return.conf
@@ -0,0 +1,120 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file demonstrates batch processing of null values in nested fields with the SQL transform
+######
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    plugin_output = "fake"
+    schema = {
+      fields {
+        id = "int"
+        name = "string"
+        nullable_field = "string"
+        nested_data = {
+          fields = {
+            inner_field = "string"
+            inner_nullable = "string"
+          }
+        }
+      }
+    }
+    rows = [
+      {fields = [1, "Test Name", null, {inner_field: "inner_value", 
inner_nullable: null}], kind = INSERT},
+      {fields = [2, "Another Name", "Some Value", {inner_field: 
"another_inner", inner_nullable: "non_null"}], kind = INSERT},
+      {fields = [3, "Third Name", null, {inner_field: null, inner_nullable: 
null}], kind = INSERT}
+    ]
+  }
+}
+
+transform {
+  sql {
+    plugin_input = "fake"
+    plugin_output = "fake1"
+    query = "select id, name, nullable_field, nested_data.inner_field, 
nested_data.inner_nullable, nested_data.inner_field as copied_field from fake"
+  }
+}
+
+sink {
+  Assert {
+    plugin_input = "fake1"
+    rules = {
+      field_rules = [
+        {
+          field_name = "id"
+          field_type = "int"
+          field_value = [
+            {equals_to = 1},
+            {equals_to = 2},
+            {equals_to = 3}
+          ]
+        },
+        {
+          field_name = "name"
+          field_type = "string"
+          field_value = [
+            {equals_to = "Test Name"},
+            {equals_to = "Another Name"},
+            {equals_to = "Third Name"}
+          ]
+        },
+        {
+          field_name = "nullable_field"
+          field_type = "string"
+          field_value = [
+            {is_null = true},
+            {equals_to = "Some Value"},
+            {is_null = true}
+          ]
+        },
+        {
+          field_name = "inner_field"
+          field_type = "string"
+          field_value = [
+            {equals_to = "inner_value"},
+            {equals_to = "another_inner"},
+            {is_null = true}
+          ]
+        },
+        {
+          field_name = "inner_nullable"
+          field_type = "string"
+          field_value = [
+            {is_null = true},
+            {equals_to = "non_null"},
+            {is_null = true}
+          ]
+        },
+        {
+          field_name = "copied_field"
+          field_type = "string"
+          field_value = [
+            {equals_to = "inner_value"},
+            {equals_to = "another_inner"},
+            {is_null = true}
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
index 4adadaf60f..de1a78eae2 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
@@ -340,6 +340,9 @@ public class ZetaSQLFunction {
                     }
                     parDataType = ((SeaTunnelRowType) 
parDataType).getFieldType(idx);
                     res = parRowValues.getFields()[idx];
+                    if (res == null) {
+                        return null;
+                    }
                 }
                 return res;
             }
diff --git 
a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/CastFunctionTest.java
 
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/CastFunctionTest.java
index b5c5ae377e..c854bc021e 100644
--- 
a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/CastFunctionTest.java
+++ 
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/CastFunctionTest.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.transform.sql.zeta;
 
 import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.MapType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -54,4 +55,232 @@ public class CastFunctionTest {
         Assertions.assertEquals(Byte.parseByte("1"), f2Object);
         Assertions.assertEquals(Short.parseShort("1"), f3Object);
     }
+
+    @Test
+    public void testCastFunctionWithNullNestedField() {
+        SQLEngine sqlEngine = 
SQLEngineFactory.getSQLEngine(SQLEngineFactory.EngineType.ZETA);
+
+        SeaTunnelRowType rowType =
+                new SeaTunnelRowType(
+                        new String[] {"user"},
+                        new SeaTunnelDataType[] {
+                            new MapType<>(BasicType.STRING_TYPE, 
BasicType.STRING_TYPE)
+                        });
+
+        SeaTunnelRow inputRow = new SeaTunnelRow(new Object[] {null});
+
+        sqlEngine.init("test", null, rowType, "select user.address as address 
from test");
+
+        SeaTunnelRowType outRowType = sqlEngine.typeMapping(null);
+
+        SeaTunnelRow outRow = sqlEngine.transformBySQL(inputRow, 
outRowType).get(0);
+
+        Object addressField = outRow.getField(0);
+        Assertions.assertNull(
+                addressField,
+                "When casting nested field where intermediate value is null, 
result should be null");
+    }
+
+    @Test
+    public void testCastFunctionWithNestedField() {
+        SQLEngine sqlEngine = 
SQLEngineFactory.getSQLEngine(SQLEngineFactory.EngineType.ZETA);
+
+        // Create a map with nested data
+        MapType<String, String> mapType =
+                new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE);
+        SeaTunnelRowType rowType =
+                new SeaTunnelRowType(new String[] {"user"}, new 
SeaTunnelDataType[] {mapType});
+
+        // Create input data with nested fields
+        java.util.Map<String, String> userData = new java.util.HashMap<>();
+        userData.put("address", "123 Main St");
+        userData.put("age", "25");
+        SeaTunnelRow inputRow = new SeaTunnelRow(new Object[] {userData});
+
+        sqlEngine.init(
+                "test",
+                null,
+                rowType,
+                "select user.address as address, cast(user.age as INT) as age 
from test");
+
+        SeaTunnelRowType outRowType = sqlEngine.typeMapping(null);
+        SeaTunnelRow outRow = sqlEngine.transformBySQL(inputRow, 
outRowType).get(0);
+
+        // Check nested field access
+        Assertions.assertEquals("123 Main St", outRow.getField(0)); // Direct 
access
+        Assertions.assertEquals(25, outRow.getField(1)); // Cast from nested 
field
+    }
+
+    @Test
+    public void testCastFunctionWithNormalValues() {
+        SQLEngine sqlEngine = 
SQLEngineFactory.getSQLEngine(SQLEngineFactory.EngineType.ZETA);
+
+        SeaTunnelRowType rowType =
+                new SeaTunnelRowType(
+                        new String[] {"str_field", "int_field"},
+                        new SeaTunnelDataType[] {BasicType.STRING_TYPE, 
BasicType.INT_TYPE});
+
+        SeaTunnelRow inputRow = new SeaTunnelRow(new Object[] {"123", 456});
+
+        sqlEngine.init(
+                "test",
+                null,
+                rowType,
+                "select str_field, cast(str_field as INT) as int_from_str, "
+                        + "int_field, cast(int_field as STRING) as 
str_from_int from test");
+
+        SeaTunnelRowType outRowType = sqlEngine.typeMapping(null);
+        SeaTunnelRow outRow = sqlEngine.transformBySQL(inputRow, 
outRowType).get(0);
+
+        // Original values should remain unchanged
+        Assertions.assertEquals("123", outRow.getField(0));
+        Assertions.assertEquals(456, outRow.getField(2));
+
+        // Cast conversions
+        Assertions.assertEquals(123, outRow.getField(1)); // String to Int
+        Assertions.assertEquals("456", outRow.getField(3)); // Int to String
+    }
+
+    @Test
+    public void testNormalNestedRowFieldAccess() {
+        SQLEngine sqlEngine = 
SQLEngineFactory.getSQLEngine(SQLEngineFactory.EngineType.ZETA);
+
+        // Create nested row type structure
+        SeaTunnelRowType innerRowType =
+                new SeaTunnelRowType(
+                        new String[] {"street", "city"},
+                        new SeaTunnelDataType[] {BasicType.STRING_TYPE, 
BasicType.STRING_TYPE});
+
+        SeaTunnelRowType rowType =
+                new SeaTunnelRowType(new String[] {"user"}, new 
SeaTunnelDataType[] {innerRowType});
+
+        // Create nested row data
+        SeaTunnelRow innerRow = new SeaTunnelRow(new Object[] {"123 Main St", 
"New York"});
+        SeaTunnelRow inputRow = new SeaTunnelRow(new Object[] {innerRow});
+
+        sqlEngine.init(
+                "test", null, rowType, "select user.street as street, 
user.city as city from test");
+
+        SeaTunnelRowType outRowType = sqlEngine.typeMapping(null);
+        SeaTunnelRow outRow = sqlEngine.transformBySQL(inputRow, 
outRowType).get(0);
+
+        // Verify normal nested field access (non-null path of ZetaSQLFunction's null guard)
+        Assertions.assertEquals("123 Main St", outRow.getField(0));
+        Assertions.assertEquals("New York", outRow.getField(1));
+    }
+
+    @Test
+    public void testMultiLevelNestedRowFieldAccess() {
+        SQLEngine sqlEngine = 
SQLEngineFactory.getSQLEngine(SQLEngineFactory.EngineType.ZETA);
+
+        // Create multi-level nested row type structure
+        SeaTunnelRowType addressRowType =
+                new SeaTunnelRowType(
+                        new String[] {"street", "zipcode"},
+                        new SeaTunnelDataType[] {BasicType.STRING_TYPE, 
BasicType.STRING_TYPE});
+
+        SeaTunnelRowType userRowType =
+                new SeaTunnelRowType(
+                        new String[] {"name", "address"},
+                        new SeaTunnelDataType[] {BasicType.STRING_TYPE, 
addressRowType});
+
+        SeaTunnelRowType rowType =
+                new SeaTunnelRowType(new String[] {"user"}, new 
SeaTunnelDataType[] {userRowType});
+
+        // Create multi-level nested row data
+        SeaTunnelRow addressRow = new SeaTunnelRow(new Object[] {"123 Main 
St", "10001"});
+        SeaTunnelRow userRow = new SeaTunnelRow(new Object[] {"John Doe", 
addressRow});
+        SeaTunnelRow inputRow = new SeaTunnelRow(new Object[] {userRow});
+
+        sqlEngine.init(
+                "test",
+                null,
+                rowType,
+                "select user.address.street as street, user.name as name from 
test");
+
+        SeaTunnelRowType outRowType = sqlEngine.typeMapping(null);
+        SeaTunnelRow outRow = sqlEngine.transformBySQL(inputRow, 
outRowType).get(0);
+
+        // Verify multi-level nested field access (non-null path of ZetaSQLFunction's null guard)
+        Assertions.assertEquals("123 Main St", outRow.getField(0));
+        Assertions.assertEquals("John Doe", outRow.getField(1));
+    }
+
+    @Test
+    public void testMapFieldNormalAccess() {
+        SQLEngine sqlEngine = 
SQLEngineFactory.getSQLEngine(SQLEngineFactory.EngineType.ZETA);
+
+        SeaTunnelRowType rowType =
+                new SeaTunnelRowType(
+                        new String[] {"user"},
+                        new SeaTunnelDataType[] {
+                            new MapType<>(BasicType.STRING_TYPE, 
BasicType.STRING_TYPE)
+                        });
+
+        // Create map data with actual values (testing normal access scenario)
+        java.util.Map<String, String> userData = new java.util.HashMap<>();
+        userData.put("name", "John Doe");
+        userData.put("email", "[email protected]");
+        SeaTunnelRow inputRow = new SeaTunnelRow(new Object[] {userData});
+
+        sqlEngine.init(
+                "test", null, rowType, "select user.name as name, user.email 
as email from test");
+
+        SeaTunnelRowType outRowType = sqlEngine.typeMapping(null);
+        SeaTunnelRow outRow = sqlEngine.transformBySQL(inputRow, 
outRowType).get(0);
+
+        // Verify map field access (normal access scenario with non-null values)
+        Assertions.assertEquals("John Doe", outRow.getField(0));
+        Assertions.assertEquals("[email protected]", outRow.getField(1));
+    }
+
+    @Test
+    public void testNestedFieldWithNullIntermediateValue() {
+        SQLEngine sqlEngine = 
SQLEngineFactory.getSQLEngine(SQLEngineFactory.EngineType.ZETA);
+
+        // Create multi-level nested row type structure: user -> address -> 
street
+        SeaTunnelRowType addressRowType =
+                new SeaTunnelRowType(
+                        new String[] {"street", "zipcode"},
+                        new SeaTunnelDataType[] {BasicType.STRING_TYPE, 
BasicType.STRING_TYPE});
+
+        SeaTunnelRowType userRowType =
+                new SeaTunnelRowType(
+                        new String[] {"name", "address"},
+                        new SeaTunnelDataType[] {BasicType.STRING_TYPE, 
addressRowType});
+
+        SeaTunnelRowType rowType =
+                new SeaTunnelRowType(new String[] {"user"}, new 
SeaTunnelDataType[] {userRowType});
+
+        // Test case 1: Normal nested access (user.address.street should 
return "beijing")
+        SeaTunnelRow addressRow1 = new SeaTunnelRow(new Object[] {"beijing", 
"10001"});
+        SeaTunnelRow userRow1 = new SeaTunnelRow(new Object[] {"zhangsan", 
addressRow1});
+        SeaTunnelRow inputRow1 = new SeaTunnelRow(new Object[] {userRow1});
+
+        sqlEngine.init(
+                "test",
+                null,
+                rowType,
+                "select user.address.street as street, user.name as name from 
test");
+
+        SeaTunnelRowType outRowType = sqlEngine.typeMapping(null);
+        SeaTunnelRow outRow1 = sqlEngine.transformBySQL(inputRow1, 
outRowType).get(0);
+
+        // Verify normal nested field access
+        Assertions.assertEquals("beijing", outRow1.getField(0));
+        Assertions.assertEquals("zhangsan", outRow1.getField(1));
+
+        // Test case 2: Null intermediate value (user.address is null, 
user.address.street should
+        // return null)
+        SeaTunnelRow userRow2 = new SeaTunnelRow(new Object[] {"lisi", null});
+        SeaTunnelRow inputRow2 = new SeaTunnelRow(new Object[] {userRow2});
+
+        SeaTunnelRow outRow2 = sqlEngine.transformBySQL(inputRow2, 
outRowType).get(0);
+
+        // Verify that when intermediate value is null, the result should be 
null
+        Assertions.assertNull(
+                outRow2.getField(0),
+                "When accessing nested field where intermediate value is null, 
result should be null");
+        Assertions.assertEquals("lisi", outRow2.getField(1));
+    }
 }

Reply via email to