This is an automated email from the ASF dual-hosted git repository.

shuaixu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 8b66b60201 [GLUTEN-9525][FLINK]Support tps metric for source task (#10023)
8b66b60201 is described below

commit 8b66b60201c80c4e1c90d6ec121722dc7b8fb919
Author: kevinyhzou <[email protected]>
AuthorDate: Sat Oct 11 09:19:46 2025 +0800

    [GLUTEN-9525][FLINK]Support tps metric for source task (#10023)
    
    * support tps metric for source task
---
 .../table/runtime/metrics/SourceTaskMetrics.java   | 63 ++++++++++++++++++++++
 .../operators/GlutenVectorSourceFunction.java      |  4 ++
 2 files changed, 67 insertions(+)

diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/metrics/SourceTaskMetrics.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/metrics/SourceTaskMetrics.java
new file mode 100644
index 0000000000..5bbbb028b6
--- /dev/null
+++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/metrics/SourceTaskMetrics.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.table.runtime.metrics;
+
+import io.github.zhztheplayer.velox4j.query.SerialTask;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+/**
+ * Mirrors the cumulative input statistics of a Velox {@code TableScan} plan node into the Flink
+ * operator IO counters ({@code numRecordsOut} / {@code numBytesOut}) of a Gluten source task, so
+ * that Flink can report the source's throughput.
+ *
+ * <p>Stats are collected at most once per {@link #METRIC_UPDATE_INTERVAL_MS}. This avoids
+ * collecting them on every call from the source loop. This class performs no synchronization.
+ */
+public class SourceTaskMetrics {
+
+  private static final String KEY_OPERATOR_TYPE = "operatorType";
+  private static final String SOURCE_OPERATOR_NAME = "TableScan";
+  private static final String KEY_INPUT_ROWS = "rawInputRows";
+  private static final String KEY_INPUT_BYTES = "rawInputBytes";
+  /** Minimum time between two stats collection attempts, in milliseconds. */
+  private static final long METRIC_UPDATE_INTERVAL_MS = 2000;
+
+  private final Counter sourceNumRecordsOut;
+  private final Counter sourceNumBytesOut;
+  // Monotonic timestamp (derived from System.nanoTime) of the last attempt. Wall-clock
+  // adjustments therefore cannot stall or burst the update cadence.
+  private long lastUpdateTimeMs = nowMs();
+
+  /**
+   * Creates the metrics bridge.
+   *
+   * @param metricGroup operator metric group whose IO records/bytes-out counters are updated
+   */
+  public SourceTaskMetrics(OperatorMetricGroup metricGroup) {
+    sourceNumRecordsOut = metricGroup.getIOMetricGroup().getNumRecordsOutCounter();
+    sourceNumBytesOut = metricGroup.getIOMetricGroup().getNumBytesOutCounter();
+  }
+
+  /**
+   * Brings the records/bytes-out counters up to the totals reported in the Velox stats of
+   * {@code planId}. This only happens if at least {@link #METRIC_UPDATE_INTERVAL_MS} has elapsed
+   * since the previous attempt.
+   *
+   * @param task the Velox task to collect stats from
+   * @param planId id of the plan node whose stats are read
+   * @return {@code true} if stats were collected and processed. {@code false} if the call was
+   *     throttled, collection failed, or the expected stats fields were missing.
+   */
+  public boolean updateMetrics(SerialTask task, String planId) {
+    long currentTimeMs = nowMs();
+    if (currentTimeMs - lastUpdateTimeMs < METRIC_UPDATE_INTERVAL_MS) {
+      return false;
+    }
+    // Throttle attempts, not just successes. Otherwise a failing collection would be retried
+    // on every call from the source loop.
+    lastUpdateTimeMs = currentTimeMs;
+
+    ObjectNode planStats;
+    try {
+      planStats = task.collectStats().planStats(planId);
+    } catch (Exception e) {
+      // Metrics are best-effort: a stats failure must not break the source task.
+      return false;
+    }
+    if (planStats == null) {
+      return false;
+    }
+
+    JsonNode operatorType = planStats.get(KEY_OPERATOR_TYPE);
+    if (operatorType == null) {
+      return false;
+    }
+    if (!SOURCE_OPERATOR_NAME.equals(operatorType.asText())) {
+      // Only TableScan stats are mirrored into the source counters.
+      return true;
+    }
+
+    JsonNode inputRows = planStats.get(KEY_INPUT_ROWS);
+    JsonNode inputBytes = planStats.get(KEY_INPUT_BYTES);
+    if (inputRows == null || inputBytes == null) {
+      return false;
+    }
+    // Use asLong, not asInt. Cumulative byte (and row) totals can exceed Integer.MAX_VALUE,
+    // and asInt would wrap them around.
+    long numRecordsOut = inputRows.asLong();
+    long numBytesOut = inputBytes.asLong();
+    // Flink's Counter has no setter, so apply the difference to reach Velox's cumulative totals.
+    sourceNumRecordsOut.inc(numRecordsOut - sourceNumRecordsOut.getCount());
+    sourceNumBytesOut.inc(numBytesOut - sourceNumBytesOut.getCount());
+    return true;
+  }
+
+  /** Returns a monotonic timestamp in milliseconds; it is not wall-clock time. */
+  private static long nowMs() {
+    return System.nanoTime() / 1_000_000L;
+  }
+}
diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java
index 399211ec1b..472bd0bfed 100644
--- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java
+++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java
@@ -17,6 +17,7 @@
 package org.apache.gluten.table.runtime.operators;
 
 import org.apache.gluten.table.runtime.config.VeloxQueryConfig;
+import org.apache.gluten.table.runtime.metrics.SourceTaskMetrics;
 
 import io.github.zhztheplayer.velox4j.Velox4j;
 import io.github.zhztheplayer.velox4j.config.ConnectorConfig;
@@ -66,6 +67,7 @@ public class GlutenVectorSourceFunction extends RichParallelSourceFunction<State
   private BufferAllocator allocator;
   private MemoryManager memoryManager;
   private SerialTask task;
+  private SourceTaskMetrics taskMetrics;
 
   public GlutenVectorSourceFunction(
       StatefulPlanNode planNode,
@@ -108,6 +110,7 @@ public class GlutenVectorSourceFunction extends RichParallelSourceFunction<State
       task.addSplit(id, split);
       task.noMoreSplits(id);
     }
+    taskMetrics = new SourceTaskMetrics(getRuntimeContext().getMetricGroup());
   }
 
   @Override
@@ -128,6 +131,7 @@ public class GlutenVectorSourceFunction extends RichParallelSourceFunction<State
         LOG.info("Velox task finished");
         break;
       }
+      taskMetrics.updateMetrics(task, id);
     }
 
     task.close();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to