fix-6
Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/d0e8a5a8 Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/d0e8a5a8 Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/d0e8a5a8 Branch: refs/heads/feature/GEM-1299 Commit: d0e8a5a83f6a164b3af43a73852b7fb78d385f87 Parents: a192c9f Author: zhouxh <gz...@pivotal.io> Authored: Wed Apr 26 23:23:13 2017 -0700 Committer: zhouxh <gz...@pivotal.io> Committed: Thu Apr 27 11:47:01 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/geode/internal/cache/BucketRegion.java | 2 +- .../org/apache/geode/internal/cache/BucketRegionQueue.java | 6 +++++- .../java/org/apache/geode/internal/cache/LocalRegion.java | 2 +- .../cache/wan/parallel/ParallelGatewaySenderQueue.java | 1 + .../lucene/internal/LuceneIndexForPartitionedRegion.java | 9 +++++---- .../internal/distributed/PokeLuceneAsyncQueueFunction.java | 3 +++ 6 files changed, 16 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/d0e8a5a8/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java index 136d7b9..cde7cf4 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java @@ -668,7 +668,7 @@ public class BucketRegion extends DistributedRegion implements Bucket { } } - protected void notifyGatewaySender(EnumListenerEvent operation, EntryEventImpl event) { + public void notifyGatewaySender(EnumListenerEvent operation, EntryEventImpl event) { // We don't need to clone the event for new Gateway Senders. // Preserve the bucket reference for resetting it later. LocalRegion bucketRegion = event.getRegion(); http://git-wip-us.apache.org/repos/asf/geode/blob/d0e8a5a8/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java index bcc1d8d..56ae3f1 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java @@ -384,7 +384,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue { /** * Does a get that gets the value without fault values in from disk. */ - private Object optimalGet(Object k) { + public Object optimalGet(Object k) { // Get the object at that key (to remove the index). Object object = null; try { @@ -588,6 +588,10 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue { return this.eventSeqNumQueue.peek(); } + public BlockingQueue getEventSeqNumQueue() { + return eventSeqNumQueue; + } + public boolean isReadyForPeek() { return !this.getPartitionedRegion().isDestroyed() && !this.isEmpty() && !this.eventSeqNumQueue.isEmpty() && getBucketAdvisor().isPrimary(); http://git-wip-us.apache.org/repos/asf/geode/blob/d0e8a5a8/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java index 8c061b0..3f0d6b3 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java @@ -6338,7 +6338,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, return false; } - protected void notifyGatewaySender(EnumListenerEvent operation, EntryEventImpl event) { + public void notifyGatewaySender(EnumListenerEvent operation, EntryEventImpl event) { if (isPdxTypesRegion() || event.isConcurrencyConflict() /* usually concurrent cache modification problem */) { http://git-wip-us.apache.org/repos/asf/geode/blob/d0e8a5a8/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java index 9696b90..87feb21 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java @@ -1133,6 +1133,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue { bucketId, prQ.getFullPath()); } } + brq.getEventSeqNumQueue().add(key); addRemovedEvent(prQ, bucketId, key); } http://git-wip-us.apache.org/repos/asf/geode/blob/d0e8a5a8/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java index a60ca01..6e3dce0 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java @@ -290,16 +290,17 @@ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl { try { for (BucketRegion br : pr.getDataStore().getAllLocalBucketRegions()) { if (!br.getBucketAdvisor().isPrimary()) { - AsyncEvent currentFirst = (AsyncEvent) ((BucketRegionQueue) br).firstEventSeqNum(); - AsyncEvent lastPeek = (AsyncEvent) lastPeekedEvents.put(br, currentFirst); + Long currentFirst = (Long) ((BucketRegionQueue) br).firstEventSeqNum(); + Long lastPeek = (Long) lastPeekedEvents.put(br, currentFirst); if (currentFirst != null && currentFirst.equals(lastPeek)) { - redistributeEvents(lastPeek); + redistributeEvents((AsyncEvent) ((BucketRegionQueue) br).optimalGet(currentFirst)); + lastPeekedEvents.put(br, ((BucketRegionQueue) br).firstEventSeqNum()); } } else { lastPeekedEvents.put(br, null); } } - Thread.sleep(10000); + Thread.sleep(2000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } http://git-wip-us.apache.org/repos/asf/geode/blob/d0e8a5a8/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/PokeLuceneAsyncQueueFunction.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/PokeLuceneAsyncQueueFunction.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/PokeLuceneAsyncQueueFunction.java index 992972b..10c6888 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/PokeLuceneAsyncQueueFunction.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/PokeLuceneAsyncQueueFunction.java @@ -52,6 +52,7 @@ public class PokeLuceneAsyncQueueFunction implements Function, InternalEntity { PartitionedRegion pr = (PartitionedRegion) ctx.getDataSet(); Cache cache = pr.getCache(); String queueId = (String) pr.getAttributes().getAsyncEventQueueIds().iterator().next(); + // PR could have many AEQs, not just AEQ for lucene AsyncEventQueueImpl queue = (AsyncEventQueueImpl) cache.getAsyncEventQueue(queueId); // Get the GatewaySender @@ -60,6 +61,8 @@ public class PokeLuceneAsyncQueueFunction implements Function, InternalEntity { // Update the shadow key BucketRegion br = pr.getBucketRegion(key); if (br.getBucketAdvisor().isPrimary()) { + // only do it for primary? how about failover again to secondary? + // why not br.notifyGatewaySender(operation, event); try { List<ParallelGatewaySenderEventProcessor> processors = ((ConcurrentParallelGatewaySenderEventProcessor) sender.getEventProcessor())