[ 
https://issues.apache.org/jira/browse/GEODE-4659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16364482#comment-16364482
 ] 

ASF subversion and git services commented on GEODE-4659:
--------------------------------------------------------

Commit 5717053d08cdaa7ece96cb6c23905e444627dee3 in geode's branch 
refs/heads/feature/GEODE-3967 from zhouxh
[ https://gitbox.apache.org/repos/asf?p=geode.git;h=5717053 ]

GEODE-3967: The following 9 problems are fixed here:
1) When a ConcurrentCacheModificationException (CME) occurs,
GatewaySenderEventImpl should save that status and still notify the gateway
sender if this member holds the primary queue, because another member might
have put the event into its secondary queue.
2) In AbstractUpdateOperation.doPutOrCreate, which tries basicUpdate 3 times,
the 3rd try should allow both create and update.
3) Do not dispatch events with a CME. The old logic did not allow a CME event
to be enqueued. This is wrong, because the same event without a CME might have
been added to the secondary queue. So enqueue it, but do not dispatch it.
4) Do not enqueue UPDATE_VERSION_STAMP unless this is the primary queue,
because the event is not fired in a pair.
5) AbstractGatewaySenderEventProcessor puts the filter loop in the wrong place,
so UPDATE_VERSION_STAMP and CME events are not ignored.
This is not fixed for now; it is left to GEODE-4659.
6) shouldSendVersionEvents for the remote sender should return true, since
we no longer support 7.0.1.
7) Change version to 150.
8) A CME event should not be retried in AUO.doPutOrCreate, because the retry
will end up with a CME too.
9) CME && !originRemote: only enqueue to primary
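
Items 2 and 8 can be sketched roughly as follows. This is a minimal,
hypothetical model and not the Geode implementation: UpdateTarget, Outcome and
the order of the first two tries are assumptions made for illustration; only
the "third try allows both" and "no retry after a CME" rules come from the
commit message above.

```java
/** Illustrative only: none of these types are Geode classes. */
public class PutOrCreateSketch {

  /** Hypothetical outcome of a single attempt. */
  enum Outcome { APPLIED, NOT_APPLIED, CONCURRENCY_CONFLICT }

  /** Hypothetical stand-in for a region's basicUpdate(ifNew, ifOld). */
  interface UpdateTarget {
    Outcome basicUpdate(boolean ifNew, boolean ifOld);
  }

  /** Tries up to three times and returns the outcome of the last attempt made. */
  static Outcome doPutOrCreate(UpdateTarget target) {
    // Assumed modes as {ifNew, ifOld}: ifOld = update only, ifNew = create only,
    // neither flag set = accept either a create or an update (item 2).
    boolean[][] modes = {{false, true}, {true, false}, {false, false}};
    Outcome outcome = Outcome.NOT_APPLIED;
    for (boolean[] mode : modes) {
      outcome = target.basicUpdate(mode[0], mode[1]);
      if (outcome != Outcome.NOT_APPLIED) {
        // APPLIED: done. CONCURRENCY_CONFLICT: stop, because a retry
        // would end in the same conflict (item 8).
        break;
      }
    }
    return outcome;
  }

  public static void main(String[] args) {
    // An entry that keeps flipping between present and absent defeats the
    // update-only and create-only tries; the third try succeeds either way.
    UpdateTarget racing =
        (ifNew, ifOld) -> (ifNew || ifOld) ? Outcome.NOT_APPLIED : Outcome.APPLIED;
    System.out.println(doPutOrCreate(racing));

    // A conflicting event is attempted exactly once.
    UpdateTarget conflicting = (ifNew, ifOld) -> Outcome.CONCURRENCY_CONFLICT;
    System.out.println(doPutOrCreate(conflicting));
  }
}
```

In this model the caller can still inspect the CONCURRENCY_CONFLICT outcome
afterwards, which is where the status saved under item 1 would be used.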


> AbstractGatewaySenderEventProcessor put loop of filter in wrong place
> ---------------------------------------------------------------------
>
>                 Key: GEODE-4659
>                 URL: https://issues.apache.org/jira/browse/GEODE-4659
>             Project: Geode
>          Issue Type: Bug
>          Components: wan
>            Reporter: xiaojian zhou
>            Assignee: xiaojian zhou
>            Priority: Major
>
> {noformat}
> When fixing GEODE-3967, I found that the filter loop is in the wrong place.
>  
> Ignoring UPDATE_VERSION_STAMP events and events with a CME should have 
> nothing to do with filters. But if no filter is defined, the code does not 
> ignore UPDATE_VERSION_STAMP events or events with a CME.
>  
> However, once this problem is fixed, GEODE-3967 has more race conditions to 
> be fixed (I have fixed several of them). It looks like this bug hid other 
> race conditions from surfacing. 
>  
> Given the time constraint, I will not fix the filter issue in GEODE-3967 and 
> am logging this bug for future reference. 
>  
> Here is the diff to fix this bug:
> diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java
> index 8739a8f72..a3a89fbd0 100644
> --- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java
> +++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java
> @@ -81,40 +81,8 @@ public class RemoteParallelGatewaySenderEventProcessor extends ParallelGatewaySe
>     * @param disp
>     * @return true if remote site Gemfire Version is >= 7.0.1
>     */
> -  private boolean shouldSendVersionEvents(GatewaySenderEventDispatcher disp)
> -      throws GatewaySenderException {
> -    try {
> -      GatewaySenderEventRemoteDispatcher remoteDispatcher =
> -          (GatewaySenderEventRemoteDispatcher) disp;
> -      // This will create a new connection if no batch has been sent till
> -      // now.
> -      Connection conn = remoteDispatcher.getConnection(false);
> -      if (conn != null) {
> -        short remoteSiteVersion = conn.getWanSiteVersion();
> -        if (Version.GFE_701.compareTo(remoteSiteVersion) <= 0) {
> -          return true;
> -        }
> -      }
> -    } catch (GatewaySenderException e) {
> -      Throwable cause = e.getCause();
> -      if (cause instanceof IOException || e instanceof GatewaySenderConfigurationException
> -          || cause instanceof ConnectionDestroyedException) {
> -        try {
> -          int sleepInterval = GatewaySender.CONNECTION_RETRY_INTERVAL;
> -          if (logger.isDebugEnabled()) {
> -            logger.debug("Sleeping for {} milliseconds", sleepInterval);
> -          }
> -          Thread.sleep(sleepInterval);
> -        } catch (InterruptedException ie) {
> -          // log the exception
> -          if (logger.isDebugEnabled()) {
> -            logger.debug(ie.getMessage(), ie);
> -          }
> -        }
> -      }
> -      throw e;
> -    }
> -    return false;
> +  protected boolean shouldSendVersionEvents(GatewaySenderEventDispatcher disp) {
> +    return true;
>    }
> }
> diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java
> index 69005e02b..da5d1baee 100644
> --- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java
> +++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java
> @@ -19,6 +19,7 @@ import org.apache.logging.log4j.Logger;
> import org.apache.geode.cache.wan.GatewaySender;
> import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
> import org.apache.geode.internal.cache.wan.GatewaySenderEventCallbackDispatcher;
> +import org.apache.geode.internal.cache.wan.GatewaySenderEventDispatcher;
> import org.apache.geode.internal.cache.wan.GatewaySenderEventRemoteDispatcher;
> import org.apache.geode.internal.logging.LogService;
> @@ -44,4 +45,14 @@ public class RemoteSerialGatewaySenderEventProcessor extends SerialGatewaySender
>      }
>    }
> +  /**
> +   * Returns if corresponding receiver WAN site of this GatewaySender has GemfireVersion > 7.0.1
> +   *
> +   * @param disp
> +   * @return true if remote site Gemfire Version is >= 7.0.1
> +   */
> +  protected boolean shouldSendVersionEvents(GatewaySenderEventDispatcher disp) {
> +    return true;
> +  }
> +
> }
> diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
> index 7e67e9bfb..439394382 100644
> --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
> +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
> @@ -509,27 +509,38 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
>            }
>            // Filter the events
> -          for (GatewayEventFilter filter : sender.getGatewayEventFilters()) {
> -            Iterator<GatewaySenderEventImpl> itr = filteredList.iterator();
> -            while (itr.hasNext()) {
> -              GatewayQueueEvent event = itr.next();
> -
> -              // This seems right place to prevent transmission of UPDATE_VERSION events if
> -              // receiver's
> -              // version is < 7.0.1, especially to prevent another loop over events.
> -              if (!sendUpdateVersionEvents
> -                  && event.getOperation() == Operation.UPDATE_VERSION_STAMP) {
> -                if (isTraceEnabled) {
> -                  logger.trace(
> -                      "Update Event Version event: {} removed from Gateway Sender queue: {}", event,
> -                      sender);
> -                }
> +          Iterator<GatewaySenderEventImpl> itr = filteredList.iterator();
> +          while (itr.hasNext()) {
> +            GatewayQueueEvent event = itr.next();
> +
> +            // This seems right place to prevent transmission of UPDATE_VERSION events if
> +            // receiver's
> +            // version is < 7.0.1, especially to prevent another loop over events.
> +            if (!sendUpdateVersionEvents
> +                && event.getOperation() == Operation.UPDATE_VERSION_STAMP) {
> +              if (isDebugEnabled) {
> +                logger.debug("Update Event Version event: {} removed from Gateway Sender queue: {}",
> +                    event, sender);
> +              }
> -                itr.remove();
> -                statistics.incEventsNotQueued();
> -                continue;
> +              itr.remove();
> +              statistics.incEventsNotQueued();
> +              continue;
> +            }
> +
> +            if (((GatewaySenderEventImpl) event).isConcurrencyConflict()) {
> +              if (isDebugEnabled) {
> +                logger.debug(
> +                    "Event with concurrent modification conflict: {} will be removed from Gateway Sender queue: {}",
> +                    event, sender);
>                }
> +              itr.remove();
> +              statistics.incEventsNotQueued();
> +              continue;
> +            }
> +
> +            for (GatewayEventFilter filter : sender.getGatewayEventFilters()) {
>                boolean transmit = filter.beforeTransmit(event);
>                if (!transmit) {
>                  if (isDebugEnabled) {
> @@ -538,6 +549,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
>                  }
>                  itr.remove();
>                  statistics.incEventsFiltered();
> +                break;
>                }
>              }
>            }
> @@ -550,9 +562,9 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
>            // AsyncEventQueue since possibleDuplicate flag is not used in WAN.
>            if (this.getSender().isParallel()
>                && (this.getDispatcher() instanceof GatewaySenderEventCallbackDispatcher)) {
> -            Iterator<GatewaySenderEventImpl> itr = filteredList.iterator();
> -            while (itr.hasNext()) {
> -              GatewaySenderEventImpl event = (GatewaySenderEventImpl) itr.next();
> +            Iterator<GatewaySenderEventImpl> eventItr = filteredList.iterator();
> +            while (eventItr.hasNext()) {
> +              GatewaySenderEventImpl event = (GatewaySenderEventImpl) eventItr.next();
>                PartitionedRegion qpr = null;
>                if (this.getQueue() instanceof ConcurrentParallelGatewaySenderQueue) {
>                  qpr = ((ConcurrentParallelGatewaySenderQueue) this.getQueue())
> @@ -726,7 +738,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
>      } // for
>    }
> -  private boolean shouldSendVersionEvents(GatewaySenderEventDispatcher dispatcher) {
> +  protected boolean shouldSendVersionEvents(GatewaySenderEventDispatcher dispatcher) {
>      // onyly in case of remote dispatcher we send versioned events
>      return false;
>    }{noformat}
>  
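
The reordered loop in the quoted diff can be modelled in isolation. The sketch
below is a simplified stand-in, not Geode code: Event, Op and
eventsToDispatch are hypothetical names, and java.util.function.Predicate
stands in for GatewayEventFilter.beforeTransmit. It only aims to show that
the UPDATE_VERSION_STAMP and CME checks run once per event whether or not any
filter is configured, and that the break avoids removing the same event twice.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

/** Illustrative only: a simplified model of the per-event filtering pass. */
public class FilterPassSketch {

  /** Hypothetical subset of operations. */
  enum Op { CREATE, UPDATE, UPDATE_VERSION_STAMP }

  /** Hypothetical, minimal event: a key, an operation and a CME flag. */
  static final class Event {
    final String key;
    final Op op;
    final boolean concurrencyConflict;

    Event(String key, Op op, boolean concurrencyConflict) {
      this.key = key;
      this.op = op;
      this.concurrencyConflict = concurrencyConflict;
    }
  }

  /** Returns the events of the batch that should actually be transmitted. */
  static List<Event> eventsToDispatch(List<Event> batch, boolean sendUpdateVersionEvents,
      List<Predicate<Event>> filters) {
    List<Event> filteredList = new ArrayList<>(batch);
    Iterator<Event> itr = filteredList.iterator();
    while (itr.hasNext()) {
      Event event = itr.next();
      // These two checks no longer depend on a filter being configured.
      if (!sendUpdateVersionEvents && event.op == Op.UPDATE_VERSION_STAMP) {
        itr.remove();
        continue;
      }
      if (event.concurrencyConflict) {
        itr.remove();
        continue;
      }
      for (Predicate<Event> filter : filters) {
        if (!filter.test(event)) {
          itr.remove();
          // Without the break, a second rejecting filter would call
          // itr.remove() again and throw IllegalStateException.
          break;
        }
      }
    }
    return filteredList;
  }

  public static void main(String[] args) {
    List<Event> batch = Arrays.asList(
        new Event("a", Op.UPDATE, false),
        new Event("b", Op.UPDATE_VERSION_STAMP, false),
        new Event("c", Op.CREATE, true));
    // No filters configured: only "a" survives.
    List<Event> out = eventsToDispatch(batch, false, Collections.emptyList());
    for (Event e : out) {
      System.out.println(e.key);
    }
  }
}
```

With the loops nested the other way round, as in the removed lines of the
diff, an empty filter list means the body never runs and all three events
would be kept.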



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
