GEODE-2860: Refactor use of EventTracker * change EventTracker to an interface with two implementations * move as much logic out of LocalRegion down into subclasses that make use of EventTracker * move and refactor static inner classes in EventTracker into their own class files * migrate some event-focused classes into a new sub package * add tests for existing logic from EventTracker
This closes #638 Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/9e7696a6 Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/9e7696a6 Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/9e7696a6 Branch: refs/heads/feature/GEM-1483 Commit: 9e7696a64621cc05122e065e492cc5e000e96622 Parents: 27aa7d3 Author: Nick Reich <nre...@pivotal.io> Authored: Fri Jun 30 16:05:58 2017 -0700 Committer: Darrel Schneider <dschnei...@pivotal.io> Committed: Mon Jul 17 17:15:10 2017 -0700 ---------------------------------------------------------------------- .../geode/internal/cache/BucketRegion.java | 36 +- .../internal/cache/CreateRegionProcessor.java | 22 +- .../geode/internal/cache/DistributedRegion.java | 46 +- .../geode/internal/cache/EventStateHelper.java | 10 +- .../geode/internal/cache/EventTracker.java | 790 ------------------- .../internal/cache/FindVersionTagOperation.java | 2 +- .../geode/internal/cache/GemFireCacheImpl.java | 24 +- .../apache/geode/internal/cache/HARegion.java | 5 +- .../geode/internal/cache/InternalCache.java | 3 +- .../geode/internal/cache/LocalRegion.java | 167 +--- .../geode/internal/cache/PartitionedRegion.java | 9 +- .../cache/event/BulkOperationHolder.java | 79 ++ .../cache/event/DistributedEventTracker.java | 523 ++++++++++++ .../cache/event/EventSequenceNumberHolder.java | 124 +++ .../internal/cache/event/EventTracker.java | 136 ++++ .../cache/event/EventTrackerExpiryTask.java | 97 +++ .../cache/event/NonDistributedEventTracker.java | 135 ++++ .../cache/tier/sockets/BaseCommand.java | 4 +- .../wan/serial/SerialGatewaySenderQueue.java | 6 +- .../internal/cache/xmlcache/CacheCreation.java | 4 +- .../internal/cache/BucketRegionJUnitTest.java | 4 + .../cache/DistributedRegionJUnitTest.java | 11 +- .../internal/cache/EventTrackerDUnitTest.java | 486 ------------ .../geode/internal/cache/EventTrackerTest.java | 94 --- .../geode/internal/cache/IteratorDUnitTest.java | 2 +- 
.../cache/PartitionedRegionDUnitTestCase.java | 4 +- ...onedRegionSingleNodeOperationsJUnitTest.java | 11 +- .../event/DistributedEventTrackerTest.java | 328 ++++++++ .../cache/event/EventTrackerDUnitTest.java | 489 ++++++++++++ .../cache/event/EventTrackerExpiryTaskTest.java | 94 +++ .../event/NonDistributedEventTrackerTest.java | 89 +++ .../sanctionedDataSerializables.txt | 8 +- 32 files changed, 2262 insertions(+), 1580 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/9e7696a6/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java index 31b341a..30ce9e7 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java @@ -30,7 +30,7 @@ import org.apache.geode.internal.HeapDataOutputStream; import org.apache.geode.internal.Version; import org.apache.geode.internal.cache.BucketAdvisor.BucketProfile; import org.apache.geode.internal.cache.CreateRegionProcessor.CreateRegionReplyProcessor; -import org.apache.geode.internal.cache.EventTracker.EventSeqnoHolder; +import org.apache.geode.internal.cache.event.EventSequenceNumberHolder; import org.apache.geode.internal.cache.FilterRoutingInfo.FilterInfo; import org.apache.geode.internal.cache.control.MemoryEvent; import org.apache.geode.internal.cache.ha.ThreadIdentifier; @@ -280,12 +280,6 @@ public class BucketRegion extends DistributedRegion implements Bucket { } @Override - public void createEventTracker() { - this.eventTracker = new EventTracker(this); - this.eventTracker.start(); - } - - @Override public void registerCreateRegionReplyProcessor(CreateRegionReplyProcessor processor) { 
this.createRegionReplyProcessor = processor; } @@ -293,7 +287,7 @@ public class BucketRegion extends DistributedRegion implements Bucket { @Override protected void recordEventStateFromImageProvider(InternalDistributedMember provider) { if (this.createRegionReplyProcessor != null) { - Map<ThreadIdentifier, EventSeqnoHolder> providerEventStates = + Map<ThreadIdentifier, EventSequenceNumberHolder> providerEventStates = this.createRegionReplyProcessor.getEventState(provider); if (providerEventStates != null) { recordEventState(provider, providerEventStates); @@ -1632,7 +1626,7 @@ public class BucketRegion extends DistributedRegion implements Bucket { if (this.isInitialized()) { boolean callThem = callDispatchListenerEvent; if (event.isPossibleDuplicate() - && this.eventTracker.isInitialImageProvider(event.getDistributedMember())) { + && getEventTracker().isInitialImageProvider(event.getDistributedMember())) { callThem = false; } super.invokeTXCallbacks(eventType, event, callThem); @@ -1664,7 +1658,7 @@ public class BucketRegion extends DistributedRegion implements Bucket { if (this.isInitialized()) { boolean callThem = callDispatchListenerEvent; if (event.isPossibleDuplicate() - && this.eventTracker.isInitialImageProvider(event.getDistributedMember())) { + && this.getEventTracker().isInitialImageProvider(event.getDistributedMember())) { callThem = false; } super.invokeDestroyCallbacks(eventType, event, callThem, notifyGateways); @@ -1695,7 +1689,7 @@ public class BucketRegion extends DistributedRegion implements Bucket { if (this.isInitialized()) { boolean callThem = callDispatchListenerEvent; if (event.isPossibleDuplicate() - && this.eventTracker.isInitialImageProvider(event.getDistributedMember())) { + && this.getEventTracker().isInitialImageProvider(event.getDistributedMember())) { callThem = false; } super.invokeInvalidateCallbacks(eventType, event, callThem); @@ -1729,7 +1723,7 @@ public class BucketRegion extends DistributedRegion implements Bucket { if 
(this.isInitialized()) { boolean callThem = callDispatchListenerEvent; if (callThem && event.isPossibleDuplicate() - && this.eventTracker.isInitialImageProvider(event.getDistributedMember())) { + && this.getEventTracker().isInitialImageProvider(event.getDistributedMember())) { callThem = false; } super.invokePutCallbacks(eventType, event, callThem, notifyGateways); @@ -2439,5 +2433,23 @@ public class BucketRegion extends DistributedRegion implements Bucket { return getPartitionedRegion().notifiesMultipleSerialGateways(); } + @Override + public boolean hasSeenEvent(EntryEventImpl event) { + ensureEventTrackerInitialization(); + return super.hasSeenEvent(event); + } + + // bug 41289 - wait for event tracker to be initialized before checkin + // so that an operation intended for a previous version of a bucket + // is not prematurely applied to a new version of the bucket + private void ensureEventTrackerInitialization() { + try { + getEventTracker().waitOnInitialization(); + } catch (InterruptedException ie) { + stopper.checkCancelInProgress(ie); + Thread.currentThread().interrupt(); + } + } + } http://git-wip-us.apache.org/repos/asf/geode/blob/9e7696a6/geode-core/src/main/java/org/apache/geode/internal/cache/CreateRegionProcessor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/CreateRegionProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/CreateRegionProcessor.java index 1e38065..ee4e2df 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/CreateRegionProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/CreateRegionProcessor.java @@ -49,7 +49,7 @@ import org.apache.geode.internal.Assert; import org.apache.geode.internal.InternalDataSerializer; import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile; import org.apache.geode.internal.cache.CacheDistributionAdvisor.InitialImageAdvice; 
-import org.apache.geode.internal.cache.EventTracker.EventSeqnoHolder; +import org.apache.geode.internal.cache.event.EventSequenceNumberHolder; import org.apache.geode.internal.cache.ha.ThreadIdentifier; import org.apache.geode.internal.cache.partitioned.Bucket; import org.apache.geode.internal.cache.partitioned.PRLocallyDestroyedException; @@ -91,10 +91,8 @@ public class CreateRegionProcessor implements ProfileExchangeProcessor { logger.debug("CreateRegionProcessor.initializeRegion, no recipients, msg not sent"); } this.newRegion.getDistributionAdvisor().setInitialized(); - EventTracker tracker = ((LocalRegion) this.newRegion).getEventTracker(); - if (tracker != null) { - tracker.setInitialized(); - } + + ((LocalRegion) this.newRegion).getEventTracker().setInitialized(); return; } @@ -138,13 +136,11 @@ public class CreateRegionProcessor implements ProfileExchangeProcessor { } } finally { replyProc.cleanup(); - EventTracker tracker = ((LocalRegion) this.newRegion).getEventTracker(); - if (tracker != null) { - tracker.setInitialized(); - } + ((LocalRegion) this.newRegion).getEventTracker().setInitialized(); if (((LocalRegion) this.newRegion).isUsedForPartitionedRegionBucket()) { if (logger.isDebugEnabled()) { - logger.debug("initialized bucket event tracker: {}", tracker); + logger.debug("initialized bucket event tracker: {}", + ((LocalRegion) this.newRegion).getEventTracker()); } } } @@ -203,12 +199,12 @@ public class CreateRegionProcessor implements ProfileExchangeProcessor { .getDistributedSystem(), members); } - private final Map<DistributedMember, Map<ThreadIdentifier, EventSeqnoHolder>> remoteEventStates = + private final Map<DistributedMember, Map<ThreadIdentifier, EventSequenceNumberHolder>> remoteEventStates = new ConcurrentHashMap<>(); private boolean allMembersSkippedChecks = true; - public Map<ThreadIdentifier, EventSeqnoHolder> getEventState( + public Map<ThreadIdentifier, EventSequenceNumberHolder> getEventState( InternalDistributedMember provider) { 
return this.remoteEventStates.get(provider); } @@ -254,7 +250,7 @@ public class CreateRegionProcessor implements ProfileExchangeProcessor { // Save all event states, need to initiate the event tracker from the GII provider if (reply.eventState != null) { remoteEventStates.put(reply.getSender(), - (Map<ThreadIdentifier, EventSeqnoHolder>) reply.eventState); + (Map<ThreadIdentifier, EventSequenceNumberHolder>) reply.eventState); } if (lr.isUsedForPartitionedRegionBucket()) { http://git-wip-us.apache.org/repos/asf/geode/blob/9e7696a6/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java index eb18134..d9ed4ed 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java @@ -90,6 +90,8 @@ import org.apache.geode.internal.cache.InitialImageOperation.GIIStatus; import org.apache.geode.internal.cache.RemoteFetchVersionMessage.FetchVersionResponse; import org.apache.geode.internal.cache.control.InternalResourceManager.ResourceType; import org.apache.geode.internal.cache.control.MemoryEvent; +import org.apache.geode.internal.cache.event.DistributedEventTracker; +import org.apache.geode.internal.cache.event.EventTracker; import org.apache.geode.internal.cache.execute.DistributedRegionFunctionExecutor; import org.apache.geode.internal.cache.execute.DistributedRegionFunctionResultSender; import org.apache.geode.internal.cache.execute.DistributedRegionFunctionResultWaiter; @@ -98,7 +100,6 @@ import org.apache.geode.internal.cache.execute.LocalResultCollector; import org.apache.geode.internal.cache.execute.RegionFunctionContextImpl; import 
org.apache.geode.internal.cache.execute.ServerToClientFunctionResultSender; import org.apache.geode.internal.cache.lru.LRUEntry; -import org.apache.geode.internal.cache.partitioned.Bucket; import org.apache.geode.internal.cache.persistence.CreatePersistentRegionProcessor; import org.apache.geode.internal.cache.persistence.PersistenceAdvisor; import org.apache.geode.internal.cache.persistence.PersistenceAdvisorImpl; @@ -255,9 +256,10 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA } @Override - public void createEventTracker() { - this.eventTracker = new EventTracker(this); - this.eventTracker.start(); + protected EventTracker createEventTracker() { + EventTracker tracker = new DistributedEventTracker(cache, stopper, getName()); + tracker.start(); + return tracker; } /** @@ -493,6 +495,38 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA } } + @Override + public boolean hasSeenEvent(EntryEventImpl event) { + boolean isDuplicate = false; + + isDuplicate = getEventTracker().hasSeenEvent(event); + if (isDuplicate) { + markEventAsDuplicate(event); + } else { + // bug #48205 - a retried PR operation may already have a version assigned to it + // in another VM + if (event.isPossibleDuplicate() && event.getRegion().concurrencyChecksEnabled + && event.getVersionTag() == null && event.getEventId() != null) { + boolean isBulkOp = event.getOperation().isPutAll() || event.getOperation().isRemoveAll(); + VersionTag tag = + FindVersionTagOperation.findVersionTag(event.getRegion(), event.getEventId(), isBulkOp); + event.setVersionTag(tag); + } + } + return isDuplicate; + } + + private void markEventAsDuplicate(EntryEventImpl event) { + event.setPossibleDuplicate(true); + if (concurrencyChecksEnabled && event.getVersionTag() == null) { + if (event.isBulkOpInProgress()) { + event.setVersionTag(getEventTracker().findVersionTagForBulkOp(event.getEventId())); + } else { + 
event.setVersionTag(getEventTracker().findVersionTagForSequence(event.getEventId())); + } + } + } + void setGeneratedVersionTag(boolean generateVersionTag) { // there is at-least one other persistent member, so turn on concurrencyChecks enableConcurrencyChecks(); @@ -1038,9 +1072,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA // makes sure all latches are released if they haven't been already super.initialize(null, null, null); } finally { - if (this.eventTracker != null) { - this.eventTracker.setInitialized(); - } + getEventTracker().setInitialized(); } } http://git-wip-us.apache.org/repos/asf/geode/blob/9e7696a6/geode-core/src/main/java/org/apache/geode/internal/cache/EventStateHelper.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EventStateHelper.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EventStateHelper.java index 9bc5e3a..ec427bb 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/EventStateHelper.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EventStateHelper.java @@ -19,7 +19,7 @@ import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.InternalDataSerializer; import org.apache.geode.internal.cache.CreateRegionProcessor.CreateRegionReplyMessage; -import org.apache.geode.internal.cache.EventTracker.EventSeqnoHolder; +import org.apache.geode.internal.cache.event.EventSequenceNumberHolder; import org.apache.geode.internal.cache.InitialImageOperation.RegionStateMessage; import org.apache.geode.internal.cache.ha.HARegionQueue.DispatchedAndCurrentEvents; import org.apache.geode.internal.cache.ha.ThreadIdentifier; @@ -89,7 +89,7 @@ public class EventStateHelper { DispatchedAndCurrentEvents value = (DispatchedAndCurrentEvents) 
entry.getValue(); InternalDataSerializer.invokeToData(value, dop); } else { - EventSeqnoHolder value = (EventSeqnoHolder) entry.getValue(); + EventSequenceNumberHolder value = (EventSequenceNumberHolder) entry.getValue(); InternalDataSerializer.invokeToData(value, dop); } } @@ -130,11 +130,11 @@ public class EventStateHelper { InternalDataSerializer.invokeFromData(value, dip); eventState.put(key, value); } else { - EventSeqnoHolder value = new EventSeqnoHolder(); + EventSequenceNumberHolder value = new EventSequenceNumberHolder(); InternalDataSerializer.invokeFromData(value, dip); eventState.put(key, value); - if (value.versionTag != null) { - value.versionTag.replaceNullIDs(senderId); + if (value.getVersionTag() != null) { + value.getVersionTag().replaceNullIDs(senderId); } } } http://git-wip-us.apache.org/repos/asf/geode/blob/9e7696a6/geode-core/src/main/java/org/apache/geode/internal/cache/EventTracker.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EventTracker.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EventTracker.java deleted file mode 100644 index b919043..0000000 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/EventTracker.java +++ /dev/null @@ -1,790 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. 
You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.geode.internal.cache; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import org.apache.logging.log4j.Logger; - -import org.apache.geode.DataSerializable; -import org.apache.geode.DataSerializer; -import org.apache.geode.cache.client.PoolFactory; -import org.apache.geode.distributed.DistributedMember; -import org.apache.geode.distributed.internal.DistributionConfig; -import org.apache.geode.distributed.internal.membership.InternalDistributedMember; -import org.apache.geode.internal.Assert; -import org.apache.geode.internal.SystemTimer.SystemTimerTask; -import org.apache.geode.internal.cache.ha.ThreadIdentifier; -import org.apache.geode.internal.cache.versions.RegionVersionVector; -import org.apache.geode.internal.cache.versions.VersionTag; -import org.apache.geode.internal.logging.LogService; -import org.apache.geode.internal.logging.log4j.LogMarker; -import org.apache.geode.internal.util.concurrent.StoppableCountDownLatch; - -/** - * EventTracker tracks the last sequence number for a particular memberID:threadID. It is used to - * avoid replaying events in client/server and partitioned-region configurations. 
- * - * @since GemFire 6.0 - */ -public class EventTracker { - private static final Logger logger = LogService.getLogger(); - - /** - * a mapping of originator to the last event applied to this cache - * - * Keys are instances of {@link ThreadIdentifier}, values are instances of - * {@link org.apache.geode.internal.cache.EventTracker.EventSeqnoHolder}. - */ - protected final ConcurrentMap<ThreadIdentifier, EventSeqnoHolder> recordedEvents = - new ConcurrentHashMap<ThreadIdentifier, EventSeqnoHolder>(100); - - /** - * a mapping of originator to bulkOps - * - * Keys are instances of @link {@link ThreadIdentifier} - */ - private final ConcurrentMap<ThreadIdentifier, Object> recordedBulkOps = - new ConcurrentHashMap<ThreadIdentifier, Object>(100); - - /** - * a mapping of originator to bulkOperation's last version tags. This map differs from - * {@link #recordedBulkOps} in that the thread identifier used here is the base member id and - * thread id of the bulk op, as opposed to the fake thread id which is assigned for each bucket. - * - * recordedBulkOps are also only tracked on the secondary for partitioned regions - * recordedBulkOpVersionTags are tracked on both the primary and secondary. - * - * Keys are instances of @link {@link ThreadIdentifier}, values are instances of - * {@link BulkOpHolder}. - */ - private final ConcurrentMap<ThreadIdentifier, BulkOpHolder> recordedBulkOpVersionTags = - new ConcurrentHashMap<ThreadIdentifier, BulkOpHolder>(100); - - /** - * The member that the region corresponding to this tracker (if any) received its initial image - * from (if a replicate) - */ - private volatile InternalDistributedMember initialImageProvider; - - /** - * The cache associated with this tracker - */ - InternalCache cache; - - /** - * The name of this tracker - */ - String name; - - /** - * whether or not this tracker has been initialized to allow entry operation. replicate region - * does not initiate event tracker from its replicates. 
- */ - volatile boolean initialized; - - /** - * object used to wait for initialization - */ - final StoppableCountDownLatch initializationLatch; - - /** - * Initialize the EventTracker's timer task. This is stored in the cache for tracking and shutdown - * purposes - * - * @param cache the cache to schedule tasks with - */ - public static ExpiryTask startTrackerServices(InternalCache cache) { - long expiryTime = Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "messageTrackingTimeout", - PoolFactory.DEFAULT_SUBSCRIPTION_MESSAGE_TRACKING_TIMEOUT / 3); - ExpiryTask result = new ExpiryTask(cache, expiryTime); - cache.getCCPTimer().scheduleAtFixedRate(result, expiryTime, expiryTime); - // schedule(result, expiryTime); - return result; - } - - /** - * Terminate the tracker's timer task - * - * @param cache the cache holding the tracker task - */ - public static void stopTrackerServices(InternalCache cache) { - cache.getEventTrackerTask().cancel(); - } - - /** - * Create an event tracker - * - * @param region the cache region to associate with this tracker - */ - public EventTracker(LocalRegion region) { - this.cache = region.cache; - this.name = "Event Tracker for " + region.getName(); - this.initializationLatch = new StoppableCountDownLatch(region.getStopper(), 1); - } - - /** start this event tracker */ - public void start() { - if (this.cache.getEventTrackerTask() != null) { - this.cache.getEventTrackerTask().addTracker(this); - } - } - - /** stop this event tracker */ - public void stop() { - if (this.cache.getEventTrackerTask() != null) { - this.cache.getEventTrackerTask().removeTracker(this); - } - } - - /** - * retrieve a deep copy of the state of the event tracker. Synchronization is not used while - * copying the tracker's state. 
- */ - public Map<ThreadIdentifier, EventSeqnoHolder> getState() { - Map<ThreadIdentifier, EventSeqnoHolder> result = - new HashMap<ThreadIdentifier, EventSeqnoHolder>(recordedEvents.size()); - for (Iterator<Map.Entry<ThreadIdentifier, EventSeqnoHolder>> it = - recordedEvents.entrySet().iterator(); it.hasNext();) { - Map.Entry<ThreadIdentifier, EventSeqnoHolder> entry = it.next(); - EventSeqnoHolder holder = entry.getValue(); - result.put(entry.getKey(), new EventSeqnoHolder(holder.lastSeqno, null)); // don't transfer - // version tags - - // adds too much - // bulk just so we - // can do client tag - // recovery - } - return result; - } - - /** - * record the given state in the tracker. - * - * @param provider the member that provided this state - * @param state a Map obtained from getState(); - */ - public void recordState(InternalDistributedMember provider, - Map<ThreadIdentifier, EventSeqnoHolder> state) { - this.initialImageProvider = provider; - StringBuffer sb = null; - if (logger.isDebugEnabled()) { - sb = new StringBuffer(200); - sb.append("Recording initial state for ").append(this.name).append(": "); - } - for (Iterator<Map.Entry<ThreadIdentifier, EventSeqnoHolder>> it = - state.entrySet().iterator(); it.hasNext();) { - Map.Entry<ThreadIdentifier, EventSeqnoHolder> entry = it.next(); - if (sb != null) { - sb.append("\n ").append(entry.getKey().expensiveToString()).append("; sequenceID=") - .append(entry.getValue()); - } - // record only if we haven't received an event that is newer - recordSeqno(entry.getKey(), entry.getValue(), true); - } - if (sb != null) { - logger.debug(sb); - } - // fix for bug 41622 - hang in GII. 
This keeps ops from waiting for the - // full GII to complete - setInitialized(); - } - - /** - * Use this method to ensure that the tracker is put in an initialized state - */ - public void setInitialized() { - this.initializationLatch.countDown(); - this.initialized = true; - } - - /** - * Wait for the tracker to finishe being initialized - */ - public void waitOnInitialization() throws InterruptedException { - this.initializationLatch.await(); - } - - /** - * Record an event sequence id if it is higher than what we currently have. This is intended for - * use during initial image transfer. - * - * @param membershipID the key of an entry in the map obtained from getEventState() - * @param evhObj the value of an entry in the map obtained from getEventState() - */ - protected void recordSeqno(ThreadIdentifier membershipID, EventSeqnoHolder evhObj) { - recordSeqno(membershipID, evhObj, false); - } - - /** - * Record an event sequence id if it is higher than what we currently have. This is intended for - * use during initial image transfer. 
- * - * @param threadID the key of an entry in the map obtained from getEventState() - * @param evh the value of an entry in the map obtained from getEventState() - * @param ifAbsent only record this state if there's not already an entry for this memberID - */ - private void recordSeqno(ThreadIdentifier threadID, EventSeqnoHolder evh, boolean ifAbsent) { - boolean removed; - if (logger.isDebugEnabled()) { - logger.debug("recording {} {}", threadID.expensiveToString(), evh.toString()); - } - do { - removed = false; - EventSeqnoHolder oldEvh = recordedEvents.putIfAbsent(threadID, evh); - if (oldEvh != null) { - synchronized (oldEvh) { - if (oldEvh.removed) { - // need to wait for an entry being removed by the sweeper to go away - removed = true; - continue; - } else { - if (ifAbsent) { - break; - } - oldEvh.endOfLifeTimer = 0; - if (oldEvh.lastSeqno < evh.lastSeqno) { - oldEvh.lastSeqno = evh.lastSeqno; - oldEvh.versionTag = evh.versionTag; - // Exception e = oldEvh.context; - // oldEvh.context = new Exception("stack trace"); - // oldEvh.context.initCause(e); - } - } - } - } else { - evh.endOfLifeTimer = 0; - // evh.context = new Exception("stack trace"); - } - } while (removed); - } - - /** record the event's threadid/sequenceid to prevent replay */ - public void recordEvent(InternalCacheEvent event) { - EventID eventID = event.getEventId(); - if (ignoreEvent(event, eventID)) { - return; // not tracked - } - - LocalRegion lr = (LocalRegion) event.getRegion(); - - ThreadIdentifier membershipID = - new ThreadIdentifier(eventID.getMembershipID(), eventID.getThreadID()); - - VersionTag tag = null; - if (lr.getServerProxy() == null/* && event.hasClientOrigin() */) { // clients do not need to - // store version tags for - // replayed events - tag = event.getVersionTag(); - RegionVersionVector v = ((LocalRegion) event.getRegion()).getVersionVector(); - // bug #46453 - make sure ID references are canonical before storing - if (v != null && tag != null) { - 
tag.setMemberID(v.getCanonicalId(tag.getMemberID())); - if (tag.getPreviousMemberID() != null) { - tag.setPreviousMemberID(v.getCanonicalId(tag.getPreviousMemberID())); - } - } - } - - EventSeqnoHolder newEvh = new EventSeqnoHolder(eventID.getSequenceID(), tag); - if (logger.isTraceEnabled()) { - logger.trace("region event tracker recording {}", event); - } - recordSeqno(membershipID, newEvh); - - // If this is a bulkOp, and concurrency checks are enabled, we need to - // save the version tag in case we retry. - // Make recordBulkOp version tag after recordSeqno, so that recordBulkOpStart - // in a retry bulk op would not incorrectly remove the saved version tag in - // recordedBulkOpVersionTags - if (lr.getConcurrencyChecksEnabled() - && (event.getOperation().isPutAll() || event.getOperation().isRemoveAll()) - && lr.getServerProxy() == null) { - recordBulkOpEvent(event, membershipID); - } - } - - /** - * Record a version tag for a bulk operation - */ - private void recordBulkOpEvent(InternalCacheEvent event, ThreadIdentifier tid) { - EventID eventID = event.getEventId(); - - VersionTag tag = event.getVersionTag(); - if (tag == null) { - return; - } - - if (logger.isDebugEnabled()) { - logger.debug("recording bulkOp event {} {} {} op={}", tid.expensiveToString(), eventID, tag, - event.getOperation()); - } - - RegionVersionVector v = ((LocalRegion) event.getRegion()).getVersionVector(); - // bug #46453 - make sure ID references are canonical before storing - if (v != null) { - tag.setMemberID(v.getCanonicalId(tag.getMemberID())); - if (tag.getPreviousMemberID() != null) { - tag.setPreviousMemberID(v.getCanonicalId(tag.getPreviousMemberID())); - } - } - - // Loop until we can successfully update the recorded bulk operations - // For this thread id. 
- boolean retry = false; - do { - BulkOpHolder bulkOpTracker = recordedBulkOpVersionTags.get(tid); - if (bulkOpTracker == null) { - bulkOpTracker = new BulkOpHolder(); - BulkOpHolder old = recordedBulkOpVersionTags.putIfAbsent(tid, bulkOpTracker); - if (old != null) { - retry = true; - continue; - } - } - synchronized (bulkOpTracker) { - if (bulkOpTracker.removed) { - retry = true; - continue; - } - - // Add the version tag for bulkOp event. - bulkOpTracker.putVersionTag(eventID, event.getVersionTag()); - retry = false; - } - } while (retry); - } - - public boolean hasSeenEvent(InternalCacheEvent event) { - // ClientProxyMembershipID membershipID = event.getContext(); - EventID eventID = event.getEventId(); - if (ignoreEvent(event, eventID)) { - return false; // not tracked - } - return hasSeenEvent(eventID, event); - } - - public boolean hasSeenEvent(EventID eventID) { - return hasSeenEvent(eventID, null); - } - - public boolean hasSeenEvent(EventID eventID, InternalCacheEvent tagHolder) { - ThreadIdentifier membershipID = - new ThreadIdentifier(eventID.getMembershipID(), eventID.getThreadID()); - // if (membershipID == null || eventID == null) { - // return false; - // } - - EventSeqnoHolder evh = recordedEvents.get(membershipID); - if (evh == null) { - return false; - } - - synchronized (evh) { - if (evh.removed || evh.lastSeqno < eventID.getSequenceID()) { - return false; - } - // log at fine because partitioned regions can send event multiple times - // during normal operation during bucket region initialization - if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_BRIDGE_SERVER)) { - logger.trace(LogMarker.DISTRIBUTION_BRIDGE_SERVER, - "Cache encountered replay of event with ID {}. 
Highest recorded for this source is {}", - eventID, evh.lastSeqno); - } - // bug #44956 - recover version tag for duplicate event - if (evh.lastSeqno == eventID.getSequenceID() && tagHolder != null && evh.versionTag != null) { - ((EntryEventImpl) tagHolder).setVersionTag(evh.versionTag); - } - return true; - } // synchronized - } - - public VersionTag findVersionTag(EventID eventID) { - ThreadIdentifier threadID = - new ThreadIdentifier(eventID.getMembershipID(), eventID.getThreadID()); - - EventSeqnoHolder evh = recordedEvents.get(threadID); - if (evh == null) { - if (logger.isDebugEnabled()) { - logger.debug("search for version tag failed as no event is recorded for {}", - threadID.expensiveToString()); - } - return null; - } - - synchronized (evh) { - if (logger.isDebugEnabled()) { - logger.debug("search for version tag located last event for {}: {}", - threadID.expensiveToString(), evh); - } - if (evh.lastSeqno != eventID.getSequenceID()) { - return null; - } - // log at fine because partitioned regions can send event multiple times - // during normal operation during bucket region initialization - if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_BRIDGE_SERVER) && evh.versionTag == null) { - logger.trace(LogMarker.DISTRIBUTION_BRIDGE_SERVER, - "Could not recover version tag. 
Found event holder with no version tag for {}", - eventID); - } - return evh.versionTag; - } // synchronized - } - - public VersionTag findVersionTagForGateway(EventID eventID) { - ThreadIdentifier threadID = - new ThreadIdentifier(eventID.getMembershipID(), eventID.getThreadID()); - - EventSeqnoHolder evh = recordedEvents.get(threadID); - if (evh == null) { - if (logger.isDebugEnabled()) { - logger.debug("search for version tag failed as no event is recorded for {}", - threadID.expensiveToString()); - } - return null; - } - - synchronized (evh) { - if (logger.isDebugEnabled()) { - logger.debug("search for version tag located last event for {}: {} {}", - threadID.expensiveToString(), evh, eventID.getSequenceID()); - } - - if (evh.lastSeqno < eventID.getSequenceID()) { - return null; - } - // log at fine because partitioned regions can send event multiple times - // during normal operation during bucket region initialization - if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_BRIDGE_SERVER) && evh.versionTag == null) { - logger.trace(LogMarker.DISTRIBUTION_BRIDGE_SERVER, - "Could not recover version tag. 
Found event holder with no version tag for {}", - eventID); - } - return evh.versionTag; - } // synchronized - } - - - public VersionTag findVersionTagForBulkOp(EventID eventID) { - ThreadIdentifier threadID = - new ThreadIdentifier(eventID.getMembershipID(), eventID.getThreadID()); - - BulkOpHolder evh = recordedBulkOpVersionTags.get(threadID); - if (evh == null) { - if (logger.isDebugEnabled()) { - logger.debug("search for version tag failed as no events are recorded for {}", - threadID.expensiveToString()); - } - return null; - } - - synchronized (evh) { - if (logger.isDebugEnabled()) { - logger.debug("search for version tag located event holder for {}: {}", - threadID.expensiveToString(), evh); - } - return evh.entryVersionTags.get(eventID); - } // synchronized - } - - /** - * @return true if the event should not be tracked, false otherwise - */ - private boolean ignoreEvent(InternalCacheEvent event, EventID eventID) { - if (eventID == null) { - return true; - } else { - boolean isVersioned = (event.getVersionTag() != null); - boolean isClient = event.hasClientOrigin(); - if (isVersioned && isClient) { - return false; // version tags for client events are kept for retries by the client - } - boolean isEntry = event.getOperation().isEntry(); - boolean isPr = event.getRegion().getAttributes().getDataPolicy().withPartitioning() - || ((LocalRegion) event.getRegion()).isUsedForPartitionedRegionBucket(); - return (!isClient && // ignore if it originated on a server, and - isEntry && // it affects an entry and - !isPr); // is not on a PR - } - } - - /** - * A routine to provide synchronization running based on <memberShipID, threadID> of the - * requesting client - * - * @param r - a Runnable to wrap the processing of the bulk op - * @param eventID - the base event ID of the bulk op - * - * @since GemFire 5.7 - */ - public void syncBulkOp(Runnable r, EventID eventID) { - Assert.assertTrue(eventID != null); - ThreadIdentifier membershipID = - new 
ThreadIdentifier(eventID.getMembershipID(), eventID.getThreadID()); - - Object opSyncObj = null; - do { - opSyncObj = recordedBulkOps.putIfAbsent(membershipID, new Object()); - if (opSyncObj == null) { - opSyncObj = recordedBulkOps.get(membershipID); - } - } while (opSyncObj == null); - - synchronized (opSyncObj) { - try { - recordBulkOpStart(membershipID, eventID); - // Perform the bulk op - r.run(); - } finally { - recordedBulkOps.remove(membershipID); - } - } - } - - /** - * Called when a new bulkOp is started on the local region. Used to clear event tracker state from - * the last bulkOp. - */ - public void recordBulkOpStart(ThreadIdentifier tid, EventID eventID) { - if (logger.isDebugEnabled()) { - logger.debug("recording bulkOp start for {}", tid.expensiveToString()); - } - EventSeqnoHolder evh = recordedEvents.get(tid); - if (evh == null) { - return; - } - synchronized (evh) { - // only remove it when a new bulk op occurs - if (eventID.getSequenceID() > evh.lastSeqno) { - this.recordedBulkOpVersionTags.remove(tid); - } - } - } - - /** - * @return the initialized - */ - public boolean isInitialized() { - return this.initialized; - } - - /** - * @param mbr the member in question - * @return true if the given member provided the initial image event state for this tracker - */ - public boolean isInitialImageProvider(DistributedMember mbr) { - return (this.initialImageProvider != null) && (mbr != null) - && this.initialImageProvider.equals(mbr); - } - - /** - * Test method for getting the set of recorded version tags. - */ - protected ConcurrentMap<ThreadIdentifier, BulkOpHolder> getRecordedBulkOpVersionTags() { - return recordedBulkOpVersionTags; - } - - @Override - public String toString() { - return "" + this.name + "(initialized=" + this.initialized + ")"; - } - - /** - * A sequence number tracker to keep events from clients from being re-applied to the cache if - * they've already been seen. 
- * - * @since GemFire 5.5 - */ - static class EventSeqnoHolder implements DataSerializable { - private static final long serialVersionUID = 8137262960763308046L; - - /** event sequence number. These */ - long lastSeqno = -1; - - /** millisecond timestamp */ - transient long endOfLifeTimer; - - /** whether this entry is being removed */ - transient boolean removed; - - /** - * version tag, if any, for the operation - */ - VersionTag versionTag; - - // for debugging - // transient Exception context; - - EventSeqnoHolder(long id, VersionTag versionTag) { - this.lastSeqno = id; - this.versionTag = versionTag; - } - - public EventSeqnoHolder() {} - - @Override - public String toString() { - StringBuilder result = new StringBuilder(); - result.append("seqNo").append(this.lastSeqno); - if (this.versionTag != null) { - result.append(",").append(this.versionTag); - } - return result.toString(); - } - - public void fromData(DataInput in) throws IOException, ClassNotFoundException { - lastSeqno = in.readLong(); - versionTag = (VersionTag) DataSerializer.readObject(in); - } - - public void toData(DataOutput out) throws IOException { - out.writeLong(lastSeqno); - DataSerializer.writeObject(versionTag, out); - } - } - - /** - * A holder for the version tags generated for a bulk operation (putAll or removeAll). These - * version tags are retrieved when a bulk op is retried. - * - * @since GemFire 7.0 protected for test purposes only. - */ - protected static class BulkOpHolder { - /** - * Whether this object was removed by the cleanup thread. 
- */ - public boolean removed; - - /** - * public for tests only - */ - public Map<EventID, VersionTag> entryVersionTags = new HashMap<EventID, VersionTag>(); - - /** millisecond timestamp */ - transient long endOfLifeTimer; - - /** - * creates a new instance to save status of a putAllOperation - */ - BulkOpHolder() { - // do nothing - } - - public void putVersionTag(EventID eventId, VersionTag versionTag) { - entryVersionTags.put(eventId, versionTag); - this.endOfLifeTimer = 0; - } - - - @Override - public String toString() { - return "BulkOpHolder tags=" + this.entryVersionTags; - } - } - - public static class ExpiryTask extends SystemTimerTask { - - InternalCache cache; - long expiryTime; - List trackers = new LinkedList(); - - public ExpiryTask(InternalCache cache, long expiryTime) { - this.cache = cache; - this.expiryTime = expiryTime; - } - - void addTracker(EventTracker tracker) { - synchronized (trackers) { - trackers.add(tracker); - } - } - - void removeTracker(EventTracker tracker) { - synchronized (trackers) { - trackers.remove(tracker); - } - } - - int getNumberOfTrackers() { - return trackers.size(); - } - - @Override - public void run2() { - long now = System.currentTimeMillis(); - long timeout = now - expiryTime; - final boolean traceEnabled = logger.isTraceEnabled(); - synchronized (trackers) { - for (Iterator it = trackers.iterator(); it.hasNext();) { - EventTracker tracker = (EventTracker) it.next(); - if (traceEnabled) { - logger.trace("{} sweeper: starting", tracker.name); - } - for (Iterator it2 = tracker.recordedEvents.entrySet().iterator(); it2.hasNext();) { - Map.Entry e = (Map.Entry) it2.next(); - EventSeqnoHolder evh = (EventSeqnoHolder) e.getValue(); - synchronized (evh) { - if (evh.endOfLifeTimer == 0) { - evh.endOfLifeTimer = now; // a new holder - start the timer - } - if (evh.endOfLifeTimer <= timeout) { - evh.removed = true; - evh.lastSeqno = -1; - if (traceEnabled) { - logger.trace("{} sweeper: removing {}", tracker.name, 
e.getKey()); - } - it2.remove(); - } - } - } - - // Remove bulk operations we're tracking - for (Iterator<Map.Entry<ThreadIdentifier, BulkOpHolder>> it2 = - tracker.recordedBulkOpVersionTags.entrySet().iterator(); it2.hasNext();) { - Map.Entry<ThreadIdentifier, BulkOpHolder> e = it2.next(); - BulkOpHolder evh = e.getValue(); - synchronized (evh) { - if (evh.endOfLifeTimer == 0) { - evh.endOfLifeTimer = now; // a new holder - start the timer - } - // Remove the PutAll tracker only if the put all is complete - // and it has expired. - if (evh.endOfLifeTimer <= timeout) { - evh.removed = true; - if (logger.isTraceEnabled()) { - logger.trace("{} sweeper: removing bulkOp {}", tracker.name, e.getKey()); - } - it2.remove(); - } - } - } - if (traceEnabled) { - logger.trace("{} sweeper: done", tracker.name); - } - } - } - } - - } -} http://git-wip-us.apache.org/repos/asf/geode/blob/9e7696a6/geode-core/src/main/java/org/apache/geode/internal/cache/FindVersionTagOperation.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/FindVersionTagOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/FindVersionTagOperation.java index 199aafc..6434667 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/FindVersionTagOperation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/FindVersionTagOperation.java @@ -132,7 +132,7 @@ public class FindVersionTagOperation { result = r.findVersionTagForClientBulkOp(eventId); } else { - result = r.findVersionTagForClientEvent(eventId); + result = r.findVersionTagForEvent(eventId); } if (result != null) { result.replaceNullIDs(r.getVersionMember()); http://git-wip-us.apache.org/repos/asf/geode/blob/9e7696a6/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java ---------------------------------------------------------------------- diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java index 2dda38c..de5fd88 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java @@ -77,6 +77,9 @@ import javax.transaction.TransactionManager; import com.sun.jna.Native; import com.sun.jna.Platform; import org.apache.commons.lang.StringUtils; + +import org.apache.geode.internal.cache.event.EventTracker; +import org.apache.geode.internal.cache.event.EventTrackerExpiryTask; import org.apache.geode.internal.security.SecurityServiceFactory; import org.apache.logging.log4j.Logger; @@ -474,7 +477,7 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has /** * a system timer task for cleaning up old bridge thread event entries */ - private final EventTracker.ExpiryTask recordedEventSweeper; + private final EventTrackerExpiryTask recordedEventSweeper; private final TombstoneService tombstoneService; @@ -898,7 +901,7 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has getOffHeapEvictor()); } - this.recordedEventSweeper = EventTracker.startTrackerServices(this); + this.recordedEventSweeper = createEventTrackerExpiryTask(); this.tombstoneService = TombstoneService.initialize(this); TypeRegistry.init(); @@ -941,6 +944,18 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has } // synchronized } + /** + * Initialize the EventTracker's timer task. 
This is stored for tracking and shutdown purposes + */ + private EventTrackerExpiryTask createEventTrackerExpiryTask() { + long lifetimeInMillis = + Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "messageTrackingTimeout", + PoolFactory.DEFAULT_SUBSCRIPTION_MESSAGE_TRACKING_TIMEOUT / 3); + EventTrackerExpiryTask task = new EventTrackerExpiryTask(lifetimeInMillis); + getCCPTimer().scheduleAtFixedRate(task, lifetimeInMillis, lifetimeInMillis); + return task; + } + @Override public SecurityService getSecurityService() { return this.securityService; @@ -2347,8 +2362,7 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has this.cachePerfStats.close(); TXLockService.destroyServices(); - - EventTracker.stopTrackerServices(this); + getEventTrackerTask().cancel(); synchronized (this.ccpTimerMutex) { if (this.ccpTimer != null) { @@ -2744,7 +2758,7 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has * @return the sweeper task */ @Override - public EventTracker.ExpiryTask getEventTrackerTask() { + public EventTrackerExpiryTask getEventTrackerTask() { return this.recordedEventSweeper; } http://git-wip-us.apache.org/repos/asf/geode/blob/9e7696a6/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java index 4cf8f41..baddff8 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java @@ -39,6 +39,8 @@ import org.apache.geode.cache.TimeoutException; import org.apache.geode.distributed.internal.DistributionAdvisor.Profile; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.Assert; +import 
org.apache.geode.internal.cache.event.EventTracker; +import org.apache.geode.internal.cache.event.NonDistributedEventTracker; import org.apache.geode.internal.cache.ha.HARegionQueue; import org.apache.geode.internal.cache.ha.ThreadIdentifier; import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID; @@ -112,8 +114,9 @@ public class HARegion extends DistributedRegion { boolean ifOld, Object expectedOldValue, boolean requireOldValue) {} @Override - public void createEventTracker() { + public EventTracker createEventTracker() { // event trackers aren't needed for HARegions + return NonDistributedEventTracker.getInstance(); } /** http://git-wip-us.apache.org/repos/asf/geode/blob/9e7696a6/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java index 4c229db..aed439c 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java @@ -53,6 +53,7 @@ import org.apache.geode.distributed.internal.membership.InternalDistributedMembe import org.apache.geode.internal.SystemTimer; import org.apache.geode.internal.cache.control.InternalResourceManager; import org.apache.geode.internal.cache.control.ResourceAdvisor; +import org.apache.geode.internal.cache.event.EventTrackerExpiryTask; import org.apache.geode.internal.cache.extension.Extensible; import org.apache.geode.internal.cache.persistence.BackupManager; import org.apache.geode.internal.cache.persistence.PersistentMemberManager; @@ -237,7 +238,7 @@ public interface InternalCache extends Cache, Extensible<Cache>, CacheTime { TXEntryStateFactory getTXEntryStateFactory(); - EventTracker.ExpiryTask getEventTrackerTask(); + EventTrackerExpiryTask 
getEventTrackerTask(); void removeDiskStore(DiskStoreImpl diskStore); http://git-wip-us.apache.org/repos/asf/geode/blob/9e7696a6/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java index 3b3047f..6bee770 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java @@ -131,6 +131,8 @@ import org.apache.geode.internal.cache.control.InternalResourceManager.ResourceT import org.apache.geode.internal.cache.control.MemoryEvent; import org.apache.geode.internal.cache.control.MemoryThresholds; import org.apache.geode.internal.cache.control.ResourceListener; +import org.apache.geode.internal.cache.event.EventTracker; +import org.apache.geode.internal.cache.event.NonDistributedEventTracker; import org.apache.geode.internal.cache.execute.DistributedRegionFunctionExecutor; import org.apache.geode.internal.cache.execute.DistributedRegionFunctionResultSender; import org.apache.geode.internal.cache.execute.LocalResultCollector; @@ -326,9 +328,6 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, */ private final boolean supportsTX; - /** tracks threadID->seqno information for this region */ - EventTracker eventTracker; - /** * tracks region-level version information for members */ @@ -440,6 +439,8 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, private final ImageState imageState; + private final EventTracker eventTracker; + /** * Register interest count to track if any register interest is in progress for this region. 
This * count will be incremented when register interest starts and decremented when register interest @@ -646,13 +647,16 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, getDataPolicy().withReplication() || getDataPolicy().isPreloaded(), getAttributes().getDataPolicy().withPersistence(), this.stopper); - createEventTracker(); - // prevent internal regions from participating in a TX, bug 38709 this.supportsTX = !isSecret() && !isUsedForPartitionedRegionAdmin() && !isUsedForMetaRegion() || isMetaRegionWithTransactions(); this.testCallable = internalRegionArgs.getTestCallable(); + eventTracker = createEventTracker(); + } + + protected EventTracker createEventTracker() { + return NonDistributedEventTracker.getInstance(); } private RegionMap createRegionMap(InternalRegionArguments internalRegionArgs) { @@ -676,27 +680,11 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, } /** - * initialize the event tracker. Not all region implementations want or need one of these. 
Regions - * that require one should reimplement this method and create one like so: - * - * <pre> - * {@code - * this.eventTracker = new EventTracker(this.cache); - * this.eventTracker.start(); - * } - * </pre> + * Other region classes may track events using different mechanisms than EventTrackers or may not + * track events at all */ - void createEventTracker() { - // if LocalRegion is changed to have an event tracker, then the initialize() - // method should be changed to set it to "initialized" state when the - // region finishes initialization - } - - /** - * Other region classes may track events using different mechanisms than EventTrackers - */ - EventTracker getEventTracker() { - return this.eventTracker; + public EventTracker getEventTracker() { + return eventTracker; } /** returns the regions version-vector */ @@ -2559,9 +2547,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, } } - if (this.eventTracker != null) { - this.eventTracker.stop(); - } + getEventTracker().stop(); if (logger.isTraceEnabled(LogMarker.RVV) && getVersionVector() != null) { logger.trace(LogMarker.RVV, "version vector for {} is {}", getName(), getVersionVector().fullToString()); @@ -2666,9 +2652,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, } this.cache.setRegionByPath(getFullPath(), null); - if (this.eventTracker != null) { - this.eventTracker.stop(); - } + getEventTracker().stop(); if (this.diskRegion != null) { this.diskRegion.prepareForClose(this); @@ -5923,11 +5907,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, * is installed in the receiver of the image. */ public Map<? extends DataSerializable, ? 
extends DataSerializable> getEventState() { - if (this.eventTracker != null) { - return this.eventTracker.getState(); - } else { - return null; - } + return getEventTracker().getState(); } /** @@ -5939,9 +5919,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, * @param state a Map obtained from getEventState() */ protected void recordEventState(InternalDistributedMember provider, Map state) { - if (this.eventTracker != null) { - this.eventTracker.recordState(provider, state); - } + getEventTracker().recordState(provider, state); } /** @@ -5965,9 +5943,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, * record the event's sequenceId in Region's event state to prevent replay. */ public void recordEvent(InternalCacheEvent event) { - if (this.eventTracker != null) { - this.eventTracker.recordEvent(event); - } + getEventTracker().recordEvent(event); } /** @@ -5976,64 +5952,16 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, * @return true if the Region's event state has seen the event */ public boolean hasSeenEvent(EntryEventImpl event) { - boolean isDuplicate = false; - - if (this.eventTracker != null) { - // bug 41289 - wait for event tracker to be initialized before checkin - // so that an operation intended for a previous version of a bucket - // is not prematurely applied to a new version of the bucket - if (this.isUsedForPartitionedRegionBucket()) { - try { - this.eventTracker.waitOnInitialization(); - } catch (InterruptedException ie) { - this.stopper.checkCancelInProgress(ie); - Thread.currentThread().interrupt(); - } - } - - isDuplicate = this.eventTracker.hasSeenEvent(event); - if (isDuplicate) { - event.setPossibleDuplicate(true); - if (getConcurrencyChecksEnabled() && event.getVersionTag() == null) { - if (event.isBulkOpInProgress()) { - event.setVersionTag(findVersionTagForClientBulkOp(event.getEventId())); - } else { - 
event.setVersionTag(findVersionTagForClientEvent(event.getEventId())); - } - } - } else { - // bug #48205 - a retried PR operation may already have a version assigned to it - // in another VM - if (event.isPossibleDuplicate() && event.getRegion().concurrencyChecksEnabled - && event.getVersionTag() == null && event.getEventId() != null) { - boolean isBulkOp = event.getOperation().isPutAll() || event.getOperation().isRemoveAll(); - VersionTag tag = FindVersionTagOperation.findVersionTag(event.getRegion(), - event.getEventId(), isBulkOp); - event.setVersionTag(tag); - } - } - } - - return isDuplicate; + return getEventTracker().hasSeenEvent(event); } /** - * tries to find the version tag for a replayed client event + * tries to find the version tag for a event * * @return the version tag, if known. Null if not */ - public VersionTag findVersionTagForClientEvent(EventID eventId) { - if (this.eventTracker != null) { - return this.eventTracker.findVersionTag(eventId); - } - return null; - } - - public VersionTag findVersionTagForGatewayEvent(EventID eventId) { - if (this.eventTracker != null) { - return this.eventTracker.findVersionTagForGateway(eventId); - } - return null; + public VersionTag findVersionTagForEvent(EventID eventId) { + return getEventTracker().findVersionTagForSequence(eventId); } /** @@ -6042,13 +5970,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, * @return the version tag, if known. 
Null if not */ public VersionTag findVersionTagForClientBulkOp(EventID eventId) { - if (eventId == null) { - return null; - } - if (this.eventTracker != null) { - return this.eventTracker.findVersionTagForBulkOp(eventId); - } - return null; + return getEventTracker().findVersionTagForBulkOp(eventId); } /** @@ -6061,25 +5983,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, * @return true if the Region's event state has seen the event */ public boolean hasSeenEvent(EventID eventID) { - if (eventID == null) { - return false; - } - boolean isDuplicate = false; - if (this.eventTracker != null) { - // bug 41289 - wait for event tracker to be initialized before checkin - // so that an operation intended for a previous version of a bucket - // is not prematurely applied to a new version of the bucket - if (this.isUsedForPartitionedRegionBucket()) { - try { - this.eventTracker.waitOnInitialization(); - } catch (InterruptedException ie) { - this.stopper.checkCancelInProgress(ie); - Thread.currentThread().interrupt(); - } - } - isDuplicate = this.eventTracker.hasSeenEvent(eventID, null); - } - return isDuplicate; + return getEventTracker().hasSeenEvent(eventID); } /** @@ -6092,16 +5996,12 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, * @since GemFire 5.7 */ void syncBulkOp(Runnable task, EventID eventId) { - if (this.eventTracker != null && !isTX()) { - this.eventTracker.syncBulkOp(task, eventId); - } else { - task.run(); - } + getEventTracker().syncBulkOp(task, eventId, isTX()); } public void recordBulkOpStart(ThreadIdentifier membershipID, EventID eventID) { - if (this.eventTracker != null && !isTX()) { - this.eventTracker.recordBulkOpStart(membershipID, eventID); + if (!isTX()) { + getEventTracker().recordBulkOpStart(eventID, membershipID); } } @@ -7058,14 +6958,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, * @return true if event state has been transfered to 
this region from another cache */ boolean isEventTrackerInitialized() { - return this.eventTracker != null && this.eventTracker.isInitialized(); - } - - /** - * @return true if this region has an event tracker - */ - boolean hasEventTracker() { - return this.eventTracker != null; + return getEventTracker().isInitialized(); } public void acquireDestroyLock() { @@ -7115,9 +7008,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, this.destroyedSubregionSerialNumbers = collectSubregionSerialNumbers(); try { - if (this.eventTracker != null) { - this.eventTracker.stop(); - } + getEventTracker().stop(); if (this.diskRegion != null) { // This was needed to fix bug 30937 http://git-wip-us.apache.org/repos/asf/geode/blob/9e7696a6/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java index c3aec13..27b442d 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java @@ -5448,16 +5448,11 @@ public class PartitionedRegion extends LocalRegion } @Override - void createEventTracker() { - // PR buckets maintain their own trackers. 
None is needed at this level - } - - @Override - public VersionTag findVersionTagForClientEvent(EventID eventId) { + public VersionTag findVersionTagForEvent(EventID eventId) { if (this.dataStore != null) { Set<Map.Entry<Integer, BucketRegion>> bucketMap = this.dataStore.getAllLocalBuckets(); for (Map.Entry<Integer, BucketRegion> entry : bucketMap) { - VersionTag result = entry.getValue().findVersionTagForClientEvent(eventId); + VersionTag result = entry.getValue().findVersionTagForEvent(eventId); if (result != null) { return result; } http://git-wip-us.apache.org/repos/asf/geode/blob/9e7696a6/geode-core/src/main/java/org/apache/geode/internal/cache/event/BulkOperationHolder.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/event/BulkOperationHolder.java b/geode-core/src/main/java/org/apache/geode/internal/cache/event/BulkOperationHolder.java new file mode 100644 index 0000000..53c4bb5 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/event/BulkOperationHolder.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.geode.internal.cache.event; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.geode.internal.cache.EventID; +import org.apache.geode.internal.cache.versions.VersionTag; + +/** + * A holder for the version tags generated for a bulk operation (putAll or removeAll). These version + * tags are retrieved when a bulk op is retried. + * + * @since GemFire 7.0 protected for test purposes only. + */ +public class BulkOperationHolder { + /** + * Whether this object was removed by the cleanup thread. + */ + private boolean removed; + + /** + * public for tests only + */ + private Map<EventID, VersionTag> entryVersionTags = new HashMap<>(); + + /** millisecond timestamp */ + private transient long endOfLifeTimestamp; + + /** + * creates a new instance to save status of a putAllOperation + */ + BulkOperationHolder() { + // do nothing + } + + void putVersionTag(EventID eventId, VersionTag versionTag) { + entryVersionTags.put(eventId, versionTag); + this.endOfLifeTimestamp = 0; + } + + public Map<EventID, VersionTag> getEntryVersionTags() { + return entryVersionTags; + } + + @Override + public String toString() { + return "BulkOperationHolder tags=" + this.entryVersionTags; + } + + public synchronized boolean expire(long now, long expirationTime) { + if (endOfLifeTimestamp == 0) { + endOfLifeTimestamp = now; // a new holder - start the timer + } + boolean expired = false; + if (endOfLifeTimestamp <= expirationTime) { + removed = true; + expired = true; + } + return expired; + } + + public boolean isRemoved() { + return removed; + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/9e7696a6/geode-core/src/main/java/org/apache/geode/internal/cache/event/DistributedEventTracker.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/event/DistributedEventTracker.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/event/DistributedEventTracker.java new file mode 100644 index 0000000..ee0e8ff --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/event/DistributedEventTracker.java @@ -0,0 +1,523 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
package org.apache.geode.internal.cache.event;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.logging.log4j.Logger;

import org.apache.geode.CancelCriterion;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.InternalCacheEvent;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.ha.ThreadIdentifier;
import org.apache.geode.internal.cache.versions.RegionVersionVector;
import org.apache.geode.internal.cache.versions.VersionTag;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.log4j.LogMarker;
import org.apache.geode.internal.util.concurrent.StoppableCountDownLatch;

/**
 * {@link EventTracker} implementation used by distributed regions to detect and suppress replayed
 * events, and to recover version tags for retried operations (including bulk ops).
 */
public class DistributedEventTracker implements EventTracker {
  private static final Logger logger = LogService.getLogger();

  /**
   * a mapping of originator to the last event applied to this cache
   *
   * Keys are instances of {@link ThreadIdentifier}, values are instances of
   * {@link EventSequenceNumberHolder}.
   */
  private final ConcurrentMap<ThreadIdentifier, EventSequenceNumberHolder> recordedEvents =
      new ConcurrentHashMap<>(100);

  /**
   * a mapping of originator to bulkOp in-progress sync objects
   *
   * Keys are instances of {@link ThreadIdentifier}
   */
  private final ConcurrentMap<ThreadIdentifier, Object> recordedBulkOps =
      new ConcurrentHashMap<>(100);

  /**
   * a mapping of originator to bulkOperation's last version tags. This map differs from
   * {@link #recordedBulkOps} in that the thread identifier used here is the base member id and
   * thread id of the bulk op, as opposed to the fake thread id which is assigned for each bucket.
   *
   * recordedBulkOps are also only tracked on the secondary for partitioned regions
   * recordedBulkOpVersionTags are tracked on both the primary and secondary.
   *
   * Keys are instances of {@link ThreadIdentifier}, values are instances of
   * {@link BulkOperationHolder}.
   */
  private final ConcurrentMap<ThreadIdentifier, BulkOperationHolder> recordedBulkOpVersionTags =
      new ConcurrentHashMap<>(100);

  /**
   * The member that the region corresponding to this tracker (if any) received its initial image
   * from (if a replicate)
   */
  private volatile InternalDistributedMember initialImageProvider;

  /**
   * The cache associated with this tracker
   */
  private final InternalCache cache;

  /**
   * The name of this tracker
   */
  private final String name;

  /**
   * whether or not this tracker has been initialized to allow entry operation. replicate region
   * does not initiate event tracker from its replicates.
   */
  private volatile boolean initialized;

  /**
   * object used to wait for initialization
   */
  private final StoppableCountDownLatch initializationLatch;

  /**
   * Create an event tracker
   *
   * @param cache the cache of the region to associate with this tracker
   * @param stopper the CancelCriterion for the region
   * @param regionName name of the region
   */
  public DistributedEventTracker(InternalCache cache, CancelCriterion stopper, String regionName) {
    this.cache = cache;
    this.name = "Event Tracker for " + regionName;
    this.initializationLatch = new StoppableCountDownLatch(stopper, 1);
  }

  @Override
  public void start() {
    if (cache.getEventTrackerTask() != null) {
      cache.getEventTrackerTask().addTracker(this);
    }
  }

  @Override
  public void stop() {
    if (cache.getEventTrackerTask() != null) {
      cache.getEventTrackerTask().removeTracker(this);
    }
  }

  @Override
  public Map<ThreadIdentifier, EventSequenceNumberHolder> getState() {
    Map<ThreadIdentifier, EventSequenceNumberHolder> result = new HashMap<>(recordedEvents.size());
    for (Map.Entry<ThreadIdentifier, EventSequenceNumberHolder> entry : recordedEvents.entrySet()) {
      EventSequenceNumberHolder holder = entry.getValue();
      // don't transfer version tags - adds too much bulk just so we can do client tag recovery
      result.put(entry.getKey(),
          new EventSequenceNumberHolder(holder.getLastSequenceNumber(), null));
    }
    return result;
  }

  @Override
  public void recordState(InternalDistributedMember provider,
      Map<ThreadIdentifier, EventSequenceNumberHolder> state) {
    this.initialImageProvider = provider;
    // StringBuilder suffices here: the buffer is confined to this call (was StringBuffer)
    StringBuilder sb = null;
    if (logger.isDebugEnabled()) {
      sb = new StringBuilder(200);
      sb.append("Recording initial state for ").append(this.name).append(": ");
    }
    for (Map.Entry<ThreadIdentifier, EventSequenceNumberHolder> entry : state.entrySet()) {
      if (sb != null) {
        sb.append("\n  ").append(entry.getKey().expensiveToString()).append("; sequenceID=")
            .append(entry.getValue());
      }
      // record only if we haven't received an event that is newer
      recordSequenceNumber(entry.getKey(), entry.getValue(), true);
    }
    if (sb != null) {
      logger.debug(sb);
    }
    // fix for bug 41622 - hang in GII. This keeps ops from waiting for the
    // full GII to complete
    setInitialized();
  }

  @Override
  public void setInitialized() {
    initializationLatch.countDown();
    initialized = true;
  }

  @Override
  public void waitOnInitialization() throws InterruptedException {
    initializationLatch.await();
  }

  /**
   * Record an event sequence id if it is higher than what we currently have. This is intended for
   * use during initial image transfer.
   *
   * @param membershipID the key of an entry in the map obtained from getEventState()
   * @param evhObj the value of an entry in the map obtained from getEventState()
   */
  protected void recordSequenceNumber(ThreadIdentifier membershipID,
      EventSequenceNumberHolder evhObj) {
    recordSequenceNumber(membershipID, evhObj, false);
  }

  /**
   * Record an event sequence id if it is higher than what we currently have. This is intended for
   * use during initial image transfer.
   *
   * @param threadID the key of an entry in the map obtained from getEventState()
   * @param evh the value of an entry in the map obtained from getEventState()
   * @param ifAbsent only record this state if there's not already an entry for this memberID
   */
  private void recordSequenceNumber(ThreadIdentifier threadID, EventSequenceNumberHolder evh,
      boolean ifAbsent) {
    boolean removed;
    if (logger.isDebugEnabled()) {
      logger.debug("recording {} {}", threadID.expensiveToString(), evh.toString());
    }
    do {
      removed = false;
      EventSequenceNumberHolder oldEvh = recordedEvents.putIfAbsent(threadID, evh);
      if (oldEvh != null) {
        synchronized (oldEvh) {
          if (oldEvh.isRemoved()) {
            // need to wait for an entry being removed by the sweeper to go away
            removed = true;
            continue;
          } else {
            if (ifAbsent) {
              break;
            }
            oldEvh.setEndOfLifeTimestamp(0);
            if (oldEvh.getLastSequenceNumber() < evh.getLastSequenceNumber()) {
              oldEvh.setLastSequenceNumber(evh.getLastSequenceNumber());
              oldEvh.setVersionTag(evh.getVersionTag());
            }
          }
        }
      } else {
        evh.setEndOfLifeTimestamp(0);
      }
    } while (removed);
  }

  @Override
  public void recordEvent(InternalCacheEvent event) {
    EventID eventID = event.getEventId();
    if (ignoreEvent(event, eventID)) {
      return; // not tracked
    }

    LocalRegion lr = (LocalRegion) event.getRegion();
    ThreadIdentifier membershipID = createThreadIDFromEvent(eventID);

    VersionTag tag = null;
    if (lr.getServerProxy() == null) {
      tag = event.getVersionTag();
      // reuse the region already cast above (was a redundant second cast)
      RegionVersionVector v = lr.getVersionVector();
      canonicalizeIDs(tag, v);
    }

    EventSequenceNumberHolder newEvh = new EventSequenceNumberHolder(eventID.getSequenceID(), tag);
    if (logger.isTraceEnabled()) {
      logger.trace("region event tracker recording {}", event);
    }
    recordSequenceNumber(membershipID, newEvh);

    // If this is a bulkOp, and concurrency checks are enabled, we need to
    // save the version tag in case we retry.
    // Make recordBulkOp version tag after recordSequenceNumber, so that recordBulkOpStart
    // in a retry bulk op would not incorrectly remove the saved version tag in
    // recordedBulkOpVersionTags
    if (lr.getConcurrencyChecksEnabled()
        && (event.getOperation().isPutAll() || event.getOperation().isRemoveAll())
        && lr.getServerProxy() == null) {
      recordBulkOpEvent(event, membershipID);
    }
  }

  /** Builds the originator key (member id + thread id) for an event. */
  private ThreadIdentifier createThreadIDFromEvent(EventID eventID) {
    return new ThreadIdentifier(eventID.getMembershipID(), eventID.getThreadID());
  }

  /**
   * Record a version tag for a bulk operation.
   */
  private void recordBulkOpEvent(InternalCacheEvent event, ThreadIdentifier threadID) {
    EventID eventID = event.getEventId();

    VersionTag tag = event.getVersionTag();
    if (tag == null) {
      return;
    }

    if (logger.isDebugEnabled()) {
      logger.debug("recording bulkOp event {} {} {} op={}", threadID.expensiveToString(), eventID,
          tag, event.getOperation());
    }

    RegionVersionVector versionVector = ((LocalRegion) event.getRegion()).getVersionVector();
    canonicalizeIDs(tag, versionVector);

    // Loop until we can successfully update the recorded bulk operations
    // for this thread id.
    boolean retry = false;
    do {
      BulkOperationHolder bulkOpTracker = recordedBulkOpVersionTags.get(threadID);
      if (bulkOpTracker == null) {
        bulkOpTracker = new BulkOperationHolder();
        BulkOperationHolder old = recordedBulkOpVersionTags.putIfAbsent(threadID, bulkOpTracker);
        if (old != null) {
          retry = true;
          continue;
        }
      }
      synchronized (bulkOpTracker) {
        if (bulkOpTracker.isRemoved()) {
          retry = true;
          continue;
        }

        // Add the (canonicalized) version tag for the bulkOp event.
        bulkOpTracker.putVersionTag(eventID, tag);
        retry = false;
      }
    } while (retry);
  }

  /** Replaces member ids in the tag with their canonical instances from the version vector. */
  private void canonicalizeIDs(VersionTag tag, RegionVersionVector versionVector) {
    if (tag != null && versionVector != null) {
      tag.setMemberID(versionVector.getCanonicalId(tag.getMemberID()));
      if (tag.getPreviousMemberID() != null) {
        tag.setPreviousMemberID(versionVector.getCanonicalId(tag.getPreviousMemberID()));
      }
    }
  }

  @Override
  public boolean hasSeenEvent(InternalCacheEvent event) {
    EventID eventID = event.getEventId();
    if (ignoreEvent(event, eventID)) {
      return false; // not tracked
    }
    return hasSeenEvent(eventID, event);
  }

  @Override
  public boolean hasSeenEvent(EventID eventID) {
    return hasSeenEvent(eventID, null);
  }

  @Override
  public boolean hasSeenEvent(EventID eventID, InternalCacheEvent tagHolder) {
    if (eventID == null) {
      return false;
    }

    EventSequenceNumberHolder evh = getSequenceHolderForEvent(eventID);
    if (evh == null) {
      return false;
    }

    synchronized (evh) {
      if (evh.isRemoved() || evh.getLastSequenceNumber() < eventID.getSequenceID()) {
        return false;
      }
      // log at fine because partitioned regions can send event multiple times
      // during normal operation during bucket region initialization
      if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_BRIDGE_SERVER)) {
        logger.trace(LogMarker.DISTRIBUTION_BRIDGE_SERVER,
            "Cache encountered replay of event with ID {}.  Highest recorded for this source is {}",
            eventID, evh.getLastSequenceNumber());
      }
      // bug #44956 - recover version tag for duplicate event
      if (evh.getLastSequenceNumber() == eventID.getSequenceID() && tagHolder != null
          && evh.getVersionTag() != null) {
        ((EntryEventImpl) tagHolder).setVersionTag(evh.getVersionTag());
      }
      return true;
    }
  }

  /** Looks up the sequence holder recorded for the event's originating thread, if any. */
  private EventSequenceNumberHolder getSequenceHolderForEvent(EventID eventID) {
    ThreadIdentifier membershipID = createThreadIDFromEvent(eventID);
    return recordedEvents.get(membershipID);
  }

  @Override
  public VersionTag findVersionTagForSequence(EventID eventID) {
    EventSequenceNumberHolder evh = getSequenceHolderForEvent(eventID);
    if (evh == null) {
      if (logger.isDebugEnabled()) {
        logger.debug("search for version tag failed as no event is recorded for {}",
            createThreadIDFromEvent(eventID).expensiveToString());
      }
      return null;
    }

    synchronized (evh) {
      if (logger.isDebugEnabled()) {
        logger.debug("search for version tag located last event for {}: {}",
            createThreadIDFromEvent(eventID).expensiveToString(), evh);
      }
      if (evh.getLastSequenceNumber() != eventID.getSequenceID()) {
        return null;
      }
      // log at fine because partitioned regions can send event multiple times
      // during normal operation during bucket region initialization
      if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_BRIDGE_SERVER)
          && evh.getVersionTag() == null) {
        logger.trace(LogMarker.DISTRIBUTION_BRIDGE_SERVER,
            "Could not recover version tag.  Found event holder with no version tag for {}",
            eventID);
      }
      return evh.getVersionTag();
    }
  }

  @Override
  public VersionTag findVersionTagForBulkOp(EventID eventID) {
    if (eventID == null) {
      return null;
    }
    ThreadIdentifier threadID = createThreadIDFromEvent(eventID);
    BulkOperationHolder evh = recordedBulkOpVersionTags.get(threadID);
    if (evh == null) {
      if (logger.isDebugEnabled()) {
        logger.debug("search for version tag failed as no events are recorded for {}",
            threadID.expensiveToString());
      }
      return null;
    }

    synchronized (evh) {
      if (logger.isDebugEnabled()) {
        logger.debug("search for version tag located event holder for {}: {}",
            threadID.expensiveToString(), evh);
      }
      return evh.getEntryVersionTags().get(eventID);
    }
  }

  @Override
  public String getName() {
    return name;
  }

  /**
   * @return true if the event should not be tracked, false otherwise
   */
  private boolean ignoreEvent(InternalCacheEvent event, EventID eventID) {
    if (eventID == null) {
      return true;
    } else {
      boolean isVersioned = (event.getVersionTag() != null);
      boolean isClient = event.hasClientOrigin();
      if (isVersioned && isClient) {
        return false; // version tags for client events are kept for retries by the client
      }
      boolean isEntry = event.getOperation().isEntry();
      boolean isPr = event.getRegion().getAttributes().getDataPolicy().withPartitioning()
          || ((LocalRegion) event.getRegion()).isUsedForPartitionedRegionBucket();
      return (!isClient && // ignore if it originated on a server, and
          isEntry && // it affects an entry and
          !isPr); // is not on a PR
    }
  }

  @Override
  public void syncBulkOp(Runnable r, EventID eventID, boolean partOfTransaction) {
    if (partOfTransaction) {
      r.run();
      return;
    }
    Assert.assertTrue(eventID != null);
    ThreadIdentifier membershipID = createThreadIDFromEvent(eventID);
    // atomically obtain (or install) the per-thread sync object; computeIfAbsent never
    // returns null, which the old putIfAbsent/get retry loop existed to guarantee
    Object opSyncObj = recordedBulkOps.computeIfAbsent(membershipID, id -> new Object());

    synchronized (opSyncObj) {
      try {
        recordBulkOpStart(eventID, membershipID);
        // Perform the bulk op
        r.run();
      } finally {
        recordedBulkOps.remove(membershipID);
      }
    }
  }

  @Override
  public void recordBulkOpStart(EventID eventID, ThreadIdentifier tid) {
    if (logger.isDebugEnabled()) {
      logger.debug("recording bulkOp start for {}", tid.expensiveToString());
    }
    EventSequenceNumberHolder evh = recordedEvents.get(tid);
    if (evh == null) {
      return;
    }
    synchronized (evh) {
      // only remove it when a new bulk op occurs
      if (eventID.getSequenceID() > evh.getLastSequenceNumber()) {
        this.recordedBulkOpVersionTags.remove(tid);
      }
    }
  }

  @Override
  public boolean isInitialized() {
    return this.initialized;
  }

  @Override
  public boolean isInitialImageProvider(DistributedMember mbr) {
    return (this.initialImageProvider != null) && (mbr != null)
        && this.initialImageProvider.equals(mbr);
  }

  /** Exposed for the expiry task and tests. */
  public ConcurrentMap<ThreadIdentifier, BulkOperationHolder> getRecordedBulkOpVersionTags() {
    return recordedBulkOpVersionTags;
  }

  @Override
  public ConcurrentMap<ThreadIdentifier, EventSequenceNumberHolder> getRecordedEvents() {
    return recordedEvents;
  }

  @Override
  public String toString() {
    return this.name + "(initialized=" + this.initialized + ")";
  }

}