This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new f8eb57a42 docs: Split configuration guide into different sections
(scan, exec, shuffle, etc) (#2568)
f8eb57a42 is described below
commit f8eb57a4213165060b5b9a1620dbc4782ae56d79
Author: Andy Grove <[email protected]>
AuthorDate: Tue Oct 14 17:08:47 2025 -0600
docs: Split configuration guide into different sections (scan, exec,
shuffle, etc) (#2568)
---
.../main/scala/org/apache/comet/CometConf.scala | 150 ++++++++---
docs/source/user-guide/latest/configs.md | 288 ++++++++++++++++++---
docs/source/user-guide/latest/expressions.md | 6 +-
.../main/scala/org/apache/comet/GenerateDocs.scala | 43 ++-
.../org/apache/comet/serde/QueryPlanSerde.scala | 28 +-
.../scala/org/apache/comet/serde/aggregates.scala | 41 ++-
.../apache/comet/exec/CometAggregateSuite.scala | 8 +-
7 files changed, 436 insertions(+), 128 deletions(-)
diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala
b/common/src/main/scala/org/apache/comet/CometConf.scala
index c8915f29f..6474280ff 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -57,7 +57,16 @@ object CometConf extends ShimCometConf {
/** List of all configs that are used for generating documentation */
val allConfs = new ListBuffer[ConfigEntry[_]]
+ private val CATEGORY_SCAN = "scan"
+ private val CATEGORY_PARQUET = "parquet"
+ private val CATEGORY_EXEC = "exec"
+ private val CATEGORY_ENABLE_EXEC = "enable_exec"
+ private val CATEGORY_SHUFFLE = "shuffle"
+ private val CATEGORY_TUNING = "tuning"
+ private val CATEGORY_TESTING = "testing"
+
def register(conf: ConfigEntry[_]): Unit = {
+ assert(conf.category.nonEmpty, s"${conf.key} does not have a category
defined")
allConfs.append(conf)
}
@@ -70,6 +79,7 @@ object CometConf extends ShimCometConf {
val COMET_EXPR_CONFIG_PREFIX: String = s"$COMET_PREFIX.expression";
val COMET_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.enabled")
+ .category(CATEGORY_EXEC)
.doc(
"Whether to enable Comet extension for Spark. When this is turned on,
Spark will use " +
"Comet to read Parquet data source. Note that to enable native
vectorized execution, " +
@@ -79,6 +89,7 @@ object CometConf extends ShimCometConf {
.createWithDefault(sys.env.getOrElse("ENABLE_COMET", "true").toBoolean)
val COMET_NATIVE_SCAN_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.scan.enabled")
+ .category(CATEGORY_SCAN)
.doc(
"Whether to enable native scans. When this is turned on, Spark will use
Comet to " +
"read supported data sources (currently only Parquet is supported
natively). Note " +
@@ -93,6 +104,7 @@ object CometConf extends ShimCometConf {
val SCAN_AUTO = "auto"
val COMET_NATIVE_SCAN_IMPL: ConfigEntry[String] =
conf("spark.comet.scan.impl")
+ .category(CATEGORY_SCAN)
.doc(
s"The implementation of Comet Native Scan to use. Available modes are
'$SCAN_NATIVE_COMET', " +
s"'$SCAN_NATIVE_DATAFUSION', and '$SCAN_NATIVE_ICEBERG_COMPAT'. " +
@@ -112,6 +124,7 @@ object CometConf extends ShimCometConf {
val COMET_RESPECT_PARQUET_FILTER_PUSHDOWN: ConfigEntry[Boolean] =
conf("spark.comet.parquet.respectFilterPushdown")
+ .category(CATEGORY_PARQUET)
.doc(
"Whether to respect Spark's PARQUET_FILTER_PUSHDOWN_ENABLED config.
This needs to be " +
"respected when running the Spark SQL test suite but the default
setting " +
@@ -122,6 +135,7 @@ object CometConf extends ShimCometConf {
val COMET_PARQUET_PARALLEL_IO_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.parquet.read.parallel.io.enabled")
+ .category(CATEGORY_PARQUET)
.doc(
"Whether to enable Comet's parallel reader for Parquet files. The
parallel reader reads " +
"ranges of consecutive data in a file in parallel. It is faster for
large files and " +
@@ -131,6 +145,7 @@ object CometConf extends ShimCometConf {
val COMET_PARQUET_PARALLEL_IO_THREADS: ConfigEntry[Int] =
conf("spark.comet.parquet.read.parallel.io.thread-pool.size")
+ .category(CATEGORY_PARQUET)
.doc("The maximum number of parallel threads the parallel reader will
use in a single " +
"executor. For executors configured with a smaller number of cores,
use a smaller number.")
.intConf
@@ -138,6 +153,7 @@ object CometConf extends ShimCometConf {
val COMET_IO_MERGE_RANGES: ConfigEntry[Boolean] =
conf("spark.comet.parquet.read.io.mergeRanges")
+ .category(CATEGORY_PARQUET)
.doc(
"When enabled the parallel reader will try to merge ranges of data
that are separated " +
"by less than 'comet.parquet.read.io.mergeRanges.delta' bytes.
Longer continuous reads " +
@@ -147,14 +163,15 @@ object CometConf extends ShimCometConf {
val COMET_IO_MERGE_RANGES_DELTA: ConfigEntry[Int] =
conf("spark.comet.parquet.read.io.mergeRanges.delta")
- .doc(
- "The delta in bytes between consecutive read ranges below which the
parallel reader " +
- "will try to merge the ranges. The default is 8MB.")
+ .category(CATEGORY_PARQUET)
+ .doc("The delta in bytes between consecutive read ranges below which the
parallel reader " +
+ "will try to merge the ranges. The default is 8MB.")
.intConf
.createWithDefault(1 << 23) // 8 MB
val COMET_IO_ADJUST_READRANGE_SKEW: ConfigEntry[Boolean] =
conf("spark.comet.parquet.read.io.adjust.readRange.skew")
+ .category(CATEGORY_PARQUET)
.doc("In the parallel reader, if the read ranges submitted are skewed in
sizes, this " +
"option will cause the reader to break up larger read ranges into
smaller ranges to " +
"reduce the skew. This will result in a slightly larger number of
connections opened to " +
@@ -164,6 +181,7 @@ object CometConf extends ShimCometConf {
val COMET_CONVERT_FROM_PARQUET_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.convert.parquet.enabled")
+ .category(CATEGORY_SCAN)
.doc(
"When enabled, data from Spark (non-native) Parquet v1 and v2 scans
will be converted to " +
"Arrow format. Note that to enable native vectorized execution, both
this config and " +
@@ -173,6 +191,7 @@ object CometConf extends ShimCometConf {
val COMET_CONVERT_FROM_JSON_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.convert.json.enabled")
+ .category(CATEGORY_SCAN)
.doc(
"When enabled, data from Spark (non-native) JSON v1 and v2 scans will
be converted to " +
"Arrow format. Note that to enable native vectorized execution, both
this config and " +
@@ -182,6 +201,7 @@ object CometConf extends ShimCometConf {
val COMET_CONVERT_FROM_CSV_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.convert.csv.enabled")
+ .category(CATEGORY_SCAN)
.doc(
"When enabled, data from Spark (non-native) CSV v1 and v2 scans will
be converted to " +
"Arrow format. Note that to enable native vectorized execution, both
this config and " +
@@ -190,6 +210,7 @@ object CometConf extends ShimCometConf {
.createWithDefault(false)
val COMET_EXEC_ENABLED: ConfigEntry[Boolean] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.enabled")
+ .category(CATEGORY_EXEC)
.doc(
"Whether to enable Comet native vectorized execution for Spark. This
controls whether " +
"Spark should convert operators into their Comet counterparts and
execute them in " +
@@ -235,23 +256,19 @@ object CometConf extends ShimCometConf {
val COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED:
ConfigEntry[Boolean] =
conf("spark.comet.exec.sortMergeJoinWithJoinFilter.enabled")
+ .category(CATEGORY_ENABLE_EXEC)
.doc("Experimental support for Sort Merge Join with filter")
.booleanConf
.createWithDefault(false)
- val COMET_EXPR_STDDEV_ENABLED: ConfigEntry[Boolean] =
- createExecEnabledConfig(
- "stddev",
- defaultValue = true,
- notes = Some("stddev is slower than Spark's implementation"))
-
val COMET_TRACING_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.tracing.enabled")
+ .category(CATEGORY_TUNING)
.doc(s"Enable fine-grained tracing of events and memory usage.
$TRACING_GUIDE.")
- .internal()
.booleanConf
.createWithDefault(false)
val COMET_MEMORY_OVERHEAD: OptionalConfigEntry[Long] =
conf("spark.comet.memoryOverhead")
+ .category(CATEGORY_TESTING)
.doc(
"The amount of additional memory to be allocated per executor process
for Comet, in MiB, " +
"when running Spark in on-heap mode. " +
@@ -263,6 +280,7 @@ object CometConf extends ShimCometConf {
val COMET_MEMORY_OVERHEAD_FACTOR: ConfigEntry[Double] =
conf("spark.comet.memory.overhead.factor")
+ .category(CATEGORY_TESTING)
.doc(
"Fraction of executor memory to be allocated as additional memory for
Comet " +
"when running Spark in on-heap mode. " +
@@ -275,6 +293,7 @@ object CometConf extends ShimCometConf {
.createWithDefault(0.2)
val COMET_MEMORY_OVERHEAD_MIN_MIB: ConfigEntry[Long] =
conf("spark.comet.memory.overhead.min")
+ .category(CATEGORY_TESTING)
.doc("Minimum amount of additional memory to be allocated per executor
process for Comet, " +
s"in MiB, when running Spark in on-heap mode. $TUNING_GUIDE.")
.internal()
@@ -286,6 +305,7 @@ object CometConf extends ShimCometConf {
val COMET_EXEC_SHUFFLE_ENABLED: ConfigEntry[Boolean] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.enabled")
+ .category(CATEGORY_SHUFFLE)
.doc(
"Whether to enable Comet native shuffle. " +
"Note that this requires setting 'spark.shuffle.manager' to " +
@@ -296,6 +316,7 @@ object CometConf extends ShimCometConf {
.createWithDefault(true)
val COMET_SHUFFLE_MODE: ConfigEntry[String] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.mode")
+ .category(CATEGORY_SHUFFLE)
.doc(
"This is test config to allow tests to force a particular shuffle
implementation to be " +
"used. Valid values are `jvm` for Columnar Shuffle, `native` for
Native Shuffle, " +
@@ -308,6 +329,7 @@ object CometConf extends ShimCometConf {
val COMET_EXEC_BROADCAST_FORCE_ENABLED: ConfigEntry[Boolean] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.broadcast.enabled")
+ .category(CATEGORY_EXEC)
.doc(
"Whether to force enabling broadcasting for Comet native operators. " +
"Comet broadcast feature will be enabled automatically by " +
@@ -319,6 +341,7 @@ object CometConf extends ShimCometConf {
val COMET_REPLACE_SMJ: ConfigEntry[Boolean] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.replaceSortMergeJoin")
+ .category(CATEGORY_EXEC)
.doc("Experimental feature to force Spark to replace SortMergeJoin with
ShuffledHashJoin " +
s"for improved performance. This feature is not stable yet.
$TUNING_GUIDE.")
.booleanConf
@@ -326,18 +349,21 @@ object CometConf extends ShimCometConf {
val COMET_EXEC_SHUFFLE_WITH_HASH_PARTITIONING_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.native.shuffle.partitioning.hash.enabled")
+ .category(CATEGORY_SHUFFLE)
.doc("Whether to enable hash partitioning for Comet native shuffle.")
.booleanConf
.createWithDefault(true)
val COMET_EXEC_SHUFFLE_WITH_RANGE_PARTITIONING_ENABLED: ConfigEntry[Boolean]
=
conf("spark.comet.native.shuffle.partitioning.range.enabled")
+ .category(CATEGORY_SHUFFLE)
.doc("Whether to enable range partitioning for Comet native shuffle.")
.booleanConf
.createWithDefault(true)
val COMET_EXEC_SHUFFLE_COMPRESSION_CODEC: ConfigEntry[String] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.codec")
+ .category(CATEGORY_SHUFFLE)
.doc(
"The codec of Comet native shuffle used to compress shuffle data. lz4,
zstd, and " +
"snappy are supported. Compression can be disabled by setting " +
@@ -348,18 +374,21 @@ object CometConf extends ShimCometConf {
val COMET_EXEC_SHUFFLE_COMPRESSION_ZSTD_LEVEL: ConfigEntry[Int] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.zstd.level")
+ .category(CATEGORY_SHUFFLE)
.doc("The compression level to use when compressing shuffle files with
zstd.")
.intConf
.createWithDefault(1)
val COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.columnar.shuffle.async.enabled")
+ .category(CATEGORY_SHUFFLE)
.doc("Whether to enable asynchronous shuffle for Arrow-based shuffle.")
.booleanConf
.createWithDefault(false)
val COMET_COLUMNAR_SHUFFLE_ASYNC_THREAD_NUM: ConfigEntry[Int] =
conf("spark.comet.columnar.shuffle.async.thread.num")
+ .category(CATEGORY_SHUFFLE)
.doc(
"Number of threads used for Comet async columnar shuffle per shuffle
task. " +
"Note that more threads means more memory requirement to " +
@@ -370,6 +399,7 @@ object CometConf extends ShimCometConf {
val COMET_COLUMNAR_SHUFFLE_ASYNC_MAX_THREAD_NUM: ConfigEntry[Int] = {
conf("spark.comet.columnar.shuffle.async.max.thread.num")
+ .category(CATEGORY_SHUFFLE)
.doc("Maximum number of threads on an executor used for Comet async
columnar shuffle. " +
"This is the upper bound of total number of shuffle " +
"threads per executor. In other words, if the number of cores * the
number of shuffle " +
@@ -382,6 +412,7 @@ object CometConf extends ShimCometConf {
val COMET_COLUMNAR_SHUFFLE_SPILL_THRESHOLD: ConfigEntry[Int] =
conf("spark.comet.columnar.shuffle.spill.threshold")
+ .category(CATEGORY_SHUFFLE)
.doc(
"Number of rows to be spilled used for Comet columnar shuffle. " +
"For every configured number of rows, a new spill file will be
created. " +
@@ -396,6 +427,7 @@ object CometConf extends ShimCometConf {
val COMET_COLUMNAR_SHUFFLE_MEMORY_SIZE: OptionalConfigEntry[Long] =
conf("spark.comet.columnar.shuffle.memorySize")
.internal()
+ .category(CATEGORY_TESTING)
.doc("Amount of memory to reserve for columnar shuffle when running in
on-heap mode. " +
s"$TUNING_GUIDE.")
.bytesConf(ByteUnit.MiB)
@@ -404,6 +436,7 @@ object CometConf extends ShimCometConf {
val COMET_COLUMNAR_SHUFFLE_MEMORY_FACTOR: ConfigEntry[Double] =
conf("spark.comet.columnar.shuffle.memory.factor")
.internal()
+ .category(CATEGORY_TESTING)
.doc("Fraction of Comet memory to be allocated per executor process for
columnar shuffle " +
s"when running in on-heap mode. $TUNING_GUIDE.")
.doubleConf
@@ -414,7 +447,7 @@ object CometConf extends ShimCometConf {
val COMET_COLUMNAR_SHUFFLE_BATCH_SIZE: ConfigEntry[Int] =
conf("spark.comet.columnar.shuffle.batch.size")
- .internal()
+ .category(CATEGORY_SHUFFLE)
.doc("Batch size when writing out sorted spill files on the native side.
Note that " +
"this should not be larger than batch size (i.e.,
`spark.comet.batchSize`). Otherwise " +
"it will produce larger batches than expected in the native operator
after shuffle.")
@@ -423,6 +456,7 @@ object CometConf extends ShimCometConf {
val COMET_SHUFFLE_PREFER_DICTIONARY_RATIO: ConfigEntry[Double] = conf(
"spark.comet.shuffle.preferDictionary.ratio")
+ .category(CATEGORY_SHUFFLE)
.doc(
"The ratio of total values to distinct values in a string column to
decide whether to " +
"prefer dictionary encoding when shuffling the column. If the ratio is
higher than " +
@@ -434,6 +468,7 @@ object CometConf extends ShimCometConf {
val COMET_EXCHANGE_SIZE_MULTIPLIER: ConfigEntry[Double] = conf(
"spark.comet.shuffle.sizeInBytesMultiplier")
+ .category(CATEGORY_SHUFFLE)
.doc(
"Comet reports smaller sizes for shuffle due to using Arrow's columnar
memory format " +
"and this can result in Spark choosing a different join strategy due
to the estimated " +
@@ -444,12 +479,14 @@ object CometConf extends ShimCometConf {
val COMET_DPP_FALLBACK_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.dppFallback.enabled")
+ .category(CATEGORY_EXEC)
.doc("Whether to fall back to Spark for queries that use DPP.")
.booleanConf
.createWithDefault(true)
val COMET_DEBUG_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.debug.enabled")
+ .category(CATEGORY_EXEC)
.doc(
"Whether to enable debug mode for Comet. " +
"When enabled, Comet will do additional checks for debugging
purpose. For example, " +
@@ -461,6 +498,7 @@ object CometConf extends ShimCometConf {
val COMET_EXPLAIN_VERBOSE_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.explain.verbose.enabled")
+ .category(CATEGORY_EXEC)
.doc(
"When this setting is enabled, Comet's extended explain output will
provide the full " +
"query plan annotated with fallback reasons as well as a summary of
how much of " +
@@ -471,6 +509,7 @@ object CometConf extends ShimCometConf {
val COMET_EXPLAIN_NATIVE_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.explain.native.enabled")
+ .category(CATEGORY_EXEC)
.doc(
"When this setting is enabled, Comet will provide a tree
representation of " +
"the native query plan before execution and again after execution,
with " +
@@ -480,6 +519,7 @@ object CometConf extends ShimCometConf {
val COMET_EXPLAIN_TRANSFORMATIONS: ConfigEntry[Boolean] =
conf("spark.comet.explain.rules")
+ .category(CATEGORY_EXEC)
.doc("When this setting is enabled, Comet will log all plan
transformations performed " +
"in physical optimizer rules. Default: false")
.internal()
@@ -488,6 +528,7 @@ object CometConf extends ShimCometConf {
val COMET_LOG_FALLBACK_REASONS: ConfigEntry[Boolean] =
conf("spark.comet.logFallbackReasons.enabled")
+ .category(CATEGORY_EXEC)
.doc("When this setting is enabled, Comet will log warnings for all
fallback reasons.")
.booleanConf
.createWithDefault(
@@ -495,6 +536,7 @@ object CometConf extends ShimCometConf {
val COMET_EXPLAIN_FALLBACK_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.explainFallback.enabled")
+ .category(CATEGORY_EXEC)
.doc(
"When this setting is enabled, Comet will provide logging explaining
the reason(s) " +
"why a query stage cannot be executed natively. Set this to false to
" +
@@ -503,18 +545,21 @@ object CometConf extends ShimCometConf {
.createWithDefault(false)
val COMET_BATCH_SIZE: ConfigEntry[Int] = conf("spark.comet.batchSize")
+ .category(CATEGORY_TUNING)
.doc("The columnar batch size, i.e., the maximum number of rows that a
batch can contain.")
.intConf
.createWithDefault(8192)
val COMET_PARQUET_ENABLE_DIRECT_BUFFER: ConfigEntry[Boolean] =
conf("spark.comet.parquet.enable.directBuffer")
+ .category(CATEGORY_PARQUET)
.doc("Whether to use Java direct byte buffer when reading Parquet.")
.booleanConf
.createWithDefault(false)
val COMET_ENABLE_ONHEAP_MODE: ConfigEntry[Boolean] =
conf("spark.comet.exec.onHeap.enabled")
+ .category(CATEGORY_TESTING)
.doc("Whether to allow Comet to run in on-heap mode. Required for
running Spark SQL tests.")
.internal()
.booleanConf
@@ -522,6 +567,7 @@ object CometConf extends ShimCometConf {
val COMET_EXEC_OFFHEAP_MEMORY_POOL_TYPE: ConfigEntry[String] =
conf("spark.comet.exec.memoryPool")
+ .category(CATEGORY_TUNING)
.doc(
"The type of memory pool to be used for Comet native execution when
running Spark in " +
"off-heap mode. Available pool types are `greedy_unified` and
`fair_unified`. " +
@@ -531,6 +577,7 @@ object CometConf extends ShimCometConf {
val COMET_EXEC_ONHEAP_MEMORY_POOL_TYPE: ConfigEntry[String] = conf(
"spark.comet.exec.onHeap.memoryPool")
+ .category(CATEGORY_TUNING)
.doc(
"The type of memory pool to be used for Comet native execution " +
"when running Spark in on-heap mode. Available pool types are
`greedy`, `fair_spill`, " +
@@ -542,6 +589,7 @@ object CometConf extends ShimCometConf {
val COMET_EXEC_MEMORY_POOL_FRACTION: ConfigEntry[Double] =
conf("spark.comet.exec.memoryPool.fraction")
+ .category(CATEGORY_TUNING)
.doc(
"Fraction of off-heap memory pool that is available to Comet. " +
"Only applies to off-heap mode. " +
@@ -551,12 +599,14 @@ object CometConf extends ShimCometConf {
val COMET_SCAN_PREFETCH_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.scan.preFetch.enabled")
+ .category(CATEGORY_SCAN)
.doc("Whether to enable pre-fetching feature of CometScan.")
.booleanConf
.createWithDefault(false)
val COMET_SCAN_PREFETCH_THREAD_NUM: ConfigEntry[Int] =
conf("spark.comet.scan.preFetch.threadNum")
+ .category(CATEGORY_SCAN)
.doc(
"The number of threads running pre-fetching for CometScan. Effective
if " +
s"${COMET_SCAN_PREFETCH_ENABLED.key} is enabled. Note that more " +
@@ -565,6 +615,7 @@ object CometConf extends ShimCometConf {
.createWithDefault(2)
val COMET_NATIVE_LOAD_REQUIRED: ConfigEntry[Boolean] =
conf("spark.comet.nativeLoadRequired")
+ .category(CATEGORY_EXEC)
.doc(
"Whether to require Comet native library to load successfully when Comet
is enabled. " +
"If not, Comet will silently fallback to Spark when it fails to load
the native lib. " +
@@ -574,6 +625,7 @@ object CometConf extends ShimCometConf {
val COMET_EXCEPTION_ON_LEGACY_DATE_TIMESTAMP: ConfigEntry[Boolean] =
conf("spark.comet.exceptionOnDatetimeRebase")
+ .category(CATEGORY_EXEC)
.doc("Whether to throw exception when seeing dates/timestamps from the
legacy hybrid " +
"(Julian + Gregorian) calendar. Since Spark 3, dates/timestamps were
written according " +
"to the Proleptic Gregorian calendar. When this is true, Comet will " +
@@ -585,6 +637,7 @@ object CometConf extends ShimCometConf {
val COMET_USE_DECIMAL_128: ConfigEntry[Boolean] =
conf("spark.comet.use.decimal128")
.internal()
+ .category(CATEGORY_EXEC)
.doc("If true, Comet will always use 128 bits to represent a decimal
value, regardless of " +
"its precision. If false, Comet will use 32, 64 and 128 bits
respectively depending on " +
"the precision. N.B. this is NOT a user-facing config but should be
inferred and set by " +
@@ -595,6 +648,7 @@ object CometConf extends ShimCometConf {
val COMET_USE_LAZY_MATERIALIZATION: ConfigEntry[Boolean] = conf(
"spark.comet.use.lazyMaterialization")
.internal()
+ .category(CATEGORY_PARQUET)
.doc(
"Whether to enable lazy materialization for Comet. When this is turned
on, Comet will " +
"read Parquet data source lazily for string and binary columns. For
filter operations, " +
@@ -602,19 +656,19 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(true)
- val COMET_SCHEMA_EVOLUTION_ENABLED: ConfigEntry[Boolean] = conf(
- "spark.comet.schemaEvolution.enabled")
- .internal()
- .doc(
- "Whether to enable schema evolution in Comet. For instance, promoting a
integer " +
+ val COMET_SCHEMA_EVOLUTION_ENABLED: ConfigEntry[Boolean] =
+ conf("spark.comet.schemaEvolution.enabled")
+ .internal()
+ .category(CATEGORY_SCAN)
+      .doc("Whether to enable schema evolution in Comet. For instance,
promoting an integer " +
"column to a long column, a float column to a double column, etc. This
is automatically " +
"enabled when reading from Iceberg tables.")
- .booleanConf
- .createWithDefault(COMET_SCHEMA_EVOLUTION_ENABLED_DEFAULT)
+ .booleanConf
+ .createWithDefault(COMET_SCHEMA_EVOLUTION_ENABLED_DEFAULT)
val COMET_SPARK_TO_ARROW_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.sparkToColumnar.enabled")
- .internal()
+ .category(CATEGORY_SCAN)
.doc("Whether to enable Spark to Arrow columnar conversion. When this is
turned on, " +
"Comet will convert operators in " +
"`spark.comet.sparkToColumnar.supportedOperatorList` into Arrow
columnar format before " +
@@ -624,50 +678,50 @@ object CometConf extends ShimCometConf {
val COMET_SPARK_TO_ARROW_SUPPORTED_OPERATOR_LIST: ConfigEntry[Seq[String]] =
conf("spark.comet.sparkToColumnar.supportedOperatorList")
- .doc(
- "A comma-separated list of operators that will be converted to Arrow
columnar " +
- "format when 'spark.comet.sparkToColumnar.enabled' is true")
+ .category(CATEGORY_SCAN)
+ .doc("A comma-separated list of operators that will be converted to
Arrow columnar " +
+ "format when 'spark.comet.sparkToColumnar.enabled' is true")
.stringConf
.toSequence
.createWithDefault(Seq("Range,InMemoryTableScan,RDDScan"))
val COMET_CASE_CONVERSION_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.caseConversion.enabled")
- .doc(
- "Java uses locale-specific rules when converting strings to upper or
lower case and " +
- "Rust does not, so we disable upper and lower by default.")
+ .category(CATEGORY_EXEC)
+ .doc("Java uses locale-specific rules when converting strings to upper
or lower case and " +
+ "Rust does not, so we disable upper and lower by default.")
.booleanConf
.createWithDefault(false)
val COMET_SCAN_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] =
conf("spark.comet.scan.allowIncompatible")
- .doc(
- "Some Comet scan implementations are not currently fully compatible
with Spark for " +
- s"all datatypes. Set this config to true to allow them anyway.
$COMPAT_GUIDE.")
+ .category(CATEGORY_SCAN)
+ .doc("Some Comet scan implementations are not currently fully compatible
with Spark for " +
+ s"all datatypes. Set this config to true to allow them anyway.
$COMPAT_GUIDE.")
.booleanConf
.createWithDefault(false)
val COMET_EXPR_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] =
conf("spark.comet.expression.allowIncompatible")
- .doc(
- "Comet is not currently fully compatible with Spark for all
expressions. " +
- s"Set this config to true to allow them anyway. $COMPAT_GUIDE.")
+ .category(CATEGORY_EXEC)
+ .doc("Comet is not currently fully compatible with Spark for all
expressions. " +
+ s"Set this config to true to allow them anyway. $COMPAT_GUIDE.")
.booleanConf
.createWithDefault(false)
val COMET_REGEXP_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] =
conf("spark.comet.regexp.allowIncompatible")
- .doc(
- "Comet is not currently fully compatible with Spark for all regular
expressions. " +
- s"Set this config to true to allow them anyway. $COMPAT_GUIDE.")
+ .category(CATEGORY_EXEC)
+ .doc("Comet is not currently fully compatible with Spark for all regular
expressions. " +
+ s"Set this config to true to allow them anyway. $COMPAT_GUIDE.")
.booleanConf
.createWithDefault(false)
val COMET_METRICS_UPDATE_INTERVAL: ConfigEntry[Long] =
conf("spark.comet.metrics.updateInterval")
- .doc(
- "The interval in milliseconds to update metrics. If interval is
negative," +
- " metrics will be updated upon task completion.")
+ .category(CATEGORY_EXEC)
+ .doc("The interval in milliseconds to update metrics. If interval is
negative," +
+ " metrics will be updated upon task completion.")
.longConf
.createWithDefault(3000L)
@@ -675,14 +729,15 @@ object CometConf extends ShimCometConf {
val COMET_LIBHDFS_SCHEMES: OptionalConfigEntry[String] =
conf(s"spark.hadoop.$COMET_LIBHDFS_SCHEMES_KEY")
- .doc(
- "Defines filesystem schemes (e.g., hdfs, webhdfs) that the native side
accesses " +
- "via libhdfs, separated by commas. Valid only when built with hdfs
feature enabled.")
+ .category(CATEGORY_SCAN)
+ .doc("Defines filesystem schemes (e.g., hdfs, webhdfs) that the native
side accesses " +
+ "via libhdfs, separated by commas. Valid only when built with hdfs
feature enabled.")
.stringConf
.createOptional
val COMET_MAX_TEMP_DIRECTORY_SIZE: ConfigEntry[Long] =
conf("spark.comet.maxTempDirectorySize")
+ .category(CATEGORY_EXEC)
.doc("The maximum amount of data (in bytes) stored inside the temporary
directories.")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(100L * 1024 * 1024 * 1024) // 100 GB
@@ -693,6 +748,7 @@ object CometConf extends ShimCometConf {
defaultValue: Boolean,
notes: Option[String] = None): ConfigEntry[Boolean] = {
conf(s"$COMET_EXEC_CONFIG_PREFIX.$exec.enabled")
+ .category(CATEGORY_ENABLE_EXEC)
.doc(
s"Whether to enable $exec by default." + notes
.map(s => s" $s.")
@@ -816,6 +872,7 @@ private class TypedConfigBuilder[T](
converter,
stringConverter,
parent._doc,
+ parent._category,
parent._public,
parent._version)
CometConf.register(conf)
@@ -831,6 +888,7 @@ private class TypedConfigBuilder[T](
converter,
stringConverter,
parent._doc,
+ parent._category,
parent._public,
parent._version)
CometConf.register(conf)
@@ -843,6 +901,7 @@ private[comet] abstract class ConfigEntry[T](
val valueConverter: String => T,
val stringConverter: T => String,
val doc: String,
+ val category: String,
val isPublic: Boolean,
val version: String) {
@@ -874,9 +933,10 @@ private[comet] class ConfigEntryWithDefault[T](
valueConverter: String => T,
stringConverter: T => String,
doc: String,
+ category: String,
isPublic: Boolean,
version: String)
- extends ConfigEntry(key, valueConverter, stringConverter, doc, isPublic,
version) {
+ extends ConfigEntry(key, valueConverter, stringConverter, doc, category,
isPublic, version) {
override def defaultValue: Option[T] = Some(_defaultValue)
override def defaultValueString: String = stringConverter(_defaultValue)
@@ -896,6 +956,7 @@ private[comet] class OptionalConfigEntry[T](
val rawValueConverter: String => T,
val rawStringConverter: T => String,
doc: String,
+ category: String,
isPublic: Boolean,
version: String)
extends ConfigEntry[Option[T]](
@@ -903,6 +964,7 @@ private[comet] class OptionalConfigEntry[T](
s => Some(rawValueConverter(s)),
v => v.map(rawStringConverter).orNull,
doc,
+ category,
isPublic,
version) {
@@ -920,6 +982,7 @@ private[comet] case class ConfigBuilder(key: String) {
var _public = true
var _doc = ""
var _version = ""
+ var _category = ""
def internal(): ConfigBuilder = {
_public = false
@@ -931,6 +994,11 @@ private[comet] case class ConfigBuilder(key: String) {
this
}
+ def category(s: String): ConfigBuilder = {
+ _category = s
+ this
+ }
+
def version(v: String): ConfigBuilder = {
_version = v
this
diff --git a/docs/source/user-guide/latest/configs.md
b/docs/source/user-guide/latest/configs.md
index 918fbc4b1..1a7cba1a4 100644
--- a/docs/source/user-guide/latest/configs.md
+++ b/docs/source/user-guide/latest/configs.md
@@ -21,72 +21,286 @@ under the License.
Comet provides the following configuration settings.
-<!-- WARNING! DO NOT MANUALLY MODIFY CONTENT BETWEEN THE BEGIN AND END TAGS -->
+## Scan Configuration Settings
-<!--BEGIN:CONFIG_TABLE-->
+<!-- WARNING! DO NOT MANUALLY MODIFY CONTENT BETWEEN THE BEGIN AND END TAGS -->
+<!--BEGIN:CONFIG_TABLE[scan]-->
| Config | Description | Default Value |
|--------|-------------|---------------|
-| spark.comet.batchSize | The columnar batch size, i.e., the maximum number of
rows that a batch can contain. | 8192 |
-| spark.comet.caseConversion.enabled | Java uses locale-specific rules when
converting strings to upper or lower case and Rust does not, so we disable
upper and lower by default. | false |
-| spark.comet.columnar.shuffle.async.enabled | Whether to enable asynchronous
shuffle for Arrow-based shuffle. | false |
-| spark.comet.columnar.shuffle.async.max.thread.num | Maximum number of
threads on an executor used for Comet async columnar shuffle. This is the upper
bound of total number of shuffle threads per executor. In other words, if the
number of cores * the number of shuffle threads per task
`spark.comet.columnar.shuffle.async.thread.num` is larger than this config.
Comet will use this config as the number of shuffle threads per executor
instead. | 100 |
-| spark.comet.columnar.shuffle.async.thread.num | Number of threads used for
Comet async columnar shuffle per shuffle task. Note that more threads means
more memory requirement to buffer shuffle data before flushing to disk. Also,
more threads may not always improve performance, and should be set based on the
number of cores available. | 3 |
| spark.comet.convert.csv.enabled | When enabled, data from Spark (non-native)
CSV v1 and v2 scans will be converted to Arrow format. Note that to enable
native vectorized execution, both this config and 'spark.comet.exec.enabled'
need to be enabled. | false |
| spark.comet.convert.json.enabled | When enabled, data from Spark
(non-native) JSON v1 and v2 scans will be converted to Arrow format. Note that
to enable native vectorized execution, both this config and
'spark.comet.exec.enabled' need to be enabled. | false |
| spark.comet.convert.parquet.enabled | When enabled, data from Spark
(non-native) Parquet v1 and v2 scans will be converted to Arrow format. Note
that to enable native vectorized execution, both this config and
'spark.comet.exec.enabled' need to be enabled. | false |
+| spark.comet.scan.allowIncompatible | Some Comet scan implementations are not
currently fully compatible with Spark for all datatypes. Set this config to
true to allow them anyway. For more information, refer to the Comet
Compatibility Guide
(https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
+| spark.comet.scan.enabled | Whether to enable native scans. When this is
turned on, Spark will use Comet to read supported data sources (currently only
Parquet is supported natively). Note that to enable native vectorized
execution, both this config and 'spark.comet.exec.enabled' need to be enabled.
| true |
+| spark.comet.scan.preFetch.enabled | Whether to enable pre-fetching feature
of CometScan. | false |
+| spark.comet.scan.preFetch.threadNum | The number of threads running
pre-fetching for CometScan. Effective if spark.comet.scan.preFetch.enabled is
enabled. Note that more pre-fetching threads means more memory requirement to
store pre-fetched row groups. | 2 |
+| spark.comet.sparkToColumnar.enabled | Whether to enable Spark to Arrow
columnar conversion. When this is turned on, Comet will convert operators in
`spark.comet.sparkToColumnar.supportedOperatorList` into Arrow columnar format
before processing. | false |
+| spark.comet.sparkToColumnar.supportedOperatorList | A comma-separated list
of operators that will be converted to Arrow columnar format when
'spark.comet.sparkToColumnar.enabled' is true | Range,InMemoryTableScan,RDDScan
|
+| spark.hadoop.fs.comet.libhdfs.schemes | Defines filesystem schemes (e.g.,
hdfs, webhdfs) that the native side accesses via libhdfs, separated by commas.
Valid only when built with hdfs feature enabled. | |
+<!--END:CONFIG_TABLE-->
+
+## Parquet Reader Configuration Settings
+
+<!-- WARNING! DO NOT MANUALLY MODIFY CONTENT BETWEEN THE BEGIN AND END TAGS -->
+<!--BEGIN:CONFIG_TABLE[parquet]-->
+| Config | Description | Default Value |
+|--------|-------------|---------------|
+| spark.comet.parquet.enable.directBuffer | Whether to use Java direct byte
buffer when reading Parquet. | false |
+| spark.comet.parquet.read.io.adjust.readRange.skew | In the parallel reader,
if the read ranges submitted are skewed in sizes, this option will cause the
reader to break up larger read ranges into smaller ranges to reduce the skew.
This will result in a slightly larger number of connections opened to the file
system but may give improved performance. | false |
+| spark.comet.parquet.read.io.mergeRanges | When enabled the parallel reader
will try to merge ranges of data that are separated by less than
'comet.parquet.read.io.mergeRanges.delta' bytes. Longer continuous reads are
faster on cloud storage. | true |
+| spark.comet.parquet.read.io.mergeRanges.delta | The delta in bytes between
consecutive read ranges below which the parallel reader will try to merge the
ranges. The default is 8MB. | 8388608 |
+| spark.comet.parquet.read.parallel.io.enabled | Whether to enable Comet's
parallel reader for Parquet files. The parallel reader reads ranges of
consecutive data in a file in parallel. It is faster for large files and row
groups but uses more resources. | true |
+| spark.comet.parquet.read.parallel.io.thread-pool.size | The maximum number
of parallel threads the parallel reader will use in a single executor. For
executors configured with a smaller number of cores, use a smaller number. | 16
|
+| spark.comet.parquet.respectFilterPushdown | Whether to respect Spark's
PARQUET_FILTER_PUSHDOWN_ENABLED config. This needs to be respected when running
the Spark SQL test suite, but the default setting results in poor performance in
Comet when using the new native scans, so it is disabled by default | false |
+<!--END:CONFIG_TABLE-->
+
+## Query Execution Settings
+
+<!-- WARNING! DO NOT MANUALLY MODIFY CONTENT BETWEEN THE BEGIN AND END TAGS -->
+<!--BEGIN:CONFIG_TABLE[exec]-->
+| Config | Description | Default Value |
+|--------|-------------|---------------|
+| spark.comet.caseConversion.enabled | Java uses locale-specific rules when
converting strings to upper or lower case and Rust does not, so we disable
upper and lower by default. | false |
| spark.comet.debug.enabled | Whether to enable debug mode for Comet. When
enabled, Comet will do additional checks for debugging purpose. For example,
validating array when importing arrays from JVM at native side. Note that these
checks may be expensive in performance and should only be enabled for debugging
purpose. | false |
| spark.comet.dppFallback.enabled | Whether to fall back to Spark for queries
that use DPP. | true |
| spark.comet.enabled | Whether to enable Comet extension for Spark. When this
is turned on, Spark will use Comet to read Parquet data source. Note that to
enable native vectorized execution, both this config and
'spark.comet.exec.enabled' need to be enabled. By default, this config is the
value of the env var `ENABLE_COMET` if set, or true otherwise. | true |
| spark.comet.exceptionOnDatetimeRebase | Whether to throw exception when
seeing dates/timestamps from the legacy hybrid (Julian + Gregorian) calendar.
Since Spark 3, dates/timestamps were written according to the Proleptic
Gregorian calendar. When this is true, Comet will throw exceptions when seeing
these dates/timestamps that were written by Spark version before 3.0. If this
is false, these dates/timestamps will be read as if they were written to the
Proleptic Gregorian calendar and w [...]
+| spark.comet.exec.enabled | Whether to enable Comet native vectorized
execution for Spark. This controls whether Spark should convert operators into
their Comet counterparts and execute them in native space. Note: each operator
is associated with a separate config in the format of
'spark.comet.exec.<operator_name>.enabled' at the moment, and both the config
and this need to be turned on, in order for the operator to be executed in
native. | true |
+| spark.comet.exec.replaceSortMergeJoin | Experimental feature to force Spark
to replace SortMergeJoin with ShuffledHashJoin for improved performance. This
feature is not stable yet. For more information, refer to the Comet Tuning
Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | false |
+| spark.comet.explain.native.enabled | When this setting is enabled, Comet
will provide a tree representation of the native query plan before execution
and again after execution, with metrics. | false |
+| spark.comet.explain.verbose.enabled | When this setting is enabled, Comet's
extended explain output will provide the full query plan annotated with
fallback reasons as well as a summary of how much of the plan was accelerated
by Comet. When this setting is disabled, a list of fallback reasons will be
provided instead. | false |
+| spark.comet.explainFallback.enabled | When this setting is enabled, Comet
will provide logging explaining the reason(s) why a query stage cannot be
executed natively. Set this to false to reduce the amount of logging. | false |
+| spark.comet.expression.allowIncompatible | Comet is not currently fully
compatible with Spark for all expressions. Set this config to true to allow
them anyway. For more information, refer to the Comet Compatibility Guide
(https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
+| spark.comet.logFallbackReasons.enabled | When this setting is enabled, Comet
will log warnings for all fallback reasons. | false |
+| spark.comet.maxTempDirectorySize | The maximum amount of data (in bytes)
stored inside the temporary directories. | 107374182400b |
+| spark.comet.metrics.updateInterval | The interval in milliseconds to update
metrics. If interval is negative, metrics will be updated upon task completion.
| 3000 |
+| spark.comet.nativeLoadRequired | Whether to require Comet native library to
load successfully when Comet is enabled. If not, Comet will silently fallback
to Spark when it fails to load the native lib. Otherwise, an error will be
thrown and the Spark job will be aborted. | false |
+| spark.comet.regexp.allowIncompatible | Comet is not currently fully
compatible with Spark for all regular expressions. Set this config to true to
allow them anyway. For more information, refer to the Comet Compatibility Guide
(https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
+<!--END:CONFIG_TABLE-->
+
+## Enabling or Disabling Individual Operators
+
+<!-- WARNING! DO NOT MANUALLY MODIFY CONTENT BETWEEN THE BEGIN AND END TAGS -->
+<!--BEGIN:CONFIG_TABLE[enable_exec]-->
+| Config | Description | Default Value |
+|--------|-------------|---------------|
| spark.comet.exec.aggregate.enabled | Whether to enable aggregate by default.
| true |
| spark.comet.exec.broadcastExchange.enabled | Whether to enable
broadcastExchange by default. | true |
| spark.comet.exec.broadcastHashJoin.enabled | Whether to enable
broadcastHashJoin by default. | true |
| spark.comet.exec.coalesce.enabled | Whether to enable coalesce by default. |
true |
| spark.comet.exec.collectLimit.enabled | Whether to enable collectLimit by
default. | true |
-| spark.comet.exec.enabled | Whether to enable Comet native vectorized
execution for Spark. This controls whether Spark should convert operators into
their Comet counterparts and execute them in native space. Note: each operator
is associated with a separate config in the format of
'spark.comet.exec.<operator_name>.enabled' at the moment, and both the config
and this need to be turned on, in order for the operator to be executed in
native. | true |
| spark.comet.exec.expand.enabled | Whether to enable expand by default. |
true |
| spark.comet.exec.filter.enabled | Whether to enable filter by default. |
true |
| spark.comet.exec.globalLimit.enabled | Whether to enable globalLimit by
default. | true |
| spark.comet.exec.hashJoin.enabled | Whether to enable hashJoin by default. |
true |
| spark.comet.exec.localLimit.enabled | Whether to enable localLimit by
default. | true |
-| spark.comet.exec.memoryPool | The type of memory pool to be used for Comet
native execution when running Spark in off-heap mode. Available pool types are
'greedy_unified' and `fair_unified`. For more information, refer to the Comet
Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). |
fair_unified |
-| spark.comet.exec.memoryPool.fraction | Fraction of off-heap memory pool that
is available to Comet. Only applies to off-heap mode. For more information,
refer to the Comet Tuning Guide
(https://datafusion.apache.org/comet/user-guide/tuning.html). | 1.0 |
| spark.comet.exec.project.enabled | Whether to enable project by default. |
true |
-| spark.comet.exec.replaceSortMergeJoin | Experimental feature to force Spark
to replace SortMergeJoin with ShuffledHashJoin for improved performance. This
feature is not stable yet. For more information, refer to the Comet Tuning
Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | false |
-| spark.comet.exec.shuffle.compression.codec | The codec of Comet native
shuffle used to compress shuffle data. lz4, zstd, and snappy are supported.
Compression can be disabled by setting spark.shuffle.compress=false. | lz4 |
-| spark.comet.exec.shuffle.compression.zstd.level | The compression level to
use when compressing shuffle files with zstd. | 1 |
-| spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle.
Note that this requires setting 'spark.shuffle.manager' to
'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'.
'spark.shuffle.manager' must be set before starting the Spark application and
cannot be changed during the application. | true |
| spark.comet.exec.sort.enabled | Whether to enable sort by default. | true |
| spark.comet.exec.sortMergeJoin.enabled | Whether to enable sortMergeJoin by
default. | true |
| spark.comet.exec.sortMergeJoinWithJoinFilter.enabled | Experimental support
for Sort Merge Join with filter | false |
-| spark.comet.exec.stddev.enabled | Whether to enable stddev by default.
stddev is slower than Spark's implementation. | true |
| spark.comet.exec.takeOrderedAndProject.enabled | Whether to enable
takeOrderedAndProject by default. | true |
| spark.comet.exec.union.enabled | Whether to enable union by default. | true |
| spark.comet.exec.window.enabled | Whether to enable window by default. |
true |
-| spark.comet.explain.native.enabled | When this setting is enabled, Comet
will provide a tree representation of the native query plan before execution
and again after execution, with metrics. | false |
-| spark.comet.explain.verbose.enabled | When this setting is enabled, Comet's
extended explain output will provide the full query plan annotated with
fallback reasons as well as a summary of how much of the plan was accelerated
by Comet. When this setting is disabled, a list of fallback reasons will be
provided instead. | false |
-| spark.comet.explainFallback.enabled | When this setting is enabled, Comet
will provide logging explaining the reason(s) why a query stage cannot be
executed natively. Set this to false to reduce the amount of logging. | false |
-| spark.comet.expression.allowIncompatible | Comet is not currently fully
compatible with Spark for all expressions. Set this config to true to allow
them anyway. For more information, refer to the Comet Compatibility Guide
(https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
-| spark.comet.logFallbackReasons.enabled | When this setting is enabled, Comet
will log warnings for all fallback reasons. | false |
-| spark.comet.maxTempDirectorySize | The maximum amount of data (in bytes)
stored inside the temporary directories. | 107374182400b |
-| spark.comet.metrics.updateInterval | The interval in milliseconds to update
metrics. If interval is negative, metrics will be updated upon task completion.
| 3000 |
+<!--END:CONFIG_TABLE-->
+
+## Enabling or Disabling Individual Scalar Expressions
+
+<!-- WARNING! DO NOT MANUALLY MODIFY CONTENT BETWEEN THE BEGIN AND END TAGS -->
+<!--BEGIN:CONFIG_TABLE[enable_expr]-->
+| Config | Description | Default Value |
+|--------|-------------|---------------|
+| spark.comet.expression.Acos.enabled | Enable Comet acceleration for Acos |
true |
+| spark.comet.expression.Add.enabled | Enable Comet acceleration for Add |
true |
+| spark.comet.expression.Alias.enabled | Enable Comet acceleration for Alias |
true |
+| spark.comet.expression.And.enabled | Enable Comet acceleration for And |
true |
+| spark.comet.expression.ArrayAppend.enabled | Enable Comet acceleration for
ArrayAppend | true |
+| spark.comet.expression.ArrayCompact.enabled | Enable Comet acceleration for
ArrayCompact | true |
+| spark.comet.expression.ArrayContains.enabled | Enable Comet acceleration for
ArrayContains | true |
+| spark.comet.expression.ArrayDistinct.enabled | Enable Comet acceleration for
ArrayDistinct | true |
+| spark.comet.expression.ArrayExcept.enabled | Enable Comet acceleration for
ArrayExcept | true |
+| spark.comet.expression.ArrayFilter.enabled | Enable Comet acceleration for
ArrayFilter | true |
+| spark.comet.expression.ArrayInsert.enabled | Enable Comet acceleration for
ArrayInsert | true |
+| spark.comet.expression.ArrayIntersect.enabled | Enable Comet acceleration
for ArrayIntersect | true |
+| spark.comet.expression.ArrayJoin.enabled | Enable Comet acceleration for
ArrayJoin | true |
+| spark.comet.expression.ArrayMax.enabled | Enable Comet acceleration for
ArrayMax | true |
+| spark.comet.expression.ArrayMin.enabled | Enable Comet acceleration for
ArrayMin | true |
+| spark.comet.expression.ArrayRemove.enabled | Enable Comet acceleration for
ArrayRemove | true |
+| spark.comet.expression.ArrayRepeat.enabled | Enable Comet acceleration for
ArrayRepeat | true |
+| spark.comet.expression.ArrayUnion.enabled | Enable Comet acceleration for
ArrayUnion | true |
+| spark.comet.expression.ArraysOverlap.enabled | Enable Comet acceleration for
ArraysOverlap | true |
+| spark.comet.expression.Ascii.enabled | Enable Comet acceleration for Ascii |
true |
+| spark.comet.expression.Asin.enabled | Enable Comet acceleration for Asin |
true |
+| spark.comet.expression.Atan.enabled | Enable Comet acceleration for Atan |
true |
+| spark.comet.expression.Atan2.enabled | Enable Comet acceleration for Atan2 |
true |
+| spark.comet.expression.AttributeReference.enabled | Enable Comet
acceleration for AttributeReference | true |
+| spark.comet.expression.BitLength.enabled | Enable Comet acceleration for
BitLength | true |
+| spark.comet.expression.BitwiseAnd.enabled | Enable Comet acceleration for
BitwiseAnd | true |
+| spark.comet.expression.BitwiseCount.enabled | Enable Comet acceleration for
BitwiseCount | true |
+| spark.comet.expression.BitwiseGet.enabled | Enable Comet acceleration for
BitwiseGet | true |
+| spark.comet.expression.BitwiseNot.enabled | Enable Comet acceleration for
BitwiseNot | true |
+| spark.comet.expression.BitwiseOr.enabled | Enable Comet acceleration for
BitwiseOr | true |
+| spark.comet.expression.BitwiseXor.enabled | Enable Comet acceleration for
BitwiseXor | true |
+| spark.comet.expression.CaseWhen.enabled | Enable Comet acceleration for
CaseWhen | true |
+| spark.comet.expression.Cast.enabled | Enable Comet acceleration for Cast |
true |
+| spark.comet.expression.Ceil.enabled | Enable Comet acceleration for Ceil |
true |
+| spark.comet.expression.CheckOverflow.enabled | Enable Comet acceleration for
CheckOverflow | true |
+| spark.comet.expression.Chr.enabled | Enable Comet acceleration for Chr |
true |
+| spark.comet.expression.Coalesce.enabled | Enable Comet acceleration for
Coalesce | true |
+| spark.comet.expression.ConcatWs.enabled | Enable Comet acceleration for
ConcatWs | true |
+| spark.comet.expression.Contains.enabled | Enable Comet acceleration for
Contains | true |
+| spark.comet.expression.Cos.enabled | Enable Comet acceleration for Cos |
true |
+| spark.comet.expression.CreateArray.enabled | Enable Comet acceleration for
CreateArray | true |
+| spark.comet.expression.CreateNamedStruct.enabled | Enable Comet acceleration
for CreateNamedStruct | true |
+| spark.comet.expression.DateAdd.enabled | Enable Comet acceleration for
DateAdd | true |
+| spark.comet.expression.DateSub.enabled | Enable Comet acceleration for
DateSub | true |
+| spark.comet.expression.DayOfMonth.enabled | Enable Comet acceleration for
DayOfMonth | true |
+| spark.comet.expression.DayOfWeek.enabled | Enable Comet acceleration for
DayOfWeek | true |
+| spark.comet.expression.DayOfYear.enabled | Enable Comet acceleration for
DayOfYear | true |
+| spark.comet.expression.Divide.enabled | Enable Comet acceleration for Divide
| true |
+| spark.comet.expression.ElementAt.enabled | Enable Comet acceleration for
ElementAt | true |
+| spark.comet.expression.EndsWith.enabled | Enable Comet acceleration for
EndsWith | true |
+| spark.comet.expression.EqualNullSafe.enabled | Enable Comet acceleration for
EqualNullSafe | true |
+| spark.comet.expression.EqualTo.enabled | Enable Comet acceleration for
EqualTo | true |
+| spark.comet.expression.Exp.enabled | Enable Comet acceleration for Exp |
true |
+| spark.comet.expression.Expm1.enabled | Enable Comet acceleration for Expm1 |
true |
+| spark.comet.expression.Flatten.enabled | Enable Comet acceleration for
Flatten | true |
+| spark.comet.expression.Floor.enabled | Enable Comet acceleration for Floor |
true |
+| spark.comet.expression.FromUnixTime.enabled | Enable Comet acceleration for
FromUnixTime | true |
+| spark.comet.expression.GetArrayItem.enabled | Enable Comet acceleration for
GetArrayItem | true |
+| spark.comet.expression.GetArrayStructFields.enabled | Enable Comet
acceleration for GetArrayStructFields | true |
+| spark.comet.expression.GetMapValue.enabled | Enable Comet acceleration for
GetMapValue | true |
+| spark.comet.expression.GetStructField.enabled | Enable Comet acceleration
for GetStructField | true |
+| spark.comet.expression.GreaterThan.enabled | Enable Comet acceleration for
GreaterThan | true |
+| spark.comet.expression.GreaterThanOrEqual.enabled | Enable Comet
acceleration for GreaterThanOrEqual | true |
+| spark.comet.expression.Hex.enabled | Enable Comet acceleration for Hex |
true |
+| spark.comet.expression.Hour.enabled | Enable Comet acceleration for Hour |
true |
+| spark.comet.expression.If.enabled | Enable Comet acceleration for If | true |
+| spark.comet.expression.In.enabled | Enable Comet acceleration for In | true |
+| spark.comet.expression.InSet.enabled | Enable Comet acceleration for InSet |
true |
+| spark.comet.expression.InitCap.enabled | Enable Comet acceleration for
InitCap | true |
+| spark.comet.expression.IntegralDivide.enabled | Enable Comet acceleration
for IntegralDivide | true |
+| spark.comet.expression.IsNaN.enabled | Enable Comet acceleration for IsNaN |
true |
+| spark.comet.expression.IsNotNull.enabled | Enable Comet acceleration for
IsNotNull | true |
+| spark.comet.expression.IsNull.enabled | Enable Comet acceleration for IsNull
| true |
+| spark.comet.expression.Length.enabled | Enable Comet acceleration for Length
| true |
+| spark.comet.expression.LessThan.enabled | Enable Comet acceleration for
LessThan | true |
+| spark.comet.expression.LessThanOrEqual.enabled | Enable Comet acceleration
for LessThanOrEqual | true |
+| spark.comet.expression.Like.enabled | Enable Comet acceleration for Like |
true |
+| spark.comet.expression.Literal.enabled | Enable Comet acceleration for
Literal | true |
+| spark.comet.expression.Log.enabled | Enable Comet acceleration for Log |
true |
+| spark.comet.expression.Log10.enabled | Enable Comet acceleration for Log10 |
true |
+| spark.comet.expression.Log2.enabled | Enable Comet acceleration for Log2 |
true |
+| spark.comet.expression.Lower.enabled | Enable Comet acceleration for Lower |
true |
+| spark.comet.expression.MapEntries.enabled | Enable Comet acceleration for
MapEntries | true |
+| spark.comet.expression.MapFromArrays.enabled | Enable Comet acceleration for
MapFromArrays | true |
+| spark.comet.expression.MapKeys.enabled | Enable Comet acceleration for
MapKeys | true |
+| spark.comet.expression.MapValues.enabled | Enable Comet acceleration for
MapValues | true |
+| spark.comet.expression.Md5.enabled | Enable Comet acceleration for Md5 |
true |
+| spark.comet.expression.Minute.enabled | Enable Comet acceleration for Minute
| true |
+| spark.comet.expression.MonotonicallyIncreasingID.enabled | Enable Comet
acceleration for MonotonicallyIncreasingID | true |
+| spark.comet.expression.Month.enabled | Enable Comet acceleration for Month |
true |
+| spark.comet.expression.Multiply.enabled | Enable Comet acceleration for
Multiply | true |
+| spark.comet.expression.Murmur3Hash.enabled | Enable Comet acceleration for
Murmur3Hash | true |
+| spark.comet.expression.Not.enabled | Enable Comet acceleration for Not |
true |
+| spark.comet.expression.OctetLength.enabled | Enable Comet acceleration for
OctetLength | true |
+| spark.comet.expression.Or.enabled | Enable Comet acceleration for Or | true |
+| spark.comet.expression.Pow.enabled | Enable Comet acceleration for Pow |
true |
+| spark.comet.expression.Quarter.enabled | Enable Comet acceleration for
Quarter | true |
+| spark.comet.expression.RLike.enabled | Enable Comet acceleration for RLike |
true |
+| spark.comet.expression.Rand.enabled | Enable Comet acceleration for Rand |
true |
+| spark.comet.expression.Randn.enabled | Enable Comet acceleration for Randn |
true |
+| spark.comet.expression.RegExpReplace.enabled | Enable Comet acceleration for
RegExpReplace | true |
+| spark.comet.expression.Remainder.enabled | Enable Comet acceleration for
Remainder | true |
+| spark.comet.expression.Reverse.enabled | Enable Comet acceleration for
Reverse | true |
+| spark.comet.expression.Round.enabled | Enable Comet acceleration for Round |
true |
+| spark.comet.expression.Second.enabled | Enable Comet acceleration for Second
| true |
+| spark.comet.expression.Sha2.enabled | Enable Comet acceleration for Sha2 |
true |
+| spark.comet.expression.ShiftLeft.enabled | Enable Comet acceleration for
ShiftLeft | true |
+| spark.comet.expression.ShiftRight.enabled | Enable Comet acceleration for
ShiftRight | true |
+| spark.comet.expression.Signum.enabled | Enable Comet acceleration for Signum
| true |
+| spark.comet.expression.Sin.enabled | Enable Comet acceleration for Sin |
true |
+| spark.comet.expression.SparkPartitionID.enabled | Enable Comet acceleration
for SparkPartitionID | true |
+| spark.comet.expression.Sqrt.enabled | Enable Comet acceleration for Sqrt |
true |
+| spark.comet.expression.StartsWith.enabled | Enable Comet acceleration for
StartsWith | true |
+| spark.comet.expression.StringInstr.enabled | Enable Comet acceleration for
StringInstr | true |
+| spark.comet.expression.StringLPad.enabled | Enable Comet acceleration for
StringLPad | true |
+| spark.comet.expression.StringRPad.enabled | Enable Comet acceleration for
StringRPad | true |
+| spark.comet.expression.StringRepeat.enabled | Enable Comet acceleration for
StringRepeat | true |
+| spark.comet.expression.StringReplace.enabled | Enable Comet acceleration for
StringReplace | true |
+| spark.comet.expression.StringSpace.enabled | Enable Comet acceleration for
StringSpace | true |
+| spark.comet.expression.StringTranslate.enabled | Enable Comet acceleration
for StringTranslate | true |
+| spark.comet.expression.StringTrim.enabled | Enable Comet acceleration for
StringTrim | true |
+| spark.comet.expression.StringTrimBoth.enabled | Enable Comet acceleration
for StringTrimBoth | true |
+| spark.comet.expression.StringTrimLeft.enabled | Enable Comet acceleration
for StringTrimLeft | true |
+| spark.comet.expression.StringTrimRight.enabled | Enable Comet acceleration
for StringTrimRight | true |
+| spark.comet.expression.StructsToJson.enabled | Enable Comet acceleration for
StructsToJson | true |
+| spark.comet.expression.Substring.enabled | Enable Comet acceleration for
Substring | true |
+| spark.comet.expression.Subtract.enabled | Enable Comet acceleration for
Subtract | true |
+| spark.comet.expression.Tan.enabled | Enable Comet acceleration for Tan |
true |
+| spark.comet.expression.TruncDate.enabled | Enable Comet acceleration for
TruncDate | true |
+| spark.comet.expression.TruncTimestamp.enabled | Enable Comet acceleration
for TruncTimestamp | true |
+| spark.comet.expression.UnaryMinus.enabled | Enable Comet acceleration for
UnaryMinus | true |
+| spark.comet.expression.Unhex.enabled | Enable Comet acceleration for Unhex |
true |
+| spark.comet.expression.Upper.enabled | Enable Comet acceleration for Upper |
true |
+| spark.comet.expression.WeekDay.enabled | Enable Comet acceleration for
WeekDay | true |
+| spark.comet.expression.WeekOfYear.enabled | Enable Comet acceleration for
WeekOfYear | true |
+| spark.comet.expression.XxHash64.enabled | Enable Comet acceleration for
XxHash64 | true |
+| spark.comet.expression.Year.enabled | Enable Comet acceleration for Year |
true |
+<!--END:CONFIG_TABLE-->
+
+## Enabling or Disabling Individual Aggregate Expressions
+
+<!-- WARNING! DO NOT MANUALLY MODIFY CONTENT BETWEEN THE BEGIN AND END TAGS -->
+<!--BEGIN:CONFIG_TABLE[enable_agg_expr]-->
+| Config | Description | Default Value |
+|--------|-------------|---------------|
+| spark.comet.expression.Average.enabled | Enable Comet acceleration for
Average | true |
+| spark.comet.expression.BitAndAgg.enabled | Enable Comet acceleration for
BitAndAgg | true |
+| spark.comet.expression.BitOrAgg.enabled | Enable Comet acceleration for
BitOrAgg | true |
+| spark.comet.expression.BitXorAgg.enabled | Enable Comet acceleration for
BitXorAgg | true |
+| spark.comet.expression.BloomFilterAggregate.enabled | Enable Comet
acceleration for BloomFilterAggregate | true |
+| spark.comet.expression.Corr.enabled | Enable Comet acceleration for Corr |
true |
+| spark.comet.expression.Count.enabled | Enable Comet acceleration for Count |
true |
+| spark.comet.expression.CovPopulation.enabled | Enable Comet acceleration for
CovPopulation | true |
+| spark.comet.expression.CovSample.enabled | Enable Comet acceleration for
CovSample | true |
+| spark.comet.expression.First.enabled | Enable Comet acceleration for First |
true |
+| spark.comet.expression.Last.enabled | Enable Comet acceleration for Last |
true |
+| spark.comet.expression.Max.enabled | Enable Comet acceleration for Max |
true |
+| spark.comet.expression.Min.enabled | Enable Comet acceleration for Min |
true |
+| spark.comet.expression.StddevPop.enabled | Enable Comet acceleration for
StddevPop | true |
+| spark.comet.expression.StddevSamp.enabled | Enable Comet acceleration for
StddevSamp | true |
+| spark.comet.expression.Sum.enabled | Enable Comet acceleration for Sum |
true |
+| spark.comet.expression.VariancePop.enabled | Enable Comet acceleration for
VariancePop | true |
+| spark.comet.expression.VarianceSamp.enabled | Enable Comet acceleration for
VarianceSamp | true |
+<!--END:CONFIG_TABLE-->
+
+## Shuffle Configuration Settings
+
+<!-- WARNING! DO NOT MANUALLY MODIFY CONTENT BETWEEN THE BEGIN AND END TAGS -->
+<!--BEGIN:CONFIG_TABLE[shuffle]-->
+| Config | Description | Default Value |
+|--------|-------------|---------------|
+| spark.comet.columnar.shuffle.async.enabled | Whether to enable asynchronous
shuffle for Arrow-based shuffle. | false |
+| spark.comet.columnar.shuffle.async.max.thread.num | Maximum number of
threads on an executor used for Comet async columnar shuffle. This is the upper
bound of total number of shuffle threads per executor. In other words, if the
number of cores * the number of shuffle threads per task
`spark.comet.columnar.shuffle.async.thread.num` is larger than this config,
Comet will use this config as the number of shuffle threads per executor
instead. | 100 |
+| spark.comet.columnar.shuffle.async.thread.num | Number of threads used for
Comet async columnar shuffle per shuffle task. Note that more threads means
more memory requirement to buffer shuffle data before flushing to disk. Also,
more threads may not always improve performance, and should be set based on the
number of cores available. | 3 |
+| spark.comet.columnar.shuffle.batch.size | Batch size when writing out sorted
spill files on the native side. Note that this should not be larger than batch
size (i.e., `spark.comet.batchSize`). Otherwise it will produce larger batches
than expected in the native operator after shuffle. | 8192 |
+| spark.comet.exec.shuffle.compression.codec | The codec of Comet native
shuffle used to compress shuffle data. lz4, zstd, and snappy are supported.
Compression can be disabled by setting spark.shuffle.compress=false. | lz4 |
+| spark.comet.exec.shuffle.compression.zstd.level | The compression level to
use when compressing shuffle files with zstd. | 1 |
+| spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle.
Note that this requires setting 'spark.shuffle.manager' to
'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'.
'spark.shuffle.manager' must be set before starting the Spark application and
cannot be changed during the application. | true |
| spark.comet.native.shuffle.partitioning.hash.enabled | Whether to enable
hash partitioning for Comet native shuffle. | true |
| spark.comet.native.shuffle.partitioning.range.enabled | Whether to enable
range partitioning for Comet native shuffle. | true |
-| spark.comet.nativeLoadRequired | Whether to require Comet native library to
load successfully when Comet is enabled. If not, Comet will silently fallback
to Spark when it fails to load the native lib. Otherwise, an error will be
thrown and the Spark job will be aborted. | false |
-| spark.comet.parquet.enable.directBuffer | Whether to use Java direct byte
buffer when reading Parquet. | false |
-| spark.comet.parquet.read.io.adjust.readRange.skew | In the parallel reader,
if the read ranges submitted are skewed in sizes, this option will cause the
reader to break up larger read ranges into smaller ranges to reduce the skew.
This will result in a slightly larger number of connections opened to the file
system but may give improved performance. | false |
-| spark.comet.parquet.read.io.mergeRanges | When enabled the parallel reader
will try to merge ranges of data that are separated by less than
'comet.parquet.read.io.mergeRanges.delta' bytes. Longer continuous reads are
faster on cloud storage. | true |
-| spark.comet.parquet.read.io.mergeRanges.delta | The delta in bytes between
consecutive read ranges below which the parallel reader will try to merge the
ranges. The default is 8MB. | 8388608 |
-| spark.comet.parquet.read.parallel.io.enabled | Whether to enable Comet's
parallel reader for Parquet files. The parallel reader reads ranges of
consecutive data in a file in parallel. It is faster for large files and row
groups but uses more resources. | true |
-| spark.comet.parquet.read.parallel.io.thread-pool.size | The maximum number
of parallel threads the parallel reader will use in a single executor. For
executors configured with a smaller number of cores, use a smaller number. | 16
|
-| spark.comet.parquet.respectFilterPushdown | Whether to respect Spark's
PARQUET_FILTER_PUSHDOWN_ENABLED config. This needs to be respected when running
the Spark SQL test suite but the default setting results in poor performance in
Comet when using the new native scans, disabled by default | false |
-| spark.comet.regexp.allowIncompatible | Comet is not currently fully
compatible with Spark for all regular expressions. Set this config to true to
allow them anyway. For more information, refer to the Comet Compatibility Guide
(https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
-| spark.comet.scan.allowIncompatible | Some Comet scan implementations are not
currently fully compatible with Spark for all datatypes. Set this config to
true to allow them anyway. For more information, refer to the Comet
Compatibility Guide
(https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
-| spark.comet.scan.enabled | Whether to enable native scans. When this is
turned on, Spark will use Comet to read supported data sources (currently only
Parquet is supported natively). Note that to enable native vectorized
execution, both this config and 'spark.comet.exec.enabled' need to be enabled.
| true |
-| spark.comet.scan.preFetch.enabled | Whether to enable pre-fetching feature
of CometScan. | false |
-| spark.comet.scan.preFetch.threadNum | The number of threads running
pre-fetching for CometScan. Effective if spark.comet.scan.preFetch.enabled is
enabled. Note that more pre-fetching threads means more memory requirement to
store pre-fetched row groups. | 2 |
| spark.comet.shuffle.preferDictionary.ratio | The ratio of total values to
distinct values in a string column to decide whether to prefer dictionary
encoding when shuffling the column. If the ratio is higher than this config,
dictionary encoding will be used on shuffling string column. This config is
effective if it is higher than 1.0. Note that this config is only used when
`spark.comet.exec.shuffle.mode` is `jvm`. | 10.0 |
| spark.comet.shuffle.sizeInBytesMultiplier | Comet reports smaller sizes for
shuffle due to using Arrow's columnar memory format and this can result in
Spark choosing a different join strategy due to the estimated size of the
exchange being smaller. Comet will multiply sizeInBytes by this amount to avoid
regressions in join strategy. | 1.0 |
-| spark.comet.sparkToColumnar.supportedOperatorList | A comma-separated list
of operators that will be converted to Arrow columnar format when
'spark.comet.sparkToColumnar.enabled' is true | Range,InMemoryTableScan,RDDScan
|
-| spark.hadoop.fs.comet.libhdfs.schemes | Defines filesystem schemes (e.g.,
hdfs, webhdfs) that the native side accesses via libhdfs, separated by commas.
Valid only when built with hdfs feature enabled. | |
+<!--END:CONFIG_TABLE-->
+
+## Memory & Tuning Configuration Settings
+
+<!-- WARNING! DO NOT MANUALLY MODIFY CONTENT BETWEEN THE BEGIN AND END TAGS -->
+<!--BEGIN:CONFIG_TABLE[tuning]-->
+| Config | Description | Default Value |
+|--------|-------------|---------------|
+| spark.comet.batchSize | The columnar batch size, i.e., the maximum number of
rows that a batch can contain. | 8192 |
+| spark.comet.exec.memoryPool | The type of memory pool to be used for Comet
native execution when running Spark in off-heap mode. Available pool types are
`greedy_unified` and `fair_unified`. For more information, refer to the Comet
Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). |
fair_unified |
+| spark.comet.exec.memoryPool.fraction | Fraction of off-heap memory pool that
is available to Comet. Only applies to off-heap mode. For more information,
refer to the Comet Tuning Guide
(https://datafusion.apache.org/comet/user-guide/tuning.html). | 1.0 |
+| spark.comet.tracing.enabled | Enable fine-grained tracing of events and
memory usage. For more information, refer to the Comet Tracing Guide
(https://datafusion.apache.org/comet/user-guide/tracing.html). | false |
<!--END:CONFIG_TABLE-->
diff --git a/docs/source/user-guide/latest/expressions.md
b/docs/source/user-guide/latest/expressions.md
index 003d03a53..b07d788db 100644
--- a/docs/source/user-guide/latest/expressions.md
+++ b/docs/source/user-guide/latest/expressions.md
@@ -23,9 +23,10 @@ Comet supports the following Spark expressions. Expressions
that are marked as S
natively in Comet and provide the same results as Spark, or will fall back to
Spark for cases that would not
be compatible.
-All expressions are enabled by default, but can be disabled by setting
+All expressions are enabled by default, but most can be disabled by setting
`spark.comet.expression.EXPRNAME.enabled=false`, where `EXPRNAME` is the
expression name as specified in
-the following tables, such as `Length`, or `StartsWith`.
+the following tables, such as `Length`, or `StartsWith`. See the [Comet
Configuration Guide] for a full list
of expressions that can be disabled.
Expressions that are not Spark-compatible will fall back to Spark by default
and can be enabled by setting
`spark.comet.expression.EXPRNAME.allowIncompatible=true`.
@@ -269,4 +270,5 @@ incompatible expressions.
| ToPrettyString | Yes |
|
| UnscaledValue | Yes |
|
+[Comet Configuration Guide]: configs.md
[Comet Compatibility Guide]: compatibility.md
diff --git a/spark/src/main/scala/org/apache/comet/GenerateDocs.scala
b/spark/src/main/scala/org/apache/comet/GenerateDocs.scala
index dd4d931d0..3586e2860 100644
--- a/spark/src/main/scala/org/apache/comet/GenerateDocs.scala
+++ b/spark/src/main/scala/org/apache/comet/GenerateDocs.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.catalyst.expressions.Cast
import org.apache.comet.expressions.{CometCast, CometEvalMode}
-import org.apache.comet.serde.{Compatible, Incompatible}
+import org.apache.comet.serde.{Compatible, Incompatible, QueryPlanSerde}
/**
* Utility for generating markdown documentation from the configs.
@@ -37,29 +37,48 @@ object GenerateDocs {
private def userGuideLocation = "docs/source/user-guide/latest/"
+ val publicConfigs: Set[ConfigEntry[_]] =
CometConf.allConfs.filter(_.isPublic).toSet
+
def main(args: Array[String]): Unit = {
generateConfigReference()
generateCompatibilityGuide()
}
private def generateConfigReference(): Unit = {
+
+ val pattern = "<!--BEGIN:CONFIG_TABLE\\[(.*)]-->".r
val filename = s"$userGuideLocation/configs.md"
val lines = readFile(filename)
val w = new BufferedOutputStream(new FileOutputStream(filename))
for (line <- lines) {
w.write(s"${line.stripTrailing()}\n".getBytes)
- if (line.trim == "<!--BEGIN:CONFIG_TABLE-->") {
- val publicConfigs = CometConf.allConfs.filter(_.isPublic)
- val confs = publicConfigs.sortBy(_.key)
- w.write("| Config | Description | Default Value |\n".getBytes)
- w.write("|--------|-------------|---------------|\n".getBytes)
- for (conf <- confs) {
- if (conf.defaultValue.isEmpty) {
- w.write(s"| ${conf.key} | ${conf.doc.trim} | |\n".getBytes)
- } else {
- w.write(s"| ${conf.key} | ${conf.doc.trim} |
${conf.defaultValueString} |\n".getBytes)
+ line match {
+ case pattern(category) =>
+ w.write("| Config | Description | Default Value |\n".getBytes)
+ w.write("|--------|-------------|---------------|\n".getBytes)
+ category match {
+ case "enable_expr" =>
+ for (expr <-
QueryPlanSerde.exprSerdeMap.keys.map(_.getSimpleName).toList.sorted) {
+ val config = s"spark.comet.expression.$expr.enabled"
+ w.write(s"| $config | Enable Comet acceleration for $expr |
true |\n".getBytes)
+ }
+ case "enable_agg_expr" =>
+ for (expr <-
QueryPlanSerde.aggrSerdeMap.keys.map(_.getSimpleName).toList.sorted) {
+ val config = s"spark.comet.expression.$expr.enabled"
+ w.write(s"| $config | Enable Comet acceleration for $expr |
true |\n".getBytes)
+ }
+ case _ =>
+ val confs = publicConfigs.filter(_.category ==
category).toList.sortBy(_.key)
+ for (conf <- confs) {
+ if (conf.defaultValue.isEmpty) {
+ w.write(s"| ${conf.key} | ${conf.doc.trim} | |\n".getBytes)
+ } else {
+ w.write(
+ s"| ${conf.key} | ${conf.doc.trim} |
${conf.defaultValueString} |\n".getBytes)
+ }
+ }
}
- }
+ case _ =>
}
}
w.close()
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index f8395a691..233261091 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -239,7 +239,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
/**
* Mapping of Spark expression class to Comet expression handler.
*/
- private val exprSerdeMap: Map[Class[_ <: Expression],
CometExpressionSerde[_]] =
+ val exprSerdeMap: Map[Class[_ <: Expression], CometExpressionSerde[_]] =
mathExpressions ++ hashExpressions ++ stringExpressions ++
conditionalExpressions ++ mapExpressions ++ predicateExpressions ++
structExpressions ++ bitwiseExpressions ++ miscExpressions ++
arrayExpressions ++
@@ -248,7 +248,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
/**
* Mapping of Spark aggregate expression class to Comet expression handler.
*/
- private val aggrSerdeMap: Map[Class[_], CometAggregateExpressionSerde[_]] =
Map(
+ val aggrSerdeMap: Map[Class[_], CometAggregateExpressionSerde[_]] = Map(
classOf[Average] -> CometAverage,
classOf[BitAndAgg] -> CometBitAndAgg,
classOf[BitOrAgg] -> CometBitOrAgg,
@@ -578,9 +578,16 @@ object QueryPlanSerde extends Logging with CometExprShim {
val cometExpr = aggrSerdeMap.get(fn.getClass)
cometExpr match {
case Some(handler) =>
- handler
- .asInstanceOf[CometAggregateExpressionSerde[AggregateFunction]]
- .convert(aggExpr, fn, inputs, binding, conf)
+ val aggHandler =
handler.asInstanceOf[CometAggregateExpressionSerde[AggregateFunction]]
+ val exprConfName = aggHandler.getExprConfigName(fn)
+ if (!CometConf.isExprEnabled(exprConfName)) {
+ withInfo(
+ aggExpr,
+ "Expression support is disabled. Set " +
+ s"${CometConf.getExprEnabledConfigKey(exprConfName)}=true to
enable it.")
+ return None
+ }
+ aggHandler.convert(aggExpr, fn, inputs, binding, conf)
case _ =>
withInfo(
aggExpr,
@@ -1832,6 +1839,17 @@ trait CometExpressionSerde[T <: Expression] {
*/
trait CometAggregateExpressionSerde[T <: AggregateFunction] {
+ /**
+ * Get a short name for the expression that can be used as part of a config
key related to the
+ * expression, such as enabling or disabling that expression.
+ *
+ * @param expr
+ * The Spark expression.
+ * @return
+ * Short name for the expression, defaulting to the Spark class name
+ */
+ def getExprConfigName(expr: T): String = expr.getClass.getSimpleName
+
/**
* Convert a Spark expression into a protocol buffer representation that can
be passed into
* native code.
diff --git a/spark/src/main/scala/org/apache/comet/serde/aggregates.scala
b/spark/src/main/scala/org/apache/comet/serde/aggregates.scala
index d6e129a3c..4b8a74c15 100644
--- a/spark/src/main/scala/org/apache/comet/serde/aggregates.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/aggregates.scala
@@ -538,32 +538,23 @@ trait CometStddev {
binding: Boolean,
conf: SQLConf): Option[ExprOuterClass.AggExpr] = {
val child = stddev.child
- if (CometConf.COMET_EXPR_STDDEV_ENABLED.get(conf)) {
- val childExpr = exprToProto(child, inputs, binding)
- val dataType = serializeDataType(stddev.dataType)
-
- if (childExpr.isDefined && dataType.isDefined) {
- val builder = ExprOuterClass.Stddev.newBuilder()
- builder.setChild(childExpr.get)
- builder.setNullOnDivideByZero(nullOnDivideByZero)
- builder.setDatatype(dataType.get)
- builder.setStatsTypeValue(statsType)
-
- Some(
- ExprOuterClass.AggExpr
- .newBuilder()
- .setStddev(builder)
- .build())
- } else {
- withInfo(aggExpr, child)
- None
- }
+ val childExpr = exprToProto(child, inputs, binding)
+ val dataType = serializeDataType(stddev.dataType)
+
+ if (childExpr.isDefined && dataType.isDefined) {
+ val builder = ExprOuterClass.Stddev.newBuilder()
+ builder.setChild(childExpr.get)
+ builder.setNullOnDivideByZero(nullOnDivideByZero)
+ builder.setDatatype(dataType.get)
+ builder.setStatsTypeValue(statsType)
+
+ Some(
+ ExprOuterClass.AggExpr
+ .newBuilder()
+ .setStddev(builder)
+ .build())
} else {
- withInfo(
- aggExpr,
- "stddev disabled by default because it can be slower than Spark. " +
- s"Set ${CometConf.COMET_EXPR_STDDEV_ENABLED}=true to enable it.",
- child)
+ withInfo(aggExpr, child)
None
}
}
diff --git
a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
index 6574d9568..d0b1dfb36 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
@@ -61,9 +61,7 @@ class CometAggregateSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
}
test("stddev_pop should return NaN for some cases") {
- withSQLConf(
- CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
- CometConf.COMET_EXPR_STDDEV_ENABLED.key -> "true") {
+ withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
Seq(true, false).foreach { nullOnDivideByZero =>
withSQLConf("spark.sql.legacy.statisticalAggregate" ->
nullOnDivideByZero.toString) {
@@ -1446,9 +1444,7 @@ class CometAggregateSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
}
test("stddev_pop and stddev_samp") {
- withSQLConf(
- CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
- CometConf.COMET_EXPR_STDDEV_ENABLED.key -> "true") {
+ withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
Seq("native", "jvm").foreach { cometShuffleMode =>
withSQLConf(CometConf.COMET_SHUFFLE_MODE.key -> cometShuffleMode) {
Seq(true, false).foreach { dictionary =>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]