This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3e54b82fb [feature][engine] add ST-Engine metrics (#3621)
3e54b82fb is described below

commit 3e54b82fb5cffdba517df173700a695e4ac4c89d
Author: ic4y <[email protected]>
AuthorDate: Fri Dec 2 22:31:52 2022 +0800

    [feature][engine] add ST-Engine metrics (#3621)
    
    * [feature][engine] add engine metrics
---
 LICENSE                                            |   2 +
 .../seatunnel/api/common/metrics/JobMetrics.java   | 159 +++++++++++++
 .../seatunnel/api/common/metrics/Measurement.java  | 143 ++++++++++++
 .../api/common/metrics/MeasurementPredicates.java  |  68 ++++++
 .../seatunnel/api/common/metrics/Metric.java       |  63 +++++
 .../seatunnel/api/common/metrics/MetricNames.java  |  33 +++
 .../seatunnel/api/common/metrics/MetricTags.java   |  44 ++++
 .../api/common/metrics/RawJobMetrics.java          |  75 ++++++
 .../apache/seatunnel/api/common/metrics/Unit.java  |  33 +++
 .../seatunnel/engine/client/SeaTunnelClient.java   |   8 +
 .../engine/client/SeaTunnelClientTest.java         |  69 ++++--
 .../apache/seatunnel/engine/common/Constant.java   |   2 +
 .../codec/SeaTunnelGetJobMetricsCodec.java         |  88 +++++++
 .../SeaTunnelEngine.yaml                           |  20 ++
 .../engine/server/CoordinatorService.java          |  36 ++-
 .../seatunnel/engine/server/SeaTunnelServer.java   |   1 +
 .../engine/server/TaskExecutionService.java        |  81 ++++++-
 .../seatunnel/engine/server/execution/Task.java    |  13 +-
 .../engine/server/master/JobHistoryService.java    |  24 +-
 .../seatunnel/engine/server/master/JobMaster.java  |  47 ++++
 .../engine/server/metrics/JobMetricsCollector.java |  80 +++++++
 .../engine/server/metrics/JobMetricsUtil.java      | 104 +++++++++
 .../seatunnel/engine/server/metrics/Metrics.java   |  55 +++++
 .../engine/server/metrics/MetricsContext.java      | 257 +++++++++++++++++++++
 .../engine/server/metrics/MetricsImpl.java         |  72 ++++++
 .../engine/server/operation/AsyncOperation.java    |  13 +-
 .../operation/CleanTaskGroupContextOperation.java  |  41 ++++
 .../server/operation/GetJobMetricsOperation.java   |  85 +++++++
 .../operation/GetTaskGroupMetricsOperation.java    |  61 +++++
 .../server/protocol/task/GetJobMetricsTask.java    |  50 ++++
 .../task/SeaTunnelMessageTaskFactoryProvider.java  |   3 +
 .../serializable/OperationDataSerializerHook.java  |   5 +
 .../server/task/SeaTunnelSourceCollector.java      |   7 +
 .../engine/server/task/SeaTunnelTask.java          |  20 ++
 .../engine/server/task/flow/SinkFlowLifeCycle.java |   6 +
 .../engine/server/master/JobMetricsTest.java       |  97 ++++++++
 .../resources/fake_to_console_job_metrics.conf     |  54 +++++
 37 files changed, 1982 insertions(+), 37 deletions(-)

diff --git a/LICENSE b/LICENSE
index 6d2243fa1..3c505f2fe 100644
--- a/LICENSE
+++ b/LICENSE
@@ -233,3 +233,5 @@ 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engi
 
seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointIDCounter.java
                   from https://github.com/apache/flink
 
seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/InternalCheckpointListener.java
            from https://github.com/apache/flink
 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/StandaloneCheckpointIDCounter.java
     from https://github.com/apache/flink
+seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics
                                           from 
https://github.com/hazelcast/hazelcast
+seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics            
                                                             from 
https://github.com/hazelcast/hazelcast
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/JobMetrics.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/JobMetrics.java
new file mode 100644
index 000000000..7c52013a6
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/JobMetrics.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.common.metrics;
+
+import static java.util.stream.Collectors.groupingBy;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
+
+/**
+ * Snapshot of the measurements collected for a job, grouped by metric name.
+ * <p>
+ * Instances are created through {@link #empty()} or {@link #of(Map)}; the
+ * package-private no-arg constructor exists only for deserialization.
+ */
+public final class JobMetrics implements Serializable {
+
+    private static final JobMetrics EMPTY = new JobMetrics(Collections.emptyMap());
+
+    private static final Collector<Measurement, ?, Map<String, List<Measurement>>> COLLECTOR =
+        Collectors.groupingBy(Measurement::metric);
+
+    private Map<String, List<Measurement>> metrics; //metric name -> list of measurements
+
+    /**
+     * Lazily initialized holder for the JSON mapper, so the mapper is built
+     * and configured once (on the first {@link #toJsonString()} call) instead
+     * of on every call.
+     */
+    private static final class JsonMapperHolder {
+        private static final ObjectMapper INSTANCE =
+            new ObjectMapper().configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
+
+        private JsonMapperHolder() {
+        }
+    }
+
+    JobMetrics() { //needed for deserialization
+    }
+
+    private JobMetrics(Map<String, List<Measurement>> metrics) {
+        // Copy the outer map so later changes to the caller's map are not visible here.
+        this.metrics = new HashMap<>(metrics);
+    }
+
+    /**
+     * Returns an empty {@link JobMetrics} object.
+     */
+    public static JobMetrics empty() {
+        return EMPTY;
+    }
+
+    /**
+     * Builds a {@link JobMetrics} object based on a map of
+     * {@link Measurement}s.
+     *
+     * @param metrics metric name -> measurements, must not be {@code null}
+     */
+    public static JobMetrics of(Map<String, List<Measurement>> metrics) {
+        return new JobMetrics(metrics);
+    }
+
+    /**
+     * Returns the names of all metrics present, as an unmodifiable set.
+     */
+    public Set<String> metrics() {
+        return Collections.unmodifiableSet(metrics.keySet());
+    }
+
+    /**
+     * Returns all {@link Measurement}s associated with a given metric name.
+     * <p>
+     * For a list of job-specific metric names please see {@link MetricNames}.
+     *
+     * @param metricName the metric to look up, must not be {@code null}
+     * @return an unmodifiable list of measurements; empty if the metric is unknown
+     */
+    public List<Measurement> get(String metricName) {
+        Objects.requireNonNull(metricName);
+        List<Measurement> measurements = metrics.get(metricName);
+        // Never hand out the internal list: callers must not be able to alter this snapshot.
+        return measurements == null
+            ? Collections.emptyList()
+            : Collections.unmodifiableList(measurements);
+    }
+
+    /**
+     * Returns a new {@link JobMetrics} containing only the measurements whose
+     * tag {@code tagName} has exactly the value {@code tagValue}.
+     */
+    public JobMetrics filter(String tagName, String tagValue) {
+        return filter(MeasurementPredicates.tagValueEquals(tagName, tagValue));
+    }
+
+    /**
+     * Returns a new {@link JobMetrics} containing only the measurements
+     * accepted by {@code predicate}, regrouped by metric name.
+     *
+     * @param predicate the filter to apply, must not be {@code null}
+     */
+    public JobMetrics filter(Predicate<Measurement> predicate) {
+        Objects.requireNonNull(predicate, "predicate");
+
+        Map<String, List<Measurement>> filteredMetrics =
+            metrics.values().stream()
+                   .flatMap(List::stream)
+                   .filter(predicate)
+                   .collect(COLLECTOR);
+        return new JobMetrics(filteredMetrics);
+    }
+
+    @Override
+    public int hashCode() {
+        // Null-safe, consistent with equals(): metrics is null after the no-arg constructor.
+        return Objects.hashCode(metrics);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == this) {
+            return true;
+        }
+
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+
+        return Objects.equals(metrics, ((JobMetrics) obj).metrics);
+    }
+
+    /**
+     * Renders the metrics sorted by metric name; within each metric the
+     * measurements are grouped by their {@link MetricTags#TASK_NAME} tag
+     * (empty string when the tag is absent) and the groups sorted by that name.
+     */
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        metrics.entrySet().stream()
+            .sorted(Comparator.comparing(Entry::getKey))
+            .forEach(mainEntry -> {
+                sb.append(mainEntry.getKey()).append(":\n");
+                mainEntry.getValue().stream()
+                    .collect(groupingBy(m -> {
+                        String vertex = m.tag(MetricTags.TASK_NAME);
+                        return vertex == null ? "" : vertex;
+                    }))
+                    .entrySet().stream()
+                    .sorted(Comparator.comparing(Entry::getKey))
+                    .forEach(e -> {
+                        String vertexName = e.getKey();
+                        sb.append("  ").append(vertexName).append(":\n");
+                        e.getValue().forEach(m -> sb.append("    ").append(m).append("\n"));
+                    });
+            });
+        return sb.toString();
+    }
+
+    /**
+     * Serializes the metric map to a JSON string.
+     *
+     * @return the JSON form of the metrics, or a JSON object with a single
+     *         {@code "err"} field if serialization fails
+     */
+    public String toJsonString() {
+        ObjectMapper objectMapper = JsonMapperHolder.INSTANCE;
+        try {
+            return objectMapper.writeValueAsString(this.metrics);
+        } catch (JsonProcessingException e) {
+            // Best effort: callers get a well-formed JSON error marker instead of an exception.
+            ObjectNode objectNode = objectMapper.createObjectNode();
+            objectNode.put("err", "serialize JobMetrics err");
+            return objectNode.toString();
+        }
+    }
+}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Measurement.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Measurement.java
new file mode 100644
index 000000000..6ea645955
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Measurement.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.common.metrics;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Data class containing information about one metric measurement,
+ * consisting of:
+ * <ul>
+ * <li>metric name</li>
+ * <li>metric value</li>
+ * <li>metric timestamp, generated when the metric was gathered</li>
+ * <li>metric descriptor (set of tag name-value pairs) </li>
+ * </ul>
+ * <p>
+ * A metric descriptor can be thought of as a set of attributes associated
+ * with a particular metric, which in turn is identified by its name
+ * (for a full list of metric names provided see {@link MetricNames}).
+ * The attributes are specified as tags that have names and values (for a
+ * full list of tag names see {@link MetricTags}). An example
+ * descriptor would have a collection of tags/attributes like this:
+ * {@code job=jobId, pipeline=pipelineId,
+ * unit=count, metric=SourceReceivedCount, ...}
+ * <p>
+ * NOTE(review): the class is meant to be used as an immutable value, but the
+ * fields are non-final and Lombok's {@code @Data} generates setters for
+ * non-final fields - confirm nothing relies on mutating instances.
+ */
+@Data
+public final class Measurement implements Serializable {
+
+    private Map<String, String> tags; //tag name -> tag value
+    private String metric;
+    private Object value;
+    private long timestamp;
+
+    Measurement() {
+    }
+
+    private Measurement(String metric, Object value, long timestamp, Map<String, String> tags) {
+        this.metric = metric;
+        this.value = value;
+        this.timestamp = timestamp;
+        // Defensive copy: later changes to the caller's map must not leak in.
+        this.tags = new HashMap<>(tags);
+    }
+
+    /**
+     * Builds a {@link Measurement} instance based on metric name, value,
+     * timestamp and the metric descriptor in map form.
+     *
+     * @param metric    the metric name, must not be {@code null}
+     * @param value     the measured value
+     * @param timestamp the moment the value was gathered
+     * @param tags      tag name -> tag value, must not be {@code null}
+     * @throws NullPointerException if {@code metric} or {@code tags} is {@code null}
+     */
+    public static Measurement of(
+        String metric, Object value, long timestamp, Map<String, String> tags
+    ) {
+        Objects.requireNonNull(metric, "metric");
+        Objects.requireNonNull(tags, "tags");
+        return new Measurement(metric, value, timestamp, tags);
+    }
+
+    /**
+     * Returns the value associated with this {@link Measurement}.
+     */
+    public Object value() {
+        return value;
+    }
+
+    /**
+     * Returns the timestamp associated with this {@link Measurement}, the
+     * moment when the value was gathered.
+     */
+    public long timestamp() {
+        return timestamp;
+    }
+
+    /**
+     * Returns the name of the metric. For a list of different metrics
+     * see {@link MetricNames}.
+     */
+    public String metric() {
+        return metric;
+    }
+
+    /**
+     * Returns the value associated with a specific tag, based on the metric
+     * description of this particular {@link Measurement}. For a list of
+     * possible tag names see {@link MetricTags}.
+     *
+     * @return the tag value, or {@code null} if the tag is not present
+     */
+    public String tag(String name) {
+        return tags.get(name);
+    }
+
+    @Override
+    public int hashCode() {
+        // Null-safe and built from the same fields equals() compares.
+        return Objects.hash(metric, value, timestamp, tags);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (!(obj instanceof Measurement)) {
+            return false;
+        }
+        Measurement that = (Measurement) obj;
+        // Compare value with equals(), not ==: equal boxed values (e.g. after
+        // deserialization) are usually distinct objects.
+        return this.timestamp == that.timestamp
+                && Objects.equals(this.metric, that.metric)
+                && Objects.equals(this.value, that.value)
+                && Objects.equals(this.tags, that.tags);
+    }
+
+    /**
+     * Formats the measurement as {@code "<metric> <value> <timestamp> [tag=value, ...]"}
+     * with the tags sorted by name.
+     */
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+
+        sb.append(String.format("%s %s", metric, value))
+                .append(" ")
+                .append(timestamp)
+                .append(" [");
+
+        String joinedTags = this.tags.entrySet().stream()
+                .sorted(Comparator.comparing(Map.Entry::getKey))
+                .map(e -> e.getKey() + "=" + e.getValue())
+                .collect(Collectors.joining(", "));
+        sb.append(joinedTags).append(']');
+
+        return sb.toString();
+    }
+}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MeasurementPredicates.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MeasurementPredicates.java
new file mode 100644
index 000000000..7f565ba6b
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MeasurementPredicates.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.common.metrics;
+
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
+
+/**
+ * Static utility class for creating various {@link Measurement} filtering
+ * predicates.
+ *
+ */
+public final class MeasurementPredicates {
+
+    private MeasurementPredicates() { }
+
+    /**
+     * Matches a {@link Measurement} which contains the specified tag.
+     *
+     * @param tag the tag of interest
+     * @return a filtering predicate
+     */
+    public static Predicate<Measurement> containsTag(String tag) {
+        return measurement -> measurement.tag(tag) != null;
+    }
+
+    /**
+     * Matches a {@link Measurement} which contains the specified tag and
+     * the tag has the specified value.
+     *
+     * @param tag   the tag to match
+     * @param value the value the tag has to have, must not be {@code null}
+     * @return a filtering predicate
+     */
+    public static Predicate<Measurement> tagValueEquals(String tag, String value) {
+        return measurement -> value.equals(measurement.tag(tag));
+    }
+
+    /**
+     * Matches a {@link Measurement} which has this exact tag with a value
+     * matching the provided regular expression.
+     * <p>
+     * The expression is compiled once, when the predicate is created, so an
+     * invalid expression is reported here rather than on first evaluation.
+     *
+     * @param tag         the tag to match
+     * @param valueRegexp regular expression to match the value against
+     * @return a filtering predicate
+     * @throws java.util.regex.PatternSyntaxException if {@code valueRegexp} is not a valid pattern
+     */
+    public static Predicate<Measurement> tagValueMatches(String tag, String valueRegexp) {
+        // Compile outside the lambda: the predicate typically runs once per measurement.
+        Pattern pattern = Pattern.compile(valueRegexp);
+        return measurement -> {
+            String value = measurement.tag(tag);
+            return value != null && pattern.matcher(value).matches();
+        };
+    }
+}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Metric.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Metric.java
new file mode 100644
index 000000000..3e8c3e7c4
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Metric.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.common.metrics;
+
+/**
+ * Handle to a single named metric holding a simple numeric value that can be
+ * incremented, decremented or set.
+ */
+public interface Metric {
+
+    /**
+     * Returns the name of the associated metric.
+     */
+
+    String name();
+
+    /**
+     * Returns the measurement unit for the associated metric. Meant
+     * to provide further information on the type of value measured
+     * by the user-defined metric. Doesn't affect the functionality of the
+     * metric, it still remains a simple numeric value, but is used to
+     * populate the {@link MetricTags#UNIT} tag in the metric's description.
+     */
+
+    Unit unit();
+
+    /**
+     * Increments the current value by 1.
+     */
+    void increment();
+
+    /**
+     * Increments the current value by the specified amount.
+     *
+     * @param amount the amount to add to the current value
+     */
+    void increment(long amount);
+
+    /**
+     * Decrements the current value by 1.
+     */
+    void decrement();
+
+    /**
+     * Decrements the current value by the specified amount.
+     *
+     * @param amount the amount to subtract from the current value
+     */
+    void decrement(long amount);
+
+    /**
+     * Sets the current value.
+     *
+     * @param newValue the value that replaces the current one
+     */
+    void set(long newValue);
+
+}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MetricNames.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MetricNames.java
new file mode 100644
index 000000000..a3511d92b
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MetricNames.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.common.metrics;
+
+/**
+ * Constants for the metric names used in {@link Measurement#metric()} and as
+ * keys of {@link JobMetrics}. Not instantiable.
+ */
+public final class MetricNames {
+
+    // Constants holder: no instances.
+    private MetricNames() {}
+
+    public static final String RECEIVED_COUNT = "receivedCount";
+
+    public static final String RECEIVED_BATCHES = "receivedBatches";
+
+    // Source-side and sink-side metrics; presumably record counts and rates - confirm with the emitting tasks.
+    public static final String SOURCE_RECEIVED_COUNT = "SourceReceivedCount";
+
+    public static final String SOURCE_RECEIVED_QPS = "SourceReceivedQPS";
+    public static final String SINK_WRITE_COUNT = "SinkWriteCount";
+    public static final String SINK_WRITE_QPS = "SinkWriteQPS";
+}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MetricTags.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MetricTags.java
new file mode 100644
index 000000000..8a3207a33
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MetricTags.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.common.metrics;
+
+/**
+ * Constants for the tag names that may appear in the descriptor of a
+ * {@link Measurement} (see {@link Measurement#tag(String)}). Not instantiable.
+ */
+public final class MetricTags {
+
+    // Constants holder: no instances.
+    private MetricTags() {
+    }
+
+    public static final String MEMBER = "member";
+
+    public static final String ADDRESS = "address";
+
+    public static final String JOB_ID = "jobId";
+
+    public static final String PIPELINE_ID = "pipelineId";
+
+    public static final String TASK_GROUP_ID = "taskGroupId";
+
+    // NOTE(review): "taskID" uses different casing from "jobId"/"pipelineId"; the value is
+    // part of the emitted tag names, so confirm with consumers before normalizing it.
+    public static final String TASK_ID = "taskID";
+
+    /** Tag carrying the metric's {@link Unit} (see {@link Metric#unit()}). */
+    public static final String UNIT = "unit";
+
+    /** Tag used by {@link JobMetrics#toString()} to group measurements. */
+    public static final String TASK_NAME = "taskName";
+
+    public static final String SERVICE = "service";
+
+    public static final String TASK_GROUP_LOCATION = "taskGroupLocation";
+}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/RawJobMetrics.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/RawJobMetrics.java
new file mode 100644
index 000000000..ddbdd89fa
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/RawJobMetrics.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.common.metrics;
+
+import java.util.Arrays;
+
+public final class RawJobMetrics {
+
+    private long timestamp;
+    private byte[] blob;
+
+    RawJobMetrics() {
+    }
+
+    private RawJobMetrics(long timestamp, byte[] blob) {
+        this.timestamp = timestamp;
+        this.blob = blob;
+    }
+
+    public static RawJobMetrics empty() {
+        return of(null);
+    }
+
+    public static RawJobMetrics of(byte[] blob) {
+        return new RawJobMetrics(System.currentTimeMillis(), blob);
+    }
+
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    public byte[] getBlob() {
+        return blob;
+    }
+
+    @Override
+    public int hashCode() {
+        return (int) timestamp * 31 + Arrays.hashCode(blob);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+
+        if (obj == this) {
+            return true;
+        }
+
+        RawJobMetrics that;
+        return Arrays.equals(blob, (that = (RawJobMetrics) obj).blob)
+                && this.timestamp == that.timestamp;
+    }
+
+    @Override
+    public String toString() {
+        return Arrays.toString(blob) + " @ " + timestamp;
+    }
+}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Unit.java 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Unit.java
new file mode 100644
index 000000000..70217741b
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Unit.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.common.metrics;
+
+/**
+ * Measurement unit of a {@link Metric}, describing how its numeric value
+ * should be interpreted (see {@link Metric#unit()}).
+ */
+public enum Unit {
+    /** Size, counter, represented in bytes */
+    BYTES,
+    /** Timestamp or duration represented in ms */
+    MS,
+    /** An integer in range 0..100 */
+    PERCENT,
+    /** Number of items: size, counter... */
+    COUNT,
+    /** 0 or 1 */
+    BOOLEAN,
+    /** 0..n, ordinal of an enum */
+    ENUM,
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
index 07c7de46d..9f8a9420b 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.engine.client;
 import org.apache.seatunnel.engine.client.job.JobClient;
 import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
 import org.apache.seatunnel.engine.common.config.JobConfig;
+import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobMetricsCodec;
 import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobStateCodec;
 import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelListJobStatusCodec;
 import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelPrintMessageCodec;
@@ -80,4 +81,11 @@ public class SeaTunnelClient implements 
SeaTunnelClientInstance {
             SeaTunnelListJobStatusCodec::decodeResponse
         );
     }
+
+    /**
+     * Sends a get-job-metrics request for the given job via
+     * {@code requestOnMasterAndDecodeResponse} and returns the decoded response.
+     *
+     * @param jobId id of the job whose metrics are requested
+     * @return the decoded response string (presumably the job metrics as JSON -
+     *         confirm against the server-side handler)
+     */
+    public String getJobMetrics(Long jobId) {
+        return hazelcastClient.requestOnMasterAndDecodeResponse(
+            SeaTunnelGetJobMetricsCodec.encodeRequest(jobId),
+            SeaTunnelGetJobMetricsCodec::decodeResponse
+        );
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index f8c118cfe..9be38e537 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -17,6 +17,10 @@
 
 package org.apache.seatunnel.engine.client;
 
+import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT;
+import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_QPS;
+import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_COUNT;
+import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_QPS;
 import static org.awaitility.Awaitility.await;
 
 import org.apache.seatunnel.common.config.Common;
@@ -33,8 +37,10 @@ import com.hazelcast.client.config.ClientConfig;
 import com.hazelcast.core.HazelcastInstance;
 import com.hazelcast.instance.impl.HazelcastInstanceFactory;
 import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.condition.DisabledOnOs;
 import org.junit.jupiter.api.condition.OS;
@@ -49,6 +55,7 @@ public class SeaTunnelClientTest {
 
     private static SeaTunnelConfig SEATUNNEL_CONFIG = 
ConfigProvider.locateAndGetSeaTunnelConfig();
     private static HazelcastInstance INSTANCE;
+    private static SeaTunnelClient CLIENT;
 
     @BeforeAll
     public static void beforeClass() throws Exception {
@@ -58,14 +65,17 @@ public class SeaTunnelClientTest {
             new 
SeaTunnelNodeContext(ConfigProvider.locateAndGetSeaTunnelConfig()));
     }
 
-    @Test
-    public void testSayHello() {
+    @BeforeEach
+    void setUp() {
         ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
         
clientConfig.setClusterName(TestUtils.getClusterName("SeaTunnelClientTest"));
-        SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
+        CLIENT = new SeaTunnelClient(clientConfig);
+    }
 
+    @Test
+    public void testSayHello() {
         String msg = "Hello world";
-        String s = engineClient.printMessageToMaster(msg);
+        String s = CLIENT.printMessageToMaster(msg);
         Assertions.assertEquals(msg, s);
     }
 
@@ -76,10 +86,7 @@ public class SeaTunnelClientTest {
         JobConfig jobConfig = new JobConfig();
         jobConfig.setName("fake_to_file");
 
-        ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
-        
clientConfig.setClusterName(TestUtils.getClusterName("SeaTunnelClientTest"));
-        SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
-        JobExecutionEnvironment jobExecutionEnv = 
engineClient.createExecutionContext(filePath, jobConfig);
+        JobExecutionEnvironment jobExecutionEnv = 
CLIENT.createExecutionContext(filePath, jobConfig);
 
         try {
             final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
@@ -103,10 +110,7 @@ public class SeaTunnelClientTest {
         JobConfig jobConfig = new JobConfig();
         jobConfig.setName("fake_to_console");
 
-        ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
-        
clientConfig.setClusterName(TestUtils.getClusterName("SeaTunnelClientTest"));
-        SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
-        JobExecutionEnvironment jobExecutionEnv = 
engineClient.createExecutionContext(filePath, jobConfig);
+        JobExecutionEnvironment jobExecutionEnv = 
CLIENT.createExecutionContext(filePath, jobConfig);
 
         try {
             final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
@@ -117,17 +121,54 @@ public class SeaTunnelClientTest {
 
             await().atMost(30000, TimeUnit.MILLISECONDS)
                 .untilAsserted(() -> Assertions.assertTrue(
-                    engineClient.getJobState(jobId).contains("RUNNING") && 
engineClient.listJobStatus().contains("RUNNING")));
+                    CLIENT.getJobState(jobId).contains("RUNNING") && 
CLIENT.listJobStatus().contains("RUNNING")));
 
             await().atMost(30000, TimeUnit.MILLISECONDS)
                 .untilAsserted(() -> Assertions.assertTrue(
-                    engineClient.getJobState(jobId).contains("FINISHED") && 
engineClient.listJobStatus().contains("FINISHED")));
+                    CLIENT.getJobState(jobId).contains("FINISHED") && 
CLIENT.listJobStatus().contains("FINISHED")));
 
         } catch (ExecutionException | InterruptedException e) {
             throw new RuntimeException(e);
         }
     }
 
+    /**
+     * Runs the fake-to-console job to completion and checks that the metrics
+     * string returned by the client mentions the source and sink counters.
+     */
+    @Test
+    public void testGetJobMetrics() {
+        Common.setDeployMode(DeployMode.CLIENT);
+        JobConfig jobConfig = new JobConfig();
+        jobConfig.setName("fake_to_console");
+        String filePath = TestUtils.getResource("/client_test.conf");
+
+        JobExecutionEnvironment jobExecutionEnv = CLIENT.createExecutionContext(filePath, jobConfig);
+
+        try {
+            final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+            // Wait for completion in the background, as the other tests in this class do.
+            CompletableFuture<JobStatus> completionFuture =
+                CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
+            long jobId = clientJobProxy.getJobId();
+
+            await().atMost(30000, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> {
+                    boolean stateFinished = CLIENT.getJobState(jobId).contains("FINISHED");
+                    boolean listedFinished = stateFinished && CLIENT.listJobStatus().contains("FINISHED");
+                    Assertions.assertTrue(listedFinished);
+                });
+
+            // Metrics are requested only after the job has been reported as FINISHED.
+            String jobMetrics = CLIENT.getJobMetrics(jobId);
+
+            Assertions.assertTrue(jobMetrics.contains(SOURCE_RECEIVED_COUNT));
+            Assertions.assertTrue(jobMetrics.contains(SOURCE_RECEIVED_QPS));
+            Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_COUNT));
+            Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_QPS));
+
+        } catch (ExecutionException | InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    // Closes the client that setUp() created for the test that just ran.
+    @AfterEach
+    void tearDown() {
+        CLIENT.close();
+    }
+
     @AfterAll
     public static void after() {
         INSTANCE.shutdown();
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
index a7acbe876..5d00c5d0b 100644
--- 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
@@ -46,6 +46,8 @@ public class Constant {
 
     public static final String IMAP_FINISHED_JOB_STATE = "finishedJobState";
 
+    public static final String IMAP_FINISHED_JOB_METRICS = 
"finishedJobMetrics";
+
     public static final String IMAP_STATE_TIMESTAMPS = "stateTimestamps";
 
     public static final String IMAP_OWNED_SLOT_PROFILES = 
"ownedSlotProfilesIMap";
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelGetJobMetricsCodec.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelGetJobMetricsCodec.java
new file mode 100644
index 000000000..201d0f343
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelGetJobMetricsCodec.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.seatunnel.engine.core.protocol.codec;
+
+import com.hazelcast.client.impl.protocol.ClientMessage;
+import com.hazelcast.client.impl.protocol.Generated;
+import com.hazelcast.client.impl.protocol.codec.builtin.*;
+
+import static com.hazelcast.client.impl.protocol.ClientMessage.*;
+import static 
com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.*;
+
+/*
+ * This file is auto-generated by the Hazelcast Client Protocol Code Generator.
+ * To change this file, edit the templates or the protocol
+ * definitions on the https://github.com/hazelcast/hazelcast-client-protocol
+ * and regenerate it.
+ */
+
+/**
+ * Client-protocol codec for the SeaTunnel {@code getJobMetrics} call.
+ * The request carries a single {@code long} job id; the response carries a
+ * single {@code String}.
+ */
+@Generated("41fec4e1cc038a9e9be1823f1d0955ef")
+public final class SeaTunnelGetJobMetricsCodec {
+    //hex: 0xDE0800
+    public static final int REQUEST_MESSAGE_TYPE = 14551040;
+    //hex: 0xDE0801
+    public static final int RESPONSE_MESSAGE_TYPE = 14551041;
+    private static final int REQUEST_JOB_ID_FIELD_OFFSET = 
PARTITION_ID_FIELD_OFFSET + INT_SIZE_IN_BYTES;
+    private static final int REQUEST_INITIAL_FRAME_SIZE = 
REQUEST_JOB_ID_FIELD_OFFSET + LONG_SIZE_IN_BYTES;
+    private static final int RESPONSE_INITIAL_FRAME_SIZE = 
RESPONSE_BACKUP_ACKS_FIELD_OFFSET + BYTE_SIZE_IN_BYTES;
+
+    private SeaTunnelGetJobMetricsCodec() {
+    }
+
+    public static ClientMessage encodeRequest(long jobId) {
+        ClientMessage clientMessage = ClientMessage.createForEncode();
+        clientMessage.setRetryable(true);
+        clientMessage.setOperationName("SeaTunnel.GetJobMetrics");
+        ClientMessage.Frame initialFrame = new ClientMessage.Frame(new 
byte[REQUEST_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
+        encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, 
REQUEST_MESSAGE_TYPE);
+        encodeInt(initialFrame.content, PARTITION_ID_FIELD_OFFSET, -1);
+        encodeLong(initialFrame.content, REQUEST_JOB_ID_FIELD_OFFSET, jobId);
+        clientMessage.add(initialFrame);
+        return clientMessage;
+    }
+
+    /**
+     * Reads the job id from the initial frame of a request message.
+     *
+     * @param clientMessage the request message to decode
+     * @return the job id stored at {@code REQUEST_JOB_ID_FIELD_OFFSET}
+     */
+    public static long decodeRequest(ClientMessage clientMessage) {
+        ClientMessage.ForwardFrameIterator iterator = 
clientMessage.frameIterator();
+        ClientMessage.Frame initialFrame = iterator.next();
+        return decodeLong(initialFrame.content, REQUEST_JOB_ID_FIELD_OFFSET);
+    }
+
+    public static ClientMessage encodeResponse(java.lang.String response) {
+        ClientMessage clientMessage = ClientMessage.createForEncode();
+        ClientMessage.Frame initialFrame = new ClientMessage.Frame(new 
byte[RESPONSE_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
+        encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, 
RESPONSE_MESSAGE_TYPE);
+        clientMessage.add(initialFrame);
+
+        StringCodec.encode(clientMessage, response);
+        return clientMessage;
+    }
+
+    /**
+     * Skips the initial frame of a response message and decodes the String
+     * that follows it.
+     *
+     * @param clientMessage the response message to decode
+     * @return the String payload of the response
+     */
+    public static java.lang.String decodeResponse(ClientMessage clientMessage) 
{
+        ClientMessage.ForwardFrameIterator iterator = 
clientMessage.frameIterator();
+        //empty initial frame
+        iterator.next();
+        return StringCodec.decode(iterator);
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
 
b/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
index 6d599a747..ec40e9c92 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
@@ -154,3 +154,23 @@ methods:
           since: 2.0
           doc: ''
 
+  - id: 8
+    name: getJobMetrics
+    since: 2.0
+    doc: ''
+    request:
+      retryable: true
+      partitionIdentifier: -1
+      params:
+        - name: jobId
+          type: long
+          nullable: false
+          since: 2.0
+          doc: ''
+    response:
+      params:
+        - name: response
+          type: String
+          nullable: false
+          since: 2.0
+          doc: ''
\ No newline at end of file
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index c8203ccc5..2128ece81 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.engine.server;
 
+import org.apache.seatunnel.api.common.metrics.JobMetrics;
 import org.apache.seatunnel.common.utils.ExceptionUtils;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.config.EngineConfig;
@@ -34,6 +35,7 @@ import 
org.apache.seatunnel.engine.server.execution.TaskExecutionState;
 import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
 import org.apache.seatunnel.engine.server.master.JobHistoryService;
 import org.apache.seatunnel.engine.server.master.JobMaster;
+import org.apache.seatunnel.engine.server.metrics.JobMetricsUtil;
 import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
 import 
org.apache.seatunnel.engine.server.resourcemanager.ResourceManagerFactory;
 import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
@@ -169,7 +171,8 @@ public class CoordinatorService {
             runningJobStateIMap,
             logger,
             runningJobMasterMap,
-            
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_FINISHED_JOB_STATE)
+            
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_FINISHED_JOB_STATE),
+            
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_FINISHED_JOB_METRICS)
         );
 
         List<CompletableFuture<Void>> collect = 
runningJobInfoIMap.entrySet().stream().map(entry -> {
@@ -241,8 +244,7 @@ public class CoordinatorService {
                     jobMaster.run();
                 } finally {
                     // storage job state info to HistoryStorage
-                    removeJobIMap(jobMaster);
-                    runningJobMasterMap.remove(jobId);
+                    onJobDone(jobMaster, jobId);
                 }
             });
             return;
@@ -256,10 +258,7 @@ public class CoordinatorService {
                     
jobMaster.getPhysicalPlan().getPipelineList().forEach(SubPlan::restorePipelineState);
                     jobMaster.run();
                 } finally {
-                    // storage job state info to HistoryStorage
-                    jobHistoryService.storeFinishedJobState(jobMaster);
-                    removeJobIMap(jobMaster);
-                    runningJobMasterMap.remove(jobId);
+                    onJobDone(jobMaster, jobId);
                 }
             });
             return;
@@ -346,15 +345,20 @@ public class CoordinatorService {
             try {
                 jobMaster.run();
             } finally {
-                // storage job state info to HistoryStorage
-                jobHistoryService.storeFinishedJobState(jobMaster);
-                removeJobIMap(jobMaster);
-                runningJobMasterMap.remove(jobId);
+                onJobDone(jobMaster, jobId);
             }
         });
         return new PassiveCompletableFuture<>(voidCompletableFuture);
     }
 
+    /**
+     * Finalizes a job whose {@link JobMaster} has stopped running: persists its
+     * final state and metrics to the history storage, then drops it from the
+     * running-job bookkeeping.
+     *
+     * @param jobMaster the master of the job that has finished
+     * @param jobId     the id of that job, used as key in {@code runningJobMasterMap}
+     */
+    private void onJobDone(JobMaster jobMaster, long jobId) {
+        try {
+            // storage job state and metrics to HistoryStorage
+            jobHistoryService.storeFinishedJobState(jobMaster);
+            jobHistoryService.storeFinishedJobMetrics(jobMaster);
+        } finally {
+            // Always unregister the job, even when persisting history fails;
+            // otherwise it would stay registered as running indefinitely.
+            removeJobIMap(jobMaster);
+            runningJobMasterMap.remove(jobId);
+        }
+    }
+
     private void removeJobIMap(JobMaster jobMaster) {
         Long jobId = jobMaster.getJobImmutableInformation().getJobId();
         runningJobStateTimestampsIMap.remove(jobId);
@@ -412,6 +416,16 @@ public class CoordinatorService {
         return runningJobMaster.getJobStatus();
     }
 
+    /**
+     * Returns the metrics of a job.
+     *
+     * <p>For a job without a running master only the history storage is
+     * consulted. For a running job the live metrics are collected first; a
+     * record already present in the history storage still takes precedence
+     * over them.
+     *
+     * @param jobId the id of the job
+     * @return the stored metrics if present, otherwise the live metrics of the
+     *     running job, otherwise whatever the history storage returns
+     */
+    public JobMetrics getJobMetrics(long jobId) {
+        JobMaster runningJobMaster = runningJobMasterMap.get(jobId);
+        if (runningJobMaster != null) {
+            JobMetrics liveMetrics = JobMetricsUtil.toJobMetrics(runningJobMaster.getCurrJobMetrics());
+            JobMetrics storedMetrics = jobHistoryService.getJobMetrics(jobId);
+            if (storedMetrics == null) {
+                return liveMetrics;
+            }
+            return storedMetrics;
+        }
+        return jobHistoryService.getJobMetrics(jobId);
+    }
+
     /**
      * When TaskGroup ends, it is called by {@link TaskExecutionService} to 
notify JobMaster the TaskGroup's state.
      */
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index 231869318..bb55db983 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -92,6 +92,7 @@ public class SeaTunnelServer implements ManagedService, 
MembershipAwareService,
         taskExecutionService = new TaskExecutionService(
             nodeEngine, nodeEngine.getProperties()
         );
+        
nodeEngine.getMetricsRegistry().registerDynamicMetricsProvider(taskExecutionService);
         taskExecutionService.start();
         getSlotService();
         coordinatorService = new CoordinatorService(nodeEngine, this, 
seaTunnelConfig.getEngineConfig());
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index cf0867160..ad6a592e4 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -17,13 +17,20 @@
 
 package org.apache.seatunnel.engine.server;
 
+import static org.apache.seatunnel.api.common.metrics.MetricTags.JOB_ID;
+import static org.apache.seatunnel.api.common.metrics.MetricTags.PIPELINE_ID;
+import static org.apache.seatunnel.api.common.metrics.MetricTags.TASK_GROUP_ID;
+import static 
org.apache.seatunnel.api.common.metrics.MetricTags.TASK_GROUP_LOCATION;
+import static org.apache.seatunnel.api.common.metrics.MetricTags.TASK_ID;
 import static com.hazelcast.jet.impl.util.ExceptionUtil.withTryCatch;
 import static com.hazelcast.jet.impl.util.Util.uncheckRun;
+import static java.lang.Thread.currentThread;
 import static java.util.Collections.emptyList;
 import static java.util.concurrent.Executors.newCachedThreadPool;
 import static java.util.stream.Collectors.partitioningBy;
 import static java.util.stream.Collectors.toList;
 
+import org.apache.seatunnel.api.common.metrics.MetricTags;
 import org.apache.seatunnel.common.utils.ExceptionUtils;
 import 
org.apache.seatunnel.engine.common.loader.SeatunnelChildFirstClassLoader;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
@@ -38,10 +45,15 @@ import 
org.apache.seatunnel.engine.server.execution.TaskGroupContext;
 import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import org.apache.seatunnel.engine.server.execution.TaskTracker;
+import org.apache.seatunnel.engine.server.metrics.MetricsImpl;
 import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
 import 
org.apache.seatunnel.engine.server.task.operation.NotifyTaskStatusOperation;
 
 import com.google.common.collect.Lists;
+import com.hazelcast.internal.metrics.DynamicMetricsProvider;
+import com.hazelcast.internal.metrics.MetricDescriptor;
+import com.hazelcast.internal.metrics.MetricsCollectionContext;
+import com.hazelcast.internal.metrics.MetricsRegistry;
 import com.hazelcast.internal.serialization.Data;
 import com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject;
 import com.hazelcast.logging.ILogger;
@@ -54,6 +66,7 @@ import org.apache.commons.collections4.CollectionUtils;
 
 import java.net.URL;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -69,11 +82,12 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Stream;
 
 /**
  * This class is responsible for the execution of the Task
  */
-public class TaskExecutionService {
+public class TaskExecutionService implements DynamicMetricsProvider {
 
     private final String hzInstanceName;
     private final NodeEngineImpl nodeEngine;
@@ -84,6 +98,7 @@ public class TaskExecutionService {
     private final RunBusWorkSupplier runBusWorkSupplier = new 
RunBusWorkSupplier(executorService, threadShareTaskQueue);
     // key: TaskID
     private final ConcurrentMap<TaskGroupLocation, TaskGroupContext> 
executionContexts = new ConcurrentHashMap<>();
+    private final ConcurrentMap<TaskGroupLocation, TaskGroupContext> 
finishedExecutionContexts = new ConcurrentHashMap<>();
     private final ConcurrentMap<TaskGroupLocation, CompletableFuture<Void>> 
cancellationFutures =
         new ConcurrentHashMap<>();
 
@@ -91,6 +106,11 @@ public class TaskExecutionService {
         this.hzInstanceName = nodeEngine.getHazelcastInstance().getName();
         this.nodeEngine = nodeEngine;
         this.logger = 
nodeEngine.getLoggingService().getLogger(TaskExecutionService.class);
+
+        MetricsRegistry registry = nodeEngine.getMetricsRegistry();
+        MetricDescriptor descriptor = registry.newMetricDescriptor()
+            .withTag(MetricTags.SERVICE, this.getClass().getSimpleName());
+        registry.registerStaticMetrics(descriptor, this);
     }
 
     public void start() {
@@ -107,9 +127,23 @@ public class TaskExecutionService {
     }
 
     private void submitThreadShareTask(TaskGroupExecutionTracker taskGroupExecutionTracker, List<Task> tasks) {
-        tasks.stream()
-            .map(t -> new TaskTracker(t, taskGroupExecutionTracker))
-            .forEach(threadShareTaskQueue::add);
+        // Wrap every task in a TaskTracker and initialise it. Once one init
+        // fails, the failure is recorded on the tracker and the remaining
+        // tasks are skipped (mapped to null).
+        Stream<TaskTracker> taskTrackerStream = tasks.stream()
+            .map(t -> {
+                if (!taskGroupExecutionTracker.executionCompletedExceptionally()) {
+                    try {
+                        TaskTracker taskTracker = new TaskTracker(t, taskGroupExecutionTracker);
+                        taskTracker.task.init();
+                        return taskTracker;
+                    } catch (Exception e) {
+                        taskGroupExecutionTracker.exception(e);
+                        taskGroupExecutionTracker.taskDone();
+                    }
+                }
+                return null;
+            });
+        // Stream stages are lazy: collect here so that every init() has run
+        // before the failure check below, and drop the null placeholders so
+        // that null is never offered to the queue.
+        List<TaskTracker> initializedTrackers = taskTrackerStream
+            .filter(tracker -> tracker != null)
+            .collect(toList());
+        // Enqueue only when the whole group initialised successfully.
+        if (!taskGroupExecutionTracker.executionCompletedExceptionally()) {
+            initializedTrackers.forEach(threadShareTaskQueue::add);
+        }
     }
 
     private void submitBlockingTask(TaskGroupExecutionTracker 
taskGroupExecutionTracker, List<Task> tasks) {
@@ -261,6 +295,34 @@ public class TaskExecutionService {
 
     }
 
+    /**
+     * Drops the context kept for a finished task group from
+     * {@code finishedExecutionContexts}, so that its tasks are no longer
+     * visited when dynamic metrics are collected.
+     *
+     * @param taskGroupLocation location of the task group whose context is removed
+     */
+    public void notifyCleanTaskGroupContext(TaskGroupLocation 
taskGroupLocation){
+        finishedExecutionContexts.remove(taskGroupLocation);
+    }
+
+    /**
+     * Lets every task of every known task group (running or finished but not
+     * yet cleaned) publish its metrics. Each task receives a descriptor tagged
+     * with the service name, the task group location, job id, pipeline id,
+     * task group id and task id.
+     *
+     * @param descriptor base descriptor; it is copied, never modified
+     * @param context    collection context handed through to each task
+     */
+    @Override
+    public void provideDynamicMetrics(MetricDescriptor descriptor, MetricsCollectionContext context) {
+        try {
+            MetricDescriptor serviceDescriptor = descriptor.copy()
+                .withTag(MetricTags.SERVICE, this.getClass().getSimpleName());
+            // Work on a private copy; on a duplicate key the finished entry wins.
+            Map<TaskGroupLocation, TaskGroupContext> contextSnapshot = new HashMap<>(executionContexts);
+            contextSnapshot.putAll(finishedExecutionContexts);
+            for (Map.Entry<TaskGroupLocation, TaskGroupContext> entry : contextSnapshot.entrySet()) {
+                TaskGroupLocation location = entry.getKey();
+                MetricDescriptor groupDescriptor = serviceDescriptor.copy()
+                    .withTag(TASK_GROUP_LOCATION, location.toString())
+                    .withTag(JOB_ID, String.valueOf(location.getJobId()))
+                    .withTag(PIPELINE_ID, String.valueOf(location.getPipelineId()))
+                    .withTag(TASK_GROUP_ID, String.valueOf(location.getTaskGroupId()));
+                for (Task task : entry.getValue().getTaskGroup().getTasks()) {
+                    Long taskId = task.getTaskID();
+                    MetricDescriptor taskDescriptor = groupDescriptor.copy()
+                        .withTag(TASK_ID, String.valueOf(taskId));
+                    task.provideDynamicMetrics(taskDescriptor, context);
+                }
+            }
+        } catch (Throwable t) {
+            // Log for diagnosis, then let the failure propagate unchanged.
+            logger.warning("Dynamic metric collection failed", t);
+            throw t;
+        }
+    }
+
     private final class BlockingWorker implements Runnable {
 
         private final TaskTracker tracker;
@@ -278,9 +340,11 @@ public class TaskExecutionService {
             ClassLoader oldClassLoader = 
Thread.currentThread().getContextClassLoader();
             Thread.currentThread().setContextClassLoader(classLoader);
             final Task t = tracker.task;
+            MetricsImpl.Container userMetricsContextContainer = 
MetricsImpl.container();
             try {
                 startedLatch.countDown();
                 t.init();
+                userMetricsContextContainer.setContext(t.getMetricsContext());
                 ProgressState result;
                 do {
                     result = t.call();
@@ -291,6 +355,7 @@ public class TaskExecutionService {
                 taskGroupExecutionTracker.exception(e);
             } finally {
                 taskGroupExecutionTracker.taskDone();
+                userMetricsContextContainer.setContext(null);
             }
             Thread.currentThread().setContextClassLoader(oldClassLoader);
         }
@@ -315,6 +380,8 @@ public class TaskExecutionService {
         AtomicBoolean keep = new AtomicBoolean(true);
         public AtomicReference<TaskTracker> exclusiveTaskTracker = new 
AtomicReference<>();
         final TaskCallTimer timer;
+        private Thread myThread;
+        private MetricsImpl.Container userMetricsContextContainer;
         public LinkedBlockingDeque<TaskTracker> taskqueue;
 
         @SuppressWarnings("checkstyle:MagicNumber")
@@ -328,6 +395,8 @@ public class TaskExecutionService {
         @SneakyThrows
         @Override
         public void run() {
+            myThread = currentThread();
+            userMetricsContextContainer = MetricsImpl.container();
             while (keep.get() && isRunning) {
                 TaskTracker taskTracker = null != exclusiveTaskTracker.get() ?
                     exclusiveTaskTracker.get() :
@@ -350,6 +419,8 @@ public class TaskExecutionService {
                 ProgressState call = null;
                 try {
                     //run task
+                    
myThread.setContextClassLoader(executionContexts.get(taskGroupExecutionTracker.taskGroup.getTaskGroupLocation()).getClassLoader());
+                    
userMetricsContextContainer.setContext(taskTracker.task.getMetricsContext());
                     call = taskTracker.task.call();
                     synchronized (timer) {
                         timer.timerStop();
@@ -366,6 +437,7 @@ public class TaskExecutionService {
                 } finally {
                     //stop timer
                     timer.timerStop();
+                    userMetricsContextContainer.setContext(null);
                 }
                 //task call finished
                 if (null != call) {
@@ -449,6 +521,7 @@ public class TaskExecutionService {
             logger.info("taskDone: " + taskGroup.getTaskGroupLocation());
             if (completionLatch.decrementAndGet() == 0) {
                 TaskGroupLocation taskGroupLocation = 
taskGroup.getTaskGroupLocation();
+                finishedExecutionContexts.put(taskGroupLocation, 
executionContexts.get(taskGroupLocation));
                 executionContexts.remove(taskGroupLocation);
                 cancellationFutures.remove(taskGroupLocation);
                 Throwable ex = executionException.get();
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
index eea42c9ee..88d76b016 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
@@ -20,15 +20,19 @@ package org.apache.seatunnel.engine.server.execution;
 import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
 import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState;
 import org.apache.seatunnel.engine.server.checkpoint.Stateful;
+import org.apache.seatunnel.engine.server.metrics.MetricsContext;
 import org.apache.seatunnel.engine.server.task.record.Barrier;
 
+import com.hazelcast.internal.metrics.DynamicMetricsProvider;
+import com.hazelcast.internal.metrics.MetricDescriptor;
+import com.hazelcast.internal.metrics.MetricsCollectionContext;
 import lombok.NonNull;
 
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
 
-public interface Task extends InternalCheckpointListener, Stateful, 
Serializable {
+public interface Task extends DynamicMetricsProvider, 
InternalCheckpointListener, Stateful, Serializable {
 
     default void init() throws Exception {
     }
@@ -53,4 +57,11 @@ public interface Task extends InternalCheckpointListener, 
Stateful, Serializable
 
     @Override
     default void restoreState(List<ActionSubtaskState> actionStateList) throws 
Exception {}
+
+    // Default: the task exposes no metrics context (returns null).
+    default MetricsContext getMetricsContext() {
+        return null;
+    }
+
+    // Default: no-op, the task publishes no dynamic metrics.
+    default void provideDynamicMetrics(MetricDescriptor tagger, 
MetricsCollectionContext context) {
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
index 4d309be38..fffb3f369 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
@@ -17,11 +17,14 @@
 
 package org.apache.seatunnel.engine.server.master;
 
+import org.apache.seatunnel.api.common.metrics.JobMetrics;
+import org.apache.seatunnel.api.common.metrics.RawJobMetrics;
 import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.core.job.PipelineStatus;
 import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
 import org.apache.seatunnel.engine.server.execution.ExecutionState;
 import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.metrics.JobMetricsUtil;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
@@ -36,6 +39,7 @@ import lombok.Data;
 
 import java.io.Serializable;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
@@ -67,18 +71,22 @@ public class JobHistoryService {
     //TODO need to limit the amount of storage
     private final IMap<Long, JobStateData> finishedJobStateImap;
 
+    private final IMap<Long, JobMetrics> finishedJobMetricsImap;
+
     private final ObjectMapper objectMapper;
 
     public JobHistoryService(
         IMap<Object, Object> runningJobStateIMap,
         ILogger logger,
         Map<Long, JobMaster> runningJobMasterMap,
-        IMap<Long, JobStateData> finishedJobStateImap
+        IMap<Long, JobStateData> finishedJobStateImap,
+        IMap<Long, JobMetrics> finishedJobMetricsImap
     ) {
         this.runningJobStateIMap = runningJobStateIMap;
         this.logger = logger;
         this.runningJobMasterMap = runningJobMasterMap;
         this.finishedJobStateImap = finishedJobStateImap;
+        this.finishedJobMetricsImap = finishedJobMetricsImap;
         this.objectMapper = new ObjectMapper();
         this.objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, 
false);
     }
@@ -104,6 +112,10 @@ public class JobHistoryService {
             finishedJobStateImap.getOrDefault(jobId, null);
     }
 
+    // Looks up the metrics stored for a finished job; yields null when the map has no entry for jobId.
+    public JobMetrics getJobMetrics(Long jobId){
+        return finishedJobMetricsImap.getOrDefault(jobId, null);
+    }
+
     // Get detailed status of a single job as json
     public String getJobStatusAsString(Long jobId) {
         JobStateData jobStatus = getJobStatus(jobId);
@@ -128,6 +140,16 @@ public class JobHistoryService {
         finishedJobStateImap.put(jobStateData.jobId, jobStateData, 14, 
TimeUnit.DAYS);
     }
 
+    /**
+     * Fetches the current metrics of the given job, stores them in the finished-job metrics map
+     * with a 14-day time-to-live, and then asks the job master to clean its task group contexts.
+     *
+     * @param jobMaster master of the job whose metrics are stored
+     */
+    @SuppressWarnings("checkstyle:MagicNumber")
+    public void storeFinishedJobMetrics(JobMaster jobMaster) {
+        JobMetrics finalMetrics = JobMetricsUtil.toJobMetrics(jobMaster.getCurrJobMetrics());
+        Long finishedJobId = jobMaster.getJobImmutableInformation().getJobId();
+        finishedJobMetricsImap.put(finishedJobId, finalMetrics, 14, TimeUnit.DAYS);
+        // The metrics are stored, so the TaskGroupContext entries can be cleaned now.
+        jobMaster.cleanTaskGroupContext();
+    }
+
     private JobStateData toJobStateMapper(JobMaster jobMaster) {
 
         Long jobId = jobMaster.getJobImmutableInformation().getJobId();
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 3a2a8ca48..701a550d9 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -19,9 +19,11 @@ package org.apache.seatunnel.engine.server.master;
 
 import static com.hazelcast.jet.impl.util.ExceptionUtil.withTryCatch;
 
+import org.apache.seatunnel.api.common.metrics.RawJobMetrics;
 import org.apache.seatunnel.api.env.EnvCommonOptions;
 import org.apache.seatunnel.common.utils.ExceptionUtils;
 import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.config.EngineConfig;
 import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
@@ -32,6 +34,7 @@ import 
org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.checkpoint.CheckpointManager;
 import org.apache.seatunnel.engine.server.checkpoint.CheckpointPlan;
 import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
@@ -40,6 +43,8 @@ import 
org.apache.seatunnel.engine.server.dag.physical.PlanUtils;
 import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
 import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
 import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import 
org.apache.seatunnel.engine.server.operation.CleanTaskGroupContextOperation;
+import 
org.apache.seatunnel.engine.server.operation.GetTaskGroupMetricsOperation;
 import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
 import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
 import org.apache.seatunnel.engine.server.scheduler.JobScheduler;
@@ -55,9 +60,12 @@ import com.hazelcast.logging.ILogger;
 import com.hazelcast.logging.Logger;
 import com.hazelcast.map.IMap;
 import com.hazelcast.spi.impl.NodeEngine;
+import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
 import lombok.NonNull;
 import org.apache.commons.collections4.CollectionUtils;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -275,6 +283,45 @@ public class JobMaster extends Thread {
         return physicalPlan.getJobStatus();
     }
 
+    /**
+     * Collects the raw metrics of this job's task groups from the workers that own their slots.
+     *
+     * <p>For every task group in {@code ownedSlotProfilesIMap} whose job id equals this job's id,
+     * a {@link GetTaskGroupMetricsOperation} is invoked on the slot's worker and the call blocks
+     * until that invocation completes.
+     *
+     * @return the metrics returned by the workers, one entry per queried task group
+     * @throws SeaTunnelException if an invocation fails; the original failure is attached as the
+     *                            cause so its stack trace is not lost
+     */
+    public List<RawJobMetrics> getCurrJobMetrics() {
+        List<RawJobMetrics> metrics = new ArrayList<>();
+        ownedSlotProfilesIMap.forEach((pipelineLocation, taskGroupLocationSlotProfileMap) -> {
+            taskGroupLocationSlotProfileMap.forEach((taskGroupLocation, slotProfile) -> {
+                if (taskGroupLocation.getJobId() == this.getJobImmutableInformation().getJobId()) {
+                    Address worker = slotProfile.getWorker();
+                    InvocationFuture<Object> invoke = nodeEngine.getOperationService().createInvocationBuilder(
+                        SeaTunnelServer.SERVICE_NAME,
+                        new GetTaskGroupMetricsOperation(taskGroupLocation),
+                        worker).invoke();
+                    try {
+                        RawJobMetrics rawJobMetrics = (RawJobMetrics) invoke.get();
+                        metrics.add(rawJobMetrics);
+                    } catch (Exception e) {
+                        if (e instanceof InterruptedException) {
+                            // Keep the interrupt visible to the caller instead of swallowing it.
+                            Thread.currentThread().interrupt();
+                        }
+                        // Same exception type and message as before, but with the cause preserved.
+                        SeaTunnelException failure = new SeaTunnelException(e.getMessage());
+                        failure.initCause(e);
+                        throw failure;
+                    }
+                }
+            });
+        });
+        return metrics;
+    }
+
+    /**
+     * Invokes a {@link CleanTaskGroupContextOperation} on the worker of every task group found in
+     * {@code ownedSlotProfilesIMap}, waiting for each invocation to complete before the next one.
+     *
+     * @throws SeaTunnelException if an invocation fails; the original failure is attached as the
+     *                            cause so its stack trace is not lost
+     */
+    public void cleanTaskGroupContext() {
+        ownedSlotProfilesIMap.forEach((pipelineLocation, taskGroupLocationSlotProfileMap) -> {
+            taskGroupLocationSlotProfileMap.forEach((taskGroupLocation, slotProfile) -> {
+                Address worker = slotProfile.getWorker();
+                InvocationFuture<Object> invoke = nodeEngine.getOperationService().createInvocationBuilder(
+                    SeaTunnelServer.SERVICE_NAME,
+                    new CleanTaskGroupContextOperation(taskGroupLocation),
+                    worker).invoke();
+                try {
+                    invoke.get();
+                } catch (Exception e) {
+                    if (e instanceof InterruptedException) {
+                        // Keep the interrupt visible to the caller instead of swallowing it.
+                        Thread.currentThread().interrupt();
+                    }
+                    // Same exception type and message as before, but with the cause preserved.
+                    SeaTunnelException failure = new SeaTunnelException(e.getMessage());
+                    failure.initCause(e);
+                    throw failure;
+                }
+            });
+        });
+    }
+
     public PhysicalPlan getPhysicalPlan() {
         return physicalPlan;
     }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/JobMetricsCollector.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/JobMetricsCollector.java
new file mode 100644
index 000000000..d718c2e3b
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/JobMetricsCollector.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.metrics;
+
+import org.apache.seatunnel.api.common.metrics.RawJobMetrics;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+
+import com.hazelcast.cluster.Member;
+import com.hazelcast.internal.metrics.MetricDescriptor;
+import com.hazelcast.internal.metrics.collectors.MetricsCollector;
+import com.hazelcast.internal.metrics.impl.MetricsCompressor;
+import com.hazelcast.logging.ILogger;
+
+import java.util.Objects;
+import java.util.function.UnaryOperator;
+
+/**
+ * {@link MetricsCollector} that keeps only the metrics whose task-group-location tag equals the
+ * string form of one {@link TaskGroupLocation} and feeds them into a {@link MetricsCompressor}.
+ */
+public class JobMetricsCollector implements MetricsCollector {
+
+    private final TaskGroupLocation taskGroupLocation;
+    private final MetricsCompressor compressor;
+    private final ILogger logger;
+    private final UnaryOperator<MetricDescriptor> addPrefixFn;
+
+    /**
+     * @param taskGroupLocation task group whose metrics are kept
+     * @param member            member whose identity is added to every kept descriptor; must not be null
+     * @param logger            logger used for reporting collection exceptions; must not be null
+     */
+    public JobMetricsCollector(TaskGroupLocation taskGroupLocation, Member member, ILogger logger) {
+        Objects.requireNonNull(member, "member");
+        this.logger = Objects.requireNonNull(logger, "logger");
+
+        this.taskGroupLocation = taskGroupLocation;
+        this.compressor = new MetricsCompressor();
+        this.addPrefixFn = JobMetricsUtil.addMemberPrefixFn(member);
+    }
+
+    /** Adds the long value to the compressor when the descriptor belongs to this task group. */
+    @Override
+    public void collectLong(MetricDescriptor descriptor, long value) {
+        if (!isForThisTaskGroup(descriptor)) {
+            return;
+        }
+        compressor.addLong(addPrefixFn.apply(descriptor), value);
+    }
+
+    /** Adds the double value to the compressor when the descriptor belongs to this task group. */
+    @Override
+    public void collectDouble(MetricDescriptor descriptor, double value) {
+        if (!isForThisTaskGroup(descriptor)) {
+            return;
+        }
+        compressor.addDouble(addPrefixFn.apply(descriptor), value);
+    }
+
+    /** Logs the exception as a warning when the descriptor belongs to this task group. */
+    @Override
+    public void collectException(MetricDescriptor descriptor, Exception e) {
+        if (!isForThisTaskGroup(descriptor)) {
+            return;
+        }
+        logger.warning("Exception when rendering job metrics: " + e, e);
+    }
+
+    /** Metrics without a value are ignored. */
+    @Override
+    public void collectNoValue(MetricDescriptor descriptor) {
+
+    }
+
+    /** Returns the metrics collected so far as a blob and resets the compressor. */
+    public RawJobMetrics getMetrics() {
+        return RawJobMetrics.of(compressor.getBlobAndReset());
+    }
+
+    /** Tells whether the descriptor's task-group-location tag names this collector's task group. */
+    private boolean isForThisTaskGroup(MetricDescriptor descriptor) {
+        String taggedLocation = JobMetricsUtil.getTaskGroupLocationFromMetricsDescriptor(descriptor);
+        return taskGroupLocation.toString().equals(taggedLocation);
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/JobMetricsUtil.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/JobMetricsUtil.java
new file mode 100644
index 000000000..f53f7fec6
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/JobMetricsUtil.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.metrics;
+
+import static org.apache.seatunnel.api.common.metrics.MetricTags.ADDRESS;
+import static org.apache.seatunnel.api.common.metrics.MetricTags.MEMBER;
+
+import org.apache.seatunnel.api.common.metrics.JobMetrics;
+import org.apache.seatunnel.api.common.metrics.Measurement;
+import org.apache.seatunnel.api.common.metrics.MetricTags;
+import org.apache.seatunnel.api.common.metrics.RawJobMetrics;
+
+import com.hazelcast.cluster.Member;
+import com.hazelcast.internal.metrics.MetricConsumer;
+import com.hazelcast.internal.metrics.MetricDescriptor;
+import com.hazelcast.internal.metrics.impl.MetricsCompressor;
+import com.hazelcast.internal.util.MapUtil;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.UnaryOperator;
+
+/**
+ * Static helpers for tagging metric descriptors and for turning compressed metric blobs into
+ * {@link JobMetrics}.
+ */
+public final class JobMetricsUtil {
+
+    private JobMetricsUtil() {
+    }
+
+    /**
+     * Returns the value of the {@link MetricTags#TASK_GROUP_LOCATION} tag of the descriptor.
+     *
+     * @param descriptor descriptor whose tags are scanned
+     * @return the tag value, or {@code null} if the descriptor carries no such tag
+     */
+    public static String getTaskGroupLocationFromMetricsDescriptor(MetricDescriptor descriptor) {
+        for (int i = 0; i < descriptor.tagCount(); i++) {
+            if (MetricTags.TASK_GROUP_LOCATION.equals(descriptor.tag(i))) {
+                return descriptor.tagValue(i);
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Builds a function that copies a descriptor and tags the copy with the member's UUID and
+     * address.
+     *
+     * @param member member whose UUID and address are used as tag values
+     * @return function producing a tagged copy of the descriptor it is given
+     */
+    public static UnaryOperator<MetricDescriptor> addMemberPrefixFn(Member member) {
+        String uuid = member.getUuid().toString();
+        String addr = member.getAddress().toString();
+        return d -> d.copy().withTag(MEMBER, uuid).withTag(ADDRESS, addr);
+    }
+
+    /**
+     * Extracts the measurements of all given raw metrics and groups them by metric name.
+     *
+     * @param rawJobMetrics raw metrics to decode; entries with a {@code null} blob are skipped
+     * @return the decoded metrics, or {@link JobMetrics#empty()} when no entry carried a blob
+     */
+    public static JobMetrics toJobMetrics(List<RawJobMetrics> rawJobMetrics) {
+        JobMetricsConsumer consumer = null;
+        for (RawJobMetrics metrics : rawJobMetrics) {
+            if (metrics.getBlob() == null) {
+                continue;
+            }
+            if (consumer == null) {
+                consumer = new JobMetricsConsumer();
+            }
+            // Measurements extracted from this blob are stamped with the blob's timestamp.
+            consumer.timestamp = metrics.getTimestamp();
+            MetricsCompressor.extractMetrics(metrics.getBlob(), consumer);
+        }
+        return consumer == null ? JobMetrics.empty() : JobMetrics.of(consumer.metrics);
+    }
+
+    /** Consumer that turns every extracted metric into a {@link Measurement} keyed by metric name. */
+    private static class JobMetricsConsumer implements MetricConsumer {
+
+        final Map<String, List<Measurement>> metrics = new HashMap<>();
+        long timestamp;
+
+        @Override
+        public void consumeLong(MetricDescriptor descriptor, long value) {
+            metrics.computeIfAbsent(descriptor.metric(), k -> new ArrayList<>())
+                .add(measurement(descriptor, value));
+        }
+
+        @Override
+        public void consumeDouble(MetricDescriptor descriptor, double value) {
+            metrics.computeIfAbsent(descriptor.metric(), k -> new ArrayList<>())
+                .add(measurement(descriptor, value));
+        }
+
+        /** Builds a measurement whose tags are the descriptor's tags plus its discriminator. */
+        private Measurement measurement(MetricDescriptor descriptor, Object value) {
+            Map<String, String> tags = MapUtil.createHashMap(descriptor.tagCount());
+            for (int i = 0; i < descriptor.tagCount(); i++) {
+                tags.put(descriptor.tag(i), descriptor.tagValue(i));
+            }
+            // Only a named discriminator becomes a tag; the previous "name or value" check could
+            // insert a null tag key when just the value was present.
+            String discriminator = descriptor.discriminator();
+            if (discriminator != null) {
+                tags.put(discriminator, descriptor.discriminatorValue());
+            }
+            return Measurement.of(descriptor.metric(), value, timestamp, tags);
+        }
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/Metrics.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/Metrics.java
new file mode 100644
index 000000000..72a8b2e67
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/Metrics.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.metrics;
+
+import org.apache.seatunnel.api.common.metrics.Metric;
+import org.apache.seatunnel.api.common.metrics.Unit;
+
+/**
+ * Static facade for obtaining {@link Metric} instances; every call is forwarded to
+ * {@link MetricsImpl}.
+ */
+public final class Metrics {
+
+    private Metrics() {
+    }
+
+    /**
+     * Returns the metric with the given name, measured in {@link Unit#COUNT}.
+     */
+    public static Metric metric(String name) {
+        return metric(name, Unit.COUNT);
+    }
+
+    /**
+     * Variant of {@link #metric(String)} that lets the caller choose the
+     * measurement {@link Unit} of the metric.
+     */
+    public static Metric metric(String name, Unit unit) {
+        return MetricsImpl.metric(name, unit);
+    }
+
+    /**
+     * Returns the QPS metric with the given name and measurement {@link Unit}.
+     */
+    public static Metric qpsMetric(String name, Unit unit) {
+        return MetricsImpl.qpsMetric(name, unit);
+    }
+
+    /**
+     * Returns the thread-safe metric with the given name, measured in {@link Unit#COUNT}.
+     */
+    public static Metric threadSafeMetric(String name) {
+        return threadSafeMetric(name, Unit.COUNT);
+    }
+
+    /**
+     * Variant of {@link #threadSafeMetric(String)} that lets the caller choose
+     * the measurement {@link Unit} of the metric.
+     */
+    public static Metric threadSafeMetric(String name, Unit unit) {
+        return MetricsImpl.threadSafeMetric(name, unit);
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/MetricsContext.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/MetricsContext.java
new file mode 100644
index 000000000..f54e02f79
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/MetricsContext.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.metrics;
+
+import org.apache.seatunnel.api.common.metrics.Metric;
+import org.apache.seatunnel.api.common.metrics.Unit;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
+
+import com.hazelcast.internal.metrics.DynamicMetricsProvider;
+import com.hazelcast.internal.metrics.MetricDescriptor;
+import com.hazelcast.internal.metrics.MetricsCollectionContext;
+import com.hazelcast.internal.metrics.ProbeLevel;
+import com.hazelcast.internal.metrics.ProbeUnit;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.function.BiFunction;
+
+public class MetricsContext implements DynamicMetricsProvider {
+
+    private static final BiFunction<String, Unit, AbstractMetric> 
CREATE_SINGLE_WRITER_METRIC = SingleWriterMetric::new;
+    private static final BiFunction<String, Unit, AbstractMetric> 
CREATE_THREAD_SAFE_METRICS = ThreadSafeMetric::new;
+
+    private static final BiFunction<String, Unit, AbstractMetric> 
CREATE_SINGLE_WRITER_QPS_METRIC = SingleWriterQPSMetric::new;
+
+    private volatile Map<String, AbstractMetric> metrics;
+
+    Metric metric(String name, Unit unit) {
+        return metric(name, unit, CREATE_SINGLE_WRITER_METRIC);
+    }
+
+    Metric qpsMetric(String name, Unit unit) {
+        return metric(name, unit, CREATE_SINGLE_WRITER_QPS_METRIC);
+    }
+
+    Metric threadSafeMetric(String name, Unit unit) {
+        return metric(name, unit, CREATE_THREAD_SAFE_METRICS);
+    }
+
+    private Metric metric(String name, Unit unit, BiFunction<String, Unit, 
AbstractMetric> metricSupplier) {
+        if (metrics == null) { //first metric being stored
+            metrics = new ConcurrentHashMap<>();
+        }
+
+        AbstractMetric metric = metrics.get(name);
+        if (metric != null) {
+            return metric;
+        }
+
+        metric = metricSupplier.apply(name, unit);
+        metrics.put(name, metric);
+
+        return metric;
+    }
+
+    @Override
+    public void provideDynamicMetrics(MetricDescriptor tagger, 
MetricsCollectionContext context) {
+        if (metrics != null) {
+            metrics.forEach((name, metric) -> {
+                if (metric.get() instanceof Long) {
+                    context.collect(tagger.copy(), name, ProbeLevel.INFO, 
toProbeUnit(metric.unit()),
+                        (Long) metric.get());
+                } else if (metric.get() instanceof Double) {
+                    context.collect(tagger.copy(), name, ProbeLevel.INFO, 
toProbeUnit(metric.unit()),
+                        (Double) metric.get());
+                } else {
+                    throw new SeaTunnelException("The value of Metric does not 
support " + metric.get().getClass().getSimpleName() + " data type");
+                }
+            });
+        }
+    }
+
+    private ProbeUnit toProbeUnit(Unit unit) {
+        return ProbeUnit.valueOf(unit.name());
+    }
+
+    private abstract static class AbstractMetric implements Metric {
+
+        private final String name;
+        private final Unit unit;
+
+        AbstractMetric(String name, Unit unit) {
+            this.name = name;
+            this.unit = unit;
+        }
+
+        @Override
+        public String name() {
+            return name;
+        }
+
+        @Override
+        public Unit unit() {
+            return unit;
+        }
+
+        protected abstract Object get();
+
+    }
+
+    private static final class SingleWriterQPSMetric extends AbstractMetric {
+
+        private static final AtomicLongFieldUpdater<SingleWriterQPSMetric> 
VOLATILE_VALUE_UPDATER =
+            AtomicLongFieldUpdater.newUpdater(SingleWriterQPSMetric.class, 
"value");
+
+        private volatile long value;
+        private volatile long timestamp;
+
+        SingleWriterQPSMetric(String name, Unit unit) {
+            super(name, unit);
+        }
+
+        @Override
+        public void set(long newValue) {
+            checkAndSetStartTime();
+            VOLATILE_VALUE_UPDATER.lazySet(this, newValue);
+        }
+
+        @Override
+        public void increment() {
+            checkAndSetStartTime();
+            VOLATILE_VALUE_UPDATER.lazySet(this, value + 1);
+        }
+
+        @Override
+        public void increment(long increment) {
+            checkAndSetStartTime();
+            VOLATILE_VALUE_UPDATER.lazySet(this, value + increment);
+        }
+
+        @Override
+        public void decrement() {
+            checkAndSetStartTime();
+            VOLATILE_VALUE_UPDATER.lazySet(this, value - 1);
+        }
+
+        @Override
+        public void decrement(long decrement) {
+            checkAndSetStartTime();
+            VOLATILE_VALUE_UPDATER.lazySet(this, value - decrement);
+        }
+
+        @SuppressWarnings("checkstyle:MagicNumber")
+        @Override
+        protected Object get() {
+            long cost = System.currentTimeMillis() - timestamp;
+            return (double) value * 1000 / cost;
+        }
+
+        private void checkAndSetStartTime(){
+            if (timestamp == 0){
+                timestamp = System.currentTimeMillis();
+            }
+        }
+    }
+
+    private static final class SingleWriterMetric extends AbstractMetric {
+
+        private static final AtomicLongFieldUpdater<SingleWriterMetric> 
VOLATILE_VALUE_UPDATER =
+                AtomicLongFieldUpdater.newUpdater(SingleWriterMetric.class, 
"value");
+
+        private volatile long value;
+
+        SingleWriterMetric(String name, Unit unit) {
+            super(name, unit);
+        }
+
+        @Override
+        public void set(long newValue) {
+            VOLATILE_VALUE_UPDATER.lazySet(this, newValue);
+        }
+
+        @Override
+        public void increment() {
+            VOLATILE_VALUE_UPDATER.lazySet(this, value + 1);
+        }
+
+        @Override
+        public void increment(long increment) {
+            VOLATILE_VALUE_UPDATER.lazySet(this, value + increment);
+        }
+
+        @Override
+        public void decrement() {
+            VOLATILE_VALUE_UPDATER.lazySet(this, value - 1);
+        }
+
+        @Override
+        public void decrement(long decrement) {
+            VOLATILE_VALUE_UPDATER.lazySet(this, value - decrement);
+        }
+
+        @Override
+        protected Object get() {
+            return value;
+        }
+    }
+
+    private static final class ThreadSafeMetric extends AbstractMetric {
+
+        private static final AtomicLongFieldUpdater<ThreadSafeMetric> 
VOLATILE_VALUE_UPDATER =
+                AtomicLongFieldUpdater.newUpdater(ThreadSafeMetric.class, 
"value");
+
+        private volatile long value;
+
+        ThreadSafeMetric(String name, Unit unit) {
+            super(name, unit);
+        }
+
+        @Override
+        public void increment() {
+            VOLATILE_VALUE_UPDATER.incrementAndGet(this);
+        }
+
+        @Override
+        public void increment(long amount) {
+            VOLATILE_VALUE_UPDATER.addAndGet(this, amount);
+        }
+
+        @Override
+        public void decrement() {
+            VOLATILE_VALUE_UPDATER.decrementAndGet(this);
+        }
+
+        @Override
+        public void decrement(long amount) {
+            VOLATILE_VALUE_UPDATER.addAndGet(this, -amount);
+        }
+
+        @Override
+        public void set(long newValue) {
+            VOLATILE_VALUE_UPDATER.set(this, newValue);
+        }
+
+        @Override
+        protected Object get() {
+            return VOLATILE_VALUE_UPDATER.get(this);
+        }
+    }
+
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/MetricsImpl.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/MetricsImpl.java
new file mode 100644
index 000000000..6eea089a2
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/MetricsImpl.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.metrics;
+
+import org.apache.seatunnel.api.common.metrics.Metric;
+import org.apache.seatunnel.api.common.metrics.Unit;
+
+/**
+ * Gives access to the {@link MetricsContext} bound to the calling thread through a thread-local
+ * {@link Container}.
+ */
+public final class MetricsImpl {
+
+    private static final ThreadLocal<Container> CONTEXT = ThreadLocal.withInitial(Container::new);
+
+    private MetricsImpl() {
+    }
+
+    /** Returns the calling thread's container; the thread-local creates it on first access. */
+    public static Container container() {
+        return CONTEXT.get();
+    }
+
+    /**
+     * Returns the metric with the given name and unit from the calling thread's context.
+     *
+     * @throws RuntimeException if the calling thread has no metrics context set
+     */
+    public static Metric metric(String name, Unit unit) {
+        return getContext().metric(name, unit);
+    }
+
+    /**
+     * Returns the QPS metric with the given name and unit from the calling thread's context.
+     *
+     * @throws RuntimeException if the calling thread has no metrics context set
+     */
+    public static Metric qpsMetric(String name, Unit unit) {
+        return getContext().qpsMetric(name, unit);
+    }
+
+    /**
+     * Returns the thread-safe metric with the given name and unit from the calling thread's
+     * context.
+     *
+     * @throws RuntimeException if the calling thread has no metrics context set
+     */
+    public static Metric threadSafeMetric(String name, Unit unit) {
+        return getContext().threadSafeMetric(name, unit);
+    }
+
+    /** Returns the context of the calling thread, failing when none has been set. */
+    private static MetricsContext getContext() {
+        MetricsContext context = CONTEXT.get().getContext();
+        if (context == null) {
+            // The message used to be thrown with its "%s" placeholder left unformatted.
+            throw new RuntimeException(String.format(
+                "Thread %s has no metrics context set, this method can " +
+                    "be called only on threads executing the job's processors",
+                Thread.currentThread().getName()));
+        }
+        return context;
+    }
+
+    /** Mutable per-thread holder of a {@link MetricsContext}; holds {@code null} until one is set. */
+    public static class Container {
+
+        private MetricsContext context;
+
+        Container() {
+        }
+
+        public MetricsContext getContext() {
+            return context;
+        }
+
+        public void setContext(MetricsContext context) {
+            this.context = context;
+        }
+    }
+
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java
index 56701e88b..cb2443047 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java
@@ -1,11 +1,12 @@
 /*
- * Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CleanTaskGroupContextOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CleanTaskGroupContextOperation.java
new file mode 100644
index 000000000..ea4a7e4d7
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CleanTaskGroupContextOperation.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.operation;
+
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+
+import com.hazelcast.spi.impl.operationservice.Operation;
+
+/**
+ * Operation that tells the receiving member's task execution service to clean the context it
+ * keeps for one task group.
+ *
+ * <p>The task group location is written to and read from the operation's serialized form, so
+ * the operation also works when it is sent to another member.
+ */
+public class CleanTaskGroupContextOperation extends Operation {
+
+    private TaskGroupLocation taskGroupLocation;
+
+    /** No-arg constructor needed to instantiate the operation when it is deserialized. */
+    public CleanTaskGroupContextOperation() {
+    }
+
+    /**
+     * @param taskGroupLocation task group whose context is to be cleaned
+     */
+    public CleanTaskGroupContextOperation(TaskGroupLocation taskGroupLocation) {
+        this.taskGroupLocation = taskGroupLocation;
+    }
+
+    @Override
+    public void run() {
+        // remove TaskGroupContext for TaskExecutionService
+        SeaTunnelServer service = getService();
+        service.getTaskExecutionService().notifyCleanTaskGroupContext(taskGroupLocation);
+    }
+
+    // Types are fully qualified so this class needs no additional import lines.
+    @Override
+    protected void writeInternal(com.hazelcast.nio.ObjectDataOutput out) throws java.io.IOException {
+        super.writeInternal(out);
+        out.writeObject(taskGroupLocation);
+    }
+
+    @Override
+    protected void readInternal(com.hazelcast.nio.ObjectDataInput in) throws java.io.IOException {
+        super.readInternal(in);
+        taskGroupLocation = in.readObject();
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobMetricsOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobMetricsOperation.java
new file mode 100644
index 000000000..b8ce44401
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobMetricsOperation.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.operation;
+
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook;
+
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import com.hazelcast.spi.impl.AllowedDuringPassiveState;
+import com.hazelcast.spi.impl.operationservice.Operation;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Operation that looks up the metrics of a single job through the coordinator service
+ * and returns them to the caller as a JSON string.
+ */
+public class GetJobMetricsOperation extends Operation implements IdentifiedDataSerializable, AllowedDuringPassiveState {
+    /** Id of the job whose metrics are requested; written and read by the *Internal methods. */
+    private long jobId;
+
+    /** JSON produced by {@link #run()}; handed back through {@link #getResponse()}. */
+    private String response;
+
+    /** No-arg constructor; the job id is filled in afterwards by {@code readInternal}. */
+    public GetJobMetricsOperation() {
+    }
+
+    /**
+     * @param jobId id of the job whose metrics should be fetched
+     */
+    public GetJobMetricsOperation(long jobId) {
+        this.jobId = jobId;
+    }
+
+    @Override
+    public final int getFactoryId() {
+        return OperationDataSerializerHook.FACTORY_ID;
+    }
+
+    @Override
+    public int getClassId() {
+        return OperationDataSerializerHook.GET_JOB_METRICS_OPERATOR;
+    }
+
+    @Override
+    protected void writeInternal(ObjectDataOutput out) throws IOException {
+        super.writeInternal(out);
+        out.writeLong(jobId);
+    }
+
+    @Override
+    protected void readInternal(ObjectDataInput in) throws IOException {
+        super.readInternal(in);
+        jobId = in.readLong();
+    }
+
+    /**
+     * Fetches the job metrics and stores their JSON form as the response.
+     *
+     * @throws RuntimeException if the lookup fails or the waiting thread is interrupted
+     */
+    @Override
+    public void run() {
+        SeaTunnelServer service = getService();
+        // The lookup is submitted via supplyAsync and awaited below, so the coordinator
+        // call runs on the async pool rather than on the thread executing this operation.
+        CompletableFuture<String> future = CompletableFuture.supplyAsync(
+            () -> service.getCoordinatorService().getJobMetrics(jobId).toJsonString());
+
+        try {
+            response = future.get();
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag before propagating so it is not silently lost.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        } catch (ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public Object getResponse() {
+        return response;
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetTaskGroupMetricsOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetTaskGroupMetricsOperation.java
new file mode 100644
index 000000000..181cf5048
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetTaskGroupMetricsOperation.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.operation;
+
+import org.apache.seatunnel.api.common.metrics.RawJobMetrics;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.metrics.JobMetricsCollector;
+
+import com.hazelcast.cluster.Address;
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.spi.impl.NodeEngineImpl;
+import com.hazelcast.spi.impl.operationservice.Operation;
+
+/**
+ * Operation that collects the metrics of one task group from the metrics registry of the
+ * node engine it runs on. Only calls whose caller address equals the master address are served.
+ */
+public class GetTaskGroupMetricsOperation extends Operation {
+
+    private TaskGroupLocation taskGroupLocation;
+    private RawJobMetrics response;
+
+    /**
+     * @param taskGroupLocation location of the task group whose metrics are collected
+     */
+    public GetTaskGroupMetricsOperation(TaskGroupLocation taskGroupLocation) {
+        this.taskGroupLocation = taskGroupLocation;
+    }
+
+    /**
+     * Collects the task group's metrics and stores them as the response.
+     *
+     * @throws IllegalStateException if the caller address differs from the master address
+     */
+    @Override
+    public void run() {
+        ILogger logger = getLogger();
+
+        Address callerAddress = getCallerAddress();
+
+        NodeEngineImpl nodeEngine = (NodeEngineImpl) getNodeEngine();
+        Address masterAddress = nodeEngine.getMasterAddress();
+        if (!callerAddress.equals(masterAddress)) {
+            // Each fragment carries its own separating space so the location is not glued to the text.
+            throw new IllegalStateException("Caller " + callerAddress
+                    + " cannot get taskGroupLocation metrics " + taskGroupLocation
+                    + " because it is not master. Master is: " + masterAddress);
+        }
+
+        JobMetricsCollector metricsRenderer =
+            new JobMetricsCollector(taskGroupLocation, nodeEngine.getLocalMember(), logger);
+        nodeEngine.getMetricsRegistry().collect(metricsRenderer);
+        response = metricsRenderer.getMetrics();
+    }
+
+    @Override
+    public Object getResponse() {
+        return response;
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetJobMetricsTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetJobMetricsTask.java
new file mode 100644
index 000000000..923234f5d
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetJobMetricsTask.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.protocol.task;
+
+import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobMetricsCodec;
+import org.apache.seatunnel.engine.server.operation.GetJobMetricsOperation;
+
+import com.hazelcast.client.impl.protocol.ClientMessage;
+import com.hazelcast.instance.impl.Node;
+import com.hazelcast.internal.nio.Connection;
+import com.hazelcast.spi.impl.operationservice.Operation;
+
+/**
+ * Client message task that decodes a job id with {@link SeaTunnelGetJobMetricsCodec} and
+ * answers it by running a {@link GetJobMetricsOperation}.
+ */
+public class GetJobMetricsTask extends AbstractSeaTunnelMessageTask<Long, String> {
+
+    protected GetJobMetricsTask(ClientMessage clientMessage, Node node, Connection connection) {
+        super(clientMessage, node, connection,
+            SeaTunnelGetJobMetricsCodec::decodeRequest,
+            SeaTunnelGetJobMetricsCodec::encodeResponse);
+    }
+
+    /** Builds the operation for the decoded request; {@code parameters} is the job id. */
+    @Override
+    protected Operation prepareOperation() {
+        return new GetJobMetricsOperation(parameters);
+    }
+
+    /** Method name reported for this task; names the metrics request rather than the status one. */
+    @Override
+    public String getMethodName() {
+        return "getJobMetrics";
+    }
+
+    @Override
+    public Object[] getParameters() {
+        return new Object[0];
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
index 7910bcd76..130fe0f36 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.engine.server.protocol.task;
 
 import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelCancelJobCodec;
+import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobMetricsCodec;
 import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobStateCodec;
 import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobStatusCodec;
 import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelListJobStatusCodec;
@@ -61,5 +62,7 @@ public class SeaTunnelMessageTaskFactoryProvider implements 
MessageTaskFactoryPr
             (clientMessage, connection) -> new GetJobStateTask(clientMessage, 
node, connection));
         factories.put(SeaTunnelListJobStatusCodec.REQUEST_MESSAGE_TYPE,
             (clientMessage, connection) -> new 
ListJobStatusTask(clientMessage, node, connection));
+        factories.put(SeaTunnelGetJobMetricsCodec.REQUEST_MESSAGE_TYPE,
+            (clientMessage, connection) -> new 
GetJobMetricsTask(clientMessage, node, connection));
     }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
index 42a240b0e..bfa6fef31 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.server.serializable;
 
 import 
org.apache.seatunnel.engine.common.serializeable.SeaTunnelFactoryIdConstant;
 import org.apache.seatunnel.engine.server.operation.CancelJobOperation;
+import org.apache.seatunnel.engine.server.operation.GetJobMetricsOperation;
 import org.apache.seatunnel.engine.server.operation.GetJobStatusOperation;
 import org.apache.seatunnel.engine.server.operation.PrintMessageOperation;
 import org.apache.seatunnel.engine.server.operation.SubmitJobOperation;
@@ -49,6 +50,8 @@ public final class OperationDataSerializerHook implements 
DataSerializerHook {
 
     public static final int GET_JOB_STATUS_OPERATOR = 5;
 
+    public static final int GET_JOB_METRICS_OPERATOR = 6;
+
     public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
         SeaTunnelFactoryIdConstant.SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY,
         
SeaTunnelFactoryIdConstant.SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY_ID
@@ -81,6 +84,8 @@ public final class OperationDataSerializerHook implements 
DataSerializerHook {
                     return new CancelJobOperation();
                 case GET_JOB_STATUS_OPERATOR:
                     return new GetJobStatusOperation();
+                case GET_JOB_METRICS_OPERATOR:
+                    return new GetJobMetricsOperation();
                 default:
                     throw new IllegalArgumentException("Unknown type id " + 
typeId);
             }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
index 9e1280117..d1269266a 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
@@ -17,8 +17,13 @@
 
 package org.apache.seatunnel.engine.server.task;
 
+import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_COUNT;
+import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_QPS;
+
+import org.apache.seatunnel.api.common.metrics.Unit;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.table.type.Record;
+import org.apache.seatunnel.engine.server.metrics.Metrics;
 import org.apache.seatunnel.engine.server.task.flow.OneInputFlowLifeCycle;
 
 import java.io.IOException;
@@ -39,6 +44,8 @@ public class SeaTunnelSourceCollector<T> implements 
Collector<T> {
     public void collect(T row) {
         try {
             sendRecordToNext(new Record<>(row));
+            Metrics.qpsMetric(SOURCE_RECEIVED_QPS, Unit.COUNT).increment();
+            Metrics.metric(SOURCE_RECEIVED_COUNT, Unit.COUNT).increment();
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index 69123b334..a735307a9 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -27,6 +27,7 @@ import static 
org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTask
 import static 
org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState.STARTING;
 import static 
org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState.WAITING_RESTORE;
 
+import org.apache.seatunnel.api.common.metrics.MetricTags;
 import org.apache.seatunnel.api.table.type.Record;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.utils.function.ConsumerWithException;
@@ -49,6 +50,7 @@ import 
org.apache.seatunnel.engine.server.dag.physical.flow.PhysicalExecutionFlo
 import 
org.apache.seatunnel.engine.server.dag.physical.flow.UnknownFlowException;
 import org.apache.seatunnel.engine.server.execution.TaskGroup;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
+import org.apache.seatunnel.engine.server.metrics.MetricsContext;
 import org.apache.seatunnel.engine.server.task.flow.ActionFlowLifeCycle;
 import org.apache.seatunnel.engine.server.task.flow.FlowLifeCycle;
 import 
org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle;
@@ -62,6 +64,8 @@ import 
org.apache.seatunnel.engine.server.task.group.TaskGroupWithIntermediateQu
 import org.apache.seatunnel.engine.server.task.record.Barrier;
 import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
 
+import com.hazelcast.internal.metrics.MetricDescriptor;
+import com.hazelcast.internal.metrics.MetricsCollectionContext;
 import lombok.NonNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -102,6 +106,8 @@ public abstract class SeaTunnelTask extends AbstractTask {
 
     private TaskGroup taskBelongGroup;
 
+    private MetricsContext metricsContext;
+
     public SeaTunnelTask(long jobID, TaskLocation taskID, int indexID, Flow 
executionFlow) {
         super(jobID, taskID);
         this.indexID = indexID;
@@ -116,6 +122,7 @@ public abstract class SeaTunnelTask extends AbstractTask {
         flowFutures = new ArrayList<>();
         allCycles = new ArrayList<>();
         startFlowLifeCycle = convertFlowToActionLifeCycle(executionFlow);
+        metricsContext = new MetricsContext();
         for (FlowLifeCycle cycle : allCycles) {
             cycle.init();
         }
@@ -319,4 +326,17 @@ public abstract class SeaTunnelTask extends AbstractTask {
             });
         restoreComplete = true;
     }
+    /** Returns the MetricsContext stored in this task's metricsContext field. */
+    @Override
+    public MetricsContext getMetricsContext() {
+        return metricsContext;
+    }
+    /** Delegates to metricsContext when non-null, tagging a descriptor copy with this class's simple name. */
+    @Override
+    public void provideDynamicMetrics(MetricDescriptor descriptor, 
MetricsCollectionContext context) {
+        if (null != metricsContext) {
+            metricsContext.provideDynamicMetrics(
+                descriptor.copy().withTag(MetricTags.TASK_NAME, 
this.getClass().getSimpleName()), context);
+        }
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
index 3a548105d..7458cd641 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
@@ -17,9 +17,12 @@
 
 package org.apache.seatunnel.engine.server.task.flow;
 
+import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT;
+import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_QPS;
 import static org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneaky;
 import static 
org.apache.seatunnel.engine.server.task.AbstractTask.serializeStates;
 
+import org.apache.seatunnel.api.common.metrics.Unit;
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.sink.SinkCommitter;
 import org.apache.seatunnel.api.sink.SinkWriter;
@@ -29,6 +32,7 @@ import 
org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
 import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
 import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
+import org.apache.seatunnel.engine.server.metrics.Metrics;
 import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
 import org.apache.seatunnel.engine.server.task.context.SinkWriterContext;
 import 
org.apache.seatunnel.engine.server.task.operation.GetTaskGroupAddressOperation;
@@ -151,6 +155,8 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends 
Serializable, AggregatedCo
                     return;
                 }
                 writer.write((T) record.getData());
+                Metrics.qpsMetric(SINK_WRITE_QPS, Unit.COUNT).increment();
+                Metrics.metric(SINK_WRITE_COUNT, Unit.COUNT).increment();
             }
         } catch (Exception e) {
             throw new RuntimeException(e);
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
new file mode 100644
index 000000000..f9130e747
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.master;
+
+import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT;
+import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_QPS;
+import static org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_COUNT;
+import static org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_QPS;
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import org.apache.seatunnel.api.common.metrics.JobMetrics;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
+import org.apache.seatunnel.engine.server.TestUtils;
+
+import com.hazelcast.internal.serialization.Data;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Checks that the coordinator service reports source/sink metrics for a job, both while it
+ * is producing records and after it has finished.
+ */
+@DisabledOnOs(OS.WINDOWS)
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+class JobMetricsTest extends AbstractSeaTunnelServerTest {
+
+    private static final Long JOB_1 = 1L;
+    private static final Long JOB_2 = 2L;
+    private static final Long JOB_3 = 3L;
+
+    @Test
+    public void testGetJobMetrics() throws Exception {
+        startJob(JOB_1, "fake_to_console_job_metrics.conf");
+        startJob(JOB_2, "fake_to_console_job_metrics.conf");
+
+        // Poll until JOB_1 exposes a sink-write measurement, then require positive counters.
+        await().atMost(60000, TimeUnit.MILLISECONDS)
+            .untilAsserted(() -> {
+                JobMetrics runningMetrics = server.getCoordinatorService().getJobMetrics(JOB_1);
+                if (runningMetrics.get(SINK_WRITE_COUNT).size() <= 0) {
+                    fail();
+                }
+                assertTrue((Long) runningMetrics.get(SINK_WRITE_COUNT).get(0).value() > 0);
+                assertTrue((Long) runningMetrics.get(SOURCE_RECEIVED_COUNT).get(0).value() > 0);
+            });
+
+        // Wait for JOB_1 to show up as FINISHED in the job history listing.
+        String finishedEntry = String.format("{\"jobId\":%s,\"jobStatus\":\"FINISHED\"}", JOB_1);
+        await().atMost(60000, TimeUnit.MILLISECONDS)
+            .untilAsserted(() -> Assertions.assertTrue(
+                server.getCoordinatorService().getJobHistoryService().listAllJob().contains(finishedEntry)));
+
+        JobMetrics finalMetrics = server.getCoordinatorService().getJobMetrics(JOB_1);
+        assertEquals(30, (Long) finalMetrics.get(SINK_WRITE_COUNT).get(0).value());
+        assertEquals(30, (Long) finalMetrics.get(SOURCE_RECEIVED_COUNT).get(0).value());
+        assertTrue((Double) finalMetrics.get(SOURCE_RECEIVED_QPS).get(0).value() > 0);
+        assertTrue((Double) finalMetrics.get(SINK_WRITE_QPS).get(0).value() > 0);
+    }
+
+    /** Builds the logical plan from the given config, wraps it as job information and submits it. */
+    private void startJob(Long jobid, String path) {
+        LogicalDag logicalDag = TestUtils.createTestLogicalPlan(path, jobid.toString(), jobid);
+
+        JobImmutableInformation jobInfo = new JobImmutableInformation(
+            jobid,
+            nodeEngine.getSerializationService().toData(logicalDag),
+            logicalDag.getJobConfig(),
+            Collections.emptyList());
+        Data serializedJobInfo = nodeEngine.getSerializationService().toData(jobInfo);
+
+        PassiveCompletableFuture<Void> submitted =
+            server.getCoordinatorService().submitJob(jobid, serializedJobInfo);
+        submitted.join();
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/resources/fake_to_console_job_metrics.conf
 
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/fake_to_console_job_metrics.conf
new file mode 100644
index 000000000..7edeb4ef7
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/fake_to_console_job_metrics.conf
@@ -0,0 +1,54 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+  # You can set engine configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+  execution.checkpoint.interval = 5000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  # This is an example source plugin **only for testing and demonstrating the source plugin feature**
+  FakeSource {
+    result_table_name = "fake"
+    parallelism = 1
+    split.num = 3
+    row.num = 30
+    split.read-interval=120
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
+  }
+
+}
+
+transform {
+}
+
+sink {
+  console {
+    source_table_name="fake"
+  }
+}
\ No newline at end of file

Reply via email to