zentol closed pull request #7347: [FLINK-10761][metrics] Do not acquire lock for getAllVariables()
URL: https://github.com/apache/flink/pull/7347
 
 
   

This is a PR merged from a forked repository (a foreign pull request).
As GitHub hides the original diff of such a pull request once it is
merged, the diff is reproduced below for the sake of provenance:

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
index 4400b14a10d..fb321303222 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
@@ -111,17 +111,13 @@ public AbstractMetricGroup(MetricRegistry registry, String[] scope, A parent) {
        }
 
        public Map<String, String> getAllVariables() {
-               if (variables == null) { // avoid synchronization for common case
-                       synchronized (this) {
-                               if (variables == null) {
-                                       Map<String, String> tmpVariables = new HashMap<>();
-                                       putVariables(tmpVariables);
-                                       if (parent != null) { // not true for Job-/TaskManagerMetricGroup
-                                               tmpVariables.putAll(parent.getAllVariables());
-                                       }
-                                       variables = tmpVariables;
-                               }
+               if (variables == null) {
+                       Map<String, String> tmpVariables = new HashMap<>();
+                       putVariables(tmpVariables);
+                       if (parent != null) { // not true for Job-/TaskManagerMetricGroup
+                               tmpVariables.putAll(parent.getAllVariables());
                        }
+                       variables = tmpVariables;
                }
                return variables;
        }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
index f3f8b42b851..d750f63cfd8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
@@ -21,17 +21,22 @@
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.core.testutils.BlockerSync;
 import org.apache.flink.metrics.CharacterFilter;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.metrics.scope.ScopeFormats;
 import org.apache.flink.runtime.metrics.util.TestReporter;
 
 import org.junit.Test;
 
+import javax.annotation.Nullable;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -252,4 +257,89 @@ public void testScopeGenerationWithoutReporters() throws Exception {
                        testRegistry.shutdown().get();
                }
        }
+
+       @Test
+       public void testGetAllVariablesDoesNotDeadlock() throws InterruptedException {
+               final TestMetricRegistry registry = new TestMetricRegistry();
+
+               final MetricGroup parent = new GenericMetricGroup(registry, UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), "parent");
+               final MetricGroup child = parent.addGroup("child");
+
+               final Thread parentRegisteringThread = new Thread(() -> parent.counter("parent_counter"));
+               final Thread childRegisteringThread = new Thread(() -> child.counter("child_counter"));
+
+               final BlockerSync parentSync = new BlockerSync();
+               final BlockerSync childSync = new BlockerSync();
+
+               try {
+                       // start both threads and have them block in the registry, so they acquire the lock of their respective group
+                       registry.setOnRegistrationAction(childSync::blockNonInterruptible);
+                       childRegisteringThread.start();
+                       childSync.awaitBlocker();
+
+                       registry.setOnRegistrationAction(parentSync::blockNonInterruptible);
+                       parentRegisteringThread.start();
+                       parentSync.awaitBlocker();
+
+                       // the parent thread remains blocked to simulate the child thread holding some lock in the registry/reporter
+                       // the child thread continues execution and calls getAllVariables()
+                       // in the past this would block indefinitely since the method acquires the locks of all parent groups
+                       childSync.releaseBlocker();
+                       // wait with a timeout to ensure the finally block is executed _at some point_, un-blocking the parent
+                       childRegisteringThread.join(1000 * 10);
+
+                       parentSync.releaseBlocker();
+                       parentRegisteringThread.join();
+               } finally {
+                       parentSync.releaseBlocker();
+                       childSync.releaseBlocker();
+                       parentRegisteringThread.join();
+                       childRegisteringThread.join();
+               }
+       }
+
+       private static final class TestMetricRegistry implements MetricRegistry {
+
+               private Runnable onRegistrationAction;
+
+               void setOnRegistrationAction(Runnable onRegistrationAction) {
+                       this.onRegistrationAction = onRegistrationAction;
+               }
+
+               @Override
+               public char getDelimiter() {
+                       return 0;
+               }
+
+               @Override
+               public char getDelimiter(int index) {
+                       return 0;
+               }
+
+               @Override
+               public int getNumberReporters() {
+                       return 0;
+               }
+
+               @Override
+               public void register(Metric metric, String metricName, AbstractMetricGroup group) {
+                       onRegistrationAction.run();
+                       group.getAllVariables();
+               }
+
+               @Override
+               public void unregister(Metric metric, String metricName, AbstractMetricGroup group) {
+               }
+
+               @Override
+               public ScopeFormats getScopeFormats() {
+                       return null;
+               }
+
+               @Nullable
+               @Override
+               public String getMetricQueryServicePath() {
+                       return null;
+               }
+       }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to