This is an automated email from the ASF dual-hosted git repository.
chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new fb5c8e452b [GLUTEN-8799][VL]Support Iceberg with Gluten test framework (#8800)
fb5c8e452b is described below
commit fb5c8e452b32217dc14596686628bd9e221f03df
Author: Jin Chengcheng <[email protected]>
AuthorDate: Thu Mar 6 15:53:45 2025 +0000
[GLUTEN-8799][VL]Support Iceberg with Gluten test framework (#8800)
---
backends-velox/pom.xml | 88 +++
.../execution/TestStoragePartitionedJoins.java | 665 +++++++++++++++++++++
.../org/apache/iceberg/spark/SparkTestBase.java | 291 +++++++++
3 files changed, 1044 insertions(+)
diff --git a/backends-velox/pom.xml b/backends-velox/pom.xml
index ff1314a627..e19cd63330 100755
--- a/backends-velox/pom.xml
+++ b/backends-velox/pom.xml
@@ -100,6 +100,27 @@
<version>${iceberg.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-spark-${sparkbundle.version}_${scala.binary.version}</artifactId>
+ <version>${iceberg.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-hive-metastore</artifactId>
+ <version>${iceberg.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-api</artifactId>
+ <version>${iceberg.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</profile>
<profile>
@@ -147,6 +168,73 @@
</dependency>
</dependencies>
</profile>
+ <profile>
+ <id>spark-3.2</id>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <testExcludes>
+ <testExclude>**/org/apache/gluten/spark34/**</testExclude>
+ </testExcludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
+ <id>spark-3.3</id>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <testExcludes>
+ <testExclude>**/org/apache/gluten/spark34/**</testExclude>
+ </testExcludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
+ <id>spark-3.5</id>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <testExcludes>
+ <testExclude>**/org/apache/gluten/spark34/**</testExclude>
+ </testExcludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
+ <id>default</id>
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <testExcludes>
+ <testExclude>**/org/apache/gluten/spark34/**</testExclude>
+ </testExcludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
</profiles>
<dependencies>
diff --git a/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/execution/TestStoragePartitionedJoins.java b/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/execution/TestStoragePartitionedJoins.java
new file mode 100644
index 0000000000..8155b630b1
--- /dev/null
+++ b/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/execution/TestStoragePartitionedJoins.java
@@ -0,0 +1,665 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.spark34.execution;
+
+import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
+import static org.apache.iceberg.PlanningMode.LOCAL;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.iceberg.PlanningMode;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkSQLProperties;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkTestBaseWithCatalog;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.spark.data.RandomData;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.types.StructType;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestStoragePartitionedJoins extends SparkTestBaseWithCatalog {
+
+ @Parameterized.Parameters(name = "planningMode = {0}")
+ public static Object[] parameters() {
+ return new Object[] {LOCAL, DISTRIBUTED};
+ }
+
+ private static final String OTHER_TABLE_NAME = "other_table";
+
+ // open file cost and split size are set as 16 MB to produce a split per file
+ private static final Map<String, String> TABLE_PROPERTIES =
+ ImmutableMap.of(
+ TableProperties.SPLIT_SIZE, "16777216",
+ TableProperties.SPLIT_OPEN_FILE_COST, "16777216");
+
+ // only v2 bucketing and preserve data grouping properties have to be enabled to trigger SPJ
+ // other properties are only to simplify testing and validation
+ private static final Map<String, String> ENABLED_SPJ_SQL_CONF =
+ ImmutableMap.of(
+ SQLConf.V2_BUCKETING_ENABLED().key(),
+ "true",
+ SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED().key(),
+ "true",
+ SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION().key(),
+ "false",
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(),
+ "false",
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD().key(),
+ "-1",
+ SparkSQLProperties.PRESERVE_DATA_GROUPING,
+ "true");
+
+ private static final Map<String, String> DISABLED_SPJ_SQL_CONF =
+ ImmutableMap.of(
+ SQLConf.V2_BUCKETING_ENABLED().key(),
+ "false",
+ SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION().key(),
+ "false",
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(),
+ "false",
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD().key(),
+ "-1",
+ SparkSQLProperties.PRESERVE_DATA_GROUPING,
+ "true");
+
+ private final PlanningMode planningMode;
+
+ public TestStoragePartitionedJoins(PlanningMode planningMode) {
+ this.planningMode = planningMode;
+ }
+
+ @BeforeClass
+ public static void setupSparkConf() {
+ spark.conf().set("spark.sql.shuffle.partitions", "4");
+ }
+
+ @After
+ public void removeTables() {
+ sql("DROP TABLE IF EXISTS %s", tableName);
+ sql("DROP TABLE IF EXISTS %s", tableName(OTHER_TABLE_NAME));
+ }
+
+ // TODO: add tests for truncate transforms once SPARK-40295 is released
+
+ @Test
+ public void testJoinsWithBucketingOnByteColumn() throws NoSuchTableException {
+ checkJoin("byte_col", "TINYINT", "bucket(4, byte_col)");
+ }
+
+ @Test
+ public void testJoinsWithBucketingOnShortColumn() throws NoSuchTableException {
+ checkJoin("short_col", "SMALLINT", "bucket(4, short_col)");
+ }
+
+ @Test
+ public void testJoinsWithBucketingOnIntColumn() throws NoSuchTableException {
+ checkJoin("int_col", "INT", "bucket(16, int_col)");
+ }
+
+ @Test
+ public void testJoinsWithBucketingOnLongColumn() throws NoSuchTableException {
+ checkJoin("long_col", "BIGINT", "bucket(16, long_col)");
+ }
+
+ @Test
+ public void testJoinsWithBucketingOnTimestampColumn() throws NoSuchTableException {
+ checkJoin("timestamp_col", "TIMESTAMP", "bucket(16, timestamp_col)");
+ }
+
+ @Test
+ public void testJoinsWithBucketingOnTimestampNtzColumn() throws NoSuchTableException {
+ checkJoin("timestamp_col", "TIMESTAMP_NTZ", "bucket(16, timestamp_col)");
+ }
+
+ @Test
+ public void testJoinsWithBucketingOnDateColumn() throws NoSuchTableException {
+ checkJoin("date_col", "DATE", "bucket(8, date_col)");
+ }
+
+ @Test
+ public void testJoinsWithBucketingOnDecimalColumn() throws NoSuchTableException {
+ checkJoin("decimal_col", "DECIMAL(20, 2)", "bucket(8, decimal_col)");
+ }
+
+ @Test
+ public void testJoinsWithBucketingOnBinaryColumn() throws NoSuchTableException {
+ checkJoin("binary_col", "BINARY", "bucket(8, binary_col)");
+ }
+
+ @Test
+ public void testJoinsWithYearsOnTimestampColumn() throws NoSuchTableException {
+ checkJoin("timestamp_col", "TIMESTAMP", "years(timestamp_col)");
+ }
+
+ @Test
+ public void testJoinsWithYearsOnTimestampNtzColumn() throws NoSuchTableException {
+ checkJoin("timestamp_col", "TIMESTAMP_NTZ", "years(timestamp_col)");
+ }
+
+ @Test
+ public void testJoinsWithYearsOnDateColumn() throws NoSuchTableException {
+ checkJoin("date_col", "DATE", "years(date_col)");
+ }
+
+ @Test
+ public void testJoinsWithMonthsOnTimestampColumn() throws NoSuchTableException {
+ checkJoin("timestamp_col", "TIMESTAMP", "months(timestamp_col)");
+ }
+
+ @Test
+ public void testJoinsWithMonthsOnTimestampNtzColumn() throws NoSuchTableException {
+ checkJoin("timestamp_col", "TIMESTAMP_NTZ", "months(timestamp_col)");
+ }
+
+ @Test
+ public void testJoinsWithMonthsOnDateColumn() throws NoSuchTableException {
+ checkJoin("date_col", "DATE", "months(date_col)");
+ }
+
+ @Test
+ public void testJoinsWithDaysOnTimestampColumn() throws NoSuchTableException {
+ checkJoin("timestamp_col", "TIMESTAMP", "days(timestamp_col)");
+ }
+
+ @Test
+ public void testJoinsWithDaysOnTimestampNtzColumn() throws NoSuchTableException {
+ checkJoin("timestamp_col", "TIMESTAMP_NTZ", "days(timestamp_col)");
+ }
+
+ @Test
+ public void testJoinsWithDaysOnDateColumn() throws NoSuchTableException {
+ checkJoin("date_col", "DATE", "days(date_col)");
+ }
+
+ @Test
+ public void testJoinsWithHoursOnTimestampColumn() throws NoSuchTableException {
+ checkJoin("timestamp_col", "TIMESTAMP", "hours(timestamp_col)");
+ }
+
+ @Test
+ public void testJoinsWithHoursOnTimestampNtzColumn() throws NoSuchTableException {
+ checkJoin("timestamp_col", "TIMESTAMP_NTZ", "hours(timestamp_col)");
+ }
+
+ @Test
+ public void testJoinsWithMultipleTransformTypes() throws NoSuchTableException {
+ String createTableStmt =
+ "CREATE TABLE %s ("
+ + " id BIGINT, int_col INT, date_col1 DATE, date_col2
DATE, date_col3 DATE,"
+ + " timestamp_col TIMESTAMP, string_col STRING, dep
STRING)"
+ + "USING iceberg "
+ + "PARTITIONED BY ("
+ + " years(date_col1), months(date_col2), days(date_col3),
hours(timestamp_col), "
+ + " bucket(8, int_col), dep)"
+ + "TBLPROPERTIES (%s)";
+
+ sql(createTableStmt, tableName, tablePropsAsString(TABLE_PROPERTIES));
+ sql(createTableStmt, tableName(OTHER_TABLE_NAME),
tablePropsAsString(TABLE_PROPERTIES));
+
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ Dataset<Row> dataDF = randomDataDF(table.schema(), 16);
+
+ // write to the first table 1 time to generate 1 file per partition
+ append(tableName, dataDF);
+
+ // write to the second table 2 times to generate 2 files per partition
+ append(tableName(OTHER_TABLE_NAME), dataDF);
+ append(tableName(OTHER_TABLE_NAME), dataDF);
+
+ // Spark SPJ support is limited at the moment and requires all source
partitioning columns,
+ // which were projected in the query, to be part of the join condition
+ // suppose a table is partitioned by `p1`, `bucket(8, pk)`
+ // queries covering `p1` and `pk` columns must include equality predicates
+ // on both `p1` and `pk` to benefit from SPJ
+ // this is a temporary Spark limitation that will be removed in a future
release
+
+ assertPartitioningAwarePlan(
+ 1, /* expected num of shuffles with SPJ */
+ 3, /* expected num of shuffles without SPJ */
+ "SELECT t1.id "
+ + "FROM %s t1 "
+ + "INNER JOIN %s t2 "
+ + "ON t1.id = t2.id AND t1.dep = t2.dep "
+ + "ORDER BY t1.id",
+ tableName,
+ tableName(OTHER_TABLE_NAME));
+
+ assertPartitioningAwarePlan(
+ 1, /* expected num of shuffles with SPJ */
+ 3, /* expected num of shuffles without SPJ */
+ "SELECT t1.id, t1.int_col, t1.date_col1 "
+ + "FROM %s t1 "
+ + "INNER JOIN %s t2 "
+ + "ON t1.id = t2.id AND t1.int_col = t2.int_col AND
t1.date_col1 = t2.date_col1 "
+ + "ORDER BY t1.id, t1.int_col, t1.date_col1",
+ tableName,
+ tableName(OTHER_TABLE_NAME));
+
+ assertPartitioningAwarePlan(
+ 1, /* expected num of shuffles with SPJ */
+ 3, /* expected num of shuffles without SPJ */
+ "SELECT t1.id, t1.timestamp_col, t1.string_col "
+ + "FROM %s t1 "
+ + "INNER JOIN %s t2 "
+ + "ON t1.id = t2.id AND t1.timestamp_col =
t2.timestamp_col AND t1.string_col = t2.string_col "
+ + "ORDER BY t1.id, t1.timestamp_col, t1.string_col",
+ tableName,
+ tableName(OTHER_TABLE_NAME));
+
+ assertPartitioningAwarePlan(
+ 1, /* expected num of shuffles with SPJ */
+ 3, /* expected num of shuffles without SPJ */
+ "SELECT t1.id, t1.date_col1, t1.date_col2, t1.date_col3 "
+ + "FROM %s t1 "
+ + "INNER JOIN %s t2 "
+ + "ON t1.id = t2.id AND t1.date_col1 = t2.date_col1 AND
t1.date_col2 = t2.date_col2 AND t1.date_col3 = t2.date_col3 "
+ + "ORDER BY t1.id, t1.date_col1, t1.date_col2,
t1.date_col3",
+ tableName,
+ tableName(OTHER_TABLE_NAME));
+
+ assertPartitioningAwarePlan(
+ 1, /* expected num of shuffles with SPJ */
+ 3, /* expected num of shuffles without SPJ */
+ "SELECT t1.id, t1.int_col, t1.timestamp_col, t1.dep "
+ + "FROM %s t1 "
+ + "INNER JOIN %s t2 "
+ + "ON t1.id = t2.id AND t1.int_col = t2.int_col AND
t1.timestamp_col = t2.timestamp_col AND t1.dep = t2.dep "
+ + "ORDER BY t1.id, t1.int_col, t1.timestamp_col, t1.dep",
+ tableName,
+ tableName(OTHER_TABLE_NAME));
+ }
+
+ @Test
+ public void testJoinsWithCompatibleSpecEvolution() {
+ // create a table with an empty spec
+ sql(
+ "CREATE TABLE %s (id BIGINT, int_col INT, dep STRING)"
+ + "USING iceberg "
+ + "TBLPROPERTIES (%s)",
+ tableName, tablePropsAsString(TABLE_PROPERTIES));
+
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ // evolve the spec in the first table by adding `dep`
+ table.updateSpec().addField("dep").commit();
+
+ // insert data into the first table partitioned by `dep`
+ sql("REFRESH TABLE %s", tableName);
+ sql("INSERT INTO %s VALUES (1L, 100, 'software')", tableName);
+
+ // evolve the spec in the first table by adding `bucket(int_col, 8)`
+ table.updateSpec().addField(Expressions.bucket("int_col", 8)).commit();
+
+ // insert data into the first table partitioned by `dep`, `bucket(8,
int_col)`
+ sql("REFRESH TABLE %s", tableName);
+ sql("INSERT INTO %s VALUES (2L, 200, 'hr')", tableName);
+
+ // create another table partitioned by `other_dep`
+ sql(
+ "CREATE TABLE %s (other_id BIGINT, other_int_col INT, other_dep
STRING)"
+ + "USING iceberg "
+ + "PARTITIONED BY (other_dep)"
+ + "TBLPROPERTIES (%s)",
+ tableName(OTHER_TABLE_NAME), tablePropsAsString(TABLE_PROPERTIES));
+
+ // insert data into the second table partitioned by 'other_dep'
+ sql("INSERT INTO %s VALUES (1L, 100, 'software')",
tableName(OTHER_TABLE_NAME));
+ sql("INSERT INTO %s VALUES (2L, 200, 'hr')", tableName(OTHER_TABLE_NAME));
+
+ // SPJ would apply as the grouping keys are compatible
+ // the first table: `dep` (an intersection of all active partition fields
across scanned specs)
+ // the second table: `other_dep` (the only partition field).
+
+ assertPartitioningAwarePlan(
+ 1, /* expected num of shuffles with SPJ */
+ 3, /* expected num of shuffles without SPJ */
+ "SELECT * "
+ + "FROM %s "
+ + "INNER JOIN %s "
+ + "ON id = other_id AND int_col = other_int_col AND dep =
other_dep "
+ + "ORDER BY id, int_col, dep",
+ tableName,
+ tableName(OTHER_TABLE_NAME));
+ }
+
+ @Test
+ public void testJoinsWithIncompatibleSpecs() {
+ sql(
+ "CREATE TABLE %s (id BIGINT, int_col INT, dep STRING)"
+ + "USING iceberg "
+ + "PARTITIONED BY (dep)"
+ + "TBLPROPERTIES (%s)",
+ tableName, tablePropsAsString(TABLE_PROPERTIES));
+
+ sql("INSERT INTO %s VALUES (1L, 100, 'software')", tableName);
+ sql("INSERT INTO %s VALUES (2L, 200, 'software')", tableName);
+ sql("INSERT INTO %s VALUES (3L, 300, 'software')", tableName);
+
+ sql(
+ "CREATE TABLE %s (id BIGINT, int_col INT, dep STRING)"
+ + "USING iceberg "
+ + "PARTITIONED BY (bucket(8, int_col))"
+ + "TBLPROPERTIES (%s)",
+ tableName(OTHER_TABLE_NAME), tablePropsAsString(TABLE_PROPERTIES));
+
+ sql("INSERT INTO %s VALUES (1L, 100, 'software')",
tableName(OTHER_TABLE_NAME));
+ sql("INSERT INTO %s VALUES (2L, 200, 'software')",
tableName(OTHER_TABLE_NAME));
+ sql("INSERT INTO %s VALUES (3L, 300, 'software')",
tableName(OTHER_TABLE_NAME));
+
+ // queries can't benefit from SPJ as specs are not compatible
+ // the first table: `dep`
+ // the second table: `bucket(8, int_col)`
+
+ assertPartitioningAwarePlan(
+ 3, /* expected num of shuffles with SPJ */
+ 3, /* expected num of shuffles with SPJ */
+ "SELECT * "
+ + "FROM %s t1 "
+ + "INNER JOIN %s t2 "
+ + "ON t1.id = t2.id AND t1.int_col = t2.int_col AND t1.dep
= t2.dep "
+ + "ORDER BY t1.id, t1.int_col, t1.dep, t2.id, t2.int_col,
t2.dep",
+ tableName,
+ tableName(OTHER_TABLE_NAME));
+ }
+
+ @Test
+ public void testJoinsWithUnpartitionedTables() {
+ sql(
+ "CREATE TABLE %s (id BIGINT, int_col INT, dep STRING)"
+ + "USING iceberg "
+ + "TBLPROPERTIES ("
+ + " 'read.split.target-size' = 16777216,"
+ + " 'read.split.open-file-cost' = 16777216)",
+ tableName);
+
+ sql("INSERT INTO %s VALUES (1L, 100, 'software')", tableName);
+ sql("INSERT INTO %s VALUES (2L, 200, 'software')", tableName);
+ sql("INSERT INTO %s VALUES (3L, 300, 'software')", tableName);
+
+ sql(
+ "CREATE TABLE %s (id BIGINT, int_col INT, dep STRING)"
+ + "USING iceberg "
+ + "TBLPROPERTIES ("
+ + " 'read.split.target-size' = 16777216,"
+ + " 'read.split.open-file-cost' = 16777216)",
+ tableName(OTHER_TABLE_NAME));
+
+ sql("INSERT INTO %s VALUES (1L, 100, 'software')",
tableName(OTHER_TABLE_NAME));
+ sql("INSERT INTO %s VALUES (2L, 200, 'software')",
tableName(OTHER_TABLE_NAME));
+ sql("INSERT INTO %s VALUES (3L, 300, 'software')",
tableName(OTHER_TABLE_NAME));
+
+ // queries covering unpartitioned tables can't benefit from SPJ but
shouldn't fail
+
+ assertPartitioningAwarePlan(
+ 3, /* expected num of shuffles with SPJ */
+ 3, /* expected num of shuffles without SPJ */
+ "SELECT * "
+ + "FROM %s t1 "
+ + "INNER JOIN %s t2 "
+ + "ON t1.id = t2.id AND t1.int_col = t2.int_col AND t1.dep
= t2.dep "
+ + "ORDER BY t1.id, t1.int_col, t1.dep, t2.id, t2.int_col,
t2.dep",
+ tableName,
+ tableName(OTHER_TABLE_NAME));
+ }
+
+ @Test
+ public void testJoinsWithEmptyTable() {
+ sql(
+ "CREATE TABLE %s (id BIGINT, int_col INT, dep STRING)"
+ + "USING iceberg "
+ + "PARTITIONED BY (dep)"
+ + "TBLPROPERTIES (%s)",
+ tableName, tablePropsAsString(TABLE_PROPERTIES));
+
+ sql(
+ "CREATE TABLE %s (id BIGINT, int_col INT, dep STRING)"
+ + "USING iceberg "
+ + "PARTITIONED BY (dep)"
+ + "TBLPROPERTIES (%s)",
+ tableName(OTHER_TABLE_NAME), tablePropsAsString(TABLE_PROPERTIES));
+
+ sql("INSERT INTO %s VALUES (1L, 100, 'software')",
tableName(OTHER_TABLE_NAME));
+ sql("INSERT INTO %s VALUES (2L, 200, 'software')",
tableName(OTHER_TABLE_NAME));
+ sql("INSERT INTO %s VALUES (3L, 300, 'software')",
tableName(OTHER_TABLE_NAME));
+
+ assertPartitioningAwarePlan(
+ 3, /* expected num of shuffles with SPJ */
+ 3, /* expected num of shuffles without SPJ */
+ "SELECT * "
+ + "FROM %s t1 "
+ + "INNER JOIN %s t2 "
+ + "ON t1.id = t2.id AND t1.int_col = t2.int_col AND t1.dep
= t2.dep "
+ + "ORDER BY t1.id, t1.int_col, t1.dep, t2.id, t2.int_col,
t2.dep",
+ tableName,
+ tableName(OTHER_TABLE_NAME));
+ }
+
+ @Test
+ public void testJoinsWithOneSplitTables() {
+ sql(
+ "CREATE TABLE %s (id BIGINT, int_col INT, dep STRING)"
+ + "USING iceberg "
+ + "PARTITIONED BY (dep)"
+ + "TBLPROPERTIES (%s)",
+ tableName, tablePropsAsString(TABLE_PROPERTIES));
+
+ sql("INSERT INTO %s VALUES (1L, 100, 'software')", tableName);
+
+ sql(
+ "CREATE TABLE %s (id BIGINT, int_col INT, dep STRING)"
+ + "USING iceberg "
+ + "PARTITIONED BY (dep)"
+ + "TBLPROPERTIES (%s)",
+ tableName(OTHER_TABLE_NAME), tablePropsAsString(TABLE_PROPERTIES));
+
+ sql("INSERT INTO %s VALUES (1L, 100, 'software')",
tableName(OTHER_TABLE_NAME));
+
+ // Spark should be able to avoid shuffles even without SPJ if each side
has only one split
+
+ assertPartitioningAwarePlan(
+ 0, /* expected num of shuffles with SPJ */
+ 0, /* expected num of shuffles without SPJ */
+ "SELECT * "
+ + "FROM %s t1 "
+ + "INNER JOIN %s t2 "
+ + "ON t1.id = t2.id AND t1.int_col = t2.int_col AND t1.dep
= t2.dep "
+ + "ORDER BY t1.id, t1.int_col, t1.dep, t2.id, t2.int_col,
t2.dep",
+ tableName,
+ tableName(OTHER_TABLE_NAME));
+ }
+
+ @Test
+ public void testJoinsWithMismatchingPartitionKeys() {
+ sql(
+ "CREATE TABLE %s (id BIGINT, int_col INT, dep STRING)"
+ + "USING iceberg "
+ + "PARTITIONED BY (dep)"
+ + "TBLPROPERTIES (%s)",
+ tableName, tablePropsAsString(TABLE_PROPERTIES));
+
+ sql("INSERT INTO %s VALUES (1L, 100, 'software')", tableName);
+ sql("INSERT INTO %s VALUES (2L, 100, 'hr')", tableName);
+
+ sql(
+ "CREATE TABLE %s (id BIGINT, int_col INT, dep STRING)"
+ + "USING iceberg "
+ + "PARTITIONED BY (dep)"
+ + "TBLPROPERTIES (%s)",
+ tableName(OTHER_TABLE_NAME), tablePropsAsString(TABLE_PROPERTIES));
+
+ sql("INSERT INTO %s VALUES (1L, 100, 'software')",
tableName(OTHER_TABLE_NAME));
+ sql("INSERT INTO %s VALUES (3L, 300, 'hardware')",
tableName(OTHER_TABLE_NAME));
+
+ assertPartitioningAwarePlan(
+ 1, /* expected num of shuffles with SPJ */
+ 3, /* expected num of shuffles without SPJ */
+ "SELECT * "
+ + "FROM %s t1 "
+ + "INNER JOIN %s t2 "
+ + "ON t1.id = t2.id AND t1.dep = t2.dep "
+ + "ORDER BY t1.id, t1.int_col, t1.dep, t2.id, t2.int_col,
t2.dep",
+ tableName,
+ tableName(OTHER_TABLE_NAME));
+ }
+
+ @Test
+ public void testAggregates() throws NoSuchTableException {
+ sql(
+ "CREATE TABLE %s (id BIGINT, int_col INT, dep STRING)"
+ + "USING iceberg "
+ + "PARTITIONED BY (dep, bucket(8, int_col))"
+ + "TBLPROPERTIES (%s)",
+ tableName, tablePropsAsString(TABLE_PROPERTIES));
+
+ // write to the table 3 times to generate 3 files per partition
+ Table table = validationCatalog.loadTable(tableIdent);
+ Dataset<Row> dataDF = randomDataDF(table.schema(), 100);
+ append(tableName, dataDF);
+
+ assertPartitioningAwarePlan(
+ 1, /* expected num of shuffles with SPJ */
+ 3, /* expected num of shuffles without SPJ */
+ "SELECT COUNT (DISTINCT id) AS count FROM %s GROUP BY dep, int_col
ORDER BY count",
+ tableName,
+ tableName(OTHER_TABLE_NAME));
+
+ assertPartitioningAwarePlan(
+ 1, /* expected num of shuffles with SPJ */
+ 3, /* expected num of shuffles without SPJ */
+ "SELECT COUNT (DISTINCT id) AS count FROM %s GROUP BY dep ORDER BY
count",
+ tableName,
+ tableName(OTHER_TABLE_NAME));
+ }
+
+ private void checkJoin(String sourceColumnName, String sourceColumnType, String transform)
+ throws NoSuchTableException {
+
+ String createTableStmt =
+ "CREATE TABLE %s (id BIGINT, salary INT, %s %s)"
+ + "USING iceberg "
+ + "PARTITIONED BY (%s)"
+ + "TBLPROPERTIES (%s)";
+
+ sql(
+ createTableStmt,
+ tableName,
+ sourceColumnName,
+ sourceColumnType,
+ transform,
+ tablePropsAsString(TABLE_PROPERTIES));
+ configurePlanningMode(tableName, planningMode);
+
+ sql(
+ createTableStmt,
+ tableName(OTHER_TABLE_NAME),
+ sourceColumnName,
+ sourceColumnType,
+ transform,
+ tablePropsAsString(TABLE_PROPERTIES));
+ configurePlanningMode(tableName(OTHER_TABLE_NAME), planningMode);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ Dataset<Row> dataDF = randomDataDF(table.schema(), 200);
+ append(tableName, dataDF);
+ append(tableName(OTHER_TABLE_NAME), dataDF);
+
+ assertPartitioningAwarePlan(
+ 1, /* expected num of shuffles with SPJ */
+ 3, /* expected num of shuffles without SPJ */
+ "SELECT t1.id, t1.salary, t1.%s "
+ + "FROM %s t1 "
+ + "INNER JOIN %s t2 "
+ + "ON t1.id = t2.id AND t1.%s = t2.%s "
+ + "ORDER BY t1.id, t1.%s, t1.salary", // add order by
salary to make test stable
+ sourceColumnName,
+ tableName,
+ tableName(OTHER_TABLE_NAME),
+ sourceColumnName,
+ sourceColumnName,
+ sourceColumnName);
+ }
+
+ private void assertPartitioningAwarePlan(
+ int expectedNumShufflesWithSPJ,
+ int expectedNumShufflesWithoutSPJ,
+ String query,
+ Object... args) {
+
+ AtomicReference<List<Object[]>> rowsWithSPJ = new AtomicReference<>();
+ AtomicReference<List<Object[]>> rowsWithoutSPJ = new AtomicReference<>();
+
+ withSQLConf(
+ ENABLED_SPJ_SQL_CONF,
+ () -> {
+ String plan = executeAndKeepPlan(query, args).toString();
+ int actualNumShuffles = StringUtils.countMatches(plan,
"Exchange");
+ Assert.assertEquals(
+ "Number of shuffles with enabled SPJ must match",
+ expectedNumShufflesWithSPJ,
+ actualNumShuffles);
+
+ rowsWithSPJ.set(sql(query, args));
+ });
+
+ withSQLConf(
+ DISABLED_SPJ_SQL_CONF,
+ () -> {
+ String plan = executeAndKeepPlan(query, args).toString();
+ int actualNumShuffles = StringUtils.countMatches(plan,
"Exchange");
+ Assert.assertEquals(
+ "Number of shuffles with disabled SPJ must match",
+ expectedNumShufflesWithoutSPJ,
+ actualNumShuffles);
+
+ rowsWithoutSPJ.set(sql(query, args));
+ });
+
+ assertEquals("SPJ should not change query output", rowsWithoutSPJ.get(),
rowsWithSPJ.get());
+ }
+
+ private Dataset<Row> randomDataDF(Schema schema, int numRows) {
+ Iterable<InternalRow> rows = RandomData.generateSpark(schema, numRows, 0);
+ JavaRDD<InternalRow> rowRDD =
sparkContext.parallelize(Lists.newArrayList(rows));
+ StructType rowSparkType = SparkSchemaUtil.convert(schema);
+ return spark.internalCreateDataFrame(JavaRDD.toRDD(rowRDD), rowSparkType,
false);
+ }
+
+ private void append(String table, Dataset<Row> df) throws
NoSuchTableException {
+ // fanout writes are enabled as write-time clustering is not supported
without Spark extensions
+ df.coalesce(1).writeTo(table).option(SparkWriteOptions.FANOUT_ENABLED,
"true").append();
+ }
+}
+
diff --git a/backends-velox/src-iceberg/test/java/org/apache/iceberg/spark/SparkTestBase.java b/backends-velox/src-iceberg/test/java/org/apache/iceberg/spark/SparkTestBase.java
new file mode 100644
index 0000000000..4a9c1d5a7e
--- /dev/null
+++ b/backends-velox/src-iceberg/test/java/org/apache/iceberg/spark/SparkTestBase.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iceberg.spark;
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.hive.TestHiveMetastore;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.execution.QueryExecution;
+import org.apache.spark.sql.execution.SparkPlan;
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.util.QueryExecutionListener;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+
/**
 * Base class for Spark-based Iceberg tests. Starts a shared embedded Hive metastore and a local
 * {@link SparkSession} (with the Gluten plugin enabled) once per test class, and provides SQL and
 * configuration helpers for subclasses.
 *
 * <p>All shared state is held in static fields managed by the {@code @BeforeClass}/{@code
 * @AfterClass} lifecycle methods; tests in a class therefore share one Spark session.
 */
public abstract class SparkTestBase extends SparkTestHelperBase {

  // Shared per-class test fixtures; assigned in startMetastoreAndSpark and
  // cleared in stopMetastoreAndSpark.
  protected static TestHiveMetastore metastore = null;
  protected static HiveConf hiveConf = null;
  protected static SparkSession spark = null;
  protected static JavaSparkContext sparkContext = null;
  protected static HiveCatalog catalog = null;

  /**
   * Starts the embedded Hive metastore, builds the local Spark session pointed at it (with the
   * Gluten plugin and off-heap memory configured), loads a {@link HiveCatalog}, and ensures the
   * {@code default} namespace exists.
   */
  @BeforeClass
  public static void startMetastoreAndSpark() {
    SparkTestBase.metastore = new TestHiveMetastore();
    metastore.start();
    SparkTestBase.hiveConf = metastore.hiveConf();

    SparkTestBase.spark =
        SparkSession.builder()
            .master("local[2]")
            .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic")
            // point Spark's Hive support at the embedded metastore started above
            .config("spark.hadoop." + METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname))
            .config("spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true")
            // enable Gluten; off-heap memory is required by the Velox backend
            .config("spark.plugins", "org.apache.gluten.GlutenPlugin")
            .config("spark.default.parallelism", "1")
            .config("spark.memory.offHeap.enabled", "true")
            .config("spark.memory.offHeap.size", "1024MB")
            .config("spark.ui.enabled", "false")
            .config("spark.gluten.ui.enabled", "false")
            .enableHiveSupport()
            .getOrCreate();

    SparkTestBase.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());

    SparkTestBase.catalog =
        (HiveCatalog)
            CatalogUtil.loadCatalog(
                HiveCatalog.class.getName(), "hive", ImmutableMap.of(), hiveConf);

    try {
      catalog.createNamespace(Namespace.of("default"));
    } catch (AlreadyExistsException ignored) {
      // the default namespace already exists. ignore the create error
    }
  }

  /** Stops the metastore and the Spark session and clears the shared static fixtures. */
  @AfterClass
  public static void stopMetastoreAndSpark() throws Exception {
    SparkTestBase.catalog = null;
    if (metastore != null) {
      metastore.stop();
      SparkTestBase.metastore = null;
    }
    if (spark != null) {
      spark.stop();
      SparkTestBase.spark = null;
      SparkTestBase.sparkContext = null;
    }
  }

  /**
   * Busy-waits until the wall clock is strictly after {@code timestampMillis} and returns the
   * current time. Used to guarantee that subsequently created snapshots get a later timestamp.
   */
  protected long waitUntilAfter(long timestampMillis) {
    long current = System.currentTimeMillis();
    while (current <= timestampMillis) {
      current = System.currentTimeMillis();
    }
    return current;
  }

  /**
   * Runs {@code String.format(query, args)} through Spark SQL and returns the collected rows
   * converted to plain Java arrays; returns an empty list for an empty result.
   */
  protected List<Object[]> sql(String query, Object... args) {
    List<Row> rows = spark.sql(String.format(query, args)).collectAsList();
    if (rows.size() < 1) {
      return ImmutableList.of();
    }

    return rowsToJava(rows);
  }

  /**
   * Runs a query expected to produce exactly one row with one column and returns that single
   * value; asserts otherwise.
   */
  protected Object scalarSql(String query, Object... args) {
    List<Object[]> rows = sql(query, args);
    Assert.assertEquals("Scalar SQL should return one row", 1, rows.size());
    Object[] row = Iterables.getOnlyElement(rows);
    Assert.assertEquals("Scalar SQL should return one value", 1, row.length);
    return row[0];
  }

  /** Convenience factory for an expected-row array used in result comparisons. */
  protected Object[] row(Object... values) {
    return values;
  }

  /** Returns the filesystem path of the given database in the embedded metastore. */
  protected static String dbPath(String dbName) {
    return metastore.getDatabasePath(dbName);
  }

  /** Runs {@code action} while the data files' locations are temporarily renamed away. */
  protected void withUnavailableFiles(Iterable<? extends ContentFile<?>> files, Action action) {
    Iterable<String> fileLocations = Iterables.transform(files, file -> file.path().toString());
    withUnavailableLocations(fileLocations, action);
  }

  // Moves a file from one URI-style location to another, wrapping IO failures unchecked.
  private void move(String location, String newLocation) {
    Path path = Paths.get(URI.create(location));
    Path tempPath = Paths.get(URI.create(newLocation));

    try {
      Files.move(path, tempPath);
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to move: " + location, e);
    }
  }

  /**
   * Renames each location aside (suffix {@code _temp}), runs {@code action}, and restores the
   * locations afterwards even if the action throws.
   */
  protected void withUnavailableLocations(Iterable<String> locations, Action action) {
    for (String location : locations) {
      move(location, location + "_temp");
    }

    try {
      action.invoke();
    } finally {
      for (String location : locations) {
        move(location + "_temp", location);
      }
    }
  }

  /**
   * Runs {@code action} with the JVM default time zone set to {@code zoneId}, restoring the
   * previous default afterwards. Note: mutates process-wide state while running.
   */
  protected void withDefaultTimeZone(String zoneId, Action action) {
    TimeZone currentZone = TimeZone.getDefault();
    try {
      TimeZone.setDefault(TimeZone.getTimeZone(zoneId));
      action.invoke();
    } finally {
      TimeZone.setDefault(currentZone);
    }
  }

  /**
   * Runs {@code action} with the given SQL conf entries applied to the current session, then
   * restores each key to its previous value (or unsets it if it was not set before). Static
   * configs cannot be overridden and cause a {@link RuntimeException}.
   */
  protected void withSQLConf(Map<String, String> conf, Action action) {
    SQLConf sqlConf = SQLConf.get();

    // snapshot current values so they can be restored in the finally block below
    Map<String, String> currentConfValues = Maps.newHashMap();
    conf.keySet()
        .forEach(
            confKey -> {
              if (sqlConf.contains(confKey)) {
                String currentConfValue = sqlConf.getConfString(confKey);
                currentConfValues.put(confKey, currentConfValue);
              }
            });

    conf.forEach(
        (confKey, confValue) -> {
          if (SQLConf.isStaticConfigKey(confKey)) {
            throw new RuntimeException("Cannot modify the value of a static config: " + confKey);
          }
          sqlConf.setConfString(confKey, confValue);
        });

    try {
      action.invoke();
    } finally {
      conf.forEach(
          (confKey, confValue) -> {
            if (currentConfValues.containsKey(confKey)) {
              sqlConf.setConfString(confKey, currentConfValues.get(confKey));
            } else {
              sqlConf.unsetConf(confKey);
            }
          });
    }
  }

  /** Parses the given JSON records into a DataFrame using the supplied DDL schema string. */
  protected Dataset<Row> jsonToDF(String schema, String... records) {
    Dataset<String> jsonDF = spark.createDataset(ImmutableList.copyOf(records), Encoders.STRING());
    return spark.read().schema(schema).json(jsonDF);
  }

  /**
   * Appends the given JSON records to {@code table}, using the table's own schema. Wraps a
   * missing-table error as an unchecked exception for test convenience.
   */
  protected void append(String table, String... jsonRecords) {
    try {
      String schema = spark.table(table).schema().toDDL();
      Dataset<Row> df = jsonToDF(schema, jsonRecords);
      df.coalesce(1).writeTo(table).append();
    } catch (NoSuchTableException e) {
      throw new RuntimeException("Failed to write data", e);
    }
  }

  /** Renders table properties as a comma-separated {@code 'key' 'value'} list for SQL DDL. */
  protected String tablePropsAsString(Map<String, String> tableProps) {
    StringBuilder stringBuilder = new StringBuilder();

    for (Map.Entry<String, String> property : tableProps.entrySet()) {
      if (stringBuilder.length() > 0) {
        stringBuilder.append(", ");
      }
      stringBuilder.append(String.format("'%s' '%s'", property.getKey(), property.getValue()));
    }

    return stringBuilder.toString();
  }

  /** Executes the formatted query and returns the executed physical plan. */
  protected SparkPlan executeAndKeepPlan(String query, Object... args) {
    return executeAndKeepPlan(() -> sql(query, args));
  }

  /**
   * Runs {@code action} with a temporary {@link QueryExecutionListener} registered and returns
   * the executed physical plan of the (last) successful query, unwrapping AQE's final plan when
   * adaptive execution is enabled. Waits for the listener bus to drain so the plan reference is
   * populated before it is read.
   */
  protected SparkPlan executeAndKeepPlan(Action action) {
    AtomicReference<SparkPlan> executedPlanRef = new AtomicReference<>();

    QueryExecutionListener listener =
        new QueryExecutionListener() {
          @Override
          public void onSuccess(String funcName, QueryExecution qe, long durationNs) {
            executedPlanRef.set(qe.executedPlan());
          }

          @Override
          public void onFailure(String funcName, QueryExecution qe, Exception exception) {}
        };

    spark.listenerManager().register(listener);

    action.invoke();

    // listener callbacks are asynchronous; drain the bus before reading the captured plan
    try {
      spark.sparkContext().listenerBus().waitUntilEmpty();
    } catch (TimeoutException e) {
      throw new RuntimeException("Timeout while waiting for processing events", e);
    }

    SparkPlan executedPlan = executedPlanRef.get();
    if (executedPlan instanceof AdaptiveSparkPlanExec) {
      // under AQE the top-level node wraps the actual final plan
      return ((AdaptiveSparkPlanExec) executedPlan).executedPlan();
    } else {
      return executedPlan;
    }
  }

  /** A no-argument, no-result callback used by the with-* helper methods. */
  @FunctionalInterface
  protected interface Action {
    void invoke();
  }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]