This is an automated email from the ASF dual-hosted git repository. zhouxj pushed a commit to branch feature/GEODE-6899 in repository https://gitbox.apache.org/repos/asf/geode.git
commit 0011e1405caa93c0fafc1a87b15c5fd58679f56d Author: zhouxh <gz...@pivotal.io> AuthorDate: Fri Jun 21 17:14:00 2019 -0700 GEODE-6899: retried client should set last try's version tag if found Co-authored-by: Xiaojian Zhou <gz...@pivotal.io> Co-authored-by: Eric Shu <e...@pivotal.io> --- .../geode/internal/cache/DistributedRegion.java | 3 +++ .../internal/cache/InitialImageOperation.java | 29 +++++++++++++++++++--- .../apache/geode/internal/cache/LocalRegion.java | 6 +++++ 3 files changed, 34 insertions(+), 4 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java index 557a999..63929a3 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java @@ -569,6 +569,9 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute return false; } if (event.getVersionTag() != null && !event.getVersionTag().isGatewayTag()) { + if (logger.isDebugEnabled()) { + logger.debug("Not to create a new version tag for retried event {}", event); + } return false; } if (event.getOperation().isLocal()) { // bug #45402 - localDestroy generated a version tag diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java index d0811c3..d2216c0 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java @@ -68,6 +68,7 @@ import org.apache.geode.internal.ByteArrayDataInput; import org.apache.geode.internal.DataSerializableFixedID; import org.apache.geode.internal.InternalDataSerializer; import org.apache.geode.internal.NullDataOutputStream; +import 
org.apache.geode.internal.SystemTimer; import org.apache.geode.internal.Version; import org.apache.geode.internal.cache.InitialImageFlowControl.FlowControlPermitMessage; import org.apache.geode.internal.cache.entries.DiskEntry; @@ -1885,10 +1886,8 @@ public class InitialImageOperation { if (lostMemberVersionID == null) { lostMemberVersionID = lostMemberID; } - // check to see if the region in this cache needs to synchronize with others - // it is possible that the cache is recover/restart of a member and not - // scheduled to synchronize with others - synchronizeIfNotScheduled(targetRegion, lostMemberID, lostMemberVersionID); + waitInAnotherThreadToCheckIfSynchronizationScheduled(targetRegion, lostMemberID, + lostMemberVersionID); } if (internalAfterSentImageReply != null @@ -1920,11 +1919,33 @@ public class InitialImageOperation { // if region synchronization has not been scheduled or performed, // we do synchronization with no delay as we received the synchronization request // indicating timed task has been triggered on other nodes + if (logger.isDebugEnabled()) { + logger.debug("Newly joined member is triggered to schedule SynchronizeForLostMember"); + } region.scheduleSynchronizeForLostMember(lostMember, lostVersionSource, 0); + } else { + if (logger.isDebugEnabled()) { + logger.debug( + "Live member has been scheduled SynchronizeForLostMember by membership listener."); + } } } /** + * Pause 1 second in another thread to wait for membership listener to trigger syncWithLostMember + * operation. Otherwise, this is a newly started member, do the syncWithLostMember here. 
+ */ + void waitInAnotherThreadToCheckIfSynchronizationScheduled(DistributedRegion region, + InternalDistributedMember lostMember, VersionSource lostVersionSource) { + region.getCache().getCCPTimer().schedule(new SystemTimer.SystemTimerTask() { + @Override + public void run2() { + synchronizeIfNotScheduled(region, lostMember, lostVersionSource); + } + }, 1000); + } + + /** * Serialize the entries into byte[] chunks, calling proc for each one. proc args: the byte[] * chunk and an int indicating whether it is the last chunk (positive means last chunk, zero * otherwise). The return value of proc indicates whether to continue to the next chunk (true) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java index dfd2c26..9f34c06 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java @@ -10647,6 +10647,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, final EntryEventImpl event = entryEventFactory.create(this, Operation.REPLACE, key, null, callbackArg, false, client.getDistributedMember(), true, eventId); + // if this is a replayed operation we may already have a version tag + event.setVersionTag(clientEvent.getVersionTag()); + event.setPossibleDuplicate(clientEvent.isPossibleDuplicate()); try { event.setContext(client); @@ -10701,6 +10704,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, final EntryEventImpl event = entryEventFactory.create(this, Operation.REPLACE, key, null, callbackArg, false, client.getDistributedMember(), true, eventId); + // if this is a replayed operation we may already have a version tag + event.setVersionTag(clientEvent.getVersionTag()); + event.setPossibleDuplicate(clientEvent.isPossibleDuplicate()); try { event.setContext(client);