http://git-wip-us.apache.org/repos/asf/geode/blob/9e7696a6/geode-core/src/main/java/org/apache/geode/internal/cache/event/EventSequenceNumberHolder.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/event/EventSequenceNumberHolder.java b/geode-core/src/main/java/org/apache/geode/internal/cache/event/EventSequenceNumberHolder.java new file mode 100644 index 0000000..9bb450d --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/event/EventSequenceNumberHolder.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache.event; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.geode.DataSerializable; +import org.apache.geode.DataSerializer; +import org.apache.geode.internal.cache.versions.VersionTag; + +/** + * A sequence number tracker to keep events from clients from being re-applied to the cache if + * they've already been seen. + * + * @since GemFire 5.5 + */ +public class EventSequenceNumberHolder implements DataSerializable { + private static final long serialVersionUID = 8137262960763308046L; + + /** + * event sequence number. These + */ + private long lastSequenceNumber = -1; + + /** + * millisecond timestamp + */ + private transient long endOfLifeTimestamp; + + /** + * whether this entry is being removed + */ + private transient boolean removed; + + /** + * version tag, if any, for the operation + */ + private VersionTag versionTag; + + // for debugging + // transient Exception context; + + EventSequenceNumberHolder(long id, VersionTag versionTag) { + this.lastSequenceNumber = id; + this.versionTag = versionTag; + } + + public EventSequenceNumberHolder() {} + + public long getLastSequenceNumber() { + return lastSequenceNumber; + } + + public VersionTag getVersionTag() { + return versionTag; + } + + public boolean isRemoved() { + return removed; + } + + void setRemoved(boolean removed) { + this.removed = removed; + } + + void setEndOfLifeTimestamp(long endOfLifeTimestamp) { + this.endOfLifeTimestamp = endOfLifeTimestamp; + } + + void setVersionTag(VersionTag versionTag) { + this.versionTag = versionTag; + } + + @Override + public String toString() { + StringBuilder result = new StringBuilder(); + result.append("seqNo").append(this.lastSequenceNumber); + if (this.versionTag != null) { + result.append(",").append(this.versionTag); + } + return result.toString(); + } + + public void fromData(DataInput in) throws IOException, ClassNotFoundException { + lastSequenceNumber = in.readLong(); + versionTag = (VersionTag) DataSerializer.readObject(in); + } + + public void toData(DataOutput out) throws IOException { + out.writeLong(lastSequenceNumber); + DataSerializer.writeObject(versionTag, out); + } + + public synchronized boolean expire(long now, long expirationTime) { + if (endOfLifeTimestamp == 0) { + endOfLifeTimestamp = now; // a new holder - start the timer + } + boolean expire = false; + if (endOfLifeTimestamp <= expirationTime) { + removed = true; + lastSequenceNumber = -1; + expire = true; + } + return expire; + } + + public void setLastSequenceNumber(long lastSequenceNumber) { + this.lastSequenceNumber = lastSequenceNumber; + } +}
http://git-wip-us.apache.org/repos/asf/geode/blob/9e7696a6/geode-core/src/main/java/org/apache/geode/internal/cache/event/EventTracker.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/event/EventTracker.java b/geode-core/src/main/java/org/apache/geode/internal/cache/event/EventTracker.java new file mode 100644 index 0000000..43a0458 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/event/EventTracker.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache.event; + +import java.util.Map; +import java.util.concurrent.ConcurrentMap; + +import org.apache.geode.distributed.DistributedMember; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.cache.EventID; +import org.apache.geode.internal.cache.InternalCacheEvent; +import org.apache.geode.internal.cache.ha.ThreadIdentifier; +import org.apache.geode.internal.cache.versions.VersionTag; + +/** + * EventTracker tracks the last sequence number for a particular memberID:threadID. It is used to + * avoid replaying events in client/server and partitioned-region configurations. + * + * @since GemFire 6.0 + */ +public interface EventTracker { + /** start this event tracker */ + void start(); + + /** stop this event tracker */ + void stop(); + + /** + * retrieve a deep copy of the state of the event tracker. Synchronization is not used while + * copying the tracker's state. + */ + Map<ThreadIdentifier, EventSequenceNumberHolder> getState(); + + /** + * record the given state in the tracker. + * + * @param provider the member that provided this state + * @param state a Map obtained from getState(); + */ + void recordState(InternalDistributedMember provider, + Map<ThreadIdentifier, EventSequenceNumberHolder> state); + + /** + * Use this method to ensure that the tracker is put in an initialized state + */ + void setInitialized(); + + /** + * Wait for the tracker to finish being initialized + */ + void waitOnInitialization() throws InterruptedException; + + /** record the event's threadid/sequenceid to prevent replay */ + void recordEvent(InternalCacheEvent event); + + /** + * Determines if an event has already been seen by the tracker + * + * @param event The event to determine if it has been seen by the tracker already + * @return if the event provided has already been seen + */ + boolean hasSeenEvent(InternalCacheEvent event); + + /** + * Determines if an event has already been seen by the tracker + * + * @param eventID The id of the event to determine if it has been seen by the tracker already + * @return if the event provided has already been seen + */ + boolean hasSeenEvent(EventID eventID); + + /** + * Determines if an event has already been seen by the tracker + * + * @param eventID The id of the event to determine if it has been seen by the tracker already + * @param tagHolder Event to update version tag with that of eventID, if event was seen before + * @return if the event provided has already been seen + */ + boolean hasSeenEvent(EventID eventID, InternalCacheEvent tagHolder); + + VersionTag findVersionTagForSequence(EventID eventID); + + VersionTag findVersionTagForBulkOp(EventID eventID); + + /** + * The name of the event tracker for logging purposes + * + * @return the name of the tracker + */ + String getName(); + + ConcurrentMap<ThreadIdentifier, BulkOperationHolder> getRecordedBulkOpVersionTags(); + + ConcurrentMap<ThreadIdentifier, EventSequenceNumberHolder> getRecordedEvents(); + + /** + * A routine to provide synchronization running based on <memberShipID, threadID> of the + * requesting client + * + * @param r - a Runnable to wrap the processing of the bulk op + * @param eventID - the base event ID of the bulk op + * + * @since GemFire 5.7 + */ + void syncBulkOp(Runnable r, EventID eventID, boolean partOfTransaction); + + /** + * Called when a new bulkOp is started on the local region. Used to clear event tracker state from + * the last bulkOp. + */ + void recordBulkOpStart(EventID eventID, ThreadIdentifier tid); + + /** + * @return the initialization state of the tracker + */ + boolean isInitialized(); + + /** + * @param mbr the member in question + * @return true if the given member provided the initial image event state for this tracker + */ + boolean isInitialImageProvider(DistributedMember mbr); + +} http://git-wip-us.apache.org/repos/asf/geode/blob/9e7696a6/geode-core/src/main/java/org/apache/geode/internal/cache/event/EventTrackerExpiryTask.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/event/EventTrackerExpiryTask.java b/geode-core/src/main/java/org/apache/geode/internal/cache/event/EventTrackerExpiryTask.java new file mode 100644 index 0000000..dff80df --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/event/EventTrackerExpiryTask.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache.event; + +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.geode.internal.SystemTimer; +import org.apache.geode.internal.cache.ha.ThreadIdentifier; + +public class EventTrackerExpiryTask extends SystemTimer.SystemTimerTask { + + private final long lifetimeInMillis; + private final List<EventTracker> trackers = new LinkedList<>(); + private final boolean traceEnabled = logger.isTraceEnabled(); + + public EventTrackerExpiryTask(long lifetimeInMillis) { + this.lifetimeInMillis = lifetimeInMillis; + } + + void addTracker(EventTracker tracker) { + synchronized (trackers) { + trackers.add(tracker); + } + } + + void removeTracker(EventTracker tracker) { + synchronized (trackers) { + trackers.remove(tracker); + } + } + + int getNumberOfTrackers() { + return trackers.size(); + } + + @Override + public void run2() { + long now = System.currentTimeMillis(); + long expirationTime = now - lifetimeInMillis; + synchronized (trackers) { + for (EventTracker tracker : trackers) { + if (traceEnabled) { + logger.trace("{} sweeper: starting", tracker.getName()); + } + removeExpiredSequenceTracker(tracker, now, expirationTime); + removeExpiredBulkOperations(tracker, now, expirationTime); + + if (traceEnabled) { + logger.trace("{} sweeper: done", tracker.getName()); + } + } + } + } + + private void removeExpiredSequenceTracker(EventTracker tracker, long now, long expirationTime) { + for (Iterator<Map.Entry<ThreadIdentifier, EventSequenceNumberHolder>> entryIterator = + tracker.getRecordedEvents().entrySet().iterator(); entryIterator.hasNext();) { + Map.Entry<ThreadIdentifier, EventSequenceNumberHolder> entry = entryIterator.next(); + EventSequenceNumberHolder evh = entry.getValue(); + if (evh.expire(now, expirationTime)) { + if (traceEnabled) { + logger.trace("{} sweeper: removing {}", tracker.getName(), entry.getKey()); + } + entryIterator.remove(); + } + } + } + + private void removeExpiredBulkOperations(EventTracker tracker, long now, long expirationTime) { + for (Iterator<Map.Entry<ThreadIdentifier, BulkOperationHolder>> entryIterator = + tracker.getRecordedBulkOpVersionTags().entrySet().iterator(); entryIterator.hasNext();) { + Map.Entry<ThreadIdentifier, BulkOperationHolder> entry = entryIterator.next(); + BulkOperationHolder evh = entry.getValue(); + if (evh.expire(now, expirationTime)) { + if (traceEnabled) { + logger.trace("{} sweeper: removing bulkOp {}", tracker.getName(), entry.getKey()); + } + entryIterator.remove(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/9e7696a6/geode-core/src/main/java/org/apache/geode/internal/cache/event/NonDistributedEventTracker.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/event/NonDistributedEventTracker.java b/geode-core/src/main/java/org/apache/geode/internal/cache/event/NonDistributedEventTracker.java new file mode 100644 index 0000000..f5b1831 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/event/NonDistributedEventTracker.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache.event; + +import java.util.Map; +import java.util.concurrent.ConcurrentMap; + +import org.apache.geode.distributed.DistributedMember; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.cache.EventID; +import org.apache.geode.internal.cache.InternalCacheEvent; +import org.apache.geode.internal.cache.ha.ThreadIdentifier; +import org.apache.geode.internal.cache.versions.VersionTag; + +public class NonDistributedEventTracker implements EventTracker { + private static final NonDistributedEventTracker INSTANCE = new NonDistributedEventTracker(); + + static final String NAME = "The NonDistributedEventTracker"; + + public static NonDistributedEventTracker getInstance() { + return INSTANCE; + } + + private NonDistributedEventTracker() { + // private no arg constructor to enforce singleton pattern + } + + @Override + public void start() { + + } + + @Override + public void stop() { + + } + + @Override + public Map<ThreadIdentifier, EventSequenceNumberHolder> getState() { + return null; + } + + @Override + public void recordState(InternalDistributedMember provider, Map state) { + + } + + @Override + public void recordEvent(InternalCacheEvent event) { + + } + + @Override + public boolean hasSeenEvent(InternalCacheEvent event) { + return false; + } + + @Override + public void waitOnInitialization() throws InterruptedException { + + } + + @Override + public VersionTag findVersionTagForSequence(EventID eventId) { + return null; + } + + @Override + public VersionTag findVersionTagForBulkOp(EventID eventId) { + return null; + } + + @Override + public String getName() { + return NAME; + } + + @Override + public boolean hasSeenEvent(EventID eventID) { + return false; + } + + @Override + public boolean hasSeenEvent(EventID eventID, InternalCacheEvent tagHolder) { + return false; + } + + @Override + public void recordBulkOpStart(EventID eventID, ThreadIdentifier membershipID) { + + } + + @Override + public void syncBulkOp(Runnable task, EventID eventId, boolean partOfTransaction) { + task.run(); + } + + @Override + public boolean isInitialized() { + return true; + } + + @Override + public void setInitialized() { + + } + + @Override + public boolean isInitialImageProvider(DistributedMember mbr) { + return false; + } + + @Override + public ConcurrentMap<ThreadIdentifier, BulkOperationHolder> getRecordedBulkOpVersionTags() { + return null; + } + + @Override + public ConcurrentMap<ThreadIdentifier, EventSequenceNumberHolder> getRecordedEvents() { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/geode/blob/9e7696a6/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java index cc78cca..1f47897 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java @@ -211,9 +211,9 @@ public abstract class BaseCommand implements Command { LocalRegion r = clientEvent.getRegion(); VersionTag tag; if (clientEvent.getVersionTag() != null && clientEvent.getVersionTag().isGatewayTag()) { - tag = r.findVersionTagForGatewayEvent(clientEvent.getEventId()); + tag = r.findVersionTagForEvent(clientEvent.getEventId()); } else { - tag = r.findVersionTagForClientEvent(clientEvent.getEventId()); + tag = r.findVersionTagForEvent(clientEvent.getEventId()); } if (tag == null) { if (r instanceof DistributedRegion || r instanceof PartitionedRegion) { http://git-wip-us.apache.org/repos/asf/geode/blob/9e7696a6/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java index 6fa17a4..9bad91a 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java @@ -52,11 +52,13 @@ import org.apache.geode.distributed.internal.DistributionConfig; import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.internal.cache.CachedDeserializable; import org.apache.geode.internal.cache.Conflatable; +import org.apache.geode.internal.cache.event.EventTracker; import org.apache.geode.internal.cache.DistributedRegion; import org.apache.geode.internal.cache.EntryEventImpl; import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.InternalRegionArguments; import org.apache.geode.internal.cache.LocalRegion; +import org.apache.geode.internal.cache.event.NonDistributedEventTracker; import org.apache.geode.internal.cache.RegionQueue; import org.apache.geode.internal.cache.Token; import org.apache.geode.internal.cache.versions.RegionVersionVector; @@ -1188,7 +1190,9 @@ public class SerialGatewaySenderQueue implements RegionQueue { // @override event tracker not needed for this type of region @Override - public void createEventTracker() {} + public EventTracker createEventTracker() { + return NonDistributedEventTracker.getInstance(); + } @Override protected boolean shouldNotifyBridgeClients() { http://git-wip-us.apache.org/repos/asf/geode/blob/9e7696a6/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java index 61f91a0..db5f7ca 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java @@ -88,7 +88,7 @@ import org.apache.geode.internal.cache.DiskStoreFactoryImpl; import org.apache.geode.internal.cache.DiskStoreImpl; import org.apache.geode.internal.cache.DiskStoreMonitor; import org.apache.geode.internal.cache.DistributedRegion; -import org.apache.geode.internal.cache.EventTracker.ExpiryTask; +import org.apache.geode.internal.cache.event.EventTrackerExpiryTask; import org.apache.geode.internal.cache.ExpirationScheduler; import org.apache.geode.internal.cache.FilterProfile; import org.apache.geode.internal.cache.GemFireCacheImpl; @@ -1670,7 +1670,7 @@ public class CacheCreation implements InternalCache { } @Override - public ExpiryTask getEventTrackerTask() { + public EventTrackerExpiryTask getEventTrackerTask() { throw new UnsupportedOperationException(LocalizedStrings.SHOULDNT_INVOKE.toLocalizedString()); } http://git-wip-us.apache.org/repos/asf/geode/blob/9e7696a6/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java index d76ff1b..eaaa3aa 100755 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java @@ -20,6 +20,7 @@ import static org.mockito.Matchers.eq; import static org.mockito.Mockito.anyLong; import static org.mockito.Mockito.*; +import java.io.IOException; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -49,6 +50,9 @@ public class BucketRegionJUnitTest extends DistributedRegionJUnitTest { protected DistributedRegion createAndDefineRegion(boolean isConcurrencyChecksEnabled, RegionAttributes ra, InternalRegionArguments ira, GemFireCacheImpl cache) { BucketRegion br = new BucketRegion("testRegion", ra, null, cache, ira); + // it is necessary to set the event tracker to initialized, since initialize() in not being + // called on the instantiated region + br.getEventTracker().setInitialized(); // since br is a real bucket region object, we need to tell mockito to monitor it br = spy(br); http://git-wip-us.apache.org/repos/asf/geode/blob/9e7696a6/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java index ce21c67..78cdd84 100755 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java @@ -27,7 +27,8 @@ import org.junit.experimental.categories.Category; import org.apache.geode.cache.Operation; import org.apache.geode.cache.RegionAttributes; import org.apache.geode.distributed.DistributedMember; -import org.apache.geode.internal.cache.EventTracker.BulkOpHolder; +import org.apache.geode.internal.cache.event.BulkOperationHolder; +import org.apache.geode.internal.cache.event.EventTracker; import org.apache.geode.internal.cache.ha.ThreadIdentifier; import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID; import org.apache.geode.internal.cache.versions.VersionTag; @@ -112,7 +113,6 @@ public class DistributedRegionJUnitTest extends AbstractDistributedRegionJUnitTe DistributedRegion region = prepare(true, true); DistributedMember member = mock(DistributedMember.class); ClientProxyMembershipID memberId = mock(ClientProxyMembershipID.class); - doReturn(false).when(region).isUsedForPartitionedRegionBucket(); byte[] memId = {1, 2, 3}; long threadId = 1; @@ -124,8 +124,9 @@ public class DistributedRegionJUnitTest extends AbstractDistributedRegionJUnitTe recordPutAllEvents(region, memId, threadId, skipCallbacks, member, memberId, size); EventTracker eventTracker = region.getEventTracker(); - ConcurrentMap<ThreadIdentifier, BulkOpHolder> map = eventTracker.getRecordedBulkOpVersionTags(); - BulkOpHolder holder = map.get(tid); + ConcurrentMap<ThreadIdentifier, BulkOperationHolder> map = + eventTracker.getRecordedBulkOpVersionTags(); + BulkOperationHolder holder = map.get(tid); EntryEventImpl retryEvent = EntryEventImpl.create(region, Operation.PUTALL_CREATE, "key1", "value1", null, false, member, !skipCallbacks, retryEventID); @@ -133,7 +134,7 @@ public class DistributedRegionJUnitTest extends AbstractDistributedRegionJUnitTe retryEvent.setPutAllOperation(mock(DistributedPutAllOperation.class)); region.hasSeenEvent(retryEvent); - assertTrue(retryEvent.getVersionTag().equals(holder.entryVersionTags.get(retryEventID))); + assertTrue(retryEvent.getVersionTag().equals(holder.getEntryVersionTags().get(retryEventID))); } protected void recordPutAllEvents(DistributedRegion region, byte[] memId, long threadId, http://git-wip-us.apache.org/repos/asf/geode/blob/9e7696a6/geode-core/src/test/java/org/apache/geode/internal/cache/EventTrackerDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/EventTrackerDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/EventTrackerDUnitTest.java deleted file mode 100755 index 77c0998..0000000 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/EventTrackerDUnitTest.java +++ /dev/null @@ -1,486 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.geode.internal.cache; - -import static org.junit.Assert.*; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; - -import org.awaitility.Awaitility; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import org.apache.geode.cache.AttributesFactory; -import org.apache.geode.cache.Cache; -import org.apache.geode.cache.CacheException; -import org.apache.geode.cache.PartitionAttributesFactory; -import org.apache.geode.cache.Region; -import org.apache.geode.cache.RegionFactory; -import org.apache.geode.cache.RegionShortcut; -import org.apache.geode.cache.Scope; -import org.apache.geode.cache.server.CacheServer; -import org.apache.geode.cache30.CacheSerializableRunnable; -import org.apache.geode.cache30.ClientServerTestCase; -import org.apache.geode.distributed.internal.DistributionConfig; -import org.apache.geode.internal.cache.EventTracker.BulkOpHolder; -import org.apache.geode.internal.cache.ha.ThreadIdentifier; -import org.apache.geode.test.dunit.Assert; -import org.apache.geode.test.dunit.Host; -import org.apache.geode.test.dunit.NetworkUtils; -import org.apache.geode.test.dunit.SerializableRunnable; -import org.apache.geode.test.dunit.VM; -import org.apache.geode.test.dunit.Wait; -import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase; -import org.apache.geode.test.junit.categories.DistributedTest; - -/** - * Tests <code>EventTracker</code> management. - * - * @since GemFire 6.5 - */ -@Category(DistributedTest.class) -public class EventTrackerDUnitTest extends JUnit4CacheTestCase { - - /** The port on which the <code>CacheServer</code> was started in this VM */ - private static int cacheServerPort; - - /** The <code>Cache</code>'s <code>ExpiryTask</code>'s ping interval */ - private static final String MESSAGE_TRACKING_TIMEOUT = "5000"; - - @Override - public final void postTearDownCacheTestCase() throws Exception { - disconnectAllFromDS(); - } - - /** - * Tests <code>EventTracker</code> is created and destroyed when a <code>Region</code> is created - * and destroyed. - */ - @Test - public void testEventTrackerCreateDestroy() throws CacheException { - // Verify the Cache's ExpiryTask contains no EventTrackers - GemFireCacheImpl cache = (GemFireCacheImpl) getCache(); - EventTracker.ExpiryTask expiryTask = cache.getEventTrackerTask(); - assertNotNull(expiryTask); - - // We start with 3 event trackers: - // one for the PDX registry region - // one for ManagementConstants.MONITORING_REGION - // one for ManagementConstants.NOTIFICATION_REGION - final int EXPECTED_TRACKERS = 3; - assertEquals(EXPECTED_TRACKERS, expiryTask.getNumberOfTrackers()); - - // Create a distributed Region - AttributesFactory factory = new AttributesFactory(); - factory.setScope(Scope.DISTRIBUTED_ACK); - LocalRegion region = (LocalRegion) createRegion(getName(), factory.create()); - - // Verify an EventTracker is created and is empty - EventTracker eventTracker = region.getEventTracker(); - assertNotNull(eventTracker); - Map eventState = region.getEventState(); - assertNotNull(eventState); - assertEquals(0, eventState.size()); - - // Verify it and the root region's EventTracker are added to the Cache's ExpiryTask's trackers - assertEquals(EXPECTED_TRACKERS + 2, expiryTask.getNumberOfTrackers()); - - // Destroy the Region - region.destroyRegion(); - - // Verify the EventTracker is removed from the Cache's ExpiryTask's trackers - assertEquals(EXPECTED_TRACKERS + 1, expiryTask.getNumberOfTrackers()); - } - - /** - * Tests adding threads to an <code>EventTracker</code>. - */ - @Test - public void testEventTrackerAddThreadIdentifier() throws CacheException { - Host host = Host.getHost(0); - VM serverVM = host.getVM(0); - VM clientVM = host.getVM(1); - final String regionName = getName(); - - // Create Region in the server and verify tracker is created - serverVM.invoke(new CacheSerializableRunnable("Create server") { - public void run2() throws CacheException { - // Create a distributed Region - AttributesFactory factory = new AttributesFactory(); - factory.setScope(Scope.DISTRIBUTED_ACK); - LocalRegion region = (LocalRegion) createRegion(regionName, factory.create()); - - // Verify an EventTracker is created - EventTracker eventTracker = region.getEventTracker(); - assertNotNull(eventTracker); - try { - startCacheServer(); - } catch (Exception ex) { - Assert.fail("While starting CacheServer", ex); - } - } - }); - - // Verify tracker in server contains no entries - serverVM.invoke(new CacheSerializableRunnable("Do puts") { - public void run2() throws CacheException { - LocalRegion region = (LocalRegion) getRootRegion().getSubregion(regionName); - Map eventState = region.getEventState(); - assertEquals(0, eventState.size()); - } - }); - - // Create Create Region in the client - final int port = serverVM.invoke(() -> EventTrackerDUnitTest.getCacheServerPort()); - final String hostName = NetworkUtils.getServerHostName(host); - clientVM.invoke(new CacheSerializableRunnable("Create client") { - public void run2() throws CacheException { - getCache(); - AttributesFactory factory = new AttributesFactory(); - factory.setScope(Scope.LOCAL); - ClientServerTestCase.configureConnectionPool(factory, hostName, port, -1, false, -1, -1, - null); - createRegion(regionName, factory.create()); - } - }); - - // Do puts in the client - clientVM.invoke(new CacheSerializableRunnable("Do puts") { - public void run2() throws CacheException { - Region region = getRootRegion().getSubregion(regionName); - for (int i = 0; i < 10; i++) { - region.put(i, i); - } - } - }); - - // Verify tracker in server contains an entry for client thread - serverVM.invoke(new CacheSerializableRunnable("Do puts") { - public void run2() throws CacheException { - LocalRegion region = (LocalRegion) getRootRegion().getSubregion(regionName); - Map eventState = region.getEventState(); - assertEquals(1, eventState.size()); - } - }); - } - - /** - * Tests adding events to and removing events from an <code>EventTracker</code>. - */ - @Test - public void testEventTrackerAddRemoveThreadIdentifier() throws CacheException { - Host host = Host.getHost(0); - VM serverVM = host.getVM(0); - VM clientVM = host.getVM(1); - final String regionName = getName(); - - // Create Region in the server and verify tracker is created - serverVM.invoke(new CacheSerializableRunnable("Create server") { - public void run2() throws CacheException { - // Set the message tracking timeout - System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "messageTrackingTimeout", - MESSAGE_TRACKING_TIMEOUT); - - // Create a distributed Region - AttributesFactory factory = new AttributesFactory(); - factory.setScope(Scope.DISTRIBUTED_ACK); - LocalRegion region = (LocalRegion) createRegion(regionName, factory.create()); - - // Verify an EventTracker is created - EventTracker eventTracker = region.getEventTracker(); - assertNotNull(eventTracker); - try { - startCacheServer(); - } catch (Exception ex) { - Assert.fail("While starting CacheServer", ex); - } - } - }); - - // Verify tracker in server contains no entries - serverVM.invoke(new CacheSerializableRunnable("Do puts") { - public void run2() throws CacheException { - LocalRegion region = (LocalRegion) getRootRegion().getSubregion(regionName); - Map eventState = region.getEventState(); - assertEquals(0, eventState.size()); - } - }); - - // Create Create Region in the client - final int port = serverVM.invoke(() -> EventTrackerDUnitTest.getCacheServerPort()); - final String hostName = NetworkUtils.getServerHostName(host); - clientVM.invoke(new CacheSerializableRunnable("Create client") { - public void run2() throws CacheException { - getCache(); - AttributesFactory factory = new AttributesFactory(); - factory.setScope(Scope.LOCAL); - ClientServerTestCase.configureConnectionPool(factory, hostName, port, -1, false, -1, -1, - null); - createRegion(regionName, factory.create()); - } - }); - - // Do puts in the client - clientVM.invoke(new CacheSerializableRunnable("Do puts") { - public void run2() throws CacheException { - Region region = getRootRegion().getSubregion(regionName); - for (int i = 0; i < 10; i++) { - region.put(i, i); - } - } - }); - - // Verify tracker in server - serverVM.invoke(new CacheSerializableRunnable("Do puts") { - public void run2() throws CacheException { - // First verify it contains an entry - LocalRegion region = (LocalRegion) getRootRegion().getSubregion(regionName); - Map eventState = region.getEventState(); - assertEquals(1, eventState.size()); - - // Pause for the message tracking timeout - int waitTime = Integer.parseInt(MESSAGE_TRACKING_TIMEOUT) * 3; - Wait.pause(waitTime); - - // Verify the server no longer contains an entry - eventState = region.getEventState(); - assertEquals(0, eventState.size()); - } - }); - } - - /** - * Test to make sure we don't leak put all events in the event tracker after multiple putAlls - */ - @Test - public void testPutAllHoldersInEventTracker() { - Host host = Host.getHost(0); - VM vm0 = host.getVM(0); - VM vm1 = host.getVM(1); - VM vm2 = host.getVM(2); - - SerializableRunnable createRegion = new SerializableRunnable("createRegion") { - - public void run() { - Cache cache = getCache(); - RegionFactory<Object, Object> rf = - cache.createRegionFactory(RegionShortcut.PARTITION_REDUNDANT); - PartitionAttributesFactory paf = new PartitionAttributesFactory(); - paf.setRedundantCopies(1); - paf.setTotalNumBuckets(3); - rf.setPartitionAttributes(paf.create()); - rf.setConcurrencyChecksEnabled(true); - rf.create("partitioned"); - - rf = cache.createRegionFactory(RegionShortcut.REPLICATE); - rf.setConcurrencyChecksEnabled(true); - rf.create("replicate"); - try { - startCacheServer(); - } catch (Exception ex) { - Assert.fail("While starting CacheServer", ex); - } - } - }; - - vm0.invoke(createRegion); - vm1.invoke(createRegion); - - // Create Create Region in the client - final int port = vm0.invoke(() -> EventTrackerDUnitTest.getCacheServerPort()); - final String hostName = NetworkUtils.getServerHostName(host); - vm2.invoke(new CacheSerializableRunnable("Create client") { - public void run2() throws CacheException { - getCache(); - AttributesFactory factory = new AttributesFactory(); - factory.setScope(Scope.LOCAL); - ClientServerTestCase.configureConnectionPool(factory, hostName, port, -1, false, -1, -1, - null); - createRootRegion("partitioned", factory.create()); - createRootRegion("replicate", factory.create()); - } - }); - - doTwoPutAlls(vm2, "partitioned"); - doTwoPutAlls(vm2, "replicate"); - - // Make sure that the event tracker for each bucket only records the last - // event. - checkBucketEventTracker(vm0, 0, 3); - checkBucketEventTracker(vm1, 0, 3); - checkBucketEventTracker(vm0, 1, 3); - checkBucketEventTracker(vm1, 1, 3); - checkBucketEventTracker(vm0, 2, 3); - checkBucketEventTracker(vm1, 2, 3); - - checkReplicateEventTracker(vm0, 9); - checkReplicateEventTracker(vm1, 9); - } - - private void doTwoPutAlls(VM vm, final String regionName) { - SerializableRunnable createData = new SerializableRunnable("putAlls") { - - public void run() { - Cache cache = getCache(); - Region region = cache.getRegion(regionName); - - Map putAllMap = new HashMap(); - for (int i = 0; i < 9; i++) { - putAllMap.put(i, i); - } - region.putAll(putAllMap); - - putAllMap.clear(); - for (int i = 10; i < 19; i++) { - putAllMap.put(i, i); - } - region.putAll(putAllMap); - } - }; - - vm.invoke(createData); - } - - private SerializableRunnable checkReplicateEventTracker(VM vm, final int expectedEntryCount) { - SerializableRunnable checkEventTracker = new SerializableRunnable("checkEventTracker") { - - public void run() { - Cache cache = getCache(); - DistributedRegion region = (DistributedRegion) cache.getRegion("replicate"); - checkEventTracker(region, expectedEntryCount); - } - }; - vm.invoke(checkEventTracker); - return checkEventTracker; - } - - private SerializableRunnable checkBucketEventTracker(VM vm, final int bucketNumber, - final int expectedEntryCount) { - SerializableRunnable checkEventTracker = new SerializableRunnable("checkEventTracker") { - - public void run() { - Cache cache = getCache(); - PartitionedRegion region = (PartitionedRegion) cache.getRegion("partitioned"); - BucketRegion br = region.getBucketRegion(bucketNumber); - - checkEventTracker(br, expectedEntryCount); - } - }; - vm.invoke(checkEventTracker); - return checkEventTracker; - } - - private void checkEventTracker(LocalRegion region, int numberOfEvents) { - EventTracker tracker = region.getEventTracker(); - ConcurrentMap<ThreadIdentifier, BulkOpHolder> memberToTags = - tracker.getRecordedBulkOpVersionTags(); - assertEquals("memberToTags=" + memberToTags, 1, memberToTags.size()); - BulkOpHolder holder = memberToTags.values().iterator().next(); - // We expect the holder to retain only the last putAll that was performed. - assertEquals("entryToVersionTags=" + holder.entryVersionTags, numberOfEvents, - holder.entryVersionTags.size()); - } - - protected void startCacheServer() throws IOException { - CacheServer cacheServer = getCache().addCacheServer(); - cacheServer.setPort(0); - cacheServer.start(); - cacheServerPort = cacheServer.getPort(); - } - - protected static int getCacheServerPort() { - return cacheServerPort; - } - - /** - * Tests event track is initialized after gii - */ - @Test - public void testEventTrackerIsInitalized() throws CacheException { - Host host = Host.getHost(0); - VM vm0 = host.getVM(0); - VM vm1 = host.getVM(1); - VM vm2 = host.getVM(2); - - createPRInVMs(vm0, vm1, vm2); - - createPR(); - - doPutsInVMs(vm0, vm1, vm2); - - doPuts(); - - verifyEventTrackerContent(); - - // close the region - getCache().getRegion(getName()).close(); - - // create the region again. - createPR(); - - for (int i = 0; i < 12; i++) { - waitEntryIsLocal(i); - } - - // verify event track initialized after create region - verifyEventTrackerContent(); - - } - - private void waitEntryIsLocal(int i) { - Awaitility.await().pollInterval(10, TimeUnit.MILLISECONDS).pollDelay(10, TimeUnit.MILLISECONDS) - .atMost(30, TimeUnit.SECONDS) - .until(() -> getCache().getRegion(getName()).getEntry(i) != null); - } - - private void verifyEventTrackerContent() { - PartitionedRegion pr = (PartitionedRegion) getCache().getRegion(getName()); - BucketRegion br = pr.getDataStore().getLocalBucketById(0); - Map<?, ?> eventStates = br.getEventState(); - assertTrue(eventStates.size() == 4); - } - - public void createPRInVMs(VM... vms) { - for (VM vm : vms) { - vm.invoke(() -> createPR()); - } - } - - private void createPR() { - PartitionAttributesFactory paf = - new PartitionAttributesFactory().setRedundantCopies(3).setTotalNumBuckets(4); - RegionFactory fact = getCache().createRegionFactory(RegionShortcut.PARTITION) - .setPartitionAttributes(paf.create()); - fact.create(getName()); - } - - public void doPutsInVMs(VM... vms) { - for (VM vm : vms) { - vm.invoke(() -> doPuts()); - } - } - - private void doPuts() { - Region region = getCache().getRegion(getName()); - for (int i = 0; i < 12; i++) { - region.put(i, i); - } - } -} http://git-wip-us.apache.org/repos/asf/geode/blob/9e7696a6/geode-core/src/test/java/org/apache/geode/internal/cache/EventTrackerTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/EventTrackerTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/EventTrackerTest.java deleted file mode 100644 index d74b3d5..0000000 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/EventTrackerTest.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.geode.internal.cache; - -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; - -import java.util.concurrent.ConcurrentMap; - -import org.junit.Before; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.apache.geode.CancelCriterion; -import org.apache.geode.cache.DataPolicy; -import org.apache.geode.cache.Operation; -import org.apache.geode.cache.RegionAttributes; -import org.apache.geode.distributed.DistributedMember; -import org.apache.geode.internal.cache.EventTracker.BulkOpHolder; -import org.apache.geode.internal.cache.ha.ThreadIdentifier; -import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID; -import org.apache.geode.internal.cache.versions.VersionTag; -import org.apache.geode.test.junit.categories.UnitTest; - -@Category(UnitTest.class) -public class EventTrackerTest { - LocalRegion lr; - RegionAttributes<?, ?> ra; - EntryEventImpl[] events; - EventTracker eventTracker; - ClientProxyMembershipID memberId; - DistributedMember member; - - @Before - public void setUp() { - lr = mock(LocalRegion.class); - ra = mock(RegionAttributes.class); - when(lr.createStopper()).thenCallRealMethod(); - CancelCriterion stopper = lr.createStopper(); - when(lr.getStopper()).thenReturn(stopper); - memberId = mock(ClientProxyMembershipID.class); - when(lr.getAttributes()).thenReturn(ra); - when(ra.getDataPolicy()).thenReturn(mock(DataPolicy.class)); - when(lr.getConcurrencyChecksEnabled()).thenReturn(true); - - member = mock(DistributedMember.class); - } - - @Test - public void retriedBulkOpDoesNotRemoveRecordedBulkOpVersionTags() { - byte[] memId = {1, 2, 3}; - long threadId = 1; - long retrySeqId = 1; - ThreadIdentifier tid = new ThreadIdentifier(memId, threadId); - EventID retryEventID = new EventID(memId, threadId, retrySeqId); - boolean skipCallbacks = true; - int size = 5; - recordPutAllEvents(memId, threadId, skipCallbacks, size); - - ConcurrentMap<ThreadIdentifier, BulkOpHolder> map = eventTracker.getRecordedBulkOpVersionTags(); - BulkOpHolder holder = map.get(tid); - int beforeSize = holder.entryVersionTags.size(); - - eventTracker.recordBulkOpStart(tid, retryEventID); - map = eventTracker.getRecordedBulkOpVersionTags(); - holder = map.get(tid); - // Retried bulk op should not remove exiting BulkOpVersionTags - assertTrue(holder.entryVersionTags.size() == beforeSize); - } - - private void recordPutAllEvents(byte[] memId, long threadId, boolean skipCallbacks, int size) { - events = new EntryEventImpl[size]; - eventTracker = new EventTracker(lr); - for (int i = 0; i < size; i++) { - events[i] = EntryEventImpl.create(lr, Operation.PUTALL_CREATE, "key" + i, "value" + i, null, - false, member, !skipCallbacks, new EventID(memId, threadId, i + 1)); - events[i].setContext(memberId); - events[i].setVersionTag(mock(VersionTag.class)); - eventTracker.recordEvent(events[i]); - } - } -} http://git-wip-us.apache.org/repos/asf/geode/blob/9e7696a6/geode-core/src/test/java/org/apache/geode/internal/cache/IteratorDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/IteratorDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/IteratorDUnitTest.java index 6ca11b2..a1922cf 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/IteratorDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/IteratorDUnitTest.java @@ -58,7 +58,7 @@ public class IteratorDUnitTest extends JUnit4CacheTestCase { r.put("key3", "value3"); LocalRegion lr = (LocalRegion) r; // simulate a removed key - // lr.getRegionMap().getEntry("key")._setValue(Token.REMOVED_PHASE1); + // region.getRegionMap().getEntry("key")._setValue(Token.REMOVED_PHASE1); lr.getRegionMap().getEntry("key").setValue(lr, Token.REMOVED_PHASE1); Iterator it = r.keySet().iterator(); int numKeys = 0; http://git-wip-us.apache.org/repos/asf/geode/blob/9e7696a6/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionDUnitTestCase.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionDUnitTestCase.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionDUnitTestCase.java index 54263b9..c3aff4c 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionDUnitTestCase.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionDUnitTestCase.java @@ -192,9 +192,9 @@ public class PartitionedRegionDUnitTestCase extends JUnit4CacheTestCase { final long recoveryDelay) { return new CacheSerializableRunnable("getCreateMultiplePRregion") { public void run2() throws CacheException { - // final Random ra = new Random(); + // final Random regionAttributes = new Random(); for (int i = 0; i < maxIndex; i++) { - // final int rind = ra.nextInt(maxIndex); + // final int rind = regionAttributes.nextInt(maxIndex); try { getCache().createRegion(prPrefix + i, PartitionedRegionTestHelper .createRegionAttrsForPR(redundancy, localmaxMemory, recoveryDelay)); http://git-wip-us.apache.org/repos/asf/geode/blob/9e7696a6/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionSingleNodeOperationsJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionSingleNodeOperationsJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionSingleNodeOperationsJUnitTest.java index e69aa95..cd8408a 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionSingleNodeOperationsJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionSingleNodeOperationsJUnitTest.java @@ -581,11 +581,12 @@ public class PartitionedRegionSingleNodeOperationsJUnitTest { * * Region distRegion = null; * - * AttributesFactory af = new AttributesFactory(); RegionAttributes ra; // setting property - * af.setScope(Scope.DISTRIBUTED_ACK); // creating region attributes ra = af.create(); try { - * distRegion = PartitionedRegionTestHelper.createCache().createRegion( diRegion, ra); } catch - * (RegionExistsException rex) { distRegion = PartitionedRegionTestHelper.createCache() - * .getRegion(diRegion); } // Closing the regions distRegion.close(); pr.close(); + * AttributesFactory af = new AttributesFactory(); RegionAttributes regionAttributes; // setting + * property af.setScope(Scope.DISTRIBUTED_ACK); // creating region attributes regionAttributes = + * af.create(); try { distRegion = PartitionedRegionTestHelper.createCache().createRegion( + * diRegion, regionAttributes); } catch (RegionExistsException rex) { distRegion = + * PartitionedRegionTestHelper.createCache() .getRegion(diRegion); } // Closing the regions + * distRegion.close(); pr.close(); * * if (!pr.getCache().equals(distRegion.getCache())) { * fail("testValidateCloseFunction: getCache is not matching. "); } else { if http://git-wip-us.apache.org/repos/asf/geode/blob/9e7696a6/geode-core/src/test/java/org/apache/geode/internal/cache/event/DistributedEventTrackerTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/event/DistributedEventTrackerTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/event/DistributedEventTrackerTest.java new file mode 100644 index 0000000..fd69f98 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/event/DistributedEventTrackerTest.java @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache.event; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; + +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.apache.geode.CancelCriterion; +import org.apache.geode.cache.DataPolicy; +import org.apache.geode.cache.Operation; +import org.apache.geode.cache.RegionAttributes; +import org.apache.geode.distributed.DistributedMember; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.cache.EntryEventImpl; +import org.apache.geode.internal.cache.EventID; +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.cache.InternalCacheEvent; +import org.apache.geode.internal.cache.LocalRegion; +import org.apache.geode.internal.cache.ha.ThreadIdentifier; +import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID; +import org.apache.geode.internal.cache.versions.VersionTag; +import org.apache.geode.test.junit.categories.UnitTest; + +@Category(UnitTest.class) +public class DistributedEventTrackerTest { + LocalRegion region; + RegionAttributes<?, ?> regionAttributes; + DistributedEventTracker eventTracker; + ClientProxyMembershipID memberId; + DistributedMember member; + + @Before + public void setup() { + region = mock(LocalRegion.class); + regionAttributes = mock(RegionAttributes.class); + when(region.createStopper()).thenCallRealMethod(); + memberId = mock(ClientProxyMembershipID.class); + when(region.getAttributes()).thenReturn(regionAttributes); + when(regionAttributes.getDataPolicy()).thenReturn(mock(DataPolicy.class)); + when(region.getConcurrencyChecksEnabled()).thenReturn(true); + + member = mock(DistributedMember.class); + eventTracker = new DistributedEventTracker(region.getCache(), mock(CancelCriterion.class), + region.getName()); + } + + @Test + public void retriedBulkOpDoesNotRemoveRecordedBulkOpVersionTags() { + byte[] memId = {1, 2, 3}; + long threadId = 1; + long retrySeqId = 1; + ThreadIdentifier tid = new ThreadIdentifier(memId, threadId); + EventID retryEventID = new EventID(memId, threadId, retrySeqId); + boolean skipCallbacks = true; + int size = 5; + recordPutAllEvents(memId, threadId, skipCallbacks, size); + + ConcurrentMap<ThreadIdentifier, BulkOperationHolder> map = + eventTracker.getRecordedBulkOpVersionTags(); + BulkOperationHolder holder = map.get(tid); + int beforeSize = holder.getEntryVersionTags().size(); + + eventTracker.recordBulkOpStart(retryEventID, tid); + map = eventTracker.getRecordedBulkOpVersionTags(); + holder = map.get(tid); + // Retried bulk op should not remove exiting BulkOpVersionTags + assertTrue(holder.getEntryVersionTags().size() == beforeSize); + } + + private void recordPutAllEvents(byte[] memId, long threadId, boolean skipCallbacks, int size) { + for (int i = 0; i < size; i++) { + putEvent("key" + i, "value" + i, memId, threadId, skipCallbacks, i + 1); + EntryEventImpl event = EntryEventImpl.create(region, Operation.PUTALL_CREATE, "key" + i, + "value" + i, null, false, member, !skipCallbacks, new EventID(memId, threadId, i + 1)); + event.setContext(memberId); + event.setVersionTag(mock(VersionTag.class)); + eventTracker.recordEvent(event); + } + } + + private void putEvent(String key, String value, byte[] memId, long threadId, + boolean skipCallbacks, int sequenceId) { + EntryEventImpl event = EntryEventImpl.create(region, Operation.PUTALL_CREATE, key, value, null, + false, member, !skipCallbacks, new EventID(memId, threadId, sequenceId)); + event.setContext(memberId); + event.setVersionTag(mock(VersionTag.class)); + eventTracker.recordEvent(event); + } + + private void putEvent(String key, String value, byte[] memId, long threadId, + boolean skipCallbacks, int sequenceId, VersionTag tag) { + EntryEventImpl event = EntryEventImpl.create(region, Operation.PUTALL_CREATE, key, value, null, + false, member, !skipCallbacks, new EventID(memId, threadId, sequenceId)); + event.setContext(memberId); + event.setVersionTag(tag); + eventTracker.recordEvent(event); + } + + @Test + public void returnsCorrectNameOfCache() { + String testName = "testing"; + when(region.getName()).thenReturn(testName); + eventTracker = new DistributedEventTracker(region.getCache(), mock(CancelCriterion.class), + region.getName()); + assertEquals("Event Tracker for " + testName, eventTracker.getName()); + } + + @Test + public void initializationCorrectlyReadiesTheTracker() throws InterruptedException { + assertFalse(eventTracker.isInitialized()); + eventTracker.setInitialized(); + assertTrue(eventTracker.isInitialized()); + eventTracker.waitOnInitialization(); + } + + @Test + public void startAndStopAddAndRemoveTrackerFromExpiryTask() { + EventTrackerExpiryTask task = mock(EventTrackerExpiryTask.class); + InternalCache cache = mock(InternalCache.class); + when(region.getCache()).thenReturn(cache); + when(cache.getEventTrackerTask()).thenReturn(task); + eventTracker = new DistributedEventTracker(region.getCache(), mock(CancelCriterion.class), + region.getName()); + eventTracker.start(); + verify(task, times(1)).addTracker(eventTracker); + eventTracker.stop(); + verify(task, times(1)).removeTracker(eventTracker); + } + + @Test + public void returnsEmptyMapIfRecordedEventsAreEmpty() { + assertEquals(0, eventTracker.getState().size()); + } + + @Test + public void returnsMapContainingSequenceIdHoldersCurrentlyPresent() { + EventSequenceNumberHolder sequenceIdHolder = new EventSequenceNumberHolder(0L, null); + ThreadIdentifier threadId = new ThreadIdentifier(new byte[0], 0L); + eventTracker.recordSequenceNumber(threadId, sequenceIdHolder); + Map<ThreadIdentifier, EventSequenceNumberHolder> state = eventTracker.getState(); + assertEquals(1, state.size()); + EventSequenceNumberHolder returnedHolder = state.get(threadId); + assertNotNull(returnedHolder); + // the version tag is stripped out on purpose, so passed in object and returned one are not + // equal to each other + assertNull(returnedHolder.getVersionTag()); + assertEquals(sequenceIdHolder.getLastSequenceNumber(), returnedHolder.getLastSequenceNumber()); + } + + @Test + public void setToInitializedWhenStateRecorded() { + eventTracker.recordState(null, Collections.emptyMap()); + assertTrue(eventTracker.isInitialized()); + } + + @Test + public void setsInitialImageProvidedWhenStateRecorded() { + InternalDistributedMember distributedMember = mock(InternalDistributedMember.class); + eventTracker.recordState(distributedMember, Collections.emptyMap()); + assertTrue(eventTracker.isInitialImageProvider(distributedMember)); + } + + @Test + public void entryInRecordedStateStoredWhenNotInCurrentState() { + EventSequenceNumberHolder sequenceIdHolder = new EventSequenceNumberHolder(0L, null); + ThreadIdentifier threadId = new ThreadIdentifier(new byte[0], 0L); + Map<ThreadIdentifier, EventSequenceNumberHolder> state = + Collections.singletonMap(threadId, sequenceIdHolder); + eventTracker.recordState(null, state); + Map<ThreadIdentifier, EventSequenceNumberHolder> storedState = eventTracker.getState(); + assertEquals(storedState.get(threadId).getLastSequenceNumber(), + sequenceIdHolder.getLastSequenceNumber()); + } + + @Test + public void entryInRecordedStateNotStoredIfAlreadyInCurrentState() { + EventSequenceNumberHolder originalSequenceIdHolder = new EventSequenceNumberHolder(0L, null); + ThreadIdentifier threadId = new ThreadIdentifier(new byte[0], 0L); + Map<ThreadIdentifier, EventSequenceNumberHolder> state = + Collections.singletonMap(threadId, originalSequenceIdHolder); + eventTracker.recordState(null, state); + + EventSequenceNumberHolder newSequenceIdHolder = new EventSequenceNumberHolder(1L, null); + Map<ThreadIdentifier, EventSequenceNumberHolder> newState = + Collections.singletonMap(threadId, newSequenceIdHolder); + eventTracker.recordState(null, newState); + + Map<ThreadIdentifier, EventSequenceNumberHolder> storedState = eventTracker.getState(); + assertEquals(storedState.get(threadId).getLastSequenceNumber(), + originalSequenceIdHolder.getLastSequenceNumber()); + } + + @Test + public void hasSeenEventReturnsFalseForEventWithNoID() { + InternalCacheEvent event = mock(InternalCacheEvent.class); + when(event.getEventId()).thenReturn(null); + assertFalse(eventTracker.hasSeenEvent(event)); + } + + @Test + public void hasSeenEventReturnsFalseForNullEventID() { + assertFalse(eventTracker.hasSeenEvent((EventID) null)); + assertFalse(eventTracker.hasSeenEvent(null, null)); + } + + @Test + public void hasNotSeenEventIDThatIsNotInRecordedEvents() { + EventID eventID = new EventID(new byte[0], 0L, 0L); + assertFalse(eventTracker.hasSeenEvent(eventID)); + } + + @Test + public void hasSeenEventIDThatIsInRecordedEvents() { + EventID eventID = new EventID(new byte[0], 0L, 0L); + recordSequence(eventID); + assertTrue(eventTracker.hasSeenEvent(eventID)); + } + + @Test + public void hasNotSeenEventIDWhosSequenceIDIsMarkedRemoved() { + EventID eventID = new EventID(new byte[0], 0L, 0L); + EventSequenceNumberHolder sequenceIdHolder = + new EventSequenceNumberHolder(eventID.getSequenceID(), null); + sequenceIdHolder.setRemoved(true); + ThreadIdentifier threadId = new ThreadIdentifier(new byte[0], 0L); + eventTracker.recordSequenceNumber(threadId, sequenceIdHolder); + + assertFalse(eventTracker.hasSeenEvent(eventID)); + } + + @Test + public void hasNotSeeEventIDWhosSequenceIDIsLargerThanSeen() { + EventID eventID = new EventID(new byte[0], 0L, 0L); + recordSequence(eventID); + + EventID higherSequenceID = new EventID(new byte[0], 0L, 1); + assertFalse(eventTracker.hasSeenEvent(higherSequenceID)); + } + + @Test + public void returnsNoTagIfNoSequenceForEvent() { + EventID eventID = new EventID(new byte[0], 0L, 1L); + assertNull(eventTracker.findVersionTagForSequence(eventID)); + } + + @Test + public void returnsNoTagIfSequencesDoNotMatchForEvent() { + EventID eventID = new EventID(new byte[0], 0L, 1); + recordSequence(eventID); + assertNull(eventTracker.findVersionTagForSequence(eventID)); + } + + @Test + public void returnsCorrectTagForEvent() { + EventID eventID = new EventID(new byte[0], 0L, 0L); + EventSequenceNumberHolder sequenceIdHolder = recordSequence(eventID); + assertEquals(sequenceIdHolder.getVersionTag(), eventTracker.findVersionTagForSequence(eventID)); + } + + @Test + public void returnsNoTagIfNoBulkOpWhenNoEventGiven() { + assertNull(eventTracker.findVersionTagForBulkOp(null)); + } + + @Test + public void returnsNoTagIfNoBulkOpForEventWithSequence() { + EventID eventID = new EventID(new byte[0], 0L, 1L); + assertNull(eventTracker.findVersionTagForBulkOp(eventID)); + } + + @Test + public void returnsNoTagIfBulkOpsDoNotMatchForEvent() { + putEvent("key", "value", new byte[0], 0, false, 0); + EventID eventIDWithoutBulkOp = new EventID(new byte[0], 0L, 1); + assertNull(eventTracker.findVersionTagForBulkOp(eventIDWithoutBulkOp)); + } + + @Test + public void returnsCorrectTagForEventWithBulkOp() { + EventID eventID = new EventID(new byte[0], 0L, 0L); + VersionTag tag = mock(VersionTag.class); + putEvent("key", "value", new byte[0], 0, false, 0, tag); + assertEquals(tag, eventTracker.findVersionTagForBulkOp(eventID)); + } + + @Test + public void executesABulkOperations() { + EventID eventID = new EventID(new byte[0], 0L, 1L); + Runnable bulkOperation = mock(Runnable.class); + eventTracker.syncBulkOp(bulkOperation, eventID, false); + verify(bulkOperation, times(1)).run(); + } + + @Test + public void executesRunnableIfNotPartOfATransaction() { + EventID eventID = new EventID(new byte[0], 0L, 1L); + Runnable bulkOperation = mock(Runnable.class); + eventTracker.syncBulkOp(bulkOperation, eventID, true); + verify(bulkOperation, times(1)).run(); + } + + private EventSequenceNumberHolder recordSequence(EventID eventID) { + EventSequenceNumberHolder sequenceIdHolder = + new EventSequenceNumberHolder(eventID.getSequenceID(), null); + ThreadIdentifier threadIdentifier = new ThreadIdentifier(new byte[0], eventID.getThreadID()); + eventTracker.recordSequenceNumber(threadIdentifier, sequenceIdHolder); + return sequenceIdHolder; + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/9e7696a6/geode-core/src/test/java/org/apache/geode/internal/cache/event/EventTrackerDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/event/EventTrackerDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/event/EventTrackerDUnitTest.java new file mode 100755 index 0000000..b85827c --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/event/EventTrackerDUnitTest.java @@ -0,0 +1,489 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache.event; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; + +import org.awaitility.Awaitility; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.geode.cache.AttributesFactory; +import org.apache.geode.cache.Cache; +import org.apache.geode.cache.CacheException; +import org.apache.geode.cache.PartitionAttributesFactory; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionFactory; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.cache.Scope; +import org.apache.geode.cache.server.CacheServer; +import org.apache.geode.cache30.CacheSerializableRunnable; +import org.apache.geode.cache30.ClientServerTestCase; +import org.apache.geode.distributed.internal.DistributionConfig; +import org.apache.geode.internal.cache.BucketRegion; +import org.apache.geode.internal.cache.DistributedRegion; +import org.apache.geode.internal.cache.GemFireCacheImpl; +import org.apache.geode.internal.cache.LocalRegion; +import org.apache.geode.internal.cache.PartitionedRegion; +import org.apache.geode.internal.cache.ha.ThreadIdentifier; +import org.apache.geode.test.dunit.Assert; +import org.apache.geode.test.dunit.Host; +import org.apache.geode.test.dunit.NetworkUtils; +import org.apache.geode.test.dunit.SerializableRunnable; +import org.apache.geode.test.dunit.VM; +import org.apache.geode.test.dunit.Wait; +import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase; +import org.apache.geode.test.junit.categories.DistributedTest; + +/** + * Tests <code>EventTracker</code> management. + * + * @since GemFire 6.5 + */ +@Category(DistributedTest.class) +public class EventTrackerDUnitTest extends JUnit4CacheTestCase { + + /** The port on which the <code>CacheServer</code> was started in this VM */ + private static int cacheServerPort; + + /** The <code>Cache</code>'s <code>ExpiryTask</code>'s ping interval */ + private static final String MESSAGE_TRACKING_TIMEOUT = "5000"; + + @Override + public final void postTearDownCacheTestCase() throws Exception { + disconnectAllFromDS(); + } + + /** + * Tests <code>EventTracker</code> is created and destroyed when a <code>Region</code> is created + * and destroyed. + */ + @Test + public void testEventTrackerCreateDestroy() throws CacheException { + // Verify the Cache's ExpiryTask contains no EventTrackers + GemFireCacheImpl cache = (GemFireCacheImpl) getCache(); + EventTrackerExpiryTask expiryTask = cache.getEventTrackerTask(); + assertNotNull(expiryTask); + + // We start with 3 event trackers: + // one for the PDX registry region + // one for ManagementConstants.MONITORING_REGION + // one for ManagementConstants.NOTIFICATION_REGION + final int EXPECTED_TRACKERS = 3; + assertEquals(EXPECTED_TRACKERS, expiryTask.getNumberOfTrackers()); + + // Create a distributed Region + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.DISTRIBUTED_ACK); + LocalRegion region = (LocalRegion) createRegion(getName(), factory.create()); + + // Verify an EventTracker is created and is empty + EventTracker eventTracker = region.getEventTracker(); + assertNotNull(eventTracker); + Map eventState = region.getEventState(); + assertNotNull(eventState); + assertEquals(0, eventState.size()); + + // Verify it and the root region's EventTracker are added to the Cache's ExpiryTask's trackers + assertEquals(EXPECTED_TRACKERS + 2, expiryTask.getNumberOfTrackers()); + + // Destroy the Region + region.destroyRegion(); + + // Verify the EventTracker is removed from the Cache's ExpiryTask's trackers + assertEquals(EXPECTED_TRACKERS + 1, expiryTask.getNumberOfTrackers()); + } + + /** + * Tests adding threads to an <code>EventTracker</code>. + */ + @Test + public void testEventTrackerAddThreadIdentifier() throws CacheException { + Host host = Host.getHost(0); + VM serverVM = host.getVM(0); + VM clientVM = host.getVM(1); + final String regionName = getName(); + + // Create Region in the server and verify tracker is created + serverVM.invoke(new CacheSerializableRunnable("Create server") { + public void run2() throws CacheException { + // Create a distributed Region + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.DISTRIBUTED_ACK); + LocalRegion region = (LocalRegion) createRegion(regionName, factory.create()); + + // Verify an EventTracker is created + EventTracker eventTracker = region.getEventTracker(); + assertNotNull(eventTracker); + try { + startCacheServer(); + } catch (Exception ex) { + Assert.fail("While starting CacheServer", ex); + } + } + }); + + // Verify tracker in server contains no entries + serverVM.invoke(new CacheSerializableRunnable("Do puts") { + public void run2() throws CacheException { + LocalRegion region = (LocalRegion) getRootRegion().getSubregion(regionName); + Map eventState = region.getEventState(); + assertEquals(0, eventState.size()); + } + }); + + // Create Create Region in the client + final int port = serverVM.invoke(() -> EventTrackerDUnitTest.getCacheServerPort()); + final String hostName = NetworkUtils.getServerHostName(host); + clientVM.invoke(new CacheSerializableRunnable("Create client") { + public void run2() throws CacheException { + getCache(); + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.LOCAL); + ClientServerTestCase.configureConnectionPool(factory, hostName, port, -1, false, -1, -1, + null); + createRegion(regionName, factory.create()); + } + }); + + // Do puts in the client + clientVM.invoke(new CacheSerializableRunnable("Do puts") { + public void run2() throws CacheException { + Region region = getRootRegion().getSubregion(regionName); + for (int i = 0; i < 10; i++) { + region.put(i, i); + } + } + }); + + // Verify tracker in server contains an entry for client thread + serverVM.invoke(new CacheSerializableRunnable("Do puts") { + public void run2() throws CacheException { + LocalRegion region = (LocalRegion) getRootRegion().getSubregion(regionName); + Map eventState = region.getEventState(); + assertEquals(1, eventState.size()); + } + }); + } + + /** + * Tests adding events to and removing events from an <code>EventTracker</code>. + */ + @Test + public void testEventTrackerAddRemoveThreadIdentifier() throws CacheException { + Host host = Host.getHost(0); + VM serverVM = host.getVM(0); + VM clientVM = host.getVM(1); + final String regionName = getName(); + + // Create Region in the server and verify tracker is created + serverVM.invoke(new CacheSerializableRunnable("Create server") { + public void run2() throws CacheException { + // Set the message tracking timeout + System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "messageTrackingTimeout", + MESSAGE_TRACKING_TIMEOUT); + + // Create a distributed Region + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.DISTRIBUTED_ACK); + LocalRegion region = (LocalRegion) createRegion(regionName, factory.create()); + + // Verify an EventTracker is created + EventTracker eventTracker = region.getEventTracker(); + assertNotNull(eventTracker); + try { + startCacheServer(); + } catch (Exception ex) { + Assert.fail("While starting CacheServer", ex); + } + } + }); + + // Verify tracker in server contains no entries + serverVM.invoke(new CacheSerializableRunnable("Do puts") { + public void run2() throws CacheException { + LocalRegion region = (LocalRegion) getRootRegion().getSubregion(regionName); + Map eventState = region.getEventState(); + assertEquals(0, eventState.size()); + } + }); + + // Create Create Region in the client + final int port = serverVM.invoke(() -> EventTrackerDUnitTest.getCacheServerPort()); + final String hostName = NetworkUtils.getServerHostName(host); + clientVM.invoke(new CacheSerializableRunnable("Create client") { + public void run2() throws CacheException { + getCache(); + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.LOCAL); + ClientServerTestCase.configureConnectionPool(factory, hostName, port, -1, false, -1, -1, + null); + createRegion(regionName, factory.create()); + } + }); + + // Do puts in the client + clientVM.invoke(new CacheSerializableRunnable("Do puts") { + public void run2() throws CacheException { + Region region = getRootRegion().getSubregion(regionName); + for (int i = 0; i < 10; i++) { + region.put(i, i); + } + } + }); + + // Verify tracker in server + serverVM.invoke(new CacheSerializableRunnable("Do puts") { + public void run2() throws CacheException { + // First verify it contains an entry + LocalRegion region = (LocalRegion) getRootRegion().getSubregion(regionName); + Map eventState = region.getEventState(); + assertEquals(1, eventState.size()); + + // Pause for the message tracking timeout + int waitTime = Integer.parseInt(MESSAGE_TRACKING_TIMEOUT) * 3; + Wait.pause(waitTime); + + // Verify the server no longer contains an entry + eventState = region.getEventState(); + assertEquals(0, eventState.size()); + } + }); + } + + /** + * Test to make sure we don't leak put all events in the event tracker after multiple putAlls + */ + @Test + public void testPutAllHoldersInEventTracker() { + Host host = Host.getHost(0); + VM vm0 = host.getVM(0); + VM vm1 = host.getVM(1); + VM vm2 = host.getVM(2); + + SerializableRunnable createRegion = new SerializableRunnable("createRegion") { + + public void run() { + Cache cache = getCache(); + RegionFactory<Object, Object> rf = + cache.createRegionFactory(RegionShortcut.PARTITION_REDUNDANT); + PartitionAttributesFactory paf = new PartitionAttributesFactory(); + paf.setRedundantCopies(1); + paf.setTotalNumBuckets(3); + rf.setPartitionAttributes(paf.create()); + rf.setConcurrencyChecksEnabled(true); + rf.create("partitioned"); + + rf = cache.createRegionFactory(RegionShortcut.REPLICATE); + rf.setConcurrencyChecksEnabled(true); + rf.create("replicate"); + try { + startCacheServer(); + } catch (Exception ex) { + Assert.fail("While starting CacheServer", ex); + } + } + }; + + vm0.invoke(createRegion); + vm1.invoke(createRegion); + + // Create Create Region in the client + final int port = vm0.invoke(() -> EventTrackerDUnitTest.getCacheServerPort()); + final String hostName = NetworkUtils.getServerHostName(host); + vm2.invoke(new CacheSerializableRunnable("Create client") { + public void run2() throws CacheException { + getCache(); + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.LOCAL); + ClientServerTestCase.configureConnectionPool(factory, hostName, port, -1, false, -1, -1, + null); + createRootRegion("partitioned", factory.create()); + createRootRegion("replicate", factory.create()); + } + }); + + doTwoPutAlls(vm2, "partitioned"); + doTwoPutAlls(vm2, "replicate"); + + // Make sure that the event tracker for each bucket only records the last + // event. + checkBucketEventTracker(vm0, 0, 3); + checkBucketEventTracker(vm1, 0, 3); + checkBucketEventTracker(vm0, 1, 3); + checkBucketEventTracker(vm1, 1, 3); + checkBucketEventTracker(vm0, 2, 3); + checkBucketEventTracker(vm1, 2, 3); + + checkReplicateEventTracker(vm0, 9); + checkReplicateEventTracker(vm1, 9); + } + + private void doTwoPutAlls(VM vm, final String regionName) { + SerializableRunnable createData = new SerializableRunnable("putAlls") { + + public void run() { + Cache cache = getCache(); + Region region = cache.getRegion(regionName); + + Map putAllMap = new HashMap(); + for (int i = 0; i < 9; i++) { + putAllMap.put(i, i); + } + region.putAll(putAllMap); + + putAllMap.clear(); + for (int i = 10; i < 19; i++) { + putAllMap.put(i, i); + } + region.putAll(putAllMap); + } + }; + + vm.invoke(createData); + } + + private SerializableRunnable checkReplicateEventTracker(VM vm, final int expectedEntryCount) { + SerializableRunnable checkEventTracker = new SerializableRunnable("checkEventTracker") { + + public void run() { + Cache cache = getCache(); + DistributedRegion region = (DistributedRegion) cache.getRegion("replicate"); + checkEventTracker(region, expectedEntryCount); + } + }; + vm.invoke(checkEventTracker); + return checkEventTracker; + } + + private SerializableRunnable checkBucketEventTracker(VM vm, final int bucketNumber, + final int expectedEntryCount) { + SerializableRunnable checkEventTracker = new SerializableRunnable("checkEventTracker") { + + public void run() { + Cache cache = getCache(); + PartitionedRegion region = (PartitionedRegion) cache.getRegion("partitioned"); + BucketRegion br = region.getBucketRegion(bucketNumber); + + checkEventTracker(br, expectedEntryCount); + } + }; + vm.invoke(checkEventTracker); + return checkEventTracker; + } + + private void checkEventTracker(LocalRegion region, int numberOfEvents) { + EventTracker tracker = region.getEventTracker(); + ConcurrentMap<ThreadIdentifier, BulkOperationHolder> memberToTags = + tracker.getRecordedBulkOpVersionTags(); + assertEquals("memberToTags=" + memberToTags, 1, memberToTags.size()); + BulkOperationHolder holder = memberToTags.values().iterator().next(); + // We expect the holder to retain only the last putAll that was performed. + assertEquals("entryToVersionTags=" + holder.getEntryVersionTags(), numberOfEvents, + holder.getEntryVersionTags().size()); + } + + protected void startCacheServer() throws IOException { + CacheServer cacheServer = getCache().addCacheServer(); + cacheServer.setPort(0); + cacheServer.start(); + cacheServerPort = cacheServer.getPort(); + } + + protected static int getCacheServerPort() { + return cacheServerPort; + } + + /** + * Tests event track is initialized after gii + */ + @Test + public void testEventTrackerIsInitalized() throws CacheException { + Host host = Host.getHost(0); + VM vm0 = host.getVM(0); + VM vm1 = host.getVM(1); + VM vm2 = host.getVM(2); + + createPRInVMs(vm0, vm1, vm2); + + createPR(); + + doPutsInVMs(vm0, vm1, vm2); + + doPuts(); + + verifyEventTrackerContent(); + + // close the region + getCache().getRegion(getName()).close(); + + // create the region again. + createPR(); + + for (int i = 0; i < 12; i++) { + waitEntryIsLocal(i); + } + + // verify event track initialized after create region + verifyEventTrackerContent(); + + } + + private void waitEntryIsLocal(int i) { + Awaitility.await().pollInterval(10, TimeUnit.MILLISECONDS).pollDelay(10, TimeUnit.MILLISECONDS) + .atMost(30, TimeUnit.SECONDS) + .until(() -> getCache().getRegion(getName()).getEntry(i) != null); + } + + private void verifyEventTrackerContent() { + PartitionedRegion pr = (PartitionedRegion) getCache().getRegion(getName()); + BucketRegion br = pr.getDataStore().getLocalBucketById(0); + Map<?, ?> eventStates = br.getEventState(); + assertTrue(eventStates.size() == 4); + } + + public void createPRInVMs(VM... vms) { + for (VM vm : vms) { + vm.invoke(() -> createPR()); + } + } + + private void createPR() { + PartitionAttributesFactory paf = + new PartitionAttributesFactory().setRedundantCopies(3).setTotalNumBuckets(4); + RegionFactory fact = getCache().createRegionFactory(RegionShortcut.PARTITION) + .setPartitionAttributes(paf.create()); + fact.create(getName()); + } + + public void doPutsInVMs(VM... vms) { + for (VM vm : vms) { + vm.invoke(() -> doPuts()); + } + } + + private void doPuts() { + Region region = getCache().getRegion(getName()); + for (int i = 0; i < 12; i++) { + region.put(i, i); + } + } +}