This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 3aa267b  [FLINK-26076] Fix ArchUnit violations in Source(Sink)MetricsITCase
3aa267b is described below

commit 3aa267bf511287e39b0a9d781b9aaf38843e1e91
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Thu Feb 10 16:15:04 2022 +0100

    [FLINK-26076] Fix ArchUnit violations in Source(Sink)MetricsITCase
    
    This closes #18709
---
 .../8bad5118-af5d-4976-ac57-382ed16f7f7e           |  1 -
 .../base/source/reader/SourceMetricsITCase.java    | 42 ++++++++------------
 .../flink/runtime/testutils/InMemoryReporter.java  | 41 ++++++++++++--------
 .../test/streaming/runtime/SinkMetricsITCase.java  | 45 ++++++++--------------
 4 files changed, 60 insertions(+), 69 deletions(-)
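
The ArchUnit rule quoted in the violations file below requires these integration tests to expose the MiniCluster through a public, static, final field annotated with @ClassRule (or the JUnit 5 MiniClusterExtension/AllCallbackWrapper equivalents with @RegisterExtension) instead of managing it by hand in @Before/@After methods. The sketch below is not part of the commit; it only illustrates, under that assumption, the pattern the two test classes switch to (the class name ExampleMetricsITCase is a placeholder):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.testutils.InMemoryReporter;
    import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
    import org.apache.flink.test.util.MiniClusterWithClientResource;
    import org.apache.flink.util.TestLogger;

    import org.junit.ClassRule;

    public class ExampleMetricsITCase extends TestLogger {

        private static final int DEFAULT_PARALLELISM = 4;

        // Retained metrics outlive the job, so assertions can still run after it finishes.
        private static final InMemoryReporter reporter =
                InMemoryReporter.createWithRetainedMetrics();

        // Public, static, final and annotated with @ClassRule, as the ArchUnit rule demands;
        // addToConfiguration(...) now returns the Configuration, so it can be passed inline.
        @ClassRule
        public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
                new MiniClusterWithClientResource(
                        new MiniClusterResourceConfiguration.Builder()
                                .setNumberTaskManagers(1)
                                .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
                                .setConfiguration(reporter.addToConfiguration(new Configuration()))
                                .build());
    }

Because the cluster is now shared by all test methods of a class, metric lookups have to be scoped to a single job; that is what the JobID parameters added to InMemoryReporter below provide.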

diff --git a/flink-connectors/flink-connector-base/archunit-violations/8bad5118-af5d-4976-ac57-382ed16f7f7e b/flink-connectors/flink-connector-base/archunit-violations/8bad5118-af5d-4976-ac57-382ed16f7f7e
index e82c8cd..1f0ea28 100644
--- a/flink-connectors/flink-connector-base/archunit-violations/8bad5118-af5d-4976-ac57-382ed16f7f7e
+++ b/flink-connectors/flink-connector-base/archunit-violations/8bad5118-af5d-4976-ac57-382ed16f7f7e
@@ -1,4 +1,3 @@
 org.apache.flink.connector.base.sink.AsyncSinkBaseITCase does not satisfy: contain any fields that are public, static, and of type MiniClusterExtension and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and pu [...]
 org.apache.flink.connector.base.source.reader.CoordinatedSourceITCase does not satisfy: contain any fields that are public, static, and of type MiniClusterExtension and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientRe [...]
 org.apache.flink.connector.base.source.reader.CoordinatedSourceRescaleITCase does not satisfy: contain any fields that are public, static, and of type MiniClusterExtension and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithC [...]
-org.apache.flink.connector.base.source.reader.SourceMetricsITCase does not satisfy: contain any fields that are public, static, and of type MiniClusterExtension and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResour [...]
\ No newline at end of file
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceMetricsITCase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceMetricsITCase.java
index 508e1ef..d298a26 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceMetricsITCase.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceMetricsITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.connector.base.source.reader;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.eventtime.BoundedOutOfOrdernessWatermarks;
 import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
 import org.apache.flink.api.common.eventtime.WatermarkOutput;
@@ -44,8 +45,7 @@ import org.apache.flink.testutils.junit.SharedReference;
 import org.apache.flink.util.TestLogger;
 
 import org.hamcrest.Matcher;
-import org.junit.After;
-import org.junit.Before;
+import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 
@@ -74,29 +74,16 @@ public class SourceMetricsITCase extends TestLogger {
     // this basically is the time a build is allowed to be frozen before the test fails
     private static final long WATERMARK_EPSILON = Duration.ofHours(6).toMillis();
     @Rule public final SharedObjects sharedObjects = SharedObjects.create();
-    private InMemoryReporter reporter;
+    private static final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics();
 
-    private MiniClusterWithClientResource miniClusterResource;
-
-    @Before
-    public void setup() throws Exception {
-        reporter = InMemoryReporter.createWithRetainedMetrics();
-        Configuration configuration = new Configuration();
-        reporter.addToConfiguration(configuration);
-        miniClusterResource =
-                new MiniClusterWithClientResource(
-                        new MiniClusterResourceConfiguration.Builder()
-                                .setNumberTaskManagers(1)
-                                .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
-                                .setConfiguration(configuration)
-                                .build());
-        miniClusterResource.before();
-    }
-
-    @After
-    public void teardown() {
-        miniClusterResource.after();
-    }
+    @ClassRule
+    public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
+                            .setConfiguration(reporter.addToConfiguration(new Configuration()))
+                            .build());
 
     @Test
     public void testMetricsWithTimestamp() throws Exception {
@@ -145,9 +132,11 @@ public class SourceMetricsITCase extends TestLogger {
                                 });
         stream.addSink(new DiscardingSink<>());
         JobClient jobClient = env.executeAsync();
+        final JobID jobId = jobClient.getJobID();
 
         beforeBarrier.get().await();
         assertSourceMetrics(
+                jobId,
                 reporter,
                 stopAtRecord1 + 1,
                 numRecordsPerSplit,
@@ -158,6 +147,7 @@ public class SourceMetricsITCase extends TestLogger {
 
         beforeBarrier.get().await();
         assertSourceMetrics(
+                jobId,
                 reporter,
                 stopAtRecord2 + 1,
                 numRecordsPerSplit,
@@ -170,13 +160,15 @@ public class SourceMetricsITCase extends TestLogger {
     }
 
     private void assertSourceMetrics(
+            JobID jobId,
             InMemoryReporter reporter,
             long processedRecordsPerSubtask,
             long numTotalPerSubtask,
             int parallelism,
             int numSplits,
             boolean hasTimestamps) {
-        List<OperatorMetricGroup> groups = reporter.findOperatorMetricGroups("MetricTestingSource");
+        List<OperatorMetricGroup> groups =
+                reporter.findOperatorMetricGroups(jobId, "MetricTestingSource");
         assertThat(groups, hasSize(parallelism));
 
         int subtaskWithMetrics = 0;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemoryReporter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemoryReporter.java
index 2254841..1a992e8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemoryReporter.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemoryReporter.java
@@ -18,6 +18,7 @@
 package org.apache.flink.runtime.testutils;
 
 import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.LogicalScopeProvider;
@@ -27,6 +28,7 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.metrics.reporter.MetricReporterFactory;
+import org.apache.flink.runtime.metrics.scope.ScopeFormat;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,6 +43,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
@@ -102,9 +105,9 @@ public class InMemoryReporter implements MetricReporter {
         }
     }
 
-    public Map<String, Metric> getMetricsByIdentifiers() {
+    public Map<String, Metric> getMetricsByIdentifiers(JobID jobId) {
         synchronized (this) {
-            return getMetricStream().collect(Collectors.toMap(Entry::getKey, Entry::getValue));
+            return getMetricStream(jobId).collect(Collectors.toMap(Entry::getKey, Entry::getValue));
         }
     }
 
@@ -123,16 +126,16 @@ public class InMemoryReporter implements MetricReporter {
         }
     }
 
-    public Map<String, Metric> findMetrics(String identifierPattern) {
+    public Map<String, Metric> findMetrics(JobID jobId, String identifierPattern) {
         synchronized (this) {
-            return getMetricStream(identifierPattern)
+            return getMetricStream(jobId, identifierPattern)
                     .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
         }
     }
 
-    public Optional<Metric> findMetric(String patternString) {
+    public Optional<Metric> findMetric(JobID jobId, String patternString) {
         synchronized (this) {
-            return getMetricStream(patternString).map(Entry::getValue).findFirst();
+            return getMetricStream(jobId, patternString).map(Entry::getValue).findFirst();
         }
     }
 
@@ -148,14 +151,15 @@ public class InMemoryReporter implements MetricReporter {
         }
     }
 
-    public List<OperatorMetricGroup> findOperatorMetricGroups(String operatorPattern) {
+    public List<OperatorMetricGroup> findOperatorMetricGroups(JobID jobId, String operatorPattern) {
         Pattern pattern = Pattern.compile(operatorPattern);
         synchronized (this) {
             return metrics.keySet().stream()
                     .filter(
                             g ->
                                     g instanceof OperatorMetricGroup
-                                            && pattern.matcher(getOperatorName(g)).find())
+                                            && pattern.matcher(getOperatorName(g)).find()
+                                            && getJobId(g).equals(jobId.toString()))
                     .map(OperatorMetricGroup.class::cast)
                     .sorted(Comparator.comparing(this::getSubtaskId))
                     .collect(Collectors.toList());
@@ -163,11 +167,15 @@ public class InMemoryReporter implements MetricReporter {
     }
 
     private String getSubtaskId(OperatorMetricGroup g) {
-        return g.getScopeComponents()[g.getScopeComponents().length - 1];
+        return g.getAllVariables().get(ScopeFormat.SCOPE_TASK_SUBTASK_INDEX);
     }
 
     private String getOperatorName(MetricGroup g) {
-        return g.getScopeComponents()[g.getScopeComponents().length - 2];
+        return g.getAllVariables().get(ScopeFormat.SCOPE_OPERATOR_NAME);
+    }
+
+    private String getJobId(MetricGroup g) {
+        return g.getAllVariables().get(ScopeFormat.SCOPE_JOB_ID);
     }
 
     @Override
@@ -195,13 +203,15 @@ public class InMemoryReporter implements MetricReporter {
         }
     }
 
-    private Stream<Entry<String, Metric>> getMetricStream(String identifierPattern) {
+    private Stream<Entry<String, Metric>> getMetricStream(JobID jobID, String identifierPattern) {
         Pattern pattern = Pattern.compile(identifierPattern);
-        return getMetricStream().filter(m -> pattern.matcher(m.getKey()).find());
+        return getMetricStream(jobID).filter(m -> pattern.matcher(m.getKey()).find());
     }
 
-    private Stream<Entry<String, Metric>> getMetricStream() {
-        return metrics.entrySet().stream().flatMap(this::getGroupMetricStream);
+    private Stream<Entry<String, Metric>> getMetricStream(JobID jobId) {
+        return metrics.entrySet().stream()
+                .filter(gr -> Objects.equals(getJobId(gr.getKey()), jobId.toString()))
+                .flatMap(this::getGroupMetricStream);
     }
 
     private Stream<MetricGroup> getGroupStream(String groupPattern) {
@@ -231,7 +241,7 @@ public class InMemoryReporter implements MetricReporter {
                 : group;
     }
 
-    public void addToConfiguration(Configuration configuration) {
+    public Configuration addToConfiguration(Configuration configuration) {
         configuration.setString(
                 ConfigConstants.METRICS_REPORTER_PREFIX
                         + "mini_cluster_resource_reporter."
@@ -240,6 +250,7 @@ public class InMemoryReporter implements MetricReporter {
         configuration.setString(
                 ConfigConstants.METRICS_REPORTER_PREFIX + "mini_cluster_resource_reporter." + ID,
                 id.toString());
+        return configuration;
     }
 
     /** The factory for the {@link InMemoryReporter}. */
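
With the JobID-scoped lookups above, a test obtains the job id from the JobClient returned by executeAsync() and passes it to every reporter query, so two jobs running against the same shared MiniCluster cannot see each other's metrics. A hypothetical fragment, not part of the commit, with the operator name "MyOperator" and the helper class as placeholders:

    import java.util.List;
    import java.util.Optional;

    import org.apache.flink.api.common.JobID;
    import org.apache.flink.core.execution.JobClient;
    import org.apache.flink.metrics.Metric;
    import org.apache.flink.metrics.groups.OperatorMetricGroup;
    import org.apache.flink.runtime.testutils.InMemoryReporter;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    final class ReporterUsageExample {

        static void inspectJobMetrics(StreamExecutionEnvironment env, InMemoryReporter reporter)
                throws Exception {
            JobClient jobClient = env.executeAsync();
            JobID jobId = jobClient.getJobID();

            // Only operator groups registered under this job are returned, sorted by subtask index.
            List<OperatorMetricGroup> groups =
                    reporter.findOperatorMetricGroups(jobId, "MyOperator");

            // Identifier patterns are likewise matched only against metrics of the given job.
            Optional<Metric> numRecordsOut =
                    reporter.findMetric(jobId, "MyOperator.*numRecordsOut");
        }
    }
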
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java
index f91a013..e2e4203 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.test.streaming.runtime;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.connector.sink.Sink;
 import org.apache.flink.configuration.Configuration;
@@ -35,8 +36,7 @@ import org.apache.flink.testutils.junit.SharedObjects;
 import org.apache.flink.testutils.junit.SharedReference;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.After;
-import org.junit.Before;
+import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 
@@ -55,29 +55,16 @@ import static org.hamcrest.Matchers.hasSize;
 public class SinkMetricsITCase extends TestLogger {
     private static final int DEFAULT_PARALLELISM = 4;
     @Rule public final SharedObjects sharedObjects = SharedObjects.create();
-    private InMemoryReporter reporter;
-
-    private MiniClusterWithClientResource miniClusterResource;
-
-    @Before
-    public void setup() throws Exception {
-        reporter = InMemoryReporter.createWithRetainedMetrics();
-        Configuration configuration = new Configuration();
-        reporter.addToConfiguration(configuration);
-        miniClusterResource =
-                new MiniClusterWithClientResource(
-                        new MiniClusterResourceConfiguration.Builder()
-                                .setNumberTaskManagers(1)
-                                .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
-                                .setConfiguration(configuration)
-                                .build());
-        miniClusterResource.before();
-    }
+    private static final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics();
 
-    @After
-    public void teardown() {
-        miniClusterResource.after();
-    }
+    @ClassRule
+    public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
+                            .setConfiguration(reporter.addToConfiguration(new Configuration()))
+                            .build());
 
     @Test
     public void testMetrics() throws Exception {
@@ -112,21 +99,23 @@ public class SinkMetricsITCase extends TestLogger {
                 .sinkTo(TestSink.newBuilder().setWriter(new MetricWriter()).build())
                 .name("MetricTestSink");
         JobClient jobClient = env.executeAsync();
+        final JobID jobId = jobClient.getJobID();
 
         beforeBarrier.get().await();
-        assertSinkMetrics(stopAtRecord1, env.getParallelism(), numSplits);
+        assertSinkMetrics(jobId, stopAtRecord1, env.getParallelism(), numSplits);
         afterBarrier.get().await();
 
         beforeBarrier.get().await();
-        assertSinkMetrics(stopAtRecord2, env.getParallelism(), numSplits);
+        assertSinkMetrics(jobId, stopAtRecord2, env.getParallelism(), numSplits);
         afterBarrier.get().await();
 
         jobClient.getJobExecutionResult().get();
     }
 
     private void assertSinkMetrics(
-            long processedRecordsPerSubtask, int parallelism, int numSplits) {
-        List<OperatorMetricGroup> groups = reporter.findOperatorMetricGroups("MetricTestSink");
+            JobID jobId, long processedRecordsPerSubtask, int parallelism, int numSplits) {
+        List<OperatorMetricGroup> groups =
+                reporter.findOperatorMetricGroups(jobId, "MetricTestSink");
         assertThat(groups, hasSize(parallelism));
 
         int subtaskWithMetrics = 0;
