zentol closed pull request #7347: [FLINK-10761][metrics] Do not acquire lock for getAllVariables() URL: https://github.com/apache/flink/pull/7347
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java index 4400b14a10d..fb321303222 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java @@ -111,17 +111,13 @@ public AbstractMetricGroup(MetricRegistry registry, String[] scope, A parent) { } public Map<String, String> getAllVariables() { - if (variables == null) { // avoid synchronization for common case - synchronized (this) { - if (variables == null) { - Map<String, String> tmpVariables = new HashMap<>(); - putVariables(tmpVariables); - if (parent != null) { // not true for Job-/TaskManagerMetricGroup - tmpVariables.putAll(parent.getAllVariables()); - } - variables = tmpVariables; - } + if (variables == null) { + Map<String, String> tmpVariables = new HashMap<>(); + putVariables(tmpVariables); + if (parent != null) { // not true for Job-/TaskManagerMetricGroup + tmpVariables.putAll(parent.getAllVariables()); } + variables = tmpVariables; } return variables; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java index f3f8b42b851..d750f63cfd8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java @@ -21,17 +21,22 @@ import 
org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MetricOptions; +import org.apache.flink.core.testutils.BlockerSync; import org.apache.flink.metrics.CharacterFilter; import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.reporter.MetricReporter; +import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; +import org.apache.flink.runtime.metrics.scope.ScopeFormats; import org.apache.flink.runtime.metrics.util.TestReporter; import org.junit.Test; +import javax.annotation.Nullable; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -252,4 +257,89 @@ public void testScopeGenerationWithoutReporters() throws Exception { testRegistry.shutdown().get(); } } + + @Test + public void testGetAllVariablesDoesNotDeadlock() throws InterruptedException { + final TestMetricRegistry registry = new TestMetricRegistry(); + + final MetricGroup parent = new GenericMetricGroup(registry, UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), "parent"); + final MetricGroup child = parent.addGroup("child"); + + final Thread parentRegisteringThread = new Thread(() -> parent.counter("parent_counter")); + final Thread childRegisteringThread = new Thread(() -> child.counter("child_counter")); + + final BlockerSync parentSync = new BlockerSync(); + final BlockerSync childSync = new BlockerSync(); + + try { + // start both threads and have them block in the registry, so they acquire the lock of their respective group + registry.setOnRegistrationAction(childSync::blockNonInterruptible); + childRegisteringThread.start(); + childSync.awaitBlocker(); + + 
registry.setOnRegistrationAction(parentSync::blockNonInterruptible); + parentRegisteringThread.start(); + parentSync.awaitBlocker(); + + // the parent thread remains blocked to simulate the child thread holding some lock in the registry/reporter + // the child thread continues execution and calls getAllVariables() + // in the past this would block indefinitely since the method acquires the locks of all parent groups + childSync.releaseBlocker(); + // wait with a timeout to ensure the finally block is executed _at some point_, un-blocking the parent + childRegisteringThread.join(1000 * 10); + + parentSync.releaseBlocker(); + parentRegisteringThread.join(); + } finally { + parentSync.releaseBlocker(); + childSync.releaseBlocker(); + parentRegisteringThread.join(); + childRegisteringThread.join(); + } + } + + private static final class TestMetricRegistry implements MetricRegistry { + + private Runnable onRegistrationAction; + + void setOnRegistrationAction(Runnable onRegistrationAction) { + this.onRegistrationAction = onRegistrationAction; + } + + @Override + public char getDelimiter() { + return 0; + } + + @Override + public char getDelimiter(int index) { + return 0; + } + + @Override + public int getNumberReporters() { + return 0; + } + + @Override + public void register(Metric metric, String metricName, AbstractMetricGroup group) { + onRegistrationAction.run(); + group.getAllVariables(); + } + + @Override + public void unregister(Metric metric, String metricName, AbstractMetricGroup group) { + } + + @Override + public ScopeFormats getScopeFormats() { + return null; + } + + @Nullable + @Override + public String getMetricQueryServicePath() { + return null; + } + } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org. With regards, Apache Git Services