GEODE-155:
- Fixes an intermittent failure in RegionWithHDFS*DUnitTest caused by an incorrect number of hoplog files being created.
- The failure was caused by an earlier test not cleaning up the static variables in ParallelGatewaySenderQueue.
- These static variables are cleaned up during GemFireCacheImpl.close() if there are any GatewaySenders running in the system.
- However, if a region with gateway senders associated with it is destroyed before cache.close() is called, the gateway senders are only stopped and removed from the allGatewaySenders list; the static variables are not cleaned up.
- Later, during GemFireCacheImpl.close(), the allGatewaySenders list is empty, so cleaning up the static variables is skipped.
- The fix explicitly invokes a static clean-up method of ParallelGatewaySenderQueue during cache.close().
- The non-static ParallelGatewaySenderQueue.cleanUp() is retained in case it is needed in future development for instance-specific clean-up.
- Minor formatting changes in the unit tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/cb735a20 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/cb735a20 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/cb735a20 Branch: refs/heads/feature/GEODE-137 Commit: cb735a2045f124ee7c5c6ba84dbcafe4383008d7 Parents: 71f00b5 Author: ashetkar <ashet...@pivotal.io> Authored: Tue Aug 4 14:06:51 2015 +0530 Committer: Qihong Chen <qc...@pivotal.io> Committed: Thu Aug 6 10:07:48 2015 -0700 ---------------------------------------------------------------------- .../internal/cache/GemFireCacheImpl.java | 1 + .../parallel/ParallelGatewaySenderQueue.java | 41 +++++++++++++------- .../internal/RegionWithHDFSBasicDUnitTest.java | 22 +++++------ .../RegionWithHDFSOffHeapBasicDUnitTest.java | 10 +++-- .../hdfs/internal/RegionWithHDFSTestBase.java | 6 +-- 5 files changed, 49 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cb735a20/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java index 5487000..f5be144 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java @@ -2018,6 +2018,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer advisor.close(); } } + ParallelGatewaySenderQueue.cleanUpStatics(null); } catch (CancelException ce) { } 
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cb735a20/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java index f4f9528..9141905 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java @@ -633,15 +633,20 @@ public class ParallelGatewaySenderQueue implements RegionQueue { * Wait a while for existing tasks to terminate. If the existing tasks still don't * complete, cancel them by calling shutdownNow. */ - private void cleanupConflationThreadPool() { + private static void cleanupConflationThreadPool(AbstractGatewaySender sender) { conflationExecutor.shutdown();// Disable new tasks from being submitted try { if (!conflationExecutor.awaitTermination(1, TimeUnit.SECONDS)) { conflationExecutor.shutdownNow(); // Cancel currently executing tasks // Wait a while for tasks to respond to being cancelled - if (!conflationExecutor.awaitTermination(1, TimeUnit.SECONDS)) - logger.warn(LocalizedMessage.create(LocalizedStrings.ParallelGatewaySenderQueue_COULD_NOT_TERMINATE_CONFLATION_THREADPOOL, this.sender)); + if (!conflationExecutor.awaitTermination(1, TimeUnit.SECONDS)) { + logger + .warn(LocalizedMessage + .create( + LocalizedStrings.ParallelGatewaySenderQueue_COULD_NOT_TERMINATE_CONFLATION_THREADPOOL, + (sender == null ? "all" : sender))); + } } } catch (InterruptedException e) { // (Re-)Cancel if current thread also interrupted @@ -1508,25 +1513,33 @@ public class ParallelGatewaySenderQueue implements RegionQueue { * by the queue. 
Note that this cleanup doesn't clean the data held by the queue. */ public void cleanUp() { - if(buckToDispatchLock != null){ - this.buckToDispatchLock = null; + cleanUpStatics(this.sender); + } + + /** + * @param sender + * can be null. + */ + public static void cleanUpStatics(AbstractGatewaySender sender) { + if (buckToDispatchLock != null) { + buckToDispatchLock = null; } - if(regionToDispatchedKeysMapEmpty != null) { - this.regionToDispatchedKeysMapEmpty = null; + if (regionToDispatchedKeysMapEmpty != null) { + regionToDispatchedKeysMapEmpty = null; } - this.regionToDispatchedKeysMap.clear(); + regionToDispatchedKeysMap.clear(); synchronized (ParallelGatewaySenderQueue.class) { - if (this.removalThread != null) { - this.removalThread.shutdown(); - this.removalThread = null; + if (removalThread != null) { + removalThread.shutdown(); + removalThread = null; } } if (conflationExecutor != null) { - cleanupConflationThreadPool(); - this.conflationExecutor = null; + cleanupConflationThreadPool(sender); + conflationExecutor = null; } } - + @Override public void close() { // Because of bug 49060 do not close the regions of a parallel queue http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cb735a20/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSBasicDUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSBasicDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSBasicDUnitTest.java index cf10b24..162e529 100644 --- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSBasicDUnitTest.java +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSBasicDUnitTest.java @@ -86,7 +86,7 @@ public class RegionWithHDFSBasicDUnitTest extends RegionWithHDFSTestBase { final String uniqueName, final int 
batchInterval, final boolean queuePersistent, final boolean writeonly, final long timeForRollover, final long maxFileSize) { - SerializableCallable createRegion = new SerializableCallable() { + SerializableCallable createRegion = new SerializableCallable("Create HDFS region") { public Object call() throws Exception { AttributesFactory af = new AttributesFactory(); af.setDataPolicy(DataPolicy.HDFS_PARTITION); @@ -95,8 +95,8 @@ public class RegionWithHDFSBasicDUnitTest extends RegionWithHDFSTestBase { paf.setRedundantCopies(1); af.setHDFSStoreName(uniqueName); - af.setPartitionAttributes(paf.create()); + HDFSStoreFactory hsf = getCache().createHDFSStoreFactory(); // Going two level up to avoid home directories getting created in // VM-specific directory. This avoids failures in those tests where @@ -860,7 +860,7 @@ public class RegionWithHDFSBasicDUnitTest extends RegionWithHDFSTestBase { protected AsyncInvocation doAsyncPuts(VM vm, final String regionName, final int start, final int end, final String suffix, final String value) throws Exception { - return vm.invokeAsync(new SerializableCallable() { + return vm.invokeAsync(new SerializableCallable("doAsyncPuts") { public Object call() throws Exception { Region r = getRootRegion(regionName); String v = "V"; @@ -976,8 +976,8 @@ public class RegionWithHDFSBasicDUnitTest extends RegionWithHDFSTestBase { } /** - * create server with file rollover time as 2 secs. Insert few entries and - * then sleep for 2 sec. A file should be created. Do it again At th end, two + * Create server with file rollover time as 5 seconds. Insert few entries and + * then sleep for 7 seconds. A file should be created. Do it again. At the end, two * files with inserted entries should be created. 
* * @throws Throwable @@ -991,8 +991,8 @@ public class RegionWithHDFSBasicDUnitTest extends RegionWithHDFSTestBase { String homeDir = "./testWOTimeForRollOverParam"; final String uniqueName = getName(); - createServerRegion(vm0, 1, 1, 500, homeDir, uniqueName, 5, true, false, 4, 1); - createServerRegion(vm1, 1, 1, 500, homeDir, uniqueName, 5, true, false, 4, 1); + createServerRegion(vm0, 1, 1, 500, homeDir, uniqueName, 5, true, false, 5, 1); + createServerRegion(vm1, 1, 1, 500, homeDir, uniqueName, 5, true, false, 5, 1); AsyncInvocation a1 = doAsyncPuts(vm0, uniqueName, 1, 8, "vm0"); AsyncInvocation a2 = doAsyncPuts(vm1, uniqueName, 4, 10, "vm1"); @@ -1000,7 +1000,7 @@ public class RegionWithHDFSBasicDUnitTest extends RegionWithHDFSTestBase { a1.join(); a2.join(); - Thread.sleep(8000); + Thread.sleep(7000); a1 = doAsyncPuts(vm0, uniqueName, 10, 18, "vm0"); a2 = doAsyncPuts(vm1, uniqueName, 14, 20, "vm1"); @@ -1008,13 +1008,13 @@ public class RegionWithHDFSBasicDUnitTest extends RegionWithHDFSTestBase { a1.join(); a2.join(); - Thread.sleep(8000); + Thread.sleep(7000); cacheClose(vm0, false); cacheClose(vm1, false); - AsyncInvocation async1 = createServerRegionAsync(vm0, 1, 1, 500, homeDir, uniqueName, 5, true, false, 4, 1); - AsyncInvocation async2 = createServerRegionAsync(vm1, 1, 1, 500, homeDir, uniqueName, 5, true, false, 4, 1); + AsyncInvocation async1 = createServerRegionAsync(vm0, 1, 1, 500, homeDir, uniqueName, 5, true, false, 5, 1); + AsyncInvocation async2 = createServerRegionAsync(vm1, 1, 1, 500, homeDir, uniqueName, 5, true, false, 5, 1); async1.getResult(); async2.getResult(); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cb735a20/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSOffHeapBasicDUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSOffHeapBasicDUnitTest.java 
b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSOffHeapBasicDUnitTest.java index 21e2986..ee517d2 100644 --- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSOffHeapBasicDUnitTest.java +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSOffHeapBasicDUnitTest.java @@ -45,9 +45,13 @@ public class RegionWithHDFSOffHeapBasicDUnitTest extends } } }; - checkOrphans.run(); - invokeInEveryVM(checkOrphans); - super.tearDown2(); + try { + checkOrphans.run(); + invokeInEveryVM(checkOrphans); + } finally { + // proceed with tearDown2 anyway. + super.tearDown2(); + } } public void testDelta() { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cb735a20/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSTestBase.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSTestBase.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSTestBase.java index 8ad57c9..92687ed 100644 --- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSTestBase.java +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSTestBase.java @@ -85,8 +85,9 @@ public abstract class RegionWithHDFSTestBase extends CacheTestCase { } public SerializableCallable cleanUpStoresAndDisconnect() throws Exception { - SerializableCallable cleanUp = new SerializableCallable() { + SerializableCallable cleanUp = new SerializableCallable("cleanUpStoresAndDisconnect") { public Object call() throws Exception { + disconnectFromDS(); File file; if (homeDir != null) { file = new File(homeDir); @@ -95,7 +96,6 @@ public abstract class RegionWithHDFSTestBase extends CacheTestCase { } file = new File(tmpDir); FileUtil.delete(file); - disconnectFromDS(); return 0; } }; @@ -576,7 
+576,7 @@ public abstract class RegionWithHDFSTestBase extends CacheTestCase { return entriesToFileMap; } protected SerializableCallable validateEmpty(VM vm0, final int numEntries, final String uniqueName) { - SerializableCallable validateEmpty = new SerializableCallable("validate") { + SerializableCallable validateEmpty = new SerializableCallable("validateEmpty") { public Object call() throws Exception { Region r = getRootRegion(uniqueName);