kazuyukitanimura opened a new issue, #1268:
URL: https://github.com/apache/datafusion-comet/issues/1268
### Describe the bug
The attached test is adapted from the Spark `WriteDistributionAndOrderingSuite`
test `ordered distribution and sort with same exprs: append`.
It looks like the Comet shuffle read size is reported as much larger than the
corresponding Spark shuffle size, which causes AQE to coalesce the shuffle read
into more partitions.
### Steps to reproduce
```scala
package org.apache.spark.sql

import java.sql.Date
import java.util.Collections

import org.scalactic.source.Position
import org.scalatest.Tag

import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryCatalog}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper
import org.apache.spark.sql.connector.distributions.Distributions
import org.apache.spark.sql.connector.expressions._
import org.apache.spark.sql.connector.expressions.LogicalExpressions.sort
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec
import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DateType, IntegerType, StringType, StructType}
import org.apache.spark.sql.util.QueryExecutionListener

import org.apache.comet.CometConf

class CSuite extends CometTestBase {
  import testImplicits._

  override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
      pos: Position): Unit = {
    super.test(testName, testTags: _*) {
      withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
        testFun
      }
    }
  }

  test("a") {
    def catalog: InMemoryCatalog = {
      spark.conf.set("spark.sql.catalog.testcat", classOf[InMemoryCatalog].getName)
      val catalog = spark.sessionState.catalogManager.catalog("testcat")
      catalog.asTableCatalog.asInstanceOf[InMemoryCatalog]
    }

    val namespace = Array("ns1")
    val ident = Identifier.of(namespace, "test_table")
    val tableNameAsString = "testcat." + ident.toString
    val emptyProps = Collections.emptyMap[String, String]
    val schema = new StructType()
      .add("id", IntegerType)
      .add("data", StringType)
      .add("day", DateType)

    val tableOrdering = Array[SortOrder](
      sort(FieldReference("data"), SortDirection.ASCENDING, NullOrdering.NULLS_FIRST))
    val tableDistribution = Distributions.ordered(tableOrdering)
    val writeTransform: DataFrame => DataFrame = df => df

    catalog.createTable(
      ident = ident,
      schema = schema,
      partitions = Array.empty,
      properties = emptyProps,
      distribution = tableDistribution,
      ordering = tableOrdering,
      requiredNumPartitions = None,
      advisoryPartitionSize = Some(1000),
      distributionStrictlyRequired = true)

    val df = spark.sparkContext
      .parallelize(
        (1 to 10).map { i =>
          (if (i > 4) 5 else i, i.toString, Date.valueOf(s"${2020 + i}-$i-$i"))
        },
        3)
      .toDF("id", "data", "day")

    val writer = writeTransform(df).writeTo(tableNameAsString)

    def execute(writeFunc: => Unit): SparkPlan = {
      var executedPlan: SparkPlan = null
      val listener = new QueryExecutionListener {
        override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
          executedPlan = qe.executedPlan
        }
        override def onFailure(
            funcName: String,
            qe: QueryExecution,
            exception: Exception): Unit = {}
      }
      spark.listenerManager.register(listener)

      writeFunc
      sparkContext.listenerBus.waitUntilEmpty()

      executedPlan match {
        case w: V2TableWriteExec =>
          stripAQEPlan(w.query)
        case _ =>
          fail("expected V2TableWriteExec")
      }
    }

    def executeCommand(): SparkPlan = execute(writer.append())

    // if the partition size is configured for the table, set the SQL conf to
    // something small so that the overriding behavior is tested
    val defaultAdvisoryPartitionSize = "15"

    withSQLConf(
      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
      SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true",
      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
      SQLConf.SHUFFLE_PARTITIONS.key -> "5",
      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> defaultAdvisoryPartitionSize,
      SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") {
      val executedPlan = executeCommand()
      val read = collect(executedPlan) { case r: AQEShuffleReadExec => r }
      assert(read.size == 1)
      println(read.head.partitionSpecs)
      assert(read.head.partitionSpecs.size == 1)
    }
  }
}
```
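For an apples-to-apples baseline, one option (an untested sketch reusing the repro's `executeCommand` and `collect` helpers, placed inside the same `withSQLConf` block above) is to run the write with Comet shuffle toggled on and off and print both sets of partition specs:

```scala
// Hypothetical A/B run inside the test body: compare AQE partition specs with
// Comet shuffle enabled vs. disabled (plain Spark shuffle). The second run
// appends the same rows again, but only the per-run shuffle stats matter here.
Seq("true", "false").foreach { enabled =>
  withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> enabled) {
    val plan = executeCommand()
    val reads = collect(plan) { case r: AQEShuffleReadExec => r }
    println(s"cometShuffle=$enabled -> ${reads.map(_.partitionSpecs)}")
  }
}
```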
### Expected behavior
Spark shuffle partition specs:
`ArrayBuffer(CoalescedPartitionSpec(0,5,Some(394)))`

Comet shuffle partition specs:
`ArrayBuffer(CoalescedPartitionSpec(0,1,Some(890)), CoalescedPartitionSpec(1,3,Some(890)), CoalescedPartitionSpec(3,4,Some(890)), CoalescedPartitionSpec(4,5,Some(445)))`

Spark reports roughly 394 bytes in total, which fits into a single coalesced
partition under the table's 1000-byte advisory partition size; Comet reports
roughly 3115 bytes (890 + 890 + 890 + 445), so AQE splits the read into four
partitions.
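For illustration, a minimal standalone sketch of size-based coalescing (an approximation, not Spark's actual `ShufflePartitionsUtil` algorithm; the per-partition byte splits below are hypothetical, only the group totals come from the specs above) reproduces both outcomes against the 1000-byte advisory size:

```scala
// Simplified size-based coalescing: merge consecutive shuffle partitions until
// adding the next one would exceed the advisory target size.
object CoalesceSketch {
  // Returns (startIndex, endIndexExclusive, totalBytes) groups.
  def coalesce(partitionBytes: Seq[Long], targetBytes: Long): Seq[(Int, Int, Long)] = {
    val specs = scala.collection.mutable.ArrayBuffer.empty[(Int, Int, Long)]
    var start = 0
    var acc = 0L
    partitionBytes.zipWithIndex.foreach { case (bytes, i) =>
      if (acc > 0 && acc + bytes > targetBytes) {
        specs += ((start, i, acc)) // close the current group before it overflows
        start = i
        acc = 0L
      }
      acc += bytes
    }
    if (partitionBytes.nonEmpty) specs += ((start, partitionBytes.length, acc))
    specs.toSeq
  }

  def main(args: Array[String]): Unit = {
    // Spark-sized input (~394 bytes total, hypothetical per-partition split):
    // one group, matching CoalescedPartitionSpec(0,5,Some(394)).
    println(coalesce(Seq(79L, 79L, 79L, 79L, 78L), 1000L))
    // Comet-sized input (hypothetical split consistent with the report):
    // four groups, matching the four CoalescedPartitionSpecs above.
    println(coalesce(Seq(890L, 445L, 445L, 890L, 445L), 1000L))
  }
}
```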
### Additional context
Running the above test may require Spark 3.5+, or a backport of
https://issues.apache.org/jira/browse/SPARK-42779.
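Until then, a naive version guard could skip the test on older Sparks (hypothetical snippet, assuming ScalaTest's `assume` is in scope via `CometTestBase`):

```scala
// Hypothetical guard: InMemoryCatalog.createTable with advisoryPartitionSize
// relies on SPARK-42779, which shipped in Spark 3.5.0. The lexicographic
// string comparison is naive but adequate within the 3.x line.
assume(org.apache.spark.SPARK_VERSION >= "3.5", "needs SPARK-42779 (Spark 3.5+)")
```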