This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch feature/GEODE-3967
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 974497c1da530e51569ed7965f2368e051894b98
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Sat Jan 20 17:39:00 2018 -0800

    GEODE-3967: There're following 6 problems fixed here:
    1) When ConcurrentCacheModificationException happened, 
GatewaySenderEventImpl
     should save the status and notify gatewaysender if it hold primary queue,
    because other member might have put the event into the secondary queue.
    2) In AbstractUpdateOperation's doPutOrCreate's 3 tries of basicUpdate, the
    3rd try should allow both create and update.
    3) Let event with CME not to dispatch. The old logic does not allow CME 
event
    to  enqueue. This is wrong, because an event without CME might have been
    added into the secondary queue. So we should enqueue it, but not to 
dispatch.
    4) Let UPDATE_VERSION_STAMP not to enqueue if not primary queue, because
    the event did not fire in pair.
    5) AbstractGatewaySenderEventProcessor put loop of filter in wrong place,
    which caused UPDATE_VERSION_STAMP and CME events are not ignored.
    However, not to fix it for now. Leave it in GEODE-4659.
    6) shouldSendVersionEvents for Remote sender should return true, since
    we no longer support 7.0.1 any more.
    7) change version to 150
    8) CME event should not retry in AUO.doPutOrCreate, because retry will end 
up with CME too.
    9) CME && !originRemote: only enqueue to primary
---
 .../internal/cache/AbstractUpdateOperation.java    |  8 ++++-
 .../apache/geode/internal/cache/LocalRegion.java   |  6 ++--
 .../internal/cache/wan/GatewaySenderEventImpl.java | 34 ++++++++++++++++++----
 .../serial/SerialGatewaySenderEventProcessor.java  | 16 ++++++++--
 .../cache30/DistributedAckRegionCCEDUnitTest.java  |  4 ++-
 .../codeAnalysis/sanctionedDataSerializables.txt   |  8 +++--
 6 files changed, 61 insertions(+), 15 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java
index 585e131..1eb2761 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java
@@ -142,6 +142,12 @@ public abstract class AbstractUpdateOperation extends 
DistributedCacheOperation
               }
               doUpdate = false;
             }
+            if (ev.isConcurrencyConflict()) {
+              if (logger.isDebugEnabled()) {
+                logger.debug("basicUpdate failed with CME, not to retry:" + 
ev);
+              }
+              doUpdate = false;
+            }
           }
         } finally {
           if (isBucket) {
@@ -175,7 +181,7 @@ public abstract class AbstractUpdateOperation extends 
DistributedCacheOperation
                   || (rgn.getDataPolicy().withReplication() && 
rgn.getConcurrencyChecksEnabled())) {
                 overwriteDestroyed = true;
                 ev.makeCreate();
-                rgn.basicUpdate(ev, true /* ifNew */, false/* ifOld */, 
lastMod,
+                rgn.basicUpdate(ev, false /* ifNew */, false/* ifOld */, 
lastMod,
                     overwriteDestroyed);
                 rgn.getCachePerfStats().endPut(startPut, ev.isOriginRemote());
                 updated = true;
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index e6202ab..c33a13a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -5631,6 +5631,8 @@ public class LocalRegion extends AbstractRegion 
implements LoaderHelperFactory,
         logger.debug("caught concurrent modification attempt when applying 
{}", event);
       }
       notifyBridgeClients(event);
+      notifyGatewaySender(event.getOperation().isUpdate() ? 
EnumListenerEvent.AFTER_UPDATE
+          : EnumListenerEvent.AFTER_CREATE, event);
       return false;
     }
 
@@ -6114,8 +6116,7 @@ public class LocalRegion extends AbstractRegion 
implements LoaderHelperFactory,
   }
 
   protected void notifyGatewaySender(EnumListenerEvent operation, 
EntryEventImpl event) {
-    if (isPdxTypesRegion() || event.isConcurrencyConflict()) {
-      // isConcurrencyConflict is usually a concurrent cache modification 
problem
+    if (isPdxTypesRegion()) {
       return;
     }
 
@@ -6505,6 +6506,7 @@ public class LocalRegion extends AbstractRegion 
implements LoaderHelperFactory,
       // Notify clients only if its NOT a gateway event.
       if (event.getVersionTag() != null && 
!event.getVersionTag().isGatewayTag()) {
         notifyBridgeClients(event);
+        notifyGatewaySender(EnumListenerEvent.AFTER_DESTROY, event);
       }
       return true; // event was elided
 
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
index 2748c7d..d314664 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
@@ -33,6 +33,7 @@ import org.apache.geode.internal.DataSerializableFixedID;
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.VersionedDataInputStream;
+import org.apache.geode.internal.VersionedDataSerializable;
 import org.apache.geode.internal.cache.CachedDeserializable;
 import org.apache.geode.internal.cache.CachedDeserializableFactory;
 import org.apache.geode.internal.cache.Conflatable;
@@ -61,8 +62,8 @@ import org.apache.geode.internal.size.Sizeable;
  * @since GemFire 7.0
  *
  */
-public class GatewaySenderEventImpl
-    implements AsyncEvent, DataSerializableFixedID, Conflatable, Sizeable, 
Releasable {
+public class GatewaySenderEventImpl implements AsyncEvent, 
DataSerializableFixedID, Conflatable,
+    Sizeable, Releasable, VersionedDataSerializable {
   private static final long serialVersionUID = -5690172020872255422L;
 
   protected static final Object TOKEN_NULL = new Object();
@@ -171,6 +172,8 @@ public class GatewaySenderEventImpl
 
   protected boolean isInitialized;
 
+  private transient boolean isConcurrencyConflict = false;
+
   /**
    * Is this thread in the process of serializing this event?
    */
@@ -312,6 +315,7 @@ public class GatewaySenderEventImpl
     if (initialize) {
       initialize();
     }
+    this.isConcurrencyConflict = event.isConcurrencyConflict();
   }
 
   /**
@@ -673,7 +677,13 @@ public class GatewaySenderEventImpl
     return GATEWAY_SENDER_EVENT_IMPL;
   }
 
+  @Override
   public void toData(DataOutput out) throws IOException {
+    toDataPre_GEODE_1_5_0_0(out);
+    DataSerializer.writeBoolean(this.isConcurrencyConflict, out);
+  }
+
+  public void toDataPre_GEODE_1_5_0_0(DataOutput out) throws IOException {
     // Make sure we are initialized before we serialize.
     initialize();
     out.writeShort(VERSION);
@@ -697,7 +707,13 @@ public class GatewaySenderEventImpl
     DataSerializer.writeObject(this.key, out);
   }
 
+  @Override
   public void fromData(DataInput in) throws IOException, 
ClassNotFoundException {
+    fromDataPre_GEODE_1_5_0_0(in);
+    this.isConcurrencyConflict = DataSerializer.readBoolean(in);
+  }
+
+  public void fromDataPre_GEODE_1_5_0_0(DataInput in) throws IOException, 
ClassNotFoundException {
     short version = in.readShort();
     if (version != VERSION) {
       // warning?`
@@ -744,7 +760,8 @@ public class GatewaySenderEventImpl
         
.append(";creationTime=").append(this.creationTime).append(";shadowKey= ")
         
.append(this.shadowKey).append(";timeStamp=").append(this.versionTimeStamp)
         
.append(";acked=").append(this.isAcked).append(";dispatched=").append(this.isDispatched)
-        .append(";bucketId=").append(this.bucketId).append("]");
+        
.append(";bucketId=").append(this.bucketId).append(";isConcurrencyConflict=")
+        .append(this.isConcurrencyConflict).append("]");
     return buffer.toString();
   }
 
@@ -1128,6 +1145,14 @@ public class GatewaySenderEventImpl
     return bucketId;
   }
 
+  public boolean isConcurrencyConflict() {
+    return isConcurrencyConflict;
+  }
+
+  public boolean setConcurrencyConflict(boolean isConcurrencyConflict) {
+    return this.isConcurrencyConflict = isConcurrencyConflict;
+  }
+
   /**
    * @param tailKey the tailKey to set
    */
@@ -1144,8 +1169,7 @@ public class GatewaySenderEventImpl
 
   @Override
   public Version[] getSerializationVersions() {
-    // TODO Auto-generated method stub
-    return null;
+    return new Version[] {Version.GEODE_150};
   }
 
   public int getSerializedValueSize() {
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
index f9eb9c0..3fa4d6a 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
@@ -34,6 +34,7 @@ import org.apache.geode.SystemFailure;
 import org.apache.geode.cache.CacheException;
 import org.apache.geode.cache.CacheListener;
 import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.Operation;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.cache.wan.GatewayQueueEvent;
@@ -41,6 +42,7 @@ import org.apache.geode.cache.wan.GatewaySender;
 import org.apache.geode.distributed.DistributedSystem;
 import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.cache.DistributedRegion;
+import org.apache.geode.internal.cache.EntryEventImpl;
 import org.apache.geode.internal.cache.EnumListenerEvent;
 import org.apache.geode.internal.cache.EventID;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
@@ -377,6 +379,9 @@ public class SerialGatewaySenderEventProcessor extends 
AbstractGatewaySenderEven
       if (m != null) {
         for (EventWrapper ew : m.values()) {
           GatewaySenderEventImpl gatewayEvent = ew.event;
+          if (logger.isDebugEnabled()) {
+            logger.debug("releaseUnprocessedEvents:" + gatewayEvent);
+          }
           gatewayEvent.release();
         }
         this.unprocessedEvents = null;
@@ -421,9 +426,14 @@ public class SerialGatewaySenderEventProcessor extends 
AbstractGatewaySenderEven
         } else {
           // If it is not, create an uninitialized GatewayEventImpl and
           // put it into the map of unprocessed events.
-          senderEvent = new GatewaySenderEventImpl(operation, event, 
substituteValue, false); // OFFHEAP
-                                                                               
               // ok
-          handleSecondaryEvent(senderEvent);
+          // 2 Special cases:
+          // 1) UPDATE_VERSION_STAMP: only enqueue to primary
+          // 2) CME && !originRemote: only enqueue to primary
+          if (!(event.getOperation().equals(Operation.UPDATE_VERSION_STAMP)
+              || ((EntryEventImpl) event).isConcurrencyConflict() && 
!event.isOriginRemote())) {
+            senderEvent = new GatewaySenderEventImpl(operation, event, 
substituteValue, false); // OFFHEAP
+            handleSecondaryEvent(senderEvent);
+          }
         }
       }
     }
diff --git 
a/geode-core/src/test/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java
index 90e880c..ac96819 100644
--- 
a/geode-core/src/test/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java
@@ -476,7 +476,9 @@ public class DistributedAckRegionCCEDUnitTest extends 
DistributedAckRegionDUnitT
       CCRegion.basicBridgePut("cckey0", "newvalue", null, true, null, id, 
true, holder);
       vm0.invoke(new SerializableRunnable("check conflation count") {
         public void run() {
-          assertEquals("expected one conflated event", 1,
+          // after changed the 3rd try of AUO.doPutOrCreate to be ifOld=false 
ifNew=false
+          // ARM.updateEntry will be called one more time, so there will be 2 
conflacted events
+          assertEquals("expected two conflated event", 2,
               CCRegion.getCachePerfStats().getConflatedEventsCount());
         }
       });
diff --git 
a/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
 
b/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
index 3623071..b50e95c 100644
--- 
a/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
+++ 
b/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
@@ -1988,9 +1988,11 @@ 
org/apache/geode/internal/cache/wan/GatewaySenderEventCallbackArgument,2
 
fromData,63,2a2bb700182a2bb80019b60017b500032abb00075905b7000ab500062bb9001a01003d033e1d1ca200172ab400062bb9001a0100b6001b57840301a7ffeab1
 
toData,87,2a2bb700112ab40003b800122bb800132ab40006c6003b2b2ab40006b60014b9001502002ab40006b600164d2cb9000c010099001a2cb9000d0100c0000e4e2b2db60017b900150200a7ffe3a7000a2b03b900150200b1
 
-org/apache/geode/internal/cache/wan/GatewaySenderEventImpl,2
-fromData,183,2bb9007201003d1c10119f00032a04b5002b2a2bb900730100b500282a2bb900730100b500291c1011a200232bc1007499001c2bb80075b20076a60012bb0077592bc00074b20078b700794c2a2bb8007ac0007bb5002a2a2bb8007cb500102a2bb9007d0100b5002e2a2bb6007e2a2bb8007fb500302a2bb8007ac00020b500212a2bb900800100b500132a2bb900810100b500172a2bb900730100b500092a2bb900810100b80004b500052a2bb900810100b5001bb1
-toData,133,2ab600272b1011b9006702002b2ab40028b9006802002b2ab40029b9006802002ab4002a2bb800692ab400102bb8006a2b2ab4002eb9006b02002a2bb6006c2ab6002f2bb8006d2ab400212bb800692b2ab40013b9006e02002b2ab40017b9006f03002b2ab40009b9006802002b2ab40005b60070b9006f03002b2ab60071b9006f0300b1
+org/apache/geode/internal/cache/wan/GatewaySenderEventImpl,4
+fromData,17,2a2bb600772a2bb80078b60079b50006b1
+fromDataPre_GEODE_1_5_0_0,183,2bb9007a01003d1c10119f00032a04b5002d2a2bb9007b0100b5002a2a2bb9007b0100b5002b1c1011a200232bc1007c99001c2bb8007db2007ea60012bb007f592bc0007cb20080b700814c2a2bb80082c00083b5002c2a2bb80084b500112a2bb900850100b500302a2bb600862a2bb80087b500322a2bb80082c00021b500222a2bb900880100b500142a2bb900890100b500182a2bb9007b0100b5000a2a2bb900890100b80004b500052a2bb900890100b5001cb1
+toData,17,2a2bb600692ab40006b8006a2bb8006bb1
+toDataPre_GEODE_1_5_0_0,133,2ab600282b1011b9006c02002b2ab4002ab9006d02002b2ab4002bb9006d02002ab4002c2bb8006e2ab400112bb8006f2b2ab40030b9007002002a2bb600712ab600312bb800722ab400222bb8006e2b2ab40014b9007302002b2ab40018b9007403002b2ab4000ab9006d02002b2ab40005b60075b9007403002b2ab60076b900740300b1
 
 
org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationOperation$GatewaySenderQueueEntrySynchronizationEntry,2
 fromData,20,2a2bb80006b500022a2bb80006c00007b50001b1

-- 
To stop receiving notification emails like this one, please contact
zho...@apache.org.

Reply via email to