Enforce waitUntilFlushed timeout per bucket (as well as overall).
Rework JUnit tests for new implementation.


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/d0c44de5
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/d0c44de5
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/d0c44de5

Branch: refs/heads/feature/GEODE-2852
Commit: d0c44de5f0bc2d1de276fc25c5cecaf61908fa7c
Parents: 6eb100d
Author: Lynn Hughes-Godfrey <[email protected]>
Authored: Tue May 2 13:33:49 2017 -0700
Committer: Lynn Hughes-Godfrey <[email protected]>
Committed: Tue May 2 13:33:49 2017 -0700

----------------------------------------------------------------------
 ...ParallelGatewaySenderFlushedCoordinator.java | 46 ++++++++------
 ...atewaySenderFlushedCoordinatorJUnitTest.java | 67 +++++++++++---------
 2 files changed, 64 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/d0c44de5/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java
index 3bb220f..42ce68c 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java
@@ -23,8 +23,10 @@ import 
org.apache.geode.internal.cache.wan.WaitUntilGatewaySenderFlushedCoordina
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.*;
 
 public class WaitUntilParallelGatewaySenderFlushedCoordinator
@@ -46,28 +48,30 @@ public class 
WaitUntilParallelGatewaySenderFlushedCoordinator
       sender.getCancelCriterion().checkCancelInProgress(null);
     }
 
-    // Create callables for local buckets
-    List<WaitUntilBucketRegionQueueFlushedCallable> callables =
-        buildWaitUntilBucketRegionQueueFlushedCallables(pr);
-
-    // Submit local callables for execution
     ExecutorService service = 
this.sender.getDistributionManager().getWaitingThreadPool();
     List<Future<Boolean>> callableFutures = new ArrayList<>();
     int callableCount = 0;
-    if (logger.isDebugEnabled()) {
-      logger.debug("WaitUntilParallelGatewaySenderFlushedCoordinator: Created 
and being submitted "
-          + callables.size() + " callables=" + callables);
-    }
-    long endTime = System.nanoTime() + unit.toNanos(timeout);
-    for (Callable<Boolean> callable : callables) {
+    long nanosRemaining = unit.toNanos(timeout);
+    long endTime = System.nanoTime() + nanosRemaining;
+    Set<BucketRegion> localBucketRegions = getLocalBucketRegions(pr);
+    for (BucketRegion br : localBucketRegions) {
       // timeout exceeded, do not submit more callables, return localResult 
false
       if (System.nanoTime() >= endTime) {
         localResult = false;
         break;
       }
+      // create and submit callable with updated timeout
+      Callable<Boolean> callable = 
createWaitUntilBucketRegionQueueFlushedCallable(
+          (BucketRegionQueue) br, nanosRemaining, TimeUnit.NANOSECONDS);
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            "WaitUntilParallelGatewaySenderFlushedCoordinator: Submitting 
callable for bucket "
+                + br.getId() + " callable=" + callable + " nanosRemaining=" + 
nanosRemaining);
+      }
       callableFutures.add(service.submit(callable));
       callableCount++;
-      if ((callableCount % CALLABLES_CHUNK_SIZE) == 0 || callableCount == 
callables.size()) {
+      if ((callableCount % CALLABLES_CHUNK_SIZE) == 0
+          || callableCount == localBucketRegions.size()) {
         CallablesChunkResults callablesChunkResults =
             new CallablesChunkResults(localResult, exceptionToThrow, 
callableFutures).invoke();
         localResult = callablesChunkResults.getLocalResult();
@@ -80,6 +84,7 @@ public class WaitUntilParallelGatewaySenderFlushedCoordinator
           throw exceptionToThrow;
         }
       }
+      nanosRemaining = endTime - System.nanoTime();
     }
 
     // Return the full result
@@ -90,16 +95,17 @@ public class 
WaitUntilParallelGatewaySenderFlushedCoordinator
     return localResult;
   }
 
-  protected List<WaitUntilBucketRegionQueueFlushedCallable> 
buildWaitUntilBucketRegionQueueFlushedCallables(
-      PartitionedRegion pr) {
-    List<WaitUntilBucketRegionQueueFlushedCallable> callables = new 
ArrayList<>();
+  protected Set<BucketRegion> getLocalBucketRegions(PartitionedRegion pr) {
+    Set<BucketRegion> localBucketRegions = new HashSet<BucketRegion>();
     if (pr.isDataStore()) {
-      for (BucketRegion br : pr.getDataStore().getAllLocalBucketRegions()) {
-        callables.add(new 
WaitUntilBucketRegionQueueFlushedCallable((BucketRegionQueue) br,
-            this.timeout, this.unit));
-      }
+      localBucketRegions = pr.getDataStore().getAllLocalBucketRegions();
     }
-    return callables;
+    return localBucketRegions;
+  }
+
+  protected WaitUntilBucketRegionQueueFlushedCallable 
createWaitUntilBucketRegionQueueFlushedCallable(
+      BucketRegionQueue br, long timeout, TimeUnit unit) {
+    return new WaitUntilBucketRegionQueueFlushedCallable(br, timeout, unit);
   }
 
   public static class WaitUntilBucketRegionQueueFlushedCallable implements 
Callable<Boolean> {

http://git-wip-us.apache.org/repos/asf/geode/blob/d0c44de5/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest.java
index d957f91..77902f8 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest.java
@@ -14,6 +14,8 @@
  */
 package org.apache.geode.internal.cache.wan.parallel;
 
+import org.apache.geode.internal.cache.BucketRegion;
+import org.apache.geode.internal.cache.BucketRegionQueue;
 import org.apache.geode.internal.cache.PartitionedRegion;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
 import 
org.apache.geode.internal.cache.wan.WaitUntilGatewaySenderFlushedCoordinatorJUnitTest;
@@ -23,7 +25,9 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertFalse;
@@ -32,12 +36,14 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
 
 @Category(IntegrationTest.class)
 public class WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest
     extends WaitUntilGatewaySenderFlushedCoordinatorJUnitTest {
 
   private PartitionedRegion region;
+  private BucketRegionQueue brq;
 
   protected void createGatewaySender() {
     super.createGatewaySender();
@@ -46,6 +52,7 @@ public class 
WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest
     doReturn(queue).when(this.sender).getQueue();
     this.region = mock(PartitionedRegion.class);
     doReturn(this.region).when(queue).getRegion();
+    this.brq = mock(BucketRegionQueue.class);
   }
 
   protected AbstractGatewaySenderEventProcessor getEventProcessor() {
@@ -56,69 +63,71 @@ public class 
WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest
 
   @Test
   public void 
testWaitUntilParallelGatewaySenderFlushedSuccessfulNotInitiator() throws 
Throwable {
+    long timeout = 5000;
+    TimeUnit unit = TimeUnit.MILLISECONDS;
     WaitUntilParallelGatewaySenderFlushedCoordinator coordinator =
-        new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, 
1000l,
-            TimeUnit.MILLISECONDS, false);
+        new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, 
timeout, unit, false);
     WaitUntilParallelGatewaySenderFlushedCoordinator coordinatorSpy = 
spy(coordinator);
-    doReturn(getSuccessfulCallables(true)).when(coordinatorSpy)
-        .buildWaitUntilBucketRegionQueueFlushedCallables(this.region);
+    
doReturn(getLocalBucketRegions()).when(coordinatorSpy).getLocalBucketRegions(any());
+    doReturn(getCallableResult(true)).when(coordinatorSpy)
+        .createWaitUntilBucketRegionQueueFlushedCallable(any(), anyLong(), 
any());
     boolean result = coordinatorSpy.waitUntilFlushed();
     assertTrue(result);
   }
 
   @Test
   public void 
testWaitUntilParallelGatewaySenderFlushedUnsuccessfulNotInitiator() throws 
Throwable {
+    long timeout = 5000;
+    TimeUnit unit = TimeUnit.MILLISECONDS;
     WaitUntilParallelGatewaySenderFlushedCoordinator coordinator =
-        new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, 
1000l,
-            TimeUnit.MILLISECONDS, false);
+        new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, 
timeout, unit, false);
     WaitUntilParallelGatewaySenderFlushedCoordinator coordinatorSpy = 
spy(coordinator);
-    doReturn(getUnsuccessfulCallables()).when(coordinatorSpy)
-        .buildWaitUntilBucketRegionQueueFlushedCallables(this.region);
+    
doReturn(getLocalBucketRegions()).when(coordinatorSpy).getLocalBucketRegions(any());
+    doReturn(getCallableResult(false)).when(coordinatorSpy)
+        .createWaitUntilBucketRegionQueueFlushedCallable(any(), anyLong(), 
any());
     boolean result = coordinatorSpy.waitUntilFlushed();
     assertFalse(result);
   }
 
   @Test
   public void testWaitUntilParallelGatewaySenderFlushedSuccessfulInitiator() 
throws Throwable {
+    long timeout = 5000;
+    TimeUnit unit = TimeUnit.MILLISECONDS;
     WaitUntilParallelGatewaySenderFlushedCoordinator coordinator =
-        new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, 
1000l,
-            TimeUnit.MILLISECONDS, true);
+        new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, 
timeout, unit, true);
     WaitUntilParallelGatewaySenderFlushedCoordinator coordinatorSpy = 
spy(coordinator);
-    doReturn(getSuccessfulCallables(true)).when(coordinatorSpy)
-        .buildWaitUntilBucketRegionQueueFlushedCallables(this.region);
+    
doReturn(getLocalBucketRegions()).when(coordinatorSpy).getLocalBucketRegions(any());
+    doReturn(getCallableResult(true)).when(coordinatorSpy)
+        .createWaitUntilBucketRegionQueueFlushedCallable(any(), anyLong(), 
any());
     boolean result = coordinatorSpy.waitUntilFlushed();
     assertTrue(result);
   }
 
   @Test
   public void testWaitUntilParallelGatewaySenderFlushedUnsuccessfulInitiator() 
throws Throwable {
+    long timeout = 5000;
+    TimeUnit unit = TimeUnit.MILLISECONDS;
     WaitUntilParallelGatewaySenderFlushedCoordinator coordinator =
-        new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, 
1000l,
-            TimeUnit.MILLISECONDS, true);
+        new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, 
timeout, unit, true);
     WaitUntilParallelGatewaySenderFlushedCoordinator coordinatorSpy = 
spy(coordinator);
-    doReturn(getSuccessfulCallables(false)).when(coordinatorSpy)
-        .buildWaitUntilBucketRegionQueueFlushedCallables(this.region);
+    
doReturn(getLocalBucketRegions()).when(coordinatorSpy).getLocalBucketRegions(any());
+    doReturn(getCallableResult(false)).when(coordinatorSpy)
+        .createWaitUntilBucketRegionQueueFlushedCallable(any(), anyLong(), 
any());
     boolean result = coordinatorSpy.waitUntilFlushed();
     assertFalse(result);
   }
 
-  private List<WaitUntilBucketRegionQueueFlushedCallable> 
getSuccessfulCallables(
-      boolean expectedResult) throws Exception {
-    List callables = new ArrayList();
+  private WaitUntilBucketRegionQueueFlushedCallable getCallableResult(boolean 
expectedResult)
+      throws Exception {
     WaitUntilBucketRegionQueueFlushedCallable callable =
         mock(WaitUntilBucketRegionQueueFlushedCallable.class);
     when(callable.call()).thenReturn(expectedResult);
-    callables.add(callable);
-    return callables;
+    return callable;
   }
 
-  private List<WaitUntilBucketRegionQueueFlushedCallable> 
getUnsuccessfulCallables()
-      throws Exception {
-    List callables = new ArrayList();
-    WaitUntilBucketRegionQueueFlushedCallable callable =
-        mock(WaitUntilBucketRegionQueueFlushedCallable.class);
-    when(callable.call()).thenReturn(false);
-    callables.add(callable);
-    return callables;
+  private Set<BucketRegionQueue> getLocalBucketRegions() {
+    Set<BucketRegionQueue> localBucketRegions = new 
HashSet<BucketRegionQueue>();
+    localBucketRegions.add(this.brq);
+    return localBucketRegions;
   }
 }

Reply via email to