This is an automated email from the ASF dual-hosted git repository.

chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new d2f8185699 [GLUTEN-10761] Add iceberg write metrics (#10908)
d2f8185699 is described below

commit d2f8185699c35ab1f97d3369e46af522fb25f374
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Oct 21 18:38:41 2025 +0800

    [GLUTEN-10761] Add iceberg write metrics (#10908)
---
 .../backendsapi/clickhouse/CHMetricsApi.scala      |  4 ++
 .../write/IcebergColumnarBatchDataWriter.scala     |  5 ++
 .../gluten/execution/IcebergWriteJniWrapper.java   |  5 +-
 .../execution/enhanced/VeloxIcebergSuite.scala     | 14 +++++
 .../apache/gluten/metrics/BatchWriteMetrics.java   | 59 ++++++++++++++++++++++
 .../gluten/backendsapi/velox/VeloxMetricsApi.scala |  8 +++
 cpp/velox/compute/iceberg/IcebergWriter.cc         | 13 ++++-
 cpp/velox/compute/iceberg/IcebergWriter.h          | 17 +++++++
 cpp/velox/jni/VeloxJniWrapper.cc                   | 26 ++++++++++
 .../write/ColumnarBatchDataWriterFactory.java      |  1 +
 .../org/apache/gluten/backendsapi/MetricsApi.scala |  2 +
 .../execution/ColumnarV2TableWriteExec.scala       | 11 +++-
 .../v2/ColumnarWriteToDataSourceV2Exec.scala       |  2 +
 13 files changed, 164 insertions(+), 3 deletions(-)

diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
index c8037d8c9e..a437665e00 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
@@ -472,4 +472,8 @@ class CHMetricsApi extends MetricsApi with Logging with 
LogLevelUtil {
   def genWriteFilesTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): 
MetricsUpdater = {
     new WriteFilesMetricsUpdater(metrics)
   }
+
+  override def genBatchWriteMetrics(sparkContext: SparkContext): Map[String, 
SQLMetric] = {
+    throw new UnsupportedOperationException("BatchWrite is not supported in CH 
backend")
+  }
 }
diff --git 
a/backends-velox/src-iceberg/main/scala/org/apache/gluten/connector/write/IcebergColumnarBatchDataWriter.scala
 
b/backends-velox/src-iceberg/main/scala/org/apache/gluten/connector/write/IcebergColumnarBatchDataWriter.scala
index 068cdf6cd4..c4f202d84b 100644
--- 
a/backends-velox/src-iceberg/main/scala/org/apache/gluten/connector/write/IcebergColumnarBatchDataWriter.scala
+++ 
b/backends-velox/src-iceberg/main/scala/org/apache/gluten/connector/write/IcebergColumnarBatchDataWriter.scala
@@ -21,6 +21,7 @@ import org.apache.gluten.columnarbatch.ColumnarBatches
 import org.apache.gluten.execution.IcebergWriteJniWrapper
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.connector.metric.CustomTaskMetric
 import org.apache.spark.sql.connector.write.{DataWriter, WriterCommitMessage}
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
@@ -81,4 +82,8 @@ case class IcebergColumnarBatchDataWriter(
       case _ => throw new UnsupportedOperationException()
     }
   }
+
+  override def currentMetricsValues(): Array[CustomTaskMetric] = {
+    jniWrapper.metrics(writer).toCustomTaskMetrics
+  }
 }
diff --git 
a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/IcebergWriteJniWrapper.java
 
b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/IcebergWriteJniWrapper.java
index a9192952cf..54ae50e578 100644
--- 
a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/IcebergWriteJniWrapper.java
+++ 
b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/IcebergWriteJniWrapper.java
@@ -16,6 +16,7 @@
  */
 package org.apache.gluten.execution;
 
+import org.apache.gluten.metrics.BatchWriteMetrics;
 import org.apache.gluten.runtime.Runtime;
 import org.apache.gluten.runtime.RuntimeAware;
 
@@ -33,11 +34,13 @@ public class IcebergWriteJniWrapper implements RuntimeAware 
{
                           byte[] partitionSpec,
                           byte[] field);
 
-  // Returns the json iceberg Datafile represent
   public native void write(long writerHandle, long batch);
 
+  // Returns the json iceberg Datafile represent
   public native String[] commit(long writerHandle);
 
+  public native BatchWriteMetrics metrics(long writerHandle);
+
   @Override
   public long rtHandle() {
     return runtime.getHandle();
diff --git 
a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
 
b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
index 6702c77885..5ff593a3ba 100644
--- 
a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
+++ 
b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
@@ -283,4 +283,18 @@ class VeloxIcebergSuite extends IcebergSuite {
       }
     }
   }
+
+  test("iceberg write metrics") {
+    withTable("iceberg_tbl") {
+      spark.sql("create table if not exists iceberg_tbl (id int) using 
iceberg".stripMargin)
+      val df = spark.sql("insert into iceberg_tbl values 1")
+      val metrics =
+        
df.queryExecution.executedPlan.asInstanceOf[CommandResultExec].commandPhysicalPlan.metrics
+      val statusStore = spark.sharedState.statusStore
+      val lastExecId = statusStore.executionsList().last.executionId
+      val executionMetrics = statusStore.executionMetrics(lastExecId)
+
+      assert(executionMetrics(metrics("numWrittenFiles").id).toLong == 1)
+    }
+  }
 }
diff --git 
a/backends-velox/src/main/java/org/apache/gluten/metrics/BatchWriteMetrics.java 
b/backends-velox/src/main/java/org/apache/gluten/metrics/BatchWriteMetrics.java
new file mode 100644
index 0000000000..dcf841dfd3
--- /dev/null
+++ 
b/backends-velox/src/main/java/org/apache/gluten/metrics/BatchWriteMetrics.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.metrics;
+
+import org.apache.spark.sql.connector.metric.CustomTaskMetric;
+
+import java.util.ArrayList;
+
+public class BatchWriteMetrics {
+  private final long numWrittenBytes;
+  private final int numWrittenFiles;
+  private final long writeIOTimeNs;
+  private final long writeWallNs;
+
+  public BatchWriteMetrics(
+      long numWrittenBytes, int numWrittenFiles, long writeIOTimeNs, long 
writeWallNs) {
+    this.numWrittenBytes = numWrittenBytes;
+    this.numWrittenFiles = numWrittenFiles;
+    this.writeIOTimeNs = writeIOTimeNs;
+    this.writeWallNs = writeWallNs;
+  }
+
+  public CustomTaskMetric[] toCustomTaskMetrics() {
+    ArrayList<CustomTaskMetric> customTaskMetrics = new ArrayList<>();
+    customTaskMetrics.add(customTaskMetric("numWrittenBytes", 
numWrittenBytes));
+    customTaskMetrics.add(customTaskMetric("numWrittenFiles", 
numWrittenFiles));
+    customTaskMetrics.add(customTaskMetric("writeIOTimeNs", writeIOTimeNs));
+    customTaskMetrics.add(customTaskMetric("writeWallNs", writeWallNs));
+    return customTaskMetrics.toArray(new CustomTaskMetric[0]);
+  }
+
+  private CustomTaskMetric customTaskMetric(String name, long value) {
+    return new CustomTaskMetric() {
+      @Override
+      public String name() {
+        return name;
+      }
+
+      @Override
+      public long value() {
+        return value;
+      }
+    };
+  }
+}
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
index 57ccd3446e..03579e307b 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
@@ -460,6 +460,14 @@ class VeloxMetricsApi extends MetricsApi with Logging {
   def genWriteFilesTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): 
MetricsUpdater =
     new WriteFilesMetricsUpdater(metrics)
 
+  def genBatchWriteMetrics(sparkContext: SparkContext): Map[String, SQLMetric] 
=
+    Map(
+      "numWrittenBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number 
of written bytes"),
+      "writeIOTimeNs" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time 
of write IO"),
+      "writeWallNs" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time 
of write"),
+      "numWrittenFiles" -> SQLMetrics.createMetric(sparkContext, "number of 
written files")
+    )
+
   override def genSortTransformerMetrics(sparkContext: SparkContext): 
Map[String, SQLMetric] =
     Map(
       "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
diff --git a/cpp/velox/compute/iceberg/IcebergWriter.cc 
b/cpp/velox/compute/iceberg/IcebergWriter.cc
index 11718c8901..96a8561eca 100644
--- a/cpp/velox/compute/iceberg/IcebergWriter.cc
+++ b/cpp/velox/compute/iceberg/IcebergWriter.cc
@@ -100,7 +100,7 @@ IcebergWriter::IcebergWriter(
     const std::unordered_map<std::string, std::string>& sparkConfs,
     std::shared_ptr<facebook::velox::memory::MemoryPool> memoryPool,
     std::shared_ptr<facebook::velox::memory::MemoryPool> connectorPool)
-    : rowType_(rowType), field_(convertToIcebergNestedField(field)), 
pool_(memoryPool), connectorPool_(connectorPool) {
+    : rowType_(rowType), field_(convertToIcebergNestedField(field)), 
pool_(memoryPool), connectorPool_(connectorPool), 
createTimeNs_(getCurrentTimeNano()) {
   auto veloxCfg =
       
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string,
 std::string>(sparkConfs));
   connectorSessionProperties_ = 
std::make_shared<facebook::velox::config::ConfigBase>(
@@ -142,6 +142,17 @@ std::vector<std::string> IcebergWriter::commit() {
   return dataSink_->close();
 }
 
+WriteStats IcebergWriter::writeStats() const {
+  const auto currentTimeNs = getCurrentTimeNano();
+  VELOX_CHECK_GE(currentTimeNs, createTimeNs_);
+  const auto sinkStats = dataSink_->stats();
+  return WriteStats(
+    sinkStats.numWrittenBytes,
+    sinkStats.numWrittenFiles,
+    sinkStats.writeIOTimeUs * 1000,
+    currentTimeNs - createTimeNs_);
+}
+
 std::shared_ptr<const iceberg::IcebergPartitionSpec>
 parseIcebergPartitionSpec(const uint8_t* data, const int32_t length, 
RowTypePtr rowType) {
   gluten::IcebergPartitionSpec protoSpec;
diff --git a/cpp/velox/compute/iceberg/IcebergWriter.h 
b/cpp/velox/compute/iceberg/IcebergWriter.h
index 3de99ce0f5..f587195070 100644
--- a/cpp/velox/compute/iceberg/IcebergWriter.h
+++ b/cpp/velox/compute/iceberg/IcebergWriter.h
@@ -19,11 +19,23 @@
 
 #include "IcebergNestedField.pb.h"
 #include "memory/VeloxColumnarBatch.h"
+#include "utils/Metrics.h"
 #include "velox/connectors/hive/iceberg/IcebergColumnHandle.h"
 #include "velox/connectors/hive/iceberg/IcebergDataSink.h"
 
 namespace gluten {
 
+struct WriteStats {
+  uint64_t numWrittenBytes{0};
+  uint32_t numWrittenFiles{0};
+  uint64_t writeIOTimeNs{0};
+  uint64_t writeWallNs{0};
+
+  bool empty() const;
+
+  std::string toString() const;
+};
+
 class IcebergWriter {
  public:
   IcebergWriter(
@@ -41,6 +53,8 @@ class IcebergWriter {
 
   std::vector<std::string> commit();
 
+  WriteStats writeStats() const;
+
  private:
   facebook::velox::RowTypePtr rowType_;
   const facebook::velox::connector::hive::iceberg::IcebergNestedField field_;
@@ -52,6 +66,9 @@ class IcebergWriter {
   std::unique_ptr<facebook::velox::connector::ConnectorQueryCtx> 
connectorQueryCtx_;
 
   std::unique_ptr<facebook::velox::connector::hive::iceberg::IcebergDataSink> 
dataSink_;
+
+  // Records the writer creation time in ns.
+  const uint64_t createTimeNs_{0};
 };
 
 std::shared_ptr<const 
facebook::velox::connector::hive::iceberg::IcebergPartitionSpec>
diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc
index 0052880143..1ce34d6f95 100644
--- a/cpp/velox/jni/VeloxJniWrapper.cc
+++ b/cpp/velox/jni/VeloxJniWrapper.cc
@@ -56,6 +56,9 @@ jmethodID infoClsInitMethod;
 
 jclass blockStripesClass;
 jmethodID blockStripesConstructor;
+
+jclass batchWriteMetricsClass;
+jmethodID batchWriteMetricsConstructor;
 } // namespace
 
 #ifdef __cplusplus
@@ -80,6 +83,10 @@ jint JNI_OnLoad(JavaVM* vm, void*) {
       createGlobalClassReferenceOrError(env, 
"Lorg/apache/spark/sql/execution/datasources/BlockStripes;");
   blockStripesConstructor = getMethodIdOrError(env, blockStripesClass, 
"<init>", "(J[J[II[[B)V");
 
+  batchWriteMetricsClass =
+    createGlobalClassReferenceOrError(env, 
"Lorg/apache/gluten/metrics/BatchWriteMetrics;");
+  batchWriteMetricsConstructor = getMethodIdOrError(env, 
batchWriteMetricsClass, "<init>", "(JIJJ)V");
+
   DLOG(INFO) << "Loaded Velox backend.";
 
   return jniVersion;
@@ -811,6 +818,25 @@ JNIEXPORT jobjectArray JNICALL 
Java_org_apache_gluten_execution_IcebergWriteJniW
 
   JNI_METHOD_END(nullptr)
 }
+
+JNIEXPORT jobject JNICALL 
Java_org_apache_gluten_execution_IcebergWriteJniWrapper_metrics( // NOLINT
+    JNIEnv* env,
+    jobject wrapper,
+    jlong writerHandle) {
+  JNI_METHOD_START
+  auto writer = ObjectStore::retrieve<IcebergWriter>(writerHandle);
+  auto writeStats = writer->writeStats();
+  jobject writeMetrics = env->NewObject(
+    batchWriteMetricsClass,
+    batchWriteMetricsConstructor,
+    writeStats.numWrittenBytes,
+    writeStats.numWrittenFiles,
+    writeStats.writeIOTimeNs,
+    writeStats.writeWallNs);
+  return writeMetrics;
+
+  JNI_METHOD_END(nullptr)
+}
 #endif
 
 #ifdef __cplusplus
diff --git 
a/gluten-substrait/src/main/java/org/apache/gluten/connector/write/ColumnarBatchDataWriterFactory.java
 
b/gluten-substrait/src/main/java/org/apache/gluten/connector/write/ColumnarBatchDataWriterFactory.java
index 16c6248a31..4254c5baff 100644
--- 
a/gluten-substrait/src/main/java/org/apache/gluten/connector/write/ColumnarBatchDataWriterFactory.java
+++ 
b/gluten-substrait/src/main/java/org/apache/gluten/connector/write/ColumnarBatchDataWriterFactory.java
@@ -18,6 +18,7 @@ package org.apache.gluten.connector.write;
 
 import org.apache.spark.annotation.Evolving;
 import org.apache.spark.sql.connector.write.DataWriter;
+import org.apache.spark.sql.connector.write.DataWriterFactory;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 
 import java.io.Serializable;
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/MetricsApi.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/MetricsApi.scala
index 0bd4144cd1..896979b100 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/MetricsApi.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/MetricsApi.scala
@@ -104,6 +104,8 @@ trait MetricsApi extends Serializable {
 
   def genWriteFilesTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): 
MetricsUpdater
 
+  def genBatchWriteMetrics(sparkContext: SparkContext): Map[String, SQLMetric]
+
   def genSortTransformerMetrics(sparkContext: SparkContext): Map[String, 
SQLMetric]
 
   def genSortTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): 
MetricsUpdater
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarV2TableWriteExec.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarV2TableWriteExec.scala
index 0479a2d36d..8c960e70dd 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarV2TableWriteExec.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarV2TableWriteExec.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.connector.write.{BatchWrite, 
WriterCommitMessage}
 import org.apache.spark.sql.datasources.v2.{DataWritingColumnarBatchSparkTask, 
DataWritingColumnarBatchSparkTaskResult, StreamWriterCommitProgressUtil, 
WritingColumnarBatchSparkTask}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources.v2._
-import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.LongAccumulator
@@ -111,6 +111,15 @@ trait ColumnarV2TableWriteExec extends 
V2ExistingTableWriteExec with Validatable
         logError(s"Data source write support $batchWrite aborted.")
         throw cause
     }
+  }
 
+  override val customMetrics: Map[String, SQLMetric] = {
+    write
+      .supportedCustomMetrics()
+      .map {
+        customMetric =>
+          customMetric.name() -> SQLMetrics.createV2CustomMetric(sparkContext, 
customMetric)
+      }
+      .toMap ++ 
BackendsApiManager.getMetricsApiInstance.genBatchWriteMetrics(sparkContext)
   }
 }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/datasources/v2/ColumnarWriteToDataSourceV2Exec.scala
 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/datasources/v2/ColumnarWriteToDataSourceV2Exec.scala
index c4f2b98713..ef432aa119 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/datasources/v2/ColumnarWriteToDataSourceV2Exec.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/datasources/v2/ColumnarWriteToDataSourceV2Exec.scala
@@ -63,6 +63,8 @@ trait WritingColumnarBatchSparkTask[W <: 
DataWriter[ColumnarBatch]]
       logInfo(s"Writer for partition ${context.partitionId()} is committing.")
 
       val msg = dataWriter.commit()
+      // Native write's metrics should be updated again after commit.
+      CustomMetrics.updateMetrics(dataWriter.currentMetricsValues, 
customMetrics)
 
       logInfo(
         s"Committed partition $partId (task $taskId, attempt $attemptId, " +


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to