This is an automated email from the ASF dual-hosted git repository. zhouxj pushed a commit to branch feature/GEODE-3967 in repository https://gitbox.apache.org/repos/asf/geode.git
commit 5d1c2657612bdb931724893f4e29dbd6056dfbae Author: zhouxh <gz...@pivotal.io> AuthorDate: Sat Jan 20 17:39:00 2018 -0800 GEODE-3967: This commit makes the following 9 changes: 1) When a ConcurrentCacheModificationException (CME) happens, GatewaySenderEventImpl should save that status and the gateway sender should be notified if it holds the primary queue, because another member might have put the event into the secondary queue. 2) Of the 3 tries of basicUpdate in AbstractUpdateOperation's doPutOrCreate, the 3rd try should allow both create and update. 3) Do not dispatch events with a CME. The old logic did not allow a CME event to be enqueued. This is wrong, because the same event without a CME might have been added to the secondary queue. So we should enqueue it, but not dispatch it. 4) Do not enqueue UPDATE_VERSION_STAMP events unless this member holds the primary queue, because the event is not fired in pairs. 5) AbstractGatewaySenderEventProcessor had the filter loop in the wrong place, which caused UPDATE_VERSION_STAMP and CME events not to be ignored. 6) shouldSendVersionEvents for a remote sender should return true, since we no longer support 7.0.1. 7) Change the version to 150. 8) A CME event should not be retried in AbstractUpdateOperation (AUO).doPutOrCreate, because the retry will end up with a CME too. 
9) CME && !originRemote: only enqueue to primary --- .../internal/cache/AbstractUpdateOperation.java | 8 +++- .../apache/geode/internal/cache/LocalRegion.java | 6 ++- .../wan/AbstractGatewaySenderEventProcessor.java | 56 +++++++++++++--------- .../internal/cache/wan/GatewaySenderEventImpl.java | 34 +++++++++++-- .../serial/SerialGatewaySenderEventProcessor.java | 16 +++++-- .../cache30/DistributedAckRegionCCEDUnitTest.java | 4 +- .../codeAnalysis/sanctionedDataSerializables.txt | 8 ++-- .../RemoteParallelGatewaySenderEventProcessor.java | 36 +------------- .../RemoteSerialGatewaySenderEventProcessor.java | 11 +++++ 9 files changed, 108 insertions(+), 71 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java index 585e131..1eb2761 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java @@ -142,6 +142,12 @@ public abstract class AbstractUpdateOperation extends DistributedCacheOperation } doUpdate = false; } + if (ev.isConcurrencyConflict()) { + if (logger.isDebugEnabled()) { + logger.debug("basicUpdate failed with CME, not to retry:" + ev); + } + doUpdate = false; + } } } finally { if (isBucket) { @@ -175,7 +181,7 @@ public abstract class AbstractUpdateOperation extends DistributedCacheOperation || (rgn.getDataPolicy().withReplication() && rgn.getConcurrencyChecksEnabled())) { overwriteDestroyed = true; ev.makeCreate(); - rgn.basicUpdate(ev, true /* ifNew */, false/* ifOld */, lastMod, + rgn.basicUpdate(ev, false /* ifNew */, false/* ifOld */, lastMod, overwriteDestroyed); rgn.getCachePerfStats().endPut(startPut, ev.isOriginRemote()); updated = true; diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java index 914ce5d..1434c2c 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java @@ -5627,6 +5627,8 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, logger.debug("caught concurrent modification attempt when applying {}", event); } notifyBridgeClients(event); + notifyGatewaySender(event.getOperation().isUpdate() ? EnumListenerEvent.AFTER_UPDATE + : EnumListenerEvent.AFTER_CREATE, event); return false; } @@ -6110,8 +6112,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, } protected void notifyGatewaySender(EnumListenerEvent operation, EntryEventImpl event) { - if (isPdxTypesRegion() || event.isConcurrencyConflict()) { - // isConcurrencyConflict is usually a concurrent cache modification problem + if (isPdxTypesRegion()) { return; } @@ -6501,6 +6502,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, // Notify clients only if its NOT a gateway event. 
if (event.getVersionTag() != null && !event.getVersionTag().isGatewayTag()) { notifyBridgeClients(event); + notifyGatewaySender(EnumListenerEvent.AFTER_DESTROY, event); } return true; // event was elided diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java index 7e67e9b..4393943 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java @@ -509,27 +509,38 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread { } // Filter the events - for (GatewayEventFilter filter : sender.getGatewayEventFilters()) { - Iterator<GatewaySenderEventImpl> itr = filteredList.iterator(); - while (itr.hasNext()) { - GatewayQueueEvent event = itr.next(); - - // This seems right place to prevent transmission of UPDATE_VERSION events if - // receiver's - // version is < 7.0.1, especially to prevent another loop over events. - if (!sendUpdateVersionEvents - && event.getOperation() == Operation.UPDATE_VERSION_STAMP) { - if (isTraceEnabled) { - logger.trace( - "Update Event Version event: {} removed from Gateway Sender queue: {}", event, - sender); - } + Iterator<GatewaySenderEventImpl> itr = filteredList.iterator(); + while (itr.hasNext()) { + GatewayQueueEvent event = itr.next(); + + // This seems right place to prevent transmission of UPDATE_VERSION events if + // receiver's + // version is < 7.0.1, especially to prevent another loop over events. 
+ if (!sendUpdateVersionEvents + && event.getOperation() == Operation.UPDATE_VERSION_STAMP) { + if (isDebugEnabled) { + logger.debug("Update Event Version event: {} removed from Gateway Sender queue: {}", + event, sender); + } - itr.remove(); - statistics.incEventsNotQueued(); - continue; + itr.remove(); + statistics.incEventsNotQueued(); + continue; + } + + if (((GatewaySenderEventImpl) event).isConcurrencyConflict()) { + if (isDebugEnabled) { + logger.debug( + "Event with concurrent modification conflict: {} will be removed from Gateway Sender queue: {}", + event, sender); } + itr.remove(); + statistics.incEventsNotQueued(); + continue; + } + + for (GatewayEventFilter filter : sender.getGatewayEventFilters()) { boolean transmit = filter.beforeTransmit(event); if (!transmit) { if (isDebugEnabled) { @@ -538,6 +549,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread { } itr.remove(); statistics.incEventsFiltered(); + break; } } } @@ -550,9 +562,9 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread { // AsyncEventQueue since possibleDuplicate flag is not used in WAN. 
if (this.getSender().isParallel() && (this.getDispatcher() instanceof GatewaySenderEventCallbackDispatcher)) { - Iterator<GatewaySenderEventImpl> itr = filteredList.iterator(); - while (itr.hasNext()) { - GatewaySenderEventImpl event = (GatewaySenderEventImpl) itr.next(); + Iterator<GatewaySenderEventImpl> eventItr = filteredList.iterator(); + while (eventItr.hasNext()) { + GatewaySenderEventImpl event = (GatewaySenderEventImpl) eventItr.next(); PartitionedRegion qpr = null; if (this.getQueue() instanceof ConcurrentParallelGatewaySenderQueue) { qpr = ((ConcurrentParallelGatewaySenderQueue) this.getQueue()) @@ -726,7 +738,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread { } // for } - private boolean shouldSendVersionEvents(GatewaySenderEventDispatcher dispatcher) { + protected boolean shouldSendVersionEvents(GatewaySenderEventDispatcher dispatcher) { // onyly in case of remote dispatcher we send versioned events return false; } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java index 2748c7d..d314664 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java @@ -33,6 +33,7 @@ import org.apache.geode.internal.DataSerializableFixedID; import org.apache.geode.internal.InternalDataSerializer; import org.apache.geode.internal.Version; import org.apache.geode.internal.VersionedDataInputStream; +import org.apache.geode.internal.VersionedDataSerializable; import org.apache.geode.internal.cache.CachedDeserializable; import org.apache.geode.internal.cache.CachedDeserializableFactory; import org.apache.geode.internal.cache.Conflatable; @@ -61,8 +62,8 @@ import org.apache.geode.internal.size.Sizeable; * @since GemFire 7.0 * */ -public class GatewaySenderEventImpl - 
implements AsyncEvent, DataSerializableFixedID, Conflatable, Sizeable, Releasable { +public class GatewaySenderEventImpl implements AsyncEvent, DataSerializableFixedID, Conflatable, + Sizeable, Releasable, VersionedDataSerializable { private static final long serialVersionUID = -5690172020872255422L; protected static final Object TOKEN_NULL = new Object(); @@ -171,6 +172,8 @@ public class GatewaySenderEventImpl protected boolean isInitialized; + private transient boolean isConcurrencyConflict = false; + /** * Is this thread in the process of serializing this event? */ @@ -312,6 +315,7 @@ public class GatewaySenderEventImpl if (initialize) { initialize(); } + this.isConcurrencyConflict = event.isConcurrencyConflict(); } /** @@ -673,7 +677,13 @@ public class GatewaySenderEventImpl return GATEWAY_SENDER_EVENT_IMPL; } + @Override public void toData(DataOutput out) throws IOException { + toDataPre_GEODE_1_5_0_0(out); + DataSerializer.writeBoolean(this.isConcurrencyConflict, out); + } + + public void toDataPre_GEODE_1_5_0_0(DataOutput out) throws IOException { // Make sure we are initialized before we serialize. 
initialize(); out.writeShort(VERSION); @@ -697,7 +707,13 @@ public class GatewaySenderEventImpl DataSerializer.writeObject(this.key, out); } + @Override public void fromData(DataInput in) throws IOException, ClassNotFoundException { + fromDataPre_GEODE_1_5_0_0(in); + this.isConcurrencyConflict = DataSerializer.readBoolean(in); + } + + public void fromDataPre_GEODE_1_5_0_0(DataInput in) throws IOException, ClassNotFoundException { short version = in.readShort(); if (version != VERSION) { // warning?` @@ -744,7 +760,8 @@ public class GatewaySenderEventImpl .append(";creationTime=").append(this.creationTime).append(";shadowKey= ") .append(this.shadowKey).append(";timeStamp=").append(this.versionTimeStamp) .append(";acked=").append(this.isAcked).append(";dispatched=").append(this.isDispatched) - .append(";bucketId=").append(this.bucketId).append("]"); + .append(";bucketId=").append(this.bucketId).append(";isConcurrencyConflict=") + .append(this.isConcurrencyConflict).append("]"); return buffer.toString(); } @@ -1128,6 +1145,14 @@ public class GatewaySenderEventImpl return bucketId; } + public boolean isConcurrencyConflict() { + return isConcurrencyConflict; + } + + public boolean setConcurrencyConflict(boolean isConcurrencyConflict) { + return this.isConcurrencyConflict = isConcurrencyConflict; + } + /** * @param tailKey the tailKey to set */ @@ -1144,8 +1169,7 @@ public class GatewaySenderEventImpl @Override public Version[] getSerializationVersions() { - // TODO Auto-generated method stub - return null; + return new Version[] {Version.GEODE_150}; } public int getSerializedValueSize() { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java index f9eb9c0..3fa4d6a 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java +++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java @@ -34,6 +34,7 @@ import org.apache.geode.SystemFailure; import org.apache.geode.cache.CacheException; import org.apache.geode.cache.CacheListener; import org.apache.geode.cache.EntryEvent; +import org.apache.geode.cache.Operation; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionDestroyedException; import org.apache.geode.cache.wan.GatewayQueueEvent; @@ -41,6 +42,7 @@ import org.apache.geode.cache.wan.GatewaySender; import org.apache.geode.distributed.DistributedSystem; import org.apache.geode.internal.Assert; import org.apache.geode.internal.cache.DistributedRegion; +import org.apache.geode.internal.cache.EntryEventImpl; import org.apache.geode.internal.cache.EnumListenerEvent; import org.apache.geode.internal.cache.EventID; import org.apache.geode.internal.cache.wan.AbstractGatewaySender; @@ -377,6 +379,9 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven if (m != null) { for (EventWrapper ew : m.values()) { GatewaySenderEventImpl gatewayEvent = ew.event; + if (logger.isDebugEnabled()) { + logger.debug("releaseUnprocessedEvents:" + gatewayEvent); + } gatewayEvent.release(); } this.unprocessedEvents = null; @@ -421,9 +426,14 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven } else { // If it is not, create an uninitialized GatewayEventImpl and // put it into the map of unprocessed events. 
- senderEvent = new GatewaySenderEventImpl(operation, event, substituteValue, false); // OFFHEAP - // ok - handleSecondaryEvent(senderEvent); + // 2 Special cases: + // 1) UPDATE_VERSION_STAMP: only enqueue to primary + // 2) CME && !originRemote: only enqueue to primary + if (!(event.getOperation().equals(Operation.UPDATE_VERSION_STAMP) + || ((EntryEventImpl) event).isConcurrencyConflict() && !event.isOriginRemote())) { + senderEvent = new GatewaySenderEventImpl(operation, event, substituteValue, false); // OFFHEAP + handleSecondaryEvent(senderEvent); + } } } } diff --git a/geode-core/src/test/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java index 90e880c..ac96819 100644 --- a/geode-core/src/test/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java @@ -476,7 +476,9 @@ public class DistributedAckRegionCCEDUnitTest extends DistributedAckRegionDUnitT CCRegion.basicBridgePut("cckey0", "newvalue", null, true, null, id, true, holder); vm0.invoke(new SerializableRunnable("check conflation count") { public void run() { - assertEquals("expected one conflated event", 1, + // after changed the 3rd try of AUO.doPutOrCreate to be ifOld=false ifNew=false + // ARM.updateEntry will be called one more time, so there will be 2 conflacted events + assertEquals("expected two conflated event", 2, CCRegion.getCachePerfStats().getConflatedEventsCount()); } }); diff --git a/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt b/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt index 3623071..b50e95c 100644 --- a/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt +++ 
b/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt @@ -1988,9 +1988,11 @@ org/apache/geode/internal/cache/wan/GatewaySenderEventCallbackArgument,2 fromData,63,2a2bb700182a2bb80019b60017b500032abb00075905b7000ab500062bb9001a01003d033e1d1ca200172ab400062bb9001a0100b6001b57840301a7ffeab1 toData,87,2a2bb700112ab40003b800122bb800132ab40006c6003b2b2ab40006b60014b9001502002ab40006b600164d2cb9000c010099001a2cb9000d0100c0000e4e2b2db60017b900150200a7ffe3a7000a2b03b900150200b1 -org/apache/geode/internal/cache/wan/GatewaySenderEventImpl,2 -fromData,183,2bb9007201003d1c10119f00032a04b5002b2a2bb900730100b500282a2bb900730100b500291c1011a200232bc1007499001c2bb80075b20076a60012bb0077592bc00074b20078b700794c2a2bb8007ac0007bb5002a2a2bb8007cb500102a2bb9007d0100b5002e2a2bb6007e2a2bb8007fb500302a2bb8007ac00020b500212a2bb900800100b500132a2bb900810100b500172a2bb900730100b500092a2bb900810100b80004b500052a2bb900810100b5001bb1 -toData,133,2ab600272b1011b9006702002b2ab40028b9006802002b2ab40029b9006802002ab4002a2bb800692ab400102bb8006a2b2ab4002eb9006b02002a2bb6006c2ab6002f2bb8006d2ab400212bb800692b2ab40013b9006e02002b2ab40017b9006f03002b2ab40009b9006802002b2ab40005b60070b9006f03002b2ab60071b9006f0300b1 +org/apache/geode/internal/cache/wan/GatewaySenderEventImpl,4 +fromData,17,2a2bb600772a2bb80078b60079b50006b1 +fromDataPre_GEODE_1_5_0_0,183,2bb9007a01003d1c10119f00032a04b5002d2a2bb9007b0100b5002a2a2bb9007b0100b5002b1c1011a200232bc1007c99001c2bb8007db2007ea60012bb007f592bc0007cb20080b700814c2a2bb80082c00083b5002c2a2bb80084b500112a2bb900850100b500302a2bb600862a2bb80087b500322a2bb80082c00021b500222a2bb900880100b500142a2bb900890100b500182a2bb9007b0100b5000a2a2bb900890100b80004b500052a2bb900890100b5001cb1 +toData,17,2a2bb600692ab40006b8006a2bb8006bb1 
+toDataPre_GEODE_1_5_0_0,133,2ab600282b1011b9006c02002b2ab4002ab9006d02002b2ab4002bb9006d02002ab4002c2bb8006e2ab400112bb8006f2b2ab40030b9007002002a2bb600712ab600312bb800722ab400222bb8006e2b2ab40014b9007302002b2ab40018b9007403002b2ab4000ab9006d02002b2ab40005b60075b9007403002b2ab60076b900740300b1 org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationOperation$GatewaySenderQueueEntrySynchronizationEntry,2 fromData,20,2a2bb80006b500022a2bb80006c00007b50001b1 diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java index 8739a8f..a3a89fb 100644 --- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java +++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java @@ -81,40 +81,8 @@ public class RemoteParallelGatewaySenderEventProcessor extends ParallelGatewaySe * @param disp * @return true if remote site Gemfire Version is >= 7.0.1 */ - private boolean shouldSendVersionEvents(GatewaySenderEventDispatcher disp) - throws GatewaySenderException { - try { - GatewaySenderEventRemoteDispatcher remoteDispatcher = - (GatewaySenderEventRemoteDispatcher) disp; - // This will create a new connection if no batch has been sent till - // now. 
- Connection conn = remoteDispatcher.getConnection(false); - if (conn != null) { - short remoteSiteVersion = conn.getWanSiteVersion(); - if (Version.GFE_701.compareTo(remoteSiteVersion) <= 0) { - return true; - } - } - } catch (GatewaySenderException e) { - Throwable cause = e.getCause(); - if (cause instanceof IOException || e instanceof GatewaySenderConfigurationException - || cause instanceof ConnectionDestroyedException) { - try { - int sleepInterval = GatewaySender.CONNECTION_RETRY_INTERVAL; - if (logger.isDebugEnabled()) { - logger.debug("Sleeping for {} milliseconds", sleepInterval); - } - Thread.sleep(sleepInterval); - } catch (InterruptedException ie) { - // log the exception - if (logger.isDebugEnabled()) { - logger.debug(ie.getMessage(), ie); - } - } - } - throw e; - } - return false; + protected boolean shouldSendVersionEvents(GatewaySenderEventDispatcher disp) { + return true; } } diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java index 69005e0..da5d1ba 100644 --- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java +++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java @@ -19,6 +19,7 @@ import org.apache.logging.log4j.Logger; import org.apache.geode.cache.wan.GatewaySender; import org.apache.geode.internal.cache.wan.AbstractGatewaySender; import org.apache.geode.internal.cache.wan.GatewaySenderEventCallbackDispatcher; +import org.apache.geode.internal.cache.wan.GatewaySenderEventDispatcher; import org.apache.geode.internal.cache.wan.GatewaySenderEventRemoteDispatcher; import org.apache.geode.internal.logging.LogService; @@ -44,4 +45,14 @@ public class RemoteSerialGatewaySenderEventProcessor extends SerialGatewaySender } } + /** + * Returns if 
corresponding receiver WAN site of this GatewaySender has GemfireVersion > 7.0.1 + * + * @param disp + * @return true if remote site Gemfire Version is >= 7.0.1 + */ + protected boolean shouldSendVersionEvents(GatewaySenderEventDispatcher disp) { + return true; + } + } -- To stop receiving notification emails like this one, please contact zho...@apache.org.