This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new 85fc5c1  [FLINK-19901][metrics] Fix caching offset for variables
85fc5c1 is described below

commit 85fc5c1adb1a4e4fe2427f05f8493459099c1825
Author: Chesnay Schepler <ches...@apache.org>
AuthorDate: Tue Nov 3 18:56:20 2020 +0100

    [FLINK-19901][metrics] Fix caching offset for variables
---
 .../metrics/groups/AbstractMetricGroup.java        |  9 ++++---
 .../metrics/groups/AbstractMetricGroupTest.java    | 30 +++++++++++++++++++++-
 2 files changed, 35 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
index 8f7521f..73cf6ce 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
@@ -120,11 +120,14 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
        }
 
        public Map<String, String> getAllVariables(int reporterIndex, Set<String> excludedVariables) {
-               // offset cache location to account for general cache at position 0
-               reporterIndex += 1;
                if (reporterIndex < 0 || reporterIndex >= logicalScopeStrings.length) {
-                       reporterIndex = 0;
+                       // invalid reporter index; either a programming mistake, or we try to retrieve variables outside of a reporter
+                       reporterIndex = -1;
                }
+
+               // offset cache location to account for general cache at position 0
+               reporterIndex += 1;
+
                // if no variables are excluded (which is the default!) we re-use the general variables map to save space
                return internalGetAllVariables(excludedVariables.isEmpty() ? 0 : reporterIndex, excludedVariables);
        }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
index 2bbfae3..b14c452 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
@@ -43,8 +43,10 @@ import org.junit.Test;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Map;
 
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsNot.not;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -92,6 +94,27 @@ public class AbstractMetricGroupTest extends TestLogger {
                assertEquals(group.getAllVariables(-1, Collections.singleton(ScopeFormat.SCOPE_HOST)).size(), 0);
        }
 
+       @Test
+       public void testGetAllVariablesWithExclusionsForReporters() {
+               TestMetricRegistry registry = new TestMetricRegistry();
+               registry.setNumReporters(2);
+
+               AbstractMetricGroup<?> group = new GenericMetricGroup(registry, null, "test") {
+                       @Override
+                       protected void putVariables(Map<String, String> variables) {
+                               variables.put("k1", "v1");
+                               variables.put("k2", "v2");
+                       }
+               };
+
+               group.getAllVariables(-1, Collections.emptySet());
+
+               assertThat(group.getAllVariables(0, Collections.singleton("k1")), not(IsMapContaining.hasKey("k1")));
+               assertThat(group.getAllVariables(0, Collections.singleton("k1")), IsMapContaining.hasKey("k2"));
+               assertThat(group.getAllVariables(1, Collections.singleton("k2")), IsMapContaining.hasKey("k1"));
+               assertThat(group.getAllVariables(1, Collections.singleton("k2")), not(IsMapContaining.hasKey("k2")));
+       }
+
        // ========================================================================
        // Scope Caching
        // ========================================================================
@@ -337,6 +360,11 @@ public class AbstractMetricGroupTest extends TestLogger {
        private static final class TestMetricRegistry implements MetricRegistry {
 
                private Runnable onRegistrationAction;
+               private int numReporters = 0;
+
+               void setNumReporters(int numReporters) {
+                       this.numReporters = numReporters;
+               }
 
                void setOnRegistrationAction(Runnable onRegistrationAction) {
                        this.onRegistrationAction = onRegistrationAction;
@@ -349,7 +377,7 @@ public class AbstractMetricGroupTest extends TestLogger {
 
                @Override
                public int getNumberReporters() {
-                       return 0;
+                       return this.numReporters;
                }
 
                @Override

Reply via email to