This is an automated email from the ASF dual-hosted git repository.

yma pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 2ff2fd4dd [GLUTEN-6509] enable reading Iceberg tables with timestamptz as a partitioned column. (#6508)
2ff2fd4dd is described below

commit 2ff2fd4dd7dbd2261bf49f39c5e69b8c2b0c201a
Author: j7nhai <[email protected]>
AuthorDate: Mon Jul 22 10:47:12 2024 +0800

    [GLUTEN-6509] enable reading Iceberg tables with timestamptz as a partitioned column. (#6508)
    
    * fix timestamp tz.
    
    * fix styles.
    
    * fix styles.
    
    * fix logs.
    
    * add iceberg tests.
    
    * pr ready.
    
    * delete logging
    
    * fix test sql
    
    * ignore 3.4
    
    * format
    
    * scalastyle.
    
    * fix test.
---
 .../org/apache/spark/sql/GlutenQueryTest.scala     |  9 +++++++
 .../spark/source/GlutenIcebergSourceUtil.scala     | 12 ++++-----
 .../gluten/execution/VeloxIcebergSuite.scala       | 31 ++++++++++++++++++++++
 3 files changed, 46 insertions(+), 6 deletions(-)

diff --git 
a/gluten-core/src/test/scala/org/apache/spark/sql/GlutenQueryTest.scala 
b/gluten-core/src/test/scala/org/apache/spark/sql/GlutenQueryTest.scala
index ab30cb14e..53abaa9ac 100644
--- a/gluten-core/src/test/scala/org/apache/spark/sql/GlutenQueryTest.scala
+++ b/gluten-core/src/test/scala/org/apache/spark/sql/GlutenQueryTest.scala
@@ -95,6 +95,15 @@ abstract class GlutenQueryTest extends PlanTest {
     }
   }
 
+  def testWithSpecifiedSparkVersion(testName: String, versions: Array[String])(
+      testFun: => Any): Unit = {
+    if (versions.exists(v => shouldRun(Some(v), Some(v)))) {
+      test(testName) {
+        testFun
+      }
+    }
+  }
+
   /** Runs the plan and makes sure the answer contains all of the keywords. */
   def checkKeywordsExist(df: DataFrame, keywords: String*): Unit = {
     val outputs = df.collect().map(_.mkString).mkString
diff --git 
a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala
 
b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala
index 6b67e7636..ad8222cff 100644
--- 
a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala
+++ 
b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala
@@ -25,7 +25,8 @@ import 
org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
 import org.apache.spark.sql.connector.read.{InputPartition, Scan}
 import org.apache.spark.sql.types.StructType
 
-import org.apache.iceberg.{CombinedScanTask, DeleteFile, FileFormat, 
FileScanTask, ScanTask}
+import org.apache.iceberg.{CombinedScanTask, DeleteFile, FileFormat, 
FileScanTask, ScanTask, Schema}
+import org.apache.iceberg.spark.SparkSchemaUtil
 
 import java.lang.{Long => JLong}
 import java.util.{ArrayList => JArrayList, HashMap => JHashMap, List => JList, 
Map => JMap}
@@ -104,7 +105,6 @@ object GlutenIcebergSourceUtil {
         task =>
           val spec = task.spec()
           if (spec.isPartitioned) {
-            var partitionSchema = new StructType()
             val readFields = scan.readSchema().fields.map(_.name).toSet
             // Iceberg will generate some non-table fields as partition 
fields, such as x_bucket,
             // which will not appear in readFields, they also cannot be 
filtered.
@@ -116,11 +116,11 @@ object GlutenIcebergSourceUtil {
                 .asScala
                 .filter(f => !tableFields.contains(f.name) || 
readFields.contains(f.name()))
             partitionFields.foreach {
-              field =>
-                TypeUtil.validatePartitionColumnType(field.`type`().typeId())
-                partitionSchema = partitionSchema.add(field.name(), 
field.`type`().toString)
+              field => 
TypeUtil.validatePartitionColumnType(field.`type`().typeId())
             }
-            return partitionSchema
+
+            val icebergSchema = new Schema(partitionFields.toList.asJava)
+            return SparkSchemaUtil.convert(icebergSchema)
           } else {
             return new StructType()
           }
diff --git 
a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
 
b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
index 77a1c790b..bb604f534 100644
--- 
a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
+++ 
b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
@@ -457,4 +457,35 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite 
{
       }
     }
   }
+
+  // Spark configuration spark.sql.iceberg.handle-timestamp-without-timezone 
is not supported
+  // in Spark 3.4
+  testWithSpecifiedSparkVersion("iceberg partition type - timestamp", 
Array("3.2", "3.3", "3.5")) {
+    Seq("true", "false").foreach {
+      flag =>
+        withSQLConf(
+          "spark.sql.iceberg.handle-timestamp-without-timezone" -> flag,
+          "spark.sql.iceberg.use-timestamp-without-timezone-in-new-tables" -> 
flag) {
+          withTable("part_by_timestamp") {
+            spark.sql("""
+                        |create table part_by_timestamp (
+                        |  p timestamp
+                        |) using iceberg
+                        |tblproperties (
+                        |  'format-version' = '1'
+                        |)
+                        |partitioned by (p);
+                        |""".stripMargin)
+
+            // Insert some test rows.
+            spark.sql("""
+                        |insert into table part_by_timestamp
+                        |values (TIMESTAMP '2022-01-01 00:01:20');
+                        |""".stripMargin)
+            val df = spark.sql("select * from part_by_timestamp")
+            checkAnswer(df, Row(java.sql.Timestamp.valueOf("2022-01-01 
00:01:20")) :: Nil)
+          }
+        }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to