Author: stack
Date: Sat Mar  8 20:41:57 2014
New Revision: 1575606

URL: http://svn.apache.org/r1575606
Log:
HBASE-10651 Fix incorrect handling of IE (InterruptedException) that restores
the current thread's interrupt status within while/for loops in Replication (Honghua Feng)

Modified:
    
hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java

Modified: 
hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: 
http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1575606&r1=1575605&r2=1575606&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Sat Mar  8 20:41:57 2014
@@ -710,6 +710,9 @@ public class ReplicationSource extends T
             } catch (InterruptedException e) {
               LOG.debug("Interrupted while sleeping for throttling control");
               Thread.currentThread().interrupt();
+              // the current thread may have been interrupted in order to terminate;
+              // go straight back to the while() condition to confirm this
+              continue;
             }
             // reset throttler's cycle start tick when sleep for throttling occurs
             this.throttler.resetStartTick();
@@ -753,6 +756,11 @@ public class ReplicationSource extends T
                 + "Replication cannot proceed without losing data.", sleepMultiplier)) {
               sleepMultiplier++;
             }
+            // the current thread may have been interrupted in order to terminate;
+            // go straight back to the while() condition to confirm this
+            if (isInterrupted()) {
+              continue;
+            }
           }
         } else {
           if (ioe instanceof SocketTimeoutException) {
@@ -763,6 +771,11 @@ public class ReplicationSource extends T
               "call to the remote cluster timed out, which is usually " +
               "caused by a machine failure or a massive slowdown",
               this.socketTimeoutMultiplier);
+            // the current thread may have been interrupted in order to terminate;
+            // go straight back to the while() condition to confirm this
+            if (isInterrupted()) {
+              continue;
+            }
           } else if (ioe instanceof ConnectException) {
             LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe);
             replicationSinkMgr.chooseSinks();
@@ -849,7 +862,8 @@ public class ReplicationSource extends T
           + " because an error occurred: " + reason, cause);
     }
     this.running = false;
-    Threads.shutdown(this, this.sleepForRetries);
+    this.interrupt();
+    Threads.shutdown(this, this.sleepForRetries * this.maxRetriesMultiplier);
   }
 
   public String getPeerClusterZnode() {
@@ -865,7 +879,7 @@ public class ReplicationSource extends T
   }
 
   private boolean isActive() {
-    return !this.stopper.isStopped() && this.running;
+    return !this.stopper.isStopped() && this.running && !isInterrupted();
   }
 
   /**


Reply via email to