xiaojian zhou created GEODE-4659: ------------------------------------ Summary: AbstractGatewaySenderEventProcessor put loop of filter in wrong place Key: GEODE-4659 URL: https://issues.apache.org/jira/browse/GEODE-4659 Project: Geode Issue Type: New Feature Components: wan Reporter: xiaojian zhou
{noformat} When fixing GEODE-3967, I found the loop of filter is in wrong place. If there's no filter defined, the processing to ignore UPDATE_VERSION_STAMP and events with CME should have nothing to do with filters. But if there's no filter defined, the code will not ignore the UPDATE_VERSION_STAMP and events with CME. However, if fixed this problem. the GEODE-3967 have more race conditions to be fixed. (I have fixed several of them). It looks like this bug hided other race conditions from blowing out. GIving the time constrain, I will not fix the filter issue in GEODE_3967 and log this bug for future reference. Here are the diff to fix or this bug: diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java index 8739a8f72..a3a89fbd0 100644 --- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java +++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java @@ -81,40 +81,8 @@ public class RemoteParallelGatewaySenderEventProcessor extends ParallelGatewaySe * @param disp * @return true if remote site Gemfire Version is >= 7.0.1 */ - private boolean shouldSendVersionEvents(GatewaySenderEventDispatcher disp) - throws GatewaySenderException { - try { - GatewaySenderEventRemoteDispatcher remoteDispatcher = - (GatewaySenderEventRemoteDispatcher) disp; - // This will create a new connection if no batch has been sent till - // now. - Connection conn = remoteDispatcher.getConnection(false); - if (conn != null) { - short remoteSiteVersion = conn.getWanSiteVersion(); - if (Version.GFE_701.compareTo(remoteSiteVersion) <= 0) { - return true; - } - } - } catch (GatewaySenderException e) { - Throwable cause = e.getCause(); - if (cause instanceof IOException || e instanceof GatewaySenderConfigurationException - || cause instanceof ConnectionDestroyedException) { - try { - int sleepInterval = GatewaySender.CONNECTION_RETRY_INTERVAL; - if (logger.isDebugEnabled()) { - logger.debug("Sleeping for {} milliseconds", sleepInterval); - } - Thread.sleep(sleepInterval); - } catch (InterruptedException ie) { - // log the exception - if (logger.isDebugEnabled()) { - logger.debug(ie.getMessage(), ie); - } - } - } - throw e; - } - return false; + protected boolean shouldSendVersionEvents(GatewaySenderEventDispatcher disp) { + return true; } } diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java index 69005e02b..da5d1baee 100644 --- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java +++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java @@ -19,6 +19,7 @@ import org.apache.logging.log4j.Logger; import org.apache.geode.cache.wan.GatewaySender; import org.apache.geode.internal.cache.wan.AbstractGatewaySender; import org.apache.geode.internal.cache.wan.GatewaySenderEventCallbackDispatcher; +import org.apache.geode.internal.cache.wan.GatewaySenderEventDispatcher; import org.apache.geode.internal.cache.wan.GatewaySenderEventRemoteDispatcher; import org.apache.geode.internal.logging.LogService; @@ -44,4 +45,14 @@ public class RemoteSerialGatewaySenderEventProcessor extends SerialGatewaySender } } + /** + * Returns if corresponding receiver WAN site of this GatewaySender has GemfireVersion > 7.0.1 + * + * @param disp + * @return true if remote site Gemfire Version is >= 7.0.1 + */ + protected boolean shouldSendVersionEvents(GatewaySenderEventDispatcher disp) { + return true; + } + } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java index 7e67e9bfb..439394382 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java @@ -509,27 +509,38 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread { } // Filter the events - for (GatewayEventFilter filter : sender.getGatewayEventFilters()) { - Iterator<GatewaySenderEventImpl> itr = filteredList.iterator(); - while (itr.hasNext()) { - GatewayQueueEvent event = itr.next(); - - // This seems right place to prevent transmission of UPDATE_VERSION events if - // receiver's - // version is < 7.0.1, especially to prevent another loop over events. - if (!sendUpdateVersionEvents - && event.getOperation() == Operation.UPDATE_VERSION_STAMP) { - if (isTraceEnabled) { - logger.trace( - "Update Event Version event: {} removed from Gateway Sender queue: {}", event, - sender); - } + Iterator<GatewaySenderEventImpl> itr = filteredList.iterator(); + while (itr.hasNext()) { + GatewayQueueEvent event = itr.next(); + + // This seems right place to prevent transmission of UPDATE_VERSION events if + // receiver's + // version is < 7.0.1, especially to prevent another loop over events. + if (!sendUpdateVersionEvents + && event.getOperation() == Operation.UPDATE_VERSION_STAMP) { + if (isDebugEnabled) { + logger.debug("Update Event Version event: {} removed from Gateway Sender queue: {}", + event, sender); + } - itr.remove(); - statistics.incEventsNotQueued(); - continue; + itr.remove(); + statistics.incEventsNotQueued(); + continue; + } + + if (((GatewaySenderEventImpl) event).isConcurrencyConflict()) { + if (isDebugEnabled) { + logger.debug( + "Event with concurrent modification conflict: {} will be removed from Gateway Sender queue: {}", + event, sender); } + itr.remove(); + statistics.incEventsNotQueued(); + continue; + } + + for (GatewayEventFilter filter : sender.getGatewayEventFilters()) { boolean transmit = filter.beforeTransmit(event); if (!transmit) { if (isDebugEnabled) { @@ -538,6 +549,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread { } itr.remove(); statistics.incEventsFiltered(); + break; } } } @@ -550,9 +562,9 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread { // AsyncEventQueue since possibleDuplicate flag is not used in WAN. if (this.getSender().isParallel() && (this.getDispatcher() instanceof GatewaySenderEventCallbackDispatcher)) { - Iterator<GatewaySenderEventImpl> itr = filteredList.iterator(); - while (itr.hasNext()) { - GatewaySenderEventImpl event = (GatewaySenderEventImpl) itr.next(); + Iterator<GatewaySenderEventImpl> eventItr = filteredList.iterator(); + while (eventItr.hasNext()) { + GatewaySenderEventImpl event = (GatewaySenderEventImpl) eventItr.next(); PartitionedRegion qpr = null; if (this.getQueue() instanceof ConcurrentParallelGatewaySenderQueue) { qpr = ((ConcurrentParallelGatewaySenderQueue) this.getQueue()) @@ -726,7 +738,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread { } // for } - private boolean shouldSendVersionEvents(GatewaySenderEventDispatcher dispatcher) { + protected boolean shouldSendVersionEvents(GatewaySenderEventDispatcher dispatcher) { // onyly in case of remote dispatcher we send versioned events return false; }{noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)