This is an automated email from the ASF dual-hosted git repository.
yangzy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new b08258e51 [VL][GLUTEN-5362] Enable iceberg tpch partitioned table test (#5373)
b08258e51 is described below
commit b08258e51cde7dd657a82d67e2d854df5570144d
Author: Joey <[email protected]>
AuthorDate: Sat Apr 20 19:49:53 2024 +0800
[VL][GLUTEN-5362] Enable iceberg tpch partitioned table test (#5373)
* Restore iceberg tpch partitioned table test
* Fix oom
* Disable FANOUT
---
.../gluten/execution/VeloxTPCHIcebergSuite.scala | 18 ++++++++----------
1 file changed, 8 insertions(+), 10 deletions(-)
diff --git a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxTPCHIcebergSuite.scala b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxTPCHIcebergSuite.scala
index f997693f8..22a5b6b70 100644
--- a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxTPCHIcebergSuite.scala
+++ b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxTPCHIcebergSuite.scala
@@ -17,14 +17,12 @@
package org.apache.gluten.execution
import org.apache.spark.SparkConf
+import org.apache.spark.sql.functions.col
import org.apache.iceberg.spark.SparkWriteOptions
-import org.scalatest.Ignore
import java.io.File
-// Ignored due to failures, see https://github.com/apache/incubator-gluten/issues/5362
-@Ignore
class VeloxTPCHIcebergSuite extends VeloxTPCHSuite {
protected val tpchBasePath: String = new File(
@@ -97,21 +95,21 @@ class VeloxTPCHIcebergSuite extends VeloxTPCHSuite {
}
}
-// Ignored due to failures, see https://github.com/apache/incubator-gluten/issues/5362
-@Ignore
class VeloxPartitionedTableTPCHIcebergSuite extends VeloxTPCHIcebergSuite {
override protected def createTPCHNotNullTables(): Unit = {
TPCHTables.map {
table =>
val tablePath = new File(resourcePath, table.name).getAbsolutePath
- val tableDF = spark.read.format(fileFormat).load(tablePath)
+ val tableDF = spark.read
+ .format(fileFormat)
+ .load(tablePath)
+ .repartition(table.partitionColumns.map(col): _*)
+ .sortWithinPartitions(table.partitionColumns.map(col): _*)
- tableDF
- .repartition(800)
- .write
+ tableDF.write
.format("iceberg")
.partitionBy(table.partitionColumns: _*)
- .option(SparkWriteOptions.FANOUT_ENABLED, "true")
+ .option(SparkWriteOptions.FANOUT_ENABLED, "false")
.mode("overwrite")
.saveAsTable(table.name)
(table.name, tableDF)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]