Repository: incubator-geode
Updated Branches:
  refs/heads/develop f49ee2e18 -> cf3fb80ac


GEODE-1588: AckReader and Dispatching thread are shut down before sending 
gateway sender close connection messages

* There was an issue where the gateway sender thread was reading off the same
socket as the ack reader.
To fix this, we now force the ack reader thread to stop first and close the
input stream, which prevents reading garbled data.
* Another issue was that the ack reader thread was being spun up again after
being shut down. Now we prevent the dispatching thread
from restarting it by checking whether the event processor has been stopped.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/cf3fb80a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/cf3fb80a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/cf3fb80a

Branch: refs/heads/develop
Commit: cf3fb80acbd86cbd8b1ee2ecd9f955c04e5def11
Parents: f49ee2e
Author: Jason Huynh <huyn...@gmail.com>
Authored: Tue Jul 5 08:56:14 2016 -0700
Committer: Jason Huynh <huyn...@gmail.com>
Committed: Mon Jul 11 09:59:00 2016 -0700

----------------------------------------------------------------------
 .../AbstractGatewaySenderEventProcessor.java    |  5 ++-
 ...rentParallelGatewaySenderEventProcessor.java |  8 +++--
 ...urrentSerialGatewaySenderEventProcessor.java |  5 +--
 .../wan/GatewaySenderEventRemoteDispatcher.java | 34 ++++++++++++++----
 .../gemfire/internal/cache/wan/WANTestBase.java | 36 +++++++-------------
 5 files changed, 49 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cf3fb80a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index ce08e8d..e3e1a9e 100644
--- 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -1150,10 +1150,9 @@ public abstract class 
AbstractGatewaySenderEventProcessor extends Thread {
         }
       }
     }
-   
-    dispatcher.stop();
-    //set isStopped to true
+
     setIsStopped(true);
+    dispatcher.stop();
 
     if (this.isAlive()) {
       this.interrupt();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cf3fb80a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
index 07a3be5..82a53d3 100644
--- 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
+++ 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
@@ -237,6 +237,9 @@ public class ConcurrentParallelGatewaySenderEventProcessor 
extends AbstractGatew
     if (!this.isAlive()) {
       return;
     }
+
+    setIsStopped(true);
+
     final LoggingThreadGroup loggingThreadGroup = LoggingThreadGroup
         .createThreadGroup("ConcurrentParallelGatewaySenderEventProcessor 
Logger Group", logger);
 
@@ -248,12 +251,12 @@ public class 
ConcurrentParallelGatewaySenderEventProcessor extends AbstractGatew
         return thread;
       }
     };
-    
+
     List<SenderStopperCallable> stopperCallables = new 
ArrayList<SenderStopperCallable>();
     for (ParallelGatewaySenderEventProcessor parallelProcessor : 
this.processors) {
       stopperCallables.add(new SenderStopperCallable(parallelProcessor));
     }
-    
+
     ExecutorService stopperService = 
Executors.newFixedThreadPool(processors.length, threadFactory);
     try {
       List<Future<Boolean>> futures = 
stopperService.invokeAll(stopperCallables);
@@ -275,7 +278,6 @@ public class ConcurrentParallelGatewaySenderEventProcessor 
extends AbstractGatew
       throw rejectedExecutionEx;
     }
     
-    setIsStopped(true);
     stopperService.shutdown();
     closeProcessor();
     if (logger.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cf3fb80a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
index ff810ec..a557ce1 100644
--- 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
+++ 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
@@ -268,6 +268,8 @@ public class ConcurrentSerialGatewaySenderEventProcessor 
extends
       return;
     }
 
+    setIsStopped(true);
+
     final LoggingThreadGroup loggingThreadGroup = LoggingThreadGroup
         .createThreadGroup(
             "ConcurrentSerialGatewaySenderEventProcessor Logger Group",
@@ -312,8 +314,7 @@ public class ConcurrentSerialGatewaySenderEventProcessor 
extends
     }
     //shutdown the stopperService. This will release all the stopper threads
     stopperService.shutdown();
-    setIsStopped(true);
-    
+
     closeProcessor();
     
     if (logger.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cf3fb80a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
----------------------------------------------------------------------
diff --git 
a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
 
b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
index b178192..746ec46 100644
--- 
a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
+++ 
b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
@@ -301,6 +301,9 @@ public class GatewaySenderEventRemoteDispatcher implements
    * @throws GatewaySenderException
    */
   public Connection getConnection(boolean startAckReaderThread) throws 
GatewaySenderException{
+    if (this.processor.isStopped()) {
+      return null;
+    }
     // IF the connection is null 
     // OR the connection's ServerLocation doesn't match with the one stored in 
sender
     // THEN initialize the connection
@@ -343,7 +346,7 @@ public class GatewaySenderEventRemoteDispatcher implements
       if (con != null) {
         if (!con.isDestroyed()) {
           con.destroy();
-         this.sender.getProxy().returnConnection(con);
+          this.sender.getProxy().returnConnection(con);
         }
         
         // Reset the connection so the next time through a new one will be
@@ -625,9 +628,9 @@ public class GatewaySenderEventRemoteDispatcher implements
             }
           } else {
             // If we have received IOException.
-           // if (logger.isDebugEnabled()) {
+            if (logger.isDebugEnabled()) {
               logger.debug("{}: Received null ack from remote site.", 
processor.getSender());
-            //}
+            }
             processor.handleException();
             try { // This wait is before trying to getting new connection to
                   // receive ack. Without this there will be continuous call to
@@ -723,9 +726,11 @@ public class GatewaySenderEventRemoteDispatcher implements
       // not. No need to take lock as the reader thread may be blocked and we 
might not
       // get chance to destroy unless that returns.
       if (connection != null) {
-        if (!connection.isDestroyed()) {
-          connection.destroy();
-          sender.getProxy().returnConnection(connection);
+        Connection conn = connection;
+        shutDownAckReaderConnection();
+        if (!conn.isDestroyed()) {
+          conn.destroy();
+          sender.getProxy().returnConnection(conn);
         }
       }
       this.shutdown = true;
@@ -743,12 +748,27 @@ public class GatewaySenderEventRemoteDispatcher implements
         
logger.warn(LocalizedMessage.create(LocalizedStrings.GatewaySender_ACKREADERTHREAD_IGNORED_CANCELLATION));
       }
     }
+
+    private void shutDownAckReaderConnection() {
+      Connection conn = connection;
+      //attempt to unblock the ackreader thread by shutting down the 
inputStream, if it was stuck on a read
+      try {
+        if (conn != null && conn.getInputStream() != null) {
+          conn.getInputStream().close();
+        }
+      } catch (IOException e) {
+        logger.warn("Unable to shutdown AckReaderThread Connection");
+      } catch (ConnectionDestroyedException e) {
+        logger.info("AckReader shutting down and connection already 
destroyed");
+      }
+
+    }
   }
     
   public void stopAckReaderThread() {
     if (this.ackReaderThread != null) {
       this.ackReaderThread.shutdown();
-    }    
+    }
   }
   
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cf3fb80a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
----------------------------------------------------------------------
diff --git 
a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
 
b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
index 358ffaf..79648e1 100644
--- 
a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
+++ 
b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
@@ -407,8 +407,7 @@ public class WANTestBase extends JUnit4DistributedTestCase {
       fact.setOffHeap(offHeap);
       Region r = cache.createRegionFactory(fact.create()).create(regionName);
       assertNotNull(r);
-    }
-    finally {
+    } finally {
       exp.remove();
       exp1.remove();
       exp2.remove();
@@ -463,8 +462,7 @@ public class WANTestBase extends JUnit4DistributedTestCase {
       RegionFactory regionFactory = cache.createRegionFactory(fact.create());
       Region r = regionFactory.create(regionName);
       assertNotNull(r);
-    }
-    finally {
+    } finally {
       exp1.remove();
     }
   }
@@ -489,8 +487,7 @@ public class WANTestBase extends JUnit4DistributedTestCase {
       regionFactory.addAsyncEventQueueId(asyncChannelId);
       Region r = regionFactory.create(regionName);
       assertNotNull(r);
-    }
-    finally {
+    } finally {
       exp.remove();
     }
   }
@@ -563,8 +560,7 @@ public class WANTestBase extends JUnit4DistributedTestCase {
       fact.setOffHeap(offHeap);
       Region r = cache.createRegionFactory(fact.create()).create(regionName);
       assertNotNull(r);
-    }
-    finally {
+    } finally {
       exp.remove();
       exp1.remove();
     }
@@ -594,8 +590,7 @@ public class WANTestBase extends JUnit4DistributedTestCase {
       fact.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
       Region r = cache.createRegionFactory(fact.create()).create(regionName);
       assertNotNull(r);
-    }
-    finally {
+    } finally {
       exp.remove();
       exp1.remove();
     }
@@ -623,8 +618,7 @@ public class WANTestBase extends JUnit4DistributedTestCase {
          fact.setPartitionAttributes(pfact.create());
          Region r = 
cache.createRegionFactory(fact.create()).create(regionName);
          assertNotNull(r);
-       }
-       finally {
+       } finally {
                exp.remove();
                exp1.remove();
        }
@@ -718,8 +712,7 @@ public class WANTestBase extends JUnit4DistributedTestCase {
       fact.setOffHeap(offHeap);
       Region r = cache.createRegionFactory(fact.create()).create(regionName);
       assertNotNull(r);
-    }
-    finally {
+    } finally {
       exp.remove();
       exp1.remove();
     }
@@ -799,8 +792,7 @@ public class WANTestBase extends JUnit4DistributedTestCase {
       LogWriterUtils.getLogWriter().info(
           "Partitioned Region SHIPMENT created Successfully :"
               + shipmentRegion.toString());
-    }
-    finally {
+    } finally {
       exp.remove();
     }
   }
@@ -1578,8 +1570,7 @@ public class WANTestBase extends 
JUnit4DistributedTestCase {
       sender.pause();
       ((AbstractGatewaySender) 
sender).getEventProcessor().waitForDispatcherToPause();
 
-    }
-    finally {
+    } finally {
       exp.remove();
       exln.remove();
     }
@@ -1599,8 +1590,7 @@ public class WANTestBase extends 
JUnit4DistributedTestCase {
         }
       }
       sender.resume();
-    }
-    finally {
+    } finally {
       exp.remove();
       exln.remove();
     }
@@ -1634,8 +1624,7 @@ public class WANTestBase extends 
JUnit4DistributedTestCase {
           }
         }
       }
-    }
-    finally {
+    } finally {
       exp.remove();
       exln.remove();
     }
@@ -1833,8 +1822,7 @@ public class WANTestBase extends 
JUnit4DistributedTestCase {
         gateway.setDiskSynchronous(isDiskSync);
         gateway.create(dsName, remoteDsId);
       }
-    }
-    finally {
+    } finally {
       exp1.remove();
     }
   }

Reply via email to