Repository: incubator-distributedlog Updated Branches: refs/heads/master 643963ce7 -> a5dd5adce
DL-21: Fix DL flaky test cases DL-21: Fixed a few DL flaky test cases. Author: Yiming Zang <[email protected]> Reviewers: Sijie Guo <[email protected]> Closes #5 from yzang/yzang/fix_flaky_test Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/a5dd5adc Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/a5dd5adc Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/a5dd5adc Branch: refs/heads/master Commit: a5dd5adce87bee8bbce9faefc25f29c5b8b4d7df Parents: 643963c Author: Yiming Zang <[email protected]> Authored: Tue Aug 16 12:02:06 2016 -0700 Committer: Sijie Guo <[email protected]> Committed: Tue Aug 16 12:02:06 2016 -0700 ---------------------------------------------------------------------- .../distributedlog/BKDistributedLogManager.java | 2 ++ .../distributedlog/TestAsyncReaderWriter.java | 5 ++- .../distributedlog/TestNonBlockingReads.java | 38 ++++++++++++++------ .../config/TestConfigurationSubscription.java | 16 ++++++++- ...TestDynamicConfigurationFeatureProvider.java | 16 +++++++++ .../service/TestDistributedLogService.java | 10 +++--- 6 files changed, 71 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/a5dd5adc/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java index fd8ec2d..9c19381 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java @@ 
-969,6 +969,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL false, dynConf.getDeserializeRecordSetOnReads(), statsLogger); + pendingReaders.add(reader); return Future.value(reader); } @@ -1095,6 +1096,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL true, dynConf.getDeserializeRecordSetOnReads(), statsLogger); + pendingReaders.add(asyncReader); return new BKSyncLogReaderDLSN(conf, asyncReader, scheduler, fromTxnId); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/a5dd5adc/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java index a6a89ba..06cf079 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java @@ -885,8 +885,11 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { } } + /** + * Flaky test fixed: readers need to be added to the pendingReaders + * @throws Exception + */ @Test(timeout = 300000) - @DistributedLogAnnotations.FlakyTest public void testSimpleAsyncReadWriteSimulateErrors() throws Exception { String name = runtime.getMethodName(); DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/a5dd5adc/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReads.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReads.java 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReads.java index 90a33e8..58863c4 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReads.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReads.java @@ -17,6 +17,7 @@ */ package com.twitter.distributedlog; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -33,13 +34,12 @@ import static org.junit.Assert.*; public class TestNonBlockingReads extends TestDistributedLogBase { static final Logger LOG = LoggerFactory.getLogger(TestNonBlockingReads.class); - // TODO: investigate why long poll read makes test flaky static { conf.setOutputBufferSize(0); conf.setImmediateFlushEnabled(true); } - @Test(timeout = 60000) + @Test(timeout = 100000) public void testNonBlockingRead() throws Exception { String name = "distrlog-non-blocking-reader"; final DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); @@ -49,9 +49,10 @@ public class TestNonBlockingReads extends TestDistributedLogBase { confLocal.setReaderIdleWarnThresholdMillis(100); final DistributedLogManager dlm = createNewDLM(confLocal, name); ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); + ScheduledFuture writerClosedFuture = null; try { final Thread currentThread = Thread.currentThread(); - executor.schedule( + writerClosedFuture = executor.schedule( new Runnable() { @Override public void run() { @@ -67,12 +68,16 @@ public class TestNonBlockingReads extends TestDistributedLogBase { readNonBlocking(dlm, false); assertFalse(currentThread.isInterrupted()); } finally { + if (writerClosedFuture != null){ + // ensure writer.closeAndComplete is done before we close dlm + writerClosedFuture.get(); + } executor.shutdown(); dlm.close(); } } - @Test(timeout = 60000) + @Test(timeout = 100000) public void testNonBlockingReadRecovery() throws 
Exception { String name = "distrlog-non-blocking-reader-recovery"; final DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); @@ -81,9 +86,10 @@ public class TestNonBlockingReads extends TestDistributedLogBase { confLocal.setReadAheadMaxRecords(10); final DistributedLogManager dlm = createNewDLM(confLocal, name); ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); + ScheduledFuture writerClosedFuture = null; try { final Thread currentThread = Thread.currentThread(); - executor.schedule( + writerClosedFuture = executor.schedule( new Runnable() { @Override public void run() { @@ -100,12 +106,16 @@ public class TestNonBlockingReads extends TestDistributedLogBase { readNonBlocking(dlm, false); assertFalse(currentThread.isInterrupted()); } finally { + if (writerClosedFuture != null){ + // ensure writer.closeAndComplete is done before we close dlm + writerClosedFuture.get(); + } executor.shutdown(); dlm.close(); } } - @Test(timeout = 60000) + @Test(timeout = 100000) public void testNonBlockingReadIdleError() throws Exception { String name = "distrlog-non-blocking-reader-error"; final DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); @@ -116,10 +126,10 @@ public class TestNonBlockingReads extends TestDistributedLogBase { confLocal.setReaderIdleErrorThresholdMillis(100); final DistributedLogManager dlm = createNewDLM(confLocal, name); ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); - + ScheduledFuture writerClosedFuture = null; try { final Thread currentThread = Thread.currentThread(); - executor.schedule( + writerClosedFuture = executor.schedule( new Runnable() { @Override public void run() { @@ -141,6 +151,10 @@ public class TestNonBlockingReads extends TestDistributedLogBase { assertTrue(exceptionEncountered); assertFalse(currentThread.isInterrupted()); } finally { + if (writerClosedFuture != null){ + // ensure writer.closeAndComplete is done before we close dlm + 
writerClosedFuture.get(); + } executor.shutdown(); dlm.close(); } @@ -157,10 +171,10 @@ public class TestNonBlockingReads extends TestDistributedLogBase { confLocal.setReaderIdleErrorThresholdMillis(30000); final DistributedLogManager dlm = createNewDLM(confLocal, name); ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); - + ScheduledFuture writerClosedFuture = null; try { final Thread currentThread = Thread.currentThread(); - executor.schedule( + writerClosedFuture = executor.schedule( new Runnable() { @Override public void run() { @@ -183,6 +197,10 @@ public class TestNonBlockingReads extends TestDistributedLogBase { assertFalse(exceptionEncountered); assertFalse(currentThread.isInterrupted()); } finally { + if (writerClosedFuture != null){ + // ensure writer.closeAndComplete is done before we close dlm + writerClosedFuture.get(); + } executor.shutdown(); dlm.close(); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/a5dd5adc/distributedlog-core/src/test/java/com/twitter/distributedlog/config/TestConfigurationSubscription.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/config/TestConfigurationSubscription.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/config/TestConfigurationSubscription.java index 278bf29..24733a4 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/config/TestConfigurationSubscription.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/config/TestConfigurationSubscription.java @@ -42,6 +42,17 @@ import static org.junit.Assert.*; public class TestConfigurationSubscription { static final Logger LOG = LoggerFactory.getLogger(TestConfigurationSubscription.class); + /** + * Give FileChangedReloadingStrategy some time to start reloading + * Make sure now!=lastChecked + * {@link 
org.apache.commons.configuration.reloading.FileChangedReloadingStrategy#reloadingRequired()} + */ + private void ensureConfigReloaded() throws InterruptedException { + // sleep 1 ms so that System.currentTimeMillis() != + // lastChecked (the time we construct FileChangedReloadingStrategy + Thread.sleep(1); + } + @Test(timeout = 60000) public void testReloadConfiguration() throws Exception { PropertiesWriter writer = new PropertiesWriter(); @@ -63,7 +74,8 @@ public class TestConfigurationSubscription { // add writer.setProperty("prop1", "1"); writer.save(); - + // ensure the file change reloading event can be triggered + ensureConfigReloaded(); // reload the config confSub.reload(); assertNotNull(confHolder.get()); @@ -85,6 +97,8 @@ public class TestConfigurationSubscription { // add writer.setProperty("prop1", "1"); writer.save(); + // ensure the file change reloading event can be triggered + ensureConfigReloaded(); mockScheduler.tick(100, TimeUnit.MILLISECONDS); assertEquals("1", conf.getProperty("prop1")); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/a5dd5adc/distributedlog-core/src/test/java/com/twitter/distributedlog/feature/TestDynamicConfigurationFeatureProvider.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/feature/TestDynamicConfigurationFeatureProvider.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/feature/TestDynamicConfigurationFeatureProvider.java index 35fac65..46c1880 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/feature/TestDynamicConfigurationFeatureProvider.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/feature/TestDynamicConfigurationFeatureProvider.java @@ -30,6 +30,19 @@ import static org.junit.Assert.*; */ public class TestDynamicConfigurationFeatureProvider { + /** + * Make sure config is reloaded + * + * Give FileChangedReloadingStrategy some 
time to allow reloading + * Make sure now!=lastChecked + * {@link org.apache.commons.configuration.reloading.FileChangedReloadingStrategy#reloadingRequired()} + */ + private void ensureConfigReloaded() throws InterruptedException { + // sleep 1 ms so that System.currentTimeMillis() != + // lastChecked (the time we construct FileChangedReloadingStrategy + Thread.sleep(1); + } + @Test(timeout = 60000) public void testLoadFeaturesFromBase() throws Exception { PropertiesWriter writer = new PropertiesWriter(); @@ -43,6 +56,7 @@ public class TestDynamicConfigurationFeatureProvider { DynamicConfigurationFeatureProvider provider = new DynamicConfigurationFeatureProvider("", conf, NullStatsLogger.INSTANCE); provider.start(); + ensureConfigReloaded(); Feature feature1 = provider.getFeature("feature_1"); assertTrue(feature1.isAvailable()); @@ -79,6 +93,7 @@ public class TestDynamicConfigurationFeatureProvider { DynamicConfigurationFeatureProvider provider = new DynamicConfigurationFeatureProvider("", conf, NullStatsLogger.INSTANCE); provider.start(); + ensureConfigReloaded(); Feature feature1 = provider.getFeature("feature_1"); assertTrue(feature1.isAvailable()); @@ -118,6 +133,7 @@ public class TestDynamicConfigurationFeatureProvider { DynamicConfigurationFeatureProvider provider = new DynamicConfigurationFeatureProvider("", conf, NullStatsLogger.INSTANCE); provider.start(); + ensureConfigReloaded(); Feature feature1 = provider.getFeature("feature_1"); assertTrue(feature1.isAvailable()); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/a5dd5adc/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java index 61fb808..ed456b9 
100644 --- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java +++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java @@ -742,12 +742,14 @@ public class TestDistributedLogService extends TestDistributedLogBase { StatusCode.WRITE_EXCEPTION == response.getHeader().getCode() || StatusCode.WRITE_CANCELLED_EXCEPTION == response.getHeader().getCode()); } - assertTrue("There should be no streams in the cache", - streamManager.getCachedStreams().isEmpty()); + // acquired streams should all have been removed after we close them assertTrue("There should be no streams in the acquired cache", - streamManager.getAcquiredStreams().isEmpty()); - + streamManager.getAcquiredStreams().isEmpty()); localService.shutdown(); + // cached streams wouldn't be removed immediately after streams are closed + // but they should be removed after we shut down the service + assertTrue("There should be no streams in the cache after shutting down the service", + streamManager.getCachedStreams().isEmpty()); } @Test(timeout = 60000)
