baibaichen commented on code in PR #8570:
URL: https://github.com/apache/incubator-gluten/pull/8570#discussion_r1929919837


##########
backends-clickhouse/src-kafka/main/scala/org/apache/gluten/component/CHKafkaComponent.scala:
##########


Review Comment:
   Is this class related to this PR?



##########
backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala:
##########
@@ -270,15 +270,15 @@ class ClickhouseOptimisticTransaction(
       // TODO: DeltaOptimizedWriterExec

Review Comment:
   remove TODO



##########
backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala:
##########
@@ -270,15 +270,15 @@ class ClickhouseOptimisticTransaction(
       // TODO: DeltaOptimizedWriterExec
       // No need to plan optimized write if the write command is OPTIMIZE, 
which aims to produce
       // evenly-balanced data files already.
-       val physicalPlan =
-         if (
-           !isOptimize &&
-           shouldOptimizeWrite(writeOptions, spark.sessionState.conf)
-         ) {
-           DeltaOptimizedWriterExec(checkInvariants, 
metadata.partitionColumns, deltaLog)
-         } else {
-           checkInvariants
-         }
+      val physicalPlan =
+        if (
+          !isOptimize &&
+          shouldOptimizeWrite(writeOptions, spark.sessionState.conf)
+        ) {
+          DeltaOptimizedWriterExec(checkInvariants, metadata.partitionColumns, 
deltaLog)

Review Comment:
   I think it is better to not chagne unrelated codes



##########
backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala:
##########
@@ -340,14 +340,14 @@ class ClickhouseOptimisticTransaction(
 
     var resultFiles =
       (if (optionalStatsTracker.isDefined) {
-        committer.addedStatuses.map { a =>
-          a.copy(stats = optionalStatsTracker.map(
-            _.recordedStats(a.toPath.getName)).getOrElse(a.stats))
-        }
-      }
-      else {
-        committer.addedStatuses
-      })
+         committer.addedStatuses.map {

Review Comment:
   I think it is better to not chagne unrelated codes



##########
gluten-delta/src-delta-32/main/scala/org/apache/spark/sql/perf/DeltaOptimizedWriterTransformer.scala:
##########
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.perf
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.execution.GlutenPlan
+import org.apache.gluten.extension.columnar.transition.Convention
+
+import org.apache.spark._
+import org.apache.spark.internal.config
+import org.apache.spark.internal.config.ConfigEntry
+import org.apache.spark.network.util.ByteUnit
+import org.apache.spark.rdd.RDD
+import org.apache.spark.serializer.SerializerManager
+import org.apache.spark.shuffle.{FetchFailedException, ShuffleReader, 
ShuffleReadMetricsReporter}
+import org.apache.spark.shuffle.sort.{ColumnarShuffleHandle, 
ColumnarShuffleManager}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, 
Partitioning}
+import org.apache.spark.sql.delta.{DeltaErrors, DeltaLog}
+import org.apache.spark.sql.delta.metering.DeltaLogging
+import org.apache.spark.sql.delta.perf.{DeltaOptimizedWriterExec, 
OptimizedWriterBlocks}
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.util.BinPackingUtils
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.exchange.ENSURE_REQUIREMENTS
+import org.apache.spark.sql.execution.metric.{SQLMetric, 
SQLShuffleWriteMetricsReporter}
+import org.apache.spark.sql.metric.SQLColumnarShuffleReadMetricsReporter
+import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.storage.{BlockId, BlockManagerId, 
ShuffleBlockFetcherIterator, ShuffleBlockId}
+import org.apache.spark.util.ThreadUtils
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration.Duration
+
+/**
+ * An execution node which shuffles data to a target output of 
`DELTA_OPTIMIZE_WRITE_SHUFFLE_BLOCKS`
+ * blocks, hash partitioned on the table partition columns. We group all 
blocks by their
+ * reducer_id's and bin-pack into `DELTA_OPTIMIZE_WRITE_BIN_SIZE` bins. Then 
we launch a Spark task
+ * per bin to write out a single file for each bin.
+ *
+ * @param child
+ *   The execution plan
+ * @param partitionColumns
+ *   The partition columns of the table. Used for hash partitioning the write
+ * @param deltaLog
+ *   The DeltaLog for the table. Used for logging only
+ */
+
+case class DeltaOptimizedWriterTransformer(
+    child: SparkPlan,
+    partitionColumns: Seq[String],
+    @transient deltaLog: DeltaLog
+) extends UnaryExecNode
+  with GlutenPlan
+  with DeltaLogging {
+
+  lazy val useSortBasedShuffle: Boolean =
+    
BackendsApiManager.getSparkPlanExecApiInstance.useSortBasedShuffle(outputPartitioning,
 output)
+
+  override def output: Seq[Attribute] = child.output
+
+  private lazy val readMetrics =
+    
SQLColumnarShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
+
+  private[sql] lazy val writeMetrics =
+    SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
+
+  // Note: "metrics" is made transient to avoid sending driver-side metrics to 
tasks.
+  @transient override lazy val metrics =
+    BackendsApiManager.getMetricsApiInstance
+      .genColumnarShuffleExchangeMetrics(
+        sparkContext,
+        useSortBasedShuffle) ++ readMetrics ++ writeMetrics
+
+  @transient lazy val inputColumnarRDD: RDD[ColumnarBatch] = 
child.executeColumnar()
+
+  private lazy val childNumPartitions = inputColumnarRDD.getNumPartitions
+
+  private lazy val numPartitions: Int = {
+    if (childNumPartitions > 0) {
+      val targetShuffleBlocks = 
getConf(DeltaSQLConf.DELTA_OPTIMIZE_WRITE_SHUFFLE_BLOCKS)
+      math.min(
+        math.max(targetShuffleBlocks / childNumPartitions, 1),
+        getConf(DeltaSQLConf.DELTA_OPTIMIZE_WRITE_MAX_SHUFFLE_PARTITIONS))
+    } else {
+      childNumPartitions
+    }
+  }
+
+  @transient private var cachedShuffleRDD: ShuffledColumnarBatchRDD = _
+
+  @transient override def outputPartitioning: Partitioning = {
+    val resolver = 
org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution

Review Comment:
   109 ~ 116 duplicated with  124 ~131



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to