This is an automated email from the ASF dual-hosted git repository.
chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new d2f8185699 [GLUTEN-10761] Add iceberg write metrics (#10908)
d2f8185699 is described below
commit d2f8185699c35ab1f97d3369e46af522fb25f374
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Oct 21 18:38:41 2025 +0800
[GLUTEN-10761] Add iceberg write metrics (#10908)
---
.../backendsapi/clickhouse/CHMetricsApi.scala | 4 ++
.../write/IcebergColumnarBatchDataWriter.scala | 5 ++
.../gluten/execution/IcebergWriteJniWrapper.java | 5 +-
.../execution/enhanced/VeloxIcebergSuite.scala | 14 +++++
.../apache/gluten/metrics/BatchWriteMetrics.java | 59 ++++++++++++++++++++++
.../gluten/backendsapi/velox/VeloxMetricsApi.scala | 8 +++
cpp/velox/compute/iceberg/IcebergWriter.cc | 13 ++++-
cpp/velox/compute/iceberg/IcebergWriter.h | 17 +++++++
cpp/velox/jni/VeloxJniWrapper.cc | 26 ++++++++++
.../write/ColumnarBatchDataWriterFactory.java | 1 +
.../org/apache/gluten/backendsapi/MetricsApi.scala | 2 +
.../execution/ColumnarV2TableWriteExec.scala | 11 +++-
.../v2/ColumnarWriteToDataSourceV2Exec.scala | 2 +
13 files changed, 164 insertions(+), 3 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
index c8037d8c9e..a437665e00 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
@@ -472,4 +472,8 @@ class CHMetricsApi extends MetricsApi with Logging with
LogLevelUtil {
def genWriteFilesTransformerMetricsUpdater(metrics: Map[String, SQLMetric]):
MetricsUpdater = {
new WriteFilesMetricsUpdater(metrics)
}
+
+ override def genBatchWriteMetrics(sparkContext: SparkContext): Map[String,
SQLMetric] = {
+ throw new UnsupportedOperationException("BatchWrite is not supported in CH
backend")
+ }
}
diff --git
a/backends-velox/src-iceberg/main/scala/org/apache/gluten/connector/write/IcebergColumnarBatchDataWriter.scala
b/backends-velox/src-iceberg/main/scala/org/apache/gluten/connector/write/IcebergColumnarBatchDataWriter.scala
index 068cdf6cd4..c4f202d84b 100644
---
a/backends-velox/src-iceberg/main/scala/org/apache/gluten/connector/write/IcebergColumnarBatchDataWriter.scala
+++
b/backends-velox/src-iceberg/main/scala/org/apache/gluten/connector/write/IcebergColumnarBatchDataWriter.scala
@@ -21,6 +21,7 @@ import org.apache.gluten.columnarbatch.ColumnarBatches
import org.apache.gluten.execution.IcebergWriteJniWrapper
import org.apache.spark.internal.Logging
+import org.apache.spark.sql.connector.metric.CustomTaskMetric
import org.apache.spark.sql.connector.write.{DataWriter, WriterCommitMessage}
import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -81,4 +82,8 @@ case class IcebergColumnarBatchDataWriter(
case _ => throw new UnsupportedOperationException()
}
}
+
+ override def currentMetricsValues(): Array[CustomTaskMetric] = {
+ jniWrapper.metrics(writer).toCustomTaskMetrics
+ }
}
diff --git
a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/IcebergWriteJniWrapper.java
b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/IcebergWriteJniWrapper.java
index a9192952cf..54ae50e578 100644
---
a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/IcebergWriteJniWrapper.java
+++
b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/IcebergWriteJniWrapper.java
@@ -16,6 +16,7 @@
*/
package org.apache.gluten.execution;
+import org.apache.gluten.metrics.BatchWriteMetrics;
import org.apache.gluten.runtime.Runtime;
import org.apache.gluten.runtime.RuntimeAware;
@@ -33,11 +34,13 @@ public class IcebergWriteJniWrapper implements RuntimeAware
{
byte[] partitionSpec,
byte[] field);
- // Returns the JSON representation of the Iceberg DataFile
public native void write(long writerHandle, long batch);
+ // Returns the JSON representation of the Iceberg DataFile
public native String[] commit(long writerHandle);
+ public native BatchWriteMetrics metrics(long writerHandle);
+
@Override
public long rtHandle() {
return runtime.getHandle();
diff --git
a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
index 6702c77885..5ff593a3ba 100644
---
a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
+++
b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
@@ -283,4 +283,18 @@ class VeloxIcebergSuite extends IcebergSuite {
}
}
}
+
+ test("iceberg write metrics") {
+ withTable("iceberg_tbl") {
+ spark.sql("create table if not exists iceberg_tbl (id int) using
iceberg".stripMargin)
+ val df = spark.sql("insert into iceberg_tbl values 1")
+ val metrics =
+
df.queryExecution.executedPlan.asInstanceOf[CommandResultExec].commandPhysicalPlan.metrics
+ val statusStore = spark.sharedState.statusStore
+ val lastExecId = statusStore.executionsList().last.executionId
+ val executionMetrics = statusStore.executionMetrics(lastExecId)
+
+ assert(executionMetrics(metrics("numWrittenFiles").id).toLong == 1)
+ }
+ }
}
diff --git
a/backends-velox/src/main/java/org/apache/gluten/metrics/BatchWriteMetrics.java
b/backends-velox/src/main/java/org/apache/gluten/metrics/BatchWriteMetrics.java
new file mode 100644
index 0000000000..dcf841dfd3
--- /dev/null
+++
b/backends-velox/src/main/java/org/apache/gluten/metrics/BatchWriteMetrics.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.metrics;
+
+import org.apache.spark.sql.connector.metric.CustomTaskMetric;
+
+import java.util.ArrayList;
+
+public class BatchWriteMetrics {
+ private final long numWrittenBytes;
+ private final int numWrittenFiles;
+ private final long writeIOTimeNs;
+ private final long writeWallNs;
+
+ public BatchWriteMetrics(
+ long numWrittenBytes, int numWrittenFiles, long writeIOTimeNs, long
writeWallNs) {
+ this.numWrittenBytes = numWrittenBytes;
+ this.numWrittenFiles = numWrittenFiles;
+ this.writeIOTimeNs = writeIOTimeNs;
+ this.writeWallNs = writeWallNs;
+ }
+
+ public CustomTaskMetric[] toCustomTaskMetrics() {
+ ArrayList<CustomTaskMetric> customTaskMetrics = new ArrayList<>();
+ customTaskMetrics.add(customTaskMetric("numWrittenBytes",
numWrittenBytes));
+ customTaskMetrics.add(customTaskMetric("numWrittenFiles",
numWrittenFiles));
+ customTaskMetrics.add(customTaskMetric("writeIOTimeNs", writeIOTimeNs));
+ customTaskMetrics.add(customTaskMetric("writeWallNs", writeWallNs));
+ return customTaskMetrics.toArray(new CustomTaskMetric[0]);
+ }
+
+ private CustomTaskMetric customTaskMetric(String name, long value) {
+ return new CustomTaskMetric() {
+ @Override
+ public String name() {
+ return name;
+ }
+
+ @Override
+ public long value() {
+ return value;
+ }
+ };
+ }
+}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
index 57ccd3446e..03579e307b 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
@@ -460,6 +460,14 @@ class VeloxMetricsApi extends MetricsApi with Logging {
def genWriteFilesTransformerMetricsUpdater(metrics: Map[String, SQLMetric]):
MetricsUpdater =
new WriteFilesMetricsUpdater(metrics)
+ def genBatchWriteMetrics(sparkContext: SparkContext): Map[String, SQLMetric]
=
+ Map(
+ "numWrittenBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number
of written bytes"),
+ "writeIOTimeNs" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time
of write IO"),
+ "writeWallNs" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time
of write"),
+ "numWrittenFiles" -> SQLMetrics.createMetric(sparkContext, "number of
written files")
+ )
+
override def genSortTransformerMetrics(sparkContext: SparkContext):
Map[String, SQLMetric] =
Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of
output rows"),
diff --git a/cpp/velox/compute/iceberg/IcebergWriter.cc
b/cpp/velox/compute/iceberg/IcebergWriter.cc
index 11718c8901..96a8561eca 100644
--- a/cpp/velox/compute/iceberg/IcebergWriter.cc
+++ b/cpp/velox/compute/iceberg/IcebergWriter.cc
@@ -100,7 +100,7 @@ IcebergWriter::IcebergWriter(
const std::unordered_map<std::string, std::string>& sparkConfs,
std::shared_ptr<facebook::velox::memory::MemoryPool> memoryPool,
std::shared_ptr<facebook::velox::memory::MemoryPool> connectorPool)
- : rowType_(rowType), field_(convertToIcebergNestedField(field)),
pool_(memoryPool), connectorPool_(connectorPool) {
+ : rowType_(rowType), field_(convertToIcebergNestedField(field)),
pool_(memoryPool), connectorPool_(connectorPool),
createTimeNs_(getCurrentTimeNano()) {
auto veloxCfg =
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string,
std::string>(sparkConfs));
connectorSessionProperties_ =
std::make_shared<facebook::velox::config::ConfigBase>(
@@ -142,6 +142,17 @@ std::vector<std::string> IcebergWriter::commit() {
return dataSink_->close();
}
+WriteStats IcebergWriter::writeStats() const {
+ const auto currentTimeNs = getCurrentTimeNano();
+ VELOX_CHECK_GE(currentTimeNs, createTimeNs_);
+ const auto sinkStats = dataSink_->stats();
+ return WriteStats(
+ sinkStats.numWrittenBytes,
+ sinkStats.numWrittenFiles,
+ sinkStats.writeIOTimeUs * 1000,
+ currentTimeNs - createTimeNs_);
+}
+
std::shared_ptr<const iceberg::IcebergPartitionSpec>
parseIcebergPartitionSpec(const uint8_t* data, const int32_t length,
RowTypePtr rowType) {
gluten::IcebergPartitionSpec protoSpec;
diff --git a/cpp/velox/compute/iceberg/IcebergWriter.h
b/cpp/velox/compute/iceberg/IcebergWriter.h
index 3de99ce0f5..f587195070 100644
--- a/cpp/velox/compute/iceberg/IcebergWriter.h
+++ b/cpp/velox/compute/iceberg/IcebergWriter.h
@@ -19,11 +19,23 @@
#include "IcebergNestedField.pb.h"
#include "memory/VeloxColumnarBatch.h"
+#include "utils/Metrics.h"
#include "velox/connectors/hive/iceberg/IcebergColumnHandle.h"
#include "velox/connectors/hive/iceberg/IcebergDataSink.h"
namespace gluten {
+struct WriteStats {
+ uint64_t numWrittenBytes{0};
+ uint32_t numWrittenFiles{0};
+ uint64_t writeIOTimeNs{0};
+ uint64_t writeWallNs{0};
+
+ bool empty() const;
+
+ std::string toString() const;
+};
+
class IcebergWriter {
public:
IcebergWriter(
@@ -41,6 +53,8 @@ class IcebergWriter {
std::vector<std::string> commit();
+ WriteStats writeStats() const;
+
private:
facebook::velox::RowTypePtr rowType_;
const facebook::velox::connector::hive::iceberg::IcebergNestedField field_;
@@ -52,6 +66,9 @@ class IcebergWriter {
std::unique_ptr<facebook::velox::connector::ConnectorQueryCtx>
connectorQueryCtx_;
std::unique_ptr<facebook::velox::connector::hive::iceberg::IcebergDataSink>
dataSink_;
+
+ // Records the writer creation time in ns.
+ const uint64_t createTimeNs_{0};
};
std::shared_ptr<const
facebook::velox::connector::hive::iceberg::IcebergPartitionSpec>
diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc
index 0052880143..1ce34d6f95 100644
--- a/cpp/velox/jni/VeloxJniWrapper.cc
+++ b/cpp/velox/jni/VeloxJniWrapper.cc
@@ -56,6 +56,9 @@ jmethodID infoClsInitMethod;
jclass blockStripesClass;
jmethodID blockStripesConstructor;
+
+jclass batchWriteMetricsClass;
+jmethodID batchWriteMetricsConstructor;
} // namespace
#ifdef __cplusplus
@@ -80,6 +83,10 @@ jint JNI_OnLoad(JavaVM* vm, void*) {
createGlobalClassReferenceOrError(env,
"Lorg/apache/spark/sql/execution/datasources/BlockStripes;");
blockStripesConstructor = getMethodIdOrError(env, blockStripesClass,
"<init>", "(J[J[II[[B)V");
+ batchWriteMetricsClass =
+ createGlobalClassReferenceOrError(env,
"Lorg/apache/gluten/metrics/BatchWriteMetrics;");
+ batchWriteMetricsConstructor = getMethodIdOrError(env,
batchWriteMetricsClass, "<init>", "(JIJJ)V");
+
DLOG(INFO) << "Loaded Velox backend.";
return jniVersion;
@@ -811,6 +818,25 @@ JNIEXPORT jobjectArray JNICALL
Java_org_apache_gluten_execution_IcebergWriteJniW
JNI_METHOD_END(nullptr)
}
+
+JNIEXPORT jobject JNICALL
Java_org_apache_gluten_execution_IcebergWriteJniWrapper_metrics( // NOLINT
+ JNIEnv* env,
+ jobject wrapper,
+ jlong writerHandle) {
+ JNI_METHOD_START
+ auto writer = ObjectStore::retrieve<IcebergWriter>(writerHandle);
+ auto writeStats = writer->writeStats();
+ jobject writeMetrics = env->NewObject(
+ batchWriteMetricsClass,
+ batchWriteMetricsConstructor,
+ writeStats.numWrittenBytes,
+ writeStats.numWrittenFiles,
+ writeStats.writeIOTimeNs,
+ writeStats.writeWallNs);
+ return writeMetrics;
+
+ JNI_METHOD_END(nullptr)
+}
#endif
#ifdef __cplusplus
diff --git
a/gluten-substrait/src/main/java/org/apache/gluten/connector/write/ColumnarBatchDataWriterFactory.java
b/gluten-substrait/src/main/java/org/apache/gluten/connector/write/ColumnarBatchDataWriterFactory.java
index 16c6248a31..4254c5baff 100644
---
a/gluten-substrait/src/main/java/org/apache/gluten/connector/write/ColumnarBatchDataWriterFactory.java
+++
b/gluten-substrait/src/main/java/org/apache/gluten/connector/write/ColumnarBatchDataWriterFactory.java
@@ -18,6 +18,7 @@ package org.apache.gluten.connector.write;
import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.connector.write.DataWriter;
+import org.apache.spark.sql.connector.write.DataWriterFactory;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import java.io.Serializable;
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/MetricsApi.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/MetricsApi.scala
index 0bd4144cd1..896979b100 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/MetricsApi.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/MetricsApi.scala
@@ -104,6 +104,8 @@ trait MetricsApi extends Serializable {
def genWriteFilesTransformerMetricsUpdater(metrics: Map[String, SQLMetric]):
MetricsUpdater
+ def genBatchWriteMetrics(sparkContext: SparkContext): Map[String, SQLMetric]
+
def genSortTransformerMetrics(sparkContext: SparkContext): Map[String,
SQLMetric]
def genSortTransformerMetricsUpdater(metrics: Map[String, SQLMetric]):
MetricsUpdater
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarV2TableWriteExec.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarV2TableWriteExec.scala
index 0479a2d36d..8c960e70dd 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarV2TableWriteExec.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarV2TableWriteExec.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.connector.write.{BatchWrite,
WriterCommitMessage}
import org.apache.spark.sql.datasources.v2.{DataWritingColumnarBatchSparkTask,
DataWritingColumnarBatchSparkTaskResult, StreamWriterCommitProgressUtil,
WritingColumnarBatchSparkTask}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.v2._
-import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.LongAccumulator
@@ -111,6 +111,15 @@ trait ColumnarV2TableWriteExec extends
V2ExistingTableWriteExec with Validatable
logError(s"Data source write support $batchWrite aborted.")
throw cause
}
+ }
+ override val customMetrics: Map[String, SQLMetric] = {
+ write
+ .supportedCustomMetrics()
+ .map {
+ customMetric =>
+ customMetric.name() -> SQLMetrics.createV2CustomMetric(sparkContext,
customMetric)
+ }
+ .toMap ++
BackendsApiManager.getMetricsApiInstance.genBatchWriteMetrics(sparkContext)
}
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/spark/sql/datasources/v2/ColumnarWriteToDataSourceV2Exec.scala
b/gluten-substrait/src/main/scala/org/apache/spark/sql/datasources/v2/ColumnarWriteToDataSourceV2Exec.scala
index c4f2b98713..ef432aa119 100644
---
a/gluten-substrait/src/main/scala/org/apache/spark/sql/datasources/v2/ColumnarWriteToDataSourceV2Exec.scala
+++
b/gluten-substrait/src/main/scala/org/apache/spark/sql/datasources/v2/ColumnarWriteToDataSourceV2Exec.scala
@@ -63,6 +63,8 @@ trait WritingColumnarBatchSparkTask[W <:
DataWriter[ColumnarBatch]]
logInfo(s"Writer for partition ${context.partitionId()} is committing.")
val msg = dataWriter.commit()
+ // Native write's metrics should be updated again after commit.
+ CustomMetrics.updateMetrics(dataWriter.currentMetricsValues,
customMetrics)
logInfo(
s"Committed partition $partId (task $taskId, attempt $attemptId, " +
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]