This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f2884c6d2 Spark: Fix Decimal value conversion in V2 filters (#8682)
3f2884c6d2 is described below

commit 3f2884c6d2e514f1112bd440fedfbb7d30227a96
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Fri Sep 29 12:38:36 2023 -0700

    Spark: Fix Decimal value conversion in V2 filters (#8682)
---
 .../org/apache/iceberg/spark/SparkV2Filters.java     |  3 +++
 .../apache/iceberg/spark/sql/TestFilterPushDown.java | 19 +++++++++++++++++++
 .../org/apache/iceberg/spark/SparkV2Filters.java     |  3 +++
 .../apache/iceberg/spark/sql/TestFilterPushDown.java | 20 ++++++++++++++++++++
 .../org/apache/iceberg/spark/SparkV2Filters.java     |  3 +++
 .../apache/iceberg/spark/sql/TestFilterPushDown.java | 20 ++++++++++++++++++++
 6 files changed, 68 insertions(+)

diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkV2Filters.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkV2Filters.java
index 6d564bbd62..43c7cecfa1 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkV2Filters.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkV2Filters.java
@@ -49,6 +49,7 @@ import org.apache.spark.sql.connector.expressions.filter.And;
 import org.apache.spark.sql.connector.expressions.filter.Not;
 import org.apache.spark.sql.connector.expressions.filter.Or;
 import org.apache.spark.sql.connector.expressions.filter.Predicate;
+import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.unsafe.types.UTF8String;
 
 public class SparkV2Filters {
@@ -285,6 +286,8 @@ public class SparkV2Filters {
   private static Object convertLiteral(Literal<?> literal) {
     if (literal.value() instanceof UTF8String) {
       return ((UTF8String) literal.value()).toString();
+    } else if (literal.value() instanceof Decimal) {
+      return ((Decimal) literal.value()).toJavaBigDecimal();
     }
     return literal.value();
   }
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
index 0ea34e187f..f1e2169af4 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.spark.sql;
 
+import java.math.BigDecimal;
 import java.sql.Timestamp;
 import java.time.Instant;
 import java.util.List;
@@ -38,6 +39,24 @@ public class TestFilterPushDown extends SparkTestBaseWithCatalog {
     sql("DROP TABLE IF EXISTS tmp_view");
   }
 
+  @Test
+  public void testFilterPushdownWithDecimalValues() {
+    sql(
+        "CREATE TABLE %s (id INT, salary DECIMAL(10, 2), dep STRING)"
+            + "USING iceberg "
+            + "PARTITIONED BY (dep)",
+        tableName);
+
+    sql("INSERT INTO %s VALUES (1, 100.01, 'd1')", tableName);
+    sql("INSERT INTO %s VALUES (2, 100.05, 'd1')", tableName);
+
+    checkFilters(
+        "dep = 'd1' AND salary > 100.03" /* query predicate */,
+        "isnotnull(salary) AND (salary > 100.03)" /* Spark post scan filter */,
+        "dep IS NOT NULL, salary IS NOT NULL, dep = 'd1', salary > 100.03" /* Iceberg scan filters */,
+        ImmutableList.of(row(2, new BigDecimal("100.05"), "d1")));
+  }
+
   @Test
   public void testFilterPushdownWithIdentityTransform() {
     sql(
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkV2Filters.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkV2Filters.java
index 06cb1335aa..57b9d61e38 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkV2Filters.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkV2Filters.java
@@ -64,6 +64,7 @@ import org.apache.spark.sql.connector.expressions.filter.And;
 import org.apache.spark.sql.connector.expressions.filter.Not;
 import org.apache.spark.sql.connector.expressions.filter.Or;
 import org.apache.spark.sql.connector.expressions.filter.Predicate;
+import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.unsafe.types.UTF8String;
 
 public class SparkV2Filters {
@@ -378,6 +379,8 @@ public class SparkV2Filters {
   private static Object convertLiteral(Literal<?> literal) {
     if (literal.value() instanceof UTF8String) {
       return ((UTF8String) literal.value()).toString();
+    } else if (literal.value() instanceof Decimal) {
+      return ((Decimal) literal.value()).toJavaBigDecimal();
     }
     return literal.value();
   }
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
index e7401a00e8..f2ef2d4705 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.spark.sql;
 import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
 import static org.apache.iceberg.PlanningMode.LOCAL;
 
+import java.math.BigDecimal;
 import java.sql.Timestamp;
 import java.time.Instant;
 import java.util.List;
@@ -56,6 +57,25 @@ public class TestFilterPushDown extends SparkTestBaseWithCatalog {
     sql("DROP TABLE IF EXISTS tmp_view");
   }
 
+  @Test
+  public void testFilterPushdownWithDecimalValues() {
+    sql(
+        "CREATE TABLE %s (id INT, salary DECIMAL(10, 2), dep STRING)"
+            + "USING iceberg "
+            + "PARTITIONED BY (dep)",
+        tableName);
+    configurePlanningMode(planningMode);
+
+    sql("INSERT INTO %s VALUES (1, 100.01, 'd1')", tableName);
+    sql("INSERT INTO %s VALUES (2, 100.05, 'd1')", tableName);
+
+    checkFilters(
+        "dep = 'd1' AND salary > 100.03" /* query predicate */,
+        "isnotnull(salary) AND (salary > 100.03)" /* Spark post scan filter */,
+        "dep IS NOT NULL, salary IS NOT NULL, dep = 'd1', salary > 100.03" /* Iceberg scan filters */,
+        ImmutableList.of(row(2, new BigDecimal("100.05"), "d1")));
+  }
+
   @Test
   public void testFilterPushdownWithIdentityTransform() {
     sql(
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkV2Filters.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkV2Filters.java
index 06cb1335aa..57b9d61e38 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkV2Filters.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkV2Filters.java
@@ -64,6 +64,7 @@ import org.apache.spark.sql.connector.expressions.filter.And;
 import org.apache.spark.sql.connector.expressions.filter.Not;
 import org.apache.spark.sql.connector.expressions.filter.Or;
 import org.apache.spark.sql.connector.expressions.filter.Predicate;
+import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.unsafe.types.UTF8String;
 
 public class SparkV2Filters {
@@ -378,6 +379,8 @@ public class SparkV2Filters {
   private static Object convertLiteral(Literal<?> literal) {
     if (literal.value() instanceof UTF8String) {
       return ((UTF8String) literal.value()).toString();
+    } else if (literal.value() instanceof Decimal) {
+      return ((Decimal) literal.value()).toJavaBigDecimal();
     }
     return literal.value();
   }
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
index e7401a00e8..f2ef2d4705 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.spark.sql;
 import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
 import static org.apache.iceberg.PlanningMode.LOCAL;
 
+import java.math.BigDecimal;
 import java.sql.Timestamp;
 import java.time.Instant;
 import java.util.List;
@@ -56,6 +57,25 @@ public class TestFilterPushDown extends SparkTestBaseWithCatalog {
     sql("DROP TABLE IF EXISTS tmp_view");
   }
 
+  @Test
+  public void testFilterPushdownWithDecimalValues() {
+    sql(
+        "CREATE TABLE %s (id INT, salary DECIMAL(10, 2), dep STRING)"
+            + "USING iceberg "
+            + "PARTITIONED BY (dep)",
+        tableName);
+    configurePlanningMode(planningMode);
+
+    sql("INSERT INTO %s VALUES (1, 100.01, 'd1')", tableName);
+    sql("INSERT INTO %s VALUES (2, 100.05, 'd1')", tableName);
+
+    checkFilters(
+        "dep = 'd1' AND salary > 100.03" /* query predicate */,
+        "isnotnull(salary) AND (salary > 100.03)" /* Spark post scan filter */,
+        "dep IS NOT NULL, salary IS NOT NULL, dep = 'd1', salary > 100.03" /* Iceberg scan filters */,
+        ImmutableList.of(row(2, new BigDecimal("100.05"), "d1")));
+  }
+
   @Test
   public void testFilterPushdownWithIdentityTransform() {
     sql(

Reply via email to