This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new eaee24cba chore: Improve framework for specifying that configs can be
set with env vars (#2722)
eaee24cba is described below
commit eaee24cba176f74b9f3e528f0b451c7b88129593
Author: Andy Grove <[email protected]>
AuthorDate: Fri Nov 7 14:48:20 2025 -0700
chore: Improve framework for specifying that configs can be set with env
vars (#2722)
---
.../main/scala/org/apache/comet/CometConf.scala | 58 +++++++++++++++++-----
docs/source/user-guide/latest/configs.md | 8 +--
.../main/scala/org/apache/comet/GenerateDocs.scala | 13 +++--
3 files changed, 60 insertions(+), 19 deletions(-)
diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala
b/common/src/main/scala/org/apache/comet/CometConf.scala
index d48d14972..4760cbbfc 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -84,10 +84,9 @@ object CometConf extends ShimCometConf {
.doc(
"Whether to enable Comet extension for Spark. When this is turned on,
Spark will use " +
"Comet to read Parquet data source. Note that to enable native
vectorized execution, " +
- "both this config and `spark.comet.exec.enabled` need to be enabled.
By default, this " +
- "config is the value of the env var `ENABLE_COMET` if set, or true
otherwise.")
+ "both this config and `spark.comet.exec.enabled` need to be enabled.")
.booleanConf
- .createWithDefault(sys.env.getOrElse("ENABLE_COMET", "true").toBoolean)
+ .createWithEnvVarOrDefault("ENABLE_COMET", true)
val COMET_NATIVE_SCAN_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.scan.enabled")
.category(CATEGORY_SCAN)
@@ -119,9 +118,7 @@ object CometConf extends ShimCometConf {
.transform(_.toLowerCase(Locale.ROOT))
.checkValues(
Set(SCAN_NATIVE_COMET, SCAN_NATIVE_DATAFUSION,
SCAN_NATIVE_ICEBERG_COMPAT, SCAN_AUTO))
- .createWithDefault(sys.env
- .getOrElse("COMET_PARQUET_SCAN_IMPL", SCAN_AUTO)
- .toLowerCase(Locale.ROOT))
+ .createWithEnvVarOrDefault("COMET_PARQUET_SCAN_IMPL", SCAN_AUTO)
val COMET_RESPECT_PARQUET_FILTER_PUSHDOWN: ConfigEntry[Boolean] =
conf("spark.comet.parquet.respectFilterPushdown")
@@ -493,8 +490,7 @@ object CometConf extends ShimCometConf {
.category(CATEGORY_EXEC_EXPLAIN)
.doc("When this setting is enabled, Comet will log warnings for all
fallback reasons.")
.booleanConf
- .createWithDefault(
- sys.env.getOrElse("ENABLE_COMET_LOG_FALLBACK_REASONS",
"false").toBoolean)
+ .createWithEnvVarOrDefault("ENABLE_COMET_LOG_FALLBACK_REASONS", false)
val COMET_EXPLAIN_FALLBACK_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.explainFallback.enabled")
@@ -524,7 +520,7 @@ object CometConf extends ShimCometConf {
.category(CATEGORY_TESTING)
.doc("Whether to allow Comet to run in on-heap mode. Required for
running Spark SQL tests.")
.booleanConf
- .createWithDefault(sys.env.getOrElse("ENABLE_COMET_ONHEAP",
"false").toBoolean)
+ .createWithEnvVarOrDefault("ENABLE_COMET_ONHEAP", false)
val COMET_OFFHEAP_MEMORY_POOL_TYPE: ConfigEntry[String] =
conf("spark.comet.exec.memoryPool")
@@ -705,9 +701,9 @@ object CometConf extends ShimCometConf {
val COMET_STRICT_TESTING: ConfigEntry[Boolean] =
conf(s"$COMET_PREFIX.testing.strict")
.category(CATEGORY_TESTING)
.doc("Experimental option to enable strict testing, which will fail tests
that could be " +
- "more comprehensive, such as checking for a specific fallback reason")
+ "more comprehensive, such as checking for a specific fallback reason.")
.booleanConf
- .createWithDefault(sys.env.getOrElse("ENABLE_COMET_STRICT_TESTING",
"false").toBoolean)
+ .createWithEnvVarOrDefault("ENABLE_COMET_STRICT_TESTING", false)
/** Create a config to enable a specific operator */
private def createExecEnabledConfig(
@@ -865,6 +861,36 @@ private class TypedConfigBuilder[T](
CometConf.register(conf)
conf
}
+
+ /**
+ * Creates a [[ConfigEntry]] that has a default value, with support for
environment variable
+ * override.
+ *
+ * The value is resolved in the following priority order:
+ * 1. Spark config value (if set) 2. Environment variable value (if set)
3. Default value
+ *
+ * @param envVar
+ * The environment variable name to check for override value
+ * @param default
+ * The default value to use if neither config nor env var is set
+ * @return
+ * A ConfigEntry with environment variable support
+ */
+ def createWithEnvVarOrDefault(envVar: String, default: T): ConfigEntry[T] = {
+ val transformedDefault = converter(sys.env.getOrElse(envVar,
stringConverter(default)))
+ val conf = new ConfigEntryWithDefault[T](
+ parent.key,
+ transformedDefault,
+ converter,
+ stringConverter,
+ parent._doc,
+ parent._category,
+ parent._public,
+ parent._version,
+ Some(envVar))
+ CometConf.register(conf)
+ conf
+ }
}
private[comet] abstract class ConfigEntry[T](
@@ -892,6 +918,11 @@ private[comet] abstract class ConfigEntry[T](
def defaultValueString: String
+ /**
+ * The environment variable name that can override this config's default
value, if applicable.
+ */
+ def envVar: Option[String] = None
+
override def toString: String = {
s"ConfigEntry(key=$key, defaultValue=$defaultValueString, doc=$doc, " +
s"public=$isPublic, version=$version)"
@@ -906,12 +937,15 @@ private[comet] class ConfigEntryWithDefault[T](
doc: String,
category: String,
isPublic: Boolean,
- version: String)
+ version: String,
+ _envVar: Option[String] = None)
extends ConfigEntry(key, valueConverter, stringConverter, doc, category,
isPublic, version) {
override def defaultValue: Option[T] = Some(_defaultValue)
override def defaultValueString: String = stringConverter(_defaultValue)
+ override def envVar: Option[String] = _envVar
+
def get(conf: SQLConf): T = {
val tmp = conf.getConfString(key, null)
if (tmp == null) {
diff --git a/docs/source/user-guide/latest/configs.md
b/docs/source/user-guide/latest/configs.md
index 6caaa53b1..66e1795cb 100644
--- a/docs/source/user-guide/latest/configs.md
+++ b/docs/source/user-guide/latest/configs.md
@@ -63,7 +63,7 @@ Comet provides the following configuration settings.
| `spark.comet.caseConversion.enabled` | Java uses locale-specific rules when
converting strings to upper or lower case and Rust does not, so we disable
upper and lower by default. | false |
| `spark.comet.debug.enabled` | Whether to enable debug mode for Comet. When
enabled, Comet will do additional checks for debugging purpose. For example,
validating array when importing arrays from JVM at native side. Note that these
checks may be expensive in performance and should only be enabled for debugging
purpose. | false |
| `spark.comet.dppFallback.enabled` | Whether to fall back to Spark for
queries that use DPP. | true |
-| `spark.comet.enabled` | Whether to enable Comet extension for Spark. When
this is turned on, Spark will use Comet to read Parquet data source. Note that
to enable native vectorized execution, both this config and
`spark.comet.exec.enabled` need to be enabled. By default, this config is the
value of the env var `ENABLE_COMET` if set, or true otherwise. | true |
+| `spark.comet.enabled` | Whether to enable Comet extension for Spark. When
this is turned on, Spark will use Comet to read Parquet data source. Note that
to enable native vectorized execution, both this config and
`spark.comet.exec.enabled` need to be enabled. Can be overridden by environment
variable `ENABLE_COMET`. | true |
| `spark.comet.exceptionOnDatetimeRebase` | Whether to throw exception when
seeing dates/timestamps from the legacy hybrid (Julian + Gregorian) calendar.
Since Spark 3, dates/timestamps were written according to the Proleptic
Gregorian calendar. When this is true, Comet will throw exceptions when seeing
these dates/timestamps that were written by Spark version before 3.0. If this
is false, these dates/timestamps will be read as if they were written to the
Proleptic Gregorian calendar and [...]
| `spark.comet.exec.enabled` | Whether to enable Comet native vectorized
execution for Spark. This controls whether Spark should convert operators into
their Comet counterparts and execute them in native space. Note: each operator
is associated with a separate config in the format of
`spark.comet.exec.<operator_name>.enabled` at the moment, and both the config
and this need to be turned on, in order for the operator to be executed in
native. | true |
| `spark.comet.exec.replaceSortMergeJoin` | Experimental feature to force
Spark to replace SortMergeJoin with ShuffledHashJoin for improved performance.
This feature is not stable yet. For more information, refer to the [Comet
Tuning Guide](https://datafusion.apache.org/comet/user-guide/tuning.html). |
false |
@@ -86,7 +86,7 @@ These settings can be used to determine which parts of the
plan are accelerated
| `spark.comet.explain.rules` | When this setting is enabled, Comet will log
all plan transformations performed in physical optimizer rules. Default: false
| false |
| `spark.comet.explain.verbose.enabled` | When this setting is enabled,
Comet's extended explain output will provide the full query plan annotated with
fallback reasons as well as a summary of how much of the plan was accelerated
by Comet. When this setting is disabled, a list of fallback reasons will be
provided instead. | false |
| `spark.comet.explainFallback.enabled` | When this setting is enabled, Comet
will provide logging explaining the reason(s) why a query stage cannot be
executed natively. Set this to false to reduce the amount of logging. | false |
-| `spark.comet.logFallbackReasons.enabled` | When this setting is enabled,
Comet will log warnings for all fallback reasons. | false |
+| `spark.comet.logFallbackReasons.enabled` | When this setting is enabled,
Comet will log warnings for all fallback reasons. Can be overridden by
environment variable `ENABLE_COMET_LOG_FALLBACK_REASONS`. | false |
<!--END:CONFIG_TABLE-->
## Shuffle Configuration Settings
@@ -127,10 +127,10 @@ These settings can be used to determine which parts of
the plan are accelerated
| Config | Description | Default Value |
|--------|-------------|---------------|
| `spark.comet.columnar.shuffle.memory.factor` | Fraction of Comet memory to
be allocated per executor process for columnar shuffle when running in on-heap
mode. For more information, refer to the [Comet Tuning
Guide](https://datafusion.apache.org/comet/user-guide/tuning.html). | 1.0 |
-| `spark.comet.exec.onHeap.enabled` | Whether to allow Comet to run in on-heap
mode. Required for running Spark SQL tests. | false |
+| `spark.comet.exec.onHeap.enabled` | Whether to allow Comet to run in on-heap
mode. Required for running Spark SQL tests. Can be overridden by environment
variable `ENABLE_COMET_ONHEAP`. | false |
| `spark.comet.exec.onHeap.memoryPool` | The type of memory pool to be used
for Comet native execution when running Spark in on-heap mode. Available pool
types are `greedy`, `fair_spill`, `greedy_task_shared`,
`fair_spill_task_shared`, `greedy_global`, `fair_spill_global`, and
`unbounded`. | greedy_task_shared |
| `spark.comet.memoryOverhead` | The amount of additional memory to be
allocated per executor process for Comet, in MiB, when running Spark in on-heap
mode. | 1024 MiB |
-| `spark.comet.testing.strict` | Experimental option to enable strict testing,
which will fail tests that could be more comprehensive, such as checking for a
specific fallback reason | false |
+| `spark.comet.testing.strict` | Experimental option to enable strict testing,
which will fail tests that could be more comprehensive, such as checking for a
specific fallback reason. Can be overridden by environment variable
`ENABLE_COMET_STRICT_TESTING`. | false |
<!--END:CONFIG_TABLE-->
## Enabling or Disabling Individual Operators
diff --git a/spark/src/main/scala/org/apache/comet/GenerateDocs.scala
b/spark/src/main/scala/org/apache/comet/GenerateDocs.scala
index 4c2d65e1c..8abd09241 100644
--- a/spark/src/main/scala/org/apache/comet/GenerateDocs.scala
+++ b/spark/src/main/scala/org/apache/comet/GenerateDocs.scala
@@ -74,15 +74,22 @@ object GenerateDocs {
// convert links to Markdown
val doc =
urlPattern.replaceAllIn(conf.doc.trim, m => s"[Comet
${m.group(1)} Guide](")
+ // append env var info if present
+ val docWithEnvVar = conf.envVar match {
+ case Some(envVarName) =>
+ s"$doc Can be overridden by environment variable
`$envVarName`."
+ case None => doc
+ }
if (conf.defaultValue.isEmpty) {
- w.write(s"| `${conf.key}` | $doc | |\n".getBytes)
+ w.write(s"| `${conf.key}` | $docWithEnvVar | |\n".getBytes)
} else {
val isBytesConf = conf.key ==
COMET_ONHEAP_MEMORY_OVERHEAD.key
if (isBytesConf) {
val bytes = conf.defaultValue.get.asInstanceOf[Long]
- w.write(s"| `${conf.key}` | $doc | $bytes MiB
|\n".getBytes)
+ w.write(s"| `${conf.key}` | $docWithEnvVar | $bytes MiB
|\n".getBytes)
} else {
- w.write(s"| `${conf.key}` | $doc |
${conf.defaultValueString} |\n".getBytes)
+ val defaultVal = conf.defaultValueString
+ w.write(s"| `${conf.key}` | $docWithEnvVar | $defaultVal
|\n".getBytes)
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]