This is an automated email from the ASF dual-hosted git repository.

kejia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 7dda6866d0  [GLUTEN-8385][VL] Support write compatible-hive bucket 
table for Spark3.4 and Spark3.5.(#8386)
7dda6866d0 is described below

commit 7dda6866d005deff4c0dd070727f890b2815a565
Author: Kaifei Yi <[email protected]>
AuthorDate: Wed Jan 8 09:27:21 2025 +0800

     [GLUTEN-8385][VL] Support write compatible-hive bucket table for Spark3.4 
and Spark3.5.(#8386)
---
 .../gluten/backendsapi/clickhouse/CHBackend.scala  |   1 +
 .../gluten/backendsapi/velox/VeloxBackend.scala    |  14 ++-
 .../spark/sql/execution/BucketWriteUtils.scala     |  95 ++++++++++++++++++
 .../execution/VeloxParquetWriteForHiveSuite.scala  | 110 ++++++++++++++++++++-
 cpp/velox/substrait/SubstraitToVeloxPlan.cc        |  36 ++++++-
 docs/developers/SubstraitModifications.md          |   1 +
 .../apache/gluten/substrait/rel/RelBuilder.java    |   8 +-
 .../apache/gluten/substrait/rel/WriteRelNode.java  |  15 ++-
 .../substrait/proto/substrait/algebra.proto        |  10 ++
 .../gluten/backendsapi/BackendSettingsApi.scala    |   1 +
 .../execution/WriteFilesExecTransformer.scala      |  15 ++-
 11 files changed, 288 insertions(+), 18 deletions(-)

diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index 924f3b3170..aa9e3e553c 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -227,6 +227,7 @@ object CHBackendSettings extends BackendSettingsApi with 
Logging {
       format: FileFormat,
       fields: Array[StructField],
       bucketSpec: Option[BucketSpec],
+      isPartitionedTable: Boolean,
       options: Map[String, String]): ValidationResult = {
 
     def validateCompressionCodec(): Option[String] = {
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 4918a6eade..6c51ad484c 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -232,6 +232,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
       format: FileFormat,
       fields: Array[StructField],
       bucketSpec: Option[BucketSpec],
+      isPartitionedTable: Boolean,
       options: Map[String, String]): ValidationResult = {
 
     // Validate if HiveFileFormat write is supported based on output file type
@@ -331,10 +332,17 @@ object VeloxBackendSettings extends BackendSettingsApi {
     }
 
     def validateBucketSpec(): Option[String] = {
-      if (bucketSpec.nonEmpty) {
-        Some("Unsupported native write: bucket write is not supported.")
-      } else {
+      val isHiveCompatibleBucketTable = bucketSpec.nonEmpty && options
+        .getOrElse("__hive_compatible_bucketed_table_insertion__", "false")
+        .equals("true")
+      // Currently, the velox backend only supports bucketed tables compatible 
with Hive and
+      // is limited to partitioned tables. Therefore, we should add this 
condition restriction.
+      // After velox supports bucketed non-partitioned tables, we can remove 
the restriction on
+      // partitioned tables.
+      if (bucketSpec.isEmpty || (isHiveCompatibleBucketTable && 
isPartitionedTable)) {
         None
+      } else {
+        Some("Unsupported native write: non-compatible hive bucket write is 
not supported.")
       }
     }
 
diff --git 
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/BucketWriteUtils.scala
 
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/BucketWriteUtils.scala
new file mode 100644
index 0000000000..a9fe8269e9
--- /dev/null
+++ 
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/BucketWriteUtils.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.{DataFrame, GlutenQueryTest}
+import org.apache.spark.sql.catalyst.expressions.{BitwiseAnd, Expression, 
HiveHash, Literal, Pmod, UnsafeProjection}
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.test.SQLTestUtils
+
+import java.io.File
+
+trait BucketWriteUtils extends GlutenQueryTest with SQLTestUtils {
+
+  def tableDir(table: String): File = {
+    val identifier = spark.sessionState.sqlParser.parseTableIdentifier(table)
+    new File(spark.sessionState.catalog.defaultTablePath(identifier))
+  }
+
+  protected def testBucketing(
+      dataDir: File,
+      source: String = "parquet",
+      numBuckets: Int,
+      bucketCols: Seq[String],
+      sortCols: Seq[String] = Nil,
+      inputDF: DataFrame,
+      bucketIdExpression: (Seq[Expression], Int) => Expression,
+      getBucketIdFromFileName: String => Option[Int]): Unit = {
+    val allBucketFiles =
+      dataDir.listFiles().filterNot(f => f.getName.startsWith(".") || 
f.getName.startsWith("_"))
+
+    for (bucketFile <- allBucketFiles) {
+      val bucketId = getBucketIdFromFileName(bucketFile.getName).getOrElse {
+        fail(s"Unable to find the related bucket files.")
+      }
+
+      // Remove the duplicate columns in bucketCols and sortCols;
+      // Otherwise, we got analysis errors due to duplicate names
+      val selectedColumns = (bucketCols ++ sortCols).distinct
+      // We may lose the type information after write (e.g. json format doesn't 
keep schema
+      // information), here we get the types from the original dataframe.
+      val types = inputDF.select(selectedColumns.map(col): 
_*).schema.map(_.dataType)
+      val columns = selectedColumns.zip(types).map { case (colName, dt) => 
col(colName).cast(dt) }
+
+      // Read the bucket file into a dataframe, so that it's easier to test.
+      val readBack = spark.read
+        .format(source)
+        .load(bucketFile.getAbsolutePath)
+        .select(columns: _*)
+
+      // If we specified sort columns while writing bucket table, make sure 
the data in this
+      // bucket file is already sorted.
+      if (sortCols.nonEmpty) {
+        checkAnswer(readBack.sort(sortCols.map(col): _*), readBack.collect())
+      }
+
+      // Go through all rows in this bucket file, calculate bucket id 
according to bucket column
+      // values, and make sure it equals to the expected bucket id that 
inferred from file name.
+      val qe = readBack.select(bucketCols.map(col): _*).queryExecution
+      val rows = qe.toRdd.map(_.copy()).collect()
+      val getBucketId = UnsafeProjection.create(
+        bucketIdExpression(qe.analyzed.output, numBuckets) :: Nil,
+        qe.analyzed.output)
+
+      for (row <- rows) {
+        val actualBucketId = getBucketId(row).getInt(0)
+        assert(actualBucketId == bucketId)
+      }
+    }
+  }
+
+  def bucketIdExpression(expressions: Seq[Expression], numBuckets: Int): 
Expression =
+    Pmod(BitwiseAnd(HiveHash(expressions), Literal(Int.MaxValue)), 
Literal(numBuckets))
+
+  def getBucketIdFromFileName(fileName: String): Option[Int] = {
+    val hiveBucketedFileName = """^(\d+)_0_.*$""".r
+    fileName match {
+      case hiveBucketedFileName(bucketId) => Some(bucketId.toInt)
+      case _ => None
+    }
+  }
+}
diff --git 
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
 
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
index 5932f4e5a7..11efdccfcf 100644
--- 
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
+++ 
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
@@ -20,6 +20,7 @@ import org.apache.spark.SparkConf
 import org.apache.spark.internal.config
 import org.apache.spark.internal.config.UI.UI_ENABLED
 import org.apache.spark.sql.{GlutenQueryTest, Row, SparkSession}
+import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
 import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
 import org.apache.spark.sql.execution.datasources.FakeRowAdaptor
@@ -33,8 +34,14 @@ import org.apache.hadoop.fs.Path
 import org.apache.parquet.hadoop.ParquetFileReader
 import org.apache.parquet.hadoop.util.HadoopInputFile
 
-class VeloxParquetWriteForHiveSuite extends GlutenQueryTest with SQLTestUtils {
+import java.io.File
+
+class VeloxParquetWriteForHiveSuite
+  extends GlutenQueryTest
+  with SQLTestUtils
+  with BucketWriteUtils {
   private var _spark: SparkSession = _
+  import testImplicits._
 
   override protected def beforeAll(): Unit = {
     super.beforeAll()
@@ -222,4 +229,105 @@ class VeloxParquetWriteForHiveSuite extends 
GlutenQueryTest with SQLTestUtils {
         }
     }
   }
+
+  test("Native writer support compatible hive bucket write with dynamic 
partition") {
+    if (isSparkVersionGE("3.4")) {
+      Seq("true", "false").foreach {
+        enableConvertMetastore =>
+          withSQLConf("spark.sql.hive.convertMetastoreParquet" -> 
enableConvertMetastore) {
+            val source = "hive_source_table"
+            val target = "hive_bucketed_table"
+            withTable(source, target) {
+              sql(s"""
+                     |CREATE TABLE IF NOT EXISTS $target (i int, j string)
+                     |PARTITIONED BY(k string)
+                     |CLUSTERED BY (i, j) SORTED BY (i) INTO 8 BUCKETS
+                     |STORED AS PARQUET
+               """.stripMargin)
+
+              val df =
+                (0 until 50).map(i => (i % 13, i.toString, i % 5)).toDF("i", 
"j", "k")
+              df.write.mode(SaveMode.Overwrite).saveAsTable(source)
+
+              withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+                checkNativeWrite(s"INSERT INTO $target SELECT * FROM $source", 
checkNative = true)
+              }
+
+              for (k <- 0 until 5) {
+                testBucketing(
+                  new File(tableDir(target), s"k=$k"),
+                  "parquet",
+                  8,
+                  Seq("i", "j"),
+                  Seq("i"),
+                  df,
+                  bucketIdExpression,
+                  getBucketIdFromFileName)
+              }
+            }
+          }
+      }
+    }
+  }
+
+  test("bucket writer with non-dynamic partition should fallback") {
+    if (isSparkVersionGE("3.4")) {
+      Seq("true", "false").foreach {
+        enableConvertMetastore =>
+          withSQLConf("spark.sql.hive.convertMetastoreParquet" -> 
enableConvertMetastore) {
+            val source = "hive_source_table"
+            val target = "hive_bucketed_table"
+            withTable(source, target) {
+              sql(s"""
+                     |CREATE TABLE IF NOT EXISTS $target (i int, j string)
+                     |PARTITIONED BY(k string)
+                     |CLUSTERED BY (i, j) SORTED BY (i) INTO 8 BUCKETS
+                     |STORED AS PARQUET
+               """.stripMargin)
+
+              val df =
+                (0 until 50).map(i => (i % 13, i.toString, i % 5)).toDF("i", 
"j", "k")
+              df.write.mode(SaveMode.Overwrite).saveAsTable(source)
+
+              // hive relation convert always use dynamic, so it will offload 
to native.
+              checkNativeWrite(
+                s"INSERT INTO $target PARTITION(k='0') SELECT i, j FROM 
$source",
+                checkNative = enableConvertMetastore.toBoolean)
+              val files = tableDir(target)
+                .listFiles()
+                .filterNot(f => f.getName.startsWith(".") || 
f.getName.startsWith("_"))
+              assert(files.length == 1 && files.head.getName.contains("k=0"))
+              checkAnswer(spark.table(target).select("i", "j"), df.select("i", 
"j"))
+            }
+          }
+      }
+    }
+  }
+
+  test("bucket writer with non-partition table should fallback") {
+    if (isSparkVersionGE("3.4")) {
+      Seq("true", "false").foreach {
+        enableConvertMetastore =>
+          withSQLConf("spark.sql.hive.convertMetastoreParquet" -> 
enableConvertMetastore) {
+            val source = "hive_source_table"
+            val target = "hive_bucketed_table"
+            withTable(source, target) {
+              sql(s"""
+                     |CREATE TABLE IF NOT EXISTS $target (i int, j string)
+                     |CLUSTERED BY (i, j) SORTED BY (i) INTO 8 BUCKETS
+                     |STORED AS PARQUET
+               """.stripMargin)
+
+              val df =
+                (0 until 50).map(i => (i % 13, i.toString)).toDF("i", "j")
+              df.write.mode(SaveMode.Overwrite).saveAsTable(source)
+
+              checkNativeWrite(s"INSERT INTO $target SELECT i, j FROM 
$source", checkNative = false)
+
+              checkAnswer(spark.table(target), df)
+            }
+          }
+      }
+    }
+  }
 }
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc 
b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
index dab9837936..e01d2d8985 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -514,11 +514,12 @@ std::shared_ptr<connector::hive::LocationHandle> 
makeLocationHandle(
     const std::string& targetDirectory,
     dwio::common::FileFormat fileFormat,
     common::CompressionKind compression,
+    const bool& isBucketed,
     const std::optional<std::string>& writeDirectory = std::nullopt,
     const connector::hive::LocationHandle::TableType& tableType =
         connector::hive::LocationHandle::TableType::kExisting) {
   std::string targetFileName = "";
-  if (fileFormat == dwio::common::FileFormat::PARQUET) {
+  if (fileFormat == dwio::common::FileFormat::PARQUET && !isBucketed) {
     targetFileName = fmt::format("gluten-part-{}{}{}", makeUuid(), 
compressionFileNameSuffix(compression), ".parquet");
   }
   return std::make_shared<connector::hive::LocationHandle>(
@@ -607,6 +608,35 @@ core::PlanNodePtr 
SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
     }
   }
 
+  std::shared_ptr<connector::hive::HiveBucketProperty> bucketProperty = 
nullptr;
+  if (writeRel.has_bucket_spec()) {
+    const auto& bucketSpec = writeRel.bucket_spec();
+    const auto& numBuckets = bucketSpec.num_buckets();
+
+    std::vector<std::string> bucketedBy;
+    for (const auto& name : bucketSpec.bucket_column_names()) {
+      bucketedBy.emplace_back(name);
+    }
+
+    std::vector<TypePtr> bucketedTypes;
+    bucketedTypes.reserve(bucketedBy.size());
+    std::vector<TypePtr> tableColumnTypes = inputType->children();
+    for (const auto& name : bucketedBy) {
+      auto it = std::find(tableColumnNames.begin(), tableColumnNames.end(), 
name);
+      VELOX_CHECK(it != tableColumnNames.end(), "Invalid bucket {}", name);
+      std::size_t index = std::distance(tableColumnNames.begin(), it);
+      bucketedTypes.emplace_back(tableColumnTypes[index]);
+    }
+
+    std::vector<std::shared_ptr<const connector::hive::HiveSortingColumn>> 
sortedBy;
+    for (const auto& name : bucketSpec.sort_column_names()) {
+      
sortedBy.emplace_back(std::make_shared<connector::hive::HiveSortingColumn>(name,
 core::SortOrder{true, true}));
+    }
+
+    bucketProperty = std::make_shared<connector::hive::HiveBucketProperty>(
+        connector::hive::HiveBucketProperty::Kind::kHiveCompatible, 
numBuckets, bucketedBy, bucketedTypes, sortedBy);
+  }
+
   std::string writePath;
   if (writeFilesTempPath_.has_value()) {
     writePath = writeFilesTempPath_.value();
@@ -652,8 +682,8 @@ core::PlanNodePtr 
SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
              tableColumnNames, /*inputType->names() column name is 
different*/
               inputType->children(),
               partitionedKey,
-              nullptr /*bucketProperty*/,
-              makeLocationHandle(writePath, fileFormat, compressionCodec),
+              bucketProperty,
+              makeLocationHandle(writePath, fileFormat, compressionCodec, 
bucketProperty != nullptr),
               fileFormat,
               compressionCodec)),
       (!partitionedKey.empty()),
diff --git a/docs/developers/SubstraitModifications.md 
b/docs/developers/SubstraitModifications.md
index 24a9c1a212..3db2b5869c 100644
--- a/docs/developers/SubstraitModifications.md
+++ b/docs/developers/SubstraitModifications.md
@@ -28,6 +28,7 @@ changed `Unbounded` in `WindowFunction` into 
`Unbounded_Preceding` and `Unbounde
 * Added `WriteRel` 
([#3690](https://github.com/apache/incubator-gluten/pull/3690)).
 * Added `TopNRel` 
([#5409](https://github.com/apache/incubator-gluten/pull/5409)).
 * Added `ref` field in window bound `Preceding` and `Following` 
([#5626](https://github.com/apache/incubator-gluten/pull/5626)).
+* Added `BucketSpec` field in 
`WriteRel` ([#8386](https://github.com/apache/incubator-gluten/pull/8386)).
 
 ## Modifications to type.proto
 
diff --git 
a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/RelBuilder.java
 
b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/RelBuilder.java
index 7d19311808..86b2735318 100644
--- 
a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/RelBuilder.java
+++ 
b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/RelBuilder.java
@@ -25,10 +25,7 @@ import 
org.apache.gluten.substrait.extensions.AdvancedExtensionNode;
 import org.apache.gluten.substrait.type.ColumnTypeNode;
 import org.apache.gluten.substrait.type.TypeNode;
 
-import io.substrait.proto.CrossRel;
-import io.substrait.proto.JoinRel;
-import io.substrait.proto.SetRel;
-import io.substrait.proto.SortField;
+import io.substrait.proto.*;
 import org.apache.spark.sql.catalyst.expressions.Attribute;
 
 import java.util.List;
@@ -191,10 +188,11 @@ public class RelBuilder {
       List<String> names,
       List<ColumnTypeNode> columnTypeNodes,
       AdvancedExtensionNode extensionNode,
+      WriteRel.BucketSpec bucketSpec,
       SubstraitContext context,
       Long operatorId) {
     context.registerRelToOperator(operatorId);
-    return new WriteRelNode(input, types, names, columnTypeNodes, 
extensionNode);
+    return new WriteRelNode(input, types, names, columnTypeNodes, 
extensionNode, bucketSpec);
   }
 
   public static RelNode makeSortRel(
diff --git 
a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/WriteRelNode.java
 
b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/WriteRelNode.java
index 45b4cd659e..74ffc8282c 100644
--- 
a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/WriteRelNode.java
+++ 
b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/WriteRelNode.java
@@ -21,10 +21,7 @@ import org.apache.gluten.substrait.type.ColumnTypeNode;
 import org.apache.gluten.substrait.type.TypeNode;
 import org.apache.gluten.utils.SubstraitUtil;
 
-import io.substrait.proto.NamedObjectWrite;
-import io.substrait.proto.NamedStruct;
-import io.substrait.proto.Rel;
-import io.substrait.proto.WriteRel;
+import io.substrait.proto.*;
 
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -39,17 +36,21 @@ public class WriteRelNode implements RelNode, Serializable {
 
   private final AdvancedExtensionNode extensionNode;
 
+  private final WriteRel.BucketSpec bucketSpec;
+
   WriteRelNode(
       RelNode input,
       List<TypeNode> types,
       List<String> names,
       List<ColumnTypeNode> partitionColumnTypeNodes,
-      AdvancedExtensionNode extensionNode) {
+      AdvancedExtensionNode extensionNode,
+      WriteRel.BucketSpec bucketSpec) {
     this.input = input;
     this.types.addAll(types);
     this.names.addAll(names);
     this.columnTypeNodes.addAll(partitionColumnTypeNodes);
     this.extensionNode = extensionNode;
+    this.bucketSpec = bucketSpec;
   }
 
   @Override
@@ -68,6 +69,10 @@ public class WriteRelNode implements RelNode, Serializable {
       nameObjectWriter.setAdvancedExtension(extensionNode.toProtobuf());
     }
 
+    if (bucketSpec != null) {
+      writeBuilder.setBucketSpec(bucketSpec);
+    }
+
     writeBuilder.setNamedTable(nameObjectWriter);
 
     if (input != null) {
diff --git 
a/gluten-substrait/src/main/resources/substrait/proto/substrait/algebra.proto 
b/gluten-substrait/src/main/resources/substrait/proto/substrait/algebra.proto
index 0abb50b323..ca669c7639 100644
--- 
a/gluten-substrait/src/main/resources/substrait/proto/substrait/algebra.proto
+++ 
b/gluten-substrait/src/main/resources/substrait/proto/substrait/algebra.proto
@@ -608,6 +608,9 @@ message WriteRel {
   // Output mode determines what is the output of executing this rel
   OutputMode output = 6;
 
+  // The bucket spec for the writer.
+  BucketSpec bucket_spec = 7;
+
   enum WriteOp {
     WRITE_OP_UNSPECIFIED = 0;
     // The insert of new tuples in a table
@@ -631,6 +634,13 @@ message WriteRel {
     // subplans in the body of the Rel input) and return those with another 
PlanRel.relations.
     OUTPUT_MODE_MODIFIED_TUPLES = 2;
   }
+
+  // A container for bucketing information.
+  message BucketSpec {
+    int32 num_buckets = 1;
+    repeated string bucket_column_names = 2;
+    repeated string sort_column_names = 3;
+  }
 }
 
 // The hash equijoin join operator will build a hash table out of the right 
input based on a set of join keys.
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index 0c220ada64..04aa7fe7ca 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -52,6 +52,7 @@ trait BackendSettingsApi {
       format: FileFormat,
       fields: Array[StructField],
       bucketSpec: Option[BucketSpec],
+      isPartitionedTable: Boolean,
       options: Map[String, String]): ValidationResult = 
ValidationResult.succeeded
 
   def supportNativeWrite(fields: Array[StructField]): Boolean = true
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WriteFilesExecTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WriteFilesExecTransformer.scala
index 1a95a96a55..3c1857236b 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WriteFilesExecTransformer.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WriteFilesExecTransformer.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{ArrayType, MapType, MetadataBuilder}
 
-import io.substrait.proto.NamedStruct
+import io.substrait.proto.{NamedStruct, WriteRel}
 import org.apache.parquet.hadoop.ParquetOutputFormat
 
 import java.util.Locale
@@ -107,12 +107,23 @@ case class WriteFilesExecTransformer(
       ExtensionBuilder.makeAdvancedExtension(
         SubstraitUtil.createEnhancement(originalInputAttributes))
     }
+
+    val bucketSpecOption = bucketSpec.map {
+      bucketSpec =>
+        val builder = WriteRel.BucketSpec.newBuilder()
+        builder.setNumBuckets(bucketSpec.numBuckets)
+        bucketSpec.bucketColumnNames.foreach(builder.addBucketColumnNames)
+        bucketSpec.sortColumnNames.foreach(builder.addSortColumnNames)
+        builder.build()
+    }
+
     RelBuilder.makeWriteRel(
       input,
       typeNodes,
       nameList,
       columnTypeNodes,
       extensionNode,
+      bucketSpecOption.orNull,
       context,
       operatorId)
   }
@@ -147,11 +158,13 @@ case class WriteFilesExecTransformer(
           "complex data type with constant")
     }
 
+    val childOutput = this.child.output.map(_.exprId)
     val validationResult =
       BackendsApiManager.getSettings.supportWriteFilesExec(
         fileFormat,
         finalChildOutput.toStructType.fields,
         bucketSpec,
+        partitionColumns.exists(c => childOutput.contains(c.exprId)),
         caseInsensitiveOptions)
     if (!validationResult.ok()) {
       return ValidationResult.failed("Unsupported native write: " + 
validationResult.reason())


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to