This is an automated email from the ASF dual-hosted git repository. zhouxj pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new de7c6d8 GEODE-3967: The following 9 problems are fixed here: 1) When a ConcurrentCacheModificationException (CME) happens, GatewaySenderEventImpl should save that status and notify the gateway sender if it holds the primary queue, because another member might have put the event into its secondary queue. 2) Of the 3 tries of basicUpdate in AbstractUpdateOperation's doPutOrCreate, the 3rd try should allow both create and update. 3) Do not dispatch an event with a CME. The old logic did not allow a CME eve [...] de7c6d8 is described below commit de7c6d8b4a9b3e2c1c0ebd4ce1835aff0007f9e1 Author: zhouxh <gz...@pivotal.io> AuthorDate: Sat Jan 20 17:39:00 2018 -0800 GEODE-3967: The following 9 problems are fixed here: 1) When a ConcurrentCacheModificationException (CME) happens, GatewaySenderEventImpl should save that status and notify the gateway sender if it holds the primary queue, because another member might have put the event into its secondary queue. 2) Of the 3 tries of basicUpdate in AbstractUpdateOperation's doPutOrCreate, the 3rd try should allow both create and update. 3) Do not dispatch an event with a CME. The old logic did not allow a CME event to be enqueued. This is wrong, because an event without a CME might have been added to the secondary queue. So we should enqueue it, but not dispatch it. 4) Do not enqueue UPDATE_VERSION_STAMP unless this is the primary queue, because that event is not fired in pairs. 5) AbstractGatewaySenderEventProcessor puts the filter loop in the wrong place, which causes UPDATE_VERSION_STAMP and CME events not to be ignored. However, this is not fixed for now; it is left to GEODE-4659. 6) shouldSendVersionEvents for a remote sender should return true, since we no longer support 7.0.1. 7) Change the version to 150. 8) A CME event should not be retried in AUO.doPutOrCreate (AbstractUpdateOperation), because the retry will end up with a CME too.
9) CME && !originRemote: only enqueue to primary This closes #1317 --- .../internal/cache/AbstractUpdateOperation.java | 8 ++++- .../apache/geode/internal/cache/LocalRegion.java | 6 ++-- .../internal/cache/wan/GatewaySenderEventImpl.java | 34 ++++++++++++++++++---- .../serial/SerialGatewaySenderEventProcessor.java | 16 ++++++++-- .../cache30/DistributedAckRegionCCEDUnitTest.java | 4 ++- .../codeAnalysis/sanctionedDataSerializables.txt | 8 +++-- 6 files changed, 61 insertions(+), 15 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java index 585e131..1eb2761 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java @@ -142,6 +142,12 @@ public abstract class AbstractUpdateOperation extends DistributedCacheOperation } doUpdate = false; } + if (ev.isConcurrencyConflict()) { + if (logger.isDebugEnabled()) { + logger.debug("basicUpdate failed with CME, not to retry:" + ev); + } + doUpdate = false; + } } } finally { if (isBucket) { @@ -175,7 +181,7 @@ public abstract class AbstractUpdateOperation extends DistributedCacheOperation || (rgn.getDataPolicy().withReplication() && rgn.getConcurrencyChecksEnabled())) { overwriteDestroyed = true; ev.makeCreate(); - rgn.basicUpdate(ev, true /* ifNew */, false/* ifOld */, lastMod, + rgn.basicUpdate(ev, false /* ifNew */, false/* ifOld */, lastMod, overwriteDestroyed); rgn.getCachePerfStats().endPut(startPut, ev.isOriginRemote()); updated = true; diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java index e6202ab..c33a13a 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java +++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java @@ -5631,6 +5631,8 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, logger.debug("caught concurrent modification attempt when applying {}", event); } notifyBridgeClients(event); + notifyGatewaySender(event.getOperation().isUpdate() ? EnumListenerEvent.AFTER_UPDATE + : EnumListenerEvent.AFTER_CREATE, event); return false; } @@ -6114,8 +6116,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, } protected void notifyGatewaySender(EnumListenerEvent operation, EntryEventImpl event) { - if (isPdxTypesRegion() || event.isConcurrencyConflict()) { - // isConcurrencyConflict is usually a concurrent cache modification problem + if (isPdxTypesRegion()) { return; } @@ -6505,6 +6506,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, // Notify clients only if its NOT a gateway event. if (event.getVersionTag() != null && !event.getVersionTag().isGatewayTag()) { notifyBridgeClients(event); + notifyGatewaySender(EnumListenerEvent.AFTER_DESTROY, event); } return true; // event was elided diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java index 2748c7d..d314664 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java @@ -33,6 +33,7 @@ import org.apache.geode.internal.DataSerializableFixedID; import org.apache.geode.internal.InternalDataSerializer; import org.apache.geode.internal.Version; import org.apache.geode.internal.VersionedDataInputStream; +import org.apache.geode.internal.VersionedDataSerializable; import org.apache.geode.internal.cache.CachedDeserializable; import org.apache.geode.internal.cache.CachedDeserializableFactory; 
import org.apache.geode.internal.cache.Conflatable; @@ -61,8 +62,8 @@ import org.apache.geode.internal.size.Sizeable; * @since GemFire 7.0 * */ -public class GatewaySenderEventImpl - implements AsyncEvent, DataSerializableFixedID, Conflatable, Sizeable, Releasable { +public class GatewaySenderEventImpl implements AsyncEvent, DataSerializableFixedID, Conflatable, + Sizeable, Releasable, VersionedDataSerializable { private static final long serialVersionUID = -5690172020872255422L; protected static final Object TOKEN_NULL = new Object(); @@ -171,6 +172,8 @@ public class GatewaySenderEventImpl protected boolean isInitialized; + private transient boolean isConcurrencyConflict = false; + /** * Is this thread in the process of serializing this event? */ @@ -312,6 +315,7 @@ public class GatewaySenderEventImpl if (initialize) { initialize(); } + this.isConcurrencyConflict = event.isConcurrencyConflict(); } /** @@ -673,7 +677,13 @@ public class GatewaySenderEventImpl return GATEWAY_SENDER_EVENT_IMPL; } + @Override public void toData(DataOutput out) throws IOException { + toDataPre_GEODE_1_5_0_0(out); + DataSerializer.writeBoolean(this.isConcurrencyConflict, out); + } + + public void toDataPre_GEODE_1_5_0_0(DataOutput out) throws IOException { // Make sure we are initialized before we serialize. 
initialize(); out.writeShort(VERSION); @@ -697,7 +707,13 @@ public class GatewaySenderEventImpl DataSerializer.writeObject(this.key, out); } + @Override public void fromData(DataInput in) throws IOException, ClassNotFoundException { + fromDataPre_GEODE_1_5_0_0(in); + this.isConcurrencyConflict = DataSerializer.readBoolean(in); + } + + public void fromDataPre_GEODE_1_5_0_0(DataInput in) throws IOException, ClassNotFoundException { short version = in.readShort(); if (version != VERSION) { // warning?` @@ -744,7 +760,8 @@ public class GatewaySenderEventImpl .append(";creationTime=").append(this.creationTime).append(";shadowKey= ") .append(this.shadowKey).append(";timeStamp=").append(this.versionTimeStamp) .append(";acked=").append(this.isAcked).append(";dispatched=").append(this.isDispatched) - .append(";bucketId=").append(this.bucketId).append("]"); + .append(";bucketId=").append(this.bucketId).append(";isConcurrencyConflict=") + .append(this.isConcurrencyConflict).append("]"); return buffer.toString(); } @@ -1128,6 +1145,14 @@ public class GatewaySenderEventImpl return bucketId; } + public boolean isConcurrencyConflict() { + return isConcurrencyConflict; + } + + public boolean setConcurrencyConflict(boolean isConcurrencyConflict) { + return this.isConcurrencyConflict = isConcurrencyConflict; + } + /** * @param tailKey the tailKey to set */ @@ -1144,8 +1169,7 @@ public class GatewaySenderEventImpl @Override public Version[] getSerializationVersions() { - // TODO Auto-generated method stub - return null; + return new Version[] {Version.GEODE_150}; } public int getSerializedValueSize() { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java index f9eb9c0..3fa4d6a 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java +++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java @@ -34,6 +34,7 @@ import org.apache.geode.SystemFailure; import org.apache.geode.cache.CacheException; import org.apache.geode.cache.CacheListener; import org.apache.geode.cache.EntryEvent; +import org.apache.geode.cache.Operation; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionDestroyedException; import org.apache.geode.cache.wan.GatewayQueueEvent; @@ -41,6 +42,7 @@ import org.apache.geode.cache.wan.GatewaySender; import org.apache.geode.distributed.DistributedSystem; import org.apache.geode.internal.Assert; import org.apache.geode.internal.cache.DistributedRegion; +import org.apache.geode.internal.cache.EntryEventImpl; import org.apache.geode.internal.cache.EnumListenerEvent; import org.apache.geode.internal.cache.EventID; import org.apache.geode.internal.cache.wan.AbstractGatewaySender; @@ -377,6 +379,9 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven if (m != null) { for (EventWrapper ew : m.values()) { GatewaySenderEventImpl gatewayEvent = ew.event; + if (logger.isDebugEnabled()) { + logger.debug("releaseUnprocessedEvents:" + gatewayEvent); + } gatewayEvent.release(); } this.unprocessedEvents = null; @@ -421,9 +426,14 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven } else { // If it is not, create an uninitialized GatewayEventImpl and // put it into the map of unprocessed events. 
- senderEvent = new GatewaySenderEventImpl(operation, event, substituteValue, false); // OFFHEAP - // ok - handleSecondaryEvent(senderEvent); + // 2 Special cases: + // 1) UPDATE_VERSION_STAMP: only enqueue to primary + // 2) CME && !originRemote: only enqueue to primary + if (!(event.getOperation().equals(Operation.UPDATE_VERSION_STAMP) + || ((EntryEventImpl) event).isConcurrencyConflict() && !event.isOriginRemote())) { + senderEvent = new GatewaySenderEventImpl(operation, event, substituteValue, false); // OFFHEAP + handleSecondaryEvent(senderEvent); + } } } } diff --git a/geode-core/src/test/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java index 90e880c..ac96819 100644 --- a/geode-core/src/test/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java @@ -476,7 +476,9 @@ public class DistributedAckRegionCCEDUnitTest extends DistributedAckRegionDUnitT CCRegion.basicBridgePut("cckey0", "newvalue", null, true, null, id, true, holder); vm0.invoke(new SerializableRunnable("check conflation count") { public void run() { - assertEquals("expected one conflated event", 1, + // after changed the 3rd try of AUO.doPutOrCreate to be ifOld=false ifNew=false + // ARM.updateEntry will be called one more time, so there will be 2 conflacted events + assertEquals("expected two conflated event", 2, CCRegion.getCachePerfStats().getConflatedEventsCount()); } }); diff --git a/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt b/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt index 3623071..b50e95c 100644 --- a/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt +++ 
b/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt @@ -1988,9 +1988,11 @@ org/apache/geode/internal/cache/wan/GatewaySenderEventCallbackArgument,2 fromData,63,2a2bb700182a2bb80019b60017b500032abb00075905b7000ab500062bb9001a01003d033e1d1ca200172ab400062bb9001a0100b6001b57840301a7ffeab1 toData,87,2a2bb700112ab40003b800122bb800132ab40006c6003b2b2ab40006b60014b9001502002ab40006b600164d2cb9000c010099001a2cb9000d0100c0000e4e2b2db60017b900150200a7ffe3a7000a2b03b900150200b1 -org/apache/geode/internal/cache/wan/GatewaySenderEventImpl,2 -fromData,183,2bb9007201003d1c10119f00032a04b5002b2a2bb900730100b500282a2bb900730100b500291c1011a200232bc1007499001c2bb80075b20076a60012bb0077592bc00074b20078b700794c2a2bb8007ac0007bb5002a2a2bb8007cb500102a2bb9007d0100b5002e2a2bb6007e2a2bb8007fb500302a2bb8007ac00020b500212a2bb900800100b500132a2bb900810100b500172a2bb900730100b500092a2bb900810100b80004b500052a2bb900810100b5001bb1 -toData,133,2ab600272b1011b9006702002b2ab40028b9006802002b2ab40029b9006802002ab4002a2bb800692ab400102bb8006a2b2ab4002eb9006b02002a2bb6006c2ab6002f2bb8006d2ab400212bb800692b2ab40013b9006e02002b2ab40017b9006f03002b2ab40009b9006802002b2ab40005b60070b9006f03002b2ab60071b9006f0300b1 +org/apache/geode/internal/cache/wan/GatewaySenderEventImpl,4 +fromData,17,2a2bb600772a2bb80078b60079b50006b1 +fromDataPre_GEODE_1_5_0_0,183,2bb9007a01003d1c10119f00032a04b5002d2a2bb9007b0100b5002a2a2bb9007b0100b5002b1c1011a200232bc1007c99001c2bb8007db2007ea60012bb007f592bc0007cb20080b700814c2a2bb80082c00083b5002c2a2bb80084b500112a2bb900850100b500302a2bb600862a2bb80087b500322a2bb80082c00021b500222a2bb900880100b500142a2bb900890100b500182a2bb9007b0100b5000a2a2bb900890100b80004b500052a2bb900890100b5001cb1 +toData,17,2a2bb600692ab40006b8006a2bb8006bb1 
+toDataPre_GEODE_1_5_0_0,133,2ab600282b1011b9006c02002b2ab4002ab9006d02002b2ab4002bb9006d02002ab4002c2bb8006e2ab400112bb8006f2b2ab40030b9007002002a2bb600712ab600312bb800722ab400222bb8006e2b2ab40014b9007302002b2ab40018b9007403002b2ab4000ab9006d02002b2ab40005b60075b9007403002b2ab60076b900740300b1 org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationOperation$GatewaySenderQueueEntrySynchronizationEntry,2 fromData,20,2a2bb80006b500022a2bb80006c00007b50001b1 -- To stop receiving notification emails like this one, please contact zho...@apache.org.