This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f033cee04f [spark][infra] run spark integration tests in CI. (#5590)
f033cee04f is described below
commit f033cee04ff557102f3c6eb0cae4d1fefed09916
Author: Yujiang Zhong <[email protected]>
AuthorDate: Fri Jul 25 16:28:00 2025 +0800
[spark][infra] run spark integration tests in CI. (#5590)
---
.github/workflows/utitcase-spark-3.x.yml | 2 +-
.github/workflows/utitcase-spark-4.x.yml | 2 +-
.../table/source/snapshot/TimeTravelUtil.java | 3 +-
.../org/apache/paimon/spark/SparkReadITCase.java | 91 +++++++++++-----------
.../org/apache/paimon/spark/SparkReadTestBase.java | 12 +++
.../paimon/spark/SparkSchemaEvolutionITCase.java | 71 +++++++++--------
.../apache/paimon/spark/SparkTimeTravelITCase.java | 3 +-
.../org/apache/paimon/spark/RowTestHelper.scala | 53 +++++++++++++
8 files changed, 154 insertions(+), 83 deletions(-)
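Context for the workflow change below: "mvn test" stops the Maven lifecycle after the surefire-bound test phase, while "mvn verify" continues through the integration-test phase, where failsafe-style integration suites (the *ITCase classes touched in this commit) typically run. A rough sketch of the default lifecycle ordering, assuming standard phase bindings (the plugin configuration itself is not part of this diff):

    mvn test    ->  validate ... compile -> test
    mvn verify  ->  validate ... compile -> test -> package -> integration-test -> verify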
diff --git a/.github/workflows/utitcase-spark-3.x.yml b/.github/workflows/utitcase-spark-3.x.yml
index de8b3deaeb..1a23b55b58 100644
--- a/.github/workflows/utitcase-spark-3.x.yml
+++ b/.github/workflows/utitcase-spark-3.x.yml
@@ -59,6 +59,6 @@ jobs:
test_modules+="org.apache.paimon:paimon-spark-${suffix},"
done
test_modules="${test_modules%,}"
- mvn -T 2C -B test -pl "${test_modules}" -Duser.timezone=$jvm_timezone
+ mvn -T 2C -B verify -pl "${test_modules}" -Duser.timezone=$jvm_timezone
env:
MAVEN_OPTS: -Xmx4096m
\ No newline at end of file
diff --git a/.github/workflows/utitcase-spark-4.x.yml b/.github/workflows/utitcase-spark-4.x.yml
index 06495b6975..fd1365aabe 100644
--- a/.github/workflows/utitcase-spark-4.x.yml
+++ b/.github/workflows/utitcase-spark-4.x.yml
@@ -59,6 +59,6 @@ jobs:
test_modules+="org.apache.paimon:paimon-spark-${suffix},"
done
test_modules="${test_modules%,}"
- mvn -T 2C -B test -pl "${test_modules}" -Duser.timezone=$jvm_timezone -Pspark4,flink1
+ mvn -T 2C -B verify -pl "${test_modules}" -Duser.timezone=$jvm_timezone -Pspark4,flink1
env:
MAVEN_OPTS: -Xmx4096m
\ No newline at end of file
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java
index 20802f961f..bdef244230 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java
@@ -147,7 +147,8 @@ public class TimeTravelUtil {
} else if (version.chars().allMatch(Character::isDigit)) {
options.set(SCAN_SNAPSHOT_ID.key(), version);
} else {
- throw new RuntimeException("Cannot find a time travel version for " + version);
+ // by here, the scan version should be a tag.
+ options.set(SCAN_TAG_NAME.key(), version);
}
}
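With this change, a scan version that is neither a known tag nor a pure-digit snapshot id is recorded as a tag name instead of failing immediately, so a bad version now surfaces from the tag lookup itself (see the updated SparkTimeTravelITCase expectation further down). A condensed sketch of the resulting resolution order; the enclosing method name and the first branch are paraphrased from context, not quoted from the source:

    // Sketch only: version resolution in TimeTravelUtil after this change.
    static void resolveScanVersion(Options options, TagManager tagManager, String version) {
        if (tagManager.tagExists(version)) {
            options.set(SCAN_TAG_NAME.key(), version); // known tag
        } else if (version.chars().allMatch(Character::isDigit)) {
            options.set(SCAN_SNAPSHOT_ID.key(), version); // numeric snapshot id
        } else {
            // by here, the scan version should be a tag; a non-existent tag now
            // fails later in the tag lookup (IllegalArgumentException: "Tag ...
            // doesn't exist") rather than with the old RuntimeException here.
            options.set(SCAN_TAG_NAME.key(), version);
        }
    }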
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkReadITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
index a96954dcdf..25d7a39622 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
@@ -592,8 +592,9 @@ public class SparkReadITCase extends SparkReadTestBase {
Dataset<Row> dataset =
spark.read().format("paimon").load(tablePath.toString());
assertThat(dataset.select("order_id", "buyer_id", "dt").collectAsList().toString())
.isEqualTo("[[1,10,2022-07-20]]");
- assertThat(dataset.select("coupon_info").collectAsList().toString())
- .isEqualTo("[[WrappedArray(loyalty_discount, shipping_discount)]]");
+
+ RowTestHelper.checkRowEquals(
+ dataset.select("coupon_info"), row(array("loyalty_discount", "shipping_discount")));
// test drop table
assertThat(
@@ -647,37 +648,38 @@ public class SparkReadITCase extends SparkReadTestBase {
}
private void innerTestNestedType(Dataset<Row> dataset) {
- List<Row> results = dataset.collectAsList();
- assertThat(results.toString())
- .isEqualTo(
- "[[1,WrappedArray(AAA,
BBB),[[1.0,WrappedArray(null)],1]], "
- + "[2,WrappedArray(CCC,
DDD),[[null,WrappedArray(true)],null]], "
- + "[3,WrappedArray(null,
null),[[2.0,WrappedArray(true, false)],2]], "
- + "[4,WrappedArray(null,
EEE),[[3.0,WrappedArray(true, false, true)],3]]]");
+ RowTestHelper.checkRowEquals(
+ dataset,
+ Arrays.asList(
+ row(1, array("AAA", "BBB"), row(row(1.0, array(null)), 1L)),
+ row(2, array("CCC", "DDD"), row(row(null, array(true)), null)),
+ row(3, array(null, null), row(row(2.0, array(true, false)), 2L)),
+ row(4, array(null, "EEE"), row(row(3.0, array(true, false, true)), 3L))));
- results = dataset.select("a").collectAsList();
- assertThat(results.toString()).isEqualTo("[[1], [2], [3], [4]]");
+ RowTestHelper.checkRowEquals(
+ dataset.select("a"), Arrays.asList(row(1), row(2), row(3), row(4)));
- results = dataset.select("c.c1").collectAsList();
- assertThat(results.toString())
- .isEqualTo(
- "[[[1.0,WrappedArray(null)]],
[[null,WrappedArray(true)]], "
- + "[[2.0,WrappedArray(true, false)]], "
- + "[[3.0,WrappedArray(true, false, true)]]]");
+ RowTestHelper.checkRowEquals(
+ dataset.select("c.c1"),
+ Arrays.asList(
+ row(row(1.0, array(null))),
+ row(row(null, array(true))),
+ row(row(2.0, array(true, false))),
+ row(row(3.0, array(true, false, true)))));
- results = dataset.select("c.c2").collectAsList();
- assertThat(results.toString()).isEqualTo("[[1], [null], [2], [3]]");
+ RowTestHelper.checkRowEquals(
+ dataset.select("c.c2"), Arrays.asList(row(1), row(null), row(2), row(3)));
- results = dataset.select("c.c1.c11").collectAsList();
- assertThat(results.toString()).isEqualTo("[[1.0], [null], [2.0], [3.0]]");
+ RowTestHelper.checkRowEquals(
+ dataset.select("c.c1.c11"), Arrays.asList(row(1.0), row(null), row(2.0), row(3.0)));
- results = dataset.select("c.c1.c12").collectAsList();
- assertThat(results.toString())
- .isEqualTo(
- "[[WrappedArray(null)], "
- + "[WrappedArray(true)], "
- + "[WrappedArray(true, false)], "
- + "[WrappedArray(true, false, true)]]");
+ RowTestHelper.checkRowEquals(
+ dataset.select("c.c1.c12"),
+ Arrays.asList(
+ row(array(null)),
+ row(array(true)),
+ row(array(true, false)),
+ row(array(true, false, true))));
}
private void innerTestSimpleTypeFilterPushDown(Dataset<Row> dataset) {
@@ -689,28 +691,27 @@ public class SparkReadITCase extends SparkReadTestBase {
}
private void innerTestNestedTypeFilterPushDown(Dataset<Row> dataset) {
- List<Row> results = dataset.filter("a < 4").select("a").collectAsList();
- assertThat(results.toString()).isEqualTo("[[1], [2], [3]]");
+ RowTestHelper.checkRowEquals(
+ dataset.filter("a < 4").select("a"), Arrays.asList(row(1), row(2), row(3)));
- results = dataset.filter("array_contains(b, 'AAA')").select("b").collectAsList();
- assertThat(results.toString()).isEqualTo("[[WrappedArray(AAA, BBB)]]");
+ RowTestHelper.checkRowEquals(
+ dataset.filter("array_contains(b, 'AAA')").select("b"), row(array("AAA", "BBB")));
- results = dataset.filter("c.c1.c11 is null").select("a", "c").collectAsList();
- assertThat(results.toString()).isEqualTo("[[2,[[null,WrappedArray(true)],null]]]");
+ RowTestHelper.checkRowEquals(
+ dataset.filter("c.c1.c11 is null").select("a", "c"),
+ row(2, row(row(null, array(true)), null)));
- results = dataset.filter("c.c1.c11 = 1.0").select("a", "c.c1").collectAsList();
- assertThat(results.toString()).isEqualTo("[[1,[1.0,WrappedArray(null)]]]");
+ RowTestHelper.checkRowEquals(
+ dataset.filter("c.c1.c11 = 1.0").select("a", "c.c1"),
+ row(1, row(1.0, array(null))));
- results = dataset.filter("c.c2 is null").select("a", "c").collectAsList();
- assertThat(results.toString()).isEqualTo("[[2,[[null,WrappedArray(true)],null]]]");
+ RowTestHelper.checkRowEquals(
+ dataset.filter("c.c2 is null").select("a", "c"),
+ row(2, row(row(null, array(true)), null)));
- results =
- dataset.filter("array_contains(c.c1.c12, false)")
- .select("a", "c.c1.c12", "c.c2")
- .collectAsList();
- assertThat(results.toString())
- .isEqualTo(
- "[[3,WrappedArray(true, false),2],
[4,WrappedArray(true, false, true),3]]");
+ RowTestHelper.checkRowEquals(
+ dataset.filter("array_contains(c.c1.c12, false)").select("a", "c.c1.c12", "c.c2"),
+ Arrays.asList(row(3, array(true, false), 2), row(4, array(true, false, true), 3)));
}
@Test
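A note on the assertion rewrite pattern above: the old toString-based checks encode the Scala collection's rendering of array columns, which is not stable across versions: Scala 2.12 prints WrappedArray(...), while Scala 2.13 (the only option for Spark 4) prints ArraySeq(...). Comparing structured Row values through checkRowEquals removes that coupling. Illustrative only:

    // Same logical row, different Row.toString depending on the Scala version:
    //   Scala 2.12: [1,WrappedArray(AAA, BBB)]
    //   Scala 2.13: [1,ArraySeq(AAA, BBB)]
    // Structured comparison is stable across both:
    RowTestHelper.checkRowEquals(dataset.select("a", "b"), row(1, array("AAA", "BBB")));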
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
index a4983325c4..27a7557ab7 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
@@ -51,6 +51,8 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
+import scala.collection.Seq;
+
import static org.assertj.core.api.Assertions.assertThat;
/** Base tests for spark read. */
@@ -251,4 +253,14 @@ public abstract class SparkReadTestBase {
protected String defaultShowCreateStringWithNonNullColumn(String table) {
return showCreateString(table, "a INT NOT NULL", "b BIGINT NOT NULL", "c STRING");
}
+
+ protected static Row row(Object... values) {
+ Object[] array = values != null ? values : new Object[] {null};
+ return RowTestHelper.row(array);
+ }
+
+ protected static Seq array(Object... values) {
+ Object[] array = values != null ? values : new Object[] {null};
+ return RowTestHelper.seq(array);
+ }
}
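The null guard in both new helpers covers a varargs corner case: a call written as row(null) passes a null array rather than an array containing null, so the guard restores the intended single-null element. Illustrative calls, not taken from the diff:

    row(null);           // varargs receives a null array; the guard yields a single-null row
    row((Object) null);  // varargs receives new Object[] {null} directly; no guard needed
    array(null, "EEE");  // a null element among others passes through unchanged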
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
index 933658b004..ea837960cc 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
@@ -27,6 +27,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -183,10 +184,16 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
Dataset<Row> table = spark.table("testRenameColumn");
results = table.select("bb", "c").collectAsList();
assertThat(results.toString()).isEqualTo("[[2,1], [6,3]]");
+
assertThatThrownBy(() -> table.select("b", "c"))
.isInstanceOf(AnalysisException.class)
+ // Messages vary across different Spark versions, only validating the common parts.
+ // Spark 4: A column, variable, or function parameter with name `b` cannot be
+ // resolved. Did you mean one of the following? [`a`, `bb`, `c`]
+ // Spark 3.5 and earlier versions: A column or function parameter with name `b`
+ // cannot be resolved. Did you mean one of the following? [`a`, `bb`, `c`]
.hasMessageContaining(
- "A column or function parameter with name `b` cannot
be resolved. Did you mean one of the following?");
+ "name `b` cannot be resolved. Did you mean one of the
following? [`a`, `bb`, `c`]");
}
@Test
@@ -388,13 +395,15 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
"Cannot move itself for column b"));
// missing column
+ // Messages vary across different Spark versions and there are no common parts, only
+ // validate the exception class
createTable("tableMissing");
assertThatThrownBy(() -> spark.sql("ALTER TABLE tableMissing ALTER COLUMN d FIRST"))
- .hasMessageContaining("Missing field d in table paimon.default.tableMissing");
+ .isInstanceOf(AnalysisException.class);
createTable("tableMissingAfter");
assertThatThrownBy(() -> spark.sql("ALTER TABLE tableMissingAfter ALTER COLUMN a AFTER d"))
- .hasMessageContaining("Missing field d in table paimon.default.tableMissingAfter");
+ .isInstanceOf(AnalysisException.class);
}
@Test
@@ -806,13 +815,12 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
+ tableName
+ " VALUES (1, ARRAY(STRUCT('apple', 100),
STRUCT('banana', 101))), "
+ "(2, ARRAY(STRUCT('cat', 200), STRUCT('dog',
201)))");
- assertThat(
- spark.sql("SELECT * FROM paimon.default." + tableName).collectAsList()
- .stream()
- .map(Row::toString))
- .containsExactlyInAnyOrder(
- "[1,WrappedArray([apple,100], [banana,101])]",
- "[2,WrappedArray([cat,200], [dog,201])]");
+
+ RowTestHelper.checkRowEquals(
+ spark.sql("SELECT * FROM paimon.default." + tableName),
+ Arrays.asList(
+ row(1, array(row("apple", 100), row("banana", 101))),
+ row(2, array(row("cat", 200), row("dog", 201)))));
spark.sql(
"ALTER TABLE paimon.default."
@@ -824,14 +832,13 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
+ tableName
+ " VALUES (1, ARRAY(STRUCT(110, 'APPLE'), STRUCT(111,
'BANANA'))), "
+ "(3, ARRAY(STRUCT(310, 'FLOWER')))");
- assertThat(
- spark.sql("SELECT * FROM paimon.default." + tableName).collectAsList()
- .stream()
- .map(Row::toString))
- .containsExactlyInAnyOrder(
- "[1,WrappedArray([110,APPLE], [111,BANANA])]",
- "[2,WrappedArray([200,null], [201,null])]",
- "[3,WrappedArray([310,FLOWER])]");
+
+ RowTestHelper.checkRowEquals(
+ spark.sql("SELECT * FROM paimon.default." + tableName),
+ Arrays.asList(
+ row(1, array(row(110, "APPLE"), row(111, "BANANA"))),
+ row(2, array(row(200, null), row(201, null))),
+ row(3, array(row(310, "FLOWER")))));
}
@ParameterizedTest()
@@ -1012,13 +1019,12 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
+ tableName
+ " VALUES (1, ARRAY(STRUCT('apple', 100),
STRUCT('banana', 101))), "
+ "(2, ARRAY(STRUCT('cat', 200), STRUCT('dog',
201)))");
- assertThat(
- spark.sql("SELECT * FROM paimon.default." + tableName).collectAsList()
- .stream()
- .map(Row::toString))
- .containsExactlyInAnyOrder(
- "[1,WrappedArray([apple,100], [banana,101])]",
- "[2,WrappedArray([cat,200], [dog,201])]");
+
+ RowTestHelper.checkRowEquals(
+ spark.sql("SELECT * FROM paimon.default." + tableName),
+ Arrays.asList(
+ row(1, array(row("apple", 100), row("banana", 101))),
+ row(2, array(row("cat", 200), row("dog", 201)))));
spark.sql(
"ALTER TABLE paimon.default."
@@ -1029,14 +1035,13 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
+ tableName
+ " VALUES (1, ARRAY(STRUCT('APPLE', 1000000000000),
STRUCT('BANANA', 111))), "
+ "(3, ARRAY(STRUCT('FLOWER', 3000000000000)))");
- assertThat(
- spark.sql("SELECT * FROM paimon.default." + tableName).collectAsList()
- .stream()
- .map(Row::toString))
- .containsExactlyInAnyOrder(
- "[1,WrappedArray([APPLE,1000000000000],
[BANANA,111])]",
- "[2,WrappedArray([cat,200], [dog,201])]",
- "[3,WrappedArray([FLOWER,3000000000000])]");
+
+ RowTestHelper.checkRowEquals(
+ spark.sql("SELECT * FROM paimon.default." + tableName),
+ Arrays.asList(
+ row(1, array(row("APPLE", 1000000000000L), row("BANANA", 111))),
+ row(2, array(row("cat", 200), row("dog", 201))),
+ row(3, array(row("FLOWER", 3000000000000L)))));
}
@ParameterizedTest()
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java
index 8e13cb18f5..d08142995f 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java
@@ -245,8 +245,7 @@ public class SparkTimeTravelITCase extends SparkReadTestBase {
() -> spark.sql("SELECT * FROM t VERSION AS OF 'unknown'").collectAsList())
.satisfies(
anyCauseMatches(
- RuntimeException.class,
- "Cannot find a time travel version for
unknown"));
+ IllegalArgumentException.class, "Tag 'unknown' doesn't exist"));
}
@Test
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/RowTestHelper.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/RowTestHelper.scala
new file mode 100644
index 0000000000..bfab0ee7c9
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/RowTestHelper.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark
+
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.QueryTest.checkAnswer
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.JavaConverters._
+
+/**
+ * A helper class for facilitating the comparison of Spark Row objects in Java unit tests, which
+ * leverages QueryTest.checkAnswer for the comparison.
+ */
+class RowTestHelper extends QueryTest {
+ override protected def spark: SparkSession = {
+ throw new UnsupportedOperationException("Not supported")
+ }
+}
+
+object RowTestHelper {
+ def checkRowEquals(df: DataFrame, expectedRows: java.util.List[Row]): Unit = {
+ checkAnswer(df, expectedRows)
+ }
+
+ def checkRowEquals(df: DataFrame, expectedRow: Row): Unit = {
+ checkAnswer(df, Seq(expectedRow))
+ }
+
+ def row(values: Array[Any]): Row = {
+ Row.fromSeq(values)
+ }
+
+ def seq(values: Array[Any]): Seq[Any] = values.toSeq
+}
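A design note on the new helper: RowTestHelper extends Spark's QueryTest purely to reuse its answer-checking machinery, and the abstract spark member is stubbed because the companion object's methods never need a session. From the Java tests it reads as a small static API; QueryTest.checkAnswer compares collected Row values (including nested structs and array columns) rather than their string rendering. Example usage from Java, together with the row/array forwarders added in SparkReadTestBase (values illustrative):

    RowTestHelper.checkRowEquals(
            dataset.select("c.c2"),
            Arrays.asList(row(1), row(null), row(2), row(3)));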