This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c57c22095cc [FLINK-37596][metrics] Close metric group of a finished split
c57c22095cc is described below
commit c57c22095ccb91a10f1f089d888421adcced76d1
Author: Efrat Levitan <[email protected]>
AuthorDate: Wed Apr 2 09:32:44 2025 +0300
[FLINK-37596][metrics] Close metric group of a finished split
[FLINK-37596] Remove redundant metricGroup initialization
---
.../groups/InternalSourceSplitMetricGroup.java | 21 +++-
.../streaming/api/operators/SourceOperator.java | 1 +
.../SourceOperatorSplitWatermarkAlignmentTest.java | 106 ++++++++++++++++++++-
.../operators/source/TestingSourceOperator.java | 2 -
4 files changed, 124 insertions(+), 6 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceSplitMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceSplitMetricGroup.java
index c6007b9db6c..706606a1598 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceSplitMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceSplitMetricGroup.java
@@ -47,6 +47,7 @@ public class InternalSourceSplitMetricGroup extends
ProxyMetricGroup<MetricGroup
private static final String WATERMARK = "watermark";
private static final long SPLIT_NOT_STARTED = -1L;
private long splitStartTime = SPLIT_NOT_STARTED;
+ private final MetricGroup splitWatermarkMetricGroup;
private InternalSourceSplitMetricGroup(
MetricGroup parentMetricGroup,
@@ -55,8 +56,7 @@ public class InternalSourceSplitMetricGroup extends
ProxyMetricGroup<MetricGroup
Gauge<Long> currentWatermark) {
super(parentMetricGroup);
this.clock = clock;
- MetricGroup splitWatermarkMetricGroup =
- parentMetricGroup.addGroup(SPLIT, splitId).addGroup(WATERMARK);
+ splitWatermarkMetricGroup = parentMetricGroup.addGroup(SPLIT,
splitId).addGroup(WATERMARK);
pausedTimePerSecond =
splitWatermarkMetricGroup.gauge(
MetricNames.SPLIT_PAUSED_TIME, new TimerGauge(clock));
@@ -191,4 +191,21 @@ public class InternalSourceSplitMetricGroup extends
ProxyMetricGroup<MetricGroup
public Boolean isActive() {
return !isPaused() && !isIdle();
}
+
+    /** Closes this split's watermark metric group; call once the split has finished. */
+    public void onSplitFinished() {
+        if (splitWatermarkMetricGroup instanceof AbstractMetricGroup) {
+            ((AbstractMetricGroup<?>) splitWatermarkMetricGroup).close();
+        } else if (splitWatermarkMetricGroup != null) {
+            // Only AbstractMetricGroup can be closed here; report the actual type that was found.
+            LOG.warn(
+                    "Split watermark metric group can not be closed, expecting an instance of AbstractMetricGroup but got: {}",
+                    splitWatermarkMetricGroup.getClass().getName());
+        }
+    }
+
+ @VisibleForTesting
+ public MetricGroup getSplitWatermarkMetricGroup() {
+ return splitWatermarkMetricGroup;
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index efef41ff46d..b5fff8915da 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -729,6 +729,7 @@ public class SourceOperator<OUT, SplitT extends
SourceSplit> extends AbstractStr
@Override
public void splitFinished(String splitId) {
splitCurrentWatermarks.remove(splitId);
+ getOrCreateSplitMetricGroup(splitId).onSplitFinished();
this.splitMetricGroups.remove(splitId);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
index d3226c651c2..060689f0a55 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
@@ -19,6 +19,7 @@ limitations under the License.
package org.apache.flink.streaming.api.operators;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
@@ -28,8 +29,15 @@ import
org.apache.flink.api.connector.source.mocks.MockSourceReader.WaitingForSp
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import
org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.source.event.AddSplitEvent;
@@ -55,10 +63,14 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
+import static
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
import static org.assertj.core.api.Assertions.assertThat;
/** Unit test for split alignment in {@link SourceOperator}. */
@@ -306,6 +318,44 @@ class SourceOperatorSplitWatermarkAlignmentTest {
assertThat(sourceReader.getPausedSplits()).isEmpty();
}
+ @Test
+ void testMetricGroupIsClosedForFinishedSplitAndMetricsAreUnregistered()
throws Exception {
+ long idleTimeout = 100;
+ Collection<String> expectedMetricNames =
+ Arrays.asList(
+ MetricNames.SPLIT_IDLE_TIME,
+ MetricNames.ACC_SPLIT_IDLE_TIME,
+ MetricNames.SPLIT_ACTIVE_TIME,
+ MetricNames.ACC_SPLIT_ACTIVE_TIME,
+ MetricNames.SPLIT_PAUSED_TIME,
+ MetricNames.ACC_SPLIT_PAUSED_TIME,
+ MetricNames.SPLIT_CURRENT_WATERMARK);
+ final Map<String, Metric> registry = new ConcurrentHashMap<>();
+ MockSourceReader sourceReader =
+ new MockSourceReader(WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS,
false, true);
+ TestProcessingTimeService processingTimeService = new
TestProcessingTimeService();
+ SourceOperator<Integer, MockSourceSplit> operator =
+ createAndOpenSourceOperatorWithIdlenessAndRegistry(
+ sourceReader, processingTimeService, idleTimeout,
registry);
+
+ MockSourceSplit split0 = new MockSourceSplit(0, 0, 1);
+ split0.addRecord(5);
+
+ operator.handleOperatorEvent(
+ new AddSplitEvent<>(Arrays.asList(split0), new
MockSourceSplitSerializer()));
+ CollectingDataOutput<Integer> dataOutput = new
CollectingDataOutput<>();
+ AbstractMetricGroup metricGroup =
+ (AbstractMetricGroup)
+ operator.getSplitMetricGroup(split0.splitId())
+ .getSplitWatermarkMetricGroup();
+ expectedMetricNames.forEach(metric ->
assertThat(registry.containsKey(metric)).isTrue());
+ while (operator.emitNext(dataOutput) ==
DataInputStatus.MORE_AVAILABLE) {
+ // split0 emits records until finished/released
+ }
+ assertThat(metricGroup.isClosed()).isTrue();
+ expectedMetricNames.forEach(metric ->
assertThat(registry.containsKey(metric)).isFalse());
+ }
+
@Test
void testStateReportingForMultiSplitWatermarkAlignmentAndIdleness() throws
Exception {
long idleTimeout = 100;
@@ -448,7 +498,37 @@ class SourceOperatorSplitWatermarkAlignmentTest {
long idleTimeout)
throws Exception {
- Environment env = getTestingEnvironment();
+ return createAndOpenSourceOperatorWithIdlenessAndEnv(
+ sourceReader, processingTimeService, idleTimeout,
getTestingEnvironment());
+ }
+
+ private SourceOperator<Integer, MockSourceSplit>
+ createAndOpenSourceOperatorWithIdlenessAndRegistry(
+ MockSourceReader sourceReader,
+ TestProcessingTimeService processingTimeService,
+ long idleTimeout,
+ Map<String, Metric> registry)
+ throws Exception {
+
+ StreamMockEnvironment env = getTestingEnvironment();
+ TaskMetricGroup metricGroup =
+ TaskManagerMetricGroup.createTaskManagerMetricGroup(
+ new TestMetricRegistry(registry),
+ "localhost",
+ ResourceID.generate())
+ .addJob(new JobID(), "jobName")
+ .addTask(createExecutionAttemptId(), "test");
+ env.setTaskMetricGroup(metricGroup);
+ return createAndOpenSourceOperatorWithIdlenessAndEnv(
+ sourceReader, processingTimeService, idleTimeout, env);
+ }
+
+ private SourceOperator<Integer, MockSourceSplit>
createAndOpenSourceOperatorWithIdlenessAndEnv(
+ MockSourceReader sourceReader,
+ TestProcessingTimeService processingTimeService,
+ long idleTimeout,
+ Environment env)
+ throws Exception {
SourceOperator<Integer, MockSourceSplit> operator =
new TestingSourceOperator<>(
new StreamOperatorParameters<>(
@@ -474,7 +554,7 @@ class SourceOperatorSplitWatermarkAlignmentTest {
return operator;
}
- private Environment getTestingEnvironment() {
+ private StreamMockEnvironment getTestingEnvironment() {
return new StreamMockEnvironment(
new Configuration(),
new Configuration(),
@@ -529,4 +609,26 @@ class SourceOperatorSplitWatermarkAlignmentTest {
"any watermark");
}
}
+
+    /** The metric registry for storing the registered metrics to verify in tests. */
+    static class TestMetricRegistry extends NoOpMetricRegistry {
+        private final Map<String, Metric> metrics;
+
+        TestMetricRegistry(Map<String, Metric> metrics) {
+            super();
+            this.metrics = metrics;
+        }
+
+        @Override
+        public void register(Metric metric, String metricName, AbstractMetricGroup<?> group) {
+            metrics.put(metricName, metric);
+        }
+
+        /** Drops the entry stored under {@code metricName}; unknown names are ignored. */
+        @Override
+        public void unregister(Metric metric, String metricName, AbstractMetricGroup<?> group) {
+            // Map#remove is a no-op for absent keys, so no prior lookup is needed.
+            metrics.remove(metricName);
+        }
+    }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
index 3cd58e4c8b9..491106d2f42 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
@@ -27,7 +27,6 @@ import
org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import
org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
@@ -103,7 +102,6 @@ public class TestingSourceOperator<T> extends
SourceOperator<T, MockSourceSplit>
this.subtaskIndex = subtaskIndex;
this.parallelism = parallelism;
- this.metrics =
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
initSourceMetricGroup();
// unchecked wrapping is okay to keep tests simpler