Repository: geode Updated Branches: refs/heads/feature/GEODE-2937 [created] 9d5581b70
GEODE-2937: Restore removeFromQueueOnException Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/9d5581b7 Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/9d5581b7 Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/9d5581b7 Branch: refs/heads/feature/GEODE-2937 Commit: 9d5581b70799c32ccb45596e8734012e11e1e3f9 Parents: a22b940 Author: Jason Huynh <huyn...@gmail.com> Authored: Wed May 17 18:19:15 2017 -0700 Committer: Jason Huynh <huyn...@gmail.com> Committed: Thu May 18 10:05:42 2017 -0700 ---------------------------------------------------------------------- .../cache/wan/AbstractGatewaySender.java | 14 +++++++++ .../AbstractGatewaySenderEventProcessor.java | 6 ++-- .../GatewaySenderEventCallbackDispatcher.java | 4 +-- .../cache/wan/GatewaySenderEventDispatcher.java | 2 +- .../cache/wan/AsyncEventQueueTestBase.java | 26 +++++++++++++++ .../client/internal/GatewaySenderBatchOp.java | 16 ++++++---- .../cache/client/internal/SenderProxy.java | 6 ++-- .../wan/GatewaySenderEventRemoteDispatcher.java | 33 +++++++++++++++++--- 8 files changed, 90 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/9d5581b7/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java index 0ba40b4..7ed9b51 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java @@ -640,6 +640,20 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi return this.myDSId; } + /** + * @param removeFromQueueOnException the removeFromQueueOnException to set + */ + public void setRemoveFromQueueOnException(boolean removeFromQueueOnException) { + this.removeFromQueueOnException = removeFromQueueOnException; + } + + /** + * @return the removeFromQueueOnException + */ + public boolean isRemoveFromQueueOnException() { + return removeFromQueueOnException; + } + public CancelCriterion getStopper() { return this.stopper; } http://git-wip-us.apache.org/repos/asf/geode/blob/9d5581b7/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java index 702438f..0c93755 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java @@ -606,7 +606,8 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread { conflatedEventsToBeDispatched); } - boolean success = this.dispatcher.dispatchBatch(conflatedEventsToBeDispatched, false); + boolean success = this.dispatcher.dispatchBatch(conflatedEventsToBeDispatched, + sender.isRemoveFromQueueOnException(), false); if (success) { if (isDebugEnabled) { logger.debug( @@ -650,7 +651,8 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread { } else { handleUnSuccessfulBatchDispatch(events); if (!resetLastPeekedEvents) { - while (!this.dispatcher.dispatchBatch(conflatedEventsToBeDispatched, true)) { + while (!this.dispatcher.dispatchBatch(conflatedEventsToBeDispatched, + sender.isRemoveFromQueueOnException(), true)) { if (isDebugEnabled) { logger.debug( "During normal processing, unsuccessfully dispatched {} events (batch #{})", http://git-wip-us.apache.org/repos/asf/geode/blob/9d5581b7/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventCallbackDispatcher.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventCallbackDispatcher.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventCallbackDispatcher.java index eb3c735..efdd0ce 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventCallbackDispatcher.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventCallbackDispatcher.java @@ -65,10 +65,10 @@ public class GatewaySenderEventCallbackDispatcher implements GatewaySenderEventD * Dispatches a batch of messages to all registered <code>AsyncEventListener</code>s. * * @param events The <code>List</code> of events to send - * + * @param removeFromQueueOnException Unused. * @return whether the batch of messages was successfully processed */ - public boolean dispatchBatch(List events, boolean isRetry) { + public boolean dispatchBatch(List events, boolean removeFromQueueOnException, boolean isRetry) { GatewaySenderStats statistics = this.eventProcessor.sender.getStatistics(); boolean success = false; try { http://git-wip-us.apache.org/repos/asf/geode/blob/9d5581b7/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventDispatcher.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventDispatcher.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventDispatcher.java index 807e386..5bb5333 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventDispatcher.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventDispatcher.java @@ -22,7 +22,7 @@ import java.util.List; */ public interface GatewaySenderEventDispatcher { - public boolean dispatchBatch(List events, boolean isRetry); + public boolean dispatchBatch(List events, boolean removeFromQueueOnException, boolean isRetry); public boolean isRemoteDispatcher(); http://git-wip-us.apache.org/repos/asf/geode/blob/9d5581b7/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java index 1595e99..5d4fd98 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java @@ -809,6 +809,32 @@ public class AsyncEventQueueTestBase extends JUnit4DistributedTestCase { + statistics.getUnprocessedTokensAddedByPrimary())); } + public static void setRemoveFromQueueOnException(String senderId, boolean removeFromQueue) { + Set<GatewaySender> senders = cache.getGatewaySenders(); + GatewaySender sender = null; + for (GatewaySender s : senders) { + if (s.getId().equals(senderId)) { + sender = s; + break; + } + } + assertNotNull(sender); + ((AbstractGatewaySender) sender).setRemoveFromQueueOnException(removeFromQueue); + } + + public static void unsetRemoveFromQueueOnException(String senderId) { + Set<GatewaySender> senders = cache.getGatewaySenders(); + GatewaySender sender = null; + for (GatewaySender s : senders) { + if (s.getId().equals(senderId)) { + sender = s; + break; + } + } + assertNotNull(sender); + ((AbstractGatewaySender) sender).setRemoveFromQueueOnException(false); + } + public static void waitForSenderToBecomePrimary(String senderId) { Set<GatewaySender> senders = ((GemFireCacheImpl) cache).getAllGatewaySenders(); final GatewaySender sender = getGatewaySenderById(senders, senderId); http://git-wip-us.apache.org/repos/asf/geode/blob/9d5581b7/geode-wan/src/main/java/org/apache/geode/cache/client/internal/GatewaySenderBatchOp.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/GatewaySenderBatchOp.java b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/GatewaySenderBatchOp.java index 7fc762fe6..b8616a9 100755 --- a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/GatewaySenderBatchOp.java +++ b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/GatewaySenderBatchOp.java @@ -48,18 +48,22 @@ public class GatewaySenderBatchOp { * @param pool the pool to use to communicate with the server. * @param events list of gateway events * @param batchId the ID of this batch + * @param removeFromQueueOnException true if the events should be processed even after some + * exception */ public static void executeOn(Connection con, ExecutablePool pool, List events, int batchId, - boolean isRetry) { + boolean removeFromQueueOnException, boolean isRetry) { AbstractOp op = null; // System.out.println("Version: "+con.getWanSiteVersion()); // Is this check even needed anymore? It looks like we just create the same exact op impl with // the same parameters... if (Version.GFE_651.compareTo(con.getWanSiteVersion()) >= 0) { - op = new GatewaySenderGFEBatchOpImpl(events, batchId, con.getDistributedSystemId(), isRetry); + op = new GatewaySenderGFEBatchOpImpl(events, batchId, removeFromQueueOnException, + con.getDistributedSystemId(), isRetry); } else { // Default should create a batch of server version (ACCEPTOR.VERSION) - op = new GatewaySenderGFEBatchOpImpl(events, batchId, con.getDistributedSystemId(), isRetry); + op = new GatewaySenderGFEBatchOpImpl(events, batchId, removeFromQueueOnException, + con.getDistributedSystemId(), isRetry); } pool.executeOn(con, op, true/* timeoutFatal */); } @@ -79,9 +83,9 @@ public class GatewaySenderBatchOp { /** * @throws org.apache.geode.SerializationException if serialization fails */ - public GatewaySenderGFEBatchOpImpl(List events, int batchId, int dsId, boolean isRetry) { + public GatewaySenderGFEBatchOpImpl(List events, int batchId, boolean removeFromQueueOnException, + int dsId, boolean isRetry) { super(MessageType.GATEWAY_RECEIVER_COMMAND, calcPartCount(events)); - boolean removeFromQueueOnException = true; if (isRetry) { getMessage().setIsRetry(); } @@ -258,7 +262,7 @@ public class GatewaySenderBatchOp { List<BatchException70> l = (List<BatchException70>) part0.getObject(); if (logger.isDebugEnabled()) { - logger.info( + logger.debug( "We got an exception from the GatewayReceiver. MessageType : {} obj :{}", msg.getMessageType(), obj); } http://git-wip-us.apache.org/repos/asf/geode/blob/9d5581b7/geode-wan/src/main/java/org/apache/geode/cache/client/internal/SenderProxy.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/SenderProxy.java b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/SenderProxy.java index 1ef9425..c6d283c 100644 --- a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/SenderProxy.java +++ b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/SenderProxy.java @@ -29,8 +29,10 @@ public class SenderProxy extends ServerProxy { super(pool); } - public void dispatchBatch_NewWAN(Connection con, List events, int batchId, boolean isRetry) { - GatewaySenderBatchOp.executeOn(con, this.pool, events, batchId, isRetry); + public void dispatchBatch_NewWAN(Connection con, List events, int batchId, + boolean removeFromQueueOnException, boolean isRetry) { + GatewaySenderBatchOp.executeOn(con, this.pool, events, batchId, removeFromQueueOnException, + isRetry); } public Object receiveAckFromReceiver(Connection con) { http://git-wip-us.apache.org/repos/asf/geode/blob/9d5581b7/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java index 3eec101..3a41972 100644 --- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java +++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java @@ -140,7 +140,7 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis } @Override - public boolean dispatchBatch(List events, boolean isRetry) { + public boolean dispatchBatch(List events, boolean removeFromQueueOnException, boolean isRetry) { GatewaySenderStats statistics = this.sender.getStatistics(); boolean success = false; try { @@ -212,7 +212,8 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis this.connectionLifeCycleLock.readLock().lock(); try { if (connection != null) { - sp.dispatchBatch_NewWAN(connection, events, currentBatchId, isRetry); + sp.dispatchBatch_NewWAN(connection, events, currentBatchId, + sender.isRemoveFromQueueOnException(), isRetry); if (logger.isDebugEnabled()) { logger.debug( "{} : Dispatched batch (id={}) of {} events, queue size: {} on connection {}", @@ -621,8 +622,32 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis // log batch exceptions and remove all the events if remove from // exception is true // do not remove if it is false - logBatchExceptions(ack.getBatchException()); - processor.handleSuccessBatchAck(batchId); + if (sender.isRemoveFromQueueOnException()) { + // log the batchExceptions + logBatchExceptions(ack.getBatchException()); + processor.handleSuccessBatchAck(batchId); + } else { + // we assume that batch exception will not occur for PDX related + // events + List<GatewaySenderEventImpl> pdxEvents = + processor.getBatchIdToPDXEventsMap().get(ack.getBatchException().getBatchId()); + if (pdxEvents != null) { + for (GatewaySenderEventImpl senderEvent : pdxEvents) { + senderEvent.isAcked = true; + } + } + // log the batchExceptions + logBatchExceptions(ack.getBatchException()); + // remove the events that have been processed. + BatchException70 be = ack.getBatchException(); + List<BatchException70> exceptions = be.getExceptions(); + + for (int i = 0; i < exceptions.get(0).getIndex(); i++) { + processor.eventQueueRemove(1); + } + // reset the sender + processor.handleException(); + } } // unsuccessful batch else { // The batch was successful.