Repository: incubator-geode Updated Branches: refs/heads/develop f49ee2e18 -> cf3fb80ac
GEODE-1588: The AckReader and dispatching threads are shut down before sending the gateway sender close-connection messages * Previously, the gateway sender thread was reading from the same socket as the ack reader thread. Now we force the ack reader thread to stop first and close the input stream to prevent reading garbled data. * In addition, the ack reader thread was being spun up again after it had been shut down. Now we prevent the dispatching thread from doing so by checking whether it is being shut down. Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/cf3fb80a Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/cf3fb80a Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/cf3fb80a Branch: refs/heads/develop Commit: cf3fb80acbd86cbd8b1ee2ecd9f955c04e5def11 Parents: f49ee2e Author: Jason Huynh <huyn...@gmail.com> Authored: Tue Jul 5 08:56:14 2016 -0700 Committer: Jason Huynh <huyn...@gmail.com> Committed: Mon Jul 11 09:59:00 2016 -0700 ---------------------------------------------------------------------- .../AbstractGatewaySenderEventProcessor.java | 5 ++- ...rentParallelGatewaySenderEventProcessor.java | 8 +++-- ...urrentSerialGatewaySenderEventProcessor.java | 5 +-- .../wan/GatewaySenderEventRemoteDispatcher.java | 34 ++++++++++++++---- .../gemfire/internal/cache/wan/WANTestBase.java | 36 +++++++------------- 5 files changed, 49 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cf3fb80a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java index ce08e8d..e3e1a9e 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java @@ -1150,10 +1150,9 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread { } } } - - dispatcher.stop(); - //set isStopped to true + setIsStopped(true); + dispatcher.stop(); if (this.isAlive()) { this.interrupt(); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cf3fb80a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java index 07a3be5..82a53d3 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java @@ -237,6 +237,9 @@ public class ConcurrentParallelGatewaySenderEventProcessor extends AbstractGatew if (!this.isAlive()) { return; } + + setIsStopped(true); + final LoggingThreadGroup loggingThreadGroup = LoggingThreadGroup .createThreadGroup("ConcurrentParallelGatewaySenderEventProcessor Logger Group", logger); @@ -248,12 +251,12 @@ public class ConcurrentParallelGatewaySenderEventProcessor extends AbstractGatew return thread; } }; - + List<SenderStopperCallable> stopperCallables = new ArrayList<SenderStopperCallable>(); for (ParallelGatewaySenderEventProcessor 
parallelProcessor : this.processors) { stopperCallables.add(new SenderStopperCallable(parallelProcessor)); } - + ExecutorService stopperService = Executors.newFixedThreadPool(processors.length, threadFactory); try { List<Future<Boolean>> futures = stopperService.invokeAll(stopperCallables); @@ -275,7 +278,6 @@ public class ConcurrentParallelGatewaySenderEventProcessor extends AbstractGatew throw rejectedExecutionEx; } - setIsStopped(true); stopperService.shutdown(); closeProcessor(); if (logger.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cf3fb80a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java index ff810ec..a557ce1 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java @@ -268,6 +268,8 @@ public class ConcurrentSerialGatewaySenderEventProcessor extends return; } + setIsStopped(true); + final LoggingThreadGroup loggingThreadGroup = LoggingThreadGroup .createThreadGroup( "ConcurrentSerialGatewaySenderEventProcessor Logger Group", @@ -312,8 +314,7 @@ public class ConcurrentSerialGatewaySenderEventProcessor extends } //shutdown the stopperService. 
This will release all the stopper threads stopperService.shutdown(); - setIsStopped(true); - + closeProcessor(); if (logger.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cf3fb80a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java index b178192..746ec46 100644 --- a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java +++ b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java @@ -301,6 +301,9 @@ public class GatewaySenderEventRemoteDispatcher implements * @throws GatewaySenderException */ public Connection getConnection(boolean startAckReaderThread) throws GatewaySenderException{ + if (this.processor.isStopped()) { + return null; + } // IF the connection is null // OR the connection's ServerLocation doesn't match with the one stored in sender // THEN initialize the connection @@ -343,7 +346,7 @@ public class GatewaySenderEventRemoteDispatcher implements if (con != null) { if (!con.isDestroyed()) { con.destroy(); - this.sender.getProxy().returnConnection(con); + this.sender.getProxy().returnConnection(con); } // Reset the connection so the next time through a new one will be @@ -625,9 +628,9 @@ public class GatewaySenderEventRemoteDispatcher implements } } else { // If we have received IOException. - // if (logger.isDebugEnabled()) { + if (logger.isDebugEnabled()) { logger.debug("{}: Received null ack from remote site.", processor.getSender()); - //} + } processor.handleException(); try { // This wait is before trying to getting new connection to // receive ack. 
Without this there will be continuous call to @@ -723,9 +726,11 @@ public class GatewaySenderEventRemoteDispatcher implements // not. No need to take lock as the reader thread may be blocked and we might not // get chance to destroy unless that returns. if (connection != null) { - if (!connection.isDestroyed()) { - connection.destroy(); - sender.getProxy().returnConnection(connection); + Connection conn = connection; + shutDownAckReaderConnection(); + if (!conn.isDestroyed()) { + conn.destroy(); + sender.getProxy().returnConnection(conn); } } this.shutdown = true; @@ -743,12 +748,27 @@ public class GatewaySenderEventRemoteDispatcher implements logger.warn(LocalizedMessage.create(LocalizedStrings.GatewaySender_ACKREADERTHREAD_IGNORED_CANCELLATION)); } } + + private void shutDownAckReaderConnection() { + Connection conn = connection; + //attempt to unblock the ackreader thread by shutting down the inputStream, if it was stuck on a read + try { + if (conn != null && conn.getInputStream() != null) { + conn.getInputStream().close(); + } + } catch (IOException e) { + logger.warn("Unable to shutdown AckReaderThread Connection"); + } catch (ConnectionDestroyedException e) { + logger.info("AckReader shutting down and connection already destroyed"); + } + + } } public void stopAckReaderThread() { if (this.ackReaderThread != null) { this.ackReaderThread.shutdown(); - } + } } @Override http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cf3fb80a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java index 358ffaf..79648e1 100644 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java +++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java 
@@ -407,8 +407,7 @@ public class WANTestBase extends JUnit4DistributedTestCase { fact.setOffHeap(offHeap); Region r = cache.createRegionFactory(fact.create()).create(regionName); assertNotNull(r); - } - finally { + } finally { exp.remove(); exp1.remove(); exp2.remove(); @@ -463,8 +462,7 @@ public class WANTestBase extends JUnit4DistributedTestCase { RegionFactory regionFactory = cache.createRegionFactory(fact.create()); Region r = regionFactory.create(regionName); assertNotNull(r); - } - finally { + } finally { exp1.remove(); } } @@ -489,8 +487,7 @@ public class WANTestBase extends JUnit4DistributedTestCase { regionFactory.addAsyncEventQueueId(asyncChannelId); Region r = regionFactory.create(regionName); assertNotNull(r); - } - finally { + } finally { exp.remove(); } } @@ -563,8 +560,7 @@ public class WANTestBase extends JUnit4DistributedTestCase { fact.setOffHeap(offHeap); Region r = cache.createRegionFactory(fact.create()).create(regionName); assertNotNull(r); - } - finally { + } finally { exp.remove(); exp1.remove(); } @@ -594,8 +590,7 @@ public class WANTestBase extends JUnit4DistributedTestCase { fact.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); Region r = cache.createRegionFactory(fact.create()).create(regionName); assertNotNull(r); - } - finally { + } finally { exp.remove(); exp1.remove(); } @@ -623,8 +618,7 @@ public class WANTestBase extends JUnit4DistributedTestCase { fact.setPartitionAttributes(pfact.create()); Region r = cache.createRegionFactory(fact.create()).create(regionName); assertNotNull(r); - } - finally { + } finally { exp.remove(); exp1.remove(); } @@ -718,8 +712,7 @@ public class WANTestBase extends JUnit4DistributedTestCase { fact.setOffHeap(offHeap); Region r = cache.createRegionFactory(fact.create()).create(regionName); assertNotNull(r); - } - finally { + } finally { exp.remove(); exp1.remove(); } @@ -799,8 +792,7 @@ public class WANTestBase extends JUnit4DistributedTestCase { LogWriterUtils.getLogWriter().info( "Partitioned Region 
SHIPMENT created Successfully :" + shipmentRegion.toString()); - } - finally { + } finally { exp.remove(); } } @@ -1578,8 +1570,7 @@ public class WANTestBase extends JUnit4DistributedTestCase { sender.pause(); ((AbstractGatewaySender) sender).getEventProcessor().waitForDispatcherToPause(); - } - finally { + } finally { exp.remove(); exln.remove(); } @@ -1599,8 +1590,7 @@ public class WANTestBase extends JUnit4DistributedTestCase { } } sender.resume(); - } - finally { + } finally { exp.remove(); exln.remove(); } @@ -1634,8 +1624,7 @@ public class WANTestBase extends JUnit4DistributedTestCase { } } } - } - finally { + } finally { exp.remove(); exln.remove(); } @@ -1833,8 +1822,7 @@ public class WANTestBase extends JUnit4DistributedTestCase { gateway.setDiskSynchronous(isDiskSync); gateway.create(dsName, remoteDsId); } - } - finally { + } finally { exp1.remove(); } }