This is an automated email from the ASF dual-hosted git repository. jihoonson pushed a commit to branch 0.12.2 in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/0.12.2 by this push: new 0c1b82a Synchronize scheduled poll() calls in SQLMetadataSegmentManager (#6041) (#6056) 0c1b82a is described below commit 0c1b82ad73b6b6ec81d4411b4328ddf54e8dc016 Author: Jihoon Son <jihoon...@apache.org> AuthorDate: Fri Jul 27 13:16:26 2018 -0700 Synchronize scheduled poll() calls in SQLMetadataSegmentManager (#6041) (#6056) Similar issue to https://github.com/apache/incubator-druid/issues/6028. --- .../io/druid/metadata/SQLMetadataRuleManager.java | 14 ++-- .../druid/metadata/SQLMetadataSegmentManager.java | 96 ++++++++++++++++------ .../metadata/SQLMetadataSegmentManagerTest.java | 5 ++ 3 files changed, 82 insertions(+), 33 deletions(-) diff --git a/server/src/main/java/io/druid/metadata/SQLMetadataRuleManager.java b/server/src/main/java/io/druid/metadata/SQLMetadataRuleManager.java index c1cb98f..1c00c68 100644 --- a/server/src/main/java/io/druid/metadata/SQLMetadataRuleManager.java +++ b/server/src/main/java/io/druid/metadata/SQLMetadataRuleManager.java @@ -148,10 +148,12 @@ public class SQLMetadataRuleManager implements MetadataRuleManager * This field is used to implement a simple stamp mechanism instead of just a boolean "started" flag to prevent * the theoretical situation of two tasks scheduled in {@link #start()} calling {@link #poll()} concurrently, if * the sequence of {@link #start()} - {@link #stop()} - {@link #start()} actions occurs quickly. + * + * {@link SQLMetadataSegmentManager} also have a similar issue. */ private long currentStartOrder = -1; private ScheduledExecutorService exec = null; - private long retryStartTime = 0; + private long failStartTimeMs = 0; @Inject public SQLMetadataRuleManager( @@ -311,17 +313,17 @@ public class SQLMetadataRuleManager implements MetadataRuleManager log.info("Polled and found rules for %,d datasource(s)", newRules.size()); rules.set(newRules); - retryStartTime = 0; + failStartTimeMs = 0; } catch (Exception e) { - if (retryStartTime == 0) { - retryStartTime = System.currentTimeMillis(); + if (failStartTimeMs == 0) { + failStartTimeMs = System.currentTimeMillis(); } - if (System.currentTimeMillis() - retryStartTime > config.getAlertThreshold().toStandardDuration().getMillis()) { + if (System.currentTimeMillis() - failStartTimeMs > config.getAlertThreshold().toStandardDuration().getMillis()) { log.makeAlert(e, "Exception while polling for rules") .emit(); - retryStartTime = 0; + failStartTimeMs = 0; } else { log.error(e, "Exception while polling for rules"); } diff --git a/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java b/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java index 36f628c..203bcea 100644 --- a/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java +++ b/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java @@ -29,11 +29,7 @@ import com.google.common.collect.Interner; import com.google.common.collect.Interners; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningScheduledExecutorService; -import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; -import io.druid.java.util.emitter.EmittingLogger; import io.druid.client.DruidDataSource; import io.druid.client.ImmutableDruidDataSource; import io.druid.guice.ManageLifecycle; @@ -44,6 +40,7 @@ import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.concurrent.Execs; import io.druid.java.util.common.lifecycle.LifecycleStart; import io.druid.java.util.common.lifecycle.LifecycleStop; +import io.druid.java.util.emitter.EmittingLogger; import io.druid.timeline.DataSegment; import io.druid.timeline.TimelineObjectHolder; import io.druid.timeline.VersionedIntervalTimeline; @@ -73,8 +70,11 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; /** @@ -85,10 +85,16 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager private static final Interner<DataSegment> DATA_SEGMENT_INTERNER = Interners.newWeakInterner(); private static final EmittingLogger log = new EmittingLogger(SQLMetadataSegmentManager.class); - // Use to synchronize start() and stop(). These methods should be synchronized to prevent from being called at the - // same time if two different threads are calling them. This might be possible if a druid coordinator gets and drops - // leadership repeatedly in quick succession. - private final Object lock = new Object(); + /** + * Use to synchronize {@link #start()}, {@link #stop()}, {@link #poll()}, and {@link #isStarted()}. These methods + * should be synchronized to prevent from being called at the same time if two different threads are calling them. + * This might be possible if a druid coordinator gets and drops leadership repeatedly in quick succession. + */ + private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + /** {@link #poll()} and {@link #isStarted()} use readLock. */ + private final Lock readLock = readWriteLock.readLock(); + /** {@link #start()} and {@link #stop()} use writeLock. */ + private final Lock writeLock = readWriteLock.writeLock(); private final ObjectMapper jsonMapper; private final Supplier<MetadataSegmentManagerConfig> config; @@ -96,9 +102,21 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager private final AtomicReference<ConcurrentHashMap<String, DruidDataSource>> dataSourcesRef; private final SQLMetadataConnector connector; - private volatile ListeningScheduledExecutorService exec = null; - private volatile ListenableFuture<?> future = null; - private volatile boolean started; + /** The number of times this SQLMetadataSegmentManager was started. */ + private long startCount = 0; + /** + * Equal to the current {@link #startCount} value, if the SQLMetadataSegmentManager is currently started; -1 if + * currently stopped. + * + * This field is used to implement a simple stamp mechanism instead of just a boolean "started" flag to prevent + * the theoretical situation of two or more tasks scheduled in {@link #start()} calling {@link #isStarted()} and + * {@link #poll()} concurrently, if the sequence of {@link #start()} - {@link #stop()} - {@link #start()} actions + * occurs quickly. + * + * {@link SQLMetadataRuleManager} also have a similar issue. + */ + private long currentStartOrder = -1; + private ScheduledExecutorService exec = null; @Inject public SQLMetadataSegmentManager( @@ -121,34 +139,52 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager @LifecycleStart public void start() { - synchronized (lock) { - if (started) { + writeLock.lock(); + try { + if (isStarted()) { return; } - exec = MoreExecutors.listeningDecorator(Execs.scheduledSingleThreaded("DatabaseSegmentManager-Exec--%d")); + startCount++; + currentStartOrder = startCount; + final long localStartOrder = currentStartOrder; + + exec = Execs.scheduledSingleThreaded("DatabaseSegmentManager-Exec--%d"); final Duration delay = config.get().getPollDuration().toStandardDuration(); - future = exec.scheduleWithFixedDelay( + exec.scheduleWithFixedDelay( new Runnable() { @Override public void run() { + // poll() is synchronized together with start(), stop() and isStarted() to ensure that when stop() exists, + // poll() won't actually run anymore after that (it could only enter the syncrhonized section and exit + // immediately because the localStartedOrder doesn't match the new currentStartOrder). It's needed + // to avoid flakiness in SQLMetadataSegmentManagerTest. + // See https://github.com/apache/incubator-druid/issues/6028 + readLock.lock(); try { - poll(); + if (localStartOrder == currentStartOrder) { + poll(); + } } catch (Exception e) { log.makeAlert(e, "uncaught exception in segment manager polling thread").emit(); } + finally { + readLock.unlock(); + } } }, 0, delay.getMillis(), TimeUnit.MILLISECONDS ); - started = true; + } + finally { + writeLock.unlock(); } } @@ -156,8 +192,9 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager @LifecycleStop public void stop() { - synchronized (lock) { - if (!started) { + writeLock.lock(); + try { + if (!isStarted()) { return; } @@ -167,11 +204,12 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager current = dataSourcesRef.get(); } while (!dataSourcesRef.compareAndSet(current, emptyMap)); - future.cancel(false); - future = null; + currentStartOrder = -1; exec.shutdownNow(); exec = null; - started = false; + } + finally { + writeLock.unlock(); } } @@ -367,7 +405,15 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager @Override public boolean isStarted() { - return started; + // isStarted() is synchronized together with start(), stop() and poll() to ensure that the latest currentStartOrder + // is always visible. readLock should be used to avoid unexpected performance degradation of DruidCoordinator. + readLock.lock(); + try { + return currentStartOrder >= 0; + } + finally { + readLock.unlock(); + } } @Override @@ -421,10 +467,6 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager public void poll() { try { - if (!started) { - return; - } - ConcurrentHashMap<String, DruidDataSource> newDataSources = new ConcurrentHashMap<>(); log.debug("Starting polling of segment table"); diff --git a/server/src/test/java/io/druid/metadata/SQLMetadataSegmentManagerTest.java b/server/src/test/java/io/druid/metadata/SQLMetadataSegmentManagerTest.java index 9278945..b6cc0c7 100644 --- a/server/src/test/java/io/druid/metadata/SQLMetadataSegmentManagerTest.java +++ b/server/src/test/java/io/druid/metadata/SQLMetadataSegmentManagerTest.java @@ -118,6 +118,7 @@ public class SQLMetadataSegmentManagerTest { manager.start(); manager.poll(); + Assert.assertTrue(manager.isStarted()); Assert.assertEquals( ImmutableList.of("wikipedia"), manager.getAllDatasourceNames() @@ -149,6 +150,7 @@ public class SQLMetadataSegmentManagerTest EmittingLogger.registerEmitter(new NoopServiceEmitter()); manager.start(); manager.poll(); + Assert.assertTrue(manager.isStarted()); Assert.assertEquals( "wikipedia", Iterables.getOnlyElement(manager.getInventory()).getName() @@ -160,6 +162,7 @@ public class SQLMetadataSegmentManagerTest { manager.start(); manager.poll(); + Assert.assertTrue(manager.isStarted()); Assert.assertTrue(manager.removeDatasource("wikipedia")); Assert.assertEquals( @@ -178,6 +181,7 @@ public class SQLMetadataSegmentManagerTest { manager.start(); manager.poll(); + Assert.assertTrue(manager.isStarted()); final String newDataSource = "wikipedia2"; final DataSegment newSegment = new DataSegment( @@ -207,6 +211,7 @@ public class SQLMetadataSegmentManagerTest { manager.start(); manager.poll(); + Assert.assertTrue(manager.isStarted()); final String newDataSource = "wikipedia2"; final DataSegment newSegment = new DataSegment( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org