sunchao commented on code in PR #30:
URL:
https://github.com/apache/arrow-datafusion-comet/pull/30#discussion_r1492750003
##########
common/src/main/scala/org/apache/comet/CometConf.scala:
##########
@@ -119,6 +119,110 @@ object CometConf {
.booleanConf
.createWithDefault(false)
+ val COMET_EXEC_SHUFFLE_ENABLED: ConfigEntry[Boolean] = conf(
+ s"$COMET_EXEC_CONFIG_PREFIX.shuffle.enabled")
+ .doc("Whether to enable Comet native shuffle. By default, this config is
false. " +
+ "Note that this requires setting 'spark.shuffle.manager' to" +
Review Comment:
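nit: missing space at the end of this string, before the closing quote (it is concatenated directly with the next string).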
##########
common/src/main/scala/org/apache/comet/CometConf.scala:
##########
@@ -119,6 +119,110 @@ object CometConf {
.booleanConf
.createWithDefault(false)
+ val COMET_EXEC_SHUFFLE_ENABLED: ConfigEntry[Boolean] = conf(
+ s"$COMET_EXEC_CONFIG_PREFIX.shuffle.enabled")
+ .doc("Whether to enable Comet native shuffle. By default, this config is
false. " +
+ "Note that this requires setting 'spark.shuffle.manager' to" +
+ "'org.apache.spark.sql.comet.execution.CometShuffleManager'.
'spark.shuffle.manager' must " +
+ "be set before starting the Spark application and cannot be changed
during the application")
+ .booleanConf
+ .createWithDefault(false)
+
+ val COMET_COLUMNAR_SHUFFLE_ENABLED: ConfigEntry[Boolean] = conf(
+ "spark.comet.columnar.shuffle.enabled")
+ .doc(
+ "Force Comet to only use Arrow-based shuffle for CometScan and Spark
regular operators. " +
Review Comment:
ditto: "columnar shuffle" instead of "Arrow-based shuffle" to be consistent
with other places.
##########
common/src/main/scala/org/apache/comet/CometConf.scala:
##########
@@ -119,6 +119,110 @@ object CometConf {
.booleanConf
.createWithDefault(false)
+ val COMET_EXEC_SHUFFLE_ENABLED: ConfigEntry[Boolean] = conf(
+ s"$COMET_EXEC_CONFIG_PREFIX.shuffle.enabled")
+ .doc("Whether to enable Comet native shuffle. By default, this config is
false. " +
+ "Note that this requires setting 'spark.shuffle.manager' to" +
+ "'org.apache.spark.sql.comet.execution.CometShuffleManager'.
'spark.shuffle.manager' must " +
Review Comment:
I think this should be
`org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager`
##########
common/src/main/scala/org/apache/comet/CometConf.scala:
##########
@@ -119,6 +119,110 @@ object CometConf {
.booleanConf
.createWithDefault(false)
+ val COMET_EXEC_SHUFFLE_ENABLED: ConfigEntry[Boolean] = conf(
+ s"$COMET_EXEC_CONFIG_PREFIX.shuffle.enabled")
+ .doc("Whether to enable Comet native shuffle. By default, this config is
false. " +
+ "Note that this requires setting 'spark.shuffle.manager' to" +
+ "'org.apache.spark.sql.comet.execution.CometShuffleManager'.
'spark.shuffle.manager' must " +
+ "be set before starting the Spark application and cannot be changed
during the application")
+ .booleanConf
+ .createWithDefault(false)
+
+ val COMET_COLUMNAR_SHUFFLE_ENABLED: ConfigEntry[Boolean] = conf(
+ "spark.comet.columnar.shuffle.enabled")
+ .doc(
+ "Force Comet to only use Arrow-based shuffle for CometScan and Spark
regular operators. " +
+ "If this is enabled, Comet native shuffle will not be enabled but only
Arrow shuffle. " +
+ "By default, this config is false.")
+ .booleanConf
+ .createWithDefault(false)
+
+ val COMET_EXEC_SHUFFLE_CODEC: ConfigEntry[String] = conf(
+ s"$COMET_EXEC_CONFIG_PREFIX.shuffle.codec")
+ .doc(
+ "The codec of Comet native shuffle used to compress shuffle data. Only
zstd is supported.")
+ .stringConf
+ .createWithDefault("zstd")
+
+ val COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED: ConfigEntry[Boolean] = conf(
+ "spark.comet.columnar.shuffle.async.enabled")
+ .doc(
+ "Whether to enable asynchronous shuffle for Arrow-based shuffle. By
default, this config " +
+ "is false.")
+ .booleanConf
+ .createWithDefault(false)
+
+ val COMET_EXEC_SHUFFLE_ASYNC_THREAD_NUM: ConfigEntry[Int] =
+ conf("spark.comet.columnar.shuffle.async.thread.num")
+ .doc("Number of threads used for Comet async columnar shuffle per
shuffle task. " +
+ "By default, this config is 3. Note that more threads means more
memory requirement to " +
+ "buffer shuffle data before flushing to disk. Also, more threads may
not always " +
+ "improve performance, and should be set based on the number of cores
available.")
+ .intConf
+ .createWithDefault(3)
+
+ val COMET_EXEC_SHUFFLE_ASYNC_MAX_THREAD_NUM: ConfigEntry[Int] = {
+ conf("spark.comet.columnar.shuffle.async.max.thread.num")
+ .doc("Maximum number of threads on an executor used for Comet async
columnar shuffle. " +
+ "By default, this config is 100. This is the upper bound of total
number of shuffle " +
+ "threads per executor. In other words, if the number of cores * the
number of shuffle " +
+ "threads per task `spark.comet.columnar.shuffle.async.thread.num` is
larger than " +
Review Comment:
There are two "than"s. I think this sentence is broken.
##########
common/src/main/scala/org/apache/comet/CometConf.scala:
##########
@@ -119,6 +119,110 @@ object CometConf {
.booleanConf
.createWithDefault(false)
+ val COMET_EXEC_SHUFFLE_ENABLED: ConfigEntry[Boolean] = conf(
+ s"$COMET_EXEC_CONFIG_PREFIX.shuffle.enabled")
+ .doc("Whether to enable Comet native shuffle. By default, this config is
false. " +
+ "Note that this requires setting 'spark.shuffle.manager' to" +
+ "'org.apache.spark.sql.comet.execution.CometShuffleManager'.
'spark.shuffle.manager' must " +
+ "be set before starting the Spark application and cannot be changed
during the application")
+ .booleanConf
+ .createWithDefault(false)
+
+ val COMET_COLUMNAR_SHUFFLE_ENABLED: ConfigEntry[Boolean] = conf(
+ "spark.comet.columnar.shuffle.enabled")
+ .doc(
+ "Force Comet to only use Arrow-based shuffle for CometScan and Spark
regular operators. " +
+ "If this is enabled, Comet native shuffle will not be enabled but only
Arrow shuffle. " +
+ "By default, this config is false.")
+ .booleanConf
+ .createWithDefault(false)
+
+ val COMET_EXEC_SHUFFLE_CODEC: ConfigEntry[String] = conf(
+ s"$COMET_EXEC_CONFIG_PREFIX.shuffle.codec")
+ .doc(
+ "The codec of Comet native shuffle used to compress shuffle data. Only
zstd is supported.")
+ .stringConf
+ .createWithDefault("zstd")
+
+ val COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED: ConfigEntry[Boolean] = conf(
+ "spark.comet.columnar.shuffle.async.enabled")
+ .doc(
+ "Whether to enable asynchronous shuffle for Arrow-based shuffle. By
default, this config " +
Review Comment:
nit: maybe change "Arrow-based shuffle" to "columnar shuffle"?
##########
spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala:
##########
@@ -125,6 +128,30 @@ class CometSparkSessionExtensions
}
case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
+ private def applyCometShuffle(plan: SparkPlan): SparkPlan = {
+ plan.transformUp {
+ case s: ShuffleExchangeExec
+ if isCometPlan(s.child) && !isCometColumnarShuffleEnabled(conf) &&
+ QueryPlanSerde.supportPartitioning(s.child.output,
s.outputPartitioning) =>
+ logInfo("Comet extension enabled for Native Shuffle")
+
+ // Switch to use Decimal128 regardless of precision, since Arrow
native execution
+ // doesn't support Decimal32 and Decimal64 yet.
+ conf.setConfString(CometConf.COMET_USE_DECIMAL_128.key, "true")
+ CometShuffleExchangeExec(s, shuffleType = CometNativeShuffle)
+
+ // Arrow shuffle for regular Spark operators (not Comet) and Comet
operators (if configured)
Review Comment:
ditto: "Arrow shuffle" -> "columnar shuffle"
##########
spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.comet
+
+import scala.collection.JavaConverters.asJavaIterableConverter
+
+import org.apache.spark.rdd.{ParallelCollectionRDD, RDD}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression,
SortOrder}
+import org.apache.spark.sql.catalyst.util.truncatedString
+import org.apache.spark.sql.comet.execution.shuffle.{CometShuffledBatchRDD,
CometShuffleExchangeExec}
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode,
UnsafeRowSerializer}
+import org.apache.spark.sql.execution.metric.{SQLMetrics,
SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}
+import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import org.apache.comet.serde.OperatorOuterClass
+import org.apache.comet.serde.OperatorOuterClass.Operator
+import org.apache.comet.serde.QueryPlanSerde.{exprToProto, serializeDataType}
+
+/**
+ * Comet physical plan node for Spark `TakeOrderedAndProjectExec`.
+ *
+ * It is used to execute a `TakeOrderedAndProjectExec` physical operator by
using Comet native
+ * engine. It is not like other physical plan nodes which are wrapped by
`CometExec`, because it
+ * contains two native executions separated by a Comet shuffle exchange.
+ */
+case class CometTakeOrderedAndProjectExec(
Review Comment:
Looks unrelated 😂 ?
##########
common/src/main/scala/org/apache/comet/CometConf.scala:
##########
@@ -119,6 +119,110 @@ object CometConf {
.booleanConf
.createWithDefault(false)
+ val COMET_EXEC_SHUFFLE_ENABLED: ConfigEntry[Boolean] = conf(
+ s"$COMET_EXEC_CONFIG_PREFIX.shuffle.enabled")
+ .doc("Whether to enable Comet native shuffle. By default, this config is
false. " +
+ "Note that this requires setting 'spark.shuffle.manager' to" +
+ "'org.apache.spark.sql.comet.execution.CometShuffleManager'.
'spark.shuffle.manager' must " +
+ "be set before starting the Spark application and cannot be changed
during the application")
+ .booleanConf
+ .createWithDefault(false)
+
+ val COMET_COLUMNAR_SHUFFLE_ENABLED: ConfigEntry[Boolean] = conf(
+ "spark.comet.columnar.shuffle.enabled")
+ .doc(
+ "Force Comet to only use Arrow-based shuffle for CometScan and Spark
regular operators. " +
+ "If this is enabled, Comet native shuffle will not be enabled but only
Arrow shuffle. " +
+ "By default, this config is false.")
+ .booleanConf
+ .createWithDefault(false)
+
+ val COMET_EXEC_SHUFFLE_CODEC: ConfigEntry[String] = conf(
+ s"$COMET_EXEC_CONFIG_PREFIX.shuffle.codec")
+ .doc(
+ "The codec of Comet native shuffle used to compress shuffle data. Only
zstd is supported.")
+ .stringConf
+ .createWithDefault("zstd")
+
+ val COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED: ConfigEntry[Boolean] = conf(
+ "spark.comet.columnar.shuffle.async.enabled")
+ .doc(
+ "Whether to enable asynchronous shuffle for Arrow-based shuffle. By
default, this config " +
+ "is false.")
+ .booleanConf
+ .createWithDefault(false)
+
+ val COMET_EXEC_SHUFFLE_ASYNC_THREAD_NUM: ConfigEntry[Int] =
+ conf("spark.comet.columnar.shuffle.async.thread.num")
+ .doc("Number of threads used for Comet async columnar shuffle per
shuffle task. " +
+ "By default, this config is 3. Note that more threads means more
memory requirement to " +
+ "buffer shuffle data before flushing to disk. Also, more threads may
not always " +
+ "improve performance, and should be set based on the number of cores
available.")
+ .intConf
+ .createWithDefault(3)
+
+ val COMET_EXEC_SHUFFLE_ASYNC_MAX_THREAD_NUM: ConfigEntry[Int] = {
+ conf("spark.comet.columnar.shuffle.async.max.thread.num")
+ .doc("Maximum number of threads on an executor used for Comet async
columnar shuffle. " +
+ "By default, this config is 100. This is the upper bound of total
number of shuffle " +
+ "threads per executor. In other words, if the number of cores * the
number of shuffle " +
+ "threads per task `spark.comet.columnar.shuffle.async.thread.num` is
larger than " +
+ "than this config. Comet will use this config as the number of shuffle
threads per " +
+ "executor instead.")
+ .intConf
+ .createWithDefault(100)
+ }
+
+ val COMET_EXEC_SHUFFLE_SPILL_THRESHOLD: ConfigEntry[Int] =
+ conf("spark.comet.columnar.shuffle.spill.threshold")
+ .doc(
+ "Number of rows to be spilled used for Comet columnar shuffle. " +
Review Comment:
I think we can move this string up onto the same line as `.doc(` above, but this is
a very small nit.
##########
common/src/main/scala/org/apache/comet/CometConf.scala:
##########
@@ -119,6 +119,110 @@ object CometConf {
.booleanConf
.createWithDefault(false)
+ val COMET_EXEC_SHUFFLE_ENABLED: ConfigEntry[Boolean] = conf(
+ s"$COMET_EXEC_CONFIG_PREFIX.shuffle.enabled")
+ .doc("Whether to enable Comet native shuffle. By default, this config is
false. " +
+ "Note that this requires setting 'spark.shuffle.manager' to" +
+ "'org.apache.spark.sql.comet.execution.CometShuffleManager'.
'spark.shuffle.manager' must " +
+ "be set before starting the Spark application and cannot be changed
during the application")
Review Comment:
nit: missing period at the end of the sentence.
##########
spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala:
##########
@@ -288,14 +315,67 @@ class CometSparkSessionExtensions
c
}
+ case s: TakeOrderedAndProjectExec
+ if isCometNative(s.child) && isCometOperatorEnabled(conf,
"takeOrderedAndProjectExec")
+ && isCometShuffleEnabled(conf) &&
+ CometTakeOrderedAndProjectExec.isSupported(s.projectList,
s.sortOrder, s.child) =>
+ // TODO: support offset for Spark 3.4
+ QueryPlanSerde.operator2Proto(s) match {
+ case Some(nativeOp) =>
+ val cometOp =
+ CometTakeOrderedAndProjectExec(s, s.limit, s.sortOrder,
s.projectList, s.child)
+ CometSinkPlaceHolder(nativeOp, s, cometOp)
+ case None =>
+ s
+ }
+
case u: UnionExec
if isCometOperatorEnabled(conf, "union") &&
u.children.forall(isCometNative) =>
QueryPlanSerde.operator2Proto(u) match {
case Some(nativeOp) =>
val cometOp = CometUnionExec(u, u.children)
CometSinkPlaceHolder(nativeOp, u, cometOp)
- case None => u
+ }
+
+ // Native shuffle for Comet operators
+ case s: ShuffleExchangeExec
+ if isCometShuffleEnabled(conf) &&
+ !isCometColumnarShuffleEnabled(conf) &&
+ QueryPlanSerde.supportPartitioning(s.child.output,
s.outputPartitioning) =>
+ logInfo("Comet extension enabled for Native Shuffle")
+
+ val newOp = transform1(s)
+ newOp match {
+ case Some(nativeOp) =>
+ // Switch to use Decimal128 regardless of precision, since Arrow
native execution
+ // doesn't support Decimal32 and Decimal64 yet.
+ conf.setConfString(CometConf.COMET_USE_DECIMAL_128.key, "true")
+ val cometOp = CometShuffleExchangeExec(s, shuffleType =
CometNativeShuffle)
+ CometSinkPlaceHolder(nativeOp, s, cometOp)
+ case None =>
+ s
+ }
+
+ // Arrow shuffle for regular Spark operators (not Comet) and Comet
operators
Review Comment:
"Arrow shuffle" -> "Columnar shuffle"
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]