This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 32d05f31e [core][spark] Fix read nested column with pk table (#4476)
32d05f31e is described below
commit 32d05f31e1aae54e7d7504fc5f66cbde4a111a91
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Nov 8 10:37:15 2024 +0800
[core][spark] Fix read nested column with pk table (#4476)
This closes #4476.
---
.../paimon/operation/MergeFileSplitRead.java | 26 +--
.../org/apache/paimon/spark/SparkReadITCase.java | 27 +++
.../apache/paimon/spark/sql/PaimonQueryTest.scala | 209 +++++++++++----------
3 files changed, 149 insertions(+), 113 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
index c21c3683c..23a3a576e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
@@ -44,6 +44,7 @@ import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.DeletionFile;
+import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ProjectedRow;
import org.apache.paimon.utils.Projection;
@@ -131,11 +132,9 @@ public class MergeFileSplitRead implements SplitRead<KeyValue> {
@Override
public MergeFileSplitRead withReadType(RowType readType) {
// todo: replace projectedFields with readType
+ RowType tableRowType = tableSchema.logicalRowType();
int[][] projectedFields =
- Arrays.stream(
- tableSchema
- .logicalRowType()
-
.getFieldIndices(readType.getFieldNames()))
+
Arrays.stream(tableRowType.getFieldIndices(readType.getFieldNames()))
.mapToObj(i -> new int[] {i})
.toArray(int[][]::new);
int[][] newProjectedFields = projectedFields;
@@ -161,13 +160,18 @@ public class MergeFileSplitRead implements SplitRead<KeyValue> {
this.pushdownProjection = projection.pushdownProjection;
this.outerProjection = projection.outerProjection;
if (pushdownProjection != null) {
- RowType pushdownRowType =
- tableSchema
- .logicalRowType()
- .project(
- Arrays.stream(pushdownProjection)
- .mapToInt(arr -> arr[0])
- .toArray());
+ List<DataField> tableFields = tableRowType.getFields();
+ List<DataField> readFields = readType.getFields();
+ List<DataField> finalReadFields = new ArrayList<>();
+ for (int i : Arrays.stream(pushdownProjection).mapToInt(arr ->
arr[0]).toArray()) {
+ DataField requiredField = tableFields.get(i);
+ finalReadFields.add(
+ readFields.stream()
+ .filter(x ->
x.name().equals(requiredField.name()))
+ .findFirst()
+ .orElse(requiredField));
+ }
+ RowType pushdownRowType = new RowType(finalReadFields);
readerFactoryBuilder.withReadValueType(pushdownRowType);
mergeSorter.setProjectedValueType(pushdownRowType);
}
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
index be6264f7b..32c3498a7 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
@@ -440,6 +440,33 @@ public class SparkReadITCase extends SparkReadTestBase {
innerTest("MyTable6", false, true);
}
+ @Test
+ public void testReadNestedColumnTable() {
+ String tableName = "testAddNestedColumnTable";
+ spark.sql(
+ "CREATE TABLE paimon.default."
+ + tableName
+ + " (k INT NOT NULL, v STRUCT<f1: INT, f2: STRUCT<f1:
STRING, f2: INT>>) "
+ + "TBLPROPERTIES ('bucket' = '1', 'primary-key' = 'k',
'file.format' = 'parquet')");
+ spark.sql(
+ "INSERT INTO paimon.default."
+ + tableName
+ + " VALUES (1, STRUCT(10, STRUCT('apple', 100)))");
+ spark.sql(
+ "INSERT INTO paimon.default."
+ + tableName
+ + " VALUES (2, STRUCT(20, STRUCT('banana', 200)))");
+ spark.sql(
+ "INSERT INTO paimon.default."
+ + tableName
+ + " VALUES (1, STRUCT(30, STRUCT('cat', 100)))");
+ assertThat(
+ spark.sql("SELECT v.f2.f1, k FROM paimon.default." +
tableName)
+ .collectAsList().stream()
+ .map(Row::toString))
+ .containsExactlyInAnyOrder("[cat,1]", "[banana,2]");
+ }
+
private void innerTest(String tableName, boolean hasPk, boolean
partitioned) {
String ddlTemplate =
"CREATE TABLE default.%s (\n"
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonQueryTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonQueryTest.scala
index b08b342ca..fc2f9ac0c 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonQueryTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonQueryTest.scala
@@ -219,108 +219,113 @@ class PaimonQueryTest extends PaimonSparkTestBase {
}
test("Paimon Query: query nested cols") {
- fileFormats.foreach {
- fileFormat =>
- bucketModes.foreach {
- bucketMode =>
- val bucketProp = if (bucketMode != -1) {
- s", 'bucket-key'='name', 'bucket' = '$bucketMode' "
- } else {
- ""
- }
- withTable("students") {
- sql(s"""
- |CREATE TABLE students (
- | name STRING,
- | course STRUCT<course_name: STRING, grade: DOUBLE>,
- | teacher STRUCT<name: STRING, address: STRUCT<street:
STRING, city: STRING>>,
- | m MAP<STRING, STRUCT<s:STRING, i INT, d: DOUBLE>>,
- | l ARRAY<STRUCT<s:STRING, i INT, d: DOUBLE>>,
- | s STRUCT<s1: STRING, s2: MAP<STRING, STRUCT<s:STRING,
i INT, a: ARRAY<STRUCT<s:STRING, i INT, d: DOUBLE>>>>>,
- | m2 MAP<STRUCT<s:STRING, i INT, d: DOUBLE>,
STRUCT<s:STRING, i INT, d: DOUBLE>>
- |) USING paimon
- |TBLPROPERTIES ('file.format'='$fileFormat' $bucketProp)
- |""".stripMargin)
-
- sql(s"""
- |INSERT INTO students VALUES (
- | 'Alice',
- | STRUCT('Math', 85.0),
- | STRUCT('John', STRUCT('Street 1', 'City 1')),
- | MAP('k1', STRUCT('s1', 1, 1.0), 'k2', STRUCT('s11',
11, 11.0)),
- | ARRAY(STRUCT('s1', 1, 1.0), STRUCT('s11', 11, 11.0)),
- | STRUCT('a', MAP('k1', STRUCT('s1', 1,
ARRAY(STRUCT('s1', 1, 1.0))), 'k3', STRUCT('s11', 11, ARRAY(STRUCT('s11', 11,
11.0))))),
- | MAP(STRUCT('k1', 1, 1.0), STRUCT('s1', 1, 1.0),
STRUCT('k2', 1, 1.0), STRUCT('s11', 11, 11.0)))
- |""".stripMargin)
-
- sql(s"""
- |INSERT INTO students VALUES (
- | 'Bob',
- | STRUCT('Biology', 92.0),
- | STRUCT('Jane', STRUCT('Street 2', 'City 2')),
- | MAP('k2', STRUCT('s2', 2, 2.0)),
- | ARRAY(STRUCT('s2', 2, 2.0), STRUCT('s22', 22, 22.0)),
- | STRUCT('b', MAP('k2', STRUCT('s22', 22,
ARRAY(STRUCT('s22', 22, 22.0))))),
- | MAP(STRUCT('k2', 2, 2.0), STRUCT('s22', 22, 22.0)))
- |""".stripMargin)
-
- sql(s"""
- |INSERT INTO students VALUES (
- | 'Cathy',
- | STRUCT('History', 95.0),
- | STRUCT('Jane', STRUCT('Street 3', 'City 3')),
- | MAP('k1', STRUCT('s3', 3, 3.0), 'k2', STRUCT('s33',
33, 33.0)),
- | ARRAY(STRUCT('s3', 3, 3.0)),
- | STRUCT('c', MAP('k1', STRUCT('s3', 3,
ARRAY(STRUCT('s3', 3, 3.0))), 'k2', STRUCT('s33', 33, ARRAY(STRUCT('s33', 33,
33.0))))),
- | MAP(STRUCT('k1', 3, 3.0), STRUCT('s3', 3, 3.0),
STRUCT('k2', 3, 3.0), STRUCT('s33', 33, 33.0)))
- |""".stripMargin)
-
- checkAnswer(
- sql(s"""
- |SELECT
- | course.grade, name, teacher.address,
course.course_name,
- | m['k1'].d, m['k1'].s,
- | l[1].d, l[1].s,
- | s.s2['k2'].a[0].i,
- | map_keys(m2).i
- |FROM students ORDER BY name
- |""".stripMargin),
- Seq(
- Row(
- 85.0,
- "Alice",
- Row("Street 1", "City 1"),
- "Math",
- 1.0,
- "s1",
- 11.0,
- "s11",
- null,
- Seq(1, 1)),
- Row(
- 92.0,
- "Bob",
- Row("Street 2", "City 2"),
- "Biology",
- null,
- null,
- 22.0,
- "s22",
- 22,
- Seq(2)),
- Row(
- 95.0,
- "Cathy",
- Row("Street 3", "City 3"),
- "History",
- 3.0,
- "s3",
- null,
- null,
- 33,
- Seq(3, 3))
- )
- )
+ withPk.foreach {
+ hasPk =>
+ fileFormats.foreach {
+ fileFormat =>
+ bucketModes.foreach {
+ bucketMode =>
+ val key = if (hasPk) "primary-key" else "bucket-key"
+ val props = if (bucketMode != -1) {
+ s", '$key'='name', 'bucket' = '$bucketMode' "
+ } else {
+ ""
+ }
+ withTable("students") {
+ sql(s"""
+ |CREATE TABLE students (
+ | name STRING,
+ | course STRUCT<course_name: STRING, grade: DOUBLE>,
+ | teacher STRUCT<name: STRING, address:
STRUCT<street: STRING, city: STRING>>,
+ | m MAP<STRING, STRUCT<s:STRING, i INT, d: DOUBLE>>,
+ | l ARRAY<STRUCT<s:STRING, i INT, d: DOUBLE>>,
+ | s STRUCT<s1: STRING, s2: MAP<STRING,
STRUCT<s:STRING, i INT, a: ARRAY<STRUCT<s:STRING, i INT, d: DOUBLE>>>>>,
+ | m2 MAP<STRUCT<s:STRING, i INT, d: DOUBLE>,
STRUCT<s:STRING, i INT, d: DOUBLE>>
+ |) USING paimon
+ |TBLPROPERTIES ('file.format'='$fileFormat' $props)
+ |""".stripMargin)
+
+ sql(s"""
+ |INSERT INTO students VALUES (
+ | 'Alice',
+ | STRUCT('Math', 85.0),
+ | STRUCT('John', STRUCT('Street 1', 'City 1')),
+ | MAP('k1', STRUCT('s1', 1, 1.0), 'k2',
STRUCT('s11', 11, 11.0)),
+ | ARRAY(STRUCT('s1', 1, 1.0), STRUCT('s11', 11,
11.0)),
+ | STRUCT('a', MAP('k1', STRUCT('s1', 1,
ARRAY(STRUCT('s1', 1, 1.0))), 'k3', STRUCT('s11', 11, ARRAY(STRUCT('s11', 11,
11.0))))),
+ | MAP(STRUCT('k1', 1, 1.0), STRUCT('s1', 1, 1.0),
STRUCT('k2', 1, 1.0), STRUCT('s11', 11, 11.0)))
+ |""".stripMargin)
+
+ sql(
+ s"""
+ |INSERT INTO students VALUES (
+ | 'Bob',
+ | STRUCT('Biology', 92.0),
+ | STRUCT('Jane', STRUCT('Street 2', 'City 2')),
+ | MAP('k2', STRUCT('s2', 2, 2.0)),
+ | ARRAY(STRUCT('s2', 2, 2.0), STRUCT('s22', 22, 22.0)),
+ | STRUCT('b', MAP('k2', STRUCT('s22', 22,
ARRAY(STRUCT('s22', 22, 22.0))))),
+ | MAP(STRUCT('k2', 2, 2.0), STRUCT('s22', 22, 22.0)))
+ |""".stripMargin)
+
+ sql(s"""
+ |INSERT INTO students VALUES (
+ | 'Cathy',
+ | STRUCT('History', 95.0),
+ | STRUCT('Jane', STRUCT('Street 3', 'City 3')),
+ | MAP('k1', STRUCT('s3', 3, 3.0), 'k2',
STRUCT('s33', 33, 33.0)),
+ | ARRAY(STRUCT('s3', 3, 3.0)),
+ | STRUCT('c', MAP('k1', STRUCT('s3', 3,
ARRAY(STRUCT('s3', 3, 3.0))), 'k2', STRUCT('s33', 33, ARRAY(STRUCT('s33', 33,
33.0))))),
+ | MAP(STRUCT('k1', 3, 3.0), STRUCT('s3', 3, 3.0),
STRUCT('k2', 3, 3.0), STRUCT('s33', 33, 33.0)))
+ |""".stripMargin)
+
+ checkAnswer(
+ sql(s"""
+ |SELECT
+ | course.grade, name, teacher.address,
course.course_name,
+ | m['k1'].d, m['k1'].s,
+ | l[1].d, l[1].s,
+ | s.s2['k2'].a[0].i,
+ | map_keys(m2).i
+ |FROM students ORDER BY name
+ |""".stripMargin),
+ Seq(
+ Row(
+ 85.0,
+ "Alice",
+ Row("Street 1", "City 1"),
+ "Math",
+ 1.0,
+ "s1",
+ 11.0,
+ "s11",
+ null,
+ Seq(1, 1)),
+ Row(
+ 92.0,
+ "Bob",
+ Row("Street 2", "City 2"),
+ "Biology",
+ null,
+ null,
+ 22.0,
+ "s22",
+ 22,
+ Seq(2)),
+ Row(
+ 95.0,
+ "Cathy",
+ Row("Street 3", "City 3"),
+ "History",
+ 3.0,
+ "s3",
+ null,
+ null,
+ 33,
+ Seq(3, 3))
+ )
+ )
+ }
}
}
}