This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 3aa267b [FLINK-26076] Fix ArchUnit violations in Source(Sink)MetricsITCase 3aa267b is described below commit 3aa267bf511287e39b0a9d781b9aaf38843e1e91 Author: Dawid Wysakowicz <dwysakow...@apache.org> AuthorDate: Thu Feb 10 16:15:04 2022 +0100 [FLINK-26076] Fix ArchUnit violations in Source(Sink)MetricsITCase This closes #18709 --- .../8bad5118-af5d-4976-ac57-382ed16f7f7e | 1 - .../base/source/reader/SourceMetricsITCase.java | 42 ++++++++------------ .../flink/runtime/testutils/InMemoryReporter.java | 41 ++++++++++++-------- .../test/streaming/runtime/SinkMetricsITCase.java | 45 ++++++++-------------- 4 files changed, 60 insertions(+), 69 deletions(-) diff --git a/flink-connectors/flink-connector-base/archunit-violations/8bad5118-af5d-4976-ac57-382ed16f7f7e b/flink-connectors/flink-connector-base/archunit-violations/8bad5118-af5d-4976-ac57-382ed16f7f7e index e82c8cd..1f0ea28 100644 --- a/flink-connectors/flink-connector-base/archunit-violations/8bad5118-af5d-4976-ac57-382ed16f7f7e +++ b/flink-connectors/flink-connector-base/archunit-violations/8bad5118-af5d-4976-ac57-382ed16f7f7e @@ -1,4 +1,3 @@ org.apache.flink.connector.base.sink.AsyncSinkBaseITCase does not satisfy: contain any fields that are public, static, and of type MiniClusterExtension and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and pu [...] org.apache.flink.connector.base.source.reader.CoordinatedSourceITCase does not satisfy: contain any fields that are public, static, and of type MiniClusterExtension and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientRe [...] org.apache.flink.connector.base.source.reader.CoordinatedSourceRescaleITCase does not satisfy: contain any fields that are public, static, and of type MiniClusterExtension and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithC [...] -org.apache.flink.connector.base.source.reader.SourceMetricsITCase does not satisfy: contain any fields that are public, static, and of type MiniClusterExtension and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResour [...] \ No newline at end of file diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceMetricsITCase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceMetricsITCase.java index 508e1ef..d298a26 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceMetricsITCase.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceMetricsITCase.java @@ -18,6 +18,7 @@ package org.apache.flink.connector.base.source.reader; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.eventtime.BoundedOutOfOrdernessWatermarks; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkOutput; @@ -44,8 +45,7 @@ import org.apache.flink.testutils.junit.SharedReference; import org.apache.flink.util.TestLogger; import org.hamcrest.Matcher; -import org.junit.After; -import org.junit.Before; +import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; @@ -74,29 +74,16 @@ public class SourceMetricsITCase extends TestLogger { // this basically is the time a build is allowed to be frozen before the test fails private static final long WATERMARK_EPSILON = Duration.ofHours(6).toMillis(); @Rule public final SharedObjects sharedObjects = SharedObjects.create(); - private InMemoryReporter reporter; + private static final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics(); - private MiniClusterWithClientResource miniClusterResource; - - @Before - public void setup() throws Exception { - reporter = InMemoryReporter.createWithRetainedMetrics(); - Configuration configuration = new Configuration(); - reporter.addToConfiguration(configuration); - miniClusterResource = - new MiniClusterWithClientResource( - new MiniClusterResourceConfiguration.Builder() - .setNumberTaskManagers(1) - .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM) - .setConfiguration(configuration) - .build()); - miniClusterResource.before(); - } - - @After - public void teardown() { - miniClusterResource.after(); - } + @ClassRule + public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM) + .setConfiguration(reporter.addToConfiguration(new Configuration())) + .build()); @Test public void testMetricsWithTimestamp() throws Exception { @@ -145,9 +132,11 @@ public class SourceMetricsITCase extends TestLogger { }); stream.addSink(new DiscardingSink<>()); JobClient jobClient = env.executeAsync(); + final JobID jobId = jobClient.getJobID(); beforeBarrier.get().await(); assertSourceMetrics( + jobId, reporter, stopAtRecord1 + 1, numRecordsPerSplit, @@ -158,6 +147,7 @@ public class SourceMetricsITCase extends TestLogger { beforeBarrier.get().await(); assertSourceMetrics( + jobId, reporter, stopAtRecord2 + 1, numRecordsPerSplit, @@ -170,13 +160,15 @@ public class SourceMetricsITCase extends TestLogger { } private void assertSourceMetrics( + JobID jobId, InMemoryReporter reporter, long processedRecordsPerSubtask, long numTotalPerSubtask, int parallelism, int numSplits, boolean hasTimestamps) { - List<OperatorMetricGroup> groups = reporter.findOperatorMetricGroups("MetricTestingSource"); + List<OperatorMetricGroup> groups = + reporter.findOperatorMetricGroups(jobId, "MetricTestingSource"); assertThat(groups, hasSize(parallelism)); int subtaskWithMetrics = 0; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemoryReporter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemoryReporter.java index 2254841..1a992e8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemoryReporter.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemoryReporter.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.testutils; import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.LogicalScopeProvider; @@ -27,6 +28,7 @@ import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.groups.OperatorMetricGroup; import org.apache.flink.metrics.reporter.MetricReporter; import org.apache.flink.metrics.reporter.MetricReporterFactory; +import org.apache.flink.runtime.metrics.scope.ScopeFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,6 +43,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; import java.util.Optional; import java.util.Properties; import java.util.Set; @@ -102,9 +105,9 @@ public class InMemoryReporter implements MetricReporter { } } - public Map<String, Metric> getMetricsByIdentifiers() { + public Map<String, Metric> getMetricsByIdentifiers(JobID jobId) { synchronized (this) { - return getMetricStream().collect(Collectors.toMap(Entry::getKey, Entry::getValue)); + return getMetricStream(jobId).collect(Collectors.toMap(Entry::getKey, Entry::getValue)); } } @@ -123,16 +126,16 @@ public class InMemoryReporter implements MetricReporter { } } - public Map<String, Metric> findMetrics(String identifierPattern) { + public Map<String, Metric> findMetrics(JobID jobId, String identifierPattern) { synchronized (this) { - return getMetricStream(identifierPattern) + return getMetricStream(jobId, identifierPattern) .collect(Collectors.toMap(Entry::getKey, Entry::getValue)); } } - public Optional<Metric> findMetric(String patternString) { + public Optional<Metric> findMetric(JobID jobId, String patternString) { synchronized (this) { - return getMetricStream(patternString).map(Entry::getValue).findFirst(); + return getMetricStream(jobId, patternString).map(Entry::getValue).findFirst(); } } @@ -148,14 +151,15 @@ public class InMemoryReporter implements MetricReporter { } } - public List<OperatorMetricGroup> findOperatorMetricGroups(String operatorPattern) { + public List<OperatorMetricGroup> findOperatorMetricGroups(JobID jobId, String operatorPattern) { Pattern pattern = Pattern.compile(operatorPattern); synchronized (this) { return metrics.keySet().stream() .filter( g -> g instanceof OperatorMetricGroup - && pattern.matcher(getOperatorName(g)).find()) + && pattern.matcher(getOperatorName(g)).find() + && getJobId(g).equals(jobId.toString())) .map(OperatorMetricGroup.class::cast) .sorted(Comparator.comparing(this::getSubtaskId)) .collect(Collectors.toList()); @@ -163,11 +167,15 @@ public class InMemoryReporter implements MetricReporter { } private String getSubtaskId(OperatorMetricGroup g) { - return g.getScopeComponents()[g.getScopeComponents().length - 1]; + return g.getAllVariables().get(ScopeFormat.SCOPE_TASK_SUBTASK_INDEX); } private String getOperatorName(MetricGroup g) { - return g.getScopeComponents()[g.getScopeComponents().length - 2]; + return g.getAllVariables().get(ScopeFormat.SCOPE_OPERATOR_NAME); + } + + private String getJobId(MetricGroup g) { + return g.getAllVariables().get(ScopeFormat.SCOPE_JOB_ID); } @Override @@ -195,13 +203,15 @@ public class InMemoryReporter implements MetricReporter { } } - private Stream<Entry<String, Metric>> getMetricStream(String identifierPattern) { + private Stream<Entry<String, Metric>> getMetricStream(JobID jobID, String identifierPattern) { Pattern pattern = Pattern.compile(identifierPattern); - return getMetricStream().filter(m -> pattern.matcher(m.getKey()).find()); + return getMetricStream(jobID).filter(m -> pattern.matcher(m.getKey()).find()); } - private Stream<Entry<String, Metric>> getMetricStream() { - return metrics.entrySet().stream().flatMap(this::getGroupMetricStream); + private Stream<Entry<String, Metric>> getMetricStream(JobID jobId) { + return metrics.entrySet().stream() + .filter(gr -> Objects.equals(getJobId(gr.getKey()), jobId.toString())) + .flatMap(this::getGroupMetricStream); } private Stream<MetricGroup> getGroupStream(String groupPattern) { @@ -231,7 +241,7 @@ public class InMemoryReporter implements MetricReporter { : group; } - public void addToConfiguration(Configuration configuration) { + public Configuration addToConfiguration(Configuration configuration) { configuration.setString( ConfigConstants.METRICS_REPORTER_PREFIX + "mini_cluster_resource_reporter." @@ -240,6 +250,7 @@ public class InMemoryReporter implements MetricReporter { configuration.setString( ConfigConstants.METRICS_REPORTER_PREFIX + "mini_cluster_resource_reporter." + ID, id.toString()); + return configuration; } /** The factory for the {@link InMemoryReporter}. */ diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java index f91a013..e2e4203 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java @@ -17,6 +17,7 @@ package org.apache.flink.test.streaming.runtime; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.connector.sink.Sink; import org.apache.flink.configuration.Configuration; @@ -35,8 +36,7 @@ import org.apache.flink.testutils.junit.SharedObjects; import org.apache.flink.testutils.junit.SharedReference; import org.apache.flink.util.TestLogger; -import org.junit.After; -import org.junit.Before; +import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; @@ -55,29 +55,16 @@ import static org.hamcrest.Matchers.hasSize; public class SinkMetricsITCase extends TestLogger { private static final int DEFAULT_PARALLELISM = 4; @Rule public final SharedObjects sharedObjects = SharedObjects.create(); - private InMemoryReporter reporter; - - private MiniClusterWithClientResource miniClusterResource; - - @Before - public void setup() throws Exception { - reporter = InMemoryReporter.createWithRetainedMetrics(); - Configuration configuration = new Configuration(); - reporter.addToConfiguration(configuration); - miniClusterResource = - new MiniClusterWithClientResource( - new MiniClusterResourceConfiguration.Builder() - .setNumberTaskManagers(1) - .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM) - .setConfiguration(configuration) - .build()); - miniClusterResource.before(); - } + private static final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics(); - @After - public void teardown() { - miniClusterResource.after(); - } + @ClassRule + public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM) + .setConfiguration(reporter.addToConfiguration(new Configuration())) + .build()); @Test public void testMetrics() throws Exception { @@ -112,21 +99,23 @@ public class SinkMetricsITCase extends TestLogger { .sinkTo(TestSink.newBuilder().setWriter(new MetricWriter()).build()) .name("MetricTestSink"); JobClient jobClient = env.executeAsync(); + final JobID jobId = jobClient.getJobID(); beforeBarrier.get().await(); - assertSinkMetrics(stopAtRecord1, env.getParallelism(), numSplits); + assertSinkMetrics(jobId, stopAtRecord1, env.getParallelism(), numSplits); afterBarrier.get().await(); beforeBarrier.get().await(); - assertSinkMetrics(stopAtRecord2, env.getParallelism(), numSplits); + assertSinkMetrics(jobId, stopAtRecord2, env.getParallelism(), numSplits); afterBarrier.get().await(); jobClient.getJobExecutionResult().get(); } private void assertSinkMetrics( - long processedRecordsPerSubtask, int parallelism, int numSplits) { - List<OperatorMetricGroup> groups = reporter.findOperatorMetricGroups("MetricTestSink"); + JobID jobId, long processedRecordsPerSubtask, int parallelism, int numSplits) { + List<OperatorMetricGroup> groups = + reporter.findOperatorMetricGroups(jobId, "MetricTestSink"); assertThat(groups, hasSize(parallelism)); int subtaskWithMetrics = 0;