GEODE-2860: Refactor use of EventTracker

  * change EventTracker to an interface with two implementations
  * move as much logic out of LocalRegion down into subclasses that
    make use of EventTracker
  * move and refactor static inner classes in EventTracker into their
    own class files
  * migrate some of the event-focused classes into a new subpackage
  * add tests for existing logic from EventTracker

This closes #638


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/3f9be9a7
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/3f9be9a7
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/3f9be9a7

Branch: refs/heads/feature/GEM-1483
Commit: 3f9be9a74fcdb1152692d72720ed2d78efec41d4
Parents: d02bf86
Author: Nick Reich <nre...@pivotal.io>
Authored: Fri Jun 30 16:05:58 2017 -0700
Committer: Ken Howe <kh...@pivotal.io>
Committed: Tue Jul 18 07:11:33 2017 -0700

----------------------------------------------------------------------
 .../geode/internal/cache/BucketRegion.java      |  36 +-
 .../internal/cache/CreateRegionProcessor.java   |  22 +-
 .../geode/internal/cache/DistributedRegion.java |  46 +-
 .../geode/internal/cache/EventStateHelper.java  |  10 +-
 .../geode/internal/cache/EventTracker.java      | 790 -------------------
 .../internal/cache/FindVersionTagOperation.java |   2 +-
 .../geode/internal/cache/GemFireCacheImpl.java  |  24 +-
 .../apache/geode/internal/cache/HARegion.java   |   5 +-
 .../geode/internal/cache/InternalCache.java     |   3 +-
 .../geode/internal/cache/LocalRegion.java       | 167 +---
 .../geode/internal/cache/PartitionedRegion.java |   9 +-
 .../cache/event/BulkOperationHolder.java        |  79 ++
 .../cache/event/DistributedEventTracker.java    | 523 ++++++++++++
 .../cache/event/EventSequenceNumberHolder.java  | 124 +++
 .../internal/cache/event/EventTracker.java      | 136 ++++
 .../cache/event/EventTrackerExpiryTask.java     |  97 +++
 .../cache/event/NonDistributedEventTracker.java | 135 ++++
 .../cache/tier/sockets/BaseCommand.java         |   4 +-
 .../wan/serial/SerialGatewaySenderQueue.java    |   6 +-
 .../internal/cache/xmlcache/CacheCreation.java  |   4 +-
 .../internal/cache/BucketRegionJUnitTest.java   |   4 +
 .../cache/DistributedRegionJUnitTest.java       |  11 +-
 .../internal/cache/EventTrackerDUnitTest.java   | 486 ------------
 .../geode/internal/cache/EventTrackerTest.java  |  94 ---
 .../geode/internal/cache/IteratorDUnitTest.java |   2 +-
 .../cache/PartitionedRegionDUnitTestCase.java   |   4 +-
 ...onedRegionSingleNodeOperationsJUnitTest.java |  11 +-
 .../event/DistributedEventTrackerTest.java      | 328 ++++++++
 .../cache/event/EventTrackerDUnitTest.java      | 489 ++++++++++++
 .../cache/event/EventTrackerExpiryTaskTest.java |  94 +++
 .../event/NonDistributedEventTrackerTest.java   |  89 +++
 .../sanctionedDataSerializables.txt             |   8 +-
 32 files changed, 2262 insertions(+), 1580 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/3f9be9a7/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
index 31b341a..30ce9e7 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
@@ -30,7 +30,7 @@ import org.apache.geode.internal.HeapDataOutputStream;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.BucketAdvisor.BucketProfile;
 import 
org.apache.geode.internal.cache.CreateRegionProcessor.CreateRegionReplyProcessor;
-import org.apache.geode.internal.cache.EventTracker.EventSeqnoHolder;
+import org.apache.geode.internal.cache.event.EventSequenceNumberHolder;
 import org.apache.geode.internal.cache.FilterRoutingInfo.FilterInfo;
 import org.apache.geode.internal.cache.control.MemoryEvent;
 import org.apache.geode.internal.cache.ha.ThreadIdentifier;
@@ -280,12 +280,6 @@ public class BucketRegion extends DistributedRegion 
implements Bucket {
   }
 
   @Override
-  public void createEventTracker() {
-    this.eventTracker = new EventTracker(this);
-    this.eventTracker.start();
-  }
-
-  @Override
   public void registerCreateRegionReplyProcessor(CreateRegionReplyProcessor 
processor) {
     this.createRegionReplyProcessor = processor;
   }
@@ -293,7 +287,7 @@ public class BucketRegion extends DistributedRegion 
implements Bucket {
   @Override
   protected void recordEventStateFromImageProvider(InternalDistributedMember 
provider) {
     if (this.createRegionReplyProcessor != null) {
-      Map<ThreadIdentifier, EventSeqnoHolder> providerEventStates =
+      Map<ThreadIdentifier, EventSequenceNumberHolder> providerEventStates =
           this.createRegionReplyProcessor.getEventState(provider);
       if (providerEventStates != null) {
         recordEventState(provider, providerEventStates);
@@ -1632,7 +1626,7 @@ public class BucketRegion extends DistributedRegion 
implements Bucket {
     if (this.isInitialized()) {
       boolean callThem = callDispatchListenerEvent;
       if (event.isPossibleDuplicate()
-          && 
this.eventTracker.isInitialImageProvider(event.getDistributedMember())) {
+          && 
getEventTracker().isInitialImageProvider(event.getDistributedMember())) {
         callThem = false;
       }
       super.invokeTXCallbacks(eventType, event, callThem);
@@ -1664,7 +1658,7 @@ public class BucketRegion extends DistributedRegion 
implements Bucket {
     if (this.isInitialized()) {
       boolean callThem = callDispatchListenerEvent;
       if (event.isPossibleDuplicate()
-          && 
this.eventTracker.isInitialImageProvider(event.getDistributedMember())) {
+          && 
this.getEventTracker().isInitialImageProvider(event.getDistributedMember())) {
         callThem = false;
       }
       super.invokeDestroyCallbacks(eventType, event, callThem, notifyGateways);
@@ -1695,7 +1689,7 @@ public class BucketRegion extends DistributedRegion 
implements Bucket {
     if (this.isInitialized()) {
       boolean callThem = callDispatchListenerEvent;
       if (event.isPossibleDuplicate()
-          && 
this.eventTracker.isInitialImageProvider(event.getDistributedMember())) {
+          && 
this.getEventTracker().isInitialImageProvider(event.getDistributedMember())) {
         callThem = false;
       }
       super.invokeInvalidateCallbacks(eventType, event, callThem);
@@ -1729,7 +1723,7 @@ public class BucketRegion extends DistributedRegion 
implements Bucket {
     if (this.isInitialized()) {
       boolean callThem = callDispatchListenerEvent;
       if (callThem && event.isPossibleDuplicate()
-          && 
this.eventTracker.isInitialImageProvider(event.getDistributedMember())) {
+          && 
this.getEventTracker().isInitialImageProvider(event.getDistributedMember())) {
         callThem = false;
       }
       super.invokePutCallbacks(eventType, event, callThem, notifyGateways);
@@ -2439,5 +2433,23 @@ public class BucketRegion extends DistributedRegion 
implements Bucket {
     return getPartitionedRegion().notifiesMultipleSerialGateways();
   }
 
+  @Override
+  public boolean hasSeenEvent(EntryEventImpl event) {
+    ensureEventTrackerInitialization();
+    return super.hasSeenEvent(event);
+  }
+
+  // bug 41289 - wait for event tracker to be initialized before checkin
+  // so that an operation intended for a previous version of a bucket
+  // is not prematurely applied to a new version of the bucket
+  private void ensureEventTrackerInitialization() {
+    try {
+      getEventTracker().waitOnInitialization();
+    } catch (InterruptedException ie) {
+      stopper.checkCancelInProgress(ie);
+      Thread.currentThread().interrupt();
+    }
+  }
+
 }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/3f9be9a7/geode-core/src/main/java/org/apache/geode/internal/cache/CreateRegionProcessor.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/CreateRegionProcessor.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/CreateRegionProcessor.java
index 1e38065..ee4e2df 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/CreateRegionProcessor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/CreateRegionProcessor.java
@@ -49,7 +49,7 @@ import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile;
 import 
org.apache.geode.internal.cache.CacheDistributionAdvisor.InitialImageAdvice;
-import org.apache.geode.internal.cache.EventTracker.EventSeqnoHolder;
+import org.apache.geode.internal.cache.event.EventSequenceNumberHolder;
 import org.apache.geode.internal.cache.ha.ThreadIdentifier;
 import org.apache.geode.internal.cache.partitioned.Bucket;
 import org.apache.geode.internal.cache.partitioned.PRLocallyDestroyedException;
@@ -91,10 +91,8 @@ public class CreateRegionProcessor implements 
ProfileExchangeProcessor {
           logger.debug("CreateRegionProcessor.initializeRegion, no recipients, 
msg not sent");
         }
         this.newRegion.getDistributionAdvisor().setInitialized();
-        EventTracker tracker = ((LocalRegion) 
this.newRegion).getEventTracker();
-        if (tracker != null) {
-          tracker.setInitialized();
-        }
+
+        ((LocalRegion) this.newRegion).getEventTracker().setInitialized();
         return;
       }
 
@@ -138,13 +136,11 @@ public class CreateRegionProcessor implements 
ProfileExchangeProcessor {
         }
       } finally {
         replyProc.cleanup();
-        EventTracker tracker = ((LocalRegion) 
this.newRegion).getEventTracker();
-        if (tracker != null) {
-          tracker.setInitialized();
-        }
+        ((LocalRegion) this.newRegion).getEventTracker().setInitialized();
         if (((LocalRegion) this.newRegion).isUsedForPartitionedRegionBucket()) 
{
           if (logger.isDebugEnabled()) {
-            logger.debug("initialized bucket event tracker: {}", tracker);
+            logger.debug("initialized bucket event tracker: {}",
+                ((LocalRegion) this.newRegion).getEventTracker());
           }
         }
       }
@@ -203,12 +199,12 @@ public class CreateRegionProcessor implements 
ProfileExchangeProcessor {
           .getDistributedSystem(), members);
     }
 
-    private final Map<DistributedMember, Map<ThreadIdentifier, 
EventSeqnoHolder>> remoteEventStates =
+    private final Map<DistributedMember, Map<ThreadIdentifier, 
EventSequenceNumberHolder>> remoteEventStates =
         new ConcurrentHashMap<>();
 
     private boolean allMembersSkippedChecks = true;
 
-    public Map<ThreadIdentifier, EventSeqnoHolder> getEventState(
+    public Map<ThreadIdentifier, EventSequenceNumberHolder> getEventState(
         InternalDistributedMember provider) {
       return this.remoteEventStates.get(provider);
     }
@@ -254,7 +250,7 @@ public class CreateRegionProcessor implements 
ProfileExchangeProcessor {
           // Save all event states, need to initiate the event tracker from 
the GII provider
           if (reply.eventState != null) {
             remoteEventStates.put(reply.getSender(),
-                (Map<ThreadIdentifier, EventSeqnoHolder>) reply.eventState);
+                (Map<ThreadIdentifier, EventSequenceNumberHolder>) 
reply.eventState);
           }
 
           if (lr.isUsedForPartitionedRegionBucket()) {

http://git-wip-us.apache.org/repos/asf/geode/blob/3f9be9a7/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index eb18134..d9ed4ed 100755
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -90,6 +90,8 @@ import 
org.apache.geode.internal.cache.InitialImageOperation.GIIStatus;
 import 
org.apache.geode.internal.cache.RemoteFetchVersionMessage.FetchVersionResponse;
 import 
org.apache.geode.internal.cache.control.InternalResourceManager.ResourceType;
 import org.apache.geode.internal.cache.control.MemoryEvent;
+import org.apache.geode.internal.cache.event.DistributedEventTracker;
+import org.apache.geode.internal.cache.event.EventTracker;
 import 
org.apache.geode.internal.cache.execute.DistributedRegionFunctionExecutor;
 import 
org.apache.geode.internal.cache.execute.DistributedRegionFunctionResultSender;
 import 
org.apache.geode.internal.cache.execute.DistributedRegionFunctionResultWaiter;
@@ -98,7 +100,6 @@ import 
org.apache.geode.internal.cache.execute.LocalResultCollector;
 import org.apache.geode.internal.cache.execute.RegionFunctionContextImpl;
 import 
org.apache.geode.internal.cache.execute.ServerToClientFunctionResultSender;
 import org.apache.geode.internal.cache.lru.LRUEntry;
-import org.apache.geode.internal.cache.partitioned.Bucket;
 import 
org.apache.geode.internal.cache.persistence.CreatePersistentRegionProcessor;
 import org.apache.geode.internal.cache.persistence.PersistenceAdvisor;
 import org.apache.geode.internal.cache.persistence.PersistenceAdvisorImpl;
@@ -255,9 +256,10 @@ public class DistributedRegion extends LocalRegion 
implements CacheDistributionA
   }
 
   @Override
-  public void createEventTracker() {
-    this.eventTracker = new EventTracker(this);
-    this.eventTracker.start();
+  protected EventTracker createEventTracker() {
+    EventTracker tracker = new DistributedEventTracker(cache, stopper, 
getName());
+    tracker.start();
+    return tracker;
   }
 
   /**
@@ -493,6 +495,38 @@ public class DistributedRegion extends LocalRegion 
implements CacheDistributionA
     }
   }
 
+  @Override
+  public boolean hasSeenEvent(EntryEventImpl event) {
+    boolean isDuplicate = false;
+
+    isDuplicate = getEventTracker().hasSeenEvent(event);
+    if (isDuplicate) {
+      markEventAsDuplicate(event);
+    } else {
+      // bug #48205 - a retried PR operation may already have a version 
assigned to it
+      // in another VM
+      if (event.isPossibleDuplicate() && 
event.getRegion().concurrencyChecksEnabled
+          && event.getVersionTag() == null && event.getEventId() != null) {
+        boolean isBulkOp = event.getOperation().isPutAll() || 
event.getOperation().isRemoveAll();
+        VersionTag tag =
+            FindVersionTagOperation.findVersionTag(event.getRegion(), 
event.getEventId(), isBulkOp);
+        event.setVersionTag(tag);
+      }
+    }
+    return isDuplicate;
+  }
+
+  private void markEventAsDuplicate(EntryEventImpl event) {
+    event.setPossibleDuplicate(true);
+    if (concurrencyChecksEnabled && event.getVersionTag() == null) {
+      if (event.isBulkOpInProgress()) {
+        
event.setVersionTag(getEventTracker().findVersionTagForBulkOp(event.getEventId()));
+      } else {
+        
event.setVersionTag(getEventTracker().findVersionTagForSequence(event.getEventId()));
+      }
+    }
+  }
+
   void setGeneratedVersionTag(boolean generateVersionTag) {
     // there is at-least one other persistent member, so turn on 
concurrencyChecks
     enableConcurrencyChecks();
@@ -1038,9 +1072,7 @@ public class DistributedRegion extends LocalRegion 
implements CacheDistributionA
       // makes sure all latches are released if they haven't been already
       super.initialize(null, null, null);
     } finally {
-      if (this.eventTracker != null) {
-        this.eventTracker.setInitialized();
-      }
+      getEventTracker().setInitialized();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/3f9be9a7/geode-core/src/main/java/org/apache/geode/internal/cache/EventStateHelper.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/EventStateHelper.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/EventStateHelper.java
index 9bc5e3a..ec427bb 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/EventStateHelper.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/EventStateHelper.java
@@ -19,7 +19,7 @@ import 
org.apache.geode.distributed.internal.InternalDistributedSystem;
 import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.InternalDataSerializer;
 import 
org.apache.geode.internal.cache.CreateRegionProcessor.CreateRegionReplyMessage;
-import org.apache.geode.internal.cache.EventTracker.EventSeqnoHolder;
+import org.apache.geode.internal.cache.event.EventSequenceNumberHolder;
 import 
org.apache.geode.internal.cache.InitialImageOperation.RegionStateMessage;
 import 
org.apache.geode.internal.cache.ha.HARegionQueue.DispatchedAndCurrentEvents;
 import org.apache.geode.internal.cache.ha.ThreadIdentifier;
@@ -89,7 +89,7 @@ public class EventStateHelper {
           DispatchedAndCurrentEvents value = (DispatchedAndCurrentEvents) 
entry.getValue();
           InternalDataSerializer.invokeToData(value, dop);
         } else {
-          EventSeqnoHolder value = (EventSeqnoHolder) entry.getValue();
+          EventSequenceNumberHolder value = (EventSequenceNumberHolder) 
entry.getValue();
           InternalDataSerializer.invokeToData(value, dop);
         }
       }
@@ -130,11 +130,11 @@ public class EventStateHelper {
           InternalDataSerializer.invokeFromData(value, dip);
           eventState.put(key, value);
         } else {
-          EventSeqnoHolder value = new EventSeqnoHolder();
+          EventSequenceNumberHolder value = new EventSequenceNumberHolder();
           InternalDataSerializer.invokeFromData(value, dip);
           eventState.put(key, value);
-          if (value.versionTag != null) {
-            value.versionTag.replaceNullIDs(senderId);
+          if (value.getVersionTag() != null) {
+            value.getVersionTag().replaceNullIDs(senderId);
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/geode/blob/3f9be9a7/geode-core/src/main/java/org/apache/geode/internal/cache/EventTracker.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/EventTracker.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/EventTracker.java
deleted file mode 100644
index b919043..0000000
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/EventTracker.java
+++ /dev/null
@@ -1,790 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
- * agreements. See the NOTICE file distributed with this work for additional 
information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the 
License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
- * or implied. See the License for the specific language governing permissions 
and limitations under
- * the License.
- */
-package org.apache.geode.internal.cache;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.logging.log4j.Logger;
-
-import org.apache.geode.DataSerializable;
-import org.apache.geode.DataSerializer;
-import org.apache.geode.cache.client.PoolFactory;
-import org.apache.geode.distributed.DistributedMember;
-import org.apache.geode.distributed.internal.DistributionConfig;
-import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.Assert;
-import org.apache.geode.internal.SystemTimer.SystemTimerTask;
-import org.apache.geode.internal.cache.ha.ThreadIdentifier;
-import org.apache.geode.internal.cache.versions.RegionVersionVector;
-import org.apache.geode.internal.cache.versions.VersionTag;
-import org.apache.geode.internal.logging.LogService;
-import org.apache.geode.internal.logging.log4j.LogMarker;
-import org.apache.geode.internal.util.concurrent.StoppableCountDownLatch;
-
-/**
- * EventTracker tracks the last sequence number for a particular 
memberID:threadID. It is used to
- * avoid replaying events in client/server and partitioned-region 
configurations.
- * 
- * @since GemFire 6.0
- */
-public class EventTracker {
-  private static final Logger logger = LogService.getLogger();
-
-  /**
-   * a mapping of originator to the last event applied to this cache
-   *
-   * Keys are instances of {@link ThreadIdentifier}, values are instances of
-   * {@link org.apache.geode.internal.cache.EventTracker.EventSeqnoHolder}.
-   */
-  protected final ConcurrentMap<ThreadIdentifier, EventSeqnoHolder> 
recordedEvents =
-      new ConcurrentHashMap<ThreadIdentifier, EventSeqnoHolder>(100);
-
-  /**
-   * a mapping of originator to bulkOps
-   *
-   * Keys are instances of @link {@link ThreadIdentifier}
-   */
-  private final ConcurrentMap<ThreadIdentifier, Object> recordedBulkOps =
-      new ConcurrentHashMap<ThreadIdentifier, Object>(100);
-
-  /**
-   * a mapping of originator to bulkOperation's last version tags. This map 
differs from
-   * {@link #recordedBulkOps} in that the thread identifier used here is the 
base member id and
-   * thread id of the bulk op, as opposed to the fake thread id which is 
assigned for each bucket.
-   * 
-   * recordedBulkOps are also only tracked on the secondary for partitioned 
regions
-   * recordedBulkOpVersionTags are tracked on both the primary and secondary.
-   *
-   * Keys are instances of @link {@link ThreadIdentifier}, values are 
instances of
-   * {@link BulkOpHolder}.
-   */
-  private final ConcurrentMap<ThreadIdentifier, BulkOpHolder> 
recordedBulkOpVersionTags =
-      new ConcurrentHashMap<ThreadIdentifier, BulkOpHolder>(100);
-
-  /**
-   * The member that the region corresponding to this tracker (if any) 
received its initial image
-   * from (if a replicate)
-   */
-  private volatile InternalDistributedMember initialImageProvider;
-
-  /**
-   * The cache associated with this tracker
-   */
-  InternalCache cache;
-
-  /**
-   * The name of this tracker
-   */
-  String name;
-
-  /**
-   * whether or not this tracker has been initialized to allow entry 
operation. replicate region
-   * does not initiate event tracker from its replicates.
-   */
-  volatile boolean initialized;
-
-  /**
-   * object used to wait for initialization
-   */
-  final StoppableCountDownLatch initializationLatch;
-
-  /**
-   * Initialize the EventTracker's timer task. This is stored in the cache for 
tracking and shutdown
-   * purposes
-   *
-   * @param cache the cache to schedule tasks with
-   */
-  public static ExpiryTask startTrackerServices(InternalCache cache) {
-    long expiryTime = Long.getLong(DistributionConfig.GEMFIRE_PREFIX + 
"messageTrackingTimeout",
-        PoolFactory.DEFAULT_SUBSCRIPTION_MESSAGE_TRACKING_TIMEOUT / 3);
-    ExpiryTask result = new ExpiryTask(cache, expiryTime);
-    cache.getCCPTimer().scheduleAtFixedRate(result, expiryTime, expiryTime);
-    // schedule(result, expiryTime);
-    return result;
-  }
-
-  /**
-   * Terminate the tracker's timer task
-   *
-   * @param cache the cache holding the tracker task
-   */
-  public static void stopTrackerServices(InternalCache cache) {
-    cache.getEventTrackerTask().cancel();
-  }
-
-  /**
-   * Create an event tracker
-   * 
-   * @param region the cache region to associate with this tracker
-   */
-  public EventTracker(LocalRegion region) {
-    this.cache = region.cache;
-    this.name = "Event Tracker for " + region.getName();
-    this.initializationLatch = new 
StoppableCountDownLatch(region.getStopper(), 1);
-  }
-
-  /** start this event tracker */
-  public void start() {
-    if (this.cache.getEventTrackerTask() != null) {
-      this.cache.getEventTrackerTask().addTracker(this);
-    }
-  }
-
-  /** stop this event tracker */
-  public void stop() {
-    if (this.cache.getEventTrackerTask() != null) {
-      this.cache.getEventTrackerTask().removeTracker(this);
-    }
-  }
-
-  /**
-   * retrieve a deep copy of the state of the event tracker. Synchronization 
is not used while
-   * copying the tracker's state.
-   */
-  public Map<ThreadIdentifier, EventSeqnoHolder> getState() {
-    Map<ThreadIdentifier, EventSeqnoHolder> result =
-        new HashMap<ThreadIdentifier, EventSeqnoHolder>(recordedEvents.size());
-    for (Iterator<Map.Entry<ThreadIdentifier, EventSeqnoHolder>> it =
-        recordedEvents.entrySet().iterator(); it.hasNext();) {
-      Map.Entry<ThreadIdentifier, EventSeqnoHolder> entry = it.next();
-      EventSeqnoHolder holder = entry.getValue();
-      result.put(entry.getKey(), new EventSeqnoHolder(holder.lastSeqno, 
null)); // don't transfer
-                                                                               
 // version tags -
-                                                                               
 // adds too much
-                                                                               
 // bulk just so we
-                                                                               
 // can do client tag
-                                                                               
 // recovery
-    }
-    return result;
-  }
-
-  /**
-   * record the given state in the tracker.
-   * 
-   * @param provider the member that provided this state
-   * @param state a Map obtained from getState();
-   */
-  public void recordState(InternalDistributedMember provider,
-      Map<ThreadIdentifier, EventSeqnoHolder> state) {
-    this.initialImageProvider = provider;
-    StringBuffer sb = null;
-    if (logger.isDebugEnabled()) {
-      sb = new StringBuffer(200);
-      sb.append("Recording initial state for ").append(this.name).append(": ");
-    }
-    for (Iterator<Map.Entry<ThreadIdentifier, EventSeqnoHolder>> it =
-        state.entrySet().iterator(); it.hasNext();) {
-      Map.Entry<ThreadIdentifier, EventSeqnoHolder> entry = it.next();
-      if (sb != null) {
-        sb.append("\n  ").append(entry.getKey().expensiveToString()).append("; 
sequenceID=")
-            .append(entry.getValue());
-      }
-      // record only if we haven't received an event that is newer
-      recordSeqno(entry.getKey(), entry.getValue(), true);
-    }
-    if (sb != null) {
-      logger.debug(sb);
-    }
-    // fix for bug 41622 - hang in GII. This keeps ops from waiting for the
-    // full GII to complete
-    setInitialized();
-  }
-
-  /**
-   * Use this method to ensure that the tracker is put in an initialized state
-   */
-  public void setInitialized() {
-    this.initializationLatch.countDown();
-    this.initialized = true;
-  }
-
-  /**
-   * Wait for the tracker to finishe being initialized
-   */
-  public void waitOnInitialization() throws InterruptedException {
-    this.initializationLatch.await();
-  }
-
-  /**
-   * Record an event sequence id if it is higher than what we currently have. 
This is intended for
-   * use during initial image transfer.
-   * 
-   * @param membershipID the key of an entry in the map obtained from 
getEventState()
-   * @param evhObj the value of an entry in the map obtained from 
getEventState()
-   */
-  protected void recordSeqno(ThreadIdentifier membershipID, EventSeqnoHolder 
evhObj) {
-    recordSeqno(membershipID, evhObj, false);
-  }
-
-  /**
-   * Record an event sequence id if it is higher than what we currently have. 
This is intended for
-   * use during initial image transfer.
-   * 
-   * @param threadID the key of an entry in the map obtained from 
getEventState()
-   * @param evh the value of an entry in the map obtained from getEventState()
-   * @param ifAbsent only record this state if there's not already an entry 
for this memberID
-   */
-  private void recordSeqno(ThreadIdentifier threadID, EventSeqnoHolder evh, 
boolean ifAbsent) {
-    boolean removed;
-    if (logger.isDebugEnabled()) {
-      logger.debug("recording {} {}", threadID.expensiveToString(), 
evh.toString());
-    }
-    do {
-      removed = false;
-      EventSeqnoHolder oldEvh = recordedEvents.putIfAbsent(threadID, evh);
-      if (oldEvh != null) {
-        synchronized (oldEvh) {
-          if (oldEvh.removed) {
-            // need to wait for an entry being removed by the sweeper to go 
away
-            removed = true;
-            continue;
-          } else {
-            if (ifAbsent) {
-              break;
-            }
-            oldEvh.endOfLifeTimer = 0;
-            if (oldEvh.lastSeqno < evh.lastSeqno) {
-              oldEvh.lastSeqno = evh.lastSeqno;
-              oldEvh.versionTag = evh.versionTag;
-              // Exception e = oldEvh.context;
-              // oldEvh.context = new Exception("stack trace");
-              // oldEvh.context.initCause(e);
-            }
-          }
-        }
-      } else {
-        evh.endOfLifeTimer = 0;
-        // evh.context = new Exception("stack trace");
-      }
-    } while (removed);
-  }
-
-  /** record the event's threadid/sequenceid to prevent replay */
-  public void recordEvent(InternalCacheEvent event) {
-    EventID eventID = event.getEventId();
-    if (ignoreEvent(event, eventID)) {
-      return; // not tracked
-    }
-
-    LocalRegion lr = (LocalRegion) event.getRegion();
-
-    ThreadIdentifier membershipID =
-        new ThreadIdentifier(eventID.getMembershipID(), eventID.getThreadID());
-
-    VersionTag tag = null;
-    if (lr.getServerProxy() == null/* && event.hasClientOrigin() */) { // 
clients do not need to
-                                                                       // 
store version tags for
-                                                                       // 
replayed events
-      tag = event.getVersionTag();
-      RegionVersionVector v = ((LocalRegion) 
event.getRegion()).getVersionVector();
-      // bug #46453 - make sure ID references are canonical before storing
-      if (v != null && tag != null) {
-        tag.setMemberID(v.getCanonicalId(tag.getMemberID()));
-        if (tag.getPreviousMemberID() != null) {
-          tag.setPreviousMemberID(v.getCanonicalId(tag.getPreviousMemberID()));
-        }
-      }
-    }
-
-    EventSeqnoHolder newEvh = new EventSeqnoHolder(eventID.getSequenceID(), 
tag);
-    if (logger.isTraceEnabled()) {
-      logger.trace("region event tracker recording {}", event);
-    }
-    recordSeqno(membershipID, newEvh);
-
-    // If this is a bulkOp, and concurrency checks are enabled, we need to
-    // save the version tag in case we retry.
-    // Make recordBulkOp version tag after recordSeqno, so that 
recordBulkOpStart
-    // in a retry bulk op would not incorrectly remove the saved version tag in
-    // recordedBulkOpVersionTags
-    if (lr.getConcurrencyChecksEnabled()
-        && (event.getOperation().isPutAll() || 
event.getOperation().isRemoveAll())
-        && lr.getServerProxy() == null) {
-      recordBulkOpEvent(event, membershipID);
-    }
-  }
-
-  /**
-   * Record a version tag for a bulk operation
-   */
-  private void recordBulkOpEvent(InternalCacheEvent event, ThreadIdentifier tid) {
-    EventID eventID = event.getEventId();
-
-    VersionTag tag = event.getVersionTag();
-    if (tag == null) {
-      return;
-    }
-
-    if (logger.isDebugEnabled()) {
-      logger.debug("recording bulkOp event {} {} {} op={}", tid.expensiveToString(), eventID, tag,
-          event.getOperation());
-    }
-
-    RegionVersionVector v = ((LocalRegion) event.getRegion()).getVersionVector();
-    // bug #46453 - make sure ID references are canonical before storing
-    if (v != null) {
-      tag.setMemberID(v.getCanonicalId(tag.getMemberID()));
-      if (tag.getPreviousMemberID() != null) {
-        tag.setPreviousMemberID(v.getCanonicalId(tag.getPreviousMemberID()));
-      }
-    }
-
-    // Loop until we can successfully update the recorded bulk operations
-    // For this thread id.
-    boolean retry = false;
-    do {
-      BulkOpHolder bulkOpTracker = recordedBulkOpVersionTags.get(tid);
-      if (bulkOpTracker == null) {
-        bulkOpTracker = new BulkOpHolder();
-        BulkOpHolder old = recordedBulkOpVersionTags.putIfAbsent(tid, bulkOpTracker);
-        if (old != null) {
-          retry = true;
-          continue;
-        }
-      }
-      synchronized (bulkOpTracker) {
-        if (bulkOpTracker.removed) {
-          retry = true;
-          continue;
-        }
-
-        // Add the version tag for bulkOp event.
-        bulkOpTracker.putVersionTag(eventID, event.getVersionTag());
-        retry = false;
-      }
-    } while (retry);
-  }
-
-  public boolean hasSeenEvent(InternalCacheEvent event) {
-    // ClientProxyMembershipID membershipID = event.getContext();
-    EventID eventID = event.getEventId();
-    if (ignoreEvent(event, eventID)) {
-      return false; // not tracked
-    }
-    return hasSeenEvent(eventID, event);
-  }
-
-  public boolean hasSeenEvent(EventID eventID) {
-    return hasSeenEvent(eventID, null);
-  }
-
-  public boolean hasSeenEvent(EventID eventID, InternalCacheEvent tagHolder) {
-    ThreadIdentifier membershipID =
-        new ThreadIdentifier(eventID.getMembershipID(), eventID.getThreadID());
-    // if (membershipID == null || eventID == null) {
-    // return false;
-    // }
-
-    EventSeqnoHolder evh = recordedEvents.get(membershipID);
-    if (evh == null) {
-      return false;
-    }
-
-    synchronized (evh) {
-      if (evh.removed || evh.lastSeqno < eventID.getSequenceID()) {
-        return false;
-      }
-      // log at fine because partitioned regions can send event multiple times
-      // during normal operation during bucket region initialization
-      if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_BRIDGE_SERVER)) {
-        logger.trace(LogMarker.DISTRIBUTION_BRIDGE_SERVER,
-            "Cache encountered replay of event with ID {}.  Highest recorded for this source is {}",
-            eventID, evh.lastSeqno);
-      }
-      // bug #44956 - recover version tag for duplicate event
-      if (evh.lastSeqno == eventID.getSequenceID() && tagHolder != null && evh.versionTag != null) {
-        ((EntryEventImpl) tagHolder).setVersionTag(evh.versionTag);
-      }
-      return true;
-    } // synchronized
-  }
-
-  public VersionTag findVersionTag(EventID eventID) {
-    ThreadIdentifier threadID =
-        new ThreadIdentifier(eventID.getMembershipID(), eventID.getThreadID());
-
-    EventSeqnoHolder evh = recordedEvents.get(threadID);
-    if (evh == null) {
-      if (logger.isDebugEnabled()) {
-        logger.debug("search for version tag failed as no event is recorded 
for {}",
-            threadID.expensiveToString());
-      }
-      return null;
-    }
-
-    synchronized (evh) {
-      if (logger.isDebugEnabled()) {
-        logger.debug("search for version tag located last event for {}: {}",
-            threadID.expensiveToString(), evh);
-      }
-      if (evh.lastSeqno != eventID.getSequenceID()) {
-        return null;
-      }
-      // log at fine because partitioned regions can send event multiple times
-      // during normal operation during bucket region initialization
-      if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_BRIDGE_SERVER) && 
evh.versionTag == null) {
-        logger.trace(LogMarker.DISTRIBUTION_BRIDGE_SERVER,
-            "Could not recover version tag.  Found event holder with no 
version tag for {}",
-            eventID);
-      }
-      return evh.versionTag;
-    } // synchronized
-  }
-
-  public VersionTag findVersionTagForGateway(EventID eventID) {
-    ThreadIdentifier threadID =
-        new ThreadIdentifier(eventID.getMembershipID(), eventID.getThreadID());
-
-    EventSeqnoHolder evh = recordedEvents.get(threadID);
-    if (evh == null) {
-      if (logger.isDebugEnabled()) {
-        logger.debug("search for version tag failed as no event is recorded 
for {}",
-            threadID.expensiveToString());
-      }
-      return null;
-    }
-
-    synchronized (evh) {
-      if (logger.isDebugEnabled()) {
-        logger.debug("search for version tag located last event for {}: {} {}",
-            threadID.expensiveToString(), evh, eventID.getSequenceID());
-      }
-
-      if (evh.lastSeqno < eventID.getSequenceID()) {
-        return null;
-      }
-      // log at fine because partitioned regions can send event multiple times
-      // during normal operation during bucket region initialization
-      if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_BRIDGE_SERVER) && 
evh.versionTag == null) {
-        logger.trace(LogMarker.DISTRIBUTION_BRIDGE_SERVER,
-            "Could not recover version tag.  Found event holder with no 
version tag for {}",
-            eventID);
-      }
-      return evh.versionTag;
-    } // synchronized
-  }
-
-
-  public VersionTag findVersionTagForBulkOp(EventID eventID) {
-    ThreadIdentifier threadID =
-        new ThreadIdentifier(eventID.getMembershipID(), eventID.getThreadID());
-
-    BulkOpHolder evh = recordedBulkOpVersionTags.get(threadID);
-    if (evh == null) {
-      if (logger.isDebugEnabled()) {
-        logger.debug("search for version tag failed as no events are recorded 
for {}",
-            threadID.expensiveToString());
-      }
-      return null;
-    }
-
-    synchronized (evh) {
-      if (logger.isDebugEnabled()) {
-        logger.debug("search for version tag located event holder for {}: {}",
-            threadID.expensiveToString(), evh);
-      }
-      return evh.entryVersionTags.get(eventID);
-    } // synchronized
-  }
-
-  /**
-   * @return true if the event should not be tracked, false otherwise
-   */
-  private boolean ignoreEvent(InternalCacheEvent event, EventID eventID) {
-    if (eventID == null) {
-      return true;
-    } else {
-      boolean isVersioned = (event.getVersionTag() != null);
-      boolean isClient = event.hasClientOrigin();
-      if (isVersioned && isClient) {
-        return false; // version tags for client events are kept for retries 
by the client
-      }
-      boolean isEntry = event.getOperation().isEntry();
-      boolean isPr = event.getRegion().getAttributes().getDataPolicy().withPartitioning()
-          || ((LocalRegion) event.getRegion()).isUsedForPartitionedRegionBucket();
-      return (!isClient && // ignore if it originated on a server, and
-          isEntry && // it affects an entry and
-          !isPr); // is not on a PR
-    }
-  }
-
-  /**
-   * A routine to provide synchronization running based on <memberShipID, 
threadID> of the
-   * requesting client
-   * 
-   * @param r - a Runnable to wrap the processing of the bulk op
-   * @param eventID - the base event ID of the bulk op
-   *
-   * @since GemFire 5.7
-   */
-  public void syncBulkOp(Runnable r, EventID eventID) {
-    Assert.assertTrue(eventID != null);
-    ThreadIdentifier membershipID =
-        new ThreadIdentifier(eventID.getMembershipID(), eventID.getThreadID());
-
-    Object opSyncObj = null;
-    do {
-      opSyncObj = recordedBulkOps.putIfAbsent(membershipID, new Object());
-      if (opSyncObj == null) {
-        opSyncObj = recordedBulkOps.get(membershipID);
-      }
-    } while (opSyncObj == null);
-
-    synchronized (opSyncObj) {
-      try {
-        recordBulkOpStart(membershipID, eventID);
-        // Perform the bulk op
-        r.run();
-      } finally {
-        recordedBulkOps.remove(membershipID);
-      }
-    }
-  }
-
-  /**
-   * Called when a new bulkOp is started on the local region. Used to clear 
event tracker state from
-   * the last bulkOp.
-   */
-  public void recordBulkOpStart(ThreadIdentifier tid, EventID eventID) {
-    if (logger.isDebugEnabled()) {
-      logger.debug("recording bulkOp start for {}", tid.expensiveToString());
-    }
-    EventSeqnoHolder evh = recordedEvents.get(tid);
-    if (evh == null) {
-      return;
-    }
-    synchronized (evh) {
-      // only remove it when a new bulk op occurs
-      if (eventID.getSequenceID() > evh.lastSeqno) {
-        this.recordedBulkOpVersionTags.remove(tid);
-      }
-    }
-  }
-
-  /**
-   * @return the initialized
-   */
-  public boolean isInitialized() {
-    return this.initialized;
-  }
-
-  /**
-   * @param mbr the member in question
-   * @return true if the given member provided the initial image event state 
for this tracker
-   */
-  public boolean isInitialImageProvider(DistributedMember mbr) {
-    return (this.initialImageProvider != null) && (mbr != null)
-        && this.initialImageProvider.equals(mbr);
-  }
-
-  /**
-   * Test method for getting the set of recorded version tags.
-   */
-  protected ConcurrentMap<ThreadIdentifier, BulkOpHolder> 
getRecordedBulkOpVersionTags() {
-    return recordedBulkOpVersionTags;
-  }
-
-  @Override
-  public String toString() {
-    return "" + this.name + "(initialized=" + this.initialized + ")";
-  }
-
-  /**
-   * A sequence number tracker to keep events from clients from being 
re-applied to the cache if
-   * they've already been seen.
-   * 
-   * @since GemFire 5.5
-   */
-  static class EventSeqnoHolder implements DataSerializable {
-    private static final long serialVersionUID = 8137262960763308046L;
-
-    /** event sequence number. These */
-    long lastSeqno = -1;
-
-    /** millisecond timestamp */
-    transient long endOfLifeTimer;
-
-    /** whether this entry is being removed */
-    transient boolean removed;
-
-    /**
-     * version tag, if any, for the operation
-     */
-    VersionTag versionTag;
-
-    // for debugging
-    // transient Exception context;
-
-    EventSeqnoHolder(long id, VersionTag versionTag) {
-      this.lastSeqno = id;
-      this.versionTag = versionTag;
-    }
-
-    public EventSeqnoHolder() {}
-
-    @Override
-    public String toString() {
-      StringBuilder result = new StringBuilder();
-      result.append("seqNo").append(this.lastSeqno);
-      if (this.versionTag != null) {
-        result.append(",").append(this.versionTag);
-      }
-      return result.toString();
-    }
-
-    public void fromData(DataInput in) throws IOException, 
ClassNotFoundException {
-      lastSeqno = in.readLong();
-      versionTag = (VersionTag) DataSerializer.readObject(in);
-    }
-
-    public void toData(DataOutput out) throws IOException {
-      out.writeLong(lastSeqno);
-      DataSerializer.writeObject(versionTag, out);
-    }
-  }
-
-  /**
-   * A holder for the version tags generated for a bulk operation (putAll or 
removeAll). These
-   * version tags are retrieved when a bulk op is retried.
-   * 
-   * @since GemFire 7.0 protected for test purposes only.
-   */
-  protected static class BulkOpHolder {
-    /**
-     * Whether this object was removed by the cleanup thread.
-     */
-    public boolean removed;
-
-    /**
-     * public for tests only
-     */
-    public Map<EventID, VersionTag> entryVersionTags = new HashMap<EventID, 
VersionTag>();
-
-    /** millisecond timestamp */
-    transient long endOfLifeTimer;
-
-    /**
-     * creates a new instance to save status of a putAllOperation
-     */
-    BulkOpHolder() {
-      // do nothing
-    }
-
-    public void putVersionTag(EventID eventId, VersionTag versionTag) {
-      entryVersionTags.put(eventId, versionTag);
-      this.endOfLifeTimer = 0;
-    }
-
-
-    @Override
-    public String toString() {
-      return "BulkOpHolder tags=" + this.entryVersionTags;
-    }
-  }
-
-  public static class ExpiryTask extends SystemTimerTask {
-
-    InternalCache cache;
-    long expiryTime;
-    List trackers = new LinkedList();
-
-    public ExpiryTask(InternalCache cache, long expiryTime) {
-      this.cache = cache;
-      this.expiryTime = expiryTime;
-    }
-
-    void addTracker(EventTracker tracker) {
-      synchronized (trackers) {
-        trackers.add(tracker);
-      }
-    }
-
-    void removeTracker(EventTracker tracker) {
-      synchronized (trackers) {
-        trackers.remove(tracker);
-      }
-    }
-
-    int getNumberOfTrackers() {
-      return trackers.size();
-    }
-
-    @Override
-    public void run2() {
-      long now = System.currentTimeMillis();
-      long timeout = now - expiryTime;
-      final boolean traceEnabled = logger.isTraceEnabled();
-      synchronized (trackers) {
-        for (Iterator it = trackers.iterator(); it.hasNext();) {
-          EventTracker tracker = (EventTracker) it.next();
-          if (traceEnabled) {
-            logger.trace("{} sweeper: starting", tracker.name);
-          }
-          for (Iterator it2 = tracker.recordedEvents.entrySet().iterator(); 
it2.hasNext();) {
-            Map.Entry e = (Map.Entry) it2.next();
-            EventSeqnoHolder evh = (EventSeqnoHolder) e.getValue();
-            synchronized (evh) {
-              if (evh.endOfLifeTimer == 0) {
-                evh.endOfLifeTimer = now; // a new holder - start the timer
-              }
-              if (evh.endOfLifeTimer <= timeout) {
-                evh.removed = true;
-                evh.lastSeqno = -1;
-                if (traceEnabled) {
-                  logger.trace("{} sweeper: removing {}", tracker.name, 
e.getKey());
-                }
-                it2.remove();
-              }
-            }
-          }
-
-          // Remove bulk operations we're tracking
-          for (Iterator<Map.Entry<ThreadIdentifier, BulkOpHolder>> it2 =
-              tracker.recordedBulkOpVersionTags.entrySet().iterator(); 
it2.hasNext();) {
-            Map.Entry<ThreadIdentifier, BulkOpHolder> e = it2.next();
-            BulkOpHolder evh = e.getValue();
-            synchronized (evh) {
-              if (evh.endOfLifeTimer == 0) {
-                evh.endOfLifeTimer = now; // a new holder - start the timer
-              }
-              // Remove the PutAll tracker only if the put all is complete
-              // and it has expired.
-              if (evh.endOfLifeTimer <= timeout) {
-                evh.removed = true;
-                if (logger.isTraceEnabled()) {
-                  logger.trace("{} sweeper: removing bulkOp {}", tracker.name, 
e.getKey());
-                }
-                it2.remove();
-              }
-            }
-          }
-          if (traceEnabled) {
-            logger.trace("{} sweeper: done", tracker.name);
-          }
-        }
-      }
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/geode/blob/3f9be9a7/geode-core/src/main/java/org/apache/geode/internal/cache/FindVersionTagOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/FindVersionTagOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/FindVersionTagOperation.java
index 199aafc..6434667 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/FindVersionTagOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/FindVersionTagOperation.java
@@ -132,7 +132,7 @@ public class FindVersionTagOperation {
           result = r.findVersionTagForClientBulkOp(eventId);
 
         } else {
-          result = r.findVersionTagForClientEvent(eventId);
+          result = r.findVersionTagForEvent(eventId);
         }
         if (result != null) {
           result.replaceNullIDs(r.getVersionMember());

http://git-wip-us.apache.org/repos/asf/geode/blob/3f9be9a7/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
index 2dda38c..de5fd88 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
@@ -77,6 +77,9 @@ import javax.transaction.TransactionManager;
 import com.sun.jna.Native;
 import com.sun.jna.Platform;
 import org.apache.commons.lang.StringUtils;
+
+import org.apache.geode.internal.cache.event.EventTracker;
+import org.apache.geode.internal.cache.event.EventTrackerExpiryTask;
 import org.apache.geode.internal.security.SecurityServiceFactory;
 import org.apache.logging.log4j.Logger;
 
@@ -474,7 +477,7 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
   /**
    * a system timer task for cleaning up old bridge thread event entries
    */
-  private final EventTracker.ExpiryTask recordedEventSweeper;
+  private final EventTrackerExpiryTask recordedEventSweeper;
 
   private final TombstoneService tombstoneService;
 
@@ -898,7 +901,7 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
             getOffHeapEvictor());
       }
 
-      this.recordedEventSweeper = EventTracker.startTrackerServices(this);
+      this.recordedEventSweeper = createEventTrackerExpiryTask();
       this.tombstoneService = TombstoneService.initialize(this);
 
       TypeRegistry.init();
@@ -941,6 +944,18 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
     } // synchronized
   }
 
+  /**
+   * Initialize the EventTracker's timer task. This is stored for tracking and shutdown purposes
+   */
+  private EventTrackerExpiryTask createEventTrackerExpiryTask() {
+    long lifetimeInMillis =
+        Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "messageTrackingTimeout",
+            PoolFactory.DEFAULT_SUBSCRIPTION_MESSAGE_TRACKING_TIMEOUT / 3);
+    EventTrackerExpiryTask task = new EventTrackerExpiryTask(lifetimeInMillis);
+    getCCPTimer().scheduleAtFixedRate(task, lifetimeInMillis, lifetimeInMillis);
+    return task;
+  }
+
   @Override
   public SecurityService getSecurityService() {
     return this.securityService;
@@ -2347,8 +2362,7 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
 
         this.cachePerfStats.close();
         TXLockService.destroyServices();
-
-        EventTracker.stopTrackerServices(this);
+        getEventTrackerTask().cancel();
 
         synchronized (this.ccpTimerMutex) {
           if (this.ccpTimer != null) {
@@ -2744,7 +2758,7 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
    * @return the sweeper task
    */
   @Override
-  public EventTracker.ExpiryTask getEventTrackerTask() {
+  public EventTrackerExpiryTask getEventTrackerTask() {
     return this.recordedEventSweeper;
   }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/3f9be9a7/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java
index 4cf8f41..baddff8 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java
@@ -39,6 +39,8 @@ import org.apache.geode.cache.TimeoutException;
 import org.apache.geode.distributed.internal.DistributionAdvisor.Profile;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.Assert;
+import org.apache.geode.internal.cache.event.EventTracker;
+import org.apache.geode.internal.cache.event.NonDistributedEventTracker;
 import org.apache.geode.internal.cache.ha.HARegionQueue;
 import org.apache.geode.internal.cache.ha.ThreadIdentifier;
 import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
@@ -112,8 +114,9 @@ public class HARegion extends DistributedRegion {
       boolean ifOld, Object expectedOldValue, boolean requireOldValue) {}
 
   @Override
-  public void createEventTracker() {
+  public EventTracker createEventTracker() {
     // event trackers aren't needed for HARegions
+    return NonDistributedEventTracker.getInstance();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/geode/blob/3f9be9a7/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java
index 4c229db..aed439c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java
@@ -53,6 +53,7 @@ import org.apache.geode.distributed.internal.membership.InternalDistributedMembe
 import org.apache.geode.internal.SystemTimer;
 import org.apache.geode.internal.cache.control.InternalResourceManager;
 import org.apache.geode.internal.cache.control.ResourceAdvisor;
+import org.apache.geode.internal.cache.event.EventTrackerExpiryTask;
 import org.apache.geode.internal.cache.extension.Extensible;
 import org.apache.geode.internal.cache.persistence.BackupManager;
 import org.apache.geode.internal.cache.persistence.PersistentMemberManager;
@@ -237,7 +238,7 @@ public interface InternalCache extends Cache, Extensible<Cache>, CacheTime {
 
   TXEntryStateFactory getTXEntryStateFactory();
 
-  EventTracker.ExpiryTask getEventTrackerTask();
+  EventTrackerExpiryTask getEventTrackerTask();
 
   void removeDiskStore(DiskStoreImpl diskStore);
 

http://git-wip-us.apache.org/repos/asf/geode/blob/3f9be9a7/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 3b3047f..6bee770 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -131,6 +131,8 @@ import org.apache.geode.internal.cache.control.InternalResourceManager.ResourceT
 import org.apache.geode.internal.cache.control.MemoryEvent;
 import org.apache.geode.internal.cache.control.MemoryThresholds;
 import org.apache.geode.internal.cache.control.ResourceListener;
+import org.apache.geode.internal.cache.event.EventTracker;
+import org.apache.geode.internal.cache.event.NonDistributedEventTracker;
 import org.apache.geode.internal.cache.execute.DistributedRegionFunctionExecutor;
 import org.apache.geode.internal.cache.execute.DistributedRegionFunctionResultSender;
 import org.apache.geode.internal.cache.execute.LocalResultCollector;
@@ -326,9 +328,6 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
    */
   private final boolean supportsTX;
 
-  /** tracks threadID->seqno information for this region */
-  EventTracker eventTracker;
-
   /**
    * tracks region-level version information for members
    */
@@ -440,6 +439,8 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
 
   private final ImageState imageState;
 
+  private final EventTracker eventTracker;
+
   /**
    * Register interest count to track if any register interest is in progress 
for this region. This
    * count will be incremented when register interest starts and decremented 
when register interest
@@ -646,13 +647,16 @@ public class LocalRegion extends AbstractRegion 
implements LoaderHelperFactory,
         getDataPolicy().withReplication() || getDataPolicy().isPreloaded(),
         getAttributes().getDataPolicy().withPersistence(), this.stopper);
 
-    createEventTracker();
-
     // prevent internal regions from participating in a TX, bug 38709
     this.supportsTX = !isSecret() && !isUsedForPartitionedRegionAdmin() && 
!isUsedForMetaRegion()
         || isMetaRegionWithTransactions();
 
     this.testCallable = internalRegionArgs.getTestCallable();
+    eventTracker = createEventTracker();
+  }
+
+  protected EventTracker createEventTracker() {
+    return NonDistributedEventTracker.getInstance();
   }
 
   private RegionMap createRegionMap(InternalRegionArguments 
internalRegionArgs) {
@@ -676,27 +680,11 @@ public class LocalRegion extends AbstractRegion 
implements LoaderHelperFactory,
   }
 
   /**
-   * initialize the event tracker. Not all region implementations want or need 
one of these. Regions
-   * that require one should reimplement this method and create one like so:
-   *
-   * <pre>
-   * {@code
-   * this.eventTracker = new EventTracker(this.cache);
-   * this.eventTracker.start();
-   * }
-   * </pre>
+   * Other region classes may track events using different mechanisms than EventTrackers or may not
+   * track events at all
    */
-  void createEventTracker() {
-    // if LocalRegion is changed to have an event tracker, then the 
initialize()
-    // method should be changed to set it to "initialized" state when the
-    // region finishes initialization
-  }
-
-  /**
-   * Other region classes may track events using different mechanisms than 
EventTrackers
-   */
-  EventTracker getEventTracker() {
-    return this.eventTracker;
+  public EventTracker getEventTracker() {
+    return eventTracker;
   }
 
   /** returns the regions version-vector */
@@ -2559,9 +2547,7 @@ public class LocalRegion extends AbstractRegion 
implements LoaderHelperFactory,
       }
     }
 
-    if (this.eventTracker != null) {
-      this.eventTracker.stop();
-    }
+    getEventTracker().stop();
     if (logger.isTraceEnabled(LogMarker.RVV) && getVersionVector() != null) {
       logger.trace(LogMarker.RVV, "version vector for {} is {}", getName(),
           getVersionVector().fullToString());
@@ -2666,9 +2652,7 @@ public class LocalRegion extends AbstractRegion 
implements LoaderHelperFactory,
       }
       this.cache.setRegionByPath(getFullPath(), null);
 
-      if (this.eventTracker != null) {
-        this.eventTracker.stop();
-      }
+      getEventTracker().stop();
 
       if (this.diskRegion != null) {
         this.diskRegion.prepareForClose(this);
@@ -5923,11 +5907,7 @@ public class LocalRegion extends AbstractRegion 
implements LoaderHelperFactory,
    * is installed in the receiver of the image.
    */
   public Map<? extends DataSerializable, ? extends DataSerializable> 
getEventState() {
-    if (this.eventTracker != null) {
-      return this.eventTracker.getState();
-    } else {
-      return null;
-    }
+    return getEventTracker().getState();
   }
 
   /**
@@ -5939,9 +5919,7 @@ public class LocalRegion extends AbstractRegion 
implements LoaderHelperFactory,
    * @param state a Map obtained from getEventState()
    */
   protected void recordEventState(InternalDistributedMember provider, Map 
state) {
-    if (this.eventTracker != null) {
-      this.eventTracker.recordState(provider, state);
-    }
+    getEventTracker().recordState(provider, state);
   }
 
   /**
@@ -5965,9 +5943,7 @@ public class LocalRegion extends AbstractRegion 
implements LoaderHelperFactory,
    * record the event's sequenceId in Region's event state to prevent replay.
    */
   public void recordEvent(InternalCacheEvent event) {
-    if (this.eventTracker != null) {
-      this.eventTracker.recordEvent(event);
-    }
+    getEventTracker().recordEvent(event);
   }
 
   /**
@@ -5976,64 +5952,16 @@ public class LocalRegion extends AbstractRegion 
implements LoaderHelperFactory,
    * @return true if the Region's event state has seen the event
    */
   public boolean hasSeenEvent(EntryEventImpl event) {
-    boolean isDuplicate = false;
-
-    if (this.eventTracker != null) {
-      // bug 41289 - wait for event tracker to be initialized before checkin
-      // so that an operation intended for a previous version of a bucket
-      // is not prematurely applied to a new version of the bucket
-      if (this.isUsedForPartitionedRegionBucket()) {
-        try {
-          this.eventTracker.waitOnInitialization();
-        } catch (InterruptedException ie) {
-          this.stopper.checkCancelInProgress(ie);
-          Thread.currentThread().interrupt();
-        }
-      }
-
-      isDuplicate = this.eventTracker.hasSeenEvent(event);
-      if (isDuplicate) {
-        event.setPossibleDuplicate(true);
-        if (getConcurrencyChecksEnabled() && event.getVersionTag() == null) {
-          if (event.isBulkOpInProgress()) {
-            event.setVersionTag(findVersionTagForClientBulkOp(event.getEventId()));
-          } else {
-            event.setVersionTag(findVersionTagForClientEvent(event.getEventId()));
-          }
-        }
-      } else {
-        // bug #48205 - a retried PR operation may already have a version 
assigned to it
-        // in another VM
-        if (event.isPossibleDuplicate() && 
event.getRegion().concurrencyChecksEnabled
-            && event.getVersionTag() == null && event.getEventId() != null) {
-          boolean isBulkOp = event.getOperation().isPutAll() || 
event.getOperation().isRemoveAll();
-          VersionTag tag = 
FindVersionTagOperation.findVersionTag(event.getRegion(),
-              event.getEventId(), isBulkOp);
-          event.setVersionTag(tag);
-        }
-      }
-    }
-
-    return isDuplicate;
+    return getEventTracker().hasSeenEvent(event);
   }
 
   /**
-   * tries to find the version tag for a replayed client event
+   * tries to find the version tag for a event
    * 
    * @return the version tag, if known. Null if not
    */
-  public VersionTag findVersionTagForClientEvent(EventID eventId) {
-    if (this.eventTracker != null) {
-      return this.eventTracker.findVersionTag(eventId);
-    }
-    return null;
-  }
-
-  public VersionTag findVersionTagForGatewayEvent(EventID eventId) {
-    if (this.eventTracker != null) {
-      return this.eventTracker.findVersionTagForGateway(eventId);
-    }
-    return null;
+  public VersionTag findVersionTagForEvent(EventID eventId) {
+    return getEventTracker().findVersionTagForSequence(eventId);
   }
 
   /**
@@ -6042,13 +5970,7 @@ public class LocalRegion extends AbstractRegion 
implements LoaderHelperFactory,
    * @return the version tag, if known. Null if not
    */
   public VersionTag findVersionTagForClientBulkOp(EventID eventId) {
-    if (eventId == null) {
-      return null;
-    }
-    if (this.eventTracker != null) {
-      return this.eventTracker.findVersionTagForBulkOp(eventId);
-    }
-    return null;
+    return getEventTracker().findVersionTagForBulkOp(eventId);
   }
 
   /**
@@ -6061,25 +5983,7 @@ public class LocalRegion extends AbstractRegion 
implements LoaderHelperFactory,
    * @return true if the Region's event state has seen the event
    */
   public boolean hasSeenEvent(EventID eventID) {
-    if (eventID == null) {
-      return false;
-    }
-    boolean isDuplicate = false;
-    if (this.eventTracker != null) {
-      // bug 41289 - wait for event tracker to be initialized before checkin
-      // so that an operation intended for a previous version of a bucket
-      // is not prematurely applied to a new version of the bucket
-      if (this.isUsedForPartitionedRegionBucket()) {
-        try {
-          this.eventTracker.waitOnInitialization();
-        } catch (InterruptedException ie) {
-          this.stopper.checkCancelInProgress(ie);
-          Thread.currentThread().interrupt();
-        }
-      }
-      isDuplicate = this.eventTracker.hasSeenEvent(eventID, null);
-    }
-    return isDuplicate;
+    return getEventTracker().hasSeenEvent(eventID);
   }
 
   /**
@@ -6092,16 +5996,12 @@ public class LocalRegion extends AbstractRegion 
implements LoaderHelperFactory,
    * @since GemFire 5.7
    */
   void syncBulkOp(Runnable task, EventID eventId) {
-    if (this.eventTracker != null && !isTX()) {
-      this.eventTracker.syncBulkOp(task, eventId);
-    } else {
-      task.run();
-    }
+    getEventTracker().syncBulkOp(task, eventId, isTX());
   }
 
   public void recordBulkOpStart(ThreadIdentifier membershipID, EventID 
eventID) {
-    if (this.eventTracker != null && !isTX()) {
-      this.eventTracker.recordBulkOpStart(membershipID, eventID);
+    if (!isTX()) {
+      getEventTracker().recordBulkOpStart(eventID, membershipID);
     }
   }
 
@@ -7058,14 +6958,7 @@ public class LocalRegion extends AbstractRegion 
implements LoaderHelperFactory,
    * @return true if event state has been transfered to this region from 
another cache
    */
   boolean isEventTrackerInitialized() {
-    return this.eventTracker != null && this.eventTracker.isInitialized();
-  }
-
-  /**
-   * @return true if this region has an event tracker
-   */
-  boolean hasEventTracker() {
-    return this.eventTracker != null;
+    return getEventTracker().isInitialized();
   }
 
   public void acquireDestroyLock() {
@@ -7115,9 +7008,7 @@ public class LocalRegion extends AbstractRegion 
implements LoaderHelperFactory,
     this.destroyedSubregionSerialNumbers = collectSubregionSerialNumbers();
 
     try {
-      if (this.eventTracker != null) {
-        this.eventTracker.stop();
-      }
+      getEventTracker().stop();
 
       if (this.diskRegion != null) {
         // This was needed to fix bug 30937

http://git-wip-us.apache.org/repos/asf/geode/blob/3f9be9a7/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index c3aec13..27b442d 100755
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -5448,16 +5448,11 @@ public class PartitionedRegion extends LocalRegion
   }
 
   @Override
-  void createEventTracker() {
-    // PR buckets maintain their own trackers. None is needed at this level
-  }
-
-  @Override
-  public VersionTag findVersionTagForClientEvent(EventID eventId) {
+  public VersionTag findVersionTagForEvent(EventID eventId) {
     if (this.dataStore != null) {
       Set<Map.Entry<Integer, BucketRegion>> bucketMap = 
this.dataStore.getAllLocalBuckets();
       for (Map.Entry<Integer, BucketRegion> entry : bucketMap) {
-        VersionTag result = 
entry.getValue().findVersionTagForClientEvent(eventId);
+        VersionTag result = entry.getValue().findVersionTagForEvent(eventId);
         if (result != null) {
           return result;
         }

http://git-wip-us.apache.org/repos/asf/geode/blob/3f9be9a7/geode-core/src/main/java/org/apache/geode/internal/cache/event/BulkOperationHolder.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/event/BulkOperationHolder.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/event/BulkOperationHolder.java
new file mode 100644
index 0000000..53c4bb5
--- /dev/null
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/event/BulkOperationHolder.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.event;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.internal.cache.versions.VersionTag;
+
+/**
+ * A holder for the version tags generated for a bulk operation (putAll or 
removeAll). These version
+ * tags are retrieved when a bulk op is retried.
+ *
+ * @since GemFire 7.0 protected for test purposes only.
+ */
+public class BulkOperationHolder {
+  /**
+   * Whether this object was removed by the cleanup thread.
+   */
+  private boolean removed;
+
+  /**
+   * version tags recorded for this bulk operation, keyed by event id; read via getEntryVersionTags()
+   */
+  private Map<EventID, VersionTag> entryVersionTags = new HashMap<>();
+
+  /** millisecond timestamp */
+  private transient long endOfLifeTimestamp;
+
+  /**
+   * creates a new instance to save the status of a bulk operation (putAll or removeAll)
+   */
+  BulkOperationHolder() {
+    // do nothing
+  }
+
+  void putVersionTag(EventID eventId, VersionTag versionTag) {
+    entryVersionTags.put(eventId, versionTag);
+    this.endOfLifeTimestamp = 0;
+  }
+
+  public Map<EventID, VersionTag> getEntryVersionTags() {
+    return entryVersionTags;
+  }
+
+  @Override
+  public String toString() {
+    return "BulkOperationHolder tags=" + this.entryVersionTags;
+  }
+
+  public synchronized boolean expire(long now, long expirationTime) {
+    if (endOfLifeTimestamp == 0) {
+      endOfLifeTimestamp = now; // a new holder - start the timer
+    }
+    boolean expired = false;
+    if (endOfLifeTimestamp <= expirationTime) {
+      removed = true;
+      expired = true;
+    }
+    return expired;
+  }
+
+  public boolean isRemoved() {
+    return removed;
+  }
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/3f9be9a7/geode-core/src/main/java/org/apache/geode/internal/cache/event/DistributedEventTracker.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/event/DistributedEventTracker.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/event/DistributedEventTracker.java
new file mode 100644
index 0000000..ee0e8ff
--- /dev/null
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/event/DistributedEventTracker.java
@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.event;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.distributed.DistributedMember;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.Assert;
+import org.apache.geode.internal.cache.EntryEventImpl;
+import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.InternalCacheEvent;
+import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.ha.ThreadIdentifier;
+import org.apache.geode.internal.cache.versions.RegionVersionVector;
+import org.apache.geode.internal.cache.versions.VersionTag;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.logging.log4j.LogMarker;
+import org.apache.geode.internal.util.concurrent.StoppableCountDownLatch;
+
+
+public class DistributedEventTracker implements EventTracker {
+  private static final Logger logger = LogService.getLogger();
+
+  /**
+   * a mapping of originator to the last event applied to this cache
+   *
+   * Keys are instances of {@link ThreadIdentifier}, values are instances of
+   * {@link EventSequenceNumberHolder}.
+   */
+  private final ConcurrentMap<ThreadIdentifier, EventSequenceNumberHolder> 
recordedEvents =
+      new ConcurrentHashMap<>(100);
+
+  /**
+   * a mapping of originator to bulkOps
+   *
+   * Keys are instances of {@link ThreadIdentifier}
+   */
+  private final ConcurrentMap<ThreadIdentifier, Object> recordedBulkOps =
+      new ConcurrentHashMap<>(100);
+
+  /**
+   * a mapping of originator to bulkOperation's last version tags. This map 
differs from
+   * {@link #recordedBulkOps} in that the thread identifier used here is the 
base member id and
+   * thread id of the bulk op, as opposed to the fake thread id which is 
assigned for each bucket.
+   * 
+   * recordedBulkOps are also only tracked on the secondary for partitioned 
regions, whereas
+   * recordedBulkOpVersionTags are tracked on both the primary and secondary.
+   *
+   * Keys are instances of {@link ThreadIdentifier}, values are 
instances of
+   * {@link BulkOperationHolder}.
+   */
+  private final ConcurrentMap<ThreadIdentifier, BulkOperationHolder> 
recordedBulkOpVersionTags =
+      new ConcurrentHashMap<>(100);
+
+  /**
+   * The member that the region corresponding to this tracker (if any) 
received its initial image
+   * from (if a replicate)
+   */
+  private volatile InternalDistributedMember initialImageProvider;
+
+  /**
+   * The cache associated with this tracker
+   */
+  private InternalCache cache;
+
+  /**
+   * The name of this tracker
+   */
+  private String name;
+
+  /**
+   * whether or not this tracker has been initialized to allow entry 
operation. replicate region
+   * does not initiate event tracker from its replicates.
+   */
+  private volatile boolean initialized;
+
+  /**
+   * object used to wait for initialization
+   */
+  private final StoppableCountDownLatch initializationLatch;
+
+  /**
+   * Create an event tracker
+   * 
+   * @param cache the cache of the region to associate with this tracker
+   * @param stopper the CancelCriterion for the region
+   * @param regionName name of the region
+   */
+  public DistributedEventTracker(InternalCache cache, CancelCriterion stopper, 
String regionName) {
+
+    this.cache = cache;
+    this.name = "Event Tracker for " + regionName;
+    this.initializationLatch = new StoppableCountDownLatch(stopper, 1);
+  }
+
+  @Override
+  public void start() {
+    if (cache.getEventTrackerTask() != null) {
+      cache.getEventTrackerTask().addTracker(this);
+    }
+  }
+
+  @Override
+  public void stop() {
+    if (cache.getEventTrackerTask() != null) {
+      cache.getEventTrackerTask().removeTracker(this);
+    }
+  }
+
+  @Override
+  public Map<ThreadIdentifier, EventSequenceNumberHolder> getState() {
+    Map<ThreadIdentifier, EventSequenceNumberHolder> result = new 
HashMap<>(recordedEvents.size());
+    for (Map.Entry<ThreadIdentifier, EventSequenceNumberHolder> entry : 
recordedEvents.entrySet()) {
+      EventSequenceNumberHolder holder = entry.getValue();
+      // don't transfer version tags - adds too much bulk just so we can do 
client tag recovery
+      result.put(entry.getKey(),
+          new EventSequenceNumberHolder(holder.getLastSequenceNumber(), null));
+    }
+    return result;
+  }
+
+  @Override
+  public void recordState(InternalDistributedMember provider,
+      Map<ThreadIdentifier, EventSequenceNumberHolder> state) {
+    this.initialImageProvider = provider;
+    StringBuffer sb = null;
+    if (logger.isDebugEnabled()) {
+      sb = new StringBuffer(200);
+      sb.append("Recording initial state for ").append(this.name).append(": ");
+    }
+    for (Map.Entry<ThreadIdentifier, EventSequenceNumberHolder> entry : 
state.entrySet()) {
+      if (sb != null) {
+        sb.append("\n  ").append(entry.getKey().expensiveToString()).append("; 
sequenceID=")
+            .append(entry.getValue());
+      }
+      // record only if we haven't received an event that is newer
+      recordSequenceNumber(entry.getKey(), entry.getValue(), true);
+    }
+    if (sb != null) {
+      logger.debug(sb);
+    }
+    // fix for bug 41622 - hang in GII. This keeps ops from waiting for the
+    // full GII to complete
+    setInitialized();
+  }
+
+  @Override
+  public void setInitialized() {
+    initializationLatch.countDown();
+    initialized = true;
+  }
+
+  @Override
+  public void waitOnInitialization() throws InterruptedException {
+    initializationLatch.await();
+  }
+
+  /**
+   * Record an event sequence id if it is higher than what we currently have. 
This is intended for
+   * use during initial image transfer.
+   * 
+   * @param membershipID the key of an entry in the map obtained from 
getEventState()
+   * @param evhObj the value of an entry in the map obtained from 
getEventState()
+   */
+  protected void recordSequenceNumber(ThreadIdentifier membershipID,
+      EventSequenceNumberHolder evhObj) {
+    recordSequenceNumber(membershipID, evhObj, false);
+  }
+
+  /**
+   * Record an event sequence id if it is higher than what we currently have. 
This is intended for
+   * use during initial image transfer.
+   *
+   * @param threadID the key of an entry in the map obtained from 
getEventState()
+   * @param evh the value of an entry in the map obtained from getEventState()
+   * @param ifAbsent only record this state if there's not already an entry 
for this memberID
+   */
+  private void recordSequenceNumber(ThreadIdentifier threadID, 
EventSequenceNumberHolder evh,
+      boolean ifAbsent) {
+    boolean removed;
+    if (logger.isDebugEnabled()) {
+      logger.debug("recording {} {}", threadID.expensiveToString(), 
evh.toString());
+    }
+    do {
+      removed = false;
+      EventSequenceNumberHolder oldEvh = recordedEvents.putIfAbsent(threadID, 
evh);
+      if (oldEvh != null) {
+        synchronized (oldEvh) {
+          if (oldEvh.isRemoved()) {
+            // need to wait for an entry being removed by the sweeper to go 
away
+            removed = true;
+            continue;
+          } else {
+            if (ifAbsent) {
+              break;
+            }
+            oldEvh.setEndOfLifeTimestamp(0);
+            if (oldEvh.getLastSequenceNumber() < evh.getLastSequenceNumber()) {
+              oldEvh.setLastSequenceNumber(evh.getLastSequenceNumber());
+              oldEvh.setVersionTag(evh.getVersionTag());
+            }
+          }
+        }
+      } else {
+        evh.setEndOfLifeTimestamp(0);
+      }
+    } while (removed);
+  }
+
+  @Override
+  public void recordEvent(InternalCacheEvent event) {
+    EventID eventID = event.getEventId();
+    if (ignoreEvent(event, eventID)) {
+      return; // not tracked
+    }
+
+    LocalRegion lr = (LocalRegion) event.getRegion();
+    ThreadIdentifier membershipID = createThreadIDFromEvent(eventID);
+
+    VersionTag tag = null;
+    if (lr.getServerProxy() == null) {
+      tag = event.getVersionTag();
+      RegionVersionVector v = ((LocalRegion) 
event.getRegion()).getVersionVector();
+      canonicalizeIDs(tag, v);
+    }
+
+    EventSequenceNumberHolder newEvh = new 
EventSequenceNumberHolder(eventID.getSequenceID(), tag);
+    if (logger.isTraceEnabled()) {
+      logger.trace("region event tracker recording {}", event);
+    }
+    recordSequenceNumber(membershipID, newEvh);
+
+    // If this is a bulkOp, and concurrency checks are enabled, we need to
+    // save the version tag in case we retry.
+    // Record the bulkOp version tag only after recordSequenceNumber, so that 
recordBulkOpStart
+    // in a retried bulk op does not incorrectly remove the saved version tag from
+    // recordedBulkOpVersionTags
+    if (lr.getConcurrencyChecksEnabled()
+        && (event.getOperation().isPutAll() || 
event.getOperation().isRemoveAll())
+        && lr.getServerProxy() == null) {
+      recordBulkOpEvent(event, membershipID);
+    }
+  }
+
+  private ThreadIdentifier createThreadIDFromEvent(EventID eventID) {
+    return new ThreadIdentifier(eventID.getMembershipID(), 
eventID.getThreadID());
+  }
+
+  /**
+   * Record a version tag for a bulk operation.
+   */
+  private void recordBulkOpEvent(InternalCacheEvent event, ThreadIdentifier 
threadID) {
+    EventID eventID = event.getEventId();
+
+    VersionTag tag = event.getVersionTag();
+    if (tag == null) {
+      return;
+    }
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("recording bulkOp event {} {} {} op={}", 
threadID.expensiveToString(), eventID,
+          tag, event.getOperation());
+    }
+
+    RegionVersionVector versionVector = ((LocalRegion) 
event.getRegion()).getVersionVector();
+    canonicalizeIDs(tag, versionVector);
+
+    // Loop until we can successfully update the recorded bulk operations
+    // for this thread id.
+    boolean retry = false;
+    do {
+      BulkOperationHolder bulkOpTracker = 
recordedBulkOpVersionTags.get(threadID);
+      if (bulkOpTracker == null) {
+        bulkOpTracker = new BulkOperationHolder();
+        BulkOperationHolder old = 
recordedBulkOpVersionTags.putIfAbsent(threadID, bulkOpTracker);
+        if (old != null) {
+          retry = true;
+          continue;
+        }
+      }
+      synchronized (bulkOpTracker) {
+        if (bulkOpTracker.isRemoved()) {
+          retry = true;
+          continue;
+        }
+
+        // Add the version tag for bulkOp event.
+        bulkOpTracker.putVersionTag(eventID, event.getVersionTag());
+        retry = false;
+      }
+    } while (retry);
+  }
+
+  private void canonicalizeIDs(VersionTag tag, RegionVersionVector 
versionVector) {
+    if (tag != null && versionVector != null) {
+      tag.setMemberID(versionVector.getCanonicalId(tag.getMemberID()));
+      if (tag.getPreviousMemberID() != null) {
+        
tag.setPreviousMemberID(versionVector.getCanonicalId(tag.getPreviousMemberID()));
+      }
+    }
+  }
+
+  @Override
+  public boolean hasSeenEvent(InternalCacheEvent event) {
+    EventID eventID = event.getEventId();
+    if (ignoreEvent(event, eventID)) {
+      return false; // not tracked
+    }
+    return hasSeenEvent(eventID, event);
+  }
+
+  @Override
+  public boolean hasSeenEvent(EventID eventID) {
+    return hasSeenEvent(eventID, null);
+  }
+
+  @Override
+  public boolean hasSeenEvent(EventID eventID, InternalCacheEvent tagHolder) {
+    if (eventID == null) {
+      return false;
+    }
+
+    EventSequenceNumberHolder evh = getSequenceHolderForEvent(eventID);
+    if (evh == null) {
+      return false;
+    }
+
+    synchronized (evh) {
+      if (evh.isRemoved() || evh.getLastSequenceNumber() < 
eventID.getSequenceID()) {
+        return false;
+      }
+      // log at fine because partitioned regions can send event multiple times
+      // during normal operation during bucket region initialization
+      if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_BRIDGE_SERVER)) {
+        logger.trace(LogMarker.DISTRIBUTION_BRIDGE_SERVER,
+            "Cache encountered replay of event with ID {}.  Highest recorded 
for this source is {}",
+            eventID, evh.getLastSequenceNumber());
+      }
+      // bug #44956 - recover version tag for duplicate event
+      if (evh.getLastSequenceNumber() == eventID.getSequenceID() && tagHolder 
!= null
+          && evh.getVersionTag() != null) {
+        ((EntryEventImpl) tagHolder).setVersionTag(evh.getVersionTag());
+      }
+      return true;
+    }
+  }
+
+  private EventSequenceNumberHolder getSequenceHolderForEvent(EventID eventID) 
{
+    ThreadIdentifier membershipID = createThreadIDFromEvent(eventID);
+    return recordedEvents.get(membershipID);
+  }
+
+  @Override
+  public VersionTag findVersionTagForSequence(EventID eventID) {
+    EventSequenceNumberHolder evh = getSequenceHolderForEvent(eventID);
+    if (evh == null) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("search for version tag failed as no event is recorded 
for {}",
+            createThreadIDFromEvent(eventID).expensiveToString());
+      }
+      return null;
+    }
+
+    synchronized (evh) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("search for version tag located last event for {}: {}",
+            createThreadIDFromEvent(eventID).expensiveToString(), evh);
+      }
+      if (evh.getLastSequenceNumber() != eventID.getSequenceID()) {
+        return null;
+      }
+      // log at fine because partitioned regions can send event multiple times
+      // during normal operation during bucket region initialization
+      if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_BRIDGE_SERVER)
+          && evh.getVersionTag() == null) {
+        logger.trace(LogMarker.DISTRIBUTION_BRIDGE_SERVER,
+            "Could not recover version tag.  Found event holder with no 
version tag for {}",
+            eventID);
+      }
+      return evh.getVersionTag();
+    }
+  }
+
+  @Override
+  public VersionTag findVersionTagForBulkOp(EventID eventID) {
+    if (eventID == null) {
+      return null;
+    }
+    ThreadIdentifier threadID = createThreadIDFromEvent(eventID);
+    BulkOperationHolder evh = recordedBulkOpVersionTags.get(threadID);
+    if (evh == null) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("search for version tag failed as no events are recorded 
for {}",
+            threadID.expensiveToString());
+      }
+      return null;
+    }
+
+    synchronized (evh) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("search for version tag located event holder for {}: {}",
+            threadID.expensiveToString(), evh);
+      }
+      return evh.getEntryVersionTags().get(eventID);
+    }
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  /**
+   * @return true if the event should not be tracked, false otherwise
+   */
+  private boolean ignoreEvent(InternalCacheEvent event, EventID eventID) {
+    if (eventID == null) {
+      return true;
+    } else {
+      boolean isVersioned = (event.getVersionTag() != null);
+      boolean isClient = event.hasClientOrigin();
+      if (isVersioned && isClient) {
+        return false; // version tags for client events are kept for retries 
by the client
+      }
+      boolean isEntry = event.getOperation().isEntry();
+      boolean isPr = 
event.getRegion().getAttributes().getDataPolicy().withPartitioning()
+          || ((LocalRegion) 
event.getRegion()).isUsedForPartitionedRegionBucket();
+      return (!isClient && // ignore if it originated on a server, and
+          isEntry && // it affects an entry and
+          !isPr); // is not on a PR
+    }
+  }
+
+  @Override
+  public void syncBulkOp(Runnable r, EventID eventID, boolean 
partOfTransaction) {
+    if (partOfTransaction) {
+      r.run();
+      return;
+    }
+    Assert.assertTrue(eventID != null);
+    ThreadIdentifier membershipID = createThreadIDFromEvent(eventID);
+    Object opSyncObj = null;
+    do {
+      opSyncObj = recordedBulkOps.putIfAbsent(membershipID, new Object());
+      if (opSyncObj == null) {
+        opSyncObj = recordedBulkOps.get(membershipID);
+      }
+    } while (opSyncObj == null);
+
+    synchronized (opSyncObj) {
+      try {
+        recordBulkOpStart(eventID, membershipID);
+        // Perform the bulk op
+        r.run();
+      } finally {
+        recordedBulkOps.remove(membershipID);
+      }
+    }
+  }
+
+  @Override
+  public void recordBulkOpStart(EventID eventID, ThreadIdentifier tid) {
+    if (logger.isDebugEnabled()) {
+      logger.debug("recording bulkOp start for {}", tid.expensiveToString());
+    }
+    EventSequenceNumberHolder evh = recordedEvents.get(tid);
+    if (evh == null) {
+      return;
+    }
+    synchronized (evh) {
+      // only remove it when a new bulk op occurs
+      if (eventID.getSequenceID() > evh.getLastSequenceNumber()) {
+        this.recordedBulkOpVersionTags.remove(tid);
+      }
+    }
+  }
+
+  @Override
+  public boolean isInitialized() {
+    return this.initialized;
+  }
+
+  @Override
+  public boolean isInitialImageProvider(DistributedMember mbr) {
+    return (this.initialImageProvider != null) && (mbr != null)
+        && this.initialImageProvider.equals(mbr);
+  }
+
+  public ConcurrentMap<ThreadIdentifier, BulkOperationHolder> 
getRecordedBulkOpVersionTags() {
+    return recordedBulkOpVersionTags;
+  }
+
+  @Override
+  public ConcurrentMap<ThreadIdentifier, EventSequenceNumberHolder> 
getRecordedEvents() {
+    return recordedEvents;
+  }
+
+  @Override
+  public String toString() {
+    return "" + this.name + "(initialized=" + this.initialized + ")";
+  }
+
+}

Reply via email to