Author: stack Date: Sat Mar 8 20:41:57 2014 New Revision: 1575606 URL: http://svn.apache.org/r1575606 Log: HBASE-10651 Fix incorrect handling of IE that restores current thread's interrupt status within while/for loops in Replication (Honghua Feng)
Modified: hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Modified: hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1575606&r1=1575605&r2=1575606&view=diff ============================================================================== --- hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original) +++ hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Sat Mar 8 20:41:57 2014 @@ -710,6 +710,9 @@ public class ReplicationSource extends T } catch (InterruptedException e) { LOG.debug("Interrupted while sleeping for throttling control"); Thread.currentThread().interrupt(); + // current thread might be interrupted to terminate + // directly go back to while() for confirm this + continue; } // reset throttler's cycle start tick when sleep for throttling occurs this.throttler.resetStartTick(); @@ -753,6 +756,11 @@ public class ReplicationSource extends T + "Replication cannot proceed without losing data.", sleepMultiplier)) { sleepMultiplier++; } + // current thread might be interrupted to terminate + // directly go back to while() for confirm this + if (isInterrupted()) { + continue; + } } } else { if (ioe instanceof SocketTimeoutException) { @@ -763,6 +771,11 @@ public class ReplicationSource extends T "call to the remote cluster timed out, which is usually " + "caused by a machine failure or a massive slowdown", this.socketTimeoutMultiplier); + // current thread might be interrupted to terminate + // directly go back to while() for confirm this + if (isInterrupted()) { + continue; + } } else if (ioe instanceof ConnectException) { LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe); replicationSinkMgr.chooseSinks(); @@ -849,7 +862,8 @@ public class ReplicationSource extends T + " because an error occurred: " + reason, cause); } this.running = false; - Threads.shutdown(this, this.sleepForRetries); + this.interrupt(); + Threads.shutdown(this, this.sleepForRetries * this.maxRetriesMultiplier); } public String getPeerClusterZnode() { @@ -865,7 +879,7 @@ public class ReplicationSource extends T } private boolean isActive() { - return !this.stopper.isStopped() && this.running; + return !this.stopper.isStopped() && this.running && !isInterrupted(); } /**