This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch feature/GEM-883
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 5fa6ca55288c4efb83b07fdd77b80941bc74de47
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Thu Nov 9 23:49:29 2017 -0800

    GEODE-3967: if a put hits a concurrent modification exception, it should still notify the serial gateway sender
    GEODE-3967: notifyTimestampsToGateways should inherit isConcurrencyConflict
    GEODE-3967: add isConcurrencyConflict to the secondary event
---
 .../geode/internal/cache/AbstractRegionMap.java    |  3 +++
 .../geode/internal/cache/DestroyOperation.java     |  3 ---
 .../internal/cache/DistributedCacheOperation.java  | 15 +++++++++++++-
 .../apache/geode/internal/cache/LocalRegion.java   | 19 +++++++++++++-----
 .../geode/internal/cache/LocalRegionDataView.java  |  9 +++++++++
 .../wan/AbstractGatewaySenderEventProcessor.java   | 15 +++++++++-----
 .../internal/cache/wan/GatewaySenderEventImpl.java |  3 +++
 .../serial/SerialGatewaySenderEventProcessor.java  | 23 ++++++++++++++++++++++
 8 files changed, 76 insertions(+), 14 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
index 404488b..75d8484 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
@@ -1183,6 +1183,7 @@ public abstract class AbstractRegionMap implements 
RegionMap {
                               true/* conflict with clear */, duringRI, true);
                           doPart3 = true;
                         } catch (ConcurrentCacheModificationException ccme) {
+                          event.isConcurrencyConflict(true);
                           VersionTag tag = event.getVersionTag();
                           if (tag != null && tag.isTimeStampUpdated()) {
                             // Notify gateways of new time-stamp.
@@ -2092,6 +2093,7 @@ public abstract class AbstractRegionMap implements 
RegionMap {
                   }
                 } // !opCompleted
               } catch (ConcurrentCacheModificationException ccme) {
+                event.isConcurrencyConflict(true);
                 VersionTag tag = event.getVersionTag();
                 if (tag != null && tag.isTimeStampUpdated()) {
                   // Notify gateways of new time-stamp.
@@ -2849,6 +2851,7 @@ public abstract class AbstractRegionMap implements 
RegionMap {
                     clearOccured = true;
                     owner.recordEvent(event);
                   } catch (ConcurrentCacheModificationException ccme) {
+                    event.isConcurrencyConflict(true);
                     VersionTag tag = event.getVersionTag();
                     if (tag != null && tag.isTimeStampUpdated()) {
                       // Notify gateways of new time-stamp.
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyOperation.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyOperation.java
index 0d2dc7f..7870f58 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyOperation.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyOperation.java
@@ -95,9 +95,6 @@ public class DestroyOperation extends 
DistributedCacheOperation {
 
       } catch (EntryNotFoundException e) {
         dispatchElidedEvent(rgn, ev);
-        if (!ev.isConcurrencyConflict()) {
-          rgn.notifyGatewaySender(EnumListenerEvent.AFTER_DESTROY, ev);
-        }
         throw e;
       } catch (CacheWriterException e) {
         throw new Error(
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
index 908bf83..ddde23a 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
@@ -1289,11 +1289,24 @@ public abstract class DistributedCacheOperation {
      */
     protected void dispatchElidedEvent(LocalRegion rgn, EntryEventImpl ev) {
       if (logger.isDebugEnabled()) {
-        logger.debug("dispatching elided event: {}", ev);
+        logger.debug("GGG:dispatching elided event: {}", ev, new Exception());
       }
       ev.isConcurrencyConflict(true);
       rgn.generateLocalFilterRouting(ev);
       rgn.notifyBridgeClients(ev);
+      rgn.notifyGatewaySender(getOperation(ev), ev);
+    }
+
+    private EnumListenerEvent getOperation(EntryEventImpl ev) {
+      if (ev.getOperation().isInvalidate()) {
+        return EnumListenerEvent.AFTER_INVALIDATE;
+      } else if (ev.getOperation().isDestroy()) {
+        return EnumListenerEvent.AFTER_DESTROY;
+      } else if (ev.getOperation().isUpdate()) {
+        return EnumListenerEvent.AFTER_UPDATE;
+      } else {
+        return EnumListenerEvent.AFTER_CREATE;
+      }
     }
 
     protected abstract InternalCacheEvent createEvent(DistributedRegion rgn)
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index aa0f8c6..38f74ec 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -2853,6 +2853,8 @@ public class LocalRegion extends AbstractRegion 
implements LoaderHelperFactory,
               logger.debug("caught concurrent modification attempt when 
applying {}", event);
             }
             notifyBridgeClients(event);
+            notifyGatewaySender(event.getOperation().isUpdate() ? 
EnumListenerEvent.AFTER_UPDATE
+                : EnumListenerEvent.AFTER_CREATE, event);
           }
           if (!getDataView().isDeferredStats()) {
             getCachePerfStats().endPut(startPut, event.isOriginRemote());
@@ -5630,6 +5632,9 @@ public class LocalRegion extends AbstractRegion 
implements LoaderHelperFactory,
         logger.debug("caught concurrent modification attempt when applying 
{}", event);
       }
       notifyBridgeClients(event);
+      notifyGatewaySender(event.getOperation().isUpdate() ? 
EnumListenerEvent.AFTER_UPDATE
+          : EnumListenerEvent.AFTER_CREATE, event);
+
       return false;
     }
 
@@ -5856,6 +5861,9 @@ public class LocalRegion extends AbstractRegion 
implements LoaderHelperFactory,
     updateTimeStampEvent.setGenerateCallbacks(false);
     updateTimeStampEvent.distributedMember = event.getDistributedMember();
     updateTimeStampEvent.setNewEventId(getSystem());
+    if (event.isConcurrencyConflict()) {
+      updateTimeStampEvent.isConcurrencyConflict(true);
+    }
 
     if (event.getRegion() instanceof BucketRegion) {
       BucketRegion bucketRegion = (BucketRegion) event.getRegion();
@@ -6117,8 +6125,7 @@ public class LocalRegion extends AbstractRegion 
implements LoaderHelperFactory,
   }
 
   protected void notifyGatewaySender(EnumListenerEvent operation, 
EntryEventImpl event) {
-    if (isPdxTypesRegion() || event.isConcurrencyConflict()) {
-      // isConcurrencyConflict is usually a concurrent cache modification 
problem
+    if (isPdxTypesRegion()) {
       return;
     }
 
@@ -6142,9 +6149,10 @@ public class LocalRegion extends AbstractRegion 
implements LoaderHelperFactory,
     if (allRemoteDSIds != null) {
       for (GatewaySender sender : getCache().getAllGatewaySenders()) {
         if (allGatewaySenderIds.contains(sender.getId())) {
-          // TODO: This is a BUG. Why return and not continue?
-          if (!this.getDataPolicy().withStorage() && sender.isParallel()) {
-            return;
+          // if isConcurrencyConflict is true, only notify serial gateway 
sender
+          if ((!this.getDataPolicy().withStorage() || 
event.isConcurrencyConflict())
+              && sender.isParallel()) {
+            continue;
           }
           if (logger.isDebugEnabled()) {
             logger.debug("Notifying the GatewaySender : {}", sender.getId());
@@ -6503,6 +6511,7 @@ public class LocalRegion extends AbstractRegion 
implements LoaderHelperFactory,
       if (event.getVersionTag() != null && 
!event.getVersionTag().isGatewayTag()) {
         notifyBridgeClients(event);
       }
+      notifyGatewaySender(EnumListenerEvent.AFTER_DESTROY, event);
       return true; // event was elided
 
     } catch (DiskAccessException dae) {
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java
index eed6176..b68859e 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java
@@ -25,6 +25,7 @@ import 
org.apache.geode.internal.cache.entries.AbstractRegionEntry;
 import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
 import org.apache.geode.internal.cache.tier.sockets.VersionedObjectList;
 import 
org.apache.geode.internal.cache.versions.ConcurrentCacheModificationException;
+import org.apache.geode.internal.logging.LogService;
 
 /**
  *
@@ -71,6 +72,10 @@ public class LocalRegionDataView implements InternalDataView 
{
     } catch (ConcurrentCacheModificationException e) {
       // a newer event has already been applied to the cache. this can happen
       // in a client cache if another thread is operating on the same key
+      event.isConcurrencyConflict(true);
+      LocalRegion lr = event.getLocalRegion();
+      LogService.getLogger().info("GGG:invalidateExistingEntry:" + event, new 
Exception());
+      // lr.notifyGatewaySender(EnumListenerEvent.AFTER_INVALIDATE, event);
     }
   }
 
@@ -81,6 +86,10 @@ public class LocalRegionDataView implements InternalDataView 
{
     } catch (ConcurrentCacheModificationException e) {
       // a later in time event has already been applied to the cache. this can 
happen
       // in a cache if another thread is operating on the same key
+      event.isConcurrencyConflict(true);
+      LocalRegion lr = event.getLocalRegion();
+      LogService.getLogger().info("GGG:updateEntryVersion:" + event, new 
Exception());
+      // lr.notifyGatewaySender(EnumListenerEvent.TIMESTAMP_UPDATE, event);
     }
   }
 
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index 7a2cee1..a557875 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -518,16 +518,21 @@ public abstract class AbstractGatewaySenderEventProcessor 
extends Thread {
               // version is < 7.0.1, especially to prevent another loop over 
events.
               if (!sendUpdateVersionEvents
                   && event.getOperation() == Operation.UPDATE_VERSION_STAMP) {
-                if (isTraceEnabled) {
-                  logger.trace(
-                      "Update Event Version event: {} removed from Gateway 
Sender queue: {}", event,
-                      sender);
-                }
+                logger.debug("Update Event Version event: {} removed from 
Gateway Sender queue: {}",
+                    event, sender);
 
                 itr.remove();
                 statistics.incEventsNotQueued();
                 continue;
               }
+              if (((GatewaySenderEventImpl) event).isConcurrencyConflict) {
+                if (isDebugEnabled) {
+                  logger.debug("primary should ignore the concurrency conflict 
event:" + event);
+                }
+                itr.remove();
+                statistics.incEventsNotQueued();
+                continue;
+              }
 
               boolean transmit = filter.beforeTransmit(event);
               if (!transmit) {
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
index 5b1ba54..468dca2 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
@@ -171,6 +171,8 @@ public class GatewaySenderEventImpl
 
   protected boolean isInitialized;
 
+  public boolean isConcurrencyConflict = false;
+
   /**
    * Is this thread in the process of serializing this event?
    */
@@ -312,6 +314,7 @@ public class GatewaySenderEventImpl
     if (initialize) {
       initialize();
     }
+    this.isConcurrencyConflict = event.isConcurrencyConflict();
   }
 
   /**
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
index 734b560..995007d 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
@@ -291,6 +291,10 @@ public class SerialGatewaySenderEventProcessor extends 
AbstractGatewaySenderEven
             if (o != null && o instanceof GatewaySenderEventImpl) {
               GatewaySenderEventImpl ge = (GatewaySenderEventImpl) o;
               EventWrapper unprocessedEvent = 
this.unprocessedEvents.remove(ge.getEventId());
+              if (unprocessedEvent != null && ge.isConcurrencyConflict) {
+                logger.info(
+                    "GGG:secondary after removed by handleFailover:" + 
unprocessedEvent + ":" + ge);
+              }
               if (unprocessedEvent != null) {
                 unprocessedEvent.event.release();
                 if (this.unprocessedEvents.isEmpty()) {
@@ -379,6 +383,7 @@ public class SerialGatewaySenderEventProcessor extends 
AbstractGatewaySenderEven
       if (m != null) {
         for (EventWrapper ew : m.values()) {
           GatewaySenderEventImpl gatewayEvent = ew.event;
+          logger.info("GGG:releaseUnprocessedEvents:" + gatewayEvent);
           gatewayEvent.release();
         }
         this.unprocessedEvents = null;
@@ -632,6 +637,10 @@ public class SerialGatewaySenderEventProcessor extends 
AbstractGatewaySenderEven
         return;
       // now we can safely use the unprocessedEvents field
       EventWrapper ew = 
this.unprocessedEvents.remove(gatewayEvent.getEventId());
+      if (ew != null && gatewayEvent.isConcurrencyConflict) {
+        logger.info("GGG:secondary after removed by destroy listener:" + ew + 
":" + gatewayEvent);
+      }
+
       if (ew != null) {
         ew.event.release();
         statistics.incUnprocessedEventsRemovedByPrimary();
@@ -651,8 +660,16 @@ public class SerialGatewaySenderEventProcessor extends 
AbstractGatewaySenderEven
         return;
       // now we can safely use the unprocessedEvents field
       EventWrapper ew = 
this.unprocessedEvents.remove(gatewayEvent.getEventId());
+      if (ew != null && gatewayEvent.isConcurrencyConflict) {
+        logger.info("GGG:secondary after removed by create listener:" + ew + 
":" + gatewayEvent,
+            new Exception());
+      }
 
       if (ew == null) {
+        if (gatewayEvent.isConcurrencyConflict) {
+          logger.info("GGG:secondary before add to by create listener:" + 
gatewayEvent,
+              new Exception());
+        }
         // first time for the event
         if (logger.isTraceEnabled()) {
           logger.trace("{}: fromPrimary event {} : {}->{} added to unprocessed 
token map",
@@ -711,8 +728,14 @@ public class SerialGatewaySenderEventProcessor extends 
AbstractGatewaySenderEven
       // @todo add an assertion that !getPrimary()
       // now we can safely use the unprocessedEvents field
       Long v = this.unprocessedTokens.remove(gatewayEvent.getEventId());
+      if (v != null && gatewayEvent.isConcurrencyConflict) {
+        logger.info("GGG:secondary after removed token:" + v + ":" + 
gatewayEvent);
+      }
 
       if (v == null) {
+        if (gatewayEvent.isConcurrencyConflict) {
+          logger.info("GGG:secondary before add to:" + gatewayEvent, new 
Exception());
+        }
         // first time for the event
         if (logger.isTraceEnabled()) {
           logger.trace("{}: fromSecondary event {}:{}->{} added from 
unprocessed events map",

-- 
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <commits@geode.apache.org>.

Reply via email to