This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch feature/GEM-883
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 335cce92548f18f6a376947f8df2b8af22d8da51
Author: zhouxh <[email protected]>
AuthorDate: Thu Nov 9 23:49:29 2017 -0800

    GEODE-3967: if a put hits a concurrent modification exception, it should still notify the serial gateway sender
---
 .../org/apache/geode/internal/cache/LocalRegion.java     | 16 +++++++++++-----
 .../cache/wan/AbstractGatewaySenderEventProcessor.java   |  8 ++++++++
 .../geode/internal/cache/wan/GatewaySenderEventImpl.java |  3 +++
 .../wan/serial/SerialGatewaySenderEventProcessor.java    |  2 +-
 4 files changed, 23 insertions(+), 6 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index bed336a..158ff68 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -2851,6 +2851,8 @@ public class LocalRegion extends AbstractRegion 
implements InternalRegion, Loade
               logger.debug("caught concurrent modification attempt when 
applying {}", event);
             }
             notifyBridgeClients(event);
+            notifyGatewaySender(event.getOperation().isUpdate() ? 
EnumListenerEvent.AFTER_UPDATE
+                : EnumListenerEvent.AFTER_CREATE, event);
           }
           if (!getDataView().isDeferredStats()) {
             getCachePerfStats().endPut(startPut, event.isOriginRemote());
@@ -5624,6 +5626,9 @@ public class LocalRegion extends AbstractRegion 
implements InternalRegion, Loade
         logger.debug("caught concurrent modification attempt when applying 
{}", event);
       }
       notifyBridgeClients(event);
+      notifyGatewaySender(event.getOperation().isUpdate() ? 
EnumListenerEvent.AFTER_UPDATE
+          : EnumListenerEvent.AFTER_CREATE, event);
+
       return false;
     }
 
@@ -6111,8 +6116,7 @@ public class LocalRegion extends AbstractRegion 
implements InternalRegion, Loade
   }
 
   protected void notifyGatewaySender(EnumListenerEvent operation, 
EntryEventImpl event) {
-    if (isPdxTypesRegion() || event.isConcurrencyConflict()) {
-      // isConcurrencyConflict is usually a concurrent cache modification 
problem
+    if (isPdxTypesRegion()) {
       return;
     }
 
@@ -6136,9 +6140,10 @@ public class LocalRegion extends AbstractRegion 
implements InternalRegion, Loade
     if (allRemoteDSIds != null) {
       for (GatewaySender sender : getCache().getAllGatewaySenders()) {
         if (allGatewaySenderIds.contains(sender.getId())) {
-          // TODO: This is a BUG. Why return and not continue?
-          if (!this.getDataPolicy().withStorage() && sender.isParallel()) {
-            return;
+          // if isConcurrencyConflict is true, only notify serial gateway 
sender
+          if ((!this.getDataPolicy().withStorage() || 
event.isConcurrencyConflict())
+              && sender.isParallel()) {
+            continue;
           }
           if (logger.isDebugEnabled()) {
             logger.debug("Notifying the GatewaySender : {}", sender.getId());
@@ -6497,6 +6502,7 @@ public class LocalRegion extends AbstractRegion 
implements InternalRegion, Loade
       if (event.getVersionTag() != null && 
!event.getVersionTag().isGatewayTag()) {
         notifyBridgeClients(event);
       }
+      notifyGatewaySender(EnumListenerEvent.AFTER_DESTROY, event);
       return true; // event was elided
 
     } catch (DiskAccessException dae) {
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index 7a2cee1..f94c21d 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -528,6 +528,14 @@ public abstract class AbstractGatewaySenderEventProcessor 
extends Thread {
                 statistics.incEventsNotQueued();
                 continue;
               }
+              if (((GatewaySenderEventImpl) event).isConcurrencyConflict) {
+                if (isDebugEnabled) {
+                  logger.debug("primary should ignore the concurrency conflict 
event:" + event);
+                }
+                itr.remove();
+                statistics.incEventsNotQueued();
+                continue;
+              }
 
               boolean transmit = filter.beforeTransmit(event);
               if (!transmit) {
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
index 4d201b2..d28dc5b 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
@@ -175,6 +175,8 @@ public class GatewaySenderEventImpl
 
   protected boolean isInitialized;
 
+  public boolean isConcurrencyConflict = false;
+
   /**
    * Is this thread in the process of serializing this event?
    */
@@ -316,6 +318,7 @@ public class GatewaySenderEventImpl
     if (initialize) {
       initialize();
     }
+    this.isConcurrencyConflict = event.isConcurrencyConflict();
   }
 
   /**
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
index 734b560..e9c4d28 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
@@ -712,7 +712,7 @@ public class SerialGatewaySenderEventProcessor extends 
AbstractGatewaySenderEven
       // now we can safely use the unprocessedEvents field
       Long v = this.unprocessedTokens.remove(gatewayEvent.getEventId());
 
-      if (v == null) {
+      if (v == null && !gatewayEvent.isConcurrencyConflict) {
         // first time for the event
         if (logger.isTraceEnabled()) {
           logger.trace("{}: fromSecondary event {}:{}->{} added from 
unprocessed events map",

-- 
To stop receiving notification emails like this one, please contact
"[email protected]" <[email protected]>.

Reply via email to