This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push: new 85fc5c1 [FLINK-19901][metrics] Fix caching offset for variables 85fc5c1 is described below commit 85fc5c1adb1a4e4fe2427f05f8493459099c1825 Author: Chesnay Schepler <ches...@apache.org> AuthorDate: Tue Nov 3 18:56:20 2020 +0100 [FLINK-19901][metrics] Fix caching offset for variables --- .../metrics/groups/AbstractMetricGroup.java | 9 ++++--- .../metrics/groups/AbstractMetricGroupTest.java | 30 +++++++++++++++++++++- 2 files changed, 35 insertions(+), 4 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java index 8f7521f..73cf6ce 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java @@ -120,11 +120,14 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl } public Map<String, String> getAllVariables(int reporterIndex, Set<String> excludedVariables) { - // offset cache location to account for general cache at position 0 - reporterIndex += 1; if (reporterIndex < 0 || reporterIndex >= logicalScopeStrings.length) { - reporterIndex = 0; + // invalid reporter index; either a programming mistake, or we try to retrieve variables outside of a reporter + reporterIndex = -1; } + + // offset cache location to account for general cache at position 0 + reporterIndex += 1; + // if no variables are excluded (which is the default!) we re-use the general variables map to save space return internalGetAllVariables(excludedVariables.isEmpty() ? 0 : reporterIndex, excludedVariables); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java index 2bbfae3..b14c452 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java @@ -43,8 +43,10 @@ import org.junit.Test; import java.util.Arrays; import java.util.Collections; +import java.util.Map; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.IsNot.not; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -92,6 +94,27 @@ public class AbstractMetricGroupTest extends TestLogger { assertEquals(group.getAllVariables(-1, Collections.singleton(ScopeFormat.SCOPE_HOST)).size(), 0); } + @Test + public void testGetAllVariablesWithExclusionsForReporters() { + TestMetricRegistry registry = new TestMetricRegistry(); + registry.setNumReporters(2); + + AbstractMetricGroup<?> group = new GenericMetricGroup(registry, null, "test") { + @Override + protected void putVariables(Map<String, String> variables) { + variables.put("k1", "v1"); + variables.put("k2", "v2"); + } + }; + + group.getAllVariables(-1, Collections.emptySet()); + + assertThat(group.getAllVariables(0, Collections.singleton("k1")), not(IsMapContaining.hasKey("k1"))); + assertThat(group.getAllVariables(0, Collections.singleton("k1")), IsMapContaining.hasKey("k2")); + assertThat(group.getAllVariables(1, Collections.singleton("k2")), IsMapContaining.hasKey("k1")); + assertThat(group.getAllVariables(1, Collections.singleton("k2")), not(IsMapContaining.hasKey("k2"))); + } + // ======================================================================== // Scope Caching // ======================================================================== @@ -337,6 +360,11 @@ public class AbstractMetricGroupTest extends TestLogger { private static final class TestMetricRegistry implements MetricRegistry { private Runnable onRegistrationAction; + private int numReporters = 0; + + void setNumReporters(int numReporters) { + this.numReporters = numReporters; + } void setOnRegistrationAction(Runnable onRegistrationAction) { this.onRegistrationAction = onRegistrationAction; @@ -349,7 +377,7 @@ public class AbstractMetricGroupTest extends TestLogger { @Override public int getNumberReporters() { - return 0; + return this.numReporters; } @Override