This is an automated email from the ASF dual-hosted git repository. zhouxj pushed a commit to branch feature/GEM-883 in repository https://gitbox.apache.org/repos/asf/geode.git
commit 335cce92548f18f6a376947f8df2b8af22d8da51 Author: zhouxh <[email protected]> AuthorDate: Thu Nov 9 23:49:29 2017 -0800 GEODE-3967: if put hits concurrent modification exception should still notify serial gateway sender --- .../org/apache/geode/internal/cache/LocalRegion.java | 16 +++++++++++----- .../cache/wan/AbstractGatewaySenderEventProcessor.java | 8 ++++++++ .../geode/internal/cache/wan/GatewaySenderEventImpl.java | 3 +++ .../wan/serial/SerialGatewaySenderEventProcessor.java | 2 +- 4 files changed, 23 insertions(+), 6 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java index bed336a..158ff68 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java @@ -2851,6 +2851,8 @@ public class LocalRegion extends AbstractRegion implements InternalRegion, Loade logger.debug("caught concurrent modification attempt when applying {}", event); } notifyBridgeClients(event); + notifyGatewaySender(event.getOperation().isUpdate() ? EnumListenerEvent.AFTER_UPDATE + : EnumListenerEvent.AFTER_CREATE, event); } if (!getDataView().isDeferredStats()) { getCachePerfStats().endPut(startPut, event.isOriginRemote()); @@ -5624,6 +5626,9 @@ public class LocalRegion extends AbstractRegion implements InternalRegion, Loade logger.debug("caught concurrent modification attempt when applying {}", event); } notifyBridgeClients(event); + notifyGatewaySender(event.getOperation().isUpdate() ? EnumListenerEvent.AFTER_UPDATE + : EnumListenerEvent.AFTER_CREATE, event); + return false; } @@ -6111,8 +6116,7 @@ public class LocalRegion extends AbstractRegion implements InternalRegion, Loade } protected void notifyGatewaySender(EnumListenerEvent operation, EntryEventImpl event) { - if (isPdxTypesRegion() || event.isConcurrencyConflict()) { - // isConcurrencyConflict is usually a concurrent cache modification problem + if (isPdxTypesRegion()) { return; } @@ -6136,9 +6140,10 @@ public class LocalRegion extends AbstractRegion implements InternalRegion, Loade if (allRemoteDSIds != null) { for (GatewaySender sender : getCache().getAllGatewaySenders()) { if (allGatewaySenderIds.contains(sender.getId())) { - // TODO: This is a BUG. Why return and not continue? - if (!this.getDataPolicy().withStorage() && sender.isParallel()) { - return; + // if isConcurrencyConflict is true, only notify serial gateway sender + if ((!this.getDataPolicy().withStorage() || event.isConcurrencyConflict()) + && sender.isParallel()) { + continue; } if (logger.isDebugEnabled()) { logger.debug("Notifying the GatewaySender : {}", sender.getId()); @@ -6497,6 +6502,7 @@ public class LocalRegion extends AbstractRegion implements InternalRegion, Loade if (event.getVersionTag() != null && !event.getVersionTag().isGatewayTag()) { notifyBridgeClients(event); } + notifyGatewaySender(EnumListenerEvent.AFTER_DESTROY, event); return true; // event was elided } catch (DiskAccessException dae) { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java index 7a2cee1..f94c21d 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java @@ -528,6 +528,14 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread { statistics.incEventsNotQueued(); continue; } + if (((GatewaySenderEventImpl) event).isConcurrencyConflict) { + if (isDebugEnabled) { + logger.debug("primary should ignore the concurrency conflict event:" + event); + } + itr.remove(); + statistics.incEventsNotQueued(); + continue; + } boolean transmit = filter.beforeTransmit(event); if (!transmit) { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java index 4d201b2..d28dc5b 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java @@ -175,6 +175,8 @@ public class GatewaySenderEventImpl protected boolean isInitialized; + public boolean isConcurrencyConflict = false; + /** * Is this thread in the process of serializing this event? */ @@ -316,6 +318,7 @@ public class GatewaySenderEventImpl if (initialize) { initialize(); } + this.isConcurrencyConflict = event.isConcurrencyConflict(); } /** diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java index 734b560..e9c4d28 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java @@ -712,7 +712,7 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven // now we can safely use the unprocessedEvents field Long v = this.unprocessedTokens.remove(gatewayEvent.getEventId()); - if (v == null) { + if (v == null && !gatewayEvent.isConcurrencyConflict) { // first time for the event if (logger.isTraceEnabled()) { logger.trace("{}: fromSecondary event {}:{}->{} added from unprocessed events map", -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
