This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 32d05f31e [core][spark] Fix read nested column with pk table (#4476)
32d05f31e is described below

commit 32d05f31e1aae54e7d7504fc5f66cbde4a111a91
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Nov 8 10:37:15 2024 +0800

    [core][spark] Fix read nested column with pk table (#4476)
    
    This closes #4476.
---
 .../paimon/operation/MergeFileSplitRead.java       |  26 +--
 .../org/apache/paimon/spark/SparkReadITCase.java   |  27 +++
 .../apache/paimon/spark/sql/PaimonQueryTest.scala  | 209 +++++++++++----------
 3 files changed, 149 insertions(+), 113 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
index c21c3683c..23a3a576e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
@@ -44,6 +44,7 @@ import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.DeletionFile;
+import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.ProjectedRow;
 import org.apache.paimon.utils.Projection;
@@ -131,11 +132,9 @@ public class MergeFileSplitRead implements SplitRead<KeyValue> {
     @Override
     public MergeFileSplitRead withReadType(RowType readType) {
         // todo: replace projectedFields with readType
+        RowType tableRowType = tableSchema.logicalRowType();
         int[][] projectedFields =
-                Arrays.stream(
-                                tableSchema
-                                        .logicalRowType()
-                                        
.getFieldIndices(readType.getFieldNames()))
+                
Arrays.stream(tableRowType.getFieldIndices(readType.getFieldNames()))
                         .mapToObj(i -> new int[] {i})
                         .toArray(int[][]::new);
         int[][] newProjectedFields = projectedFields;
@@ -161,13 +160,18 @@ public class MergeFileSplitRead implements SplitRead<KeyValue> {
         this.pushdownProjection = projection.pushdownProjection;
         this.outerProjection = projection.outerProjection;
         if (pushdownProjection != null) {
-            RowType pushdownRowType =
-                    tableSchema
-                            .logicalRowType()
-                            .project(
-                                    Arrays.stream(pushdownProjection)
-                                            .mapToInt(arr -> arr[0])
-                                            .toArray());
+            List<DataField> tableFields = tableRowType.getFields();
+            List<DataField> readFields = readType.getFields();
+            List<DataField> finalReadFields = new ArrayList<>();
+            for (int i : Arrays.stream(pushdownProjection).mapToInt(arr -> 
arr[0]).toArray()) {
+                DataField requiredField = tableFields.get(i);
+                finalReadFields.add(
+                        readFields.stream()
+                                .filter(x -> 
x.name().equals(requiredField.name()))
+                                .findFirst()
+                                .orElse(requiredField));
+            }
+            RowType pushdownRowType = new RowType(finalReadFields);
             readerFactoryBuilder.withReadValueType(pushdownRowType);
             mergeSorter.setProjectedValueType(pushdownRowType);
         }
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
index be6264f7b..32c3498a7 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
@@ -440,6 +440,33 @@ public class SparkReadITCase extends SparkReadTestBase {
         innerTest("MyTable6", false, true);
     }
 
+    @Test
+    public void testReadNestedColumnTable() {
+        String tableName = "testAddNestedColumnTable";
+        spark.sql(
+                "CREATE TABLE paimon.default."
+                        + tableName
+                        + " (k INT NOT NULL, v STRUCT<f1: INT, f2: STRUCT<f1: 
STRING, f2: INT>>) "
+                        + "TBLPROPERTIES ('bucket' = '1', 'primary-key' = 'k', 
'file.format' = 'parquet')");
+        spark.sql(
+                "INSERT INTO paimon.default."
+                        + tableName
+                        + " VALUES (1, STRUCT(10, STRUCT('apple', 100)))");
+        spark.sql(
+                "INSERT INTO paimon.default."
+                        + tableName
+                        + " VALUES (2, STRUCT(20, STRUCT('banana', 200)))");
+        spark.sql(
+                "INSERT INTO paimon.default."
+                        + tableName
+                        + " VALUES (1, STRUCT(30, STRUCT('cat', 100)))");
+        assertThat(
+                        spark.sql("SELECT v.f2.f1, k FROM paimon.default." + 
tableName)
+                                .collectAsList().stream()
+                                .map(Row::toString))
+                .containsExactlyInAnyOrder("[cat,1]", "[banana,2]");
+    }
+
     private void innerTest(String tableName, boolean hasPk, boolean 
partitioned) {
         String ddlTemplate =
                 "CREATE TABLE default.%s (\n"
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonQueryTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonQueryTest.scala
index b08b342ca..fc2f9ac0c 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonQueryTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonQueryTest.scala
@@ -219,108 +219,113 @@ class PaimonQueryTest extends PaimonSparkTestBase {
   }
 
   test("Paimon Query: query nested cols") {
-    fileFormats.foreach {
-      fileFormat =>
-        bucketModes.foreach {
-          bucketMode =>
-            val bucketProp = if (bucketMode != -1) {
-              s", 'bucket-key'='name', 'bucket' = '$bucketMode' "
-            } else {
-              ""
-            }
-            withTable("students") {
-              sql(s"""
-                     |CREATE TABLE students (
-                     |  name STRING,
-                     |  course STRUCT<course_name: STRING, grade: DOUBLE>,
-                     |  teacher STRUCT<name: STRING, address: STRUCT<street: 
STRING, city: STRING>>,
-                     |  m MAP<STRING, STRUCT<s:STRING, i INT, d: DOUBLE>>,
-                     |  l ARRAY<STRUCT<s:STRING, i INT, d: DOUBLE>>,
-                     |  s STRUCT<s1: STRING, s2: MAP<STRING, STRUCT<s:STRING, 
i INT, a: ARRAY<STRUCT<s:STRING, i INT, d: DOUBLE>>>>>,
-                     |  m2 MAP<STRUCT<s:STRING, i INT, d: DOUBLE>, 
STRUCT<s:STRING, i INT, d: DOUBLE>>
-                     |) USING paimon
-                     |TBLPROPERTIES ('file.format'='$fileFormat' $bucketProp)
-                     |""".stripMargin)
-
-              sql(s"""
-                     |INSERT INTO students VALUES (
-                     |  'Alice',
-                     |  STRUCT('Math', 85.0),
-                     |  STRUCT('John', STRUCT('Street 1', 'City 1')),
-                     |  MAP('k1', STRUCT('s1', 1, 1.0), 'k2', STRUCT('s11', 
11, 11.0)),
-                     |  ARRAY(STRUCT('s1', 1, 1.0), STRUCT('s11', 11, 11.0)),
-                     |  STRUCT('a', MAP('k1', STRUCT('s1', 1, 
ARRAY(STRUCT('s1', 1, 1.0))), 'k3', STRUCT('s11', 11, ARRAY(STRUCT('s11', 11, 
11.0))))),
-                     |  MAP(STRUCT('k1', 1, 1.0), STRUCT('s1', 1, 1.0), 
STRUCT('k2', 1, 1.0), STRUCT('s11', 11, 11.0)))
-                     |""".stripMargin)
-
-              sql(s"""
-                     |INSERT INTO students VALUES (
-                     |  'Bob',
-                     |  STRUCT('Biology', 92.0),
-                     |  STRUCT('Jane', STRUCT('Street 2', 'City 2')),
-                     |  MAP('k2', STRUCT('s2', 2, 2.0)),
-                     |  ARRAY(STRUCT('s2', 2, 2.0), STRUCT('s22', 22, 22.0)),
-                     |  STRUCT('b', MAP('k2', STRUCT('s22', 22, 
ARRAY(STRUCT('s22', 22, 22.0))))),
-                     |  MAP(STRUCT('k2', 2, 2.0), STRUCT('s22', 22, 22.0)))
-                     |""".stripMargin)
-
-              sql(s"""
-                     |INSERT INTO students VALUES (
-                     |  'Cathy',
-                     |  STRUCT('History', 95.0),
-                     |  STRUCT('Jane', STRUCT('Street 3', 'City 3')),
-                     |  MAP('k1', STRUCT('s3', 3, 3.0), 'k2', STRUCT('s33', 
33, 33.0)),
-                     |  ARRAY(STRUCT('s3', 3, 3.0)),
-                     |  STRUCT('c', MAP('k1', STRUCT('s3', 3, 
ARRAY(STRUCT('s3', 3, 3.0))), 'k2', STRUCT('s33', 33, ARRAY(STRUCT('s33', 33, 
33.0))))),
-                     |  MAP(STRUCT('k1', 3, 3.0), STRUCT('s3', 3, 3.0), 
STRUCT('k2', 3, 3.0), STRUCT('s33', 33, 33.0)))
-                     |""".stripMargin)
-
-              checkAnswer(
-                sql(s"""
-                       |SELECT
-                       |  course.grade, name, teacher.address, 
course.course_name,
-                       |  m['k1'].d, m['k1'].s,
-                       |  l[1].d, l[1].s,
-                       |  s.s2['k2'].a[0].i,
-                       |  map_keys(m2).i
-                       |FROM students ORDER BY name
-                       |""".stripMargin),
-                Seq(
-                  Row(
-                    85.0,
-                    "Alice",
-                    Row("Street 1", "City 1"),
-                    "Math",
-                    1.0,
-                    "s1",
-                    11.0,
-                    "s11",
-                    null,
-                    Seq(1, 1)),
-                  Row(
-                    92.0,
-                    "Bob",
-                    Row("Street 2", "City 2"),
-                    "Biology",
-                    null,
-                    null,
-                    22.0,
-                    "s22",
-                    22,
-                    Seq(2)),
-                  Row(
-                    95.0,
-                    "Cathy",
-                    Row("Street 3", "City 3"),
-                    "History",
-                    3.0,
-                    "s3",
-                    null,
-                    null,
-                    33,
-                    Seq(3, 3))
-                )
-              )
+    withPk.foreach {
+      hasPk =>
+        fileFormats.foreach {
+          fileFormat =>
+            bucketModes.foreach {
+              bucketMode =>
+                val key = if (hasPk) "primary-key" else "bucket-key"
+                val props = if (bucketMode != -1) {
+                  s", '$key'='name', 'bucket' = '$bucketMode' "
+                } else {
+                  ""
+                }
+                withTable("students") {
+                  sql(s"""
+                         |CREATE TABLE students (
+                         |  name STRING,
+                         |  course STRUCT<course_name: STRING, grade: DOUBLE>,
+                         |  teacher STRUCT<name: STRING, address: 
STRUCT<street: STRING, city: STRING>>,
+                         |  m MAP<STRING, STRUCT<s:STRING, i INT, d: DOUBLE>>,
+                         |  l ARRAY<STRUCT<s:STRING, i INT, d: DOUBLE>>,
+                         |  s STRUCT<s1: STRING, s2: MAP<STRING, 
STRUCT<s:STRING, i INT, a: ARRAY<STRUCT<s:STRING, i INT, d: DOUBLE>>>>>,
+                         |  m2 MAP<STRUCT<s:STRING, i INT, d: DOUBLE>, 
STRUCT<s:STRING, i INT, d: DOUBLE>>
+                         |) USING paimon
+                         |TBLPROPERTIES ('file.format'='$fileFormat' $props)
+                         |""".stripMargin)
+
+                  sql(s"""
+                         |INSERT INTO students VALUES (
+                         |  'Alice',
+                         |  STRUCT('Math', 85.0),
+                         |  STRUCT('John', STRUCT('Street 1', 'City 1')),
+                         |  MAP('k1', STRUCT('s1', 1, 1.0), 'k2', 
STRUCT('s11', 11, 11.0)),
+                         |  ARRAY(STRUCT('s1', 1, 1.0), STRUCT('s11', 11, 
11.0)),
+                         |  STRUCT('a', MAP('k1', STRUCT('s1', 1, 
ARRAY(STRUCT('s1', 1, 1.0))), 'k3', STRUCT('s11', 11, ARRAY(STRUCT('s11', 11, 
11.0))))),
+                         |  MAP(STRUCT('k1', 1, 1.0), STRUCT('s1', 1, 1.0), 
STRUCT('k2', 1, 1.0), STRUCT('s11', 11, 11.0)))
+                         |""".stripMargin)
+
+                  sql(
+                    s"""
+                       |INSERT INTO students VALUES (
+                       |  'Bob',
+                       |  STRUCT('Biology', 92.0),
+                       |  STRUCT('Jane', STRUCT('Street 2', 'City 2')),
+                       |  MAP('k2', STRUCT('s2', 2, 2.0)),
+                       |  ARRAY(STRUCT('s2', 2, 2.0), STRUCT('s22', 22, 22.0)),
+                       |  STRUCT('b', MAP('k2', STRUCT('s22', 22, 
ARRAY(STRUCT('s22', 22, 22.0))))),
+                       |  MAP(STRUCT('k2', 2, 2.0), STRUCT('s22', 22, 22.0)))
+                       |""".stripMargin)
+
+                  sql(s"""
+                         |INSERT INTO students VALUES (
+                         |  'Cathy',
+                         |  STRUCT('History', 95.0),
+                         |  STRUCT('Jane', STRUCT('Street 3', 'City 3')),
+                         |  MAP('k1', STRUCT('s3', 3, 3.0), 'k2', 
STRUCT('s33', 33, 33.0)),
+                         |  ARRAY(STRUCT('s3', 3, 3.0)),
+                         |  STRUCT('c', MAP('k1', STRUCT('s3', 3, 
ARRAY(STRUCT('s3', 3, 3.0))), 'k2', STRUCT('s33', 33, ARRAY(STRUCT('s33', 33, 
33.0))))),
+                         |  MAP(STRUCT('k1', 3, 3.0), STRUCT('s3', 3, 3.0), 
STRUCT('k2', 3, 3.0), STRUCT('s33', 33, 33.0)))
+                         |""".stripMargin)
+
+                  checkAnswer(
+                    sql(s"""
+                           |SELECT
+                           |  course.grade, name, teacher.address, 
course.course_name,
+                           |  m['k1'].d, m['k1'].s,
+                           |  l[1].d, l[1].s,
+                           |  s.s2['k2'].a[0].i,
+                           |  map_keys(m2).i
+                           |FROM students ORDER BY name
+                           |""".stripMargin),
+                    Seq(
+                      Row(
+                        85.0,
+                        "Alice",
+                        Row("Street 1", "City 1"),
+                        "Math",
+                        1.0,
+                        "s1",
+                        11.0,
+                        "s11",
+                        null,
+                        Seq(1, 1)),
+                      Row(
+                        92.0,
+                        "Bob",
+                        Row("Street 2", "City 2"),
+                        "Biology",
+                        null,
+                        null,
+                        22.0,
+                        "s22",
+                        22,
+                        Seq(2)),
+                      Row(
+                        95.0,
+                        "Cathy",
+                        Row("Street 3", "City 3"),
+                        "History",
+                        3.0,
+                        "s3",
+                        null,
+                        null,
+                        33,
+                        Seq(3, 3))
+                    )
+                  )
+                }
             }
         }
     }

Reply via email to