This is an automated email from the ASF dual-hosted git repository.
marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new dddf086a31 [GLUTEN-10118][VL] Support writing task statistics to the
event log (#10119)
dddf086a31 is described below
commit dddf086a31c37334773ee62a4dab3feca33d05fa
Author: Rong Ma <[email protected]>
AuthorDate: Tue Jul 29 12:28:43 2025 +0100
[GLUTEN-10118][VL] Support writing task statistics to the event log (#10119)
---
.../java/org/apache/gluten/metrics/Metrics.java | 6 ++-
.../org/apache/gluten/config/VeloxConfig.scala | 9 ++++
.../org/apache/gluten/metrics/MetricsUtil.scala | 15 +++++-
.../spark/metrics/TaskStatsAccumulator.scala | 60 ++++++++++++++++++++++
cpp/core/jni/JniWrapper.cc | 5 +-
cpp/core/utils/Metrics.h | 3 ++
cpp/velox/compute/WholeStageResultIterator.cc | 10 ++++
cpp/velox/config/VeloxConfig.h | 4 ++
docs/velox-configuration.md | 1 +
9 files changed, 108 insertions(+), 5 deletions(-)
diff --git
a/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java
b/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java
index fb9745ae90..8d5c92b0cd 100644
--- a/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java
+++ b/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java
@@ -59,6 +59,8 @@ public class Metrics implements IMetrics {
public SingleMetric singleMetric = new SingleMetric();
+ public String taskStats;
+
/** Create an instance for native metrics. */
public Metrics(
long[] inputRows,
@@ -97,7 +99,8 @@ public class Metrics implements IMetrics {
long[] preloadSplits,
long[] physicalWrittenBytes,
long[] writeIOTime,
- long[] numWrittenFiles) {
+ long[] numWrittenFiles,
+ String taskStats) {
this.inputRows = inputRows;
this.inputVectors = inputVectors;
this.inputBytes = inputBytes;
@@ -135,6 +138,7 @@ public class Metrics implements IMetrics {
this.physicalWrittenBytes = physicalWrittenBytes;
this.writeIOTime = writeIOTime;
this.numWrittenFiles = numWrittenFiles;
+ this.taskStats = taskStats;
}
public OperatorMetrics getOperatorMetrics(int index) {
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
index e29410aa4a..d4d0d0af32 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
@@ -348,6 +348,15 @@ object VeloxConfig {
.booleanConf
.createWithDefault(false)
+ val COLUMNAR_VELOX_TASK_METRICS_TO_EVENT_LOG_THRESHOLD =
+
buildConf("spark.gluten.sql.columnar.backend.velox.taskMetricsToEventLog.threshold")
+ .internal()
+ .doc("Sets the threshold in seconds for writing task statistics to the
event log if the " +
+ "task runs longer than this value. Configuring the value >=0 can
enable the feature. " +
+ "0 means all tasks report and save the metrics to eventlog. value <0
disable the feature.")
+ .timeConf(TimeUnit.SECONDS)
+ .createOptional
+
val COLUMNAR_VELOX_MEMORY_USE_HUGE_PAGES =
buildConf("spark.gluten.sql.columnar.backend.velox.memoryUseHugePages")
.internal()
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
b/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
index a7abc93621..d9e269850b 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
@@ -20,6 +20,7 @@ import org.apache.gluten.execution._
import org.apache.gluten.substrait.{AggregationParams, JoinParams}
import org.apache.spark.internal.Logging
+import org.apache.spark.metrics.TaskStatsAccumulator
import org.apache.spark.sql.execution.SparkPlan
import java.lang.{Long => JLong}
@@ -65,6 +66,9 @@ object MetricsUtil extends Logging {
}
}
+ val accumulator = new TaskStatsAccumulator()
+ child.session.sparkContext.register(accumulator, "velox task stats")
+
val mut: MetricsUpdaterTree = treeifyMetricsUpdaters(child)
genMetricsUpdatingFunction(
@@ -72,7 +76,8 @@ object MetricsUtil extends Logging {
relMap,
JLong.valueOf(relMap.size() - 1),
joinParamsMap,
- aggParamsMap)
+ aggParamsMap,
+ accumulator)
}
/**
@@ -316,7 +321,8 @@ object MetricsUtil extends Logging {
relMap: JMap[JLong, JList[JLong]],
operatorIdx: JLong,
joinParamsMap: JMap[JLong, JoinParams],
- aggParamsMap: JMap[JLong, AggregationParams]): IMetrics => Unit = {
+ aggParamsMap: JMap[JLong, AggregationParams],
+ taskStatsAccumulator: TaskStatsAccumulator): IMetrics => Unit = {
imetrics =>
try {
val metrics = imetrics.asInstanceOf[Metrics]
@@ -332,6 +338,11 @@ object MetricsUtil extends Logging {
numNativeMetrics - 1,
joinParamsMap,
aggParamsMap)
+
+ // Update the task stats accumulator with the metrics.
+ if (metrics.taskStats != null) {
+ taskStatsAccumulator.add(metrics.taskStats)
+ }
}
} catch {
case e: Exception =>
diff --git
a/backends-velox/src/main/scala/org/apache/spark/metrics/TaskStatsAccumulator.scala
b/backends-velox/src/main/scala/org/apache/spark/metrics/TaskStatsAccumulator.scala
new file mode 100644
index 0000000000..6fb29f6ca1
--- /dev/null
+++
b/backends-velox/src/main/scala/org/apache/spark/metrics/TaskStatsAccumulator.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.metrics
+
+import org.apache.spark.scheduler.AccumulableInfo
+import org.apache.spark.util.AccumulatorV2
+
+class TaskStatsAccumulator extends AccumulatorV2[String, String] {
+ private var stats: String = ""
+
+ override def isZero: Boolean = stats.isEmpty
+
+ override def copy(): AccumulatorV2[String, String] = {
+ val newAcc = new TaskStatsAccumulator()
+ newAcc.stats = this.stats
+ newAcc
+ }
+
+ override def reset(): Unit = {
+ stats = ""
+ }
+
+ override def add(v: String): Unit = {
+ stats = v
+ }
+
+ override def merge(other: AccumulatorV2[String, String]): Unit = {
+ other match {
+ case o: TaskStatsAccumulator =>
+ // Overwrite stats. Can be empty if no stats were collected.
+ stats = o.stats
+ case _ =>
+ throw new IllegalArgumentException("Cannot merge with
non-VeloxTaskStatsAccumulator")
+ }
+ }
+
+ override def value: String = stats
+
+ override def toInfo(update: Option[Any], value: Option[Any]):
AccumulableInfo = {
+ // If `update` is None, it means the `toInfo` method was called from stage
completion, and we
+ // don't send the stats to the stage metrics.
+ val v = update.map(_ => stats)
+ // `update` field is always empty. `value` field shows the stats of the
current task.
+ AccumulableInfo(id, name, None, v, internal = false, countFailedValues =
false)
+ }
+}
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 5cca5dd2ac..523c4a7494 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -223,7 +223,7 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
env,
metricsBuilderClass,
"<init>",
-
"([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J)V");
+
"([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[JLjava/lang/String;)V");
nativeColumnarToRowInfoClass =
createGlobalClassReferenceOrError(env,
"Lorg/apache/gluten/vectorized/NativeColumnarToRowInfo;");
@@ -548,7 +548,8 @@ JNIEXPORT jobject JNICALL
Java_org_apache_gluten_metrics_IteratorMetricsJniWrapp
longArray[Metrics::kPreloadSplits],
longArray[Metrics::kPhysicalWrittenBytes],
longArray[Metrics::kWriteIOTime],
- longArray[Metrics::kNumWrittenFiles]);
+ longArray[Metrics::kNumWrittenFiles],
+ metrics && metrics->stats.has_value() ?
env->NewStringUTF(metrics->stats->c_str()) : nullptr);
JNI_METHOD_END(nullptr)
}
diff --git a/cpp/core/utils/Metrics.h b/cpp/core/utils/Metrics.h
index 761f5fbe79..cb48a137e7 100644
--- a/cpp/core/utils/Metrics.h
+++ b/cpp/core/utils/Metrics.h
@@ -31,6 +31,9 @@ struct Metrics {
// Point to array.get() after the above unique_ptr created.
long* arrayRawPtr = nullptr;
+ // Optional stats string.
+ std::optional<std::string> stats = std::nullopt;
+
enum TYPE {
// Begin from 0.
kBegin = 0,
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc
b/cpp/velox/compute/WholeStageResultIterator.cc
index 9cd4233bd5..136879c266 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -344,6 +344,7 @@ void WholeStageResultIterator::collectMetrics() {
return;
}
+ // Save and print the plan with stats if debug mode is enabled or
showTaskMetricsWhenFinished is true.
if (veloxCfg_->get<bool>(kDebugModeEnabled, false) ||
veloxCfg_->get<bool>(kShowTaskMetricsWhenFinished,
kShowTaskMetricsWhenFinishedDefault)) {
auto planWithStats = velox::exec::printPlanWithStats(*veloxPlan_.get(),
taskStats, true);
@@ -445,6 +446,15 @@ void WholeStageResultIterator::collectMetrics() {
metricIndex += 1;
}
}
+
+ // Populate the metrics with task stats for long running tasks.
+ if (const int64_t collectTaskStatsThreshold =
+ veloxCfg_->get<int64_t>(kTaskMetricsToEventLogThreshold,
kTaskMetricsToEventLogThresholdDefault);
+ collectTaskStatsThreshold >= 0 &&
+ static_cast<int64_t>(taskStats.terminationTimeMs -
taskStats.executionStartTimeMs) > collectTaskStatsThreshold) {
+ auto jsonStats = velox::exec::toPlanStatsJson(taskStats);
+ metrics_->stats = folly::toJson(jsonStats);
+ }
}
int64_t WholeStageResultIterator::runtimeMetric(
diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h
index 38752a8c14..627ce9a857 100644
--- a/cpp/velox/config/VeloxConfig.h
+++ b/cpp/velox/config/VeloxConfig.h
@@ -69,6 +69,10 @@ const std::string kVeloxSplitPreloadPerDriver =
"spark.gluten.sql.columnar.backe
const std::string kShowTaskMetricsWhenFinished =
"spark.gluten.sql.columnar.backend.velox.showTaskMetricsWhenFinished";
const bool kShowTaskMetricsWhenFinishedDefault = false;
+const std::string kTaskMetricsToEventLogThreshold =
+ "spark.gluten.sql.columnar.backend.velox.taskMetricsToEventLog.threshold";
+const int64_t kTaskMetricsToEventLogThresholdDefault = -1;
+
const std::string kEnableUserExceptionStacktrace =
"spark.gluten.sql.columnar.backend.velox.enableUserExceptionStacktrace";
const bool kEnableUserExceptionStacktraceDefault = true;
diff --git a/docs/velox-configuration.md b/docs/velox-configuration.md
index 9b636aa5a4..b7712c00a9 100644
--- a/docs/velox-configuration.md
+++ b/docs/velox-configuration.md
@@ -72,6 +72,7 @@ nav_order: 16
| spark.gluten.sql.columnar.backend.velox.ssdChecksumReadVerificationEnabled
| false | If true, checksum read verification from SSD is
enabled.
[...]
| spark.gluten.sql.columnar.backend.velox.ssdDisableFileCow
| false | True if copy on write should be disabled.
[...]
| spark.gluten.sql.columnar.backend.velox.ssdODirect
| false | The O_DIRECT flag for cache writing
[...]
+| spark.gluten.sql.columnar.backend.velox.taskMetricsToEventLog.threshold
| <undefined> | Sets the threshold in seconds for writing task
statistics to the event log if the task runs longer than this value.
Configuring a value >= 0 enables the feature. 0 means all tasks report and
save the metrics to the event log. A value < 0 disables the feature.
[...]
| spark.gluten.sql.columnar.backend.velox.window.type
| streaming | Velox backend supports both SortWindow and
StreamingWindow operators. The StreamingWindow operator skips the sorting step
in the input but does not support spill. On the other hand, the SortWindow
operator is responsible for sorting the input data within the Window operator
and also supports spill.
[...]
| spark.gluten.velox.awsSdkLogLevel
| FATAL | Log granularity of AWS C++ SDK in velox.
[...]
| spark.gluten.velox.castFromVarcharAddTrimNode
| false | If true, will add a trim node which has the same
sementic as vanilla Spark to CAST-from-varchar.Otherwise, do nothing.
[...]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]