http://git-wip-us.apache.org/repos/asf/hbase-site/blob/bb78f24d/devapidocs/org/apache/hadoop/hbase/util/package-tree.html
----------------------------------------------------------------------
diff --git a/devapidocs/org/apache/hadoop/hbase/util/package-tree.html 
b/devapidocs/org/apache/hadoop/hbase/util/package-tree.html
index 596cffe..7ee6a13 100644
--- a/devapidocs/org/apache/hadoop/hbase/util/package-tree.html
+++ b/devapidocs/org/apache/hadoop/hbase/util/package-tree.html
@@ -532,14 +532,14 @@
 <ul>
 <li type="circle">java.lang.<a 
href="https://docs.oracle.com/javase/8/docs/api/java/lang/Enum.html?is-external=true";
 title="class or interface in java.lang"><span 
class="typeNameLink">Enum</span></a>&lt;E&gt; (implements java.lang.<a 
href="https://docs.oracle.com/javase/8/docs/api/java/lang/Comparable.html?is-external=true";
 title="class or interface in java.lang">Comparable</a>&lt;T&gt;, java.io.<a 
href="https://docs.oracle.com/javase/8/docs/api/java/io/Serializable.html?is-external=true";
 title="class or interface in java.io">Serializable</a>)
 <ul>
+<li type="circle">org.apache.hadoop.hbase.util.<a 
href="../../../../../org/apache/hadoop/hbase/util/PoolMap.PoolType.html" 
title="enum in org.apache.hadoop.hbase.util"><span 
class="typeNameLink">PoolMap.PoolType</span></a></li>
 <li type="circle">org.apache.hadoop.hbase.util.<a 
href="../../../../../org/apache/hadoop/hbase/util/Bytes.LexicographicalComparerHolder.UnsafeComparer.html"
 title="enum in org.apache.hadoop.hbase.util"><span 
class="typeNameLink">Bytes.LexicographicalComparerHolder.UnsafeComparer</span></a>
 (implements org.apache.hadoop.hbase.util.<a 
href="../../../../../org/apache/hadoop/hbase/util/Bytes.Comparer.html" 
title="interface in 
org.apache.hadoop.hbase.util">Bytes.Comparer</a>&lt;T&gt;)</li>
-<li type="circle">org.apache.hadoop.hbase.util.<a 
href="../../../../../org/apache/hadoop/hbase/util/HBaseFsck.ErrorReporter.ERROR_CODE.html"
 title="enum in org.apache.hadoop.hbase.util"><span 
class="typeNameLink">HBaseFsck.ErrorReporter.ERROR_CODE</span></a></li>
+<li type="circle">org.apache.hadoop.hbase.util.<a 
href="../../../../../org/apache/hadoop/hbase/util/Bytes.LexicographicalComparerHolder.PureJavaComparer.html"
 title="enum in org.apache.hadoop.hbase.util"><span 
class="typeNameLink">Bytes.LexicographicalComparerHolder.PureJavaComparer</span></a>
 (implements org.apache.hadoop.hbase.util.<a 
href="../../../../../org/apache/hadoop/hbase/util/Bytes.Comparer.html" 
title="interface in 
org.apache.hadoop.hbase.util">Bytes.Comparer</a>&lt;T&gt;)</li>
 <li type="circle">org.apache.hadoop.hbase.util.<a 
href="../../../../../org/apache/hadoop/hbase/util/PrettyPrinter.Unit.html" 
title="enum in org.apache.hadoop.hbase.util"><span 
class="typeNameLink">PrettyPrinter.Unit</span></a></li>
+<li type="circle">org.apache.hadoop.hbase.util.<a 
href="../../../../../org/apache/hadoop/hbase/util/HBaseFsck.ErrorReporter.ERROR_CODE.html"
 title="enum in org.apache.hadoop.hbase.util"><span 
class="typeNameLink">HBaseFsck.ErrorReporter.ERROR_CODE</span></a></li>
+<li type="circle">org.apache.hadoop.hbase.util.<a 
href="../../../../../org/apache/hadoop/hbase/util/ChecksumType.html" 
title="enum in org.apache.hadoop.hbase.util"><span 
class="typeNameLink">ChecksumType</span></a></li>
 <li type="circle">org.apache.hadoop.hbase.util.<a 
href="../../../../../org/apache/hadoop/hbase/util/Order.html" title="enum in 
org.apache.hadoop.hbase.util"><span class="typeNameLink">Order</span></a></li>
 <li type="circle">org.apache.hadoop.hbase.util.<a 
href="../../../../../org/apache/hadoop/hbase/util/IdReadWriteLock.ReferenceType.html"
 title="enum in org.apache.hadoop.hbase.util"><span 
class="typeNameLink">IdReadWriteLock.ReferenceType</span></a></li>
-<li type="circle">org.apache.hadoop.hbase.util.<a 
href="../../../../../org/apache/hadoop/hbase/util/ChecksumType.html" 
title="enum in org.apache.hadoop.hbase.util"><span 
class="typeNameLink">ChecksumType</span></a></li>
-<li type="circle">org.apache.hadoop.hbase.util.<a 
href="../../../../../org/apache/hadoop/hbase/util/PoolMap.PoolType.html" 
title="enum in org.apache.hadoop.hbase.util"><span 
class="typeNameLink">PoolMap.PoolType</span></a></li>
-<li type="circle">org.apache.hadoop.hbase.util.<a 
href="../../../../../org/apache/hadoop/hbase/util/Bytes.LexicographicalComparerHolder.PureJavaComparer.html"
 title="enum in org.apache.hadoop.hbase.util"><span 
class="typeNameLink">Bytes.LexicographicalComparerHolder.PureJavaComparer</span></a>
 (implements org.apache.hadoop.hbase.util.<a 
href="../../../../../org/apache/hadoop/hbase/util/Bytes.Comparer.html" 
title="interface in 
org.apache.hadoop.hbase.util">Bytes.Comparer</a>&lt;T&gt;)</li>
 </ul>
 </li>
 </ul>

http://git-wip-us.apache.org/repos/asf/hbase-site/blob/bb78f24d/devapidocs/org/apache/hadoop/hbase/zookeeper/ZKWatcher.html
----------------------------------------------------------------------
diff --git a/devapidocs/org/apache/hadoop/hbase/zookeeper/ZKWatcher.html 
b/devapidocs/org/apache/hadoop/hbase/zookeeper/ZKWatcher.html
index c4df8e7..62a3538 100644
--- a/devapidocs/org/apache/hadoop/hbase/zookeeper/ZKWatcher.html
+++ b/devapidocs/org/apache/hadoop/hbase/zookeeper/ZKWatcher.html
@@ -1017,7 +1017,7 @@ implements org.apache.zookeeper.Watcher, <a 
href="../../../../../org/apache/hado
 <ul class="blockList">
 <li class="blockList">
 <h4>interruptedExceptionNoThrow</h4>
-<pre>public&nbsp;void&nbsp;<a 
href="../../../../../src-html/org/apache/hadoop/hbase/zookeeper/ZKWatcher.html#line.618">interruptedExceptionNoThrow</a>(<a
 
href="https://docs.oracle.com/javase/8/docs/api/java/lang/InterruptedException.html?is-external=true";
 title="class or interface in java.lang">InterruptedException</a>&nbsp;ie,
+<pre>public&nbsp;void&nbsp;<a 
href="../../../../../src-html/org/apache/hadoop/hbase/zookeeper/ZKWatcher.html#line.620">interruptedExceptionNoThrow</a>(<a
 
href="https://docs.oracle.com/javase/8/docs/api/java/lang/InterruptedException.html?is-external=true";
 title="class or interface in java.lang">InterruptedException</a>&nbsp;ie,
                                         boolean&nbsp;throwLater)</pre>
 <div class="block">Log the InterruptedException and interrupt current 
thread</div>
 <dl>
@@ -1033,7 +1033,7 @@ implements org.apache.zookeeper.Watcher, <a 
href="../../../../../org/apache/hado
 <ul class="blockList">
 <li class="blockList">
 <h4>close</h4>
-<pre>public&nbsp;void&nbsp;<a 
href="../../../../../src-html/org/apache/hadoop/hbase/zookeeper/ZKWatcher.html#line.631">close</a>()</pre>
+<pre>public&nbsp;void&nbsp;<a 
href="../../../../../src-html/org/apache/hadoop/hbase/zookeeper/ZKWatcher.html#line.633">close</a>()</pre>
 <div class="block">Close the connection to ZooKeeper.</div>
 <dl>
 <dt><span class="overrideSpecifyLabel">Specified by:</span></dt>
@@ -1049,7 +1049,7 @@ implements org.apache.zookeeper.Watcher, <a 
href="../../../../../org/apache/hado
 <ul class="blockList">
 <li class="blockList">
 <h4>getConfiguration</h4>
-<pre>public&nbsp;org.apache.hadoop.conf.Configuration&nbsp;<a 
href="../../../../../src-html/org/apache/hadoop/hbase/zookeeper/ZKWatcher.html#line.639">getConfiguration</a>()</pre>
+<pre>public&nbsp;org.apache.hadoop.conf.Configuration&nbsp;<a 
href="../../../../../src-html/org/apache/hadoop/hbase/zookeeper/ZKWatcher.html#line.641">getConfiguration</a>()</pre>
 </li>
 </ul>
 <a name="abort-java.lang.String-java.lang.Throwable-">
@@ -1058,7 +1058,7 @@ implements org.apache.zookeeper.Watcher, <a 
href="../../../../../org/apache/hado
 <ul class="blockList">
 <li class="blockList">
 <h4>abort</h4>
-<pre>public&nbsp;void&nbsp;<a 
href="../../../../../src-html/org/apache/hadoop/hbase/zookeeper/ZKWatcher.html#line.644">abort</a>(<a
 
href="https://docs.oracle.com/javase/8/docs/api/java/lang/String.html?is-external=true";
 title="class or interface in java.lang">String</a>&nbsp;why,
+<pre>public&nbsp;void&nbsp;<a 
href="../../../../../src-html/org/apache/hadoop/hbase/zookeeper/ZKWatcher.html#line.646">abort</a>(<a
 
href="https://docs.oracle.com/javase/8/docs/api/java/lang/String.html?is-external=true";
 title="class or interface in java.lang">String</a>&nbsp;why,
                   <a 
href="https://docs.oracle.com/javase/8/docs/api/java/lang/Throwable.html?is-external=true";
 title="class or interface in java.lang">Throwable</a>&nbsp;e)</pre>
 <div class="block"><span class="descfrmTypeLabel">Description copied from 
interface:&nbsp;<code><a 
href="../../../../../org/apache/hadoop/hbase/Abortable.html#abort-java.lang.String-java.lang.Throwable-">Abortable</a></code></span></div>
 <div class="block">Abort the server or client.</div>
@@ -1077,7 +1077,7 @@ implements org.apache.zookeeper.Watcher, <a 
href="../../../../../org/apache/hado
 <ul class="blockListLast">
 <li class="blockList">
 <h4>isAborted</h4>
-<pre>public&nbsp;boolean&nbsp;<a 
href="../../../../../src-html/org/apache/hadoop/hbase/zookeeper/ZKWatcher.html#line.653">isAborted</a>()</pre>
+<pre>public&nbsp;boolean&nbsp;<a 
href="../../../../../src-html/org/apache/hadoop/hbase/zookeeper/ZKWatcher.html#line.655">isAborted</a>()</pre>
 <div class="block"><span class="descfrmTypeLabel">Description copied from 
interface:&nbsp;<code><a 
href="../../../../../org/apache/hadoop/hbase/Abortable.html#isAborted--">Abortable</a></code></span></div>
 <div class="block">Check if the server or client was aborted.</div>
 <dl>

http://git-wip-us.apache.org/repos/asf/hbase-site/blob/bb78f24d/devapidocs/src-html/org/apache/hadoop/hbase/Version.html
----------------------------------------------------------------------
diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/Version.html 
b/devapidocs/src-html/org/apache/hadoop/hbase/Version.html
index 220ff10..45aaf8a 100644
--- a/devapidocs/src-html/org/apache/hadoop/hbase/Version.html
+++ b/devapidocs/src-html/org/apache/hadoop/hbase/Version.html
@@ -16,11 +16,11 @@
 <span class="sourceLineNo">008</span>@InterfaceAudience.Private<a 
name="line.8"></a>
 <span class="sourceLineNo">009</span>public class Version {<a 
name="line.9"></a>
 <span class="sourceLineNo">010</span>  public static final String version = 
"3.0.0-SNAPSHOT";<a name="line.10"></a>
-<span class="sourceLineNo">011</span>  public static final String revision = 
"edf60b965be903f205ec689b59c55229e0cb1dbf";<a name="line.11"></a>
+<span class="sourceLineNo">011</span>  public static final String revision = 
"ec66434380aee62289ccf7b173d765bbe7083718";<a name="line.11"></a>
 <span class="sourceLineNo">012</span>  public static final String user = 
"jenkins";<a name="line.12"></a>
-<span class="sourceLineNo">013</span>  public static final String date = "Tue 
Jun 12 14:39:50 UTC 2018";<a name="line.13"></a>
+<span class="sourceLineNo">013</span>  public static final String date = "Wed 
Jun 13 14:39:52 UTC 2018";<a name="line.13"></a>
 <span class="sourceLineNo">014</span>  public static final String url = 
"git://jenkins-websites1.apache.org/home/jenkins/jenkins-slave/workspace/hbase_generate_website/hbase";<a
 name="line.14"></a>
-<span class="sourceLineNo">015</span>  public static final String srcChecksum 
= "5760e143e3a5a39f2d244a97b9689042";<a name="line.15"></a>
+<span class="sourceLineNo">015</span>  public static final String srcChecksum 
= "f82b1195e42117413b37d2d6088772bd";<a name="line.15"></a>
 <span class="sourceLineNo">016</span>}<a name="line.16"></a>
 
 

http://git-wip-us.apache.org/repos/asf/hbase-site/blob/bb78f24d/devapidocs/src-html/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.LogsComparator.html
----------------------------------------------------------------------
diff --git 
a/devapidocs/src-html/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.LogsComparator.html
 
b/devapidocs/src-html/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.LogsComparator.html
index 2dc2148..dd02cc6 100644
--- 
a/devapidocs/src-html/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.LogsComparator.html
+++ 
b/devapidocs/src-html/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.LogsComparator.html
@@ -507,132 +507,152 @@
 <span class="sourceLineNo">499</span>    
Collection&lt;ReplicationSourceShipper&gt; workers = workerThreads.values();<a 
name="line.499"></a>
 <span class="sourceLineNo">500</span>    for (ReplicationSourceShipper worker 
: workers) {<a name="line.500"></a>
 <span class="sourceLineNo">501</span>      worker.stopWorker();<a 
name="line.501"></a>
-<span class="sourceLineNo">502</span>      worker.entryReader.interrupt();<a 
name="line.502"></a>
-<span class="sourceLineNo">503</span>      worker.interrupt();<a 
name="line.503"></a>
-<span class="sourceLineNo">504</span>    }<a name="line.504"></a>
-<span class="sourceLineNo">505</span>    if (this.replicationEndpoint != null) 
{<a name="line.505"></a>
-<span class="sourceLineNo">506</span>      this.replicationEndpoint.stop();<a 
name="line.506"></a>
-<span class="sourceLineNo">507</span>    }<a name="line.507"></a>
-<span class="sourceLineNo">508</span>    if (join) {<a name="line.508"></a>
-<span class="sourceLineNo">509</span>      for (ReplicationSourceShipper 
worker : workers) {<a name="line.509"></a>
-<span class="sourceLineNo">510</span>        Threads.shutdown(worker, 
this.sleepForRetries);<a name="line.510"></a>
-<span class="sourceLineNo">511</span>        LOG.info("ReplicationSourceWorker 
" + worker.getName() + " terminated");<a name="line.511"></a>
-<span class="sourceLineNo">512</span>      }<a name="line.512"></a>
-<span class="sourceLineNo">513</span>      if (this.replicationEndpoint != 
null) {<a name="line.513"></a>
-<span class="sourceLineNo">514</span>        try {<a name="line.514"></a>
-<span class="sourceLineNo">515</span>          
this.replicationEndpoint.awaitTerminated(sleepForRetries * 
maxRetriesMultiplier,<a name="line.515"></a>
-<span class="sourceLineNo">516</span>            TimeUnit.MILLISECONDS);<a 
name="line.516"></a>
-<span class="sourceLineNo">517</span>        } catch (TimeoutException te) {<a 
name="line.517"></a>
-<span class="sourceLineNo">518</span>          LOG.warn("Got exception while 
waiting for endpoint to shutdown for replication source :" +<a 
name="line.518"></a>
-<span class="sourceLineNo">519</span>            this.queueId, te);<a 
name="line.519"></a>
-<span class="sourceLineNo">520</span>        }<a name="line.520"></a>
-<span class="sourceLineNo">521</span>      }<a name="line.521"></a>
-<span class="sourceLineNo">522</span>    }<a name="line.522"></a>
-<span class="sourceLineNo">523</span>    this.metrics.clear();<a 
name="line.523"></a>
-<span class="sourceLineNo">524</span>  }<a name="line.524"></a>
-<span class="sourceLineNo">525</span><a name="line.525"></a>
-<span class="sourceLineNo">526</span>  @Override<a name="line.526"></a>
-<span class="sourceLineNo">527</span>  public String getQueueId() {<a 
name="line.527"></a>
-<span class="sourceLineNo">528</span>    return this.queueId;<a 
name="line.528"></a>
-<span class="sourceLineNo">529</span>  }<a name="line.529"></a>
-<span class="sourceLineNo">530</span><a name="line.530"></a>
-<span class="sourceLineNo">531</span>  @Override<a name="line.531"></a>
-<span class="sourceLineNo">532</span>  public String getPeerId() {<a 
name="line.532"></a>
-<span class="sourceLineNo">533</span>    return this.peerId;<a 
name="line.533"></a>
-<span class="sourceLineNo">534</span>  }<a name="line.534"></a>
-<span class="sourceLineNo">535</span><a name="line.535"></a>
-<span class="sourceLineNo">536</span>  @Override<a name="line.536"></a>
-<span class="sourceLineNo">537</span>  public Path getCurrentPath() {<a 
name="line.537"></a>
-<span class="sourceLineNo">538</span>    // only for testing<a 
name="line.538"></a>
-<span class="sourceLineNo">539</span>    for (ReplicationSourceShipper worker 
: workerThreads.values()) {<a name="line.539"></a>
-<span class="sourceLineNo">540</span>      if (worker.getCurrentPath() != 
null) {<a name="line.540"></a>
-<span class="sourceLineNo">541</span>        return worker.getCurrentPath();<a 
name="line.541"></a>
-<span class="sourceLineNo">542</span>      }<a name="line.542"></a>
-<span class="sourceLineNo">543</span>    }<a name="line.543"></a>
-<span class="sourceLineNo">544</span>    return null;<a name="line.544"></a>
-<span class="sourceLineNo">545</span>  }<a name="line.545"></a>
-<span class="sourceLineNo">546</span><a name="line.546"></a>
-<span class="sourceLineNo">547</span>  @Override<a name="line.547"></a>
-<span class="sourceLineNo">548</span>  public boolean isSourceActive() {<a 
name="line.548"></a>
-<span class="sourceLineNo">549</span>    return !this.server.isStopped() 
&amp;&amp; this.sourceRunning;<a name="line.549"></a>
-<span class="sourceLineNo">550</span>  }<a name="line.550"></a>
-<span class="sourceLineNo">551</span><a name="line.551"></a>
-<span class="sourceLineNo">552</span>  /**<a name="line.552"></a>
-<span class="sourceLineNo">553</span>   * Comparator used to compare logs 
together based on their start time<a name="line.553"></a>
-<span class="sourceLineNo">554</span>   */<a name="line.554"></a>
-<span class="sourceLineNo">555</span>  public static class LogsComparator 
implements Comparator&lt;Path&gt; {<a name="line.555"></a>
-<span class="sourceLineNo">556</span><a name="line.556"></a>
-<span class="sourceLineNo">557</span>    @Override<a name="line.557"></a>
-<span class="sourceLineNo">558</span>    public int compare(Path o1, Path o2) 
{<a name="line.558"></a>
-<span class="sourceLineNo">559</span>      return Long.compare(getTS(o1), 
getTS(o2));<a name="line.559"></a>
-<span class="sourceLineNo">560</span>    }<a name="line.560"></a>
-<span class="sourceLineNo">561</span><a name="line.561"></a>
-<span class="sourceLineNo">562</span>    /**<a name="line.562"></a>
-<span class="sourceLineNo">563</span>     * Split a path to get the start 
time<a name="line.563"></a>
-<span class="sourceLineNo">564</span>     * For example: 
10.20.20.171%3A60020.1277499063250<a name="line.564"></a>
-<span class="sourceLineNo">565</span>     * @param p path to split<a 
name="line.565"></a>
-<span class="sourceLineNo">566</span>     * @return start time<a 
name="line.566"></a>
-<span class="sourceLineNo">567</span>     */<a name="line.567"></a>
-<span class="sourceLineNo">568</span>    private static long getTS(Path p) {<a 
name="line.568"></a>
-<span class="sourceLineNo">569</span>      int tsIndex = 
p.getName().lastIndexOf('.') + 1;<a name="line.569"></a>
-<span class="sourceLineNo">570</span>      return 
Long.parseLong(p.getName().substring(tsIndex));<a name="line.570"></a>
-<span class="sourceLineNo">571</span>    }<a name="line.571"></a>
-<span class="sourceLineNo">572</span>  }<a name="line.572"></a>
-<span class="sourceLineNo">573</span><a name="line.573"></a>
-<span class="sourceLineNo">574</span>  @Override<a name="line.574"></a>
-<span class="sourceLineNo">575</span>  public String getStats() {<a 
name="line.575"></a>
-<span class="sourceLineNo">576</span>    StringBuilder sb = new 
StringBuilder();<a name="line.576"></a>
-<span class="sourceLineNo">577</span>    sb.append("Total replicated edits: 
").append(totalReplicatedEdits)<a name="line.577"></a>
-<span class="sourceLineNo">578</span>        .append(", current progress: 
\n");<a name="line.578"></a>
-<span class="sourceLineNo">579</span>    for (Map.Entry&lt;String, 
ReplicationSourceShipper&gt; entry : workerThreads.entrySet()) {<a 
name="line.579"></a>
-<span class="sourceLineNo">580</span>      String walGroupId = 
entry.getKey();<a name="line.580"></a>
-<span class="sourceLineNo">581</span>      ReplicationSourceShipper worker = 
entry.getValue();<a name="line.581"></a>
-<span class="sourceLineNo">582</span>      long position = 
worker.getCurrentPosition();<a name="line.582"></a>
-<span class="sourceLineNo">583</span>      Path currentPath = 
worker.getCurrentPath();<a name="line.583"></a>
-<span class="sourceLineNo">584</span>      sb.append("walGroup 
[").append(walGroupId).append("]: ");<a name="line.584"></a>
-<span class="sourceLineNo">585</span>      if (currentPath != null) {<a 
name="line.585"></a>
-<span class="sourceLineNo">586</span>        sb.append("currently replicating 
from: ").append(currentPath).append(" at position: ")<a name="line.586"></a>
-<span class="sourceLineNo">587</span>            
.append(position).append("\n");<a name="line.587"></a>
-<span class="sourceLineNo">588</span>      } else {<a name="line.588"></a>
-<span class="sourceLineNo">589</span>        sb.append("no replication 
ongoing, waiting for new log");<a name="line.589"></a>
-<span class="sourceLineNo">590</span>      }<a name="line.590"></a>
+<span class="sourceLineNo">502</span>      
worker.entryReader.setReaderRunning(false);<a name="line.502"></a>
+<span class="sourceLineNo">503</span>    }<a name="line.503"></a>
+<span class="sourceLineNo">504</span><a name="line.504"></a>
+<span class="sourceLineNo">505</span>    for (ReplicationSourceShipper worker 
: workers) {<a name="line.505"></a>
+<span class="sourceLineNo">506</span>      if (worker.isAlive() || 
worker.entryReader.isAlive()) {<a name="line.506"></a>
+<span class="sourceLineNo">507</span>        try {<a name="line.507"></a>
+<span class="sourceLineNo">508</span>          // Wait worker to stop<a 
name="line.508"></a>
+<span class="sourceLineNo">509</span>          
Thread.sleep(this.sleepForRetries);<a name="line.509"></a>
+<span class="sourceLineNo">510</span>        } catch (InterruptedException e) 
{<a name="line.510"></a>
+<span class="sourceLineNo">511</span>          LOG.info("Interrupted while 
waiting " + worker.getName() + " to stop");<a name="line.511"></a>
+<span class="sourceLineNo">512</span>          
Thread.currentThread().interrupt();<a name="line.512"></a>
+<span class="sourceLineNo">513</span>        }<a name="line.513"></a>
+<span class="sourceLineNo">514</span>        // If worker still is alive after 
waiting, interrupt it<a name="line.514"></a>
+<span class="sourceLineNo">515</span>        if (worker.isAlive()) {<a 
name="line.515"></a>
+<span class="sourceLineNo">516</span>          worker.interrupt();<a 
name="line.516"></a>
+<span class="sourceLineNo">517</span>        }<a name="line.517"></a>
+<span class="sourceLineNo">518</span>        // If entry reader is alive after 
waiting, interrupt it<a name="line.518"></a>
+<span class="sourceLineNo">519</span>        if (worker.entryReader.isAlive()) 
{<a name="line.519"></a>
+<span class="sourceLineNo">520</span>          
worker.entryReader.interrupt();<a name="line.520"></a>
+<span class="sourceLineNo">521</span>        }<a name="line.521"></a>
+<span class="sourceLineNo">522</span>      }<a name="line.522"></a>
+<span class="sourceLineNo">523</span>    }<a name="line.523"></a>
+<span class="sourceLineNo">524</span><a name="line.524"></a>
+<span class="sourceLineNo">525</span>    if (this.replicationEndpoint != null) 
{<a name="line.525"></a>
+<span class="sourceLineNo">526</span>      this.replicationEndpoint.stop();<a 
name="line.526"></a>
+<span class="sourceLineNo">527</span>    }<a name="line.527"></a>
+<span class="sourceLineNo">528</span>    if (join) {<a name="line.528"></a>
+<span class="sourceLineNo">529</span>      for (ReplicationSourceShipper 
worker : workers) {<a name="line.529"></a>
+<span class="sourceLineNo">530</span>        Threads.shutdown(worker, 
this.sleepForRetries);<a name="line.530"></a>
+<span class="sourceLineNo">531</span>        LOG.info("ReplicationSourceWorker 
" + worker.getName() + " terminated");<a name="line.531"></a>
+<span class="sourceLineNo">532</span>      }<a name="line.532"></a>
+<span class="sourceLineNo">533</span>      if (this.replicationEndpoint != 
null) {<a name="line.533"></a>
+<span class="sourceLineNo">534</span>        try {<a name="line.534"></a>
+<span class="sourceLineNo">535</span>          
this.replicationEndpoint.awaitTerminated(sleepForRetries * 
maxRetriesMultiplier,<a name="line.535"></a>
+<span class="sourceLineNo">536</span>            TimeUnit.MILLISECONDS);<a 
name="line.536"></a>
+<span class="sourceLineNo">537</span>        } catch (TimeoutException te) {<a 
name="line.537"></a>
+<span class="sourceLineNo">538</span>          LOG.warn("Got exception while 
waiting for endpoint to shutdown for replication source :" +<a 
name="line.538"></a>
+<span class="sourceLineNo">539</span>            this.queueId, te);<a 
name="line.539"></a>
+<span class="sourceLineNo">540</span>        }<a name="line.540"></a>
+<span class="sourceLineNo">541</span>      }<a name="line.541"></a>
+<span class="sourceLineNo">542</span>    }<a name="line.542"></a>
+<span class="sourceLineNo">543</span>    this.metrics.clear();<a 
name="line.543"></a>
+<span class="sourceLineNo">544</span>  }<a name="line.544"></a>
+<span class="sourceLineNo">545</span><a name="line.545"></a>
+<span class="sourceLineNo">546</span>  @Override<a name="line.546"></a>
+<span class="sourceLineNo">547</span>  public String getQueueId() {<a 
name="line.547"></a>
+<span class="sourceLineNo">548</span>    return this.queueId;<a 
name="line.548"></a>
+<span class="sourceLineNo">549</span>  }<a name="line.549"></a>
+<span class="sourceLineNo">550</span><a name="line.550"></a>
+<span class="sourceLineNo">551</span>  @Override<a name="line.551"></a>
+<span class="sourceLineNo">552</span>  public String getPeerId() {<a 
name="line.552"></a>
+<span class="sourceLineNo">553</span>    return this.peerId;<a 
name="line.553"></a>
+<span class="sourceLineNo">554</span>  }<a name="line.554"></a>
+<span class="sourceLineNo">555</span><a name="line.555"></a>
+<span class="sourceLineNo">556</span>  @Override<a name="line.556"></a>
+<span class="sourceLineNo">557</span>  public Path getCurrentPath() {<a 
name="line.557"></a>
+<span class="sourceLineNo">558</span>    // only for testing<a 
name="line.558"></a>
+<span class="sourceLineNo">559</span>    for (ReplicationSourceShipper worker 
: workerThreads.values()) {<a name="line.559"></a>
+<span class="sourceLineNo">560</span>      if (worker.getCurrentPath() != 
null) {<a name="line.560"></a>
+<span class="sourceLineNo">561</span>        return worker.getCurrentPath();<a 
name="line.561"></a>
+<span class="sourceLineNo">562</span>      }<a name="line.562"></a>
+<span class="sourceLineNo">563</span>    }<a name="line.563"></a>
+<span class="sourceLineNo">564</span>    return null;<a name="line.564"></a>
+<span class="sourceLineNo">565</span>  }<a name="line.565"></a>
+<span class="sourceLineNo">566</span><a name="line.566"></a>
+<span class="sourceLineNo">567</span>  @Override<a name="line.567"></a>
+<span class="sourceLineNo">568</span>  public boolean isSourceActive() {<a 
name="line.568"></a>
+<span class="sourceLineNo">569</span>    return !this.server.isStopped() 
&amp;&amp; this.sourceRunning;<a name="line.569"></a>
+<span class="sourceLineNo">570</span>  }<a name="line.570"></a>
+<span class="sourceLineNo">571</span><a name="line.571"></a>
+<span class="sourceLineNo">572</span>  /**<a name="line.572"></a>
+<span class="sourceLineNo">573</span>   * Comparator used to compare logs 
together based on their start time<a name="line.573"></a>
+<span class="sourceLineNo">574</span>   */<a name="line.574"></a>
+<span class="sourceLineNo">575</span>  public static class LogsComparator 
implements Comparator&lt;Path&gt; {<a name="line.575"></a>
+<span class="sourceLineNo">576</span><a name="line.576"></a>
+<span class="sourceLineNo">577</span>    @Override<a name="line.577"></a>
+<span class="sourceLineNo">578</span>    public int compare(Path o1, Path o2) 
{<a name="line.578"></a>
+<span class="sourceLineNo">579</span>      return Long.compare(getTS(o1), 
getTS(o2));<a name="line.579"></a>
+<span class="sourceLineNo">580</span>    }<a name="line.580"></a>
+<span class="sourceLineNo">581</span><a name="line.581"></a>
+<span class="sourceLineNo">582</span>    /**<a name="line.582"></a>
+<span class="sourceLineNo">583</span>     * Split a path to get the start 
time<a name="line.583"></a>
+<span class="sourceLineNo">584</span>     * For example: 
10.20.20.171%3A60020.1277499063250<a name="line.584"></a>
+<span class="sourceLineNo">585</span>     * @param p path to split<a 
name="line.585"></a>
+<span class="sourceLineNo">586</span>     * @return start time<a 
name="line.586"></a>
+<span class="sourceLineNo">587</span>     */<a name="line.587"></a>
+<span class="sourceLineNo">588</span>    private static long getTS(Path p) {<a 
name="line.588"></a>
+<span class="sourceLineNo">589</span>      int tsIndex = 
p.getName().lastIndexOf('.') + 1;<a name="line.589"></a>
+<span class="sourceLineNo">590</span>      return 
Long.parseLong(p.getName().substring(tsIndex));<a name="line.590"></a>
 <span class="sourceLineNo">591</span>    }<a name="line.591"></a>
-<span class="sourceLineNo">592</span>    return sb.toString();<a 
name="line.592"></a>
-<span class="sourceLineNo">593</span>  }<a name="line.593"></a>
-<span class="sourceLineNo">594</span><a name="line.594"></a>
-<span class="sourceLineNo">595</span>  @Override<a name="line.595"></a>
-<span class="sourceLineNo">596</span>  public MetricsSource getSourceMetrics() 
{<a name="line.596"></a>
-<span class="sourceLineNo">597</span>    return this.metrics;<a 
name="line.597"></a>
-<span class="sourceLineNo">598</span>  }<a name="line.598"></a>
-<span class="sourceLineNo">599</span><a name="line.599"></a>
-<span class="sourceLineNo">600</span>  @Override<a name="line.600"></a>
-<span class="sourceLineNo">601</span>  //offsets totalBufferUsed by deducting 
shipped batchSize.<a name="line.601"></a>
-<span class="sourceLineNo">602</span>  public void 
postShipEdits(List&lt;Entry&gt; entries, int batchSize) {<a name="line.602"></a>
-<span class="sourceLineNo">603</span>    if (throttler.isEnabled()) {<a 
name="line.603"></a>
-<span class="sourceLineNo">604</span>      throttler.addPushSize(batchSize);<a 
name="line.604"></a>
-<span class="sourceLineNo">605</span>    }<a name="line.605"></a>
-<span class="sourceLineNo">606</span>    
totalReplicatedEdits.addAndGet(entries.size());<a name="line.606"></a>
-<span class="sourceLineNo">607</span>    
totalBufferUsed.addAndGet(-batchSize);<a name="line.607"></a>
-<span class="sourceLineNo">608</span>  }<a name="line.608"></a>
-<span class="sourceLineNo">609</span><a name="line.609"></a>
-<span class="sourceLineNo">610</span>  @Override<a name="line.610"></a>
-<span class="sourceLineNo">611</span>  public WALFileLengthProvider 
getWALFileLengthProvider() {<a name="line.611"></a>
-<span class="sourceLineNo">612</span>    return walFileLengthProvider;<a 
name="line.612"></a>
+<span class="sourceLineNo">592</span>  }<a name="line.592"></a>
+<span class="sourceLineNo">593</span><a name="line.593"></a>
+<span class="sourceLineNo">594</span>  @Override<a name="line.594"></a>
+<span class="sourceLineNo">595</span>  public String getStats() {<a 
name="line.595"></a>
+<span class="sourceLineNo">596</span>    StringBuilder sb = new 
StringBuilder();<a name="line.596"></a>
+<span class="sourceLineNo">597</span>    sb.append("Total replicated edits: 
").append(totalReplicatedEdits)<a name="line.597"></a>
+<span class="sourceLineNo">598</span>        .append(", current progress: 
\n");<a name="line.598"></a>
+<span class="sourceLineNo">599</span>    for (Map.Entry&lt;String, 
ReplicationSourceShipper&gt; entry : workerThreads.entrySet()) {<a 
name="line.599"></a>
+<span class="sourceLineNo">600</span>      String walGroupId = 
entry.getKey();<a name="line.600"></a>
+<span class="sourceLineNo">601</span>      ReplicationSourceShipper worker = 
entry.getValue();<a name="line.601"></a>
+<span class="sourceLineNo">602</span>      long position = 
worker.getCurrentPosition();<a name="line.602"></a>
+<span class="sourceLineNo">603</span>      Path currentPath = 
worker.getCurrentPath();<a name="line.603"></a>
+<span class="sourceLineNo">604</span>      sb.append("walGroup 
[").append(walGroupId).append("]: ");<a name="line.604"></a>
+<span class="sourceLineNo">605</span>      if (currentPath != null) {<a 
name="line.605"></a>
+<span class="sourceLineNo">606</span>        sb.append("currently replicating 
from: ").append(currentPath).append(" at position: ")<a name="line.606"></a>
+<span class="sourceLineNo">607</span>            
.append(position).append("\n");<a name="line.607"></a>
+<span class="sourceLineNo">608</span>      } else {<a name="line.608"></a>
+<span class="sourceLineNo">609</span>        sb.append("no replication 
ongoing, waiting for new log");<a name="line.609"></a>
+<span class="sourceLineNo">610</span>      }<a name="line.610"></a>
+<span class="sourceLineNo">611</span>    }<a name="line.611"></a>
+<span class="sourceLineNo">612</span>    return sb.toString();<a 
name="line.612"></a>
 <span class="sourceLineNo">613</span>  }<a name="line.613"></a>
 <span class="sourceLineNo">614</span><a name="line.614"></a>
 <span class="sourceLineNo">615</span>  @Override<a name="line.615"></a>
-<span class="sourceLineNo">616</span>  public ServerName 
getServerWALsBelongTo() {<a name="line.616"></a>
-<span class="sourceLineNo">617</span>    return server.getServerName();<a 
name="line.617"></a>
+<span class="sourceLineNo">616</span>  public MetricsSource getSourceMetrics() 
{<a name="line.616"></a>
+<span class="sourceLineNo">617</span>    return this.metrics;<a 
name="line.617"></a>
 <span class="sourceLineNo">618</span>  }<a name="line.618"></a>
 <span class="sourceLineNo">619</span><a name="line.619"></a>
-<span class="sourceLineNo">620</span>  Server getServer() {<a 
name="line.620"></a>
-<span class="sourceLineNo">621</span>    return server;<a name="line.621"></a>
-<span class="sourceLineNo">622</span>  }<a name="line.622"></a>
-<span class="sourceLineNo">623</span><a name="line.623"></a>
-<span class="sourceLineNo">624</span>  ReplicationQueueStorage 
getQueueStorage() {<a name="line.624"></a>
-<span class="sourceLineNo">625</span>    return queueStorage;<a 
name="line.625"></a>
-<span class="sourceLineNo">626</span>  }<a name="line.626"></a>
-<span class="sourceLineNo">627</span>}<a name="line.627"></a>
+<span class="sourceLineNo">620</span>  @Override<a name="line.620"></a>
+<span class="sourceLineNo">621</span>  //offsets totalBufferUsed by deducting 
shipped batchSize.<a name="line.621"></a>
+<span class="sourceLineNo">622</span>  public void 
postShipEdits(List&lt;Entry&gt; entries, int batchSize) {<a name="line.622"></a>
+<span class="sourceLineNo">623</span>    if (throttler.isEnabled()) {<a 
name="line.623"></a>
+<span class="sourceLineNo">624</span>      throttler.addPushSize(batchSize);<a 
name="line.624"></a>
+<span class="sourceLineNo">625</span>    }<a name="line.625"></a>
+<span class="sourceLineNo">626</span>    
totalReplicatedEdits.addAndGet(entries.size());<a name="line.626"></a>
+<span class="sourceLineNo">627</span>    
totalBufferUsed.addAndGet(-batchSize);<a name="line.627"></a>
+<span class="sourceLineNo">628</span>  }<a name="line.628"></a>
+<span class="sourceLineNo">629</span><a name="line.629"></a>
+<span class="sourceLineNo">630</span>  @Override<a name="line.630"></a>
+<span class="sourceLineNo">631</span>  public WALFileLengthProvider 
getWALFileLengthProvider() {<a name="line.631"></a>
+<span class="sourceLineNo">632</span>    return walFileLengthProvider;<a 
name="line.632"></a>
+<span class="sourceLineNo">633</span>  }<a name="line.633"></a>
+<span class="sourceLineNo">634</span><a name="line.634"></a>
+<span class="sourceLineNo">635</span>  @Override<a name="line.635"></a>
+<span class="sourceLineNo">636</span>  public ServerName 
getServerWALsBelongTo() {<a name="line.636"></a>
+<span class="sourceLineNo">637</span>    return server.getServerName();<a 
name="line.637"></a>
+<span class="sourceLineNo">638</span>  }<a name="line.638"></a>
+<span class="sourceLineNo">639</span><a name="line.639"></a>
+<span class="sourceLineNo">640</span>  Server getServer() {<a 
name="line.640"></a>
+<span class="sourceLineNo">641</span>    return server;<a name="line.641"></a>
+<span class="sourceLineNo">642</span>  }<a name="line.642"></a>
+<span class="sourceLineNo">643</span><a name="line.643"></a>
+<span class="sourceLineNo">644</span>  ReplicationQueueStorage 
getQueueStorage() {<a name="line.644"></a>
+<span class="sourceLineNo">645</span>    return queueStorage;<a 
name="line.645"></a>
+<span class="sourceLineNo">646</span>  }<a name="line.646"></a>
+<span class="sourceLineNo">647</span>}<a name="line.647"></a>
 
 
 

http://git-wip-us.apache.org/repos/asf/hbase-site/blob/bb78f24d/devapidocs/src-html/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.html
----------------------------------------------------------------------
diff --git 
a/devapidocs/src-html/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.html
 
b/devapidocs/src-html/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.html
index 2dc2148..dd02cc6 100644
--- 
a/devapidocs/src-html/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.html
+++ 
b/devapidocs/src-html/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.html
@@ -507,132 +507,152 @@
 <span class="sourceLineNo">499</span>    
Collection&lt;ReplicationSourceShipper&gt; workers = workerThreads.values();<a 
name="line.499"></a>
 <span class="sourceLineNo">500</span>    for (ReplicationSourceShipper worker 
: workers) {<a name="line.500"></a>
 <span class="sourceLineNo">501</span>      worker.stopWorker();<a 
name="line.501"></a>
-<span class="sourceLineNo">502</span>      worker.entryReader.interrupt();<a 
name="line.502"></a>
-<span class="sourceLineNo">503</span>      worker.interrupt();<a 
name="line.503"></a>
-<span class="sourceLineNo">504</span>    }<a name="line.504"></a>
-<span class="sourceLineNo">505</span>    if (this.replicationEndpoint != null) 
{<a name="line.505"></a>
-<span class="sourceLineNo">506</span>      this.replicationEndpoint.stop();<a 
name="line.506"></a>
-<span class="sourceLineNo">507</span>    }<a name="line.507"></a>
-<span class="sourceLineNo">508</span>    if (join) {<a name="line.508"></a>
-<span class="sourceLineNo">509</span>      for (ReplicationSourceShipper 
worker : workers) {<a name="line.509"></a>
-<span class="sourceLineNo">510</span>        Threads.shutdown(worker, 
this.sleepForRetries);<a name="line.510"></a>
-<span class="sourceLineNo">511</span>        LOG.info("ReplicationSourceWorker 
" + worker.getName() + " terminated");<a name="line.511"></a>
-<span class="sourceLineNo">512</span>      }<a name="line.512"></a>
-<span class="sourceLineNo">513</span>      if (this.replicationEndpoint != 
null) {<a name="line.513"></a>
-<span class="sourceLineNo">514</span>        try {<a name="line.514"></a>
-<span class="sourceLineNo">515</span>          
this.replicationEndpoint.awaitTerminated(sleepForRetries * 
maxRetriesMultiplier,<a name="line.515"></a>
-<span class="sourceLineNo">516</span>            TimeUnit.MILLISECONDS);<a 
name="line.516"></a>
-<span class="sourceLineNo">517</span>        } catch (TimeoutException te) {<a 
name="line.517"></a>
-<span class="sourceLineNo">518</span>          LOG.warn("Got exception while 
waiting for endpoint to shutdown for replication source :" +<a 
name="line.518"></a>
-<span class="sourceLineNo">519</span>            this.queueId, te);<a 
name="line.519"></a>
-<span class="sourceLineNo">520</span>        }<a name="line.520"></a>
-<span class="sourceLineNo">521</span>      }<a name="line.521"></a>
-<span class="sourceLineNo">522</span>    }<a name="line.522"></a>
-<span class="sourceLineNo">523</span>    this.metrics.clear();<a 
name="line.523"></a>
-<span class="sourceLineNo">524</span>  }<a name="line.524"></a>
-<span class="sourceLineNo">525</span><a name="line.525"></a>
-<span class="sourceLineNo">526</span>  @Override<a name="line.526"></a>
-<span class="sourceLineNo">527</span>  public String getQueueId() {<a 
name="line.527"></a>
-<span class="sourceLineNo">528</span>    return this.queueId;<a 
name="line.528"></a>
-<span class="sourceLineNo">529</span>  }<a name="line.529"></a>
-<span class="sourceLineNo">530</span><a name="line.530"></a>
-<span class="sourceLineNo">531</span>  @Override<a name="line.531"></a>
-<span class="sourceLineNo">532</span>  public String getPeerId() {<a 
name="line.532"></a>
-<span class="sourceLineNo">533</span>    return this.peerId;<a 
name="line.533"></a>
-<span class="sourceLineNo">534</span>  }<a name="line.534"></a>
-<span class="sourceLineNo">535</span><a name="line.535"></a>
-<span class="sourceLineNo">536</span>  @Override<a name="line.536"></a>
-<span class="sourceLineNo">537</span>  public Path getCurrentPath() {<a 
name="line.537"></a>
-<span class="sourceLineNo">538</span>    // only for testing<a 
name="line.538"></a>
-<span class="sourceLineNo">539</span>    for (ReplicationSourceShipper worker 
: workerThreads.values()) {<a name="line.539"></a>
-<span class="sourceLineNo">540</span>      if (worker.getCurrentPath() != 
null) {<a name="line.540"></a>
-<span class="sourceLineNo">541</span>        return worker.getCurrentPath();<a 
name="line.541"></a>
-<span class="sourceLineNo">542</span>      }<a name="line.542"></a>
-<span class="sourceLineNo">543</span>    }<a name="line.543"></a>
-<span class="sourceLineNo">544</span>    return null;<a name="line.544"></a>
-<span class="sourceLineNo">545</span>  }<a name="line.545"></a>
-<span class="sourceLineNo">546</span><a name="line.546"></a>
-<span class="sourceLineNo">547</span>  @Override<a name="line.547"></a>
-<span class="sourceLineNo">548</span>  public boolean isSourceActive() {<a 
name="line.548"></a>
-<span class="sourceLineNo">549</span>    return !this.server.isStopped() 
&amp;&amp; this.sourceRunning;<a name="line.549"></a>
-<span class="sourceLineNo">550</span>  }<a name="line.550"></a>
-<span class="sourceLineNo">551</span><a name="line.551"></a>
-<span class="sourceLineNo">552</span>  /**<a name="line.552"></a>
-<span class="sourceLineNo">553</span>   * Comparator used to compare logs 
together based on their start time<a name="line.553"></a>
-<span class="sourceLineNo">554</span>   */<a name="line.554"></a>
-<span class="sourceLineNo">555</span>  public static class LogsComparator 
implements Comparator&lt;Path&gt; {<a name="line.555"></a>
-<span class="sourceLineNo">556</span><a name="line.556"></a>
-<span class="sourceLineNo">557</span>    @Override<a name="line.557"></a>
-<span class="sourceLineNo">558</span>    public int compare(Path o1, Path o2) 
{<a name="line.558"></a>
-<span class="sourceLineNo">559</span>      return Long.compare(getTS(o1), 
getTS(o2));<a name="line.559"></a>
-<span class="sourceLineNo">560</span>    }<a name="line.560"></a>
-<span class="sourceLineNo">561</span><a name="line.561"></a>
-<span class="sourceLineNo">562</span>    /**<a name="line.562"></a>
-<span class="sourceLineNo">563</span>     * Split a path to get the start 
time<a name="line.563"></a>
-<span class="sourceLineNo">564</span>     * For example: 
10.20.20.171%3A60020.1277499063250<a name="line.564"></a>
-<span class="sourceLineNo">565</span>     * @param p path to split<a 
name="line.565"></a>
-<span class="sourceLineNo">566</span>     * @return start time<a 
name="line.566"></a>
-<span class="sourceLineNo">567</span>     */<a name="line.567"></a>
-<span class="sourceLineNo">568</span>    private static long getTS(Path p) {<a 
name="line.568"></a>
-<span class="sourceLineNo">569</span>      int tsIndex = 
p.getName().lastIndexOf('.') + 1;<a name="line.569"></a>
-<span class="sourceLineNo">570</span>      return 
Long.parseLong(p.getName().substring(tsIndex));<a name="line.570"></a>
-<span class="sourceLineNo">571</span>    }<a name="line.571"></a>
-<span class="sourceLineNo">572</span>  }<a name="line.572"></a>
-<span class="sourceLineNo">573</span><a name="line.573"></a>
-<span class="sourceLineNo">574</span>  @Override<a name="line.574"></a>
-<span class="sourceLineNo">575</span>  public String getStats() {<a 
name="line.575"></a>
-<span class="sourceLineNo">576</span>    StringBuilder sb = new 
StringBuilder();<a name="line.576"></a>
-<span class="sourceLineNo">577</span>    sb.append("Total replicated edits: 
").append(totalReplicatedEdits)<a name="line.577"></a>
-<span class="sourceLineNo">578</span>        .append(", current progress: 
\n");<a name="line.578"></a>
-<span class="sourceLineNo">579</span>    for (Map.Entry&lt;String, 
ReplicationSourceShipper&gt; entry : workerThreads.entrySet()) {<a 
name="line.579"></a>
-<span class="sourceLineNo">580</span>      String walGroupId = 
entry.getKey();<a name="line.580"></a>
-<span class="sourceLineNo">581</span>      ReplicationSourceShipper worker = 
entry.getValue();<a name="line.581"></a>
-<span class="sourceLineNo">582</span>      long position = 
worker.getCurrentPosition();<a name="line.582"></a>
-<span class="sourceLineNo">583</span>      Path currentPath = 
worker.getCurrentPath();<a name="line.583"></a>
-<span class="sourceLineNo">584</span>      sb.append("walGroup 
[").append(walGroupId).append("]: ");<a name="line.584"></a>
-<span class="sourceLineNo">585</span>      if (currentPath != null) {<a 
name="line.585"></a>
-<span class="sourceLineNo">586</span>        sb.append("currently replicating 
from: ").append(currentPath).append(" at position: ")<a name="line.586"></a>
-<span class="sourceLineNo">587</span>            
.append(position).append("\n");<a name="line.587"></a>
-<span class="sourceLineNo">588</span>      } else {<a name="line.588"></a>
-<span class="sourceLineNo">589</span>        sb.append("no replication 
ongoing, waiting for new log");<a name="line.589"></a>
-<span class="sourceLineNo">590</span>      }<a name="line.590"></a>
+<span class="sourceLineNo">502</span>      
worker.entryReader.setReaderRunning(false);<a name="line.502"></a>
+<span class="sourceLineNo">503</span>    }<a name="line.503"></a>
+<span class="sourceLineNo">504</span><a name="line.504"></a>
+<span class="sourceLineNo">505</span>    for (ReplicationSourceShipper worker 
: workers) {<a name="line.505"></a>
+<span class="sourceLineNo">506</span>      if (worker.isAlive() || 
worker.entryReader.isAlive()) {<a name="line.506"></a>
+<span class="sourceLineNo">507</span>        try {<a name="line.507"></a>
+<span class="sourceLineNo">508</span>          // Wait worker to stop<a 
name="line.508"></a>
+<span class="sourceLineNo">509</span>          
Thread.sleep(this.sleepForRetries);<a name="line.509"></a>
+<span class="sourceLineNo">510</span>        } catch (InterruptedException e) 
{<a name="line.510"></a>
+<span class="sourceLineNo">511</span>          LOG.info("Interrupted while 
waiting " + worker.getName() + " to stop");<a name="line.511"></a>
+<span class="sourceLineNo">512</span>          
Thread.currentThread().interrupt();<a name="line.512"></a>
+<span class="sourceLineNo">513</span>        }<a name="line.513"></a>
+<span class="sourceLineNo">514</span>        // If worker still is alive after 
waiting, interrupt it<a name="line.514"></a>
+<span class="sourceLineNo">515</span>        if (worker.isAlive()) {<a 
name="line.515"></a>
+<span class="sourceLineNo">516</span>          worker.interrupt();<a 
name="line.516"></a>
+<span class="sourceLineNo">517</span>        }<a name="line.517"></a>
+<span class="sourceLineNo">518</span>        // If entry reader is alive after 
waiting, interrupt it<a name="line.518"></a>
+<span class="sourceLineNo">519</span>        if (worker.entryReader.isAlive()) 
{<a name="line.519"></a>
+<span class="sourceLineNo">520</span>          
worker.entryReader.interrupt();<a name="line.520"></a>
+<span class="sourceLineNo">521</span>        }<a name="line.521"></a>
+<span class="sourceLineNo">522</span>      }<a name="line.522"></a>
+<span class="sourceLineNo">523</span>    }<a name="line.523"></a>
+<span class="sourceLineNo">524</span><a name="line.524"></a>
+<span class="sourceLineNo">525</span>    if (this.replicationEndpoint != null) 
{<a name="line.525"></a>
+<span class="sourceLineNo">526</span>      this.replicationEndpoint.stop();<a 
name="line.526"></a>
+<span class="sourceLineNo">527</span>    }<a name="line.527"></a>
+<span class="sourceLineNo">528</span>    if (join) {<a name="line.528"></a>
+<span class="sourceLineNo">529</span>      for (ReplicationSourceShipper 
worker : workers) {<a name="line.529"></a>
+<span class="sourceLineNo">530</span>        Threads.shutdown(worker, 
this.sleepForRetries);<a name="line.530"></a>
+<span class="sourceLineNo">531</span>        LOG.info("ReplicationSourceWorker 
" + worker.getName() + " terminated");<a name="line.531"></a>
+<span class="sourceLineNo">532</span>      }<a name="line.532"></a>
+<span class="sourceLineNo">533</span>      if (this.replicationEndpoint != 
null) {<a name="line.533"></a>
+<span class="sourceLineNo">534</span>        try {<a name="line.534"></a>
+<span class="sourceLineNo">535</span>          
this.replicationEndpoint.awaitTerminated(sleepForRetries * 
maxRetriesMultiplier,<a name="line.535"></a>
+<span class="sourceLineNo">536</span>            TimeUnit.MILLISECONDS);<a 
name="line.536"></a>
+<span class="sourceLineNo">537</span>        } catch (TimeoutException te) {<a 
name="line.537"></a>
+<span class="sourceLineNo">538</span>          LOG.warn("Got exception while 
waiting for endpoint to shutdown for replication source :" +<a 
name="line.538"></a>
+<span class="sourceLineNo">539</span>            this.queueId, te);<a 
name="line.539"></a>
+<span class="sourceLineNo">540</span>        }<a name="line.540"></a>
+<span class="sourceLineNo">541</span>      }<a name="line.541"></a>
+<span class="sourceLineNo">542</span>    }<a name="line.542"></a>
+<span class="sourceLineNo">543</span>    this.metrics.clear();<a 
name="line.543"></a>
+<span class="sourceLineNo">544</span>  }<a name="line.544"></a>
+<span class="sourceLineNo">545</span><a name="line.545"></a>
+<span class="sourceLineNo">546</span>  @Override<a name="line.546"></a>
+<span class="sourceLineNo">547</span>  public String getQueueId() {<a 
name="line.547"></a>
+<span class="sourceLineNo">548</span>    return this.queueId;<a 
name="line.548"></a>
+<span class="sourceLineNo">549</span>  }<a name="line.549"></a>
+<span class="sourceLineNo">550</span><a name="line.550"></a>
+<span class="sourceLineNo">551</span>  @Override<a name="line.551"></a>
+<span class="sourceLineNo">552</span>  public String getPeerId() {<a 
name="line.552"></a>
+<span class="sourceLineNo">553</span>    return this.peerId;<a 
name="line.553"></a>
+<span class="sourceLineNo">554</span>  }<a name="line.554"></a>
+<span class="sourceLineNo">555</span><a name="line.555"></a>
+<span class="sourceLineNo">556</span>  @Override<a name="line.556"></a>
+<span class="sourceLineNo">557</span>  public Path getCurrentPath() {<a 
name="line.557"></a>
+<span class="sourceLineNo">558</span>    // only for testing<a 
name="line.558"></a>
+<span class="sourceLineNo">559</span>    for (ReplicationSourceShipper worker 
: workerThreads.values()) {<a name="line.559"></a>
+<span class="sourceLineNo">560</span>      if (worker.getCurrentPath() != 
null) {<a name="line.560"></a>
+<span class="sourceLineNo">561</span>        return worker.getCurrentPath();<a 
name="line.561"></a>
+<span class="sourceLineNo">562</span>      }<a name="line.562"></a>
+<span class="sourceLineNo">563</span>    }<a name="line.563"></a>
+<span class="sourceLineNo">564</span>    return null;<a name="line.564"></a>
+<span class="sourceLineNo">565</span>  }<a name="line.565"></a>
+<span class="sourceLineNo">566</span><a name="line.566"></a>
+<span class="sourceLineNo">567</span>  @Override<a name="line.567"></a>
+<span class="sourceLineNo">568</span>  public boolean isSourceActive() {<a 
name="line.568"></a>
+<span class="sourceLineNo">569</span>    return !this.server.isStopped() 
&amp;&amp; this.sourceRunning;<a name="line.569"></a>
+<span class="sourceLineNo">570</span>  }<a name="line.570"></a>
+<span class="sourceLineNo">571</span><a name="line.571"></a>
+<span class="sourceLineNo">572</span>  /**<a name="line.572"></a>
+<span class="sourceLineNo">573</span>   * Comparator used to compare logs 
together based on their start time<a name="line.573"></a>
+<span class="sourceLineNo">574</span>   */<a name="line.574"></a>
+<span class="sourceLineNo">575</span>  public static class LogsComparator 
implements Comparator&lt;Path&gt; {<a name="line.575"></a>
+<span class="sourceLineNo">576</span><a name="line.576"></a>
+<span class="sourceLineNo">577</span>    @Override<a name="line.577"></a>
+<span class="sourceLineNo">578</span>    public int compare(Path o1, Path o2) 
{<a name="line.578"></a>
+<span class="sourceLineNo">579</span>      return Long.compare(getTS(o1), 
getTS(o2));<a name="line.579"></a>
+<span class="sourceLineNo">580</span>    }<a name="line.580"></a>
+<span class="sourceLineNo">581</span><a name="line.581"></a>
+<span class="sourceLineNo">582</span>    /**<a name="line.582"></a>
+<span class="sourceLineNo">583</span>     * Split a path to get the start 
time<a name="line.583"></a>
+<span class="sourceLineNo">584</span>     * For example: 
10.20.20.171%3A60020.1277499063250<a name="line.584"></a>
+<span class="sourceLineNo">585</span>     * @param p path to split<a 
name="line.585"></a>
+<span class="sourceLineNo">586</span>     * @return start time<a 
name="line.586"></a>
+<span class="sourceLineNo">587</span>     */<a name="line.587"></a>
+<span class="sourceLineNo">588</span>    private static long getTS(Path p) {<a 
name="line.588"></a>
+<span class="sourceLineNo">589</span>      int tsIndex = 
p.getName().lastIndexOf('.') + 1;<a name="line.589"></a>
+<span class="sourceLineNo">590</span>      return 
Long.parseLong(p.getName().substring(tsIndex));<a name="line.590"></a>
 <span class="sourceLineNo">591</span>    }<a name="line.591"></a>
-<span class="sourceLineNo">592</span>    return sb.toString();<a 
name="line.592"></a>
-<span class="sourceLineNo">593</span>  }<a name="line.593"></a>
-<span class="sourceLineNo">594</span><a name="line.594"></a>
-<span class="sourceLineNo">595</span>  @Override<a name="line.595"></a>
-<span class="sourceLineNo">596</span>  public MetricsSource getSourceMetrics() 
{<a name="line.596"></a>
-<span class="sourceLineNo">597</span>    return this.metrics;<a 
name="line.597"></a>
-<span class="sourceLineNo">598</span>  }<a name="line.598"></a>
-<span class="sourceLineNo">599</span><a name="line.599"></a>
-<span class="sourceLineNo">600</span>  @Override<a name="line.600"></a>
-<span class="sourceLineNo">601</span>  //offsets totalBufferUsed by deducting 
shipped batchSize.<a name="line.601"></a>
-<span class="sourceLineNo">602</span>  public void 
postShipEdits(List&lt;Entry&gt; entries, int batchSize) {<a name="line.602"></a>
-<span class="sourceLineNo">603</span>    if (throttler.isEnabled()) {<a 
name="line.603"></a>
-<span class="sourceLineNo">604</span>      throttler.addPushSize(batchSize);<a 
name="line.604"></a>
-<span class="sourceLineNo">605</span>    }<a name="line.605"></a>
-<span class="sourceLineNo">606</span>    
totalReplicatedEdits.addAndGet(entries.size());<a name="line.606"></a>
-<span class="sourceLineNo">607</span>    
totalBufferUsed.addAndGet(-batchSize);<a name="line.607"></a>
-<span class="sourceLineNo">608</span>  }<a name="line.608"></a>
-<span class="sourceLineNo">609</span><a name="line.609"></a>
-<span class="sourceLineNo">610</span>  @Override<a name="line.610"></a>
-<span class="sourceLineNo">611</span>  public WALFileLengthProvider 
getWALFileLengthProvider() {<a name="line.611"></a>
-<span class="sourceLineNo">612</span>    return walFileLengthProvider;<a 
name="line.612"></a>
+<span class="sourceLineNo">592</span>  }<a name="line.592"></a>
+<span class="sourceLineNo">593</span><a name="line.593"></a>
+<span class="sourceLineNo">594</span>  @Override<a name="line.594"></a>
+<span class="sourceLineNo">595</span>  public String getStats() {<a 
name="line.595"></a>
+<span class="sourceLineNo">596</span>    StringBuilder sb = new 
StringBuilder();<a name="line.596"></a>
+<span class="sourceLineNo">597</span>    sb.append("Total replicated edits: 
").append(totalReplicatedEdits)<a name="line.597"></a>
+<span class="sourceLineNo">598</span>        .append(", current progress: 
\n");<a name="line.598"></a>
+<span class="sourceLineNo">599</span>    for (Map.Entry&lt;String, 
ReplicationSourceShipper&gt; entry : workerThreads.entrySet()) {<a 
name="line.599"></a>
+<span class="sourceLineNo">600</span>      String walGroupId = 
entry.getKey();<a name="line.600"></a>
+<span class="sourceLineNo">601</span>      ReplicationSourceShipper worker = 
entry.getValue();<a name="line.601"></a>
+<span class="sourceLineNo">602</span>      long position = 
worker.getCurrentPosition();<a name="line.602"></a>
+<span class="sourceLineNo">603</span>      Path currentPath = 
worker.getCurrentPath();<a name="line.603"></a>
+<span class="sourceLineNo">604</span>      sb.append("walGroup 
[").append(walGroupId).append("]: ");<a name="line.604"></a>
+<span class="sourceLineNo">605</span>      if (currentPath != null) {<a 
name="line.605"></a>
+<span class="sourceLineNo">606</span>        sb.append("currently replicating 
from: ").append(currentPath).append(" at position: ")<a name="line.606"></a>
+<span class="sourceLineNo">607</span>            
.append(position).append("\n");<a name="line.607"></a>
+<span class="sourceLineNo">608</span>      } else {<a name="line.608"></a>
+<span class="sourceLineNo">609</span>        sb.append("no replication 
ongoing, waiting for new log");<a name="line.609"></a>
+<span class="sourceLineNo">610</span>      }<a name="line.610"></a>
+<span class="sourceLineNo">611</span>    }<a name="line.611"></a>
+<span class="sourceLineNo">612</span>    return sb.toString();<a 
name="line.612"></a>
 <span class="sourceLineNo">613</span>  }<a name="line.613"></a>
 <span class="sourceLineNo">614</span><a name="line.614"></a>
 <span class="sourceLineNo">615</span>  @Override<a name="line.615"></a>
-<span class="sourceLineNo">616</span>  public ServerName 
getServerWALsBelongTo() {<a name="line.616"></a>
-<span class="sourceLineNo">617</span>    return server.getServerName();<a 
name="line.617"></a>
+<span class="sourceLineNo">616</span>  public MetricsSource getSourceMetrics() 
{<a name="line.616"></a>
+<span class="sourceLineNo">617</span>    return this.metrics;<a 
name="line.617"></a>
 <span class="sourceLineNo">618</span>  }<a name="line.618"></a>
 <span class="sourceLineNo">619</span><a name="line.619"></a>
-<span class="sourceLineNo">620</span>  Server getServer() {<a 
name="line.620"></a>
-<span class="sourceLineNo">621</span>    return server;<a name="line.621"></a>
-<span class="sourceLineNo">622</span>  }<a name="line.622"></a>
-<span class="sourceLineNo">623</span><a name="line.623"></a>
-<span class="sourceLineNo">624</span>  ReplicationQueueStorage 
getQueueStorage() {<a name="line.624"></a>
-<span class="sourceLineNo">625</span>    return queueStorage;<a 
name="line.625"></a>
-<span class="sourceLineNo">626</span>  }<a name="line.626"></a>
-<span class="sourceLineNo">627</span>}<a name="line.627"></a>
+<span class="sourceLineNo">620</span>  @Override<a name="line.620"></a>
+<span class="sourceLineNo">621</span>  //offsets totalBufferUsed by deducting 
shipped batchSize.<a name="line.621"></a>
+<span class="sourceLineNo">622</span>  public void 
postShipEdits(List&lt;Entry&gt; entries, int batchSize) {<a name="line.622"></a>
+<span class="sourceLineNo">623</span>    if (throttler.isEnabled()) {<a 
name="line.623"></a>
+<span class="sourceLineNo">624</span>      throttler.addPushSize(batchSize);<a 
name="line.624"></a>
+<span class="sourceLineNo">625</span>    }<a name="line.625"></a>
+<span class="sourceLineNo">626</span>    
totalReplicatedEdits.addAndGet(entries.size());<a name="line.626"></a>
+<span class="sourceLineNo">627</span>    
totalBufferUsed.addAndGet(-batchSize);<a name="line.627"></a>
+<span class="sourceLineNo">628</span>  }<a name="line.628"></a>
+<span class="sourceLineNo">629</span><a name="line.629"></a>
+<span class="sourceLineNo">630</span>  @Override<a name="line.630"></a>
+<span class="sourceLineNo">631</span>  public WALFileLengthProvider 
getWALFileLengthProvider() {<a name="line.631"></a>
+<span class="sourceLineNo">632</span>    return walFileLengthProvider;<a 
name="line.632"></a>
+<span class="sourceLineNo">633</span>  }<a name="line.633"></a>
+<span class="sourceLineNo">634</span><a name="line.634"></a>
+<span class="sourceLineNo">635</span>  @Override<a name="line.635"></a>
+<span class="sourceLineNo">636</span>  public ServerName 
getServerWALsBelongTo() {<a name="line.636"></a>
+<span class="sourceLineNo">637</span>    return server.getServerName();<a 
name="line.637"></a>
+<span class="sourceLineNo">638</span>  }<a name="line.638"></a>
+<span class="sourceLineNo">639</span><a name="line.639"></a>
+<span class="sourceLineNo">640</span>  Server getServer() {<a 
name="line.640"></a>
+<span class="sourceLineNo">641</span>    return server;<a name="line.641"></a>
+<span class="sourceLineNo">642</span>  }<a name="line.642"></a>
+<span class="sourceLineNo">643</span><a name="line.643"></a>
+<span class="sourceLineNo">644</span>  ReplicationQueueStorage 
getQueueStorage() {<a name="line.644"></a>
+<span class="sourceLineNo">645</span>    return queueStorage;<a 
name="line.645"></a>
+<span class="sourceLineNo">646</span>  }<a name="line.646"></a>
+<span class="sourceLineNo">647</span>}<a name="line.647"></a>
 
 
 

Reply via email to