This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch 0.12.2
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/0.12.2 by this push:
     new 0c1b82a  Synchronize scheduled poll() calls in SQLMetadataSegmentManager (#6041) (#6056)
0c1b82a is described below

commit 0c1b82ad73b6b6ec81d4411b4328ddf54e8dc016
Author: Jihoon Son <jihoon...@apache.org>
AuthorDate: Fri Jul 27 13:16:26 2018 -0700

    Synchronize scheduled poll() calls in SQLMetadataSegmentManager (#6041) (#6056)
    
    Similar issue to https://github.com/apache/incubator-druid/issues/6028.
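
    The race being closed here is easiest to see in a minimal sketch (simplified names, not the Druid sources): with only a volatile "started" flag, a poll task scheduled by an earlier start() can pass its flag check during a quick stop() - start() cycle and keep running against stale state. The class below is an illustrative stand-in for the pre-patch shape of the manager; the poll period is arbitrary.

        import java.util.concurrent.Executors;
        import java.util.concurrent.ScheduledExecutorService;
        import java.util.concurrent.TimeUnit;

        // Simplified stand-in for the pre-patch manager; names and the poll period are illustrative.
        class UnsynchronizedPollerSketch
        {
          private volatile boolean started;
          private ScheduledExecutorService exec;

          void start()
          {
            started = true;
            exec = Executors.newSingleThreadScheduledExecutor();
            exec.scheduleWithFixedDelay(this::poll, 0, 60_000, TimeUnit.MILLISECONDS);
          }

          void stop()
          {
            started = false;
            exec.shutdownNow(); // a poll() that already passed its flag check may still be running here
          }

          void poll()
          {
            if (!started) {
              return; // a quick stop() - start() flips the flag back to true, so a stale task can keep polling
            }
            // ... read the segment table and repopulate in-memory state ...
          }
        }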
---
 .../io/druid/metadata/SQLMetadataRuleManager.java  | 14 ++--
 .../druid/metadata/SQLMetadataSegmentManager.java  | 96 ++++++++++++++++------
 .../metadata/SQLMetadataSegmentManagerTest.java    |  5 ++
 3 files changed, 82 insertions(+), 33 deletions(-)

diff --git a/server/src/main/java/io/druid/metadata/SQLMetadataRuleManager.java b/server/src/main/java/io/druid/metadata/SQLMetadataRuleManager.java
index c1cb98f..1c00c68 100644
--- a/server/src/main/java/io/druid/metadata/SQLMetadataRuleManager.java
+++ b/server/src/main/java/io/druid/metadata/SQLMetadataRuleManager.java
@@ -148,10 +148,12 @@ public class SQLMetadataRuleManager implements MetadataRuleManager
    * This field is used to implement a simple stamp mechanism instead of just a boolean "started" flag to prevent
    * the theoretical situation of two tasks scheduled in {@link #start()} calling {@link #poll()} concurrently, if
    * the sequence of {@link #start()} - {@link #stop()} - {@link #start()} actions occurs quickly.
+   *
+   * {@link SQLMetadataSegmentManager} also has a similar issue.
    */
   private long currentStartOrder = -1;
   private ScheduledExecutorService exec = null;
-  private long retryStartTime = 0;
+  private long failStartTimeMs = 0;
 
   @Inject
   public SQLMetadataRuleManager(
@@ -311,17 +313,17 @@ public class SQLMetadataRuleManager implements MetadataRuleManager
       log.info("Polled and found rules for %,d datasource(s)", 
newRules.size());
 
       rules.set(newRules);
-      retryStartTime = 0;
+      failStartTimeMs = 0;
     }
     catch (Exception e) {
-      if (retryStartTime == 0) {
-        retryStartTime = System.currentTimeMillis();
+      if (failStartTimeMs == 0) {
+        failStartTimeMs = System.currentTimeMillis();
       }
 
-      if (System.currentTimeMillis() - retryStartTime > config.getAlertThreshold().toStandardDuration().getMillis()) {
+      if (System.currentTimeMillis() - failStartTimeMs > config.getAlertThreshold().toStandardDuration().getMillis()) {
         log.makeAlert(e, "Exception while polling for rules")
            .emit();
-        retryStartTime = 0;
+        failStartTimeMs = 0;
       } else {
         log.error(e, "Exception while polling for rules");
       }
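
The hunk above renames retryStartTime to failStartTimeMs but keeps the existing alerting pattern: remember when the current streak of poll failures began, emit an alert only once the streak outlasts the configured threshold, then reset. A minimal standalone sketch of that pattern (the class name, constructor, and threshold handling are illustrative, not from this commit):

    // Illustrative sketch of the failure-streak alerting kept by the hunk above.
    class FailureStreakTracker
    {
      private final long alertThresholdMs;
      private long failStartTimeMs = 0;

      FailureStreakTracker(long alertThresholdMs)
      {
        this.alertThresholdMs = alertThresholdMs;
      }

      void onSuccess()
      {
        failStartTimeMs = 0; // the streak is over; clear the stamp
      }

      /** Returns true when the streak has outlasted the threshold and an alert should be emitted. */
      boolean onFailure()
      {
        if (failStartTimeMs == 0) {
          failStartTimeMs = System.currentTimeMillis(); // first failure of a new streak
        }
        if (System.currentTimeMillis() - failStartTimeMs > alertThresholdMs) {
          failStartTimeMs = 0; // alert once, then start counting a fresh streak
          return true;
        }
        return false; // still within the threshold; the caller just logs the error
      }
    }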
diff --git a/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java b/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java
index 36f628c..203bcea 100644
--- a/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java
+++ b/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java
@@ -29,11 +29,7 @@ import com.google.common.collect.Interner;
 import com.google.common.collect.Interners;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Ordering;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningScheduledExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
 import com.google.inject.Inject;
-import io.druid.java.util.emitter.EmittingLogger;
 import io.druid.client.DruidDataSource;
 import io.druid.client.ImmutableDruidDataSource;
 import io.druid.guice.ManageLifecycle;
@@ -44,6 +40,7 @@ import io.druid.java.util.common.StringUtils;
 import io.druid.java.util.common.concurrent.Execs;
 import io.druid.java.util.common.lifecycle.LifecycleStart;
 import io.druid.java.util.common.lifecycle.LifecycleStop;
+import io.druid.java.util.emitter.EmittingLogger;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.TimelineObjectHolder;
 import io.druid.timeline.VersionedIntervalTimeline;
@@ -73,8 +70,11 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
 /**
@@ -85,10 +85,16 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
   private static final Interner<DataSegment> DATA_SEGMENT_INTERNER = Interners.newWeakInterner();
   private static final EmittingLogger log = new EmittingLogger(SQLMetadataSegmentManager.class);
 
-  // Use to synchronize start() and stop(). These methods should be synchronized to prevent from being called at the
-  // same time if two different threads are calling them. This might be possible if a druid coordinator gets and drops
-  // leadership repeatedly in quick succession.
-  private final Object lock = new Object();
+  /**
+   * Use to synchronize {@link #start()}, {@link #stop()}, {@link #poll()}, and {@link #isStarted()}. These methods
+   * should be synchronized to prevent from being called at the same time if two different threads are calling them.
+   * This might be possible if a druid coordinator gets and drops leadership repeatedly in quick succession.
+   */
+  private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+  /** {@link #poll()} and {@link #isStarted()} use readLock. */
+  private final Lock readLock = readWriteLock.readLock();
+  /** {@link #start()} and {@link #stop()} use writeLock. */
+  private final Lock writeLock = readWriteLock.writeLock();
 
   private final ObjectMapper jsonMapper;
   private final Supplier<MetadataSegmentManagerConfig> config;
@@ -96,9 +102,21 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
   private final AtomicReference<ConcurrentHashMap<String, DruidDataSource>> dataSourcesRef;
   private final SQLMetadataConnector connector;
 
-  private volatile ListeningScheduledExecutorService exec = null;
-  private volatile ListenableFuture<?> future = null;
-  private volatile boolean started;
+  /** The number of times this SQLMetadataSegmentManager was started. */
+  private long startCount = 0;
+  /**
+   * Equal to the current {@link #startCount} value, if the SQLMetadataSegmentManager is currently started; -1 if
+   * currently stopped.
+   *
+   * This field is used to implement a simple stamp mechanism instead of just a boolean "started" flag to prevent
+   * the theoretical situation of two or more tasks scheduled in {@link #start()} calling {@link #isStarted()} and
+   * {@link #poll()} concurrently, if the sequence of {@link #start()} - {@link #stop()} - {@link #start()} actions
+   * occurs quickly.
+   *
+   * {@link SQLMetadataRuleManager} also has a similar issue.
+   */
+  private long currentStartOrder = -1;
+  private ScheduledExecutorService exec = null;
 
   @Inject
   public SQLMetadataSegmentManager(
@@ -121,34 +139,52 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
   @LifecycleStart
   public void start()
   {
-    synchronized (lock) {
-      if (started) {
+    writeLock.lock();
+    try {
+      if (isStarted()) {
         return;
       }
 
-      exec = MoreExecutors.listeningDecorator(Execs.scheduledSingleThreaded("DatabaseSegmentManager-Exec--%d"));
+      startCount++;
+      currentStartOrder = startCount;
+      final long localStartOrder = currentStartOrder;
+
+      exec = Execs.scheduledSingleThreaded("DatabaseSegmentManager-Exec--%d");
 
       final Duration delay = config.get().getPollDuration().toStandardDuration();
-      future = exec.scheduleWithFixedDelay(
+      exec.scheduleWithFixedDelay(
           new Runnable()
           {
             @Override
             public void run()
             {
+              // poll() is synchronized together with start(), stop() and isStarted() to ensure that when stop() exits,
+              // poll() won't actually run anymore after that (it could only enter the synchronized section and exit
+              // immediately because the localStartOrder doesn't match the new currentStartOrder). It's needed
+              // to avoid flakiness in SQLMetadataSegmentManagerTest.
+              // See https://github.com/apache/incubator-druid/issues/6028
+              readLock.lock();
               try {
-                poll();
+                if (localStartOrder == currentStartOrder) {
+                  poll();
+                }
               }
               catch (Exception e) {
                 log.makeAlert(e, "uncaught exception in segment manager 
polling thread").emit();
 
               }
+              finally {
+                readLock.unlock();
+              }
             }
           },
           0,
           delay.getMillis(),
           TimeUnit.MILLISECONDS
       );
-      started = true;
+    }
+    finally {
+      writeLock.unlock();
     }
   }
 
@@ -156,8 +192,9 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
   @LifecycleStop
   public void stop()
   {
-    synchronized (lock) {
-      if (!started) {
+    writeLock.lock();
+    try {
+      if (!isStarted()) {
         return;
       }
 
@@ -167,11 +204,12 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
         current = dataSourcesRef.get();
       } while (!dataSourcesRef.compareAndSet(current, emptyMap));
 
-      future.cancel(false);
-      future = null;
+      currentStartOrder = -1;
       exec.shutdownNow();
       exec = null;
-      started = false;
+    }
+    finally {
+      writeLock.unlock();
     }
   }
 
@@ -367,7 +405,15 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
   @Override
   public boolean isStarted()
   {
-    return started;
+    // isStarted() is synchronized together with start(), stop() and poll() to 
ensure that the latest currentStartOrder
+    // is always visible. readLock should be used to avoid unexpected 
performance degradation of DruidCoordinator.
+    readLock.lock();
+    try {
+      return currentStartOrder >= 0;
+    }
+    finally {
+      readLock.unlock();
+    }
   }
 
   @Override
@@ -421,10 +467,6 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
   public void poll()
   {
     try {
-      if (!started) {
-        return;
-      }
-
       ConcurrentHashMap<String, DruidDataSource> newDataSources = new ConcurrentHashMap<>();
 
       log.debug("Starting polling of segment table");
diff --git a/server/src/test/java/io/druid/metadata/SQLMetadataSegmentManagerTest.java b/server/src/test/java/io/druid/metadata/SQLMetadataSegmentManagerTest.java
index 9278945..b6cc0c7 100644
--- a/server/src/test/java/io/druid/metadata/SQLMetadataSegmentManagerTest.java
+++ b/server/src/test/java/io/druid/metadata/SQLMetadataSegmentManagerTest.java
@@ -118,6 +118,7 @@ public class SQLMetadataSegmentManagerTest
   {
     manager.start();
     manager.poll();
+    Assert.assertTrue(manager.isStarted());
     Assert.assertEquals(
         ImmutableList.of("wikipedia"),
         manager.getAllDatasourceNames()
@@ -149,6 +150,7 @@ public class SQLMetadataSegmentManagerTest
     EmittingLogger.registerEmitter(new NoopServiceEmitter());
     manager.start();
     manager.poll();
+    Assert.assertTrue(manager.isStarted());
 
     Assert.assertEquals(
         "wikipedia", Iterables.getOnlyElement(manager.getInventory()).getName()
@@ -160,6 +162,7 @@ public class SQLMetadataSegmentManagerTest
   {
     manager.start();
     manager.poll();
+    Assert.assertTrue(manager.isStarted());
     Assert.assertTrue(manager.removeDatasource("wikipedia"));
 
     Assert.assertEquals(
@@ -178,6 +181,7 @@ public class SQLMetadataSegmentManagerTest
   {
     manager.start();
     manager.poll();
+    Assert.assertTrue(manager.isStarted());
 
     final String newDataSource = "wikipedia2";
     final DataSegment newSegment = new DataSegment(
@@ -207,6 +211,7 @@ public class SQLMetadataSegmentManagerTest
   {
     manager.start();
     manager.poll();
+    Assert.assertTrue(manager.isStarted());
 
     final String newDataSource = "wikipedia2";
     final DataSegment newSegment = new DataSegment(


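The assertions added to the tests above pin down the expected lifecycle: after start() and a manual poll(), isStarted() must hold until stop() is called. Reusing the StampedPollerSketch from the earlier sketch as a stand-in for the real manager (the test class and method names are illustrative; JUnit 4 style, matching the Assert calls in the diff), such a check looks roughly like:

    import org.junit.Assert;
    import org.junit.Test;

    public class LifecycleSketchTest
    {
      @Test
      public void startPollStop()
      {
        // Stand-in for the real SQLMetadataSegmentManager wiring, which depends on test fixtures not shown here.
        StampedPollerSketch manager = new StampedPollerSketch();

        manager.start();
        manager.poll();
        Assert.assertTrue(manager.isStarted());  // holds between start() and stop() under the stamp scheme

        manager.stop();
        Assert.assertFalse(manager.isStarted());
      }
    }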