This is an automated email from the ASF dual-hosted git repository.
chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new ed66db9d6d [GLUTEN-8948][VL] Fallback iceberg delete from scan (#8987)
ed66db9d6d is described below
commit ed66db9d6d1558426e0b5de7af6e6734d1e822b2
Author: Jin Chengcheng <[email protected]>
AuthorDate: Thu Mar 20 16:22:43 2025 +0000
[GLUTEN-8948][VL] Fallback iceberg delete from scan (#8987)
---
backends-velox/pom.xml | 7 +
.../execution/TestTPCHStoragePartitionedJoins.java | 217 ++++++++
.../extensions/GlutenTestCopyOnWriteDelete.java | 45 ++
.../extensions/GlutenTestMergeOnReadDelete.java | 45 ++
.../extensions/GlutenTestMergeOnReadMerge.java | 46 ++
.../extensions/GlutenTestMergeOnReadUpdate.java | 45 ++
...toragePartitionedJoinsInRowLevelOperations.java | 27 +
.../GlutenTestSystemFunctionPushDownDQL.java | 27 +
...SystemFunctionPushDownInRowLevelOperations.java | 25 +
.../spark34/sql/GlutenTestAggregatePushDown.java | 63 +++
.../gluten/spark34/sql/GlutenTestDeleteFrom.java | 27 +
.../sql/GlutenTestPartitionedWritesAsSelect.java | 22 +
.../sql/GlutenTestPartitionedWritesToBranch.java | 27 +
.../GlutenTestPartitionedWritesToWapBranch.java | 27 +
.../gluten/spark34/sql/GlutenTestSelect.java | 27 +
.../sql/GlutenTestTimestampWithoutZone.java | 27 +
.../gluten/spark34/sql/TestFilterPushDown.java | 604 +++++++++++++++++++++
.../spark/extensions/SparkExtensionsTestBase.java | 72 +++
.../spark/source/TestSparkReaderDeletes.java | 25 +
.../gluten/backendsapi/velox/VeloxBackend.scala | 2 +
docs/get-started/VeloxIceberg.md | 25 +-
.../gluten/execution/IcebergScanTransformer.scala | 44 +-
.../org/apache/gluten/execution/IcebergSuite.scala | 118 ++--
.../gluten/backendsapi/BackendSettingsApi.scala | 2 +
24 files changed, 1514 insertions(+), 82 deletions(-)
diff --git a/backends-velox/pom.xml b/backends-velox/pom.xml
index 3f9c7f8074..490afe876c 100755
--- a/backends-velox/pom.xml
+++ b/backends-velox/pom.xml
@@ -133,6 +133,13 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+
<artifactId>iceberg-spark-extensions-${sparkbundle.version}_${scala.binary.version}</artifactId>
+ <version>${iceberg.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
diff --git
a/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/execution/TestTPCHStoragePartitionedJoins.java
b/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/execution/TestTPCHStoragePartitionedJoins.java
new file mode 100644
index 0000000000..78ecbfa9f5
--- /dev/null
+++
b/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/execution/TestTPCHStoragePartitionedJoins.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.spark34.execution;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.gluten.config.GlutenConfig;
+import org.apache.gluten.utils.Arm;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.spark.SparkSQLProperties;
+import org.apache.iceberg.spark.SparkTestBaseWithCatalog;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.internal.SQLConf;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import scala.io.Source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class TestTPCHStoragePartitionedJoins extends SparkTestBaseWithCatalog {
+ protected String rootPath = this.getClass().getResource("/").getPath();
+ protected String tpchBasePath = rootPath + "../../../src/test/resources";
+
+ protected String tpchQueries = rootPath +
"../../../../tools/gluten-it/common/src/main/resources/tpch-queries";
+
+ // open file cost and split size are set as 16 MB to produce a split per
file
+ private static final Map<String, String> TABLE_PROPERTIES =
+ ImmutableMap.of(
+ TableProperties.SPLIT_SIZE, "16777216",
TableProperties.SPLIT_OPEN_FILE_COST, "16777216");
+
+ // only v2 bucketing and preserve data grouping properties have to be
enabled to trigger SPJ
+ // other properties are only to simplify testing and validation
+ private static final Map<String, String> ENABLED_SPJ_SQL_CONF =
+ ImmutableMap.of(
+ SQLConf.V2_BUCKETING_ENABLED().key(),
+ "true",
+ SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED().key(),
+ "true",
+ SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION().key(),
+ "false",
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(),
+ "false",
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD().key(),
+ "-1",
+ SparkSQLProperties.PRESERVE_DATA_GROUPING,
+ "true",
+
SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED().key(),
+ "true");
+ protected static String PARQUET_TABLE_PREFIX = "p_";
+ protected static List<String> tableNames = ImmutableList.of("part",
"supplier", "partsupp",
+ "customer", "orders", "lineitem", "nation", "region");
+
+ // If we test all the catalog, we need to create the table in that catalog,
+ // we don't need to test the catalog, so only test the testhadoop catalog
+ @Before
+ public void createTPCHNotNullTables() {
+ tableNames.forEach(table -> {
+ String tableDir = tpchBasePath + "/tpch-data-parquet";
+// String tableDir =
"/Users/chengchengjin/code/gluten/backends-velox/src/test/resources/tpch-data-parquet";
+ String tablePath = new File(tableDir, table).getAbsolutePath();
+ Dataset<Row> tableDF =
spark.read().format("parquet").load(tablePath);
+ tableDF.createOrReplaceTempView(PARQUET_TABLE_PREFIX + table);
+ });
+
+
+ sql(createIcebergTable("part", "`p_partkey` INT,\n" +
+ " `p_name` string,\n" +
+ " `p_mfgr` string,\n" +
+ " `p_brand` string,\n" +
+ " `p_type` string,\n" +
+ " `p_size` INT,\n" +
+ " `p_container` string,\n" +
+ " `p_retailprice` DECIMAL(15,2) ,\n" +
+ " `p_comment` string ", null));
+ sql(createIcebergTable("nation", "`n_nationkey` INT,\n" +
+ " `n_name` CHAR(25),\n" +
+ " `n_regionkey` INT,\n" +
+ " `n_comment` VARCHAR(152)"));
+ sql(createIcebergTable("region", "`r_regionkey` INT,\n" +
+ " `r_name` CHAR(25),\n" +
+ " `r_comment` VARCHAR(152)"));
+ sql(createIcebergTable("supplier", "`s_suppkey` INT,\n" +
+ " `s_name` CHAR(25),\n" +
+ " `s_address` VARCHAR(40),\n" +
+ " `s_nationkey` INT,\n" +
+ " `s_phone` CHAR(15),\n" +
+ " `s_acctbal` DECIMAL(15,2),\n" +
+ " `s_comment` VARCHAR(101)"));
+ sql(createIcebergTable("customer", "`c_custkey` INT,\n" +
+ " `c_name` string,\n" +
+ " `c_address` string,\n" +
+ " `c_nationkey` INT,\n" +
+ " `c_phone` string,\n" +
+ " `c_acctbal` DECIMAL(15,2),\n" +
+ " `c_mktsegment` string,\n" +
+ " `c_comment` string", "bucket(16, c_custkey)"));
+ sql(createIcebergTable("partsupp", "`ps_partkey` INT,\n" +
+ " `ps_suppkey` INT,\n" +
+ " `ps_availqty` INT,\n" +
+ " `ps_supplycost` DECIMAL(15,2),\n" +
+ " `ps_comment` VARCHAR(199)"));
+ sql(createIcebergTable("orders", "`o_orderkey` INT,\n" +
+ " `o_custkey` INT,\n" +
+ " `o_orderstatus` string,\n" +
+ " `o_totalprice` DECIMAL(15,2),\n" +
+ " `o_orderdate` DATE,\n" +
+ " `o_orderpriority` string,\n" +
+ " `o_clerk` string,\n" +
+ " `o_shippriority` INT,\n" +
+ " `o_comment` string", "bucket(16, o_custkey)"));
+
+ sql(createIcebergTable("lineitem", "`l_orderkey` INT,\n" +
+ " `l_partkey` INT,\n" +
+ " `l_suppkey` INT,\n" +
+ " `l_linenumber` INT,\n" +
+ " `l_quantity` DECIMAL(15,2),\n" +
+ " `l_extendedprice` DECIMAL(15,2),\n" +
+ " `l_discount` DECIMAL(15,2),\n" +
+ " `l_tax` DECIMAL(15,2),\n" +
+ " `l_returnflag` string,\n" +
+ " `l_linestatus` string,\n" +
+ " `l_shipdate` DATE,\n" +
+ " `l_commitdate` DATE,\n" +
+ " `l_receiptdate` DATE,\n" +
+ " `l_shipinstruct` string,\n" +
+ " `l_shipmode` string,\n" +
+ " `l_comment` string", null));
+
+ String insertStmt = "INSERT INTO %s select * from %s%s";
+ tableNames.forEach(table -> sql(String.format(insertStmt,
tableName(table), PARQUET_TABLE_PREFIX, table)));
+
+ }
+
+ @After
+ public void dropTPCHNotNullTables() {
+ tableNames.forEach(table -> {
+ sql("DROP TABLE IF EXISTS " + tableName(table));
+ sql("DROP VIEW IF EXISTS " + PARQUET_TABLE_PREFIX + table);
+ });
+
+ }
+
+ private String createIcebergTable(String name, String columns) {
+ return createIcebergTable(name, columns, null);
+ }
+
+ private String createIcebergTable(String name, String columns, String
transform) {
+ // create TPCH iceberg table
+ String createTableStmt =
+ "CREATE TABLE %s (%s)"
+ + "USING iceberg "
+ + "PARTITIONED BY (%s)"
+ + "TBLPROPERTIES (%s)";
+ String createUnpartitionTableStmt =
+ "CREATE TABLE %s (%s)"
+ + "USING iceberg "
+ + "TBLPROPERTIES (%s)";
+ if (transform != null) {
+ return String.format(createTableStmt, tableName(name), columns,
transform,
+ tablePropsAsString(TABLE_PROPERTIES));
+ } else {
+ return String.format(createUnpartitionTableStmt, tableName(name),
columns,
+ tablePropsAsString(TABLE_PROPERTIES));
+ }
+
+ }
+
+ protected String tpchSQL(int queryNum) {
+ try {
+ return FileUtils.readFileToString(new File(tpchQueries + "/q" +
queryNum + ".sql"), "UTF-8");
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Test
+ public void testTPCH() {
+ spark.conf().set("spark.sql.defaultCatalog", catalogName);
+ spark.conf().set("spark.sql.catalog." + catalogName +
".default-namespace", "default");
+ sql("use namespace default");
+ withSQLConf(ENABLED_SPJ_SQL_CONF, () -> {
+ for (int i = 1; i <= 22; i++) {
+ List<Row> rows = spark.sql(tpchSQL(i)).collectAsList();
+ AtomicReference<List<Row>> rowsSpark = new AtomicReference<>();
+ int finalI = i;
+
withSQLConf(ImmutableMap.of(GlutenConfig.GLUTEN_ENABLED().key(), "false"), () ->
+
rowsSpark.set(spark.sql(tpchSQL(finalI)).collectAsList()
+ ));
+
assertThat(rows).containsExactlyInAnyOrderElementsOf(Iterables.concat(rowsSpark.get()));
+ }
+ });
+ }
+}
diff --git
a/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/extensions/GlutenTestCopyOnWriteDelete.java
b/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/extensions/GlutenTestCopyOnWriteDelete.java
new file mode 100644
index 0000000000..609c64bd7c
--- /dev/null
+++
b/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/extensions/GlutenTestCopyOnWriteDelete.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.spark34.extensions;
+
+import org.apache.iceberg.PlanningMode;
+import org.apache.iceberg.spark.extensions.TestCopyOnWriteDelete;
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+public class GlutenTestCopyOnWriteDelete extends TestCopyOnWriteDelete {
+ public GlutenTestCopyOnWriteDelete(String catalogName, String
implementation, Map<String, String> config, String fileFormat, Boolean
vectorized, String distributionMode, boolean fanoutEnabled, String branch,
PlanningMode planningMode) {
+ super(catalogName, implementation, config, fileFormat, vectorized,
distributionMode, fanoutEnabled, branch, planningMode);
+ }
+
+ @Test
+ public synchronized void testDeleteWithConcurrentTableRefresh() {
+ System.out.println("Run timeout");
+ }
+
+ @Test
+ public synchronized void testDeleteWithSerializableIsolation() {
+ System.out.println("Run timeout");
+ }
+
+ @Test
+ public synchronized void testDeleteWithSnapshotIsolation() throws
ExecutionException {
+ System.out.println("Run timeout");
+ }
+}
diff --git
a/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/extensions/GlutenTestMergeOnReadDelete.java
b/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/extensions/GlutenTestMergeOnReadDelete.java
new file mode 100644
index 0000000000..07d97bf047
--- /dev/null
+++
b/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/extensions/GlutenTestMergeOnReadDelete.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.spark34.extensions;
+
+import org.apache.iceberg.PlanningMode;
+import org.apache.iceberg.spark.extensions.TestMergeOnReadDelete;
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+public class GlutenTestMergeOnReadDelete extends TestMergeOnReadDelete {
+ public GlutenTestMergeOnReadDelete(String catalogName, String
implementation, Map<String, String> config, String fileFormat, Boolean
vectorized, String distributionMode, boolean fanoutEnabled, String branch,
PlanningMode planningMode) {
+ super(catalogName, implementation, config, fileFormat, vectorized,
distributionMode, fanoutEnabled, branch, planningMode);
+ }
+
+ @Test
+ public synchronized void testDeleteWithConcurrentTableRefresh() {
+ System.out.println("Run timeout");
+ }
+
+ @Test
+ public synchronized void testDeleteWithSerializableIsolation() {
+ System.out.println("Run timeout");
+ }
+
+ @Test
+ public synchronized void testDeleteWithSnapshotIsolation() throws
ExecutionException {
+ System.out.println("Run timeout");
+ }
+}
diff --git
a/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/extensions/GlutenTestMergeOnReadMerge.java
b/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/extensions/GlutenTestMergeOnReadMerge.java
new file mode 100644
index 0000000000..a917474ebc
--- /dev/null
+++
b/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/extensions/GlutenTestMergeOnReadMerge.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.spark34.extensions;
+
+import org.apache.iceberg.PlanningMode;
+import org.apache.iceberg.spark.extensions.TestMergeOnReadMerge;
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+public class GlutenTestMergeOnReadMerge extends TestMergeOnReadMerge {
+ public GlutenTestMergeOnReadMerge(String catalogName, String
implementation, Map<String, String> config, String fileFormat, boolean
vectorized, String distributionMode, boolean fanoutEnabled, String branch,
PlanningMode planningMode) {
+ super(catalogName, implementation, config, fileFormat, vectorized,
distributionMode, fanoutEnabled, branch, planningMode);
+ }
+
+ @Test
+ public synchronized void testMergeWithConcurrentTableRefresh() {
+ System.out.println("Run timeout");
+ }
+
+ @Test
+ public synchronized void testMergeWithSerializableIsolation() {
+ System.out.println("Run timeout");
+ }
+
+ @Test
+ public synchronized void testMergeWithSnapshotIsolation() {
+ System.out.println("Run timeout");
+ }
+
+}
diff --git
a/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/extensions/GlutenTestMergeOnReadUpdate.java
b/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/extensions/GlutenTestMergeOnReadUpdate.java
new file mode 100644
index 0000000000..ea95e70d39
--- /dev/null
+++
b/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/extensions/GlutenTestMergeOnReadUpdate.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.spark34.extensions;
+
+import org.apache.iceberg.PlanningMode;
+import org.apache.iceberg.spark.extensions.TestMergeOnReadUpdate;
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+public class GlutenTestMergeOnReadUpdate extends TestMergeOnReadUpdate {
+ public GlutenTestMergeOnReadUpdate(String catalogName, String
implementation, Map<String, String> config, String fileFormat, boolean
vectorized, String distributionMode, boolean fanoutEnabled, String branch,
PlanningMode planningMode) {
+ super(catalogName, implementation, config, fileFormat, vectorized,
distributionMode, fanoutEnabled, branch, planningMode);
+ }
+
+ @Test
+ public synchronized void testUpdateWithConcurrentTableRefresh() {
+ System.out.println("Run timeout");
+ }
+
+ @Test
+ public synchronized void testUpdateWithSerializableIsolation() {
+ System.out.println("Run timeout");
+ }
+
+ @Test
+ public synchronized void testUpdateWithSnapshotIsolation() throws
ExecutionException {
+ System.out.println("Run timeout");
+ }
+}
diff --git
a/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/extensions/GlutenTestStoragePartitionedJoinsInRowLevelOperations.java
b/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/extensions/GlutenTestStoragePartitionedJoinsInRowLevelOperations.java
new file mode 100644
index 0000000000..e06bcca442
--- /dev/null
+++
b/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/extensions/GlutenTestStoragePartitionedJoinsInRowLevelOperations.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.spark34.extensions;
+
+import
org.apache.iceberg.spark.extensions.TestStoragePartitionedJoinsInRowLevelOperations;
+
+import java.util.Map;
+
+public class GlutenTestStoragePartitionedJoinsInRowLevelOperations extends
TestStoragePartitionedJoinsInRowLevelOperations {
+ public GlutenTestStoragePartitionedJoinsInRowLevelOperations(String
catalogName, String implementation, Map<String, String> config) {
+ super(catalogName, implementation, config);
+ }
+}
diff --git
a/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/extensions/GlutenTestSystemFunctionPushDownDQL.java
b/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/extensions/GlutenTestSystemFunctionPushDownDQL.java
new file mode 100644
index 0000000000..3957325d74
--- /dev/null
+++
b/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/extensions/GlutenTestSystemFunctionPushDownDQL.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.spark34.extensions;
+
+import org.apache.iceberg.spark.extensions.TestSystemFunctionPushDownDQL;
+
+import java.util.Map;
+
+public class GlutenTestSystemFunctionPushDownDQL extends
TestSystemFunctionPushDownDQL {
+ public GlutenTestSystemFunctionPushDownDQL(String catalogName, String
implementation, Map<String, String> config) {
+ super(catalogName, implementation, config);
+ }
+}
diff --git
a/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/extensions/GlutenTestSystemFunctionPushDownInRowLevelOperations.java
b/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/extensions/GlutenTestSystemFunctionPushDownInRowLevelOperations.java
new file mode 100644
index 0000000000..b2ad74b2de
--- /dev/null
+++
b/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/extensions/GlutenTestSystemFunctionPushDownInRowLevelOperations.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.spark34.extensions;
+
+import java.util.Map;
+
+public class GlutenTestSystemFunctionPushDownInRowLevelOperations extends
GlutenTestSystemFunctionPushDownDQL {
+ public GlutenTestSystemFunctionPushDownInRowLevelOperations(String
catalogName, String implementation, Map<String, String> config) {
+ super(catalogName, implementation, config);
+ }
+}
diff --git
a/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/sql/GlutenTestAggregatePushDown.java
b/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/sql/GlutenTestAggregatePushDown.java
new file mode 100644
index 0000000000..cd79cc457f
--- /dev/null
+++
b/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/sql/GlutenTestAggregatePushDown.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.spark34.sql;
+
+import org.apache.gluten.spark34.TestConfUtil;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.hive.TestHiveMetastore;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkTestBase;
+import org.apache.iceberg.spark.sql.TestAggregatePushDown;
+import org.apache.spark.sql.SparkSession;
+import org.junit.BeforeClass;
+
+import java.util.Map;
+
+public class GlutenTestAggregatePushDown extends TestAggregatePushDown {
+ public GlutenTestAggregatePushDown(String catalogName, String
implementation, Map<String, String> config) {
+ super(catalogName, implementation, config);
+ }
+
+ @BeforeClass
+ public static void startMetastoreAndSpark() {
+ SparkTestBase.metastore = new TestHiveMetastore();
+ metastore.start();
+ SparkTestBase.hiveConf = metastore.hiveConf();
+
+ SparkTestBase.spark =
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.sql.iceberg.aggregate_pushdown", "true")
+ .config(TestConfUtil.GLUTEN_CONF)
+ .enableHiveSupport()
+ .getOrCreate();
+
+ SparkTestBase.catalog =
+ (HiveCatalog)
+ CatalogUtil.loadCatalog(
+ HiveCatalog.class.getName(), "hive",
ImmutableMap.of(), hiveConf);
+
+ try {
+ catalog.createNamespace(Namespace.of("default"));
+ } catch (AlreadyExistsException ignored) {
+ // the default namespace already exists. ignore the create error
+ }
+ }
+}
diff --git
a/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/sql/GlutenTestDeleteFrom.java
b/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/sql/GlutenTestDeleteFrom.java
new file mode 100644
index 0000000000..aefa981786
--- /dev/null
+++
b/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/sql/GlutenTestDeleteFrom.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.spark34.sql;
+
+import org.apache.iceberg.spark.sql.TestDeleteFrom;
+
+import java.util.Map;
+
+public class GlutenTestDeleteFrom extends TestDeleteFrom {
+ public GlutenTestDeleteFrom(String catalogName, String implementation,
Map<String, String> config) {
+ super(catalogName, implementation, config);
+ }
+}
diff --git
a/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/sql/GlutenTestPartitionedWritesAsSelect.java
b/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/sql/GlutenTestPartitionedWritesAsSelect.java
new file mode 100644
index 0000000000..f5c96e160e
--- /dev/null
+++
b/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/sql/GlutenTestPartitionedWritesAsSelect.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.spark34.sql;
+
+import org.apache.iceberg.spark.sql.TestPartitionedWritesAsSelect;
+
+public class GlutenTestPartitionedWritesAsSelect extends
TestPartitionedWritesAsSelect {
+}
diff --git
a/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/sql/GlutenTestPartitionedWritesToBranch.java
b/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/sql/GlutenTestPartitionedWritesToBranch.java
new file mode 100644
index 0000000000..7ef4a67261
--- /dev/null
+++
b/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/sql/GlutenTestPartitionedWritesToBranch.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.spark34.sql;
+
+import org.apache.iceberg.spark.sql.TestPartitionedWritesToBranch;
+
+import java.util.Map;
+
+public class GlutenTestPartitionedWritesToBranch extends
TestPartitionedWritesToBranch {
+ public GlutenTestPartitionedWritesToBranch(String catalogName, String
implementation, Map<String, String> config) {
+ super(catalogName, implementation, config);
+ }
+}
diff --git
a/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/sql/GlutenTestPartitionedWritesToWapBranch.java
b/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/sql/GlutenTestPartitionedWritesToWapBranch.java
new file mode 100644
index 0000000000..4feffc455c
--- /dev/null
+++
b/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/sql/GlutenTestPartitionedWritesToWapBranch.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.spark34.sql;
+
+import org.apache.iceberg.spark.sql.TestPartitionedWritesToWapBranch;
+
+import java.util.Map;
+
+public class GlutenTestPartitionedWritesToWapBranch extends
TestPartitionedWritesToWapBranch {
+ public GlutenTestPartitionedWritesToWapBranch(String catalogName, String
implementation, Map<String, String> config) {
+ super(catalogName, implementation, config);
+ }
+}
diff --git
a/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/sql/GlutenTestSelect.java
b/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/sql/GlutenTestSelect.java
new file mode 100644
index 0000000000..cec104661e
--- /dev/null
+++
b/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/sql/GlutenTestSelect.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.spark34.sql;
+
+import org.apache.iceberg.spark.sql.TestSelect;
+
+import java.util.Map;
+
+public class GlutenTestSelect extends TestSelect {
+ public GlutenTestSelect(String catalogName, String implementation,
Map<String, String> config) {
+ super(catalogName, implementation, config);
+ }
+}
diff --git
a/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/sql/GlutenTestTimestampWithoutZone.java
b/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/sql/GlutenTestTimestampWithoutZone.java
new file mode 100644
index 0000000000..67067111d6
--- /dev/null
+++
b/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/sql/GlutenTestTimestampWithoutZone.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.spark34.sql;
+
+import org.apache.iceberg.spark.sql.TestTimestampWithoutZone;
+
+import java.util.Map;
+
+public class GlutenTestTimestampWithoutZone extends TestTimestampWithoutZone {
+ public GlutenTestTimestampWithoutZone(String catalogName, String
implementation, Map<String, String> config) {
+ super(catalogName, implementation, config);
+ }
+}
diff --git
a/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/sql/TestFilterPushDown.java
b/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/sql/TestFilterPushDown.java
new file mode 100644
index 0000000000..cea4897e33
--- /dev/null
+++
b/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/sql/TestFilterPushDown.java
@@ -0,0 +1,604 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.spark34.sql;
+
+import org.apache.iceberg.PlanningMode;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.spark.SparkTestBaseWithCatalog;
+import org.apache.spark.sql.execution.SparkPlan;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.util.List;
+
+import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
+import static org.apache.iceberg.PlanningMode.LOCAL;
+import static org.assertj.core.api.Assertions.assertThat;
+
+// Change the test filter plan match string
+@RunWith(Parameterized.class)
+public class TestFilterPushDown extends SparkTestBaseWithCatalog {
+
+ @Parameterized.Parameters(name = "planningMode = {0}")
+ public static Object[] parameters() {
+ return new Object[] {LOCAL, DISTRIBUTED};
+ }
+
+ private final PlanningMode planningMode;
+
+ public TestFilterPushDown(PlanningMode planningMode) {
+ this.planningMode = planningMode;
+ }
+
+ @After
+ public void removeTables() {
+ sql("DROP TABLE IF EXISTS %s", tableName);
+ sql("DROP TABLE IF EXISTS tmp_view");
+ }
+
+ @Test
+ public void testFilterPushdownWithDecimalValues() {
+ sql(
+ "CREATE TABLE %s (id INT, salary DECIMAL(10, 2), dep STRING)"
+ + "USING iceberg "
+ + "PARTITIONED BY (dep)",
+ tableName);
+ configurePlanningMode(planningMode);
+
+ sql("INSERT INTO %s VALUES (1, 100.01, 'd1')", tableName);
+ sql("INSERT INTO %s VALUES (2, 100.05, 'd1')", tableName);
+
+ checkFilters(
+ "dep = 'd1' AND salary > 100.03" /* query predicate */,
+ "isnotnull(salary) AND (salary > 100.03)" /* Spark post scan filter */,
+ "dep IS NOT NULL, salary IS NOT NULL, dep = 'd1', salary > 100.03" /*
Iceberg scan filters */,
+ ImmutableList.of(row(2, new BigDecimal("100.05"), "d1")));
+ }
+
+ @Test
+ public void testFilterPushdownWithIdentityTransform() {
+ sql(
+ "CREATE TABLE %s (id INT, salary INT, dep STRING)"
+ + "USING iceberg "
+ + "PARTITIONED BY (dep)",
+ tableName);
+ configurePlanningMode(planningMode);
+
+ sql("INSERT INTO %s VALUES (1, 100, 'd1')", tableName);
+ sql("INSERT INTO %s VALUES (2, 200, 'd2')", tableName);
+ sql("INSERT INTO %s VALUES (3, 300, 'd3')", tableName);
+ sql("INSERT INTO %s VALUES (4, 400, 'd4')", tableName);
+ sql("INSERT INTO %s VALUES (5, 500, 'd5')", tableName);
+ sql("INSERT INTO %s VALUES (6, 600, null)", tableName);
+
+ checkOnlyIcebergFilters(
+ "dep IS NULL" /* query predicate */,
+ "dep IS NULL" /* Iceberg scan filters */,
+ ImmutableList.of(row(6, 600, null)));
+
+ checkOnlyIcebergFilters(
+ "dep IS NOT NULL" /* query predicate */,
+ "dep IS NOT NULL" /* Iceberg scan filters */,
+ ImmutableList.of(
+ row(1, 100, "d1"),
+ row(2, 200, "d2"),
+ row(3, 300, "d3"),
+ row(4, 400, "d4"),
+ row(5, 500, "d5")));
+
+ checkOnlyIcebergFilters(
+ "dep = 'd3'" /* query predicate */,
+ "dep IS NOT NULL, dep = 'd3'" /* Iceberg scan filters */,
+ ImmutableList.of(row(3, 300, "d3")));
+
+ checkOnlyIcebergFilters(
+ "dep > 'd3'" /* query predicate */,
+ "dep IS NOT NULL, dep > 'd3'" /* Iceberg scan filters */,
+ ImmutableList.of(row(4, 400, "d4"), row(5, 500, "d5")));
+
+ checkOnlyIcebergFilters(
+ "dep >= 'd5'" /* query predicate */,
+ "dep IS NOT NULL, dep >= 'd5'" /* Iceberg scan filters */,
+ ImmutableList.of(row(5, 500, "d5")));
+
+ checkOnlyIcebergFilters(
+ "dep < 'd2'" /* query predicate */,
+ "dep IS NOT NULL, dep < 'd2'" /* Iceberg scan filters */,
+ ImmutableList.of(row(1, 100, "d1")));
+
+ checkOnlyIcebergFilters(
+ "dep <= 'd2'" /* query predicate */,
+ "dep IS NOT NULL, dep <= 'd2'" /* Iceberg scan filters */,
+ ImmutableList.of(row(1, 100, "d1"), row(2, 200, "d2")));
+
+ checkOnlyIcebergFilters(
+ "dep <=> 'd3'" /* query predicate */,
+ "dep = 'd3'" /* Iceberg scan filters */,
+ ImmutableList.of(row(3, 300, "d3")));
+
+ checkOnlyIcebergFilters(
+ "dep IN (null, 'd1')" /* query predicate */,
+ "dep IN ('d1')" /* Iceberg scan filters */,
+ ImmutableList.of(row(1, 100, "d1")));
+
+ checkOnlyIcebergFilters(
+ "dep NOT IN ('d2', 'd4')" /* query predicate */,
+ "(dep IS NOT NULL AND dep NOT IN ('d2', 'd4'))" /* Iceberg scan
filters */,
+ ImmutableList.of(row(1, 100, "d1"), row(3, 300, "d3"), row(5, 500,
"d5")));
+
+ checkOnlyIcebergFilters(
+ "dep = 'd1' AND dep IS NOT NULL" /* query predicate */,
+ "dep = 'd1', dep IS NOT NULL" /* Iceberg scan filters */,
+ ImmutableList.of(row(1, 100, "d1")));
+
+ checkOnlyIcebergFilters(
+ "dep = 'd1' OR dep = 'd2' OR dep = 'd3'" /* query predicate */,
+ "((dep = 'd1' OR dep = 'd2') OR dep = 'd3')" /* Iceberg scan filters
*/,
+ ImmutableList.of(row(1, 100, "d1"), row(2, 200, "d2"), row(3, 300,
"d3")));
+
+ checkFilters(
+ "dep = 'd1' AND id = 1" /* query predicate */,
+ "isnotnull(id) AND (id = 1)" /* Spark post scan filter */,
+ "dep IS NOT NULL, id IS NOT NULL, dep = 'd1', id = 1" /* Iceberg scan
filters */,
+ ImmutableList.of(row(1, 100, "d1")));
+
+ checkFilters(
+ "dep = 'd2' OR id = 1" /* query predicate */,
+ "(dep = d2) OR (id = 1)" /* Spark post scan filter */,
+ "(dep = 'd2' OR id = 1)" /* Iceberg scan filters */,
+ ImmutableList.of(row(1, 100, "d1"), row(2, 200, "d2")));
+
+ checkFilters(
+ "dep LIKE 'd1%' AND id = 1" /* query predicate */,
+ "isnotnull(id) AND (id = 1)" /* Spark post scan filter */,
+ "dep IS NOT NULL, id IS NOT NULL, dep LIKE 'd1%', id = 1" /* Iceberg
scan filters */,
+ ImmutableList.of(row(1, 100, "d1")));
+
+ checkFilters(
+ "dep NOT LIKE 'd5%' AND (id = 1 OR id = 5)" /* query predicate */,
+ "(id = 1) OR (id = 5)" /* Spark post scan filter */,
+ "dep IS NOT NULL, NOT (dep LIKE 'd5%'), (id = 1 OR id = 5)" /* Iceberg
scan filters */,
+ ImmutableList.of(row(1, 100, "d1")));
+
+ checkFilters(
+ "dep LIKE '%d5' AND id IN (1, 5)" /* query predicate */,
+ "EndsWith(dep, d5) AND id IN (1,5)" /* Spark post scan filter */,
+ "dep IS NOT NULL, id IN (1, 5)" /* Iceberg scan filters */,
+ ImmutableList.of(row(5, 500, "d5")));
+ }
+
+ @Test
+ public void testFilterPushdownWithHoursTransform() {
+ sql(
+ "CREATE TABLE %s (id INT, price INT, t TIMESTAMP)"
+ + "USING iceberg "
+ + "PARTITIONED BY (hours(t))",
+ tableName);
+ configurePlanningMode(planningMode);
+
+ sql("INSERT INTO %s VALUES (1, 100, TIMESTAMP
'2021-06-30T01:00:00.000Z')", tableName);
+ sql("INSERT INTO %s VALUES (2, 200, TIMESTAMP
'2021-06-30T02:00:00.000Z')", tableName);
+ sql("INSERT INTO %s VALUES (3, 300, null)", tableName);
+
+ withDefaultTimeZone(
+ "UTC",
+ () -> {
+ checkOnlyIcebergFilters(
+ "t IS NULL" /* query predicate */,
+ "t IS NULL" /* Iceberg scan filters */,
+ ImmutableList.of(row(3, 300, null)));
+
+ // strict/inclusive projections for t < TIMESTAMP
'2021-06-30T02:00:00.000Z' are equal,
+ // so this filter selects entire partitions and can be pushed down
completely
+ checkOnlyIcebergFilters(
+ "t < TIMESTAMP '2021-06-30T02:00:00.000Z'" /* query predicate */,
+ "t IS NOT NULL, t < 1625018400000000" /* Iceberg scan filters */,
+ ImmutableList.of(row(1, 100,
timestamp("2021-06-30T01:00:00.0Z"))));
+
+ // strict/inclusive projections for t < TIMESTAMP
'2021-06-30T01:00:00.001Z' differ,
+ // so this filter does NOT select entire partitions and can't be
pushed down completely
+ checkFilters(
+ "t < TIMESTAMP '2021-06-30T01:00:00.001Z'" /* query predicate */,
+ "t < 2021-06-30 01:00:00.001" /* Spark post scan filter */,
+ "t IS NOT NULL, t < 1625014800001000" /* Iceberg scan filters */,
+ ImmutableList.of(row(1, 100,
timestamp("2021-06-30T01:00:00.0Z"))));
+
+ // strict/inclusive projections for t <= TIMESTAMP
'2021-06-30T01:00:00.000Z' differ,
+ // so this filter does NOT select entire partitions and can't be
pushed down completely
+ checkFilters(
+ "t <= TIMESTAMP '2021-06-30T01:00:00.000Z'" /* query predicate
*/,
+ "t <= 2021-06-30 01:00:00" /* Spark post scan filter */,
+ "t IS NOT NULL, t <= 1625014800000000" /* Iceberg scan filters
*/,
+ ImmutableList.of(row(1, 100,
timestamp("2021-06-30T01:00:00.0Z"))));
+ });
+ }
+
+ @Test
+ public void testFilterPushdownWithDaysTransform() {
+ sql(
+ "CREATE TABLE %s (id INT, price INT, t TIMESTAMP)"
+ + "USING iceberg "
+ + "PARTITIONED BY (days(t))",
+ tableName);
+ configurePlanningMode(planningMode);
+
+ sql("INSERT INTO %s VALUES (1, 100, TIMESTAMP
'2021-06-15T01:00:00.000Z')", tableName);
+ sql("INSERT INTO %s VALUES (2, 200, TIMESTAMP
'2021-06-30T02:00:00.000Z')", tableName);
+ sql("INSERT INTO %s VALUES (3, 300, TIMESTAMP
'2021-07-15T10:00:00.000Z')", tableName);
+ sql("INSERT INTO %s VALUES (4, 400, null)", tableName);
+
+ withDefaultTimeZone(
+ "UTC",
+ () -> {
+ checkOnlyIcebergFilters(
+ "t IS NULL" /* query predicate */,
+ "t IS NULL" /* Iceberg scan filters */,
+ ImmutableList.of(row(4, 400, null)));
+
+ // strict/inclusive projections for t < TIMESTAMP
'2021-07-05T00:00:00.000Z' are equal,
+ // so this filter selects entire partitions and can be pushed down
completely
+ checkOnlyIcebergFilters(
+ "t < TIMESTAMP '2021-07-05T00:00:00.000Z'" /* query predicate */,
+ "t IS NOT NULL, t < 1625443200000000" /* Iceberg scan filters */,
+ ImmutableList.of(
+ row(1, 100, timestamp("2021-06-15T01:00:00.000Z")),
+ row(2, 200, timestamp("2021-06-30T02:00:00.000Z"))));
+
+ // strict/inclusive projections for t < TIMESTAMP
'2021-06-30T03:00:00.000Z' differ,
+ // so this filter does NOT select entire partitions and can't be
pushed down completely
+ checkFilters(
+ "t < TIMESTAMP '2021-06-30T03:00:00.000Z'" /* query predicate */,
+ "t < 2021-06-30 03:00:00" /* Spark post scan filter */,
+ "t IS NOT NULL, t < 1625022000000000" /* Iceberg scan filters */,
+ ImmutableList.of(
+ row(1, 100, timestamp("2021-06-15T01:00:00.000Z")),
+ row(2, 200, timestamp("2021-06-30T02:00:00.000Z"))));
+ });
+ }
+
+ @Test
+ public void testFilterPushdownWithMonthsTransform() {
+ sql(
+ "CREATE TABLE %s (id INT, price INT, t TIMESTAMP)"
+ + "USING iceberg "
+ + "PARTITIONED BY (months(t))",
+ tableName);
+ configurePlanningMode(planningMode);
+
+ sql("INSERT INTO %s VALUES (1, 100, TIMESTAMP
'2021-06-30T01:00:00.000Z')", tableName);
+ sql("INSERT INTO %s VALUES (2, 200, TIMESTAMP
'2021-06-30T02:00:00.000Z')", tableName);
+ sql("INSERT INTO %s VALUES (3, 300, TIMESTAMP
'2021-07-15T10:00:00.000Z')", tableName);
+ sql("INSERT INTO %s VALUES (4, 400, null)", tableName);
+
+ withDefaultTimeZone(
+ "UTC",
+ () -> {
+ checkOnlyIcebergFilters(
+ "t IS NULL" /* query predicate */,
+ "t IS NULL" /* Iceberg scan filters */,
+ ImmutableList.of(row(4, 400, null)));
+
+ // strict/inclusive projections for t < TIMESTAMP
'2021-07-01T00:00:00.000Z' are equal,
+ // so this filter selects entire partitions and can be pushed down
completely
+ checkOnlyIcebergFilters(
+ "t < TIMESTAMP '2021-07-01T00:00:00.000Z'" /* query predicate */,
+ "t IS NOT NULL, t < 1625097600000000" /* Iceberg scan filters */,
+ ImmutableList.of(
+ row(1, 100, timestamp("2021-06-30T01:00:00.000Z")),
+ row(2, 200, timestamp("2021-06-30T02:00:00.000Z"))));
+
+ // strict/inclusive projections for t < TIMESTAMP
'2021-06-30T03:00:00.000Z' differ,
+ // so this filter does NOT select entire partitions and can't be
pushed down completely
+ checkFilters(
+ "t < TIMESTAMP '2021-06-30T03:00:00.000Z'" /* query predicate */,
+ "t < 2021-06-30 03:00:00" /* Spark post scan filter */,
+ "t IS NOT NULL, t < 1625022000000000" /* Iceberg scan filters */,
+ ImmutableList.of(
+ row(1, 100, timestamp("2021-06-30T01:00:00.000Z")),
+ row(2, 200, timestamp("2021-06-30T02:00:00.000Z"))));
+ });
+ }
+
+ @Test
+ public void testFilterPushdownWithYearsTransform() {
+ sql(
+ "CREATE TABLE %s (id INT, price INT, t TIMESTAMP)"
+ + "USING iceberg "
+ + "PARTITIONED BY (years(t))",
+ tableName);
+ configurePlanningMode(planningMode);
+
+ sql("INSERT INTO %s VALUES (1, 100, TIMESTAMP
'2021-06-30T01:00:00.000Z')", tableName);
+ sql("INSERT INTO %s VALUES (2, 200, TIMESTAMP
'2021-06-30T02:00:00.000Z')", tableName);
+ sql("INSERT INTO %s VALUES (2, 200, TIMESTAMP
'2022-09-25T02:00:00.000Z')", tableName);
+ sql("INSERT INTO %s VALUES (3, 300, null)", tableName);
+
+ withDefaultTimeZone(
+ "UTC",
+ () -> {
+ checkOnlyIcebergFilters(
+ "t IS NULL" /* query predicate */,
+ "t IS NULL" /* Iceberg scan filters */,
+ ImmutableList.of(row(3, 300, null)));
+
+ // strict/inclusive projections for t < TIMESTAMP
'2022-01-01T00:00:00.000Z' are equal,
+ // so this filter selects entire partitions and can be pushed down
completely
+ checkOnlyIcebergFilters(
+ "t < TIMESTAMP '2022-01-01T00:00:00.000Z'" /* query predicate */,
+ "t IS NOT NULL, t < 1640995200000000" /* Iceberg scan filters */,
+ ImmutableList.of(
+ row(1, 100, timestamp("2021-06-30T01:00:00.000Z")),
+ row(2, 200, timestamp("2021-06-30T02:00:00.000Z"))));
+
+ // strict/inclusive projections for t < TIMESTAMP
'2021-06-30T03:00:00.000Z' differ,
+ // so this filter does NOT select entire partitions and can't be
pushed down completely
+ checkFilters(
+ "t < TIMESTAMP '2021-06-30T03:00:00.000Z'" /* query predicate */,
+ "t < 2021-06-30 03:00:00" /* Spark post scan filter */,
+ "t IS NOT NULL, t < 1625022000000000" /* Iceberg scan filters */,
+ ImmutableList.of(
+ row(1, 100, timestamp("2021-06-30T01:00:00.000Z")),
+ row(2, 200, timestamp("2021-06-30T02:00:00.000Z"))));
+ });
+ }
+
+ @Test
+ public void testFilterPushdownWithBucketTransform() {
+ sql(
+ "CREATE TABLE %s (id INT, salary INT, dep STRING)"
+ + "USING iceberg "
+ + "PARTITIONED BY (dep, bucket(8, id))",
+ tableName);
+ configurePlanningMode(planningMode);
+
+ sql("INSERT INTO %s VALUES (1, 100, 'd1')", tableName);
+ sql("INSERT INTO %s VALUES (2, 200, 'd2')", tableName);
+
+ checkFilters(
+ "dep = 'd1' AND id = 1" /* query predicate */,
+ "id = 1" /* Spark post scan filter */,
+ "dep IS NOT NULL, id IS NOT NULL, dep = 'd1'" /* Iceberg scan filters
*/,
+ ImmutableList.of(row(1, 100, "d1")));
+ }
+
+ @Test
+ public void testFilterPushdownWithTruncateTransform() {
+ sql(
+ "CREATE TABLE %s (id INT, salary INT, dep STRING)"
+ + "USING iceberg "
+ + "PARTITIONED BY (truncate(1, dep))",
+ tableName);
+ configurePlanningMode(planningMode);
+
+ sql("INSERT INTO %s VALUES (1, 100, 'd1')", tableName);
+ sql("INSERT INTO %s VALUES (2, 200, 'd2')", tableName);
+ sql("INSERT INTO %s VALUES (3, 300, 'a3')", tableName);
+
+ checkOnlyIcebergFilters(
+ "dep LIKE 'd%'" /* query predicate */,
+ "dep IS NOT NULL, dep LIKE 'd%'" /* Iceberg scan filters */,
+ ImmutableList.of(row(1, 100, "d1"), row(2, 200, "d2")));
+
+ checkFilters(
+ "dep = 'd1'" /* query predicate */,
+ "dep = d1" /* Spark post scan filter */,
+ "dep IS NOT NULL" /* Iceberg scan filters */,
+ ImmutableList.of(row(1, 100, "d1")));
+ }
+
+ @Test
+ public void testFilterPushdownWithSpecEvolutionAndIdentityTransforms() {
+ sql(
+ "CREATE TABLE %s (id INT, salary INT, dep STRING, sub_dep STRING)"
+ + "USING iceberg "
+ + "PARTITIONED BY (dep)",
+ tableName);
+ configurePlanningMode(planningMode);
+
+ sql("INSERT INTO %s VALUES (1, 100, 'd1', 'sd1')", tableName);
+
+ // the filter can be pushed completely because all specs include
identity(dep)
+ checkOnlyIcebergFilters(
+ "dep = 'd1'" /* query predicate */,
+ "dep IS NOT NULL, dep = 'd1'" /* Iceberg scan filters */,
+ ImmutableList.of(row(1, 100, "d1", "sd1")));
+
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ table.updateSpec().addField("sub_dep").commit();
+ sql("REFRESH TABLE %s", tableName);
+ sql("INSERT INTO %s VALUES (2, 200, 'd2', 'sd2')", tableName);
+
+ // the filter can be pushed completely because all specs include
identity(dep)
+ checkOnlyIcebergFilters(
+ "dep = 'd1'" /* query predicate */,
+ "dep IS NOT NULL, dep = 'd1'" /* Iceberg scan filters */,
+ ImmutableList.of(row(1, 100, "d1", "sd1")));
+
+ table.updateSpec().removeField("sub_dep").removeField("dep").commit();
+ sql("REFRESH TABLE %s", tableName);
+ sql("INSERT INTO %s VALUES (3, 300, 'd3', 'sd3')", tableName);
+
+ // the filter can't be pushed completely because not all specs include
identity(dep)
+ checkFilters(
+ "dep = 'd1'" /* query predicate */,
+ "isnotnull(dep) AND (dep = d1)" /* Spark post scan filter */,
+ "dep IS NOT NULL, dep = 'd1'" /* Iceberg scan filters */,
+ ImmutableList.of(row(1, 100, "d1", "sd1")));
+ }
+
+ @Test
+ public void testFilterPushdownWithSpecEvolutionAndTruncateTransform() {
+ sql(
+ "CREATE TABLE %s (id INT, salary INT, dep STRING)"
+ + "USING iceberg "
+ + "PARTITIONED BY (truncate(2, dep))",
+ tableName);
+ configurePlanningMode(planningMode);
+
+ sql("INSERT INTO %s VALUES (1, 100, 'd1')", tableName);
+
+ // the filter can be pushed completely because the current spec supports it
+ checkOnlyIcebergFilters(
+ "dep LIKE 'd1%'" /* query predicate */,
+ "dep IS NOT NULL, dep LIKE 'd1%'" /* Iceberg scan filters */,
+ ImmutableList.of(row(1, 100, "d1")));
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ table
+ .updateSpec()
+ .removeField(Expressions.truncate("dep", 2))
+ .addField(Expressions.truncate("dep", 1))
+ .commit();
+ sql("REFRESH TABLE %s", tableName);
+ sql("INSERT INTO %s VALUES (2, 200, 'd2')", tableName);
+
+ // the filter can be pushed completely because both specs support it
+ checkOnlyIcebergFilters(
+ "dep LIKE 'd%'" /* query predicate */,
+ "dep IS NOT NULL, dep LIKE 'd%'" /* Iceberg scan filters */,
+ ImmutableList.of(row(1, 100, "d1"), row(2, 200, "d2")));
+
+ // the filter can't be pushed completely because the second spec is
truncate(dep, 1) and
+ // the predicate literal is d1, which is two chars
+ checkFilters(
+ "dep LIKE 'd1%' AND id = 1" /* query predicate */,
+ "(isnotnull(id) AND StartsWith(dep, d1)) AND (id = 1)" /* Spark post
scan filter */,
+ "dep IS NOT NULL, id IS NOT NULL, dep LIKE 'd1%', id = 1" /* Iceberg
scan filters */,
+ ImmutableList.of(row(1, 100, "d1")));
+ }
+
+ @Test
+ public void testFilterPushdownWithSpecEvolutionAndTimeTransforms() {
+ sql(
+ "CREATE TABLE %s (id INT, price INT, t TIMESTAMP)"
+ + "USING iceberg "
+ + "PARTITIONED BY (hours(t))",
+ tableName);
+ configurePlanningMode(planningMode);
+
+ withDefaultTimeZone(
+ "UTC",
+ () -> {
+ sql("INSERT INTO %s VALUES (1, 100, TIMESTAMP
'2021-06-30T01:00:00.000Z')", tableName);
+
+ // the filter can be pushed completely because the current spec
supports it
+ checkOnlyIcebergFilters(
+ "t < TIMESTAMP '2021-07-01T00:00:00.000Z'" /* query predicate */,
+ "t IS NOT NULL, t < 1625097600000000" /* Iceberg scan filters */,
+ ImmutableList.of(row(1, 100,
timestamp("2021-06-30T01:00:00.000Z"))));
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ table
+ .updateSpec()
+ .removeField(Expressions.hour("t"))
+ .addField(Expressions.month("t"))
+ .commit();
+ sql("REFRESH TABLE %s", tableName);
+ sql("INSERT INTO %s VALUES (2, 200, TIMESTAMP
'2021-05-30T01:00:00.000Z')", tableName);
+
+ // the filter can be pushed completely because both specs support it
+ checkOnlyIcebergFilters(
+ "t < TIMESTAMP '2021-06-01T00:00:00.000Z'" /* query predicate */,
+ "t IS NOT NULL, t < 1622505600000000" /* Iceberg scan filters */,
+ ImmutableList.of(row(2, 200,
timestamp("2021-05-30T01:00:00.000Z"))));
+ });
+ }
+
+ @Test
+ public void testFilterPushdownWithSpecialFloatingPointPartitionValues() {
+ sql(
+ "CREATE TABLE %s (id INT, salary DOUBLE)" + "USING iceberg " +
"PARTITIONED BY (salary)",
+ tableName);
+ configurePlanningMode(planningMode);
+
+ sql("INSERT INTO %s VALUES (1, 100.5)", tableName);
+ sql("INSERT INTO %s VALUES (2, double('NaN'))", tableName);
+ sql("INSERT INTO %s VALUES (3, double('infinity'))", tableName);
+ sql("INSERT INTO %s VALUES (4, double('-infinity'))", tableName);
+
+ checkOnlyIcebergFilters(
+ "salary = 100.5" /* query predicate */,
+ "salary IS NOT NULL, salary = 100.5" /* Iceberg scan filters */,
+ ImmutableList.of(row(1, 100.5)));
+
+ checkOnlyIcebergFilters(
+ "salary = double('NaN')" /* query predicate */,
+ "salary IS NOT NULL, is_nan(salary)" /* Iceberg scan filters */,
+ ImmutableList.of(row(2, Double.NaN)));
+
+ checkOnlyIcebergFilters(
+ "salary != double('NaN')" /* query predicate */,
+ "salary IS NOT NULL, NOT (is_nan(salary))" /* Iceberg scan filters */,
+ ImmutableList.of(
+ row(1, 100.5), row(3, Double.POSITIVE_INFINITY), row(4,
Double.NEGATIVE_INFINITY)));
+
+ checkOnlyIcebergFilters(
+ "salary = double('infinity')" /* query predicate */,
+ "salary IS NOT NULL, salary = Infinity" /* Iceberg scan filters */,
+ ImmutableList.of(row(3, Double.POSITIVE_INFINITY)));
+
+ checkOnlyIcebergFilters(
+ "salary = double('-infinity')" /* query predicate */,
+ "salary IS NOT NULL, salary = -Infinity" /* Iceberg scan filters */,
+ ImmutableList.of(row(4, Double.NEGATIVE_INFINITY)));
+ }
+
+ private void checkOnlyIcebergFilters(
+ String predicate, String icebergFilters, List<Object[]> expectedRows) {
+
+ checkFilters(predicate, null, icebergFilters, expectedRows);
+ }
+
+ private void checkFilters(
+ String predicate, String sparkFilter, String icebergFilters,
List<Object[]> expectedRows) {
+
+ Action check =
+ () -> {
+ assertEquals(
+ "Rows must match",
+ expectedRows,
+ sql("SELECT * FROM %s WHERE %s ORDER BY id", tableName,
predicate));
+ };
+ SparkPlan sparkPlan = executeAndKeepPlan(check);
+ String planAsString = sparkPlan.toString().replaceAll("#(\\d+L?)", "");
+
+ if (sparkFilter != null) {
+ // The plan is different
+ assertThat(planAsString)
+ .as("Post scan filter should match")
+ .contains("FilterExecTransformer (" + sparkFilter + ")");
+ } else {
+ assertThat(planAsString).as("Should be no post scan
filter").doesNotContain("Filter (");
+ }
+
+ assertThat(planAsString)
+ .as("Pushed filters must match")
+ .contains("[filters=" + icebergFilters + ",");
+ }
+
+ private Timestamp timestamp(String timestampAsString) {
+ return Timestamp.from(Instant.parse(timestampAsString));
+ }
+}
diff --git
a/backends-velox/src-iceberg/test/java/org/apache/iceberg/spark/extensions/SparkExtensionsTestBase.java
b/backends-velox/src-iceberg/test/java/org/apache/iceberg/spark/extensions/SparkExtensionsTestBase.java
new file mode 100644
index 0000000000..a571c6c817
--- /dev/null
+++
b/backends-velox/src-iceberg/test/java/org/apache/iceberg/spark/extensions/SparkExtensionsTestBase.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import org.apache.gluten.spark34.TestConfUtil;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.hive.TestHiveMetastore;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.iceberg.spark.SparkTestBase;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.internal.SQLConf;
+import org.junit.BeforeClass;
+
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
+
+public abstract class SparkExtensionsTestBase extends SparkCatalogTestBase {
+
+ private static final Random RANDOM = ThreadLocalRandom.current();
+
+ public SparkExtensionsTestBase(
+ String catalogName, String implementation, Map<String, String> config) {
+ super(catalogName, implementation, config);
+ }
+
+ @BeforeClass
+ public static void startMetastoreAndSpark() {
+ SparkTestBase.metastore = new TestHiveMetastore();
+ metastore.start();
+ SparkTestBase.hiveConf = metastore.hiveConf();
+
+ SparkTestBase.spark =
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.testing", "true")
+ .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic")
+ .config("spark.sql.extensions",
IcebergSparkSessionExtensions.class.getName())
+ .config("spark.hadoop." + METASTOREURIS.varname,
hiveConf.get(METASTOREURIS.varname))
+ .config("spark.sql.shuffle.partitions", "4")
+
.config("spark.sql.hive.metastorePartitionPruningFallbackOnException", "true")
+
.config("spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true")
+ .config(
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(),
String.valueOf(RANDOM.nextBoolean()))
+ .config(TestConfUtil.GLUTEN_CONF)
+ .enableHiveSupport()
+ .getOrCreate();
+
+ SparkTestBase.catalog =
+ (HiveCatalog)
+ CatalogUtil.loadCatalog(
+ HiveCatalog.class.getName(), "hive", ImmutableMap.of(),
hiveConf);
+ }
+}
diff --git
a/backends-velox/src-iceberg/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
b/backends-velox/src-iceberg/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
index 7aa39f6a61..9fb2654fc7 100644
---
a/backends-velox/src-iceberg/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+++
b/backends-velox/src-iceberg/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
@@ -16,6 +16,7 @@
*/
package org.apache.iceberg.spark.source;
+import org.apache.gluten.config.GlutenConfig;
import org.apache.gluten.spark34.TestConfUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -403,6 +404,25 @@ public class TestSparkReaderDeletes extends
DeleteReadTests {
assertThat(actual).as("Table should contain expected
row").isEqualTo(expected);
checkDeleteCount(3L);
+ // TODO, the query fallbacks because not supports equality delete.
+ // Error Source: RUNTIME
+ //Error Code: NOT_IMPLEMENTED
+ //Retriable: False
+ //Context: Split [Hive:
/var/folders/63/845y6pk53dx_83hpw8ztdchw0000gn/T/junit-17345315326614809092/junit4173952394189821024.tmp
4 - 647] Task Gluten_Stage_5_TID_5_VTID_1
+ //Additional Context: Operator: TableScan[0] 0
+ //Function: prepareSplit
+ //File:
/Users/chengchengjin/code/gluten/ep/build-velox/build/velox_ep/velox/connectors/hive/iceberg/IcebergSplitReader.cpp
+ //Line: 95
+ //Stack trace:
+ // Check the table query data because above query is fallback by column
_deleted.
+ // This query is fallback by equality delete files, remove this check
after equality reader is supported.
+ StructLikeSet actualWithoutMetadata =
+ rowSet(tableName, PROJECTION_SCHEMA_WITHOUT_DELETED.asStruct(),
"id", "data");
+ spark.conf().set(GlutenConfig.GLUTEN_ENABLED().key(), "false");
+ StructLikeSet expectWithoutMetadata =
+ rowSet(tableName, PROJECTION_SCHEMA_WITHOUT_DELETED.asStruct(),
"id", "data");
+ assertThat(actualWithoutMetadata).as("Table should contain expected
row").isEqualTo(expectWithoutMetadata);
+ spark.conf().set(GlutenConfig.GLUTEN_ENABLED().key(), "true");
}
@TestTemplate
@@ -603,6 +623,11 @@ public class TestSparkReaderDeletes extends
DeleteReadTests {
required(2, "data", Types.StringType.get()),
MetadataColumns.IS_DELETED);
+ private static final Schema PROJECTION_SCHEMA_WITHOUT_DELETED =
+ new Schema(
+ required(1, "id", Types.IntegerType.get()),
+ required(2, "data", Types.StringType.get()));
+
private static StructLikeSet expectedRowSet(int... idsToRemove) {
return expectedRowSet(false, false, idsToRemove);
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 9e3837b062..23fec4bbdc 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -566,4 +566,6 @@ object VeloxBackendSettings extends BackendSettingsApi {
override def broadcastNestedLoopJoinSupportsFullOuterJoin(): Boolean = true
+ override def supportIcebergEqualityDeleteRead(): Boolean = false
+
}
diff --git a/docs/get-started/VeloxIceberg.md b/docs/get-started/VeloxIceberg.md
index c02966b40e..82850acf95 100644
--- a/docs/get-started/VeloxIceberg.md
+++ b/docs/get-started/VeloxIceberg.md
@@ -42,19 +42,42 @@ INSERT INTO local.db.table SELECT id, data FROM source
WHERE length(data) = 1;
````
## Reading
-Offload
+### Read data
+Offload/Fallback
+
+| Table Type | No Delete | Position Delete | Equality Delete |
+|-------------|-----------------|-----------------|-----------------|
+| unpartition | Offload | Offload | Fallback |
+| partition | Fallback mostly | Fallback mostly | Fallback |
+| metadata | Fallback | | |
+
+Offload the simple query.
````
SELECT count(1) as count, data
FROM local.db.table
GROUP BY data;
````
+If delete by Spark and copy on read, will generate position delete file, the
query may offload.
+
+If delete by Flink, may generate the equality delete file, fallback in tht
case.
+
+Now we only offload the simple query, for partition table, many operators are
fallback by Expression
+StaticInvoke such as BucketFunction, wait to be supported.
+
DataFrame reads are supported and can now reference tables by name using
spark.table:
+
````
val df = spark.table("local.db.table")
df.count()
````
+### Read metadata
+Fallback
+````
+SELECT data, _file FROM local.db.table;
+````
+
## DataType
Timestamptz in orc format is not supported, throws exception.
UUID type and Fixed type is fallback.
diff --git
a/gluten-iceberg/src-iceberg/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
b/gluten-iceberg/src-iceberg/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
index 4a406a97b5..920151e5f9 100644
---
a/gluten-iceberg/src-iceberg/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
+++
b/gluten-iceberg/src-iceberg/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
@@ -16,12 +16,13 @@
*/
package org.apache.gluten.execution
+import org.apache.gluten.backendsapi.BackendsApiManager
import
org.apache.gluten.execution.IcebergScanTransformer.{containsMetadataColumn,
containsUuidOrFixedType}
import org.apache.gluten.extension.ValidationResult
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.gluten.substrait.rel.SplitInfo
-import org.apache.iceberg.BaseTable
+import org.apache.iceberg.{BaseTable, MetadataColumns, SnapshotSummary}
import org.apache.iceberg.spark.source.{GlutenIcebergSourceUtil, SparkTable}
import org.apache.iceberg.types.Type
import org.apache.iceberg.types.Type.TypeID
@@ -55,20 +56,41 @@ case class IcebergScanTransformer(
}
override def doValidateInternal(): ValidationResult = {
- if (!super.doValidateInternal().ok()) {
- return ValidationResult.failed(s"Unsupported scan $scan")
+ val validationResult = super.doValidateInternal();
+ if (!validationResult.ok()) {
+ return validationResult
}
- val notSupport = table match {
- case t: SparkTable => t.table() match {
- case t: BaseTable => t.operations().current().schema().columns().stream
- .anyMatch(c => containsUuidOrFixedType(c.`type`()) ||
containsMetadataColumn(c))
+
+ if (!BackendsApiManager.getSettings.supportIcebergEqualityDeleteRead()) {
+ val notSupport = table match {
+ case t: SparkTable => t.table() match {
+ case t: BaseTable =>
t.operations().current().schema().columns().stream
+ .anyMatch(c => containsUuidOrFixedType(c.`type`()) ||
containsMetadataColumn(c))
+ case _ => false
+ }
case _ => false
}
- case _ => false
- }
- if (notSupport) {
- return ValidationResult.failed("Contains not supported data type or
metadata column")
+ if (notSupport) {
+ return ValidationResult.failed("Contains not supported data type or
metadata column")
+ }
+ // Delete from command read the _file metadata, which may be not
successful.
+ val readMetadata = scan.readSchema().fieldNames.exists(f =>
MetadataColumns.isMetadataColumn(f))
+ if (readMetadata) {
+ return ValidationResult.failed(s"Read the metadata column")
+ }
+ val containsEqualityDelete = table match {
+ case t: SparkTable => t.table() match {
+ case t: BaseTable =>
t.operations().current().currentSnapshot().summary()
+ .getOrDefault(SnapshotSummary.TOTAL_EQ_DELETES_PROP, "0").toInt > 0
+ case _ => false
+ }
+ case _ => false
+ }
+ if (containsEqualityDelete) {
+ return ValidationResult.failed("Contains equality delete files")
+ }
}
+
ValidationResult.succeeded
}
diff --git
a/gluten-iceberg/src-iceberg/test/scala/org/apache/gluten/execution/IcebergSuite.scala
b/gluten-iceberg/src-iceberg/test/scala/org/apache/gluten/execution/IcebergSuite.scala
index 82aebad5f4..96fbdb5424 100644
---
a/gluten-iceberg/src-iceberg/test/scala/org/apache/gluten/execution/IcebergSuite.scala
+++
b/gluten-iceberg/src-iceberg/test/scala/org/apache/gluten/execution/IcebergSuite.scala
@@ -16,7 +16,6 @@
*/
package org.apache.gluten.execution
-import org.apache.gluten.config.GlutenConfig
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
@@ -63,7 +62,6 @@ abstract class IcebergSuite extends
WholeStageTransformerSuite {
val rightTable = "p_int_tb"
withTable(leftTable, rightTable) {
// Partition key of string type.
- withSQLConf(GlutenConfig.GLUTEN_ENABLED.key -> "false") {
// Gluten does not support write iceberg table.
spark.sql(s"""
|create table $leftTable(id int, name string, p string)
@@ -80,12 +78,8 @@ abstract class IcebergSuite extends
WholeStageTransformerSuite {
|(3, 'a4', 'p3');
|""".stripMargin
)
- }
// Partition key of integer type.
- withSQLConf(
- GlutenConfig.GLUTEN_ENABLED.key -> "false"
- ) {
// Gluten does not support write iceberg table.
spark.sql(s"""
|create table $rightTable(id int, name string, p int)
@@ -102,7 +96,6 @@ abstract class IcebergSuite extends
WholeStageTransformerSuite {
|(1, 'b1', 21);
|""".stripMargin
)
- }
withSQLConf(
"spark.sql.sources.v2.bucketing.enabled" -> "true",
@@ -143,7 +136,6 @@ abstract class IcebergSuite extends
WholeStageTransformerSuite {
val rightTable = "p_int_tb"
withTable(leftTable, rightTable) {
// Partition key of string type.
- withSQLConf(GlutenConfig.GLUTEN_ENABLED.key -> "false") {
// Gluten does not support write iceberg table.
spark.sql(s"""
|create table $leftTable(id int, name string, p int)
@@ -160,12 +152,8 @@ abstract class IcebergSuite extends
WholeStageTransformerSuite {
|(3, 'a4', 2);
|""".stripMargin
)
- }
// Partition key of integer type.
- withSQLConf(
- GlutenConfig.GLUTEN_ENABLED.key -> "false"
- ) {
// Gluten does not support write iceberg table.
spark.sql(s"""
|create table $rightTable(id int, name string, p int)
@@ -182,7 +170,6 @@ abstract class IcebergSuite extends
WholeStageTransformerSuite {
|(1, 'b1', 1);
|""".stripMargin
)
- }
withSQLConf(
"spark.sql.sources.v2.bucketing.enabled" -> "true",
@@ -223,7 +210,6 @@ abstract class IcebergSuite extends
WholeStageTransformerSuite {
val leftTable = "p_str_tb"
val rightTable = "p_int_tb"
withTable(leftTable, rightTable) {
- withSQLConf(GlutenConfig.GLUTEN_ENABLED.key -> "false") {
// Gluten does not support write iceberg table.
spark.sql(s"""
|create table $leftTable(id int, name string, p string)
@@ -261,7 +247,6 @@ abstract class IcebergSuite extends
WholeStageTransformerSuite {
|(1, 'b1', 21);
|""".stripMargin
)
- }
withSQLConf(
"spark.sql.sources.v2.bucketing.enabled" -> "true",
@@ -301,7 +286,6 @@ abstract class IcebergSuite extends
WholeStageTransformerSuite {
val leftTable = "p_str_tb"
val rightTable = "p_int_tb"
withTable(leftTable, rightTable) {
- withSQLConf(GlutenConfig.GLUTEN_ENABLED.key -> "false") {
// Gluten does not support write iceberg table.
spark.sql(s"""
|create table $leftTable(id int, name string, p string)
@@ -339,7 +323,6 @@ abstract class IcebergSuite extends
WholeStageTransformerSuite {
|(1, 'b1', 21);
|""".stripMargin
)
- }
withSQLConf(
"spark.sql.sources.v2.bucketing.enabled" -> "true",
@@ -379,7 +362,6 @@ abstract class IcebergSuite extends
WholeStageTransformerSuite {
val rightTable = "p_int_tb"
withTable(leftTable, rightTable) {
// Partition key of string type.
- withSQLConf(GlutenConfig.GLUTEN_ENABLED.key -> "false") {
// Gluten does not support write iceberg table.
spark.sql(s"""
|create table $leftTable(id int, name string, p int)
@@ -396,12 +378,9 @@ abstract class IcebergSuite extends
WholeStageTransformerSuite {
|(3, 'a4', 2);
|""".stripMargin
)
- }
// Partition key of integer type.
- withSQLConf(
- GlutenConfig.GLUTEN_ENABLED.key -> "false"
- ) {
+
// Gluten does not support write iceberg table.
spark.sql(s"""
|create table $rightTable(id int, name string, p int)
@@ -418,7 +397,6 @@ abstract class IcebergSuite extends
WholeStageTransformerSuite {
|(1, 'b1', 1);
|""".stripMargin
)
- }
withSQLConf(
"spark.sql.sources.v2.bucketing.enabled" -> "true",
@@ -494,51 +472,53 @@ abstract class IcebergSuite extends
WholeStageTransformerSuite {
test("iceberg read mor table - delete and update") {
withTable("iceberg_mor_tb") {
- withSQLConf(GlutenConfig.GLUTEN_ENABLED.key -> "false") {
- spark.sql("""
- |create table iceberg_mor_tb (
- | id int,
- | name string,
- | p string
- |) using iceberg
- |tblproperties (
- | 'format-version' = '2',
- | 'write.delete.mode' = 'merge-on-read',
- | 'write.update.mode' = 'merge-on-read',
- | 'write.merge.mode' = 'merge-on-read'
- |)
- |partitioned by (p);
- |""".stripMargin)
-
- // Insert some test rows.
- spark.sql("""
- |insert into table iceberg_mor_tb
- |values (1, 'a1', 'p1'), (2, 'a2', 'p1'), (3, 'a3', 'p2'),
- | (4, 'a4', 'p1'), (5, 'a5', 'p2'), (6, 'a6', 'p1');
- |""".stripMargin)
-
- // Delete row.
- spark.sql(
- """
- |delete from iceberg_mor_tb where name = 'a1';
- |""".stripMargin
- )
- // Update row.
- spark.sql(
- """
- |update iceberg_mor_tb set name = 'new_a2' where id = 'a2';
- |""".stripMargin
- )
- // Delete row again.
- spark.sql(
- """
- |delete from iceberg_mor_tb where id = 6;
- |""".stripMargin
- )
- }
- runQueryAndCompare("""
- |select * from iceberg_mor_tb;
- |""".stripMargin) {
+ spark.sql(
+ """
+ |create table iceberg_mor_tb (
+ | id int,
+ | name string,
+ | p string
+ |) using iceberg
+ |tblproperties (
+ | 'format-version' = '2',
+ | 'write.delete.mode' = 'merge-on-read',
+ | 'write.update.mode' = 'merge-on-read',
+ | 'write.merge.mode' = 'merge-on-read'
+ |)
+ |partitioned by (p);
+ |""".stripMargin)
+
+ // Insert some test rows.
+ spark.sql(
+ """
+ |insert into table iceberg_mor_tb
+ |values (1, 'a1', 'p1'), (2, 'a2', 'p1'), (3, 'a3', 'p2'),
+ | (4, 'a4', 'p1'), (5, 'a5', 'p2'), (6, 'a6', 'p1');
+ |""".stripMargin)
+
+ // Delete row.
+ spark.sql(
+ """
+ |delete from iceberg_mor_tb where name = 'a1';
+ |""".stripMargin
+ )
+ // Update row.
+ spark.sql(
+ """
+ |update iceberg_mor_tb set name = 'new_a2' where id = 'a2';
+ |""".stripMargin
+ )
+ // Delete row again.
+ spark.sql(
+ """
+ |delete from iceberg_mor_tb where id = 6;
+ |""".stripMargin
+ )
+
+ runQueryAndCompare(
+ """
+ |select * from iceberg_mor_tb;
+ |""".stripMargin) {
checkGlutenOperatorMatch[IcebergScanTransformer]
}
}
@@ -546,7 +526,6 @@ abstract class IcebergSuite extends
WholeStageTransformerSuite {
test("iceberg read mor table - merge into") {
withTable("iceberg_mor_tb", "merge_into_source_tb") {
- withSQLConf(GlutenConfig.GLUTEN_ENABLED.key -> "false") {
spark.sql("""
|create table iceberg_mor_tb (
| id int,
@@ -605,7 +584,6 @@ abstract class IcebergSuite extends
WholeStageTransformerSuite {
| insert (id, name, p) values (s.id, s.name, s.p);
|""".stripMargin
)
- }
runQueryAndCompare("""
|select * from iceberg_mor_tb;
|""".stripMargin) {
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index c8d72c0af6..937d503c1f 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -155,4 +155,6 @@ trait BackendSettingsApi {
def broadcastNestedLoopJoinSupportsFullOuterJoin(): Boolean = false
+ def supportIcebergEqualityDeleteRead(): Boolean = true
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]