http://git-wip-us.apache.org/repos/asf/hbase-site/blob/bb78f24d/devapidocs/org/apache/hadoop/hbase/util/package-tree.html ---------------------------------------------------------------------- diff --git a/devapidocs/org/apache/hadoop/hbase/util/package-tree.html b/devapidocs/org/apache/hadoop/hbase/util/package-tree.html index 596cffe..7ee6a13 100644 --- a/devapidocs/org/apache/hadoop/hbase/util/package-tree.html +++ b/devapidocs/org/apache/hadoop/hbase/util/package-tree.html @@ -532,14 +532,14 @@ <ul> <li type="circle">java.lang.<a href="https://docs.oracle.com/javase/8/docs/api/java/lang/Enum.html?is-external=true" title="class or interface in java.lang"><span class="typeNameLink">Enum</span></a><E> (implements java.lang.<a href="https://docs.oracle.com/javase/8/docs/api/java/lang/Comparable.html?is-external=true" title="class or interface in java.lang">Comparable</a><T>, java.io.<a href="https://docs.oracle.com/javase/8/docs/api/java/io/Serializable.html?is-external=true" title="class or interface in java.io">Serializable</a>) <ul> +<li type="circle">org.apache.hadoop.hbase.util.<a href="../../../../../org/apache/hadoop/hbase/util/PoolMap.PoolType.html" title="enum in org.apache.hadoop.hbase.util"><span class="typeNameLink">PoolMap.PoolType</span></a></li> <li type="circle">org.apache.hadoop.hbase.util.<a href="../../../../../org/apache/hadoop/hbase/util/Bytes.LexicographicalComparerHolder.UnsafeComparer.html" title="enum in org.apache.hadoop.hbase.util"><span class="typeNameLink">Bytes.LexicographicalComparerHolder.UnsafeComparer</span></a> (implements org.apache.hadoop.hbase.util.<a href="../../../../../org/apache/hadoop/hbase/util/Bytes.Comparer.html" title="interface in org.apache.hadoop.hbase.util">Bytes.Comparer</a><T>)</li> -<li type="circle">org.apache.hadoop.hbase.util.<a href="../../../../../org/apache/hadoop/hbase/util/HBaseFsck.ErrorReporter.ERROR_CODE.html" title="enum in org.apache.hadoop.hbase.util"><span class="typeNameLink">HBaseFsck.ErrorReporter.ERROR_CODE</span></a></li> +<li type="circle">org.apache.hadoop.hbase.util.<a href="../../../../../org/apache/hadoop/hbase/util/Bytes.LexicographicalComparerHolder.PureJavaComparer.html" title="enum in org.apache.hadoop.hbase.util"><span class="typeNameLink">Bytes.LexicographicalComparerHolder.PureJavaComparer</span></a> (implements org.apache.hadoop.hbase.util.<a href="../../../../../org/apache/hadoop/hbase/util/Bytes.Comparer.html" title="interface in org.apache.hadoop.hbase.util">Bytes.Comparer</a><T>)</li> <li type="circle">org.apache.hadoop.hbase.util.<a href="../../../../../org/apache/hadoop/hbase/util/PrettyPrinter.Unit.html" title="enum in org.apache.hadoop.hbase.util"><span class="typeNameLink">PrettyPrinter.Unit</span></a></li> +<li type="circle">org.apache.hadoop.hbase.util.<a href="../../../../../org/apache/hadoop/hbase/util/HBaseFsck.ErrorReporter.ERROR_CODE.html" title="enum in org.apache.hadoop.hbase.util"><span class="typeNameLink">HBaseFsck.ErrorReporter.ERROR_CODE</span></a></li> +<li type="circle">org.apache.hadoop.hbase.util.<a href="../../../../../org/apache/hadoop/hbase/util/ChecksumType.html" title="enum in org.apache.hadoop.hbase.util"><span class="typeNameLink">ChecksumType</span></a></li> <li type="circle">org.apache.hadoop.hbase.util.<a href="../../../../../org/apache/hadoop/hbase/util/Order.html" title="enum in org.apache.hadoop.hbase.util"><span class="typeNameLink">Order</span></a></li> <li type="circle">org.apache.hadoop.hbase.util.<a href="../../../../../org/apache/hadoop/hbase/util/IdReadWriteLock.ReferenceType.html" title="enum in org.apache.hadoop.hbase.util"><span class="typeNameLink">IdReadWriteLock.ReferenceType</span></a></li> -<li type="circle">org.apache.hadoop.hbase.util.<a href="../../../../../org/apache/hadoop/hbase/util/ChecksumType.html" title="enum in org.apache.hadoop.hbase.util"><span class="typeNameLink">ChecksumType</span></a></li> -<li type="circle">org.apache.hadoop.hbase.util.<a href="../../../../../org/apache/hadoop/hbase/util/PoolMap.PoolType.html" title="enum in org.apache.hadoop.hbase.util"><span class="typeNameLink">PoolMap.PoolType</span></a></li> -<li type="circle">org.apache.hadoop.hbase.util.<a href="../../../../../org/apache/hadoop/hbase/util/Bytes.LexicographicalComparerHolder.PureJavaComparer.html" title="enum in org.apache.hadoop.hbase.util"><span class="typeNameLink">Bytes.LexicographicalComparerHolder.PureJavaComparer</span></a> (implements org.apache.hadoop.hbase.util.<a href="../../../../../org/apache/hadoop/hbase/util/Bytes.Comparer.html" title="interface in org.apache.hadoop.hbase.util">Bytes.Comparer</a><T>)</li> </ul> </li> </ul>
http://git-wip-us.apache.org/repos/asf/hbase-site/blob/bb78f24d/devapidocs/org/apache/hadoop/hbase/zookeeper/ZKWatcher.html ---------------------------------------------------------------------- diff --git a/devapidocs/org/apache/hadoop/hbase/zookeeper/ZKWatcher.html b/devapidocs/org/apache/hadoop/hbase/zookeeper/ZKWatcher.html index c4df8e7..62a3538 100644 --- a/devapidocs/org/apache/hadoop/hbase/zookeeper/ZKWatcher.html +++ b/devapidocs/org/apache/hadoop/hbase/zookeeper/ZKWatcher.html @@ -1017,7 +1017,7 @@ implements org.apache.zookeeper.Watcher, <a href="../../../../../org/apache/hado <ul class="blockList"> <li class="blockList"> <h4>interruptedExceptionNoThrow</h4> -<pre>public void <a href="../../../../../src-html/org/apache/hadoop/hbase/zookeeper/ZKWatcher.html#line.618">interruptedExceptionNoThrow</a>(<a href="https://docs.oracle.com/javase/8/docs/api/java/lang/InterruptedException.html?is-external=true" title="class or interface in java.lang">InterruptedException</a> ie, +<pre>public void <a href="../../../../../src-html/org/apache/hadoop/hbase/zookeeper/ZKWatcher.html#line.620">interruptedExceptionNoThrow</a>(<a href="https://docs.oracle.com/javase/8/docs/api/java/lang/InterruptedException.html?is-external=true" title="class or interface in java.lang">InterruptedException</a> ie, boolean throwLater)</pre> <div class="block">Log the InterruptedException and interrupt current thread</div> <dl> @@ -1033,7 +1033,7 @@ implements org.apache.zookeeper.Watcher, <a href="../../../../../org/apache/hado <ul class="blockList"> <li class="blockList"> <h4>close</h4> -<pre>public void <a href="../../../../../src-html/org/apache/hadoop/hbase/zookeeper/ZKWatcher.html#line.631">close</a>()</pre> +<pre>public void <a href="../../../../../src-html/org/apache/hadoop/hbase/zookeeper/ZKWatcher.html#line.633">close</a>()</pre> <div class="block">Close the connection to ZooKeeper.</div> <dl> <dt><span class="overrideSpecifyLabel">Specified by:</span></dt> @@ -1049,7 +1049,7 @@ implements org.apache.zookeeper.Watcher, <a href="../../../../../org/apache/hado <ul class="blockList"> <li class="blockList"> <h4>getConfiguration</h4> -<pre>public org.apache.hadoop.conf.Configuration <a href="../../../../../src-html/org/apache/hadoop/hbase/zookeeper/ZKWatcher.html#line.639">getConfiguration</a>()</pre> +<pre>public org.apache.hadoop.conf.Configuration <a href="../../../../../src-html/org/apache/hadoop/hbase/zookeeper/ZKWatcher.html#line.641">getConfiguration</a>()</pre> </li> </ul> <a name="abort-java.lang.String-java.lang.Throwable-"> @@ -1058,7 +1058,7 @@ implements org.apache.zookeeper.Watcher, <a href="../../../../../org/apache/hado <ul class="blockList"> <li class="blockList"> <h4>abort</h4> -<pre>public void <a href="../../../../../src-html/org/apache/hadoop/hbase/zookeeper/ZKWatcher.html#line.644">abort</a>(<a href="https://docs.oracle.com/javase/8/docs/api/java/lang/String.html?is-external=true" title="class or interface in java.lang">String</a> why, +<pre>public void <a href="../../../../../src-html/org/apache/hadoop/hbase/zookeeper/ZKWatcher.html#line.646">abort</a>(<a href="https://docs.oracle.com/javase/8/docs/api/java/lang/String.html?is-external=true" title="class or interface in java.lang">String</a> why, <a href="https://docs.oracle.com/javase/8/docs/api/java/lang/Throwable.html?is-external=true" title="class or interface in java.lang">Throwable</a> e)</pre> <div class="block"><span class="descfrmTypeLabel">Description copied from interface: <code><a href="../../../../../org/apache/hadoop/hbase/Abortable.html#abort-java.lang.String-java.lang.Throwable-">Abortable</a></code></span></div> <div class="block">Abort the server or client.</div> @@ -1077,7 +1077,7 @@ implements org.apache.zookeeper.Watcher, <a href="../../../../../org/apache/hado <ul class="blockListLast"> <li class="blockList"> <h4>isAborted</h4> -<pre>public boolean <a href="../../../../../src-html/org/apache/hadoop/hbase/zookeeper/ZKWatcher.html#line.653">isAborted</a>()</pre> +<pre>public boolean <a href="../../../../../src-html/org/apache/hadoop/hbase/zookeeper/ZKWatcher.html#line.655">isAborted</a>()</pre> <div class="block"><span class="descfrmTypeLabel">Description copied from interface: <code><a href="../../../../../org/apache/hadoop/hbase/Abortable.html#isAborted--">Abortable</a></code></span></div> <div class="block">Check if the server or client was aborted.</div> <dl> http://git-wip-us.apache.org/repos/asf/hbase-site/blob/bb78f24d/devapidocs/src-html/org/apache/hadoop/hbase/Version.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/Version.html b/devapidocs/src-html/org/apache/hadoop/hbase/Version.html index 220ff10..45aaf8a 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/Version.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/Version.html @@ -16,11 +16,11 @@ <span class="sourceLineNo">008</span>@InterfaceAudience.Private<a name="line.8"></a> <span class="sourceLineNo">009</span>public class Version {<a name="line.9"></a> <span class="sourceLineNo">010</span> public static final String version = "3.0.0-SNAPSHOT";<a name="line.10"></a> -<span class="sourceLineNo">011</span> public static final String revision = "edf60b965be903f205ec689b59c55229e0cb1dbf";<a name="line.11"></a> +<span class="sourceLineNo">011</span> public static final String revision = "ec66434380aee62289ccf7b173d765bbe7083718";<a name="line.11"></a> <span class="sourceLineNo">012</span> public static final String user = "jenkins";<a name="line.12"></a> -<span class="sourceLineNo">013</span> public static final String date = "Tue Jun 12 14:39:50 UTC 2018";<a name="line.13"></a> +<span class="sourceLineNo">013</span> public static final String date = "Wed Jun 13 14:39:52 UTC 2018";<a name="line.13"></a> <span class="sourceLineNo">014</span> public static final String url = "git://jenkins-websites1.apache.org/home/jenkins/jenkins-slave/workspace/hbase_generate_website/hbase";<a name="line.14"></a> -<span class="sourceLineNo">015</span> public static final String srcChecksum = "5760e143e3a5a39f2d244a97b9689042";<a name="line.15"></a> +<span class="sourceLineNo">015</span> public static final String srcChecksum = "f82b1195e42117413b37d2d6088772bd";<a name="line.15"></a> <span class="sourceLineNo">016</span>}<a name="line.16"></a> http://git-wip-us.apache.org/repos/asf/hbase-site/blob/bb78f24d/devapidocs/src-html/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.LogsComparator.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.LogsComparator.html b/devapidocs/src-html/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.LogsComparator.html index 2dc2148..dd02cc6 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.LogsComparator.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.LogsComparator.html @@ -507,132 +507,152 @@ <span class="sourceLineNo">499</span> Collection<ReplicationSourceShipper> workers = workerThreads.values();<a name="line.499"></a> <span class="sourceLineNo">500</span> for (ReplicationSourceShipper worker : workers) {<a name="line.500"></a> <span class="sourceLineNo">501</span> worker.stopWorker();<a name="line.501"></a> -<span class="sourceLineNo">502</span> worker.entryReader.interrupt();<a name="line.502"></a> -<span class="sourceLineNo">503</span> worker.interrupt();<a name="line.503"></a> -<span class="sourceLineNo">504</span> }<a name="line.504"></a> -<span class="sourceLineNo">505</span> if (this.replicationEndpoint != null) {<a name="line.505"></a> -<span class="sourceLineNo">506</span> this.replicationEndpoint.stop();<a name="line.506"></a> -<span class="sourceLineNo">507</span> }<a name="line.507"></a> -<span class="sourceLineNo">508</span> if (join) {<a name="line.508"></a> -<span class="sourceLineNo">509</span> for (ReplicationSourceShipper worker : workers) {<a name="line.509"></a> -<span class="sourceLineNo">510</span> Threads.shutdown(worker, this.sleepForRetries);<a name="line.510"></a> -<span class="sourceLineNo">511</span> LOG.info("ReplicationSourceWorker " + worker.getName() + " terminated");<a name="line.511"></a> -<span class="sourceLineNo">512</span> }<a name="line.512"></a> -<span class="sourceLineNo">513</span> if (this.replicationEndpoint != null) {<a name="line.513"></a> -<span class="sourceLineNo">514</span> try {<a name="line.514"></a> -<span class="sourceLineNo">515</span> this.replicationEndpoint.awaitTerminated(sleepForRetries * maxRetriesMultiplier,<a name="line.515"></a> -<span class="sourceLineNo">516</span> TimeUnit.MILLISECONDS);<a name="line.516"></a> -<span class="sourceLineNo">517</span> } catch (TimeoutException te) {<a name="line.517"></a> -<span class="sourceLineNo">518</span> LOG.warn("Got exception while waiting for endpoint to shutdown for replication source :" +<a name="line.518"></a> -<span class="sourceLineNo">519</span> this.queueId, te);<a name="line.519"></a> -<span class="sourceLineNo">520</span> }<a name="line.520"></a> -<span class="sourceLineNo">521</span> }<a name="line.521"></a> -<span class="sourceLineNo">522</span> }<a name="line.522"></a> -<span class="sourceLineNo">523</span> this.metrics.clear();<a name="line.523"></a> -<span class="sourceLineNo">524</span> }<a name="line.524"></a> -<span class="sourceLineNo">525</span><a name="line.525"></a> -<span class="sourceLineNo">526</span> @Override<a name="line.526"></a> -<span class="sourceLineNo">527</span> public String getQueueId() {<a name="line.527"></a> -<span class="sourceLineNo">528</span> return this.queueId;<a name="line.528"></a> -<span class="sourceLineNo">529</span> }<a name="line.529"></a> -<span class="sourceLineNo">530</span><a name="line.530"></a> -<span class="sourceLineNo">531</span> @Override<a name="line.531"></a> -<span class="sourceLineNo">532</span> public String getPeerId() {<a name="line.532"></a> -<span class="sourceLineNo">533</span> return this.peerId;<a name="line.533"></a> -<span class="sourceLineNo">534</span> }<a name="line.534"></a> -<span class="sourceLineNo">535</span><a name="line.535"></a> -<span class="sourceLineNo">536</span> @Override<a name="line.536"></a> -<span class="sourceLineNo">537</span> public Path getCurrentPath() {<a name="line.537"></a> -<span class="sourceLineNo">538</span> // only for testing<a name="line.538"></a> -<span class="sourceLineNo">539</span> for (ReplicationSourceShipper worker : workerThreads.values()) {<a name="line.539"></a> -<span class="sourceLineNo">540</span> if (worker.getCurrentPath() != null) {<a name="line.540"></a> -<span class="sourceLineNo">541</span> return worker.getCurrentPath();<a name="line.541"></a> -<span class="sourceLineNo">542</span> }<a name="line.542"></a> -<span class="sourceLineNo">543</span> }<a name="line.543"></a> -<span class="sourceLineNo">544</span> return null;<a name="line.544"></a> -<span class="sourceLineNo">545</span> }<a name="line.545"></a> -<span class="sourceLineNo">546</span><a name="line.546"></a> -<span class="sourceLineNo">547</span> @Override<a name="line.547"></a> -<span class="sourceLineNo">548</span> public boolean isSourceActive() {<a name="line.548"></a> -<span class="sourceLineNo">549</span> return !this.server.isStopped() && this.sourceRunning;<a name="line.549"></a> -<span class="sourceLineNo">550</span> }<a name="line.550"></a> -<span class="sourceLineNo">551</span><a name="line.551"></a> -<span class="sourceLineNo">552</span> /**<a name="line.552"></a> -<span class="sourceLineNo">553</span> * Comparator used to compare logs together based on their start time<a name="line.553"></a> -<span class="sourceLineNo">554</span> */<a name="line.554"></a> -<span class="sourceLineNo">555</span> public static class LogsComparator implements Comparator<Path> {<a name="line.555"></a> -<span class="sourceLineNo">556</span><a name="line.556"></a> -<span class="sourceLineNo">557</span> @Override<a name="line.557"></a> -<span class="sourceLineNo">558</span> public int compare(Path o1, Path o2) {<a name="line.558"></a> -<span class="sourceLineNo">559</span> return Long.compare(getTS(o1), getTS(o2));<a name="line.559"></a> -<span class="sourceLineNo">560</span> }<a name="line.560"></a> -<span class="sourceLineNo">561</span><a name="line.561"></a> -<span class="sourceLineNo">562</span> /**<a name="line.562"></a> -<span class="sourceLineNo">563</span> * Split a path to get the start time<a name="line.563"></a> -<span class="sourceLineNo">564</span> * For example: 10.20.20.171%3A60020.1277499063250<a name="line.564"></a> -<span class="sourceLineNo">565</span> * @param p path to split<a name="line.565"></a> -<span class="sourceLineNo">566</span> * @return start time<a name="line.566"></a> -<span class="sourceLineNo">567</span> */<a name="line.567"></a> -<span class="sourceLineNo">568</span> private static long getTS(Path p) {<a name="line.568"></a> -<span class="sourceLineNo">569</span> int tsIndex = p.getName().lastIndexOf('.') + 1;<a name="line.569"></a> -<span class="sourceLineNo">570</span> return Long.parseLong(p.getName().substring(tsIndex));<a name="line.570"></a> -<span class="sourceLineNo">571</span> }<a name="line.571"></a> -<span class="sourceLineNo">572</span> }<a name="line.572"></a> -<span class="sourceLineNo">573</span><a name="line.573"></a> -<span class="sourceLineNo">574</span> @Override<a name="line.574"></a> -<span class="sourceLineNo">575</span> public String getStats() {<a name="line.575"></a> -<span class="sourceLineNo">576</span> StringBuilder sb = new StringBuilder();<a name="line.576"></a> -<span class="sourceLineNo">577</span> sb.append("Total replicated edits: ").append(totalReplicatedEdits)<a name="line.577"></a> -<span class="sourceLineNo">578</span> .append(", current progress: \n");<a name="line.578"></a> -<span class="sourceLineNo">579</span> for (Map.Entry<String, ReplicationSourceShipper> entry : workerThreads.entrySet()) {<a name="line.579"></a> -<span class="sourceLineNo">580</span> String walGroupId = entry.getKey();<a name="line.580"></a> -<span class="sourceLineNo">581</span> ReplicationSourceShipper worker = entry.getValue();<a name="line.581"></a> -<span class="sourceLineNo">582</span> long position = worker.getCurrentPosition();<a name="line.582"></a> -<span class="sourceLineNo">583</span> Path currentPath = worker.getCurrentPath();<a name="line.583"></a> -<span class="sourceLineNo">584</span> sb.append("walGroup [").append(walGroupId).append("]: ");<a name="line.584"></a> -<span class="sourceLineNo">585</span> if (currentPath != null) {<a name="line.585"></a> -<span class="sourceLineNo">586</span> sb.append("currently replicating from: ").append(currentPath).append(" at position: ")<a name="line.586"></a> -<span class="sourceLineNo">587</span> .append(position).append("\n");<a name="line.587"></a> -<span class="sourceLineNo">588</span> } else {<a name="line.588"></a> -<span class="sourceLineNo">589</span> sb.append("no replication ongoing, waiting for new log");<a name="line.589"></a> -<span class="sourceLineNo">590</span> }<a name="line.590"></a> +<span class="sourceLineNo">502</span> worker.entryReader.setReaderRunning(false);<a name="line.502"></a> +<span class="sourceLineNo">503</span> }<a name="line.503"></a> +<span class="sourceLineNo">504</span><a name="line.504"></a> +<span class="sourceLineNo">505</span> for (ReplicationSourceShipper worker : workers) {<a name="line.505"></a> +<span class="sourceLineNo">506</span> if (worker.isAlive() || worker.entryReader.isAlive()) {<a name="line.506"></a> +<span class="sourceLineNo">507</span> try {<a name="line.507"></a> +<span class="sourceLineNo">508</span> // Wait worker to stop<a name="line.508"></a> +<span class="sourceLineNo">509</span> Thread.sleep(this.sleepForRetries);<a name="line.509"></a> +<span class="sourceLineNo">510</span> } catch (InterruptedException e) {<a name="line.510"></a> +<span class="sourceLineNo">511</span> LOG.info("Interrupted while waiting " + worker.getName() + " to stop");<a name="line.511"></a> +<span class="sourceLineNo">512</span> Thread.currentThread().interrupt();<a name="line.512"></a> +<span class="sourceLineNo">513</span> }<a name="line.513"></a> +<span class="sourceLineNo">514</span> // If worker still is alive after waiting, interrupt it<a name="line.514"></a> +<span class="sourceLineNo">515</span> if (worker.isAlive()) {<a name="line.515"></a> +<span class="sourceLineNo">516</span> worker.interrupt();<a name="line.516"></a> +<span class="sourceLineNo">517</span> }<a name="line.517"></a> +<span class="sourceLineNo">518</span> // If entry reader is alive after waiting, interrupt it<a name="line.518"></a> +<span class="sourceLineNo">519</span> if (worker.entryReader.isAlive()) {<a name="line.519"></a> +<span class="sourceLineNo">520</span> worker.entryReader.interrupt();<a name="line.520"></a> +<span class="sourceLineNo">521</span> }<a name="line.521"></a> +<span class="sourceLineNo">522</span> }<a name="line.522"></a> +<span class="sourceLineNo">523</span> }<a name="line.523"></a> +<span class="sourceLineNo">524</span><a name="line.524"></a> +<span class="sourceLineNo">525</span> if (this.replicationEndpoint != null) {<a name="line.525"></a> +<span class="sourceLineNo">526</span> this.replicationEndpoint.stop();<a name="line.526"></a> +<span class="sourceLineNo">527</span> }<a name="line.527"></a> +<span class="sourceLineNo">528</span> if (join) {<a name="line.528"></a> +<span class="sourceLineNo">529</span> for (ReplicationSourceShipper worker : workers) {<a name="line.529"></a> +<span class="sourceLineNo">530</span> Threads.shutdown(worker, this.sleepForRetries);<a name="line.530"></a> +<span class="sourceLineNo">531</span> LOG.info("ReplicationSourceWorker " + worker.getName() + " terminated");<a name="line.531"></a> +<span class="sourceLineNo">532</span> }<a name="line.532"></a> +<span class="sourceLineNo">533</span> if (this.replicationEndpoint != null) {<a name="line.533"></a> +<span class="sourceLineNo">534</span> try {<a name="line.534"></a> +<span class="sourceLineNo">535</span> this.replicationEndpoint.awaitTerminated(sleepForRetries * maxRetriesMultiplier,<a name="line.535"></a> +<span class="sourceLineNo">536</span> TimeUnit.MILLISECONDS);<a name="line.536"></a> +<span class="sourceLineNo">537</span> } catch (TimeoutException te) {<a name="line.537"></a> +<span class="sourceLineNo">538</span> LOG.warn("Got exception while waiting for endpoint to shutdown for replication source :" +<a name="line.538"></a> +<span class="sourceLineNo">539</span> this.queueId, te);<a name="line.539"></a> +<span class="sourceLineNo">540</span> }<a name="line.540"></a> +<span class="sourceLineNo">541</span> }<a name="line.541"></a> +<span class="sourceLineNo">542</span> }<a name="line.542"></a> +<span class="sourceLineNo">543</span> this.metrics.clear();<a name="line.543"></a> +<span class="sourceLineNo">544</span> }<a name="line.544"></a> +<span class="sourceLineNo">545</span><a name="line.545"></a> +<span class="sourceLineNo">546</span> @Override<a name="line.546"></a> +<span class="sourceLineNo">547</span> public String getQueueId() {<a name="line.547"></a> +<span class="sourceLineNo">548</span> return this.queueId;<a name="line.548"></a> +<span class="sourceLineNo">549</span> }<a name="line.549"></a> +<span class="sourceLineNo">550</span><a name="line.550"></a> +<span class="sourceLineNo">551</span> @Override<a name="line.551"></a> +<span class="sourceLineNo">552</span> public String getPeerId() {<a name="line.552"></a> +<span class="sourceLineNo">553</span> return this.peerId;<a name="line.553"></a> +<span class="sourceLineNo">554</span> }<a name="line.554"></a> +<span class="sourceLineNo">555</span><a name="line.555"></a> +<span class="sourceLineNo">556</span> @Override<a name="line.556"></a> +<span class="sourceLineNo">557</span> public Path getCurrentPath() {<a name="line.557"></a> +<span class="sourceLineNo">558</span> // only for testing<a name="line.558"></a> +<span class="sourceLineNo">559</span> for (ReplicationSourceShipper worker : workerThreads.values()) {<a name="line.559"></a> +<span class="sourceLineNo">560</span> if (worker.getCurrentPath() != null) {<a name="line.560"></a> +<span class="sourceLineNo">561</span> return worker.getCurrentPath();<a name="line.561"></a> +<span class="sourceLineNo">562</span> }<a name="line.562"></a> +<span class="sourceLineNo">563</span> }<a name="line.563"></a> +<span class="sourceLineNo">564</span> return null;<a name="line.564"></a> +<span class="sourceLineNo">565</span> }<a name="line.565"></a> +<span class="sourceLineNo">566</span><a name="line.566"></a> +<span class="sourceLineNo">567</span> @Override<a name="line.567"></a> +<span class="sourceLineNo">568</span> public boolean isSourceActive() {<a name="line.568"></a> +<span class="sourceLineNo">569</span> return !this.server.isStopped() && this.sourceRunning;<a name="line.569"></a> +<span class="sourceLineNo">570</span> }<a name="line.570"></a> +<span class="sourceLineNo">571</span><a name="line.571"></a> +<span class="sourceLineNo">572</span> /**<a name="line.572"></a> +<span class="sourceLineNo">573</span> * Comparator used to compare logs together based on their start time<a name="line.573"></a> +<span class="sourceLineNo">574</span> */<a name="line.574"></a> +<span class="sourceLineNo">575</span> public static class LogsComparator implements Comparator<Path> {<a name="line.575"></a> +<span class="sourceLineNo">576</span><a name="line.576"></a> +<span class="sourceLineNo">577</span> @Override<a name="line.577"></a> +<span class="sourceLineNo">578</span> public int compare(Path o1, Path o2) {<a name="line.578"></a> +<span class="sourceLineNo">579</span> return Long.compare(getTS(o1), getTS(o2));<a name="line.579"></a> +<span class="sourceLineNo">580</span> }<a name="line.580"></a> +<span class="sourceLineNo">581</span><a name="line.581"></a> +<span class="sourceLineNo">582</span> /**<a name="line.582"></a> +<span class="sourceLineNo">583</span> * Split a path to get the start time<a name="line.583"></a> +<span class="sourceLineNo">584</span> * For example: 10.20.20.171%3A60020.1277499063250<a name="line.584"></a> +<span class="sourceLineNo">585</span> * @param p path to split<a name="line.585"></a> +<span class="sourceLineNo">586</span> * @return start time<a name="line.586"></a> +<span class="sourceLineNo">587</span> */<a name="line.587"></a> +<span class="sourceLineNo">588</span> private static long getTS(Path p) {<a name="line.588"></a> +<span class="sourceLineNo">589</span> int tsIndex = p.getName().lastIndexOf('.') + 1;<a name="line.589"></a> +<span class="sourceLineNo">590</span> return Long.parseLong(p.getName().substring(tsIndex));<a name="line.590"></a> <span class="sourceLineNo">591</span> }<a name="line.591"></a> -<span class="sourceLineNo">592</span> return sb.toString();<a name="line.592"></a> -<span class="sourceLineNo">593</span> }<a name="line.593"></a> -<span class="sourceLineNo">594</span><a name="line.594"></a> -<span class="sourceLineNo">595</span> @Override<a name="line.595"></a> -<span class="sourceLineNo">596</span> public MetricsSource getSourceMetrics() {<a name="line.596"></a> -<span class="sourceLineNo">597</span> return this.metrics;<a name="line.597"></a> -<span class="sourceLineNo">598</span> }<a name="line.598"></a> -<span class="sourceLineNo">599</span><a name="line.599"></a> -<span class="sourceLineNo">600</span> @Override<a name="line.600"></a> -<span class="sourceLineNo">601</span> //offsets totalBufferUsed by deducting shipped batchSize.<a name="line.601"></a> -<span class="sourceLineNo">602</span> public void postShipEdits(List<Entry> entries, int batchSize) {<a name="line.602"></a> -<span class="sourceLineNo">603</span> if (throttler.isEnabled()) {<a name="line.603"></a> -<span class="sourceLineNo">604</span> throttler.addPushSize(batchSize);<a name="line.604"></a> -<span class="sourceLineNo">605</span> }<a name="line.605"></a> -<span class="sourceLineNo">606</span> totalReplicatedEdits.addAndGet(entries.size());<a name="line.606"></a> -<span class="sourceLineNo">607</span> totalBufferUsed.addAndGet(-batchSize);<a name="line.607"></a> -<span class="sourceLineNo">608</span> }<a name="line.608"></a> -<span class="sourceLineNo">609</span><a name="line.609"></a> -<span class="sourceLineNo">610</span> @Override<a name="line.610"></a> -<span class="sourceLineNo">611</span> public WALFileLengthProvider getWALFileLengthProvider() {<a name="line.611"></a> -<span class="sourceLineNo">612</span> return walFileLengthProvider;<a name="line.612"></a> +<span class="sourceLineNo">592</span> }<a name="line.592"></a> +<span class="sourceLineNo">593</span><a name="line.593"></a> +<span class="sourceLineNo">594</span> @Override<a name="line.594"></a> +<span class="sourceLineNo">595</span> public String getStats() {<a name="line.595"></a> +<span class="sourceLineNo">596</span> StringBuilder sb = new StringBuilder();<a name="line.596"></a> +<span class="sourceLineNo">597</span> sb.append("Total replicated edits: ").append(totalReplicatedEdits)<a name="line.597"></a> +<span class="sourceLineNo">598</span> .append(", current progress: \n");<a name="line.598"></a> +<span class="sourceLineNo">599</span> for (Map.Entry<String, ReplicationSourceShipper> entry : workerThreads.entrySet()) {<a name="line.599"></a> +<span class="sourceLineNo">600</span> String walGroupId = entry.getKey();<a name="line.600"></a> +<span class="sourceLineNo">601</span> ReplicationSourceShipper worker = entry.getValue();<a name="line.601"></a> +<span class="sourceLineNo">602</span> long position = worker.getCurrentPosition();<a name="line.602"></a> +<span class="sourceLineNo">603</span> Path currentPath = worker.getCurrentPath();<a name="line.603"></a> +<span class="sourceLineNo">604</span> sb.append("walGroup [").append(walGroupId).append("]: ");<a name="line.604"></a> +<span class="sourceLineNo">605</span> if (currentPath != null) {<a name="line.605"></a> +<span class="sourceLineNo">606</span> sb.append("currently replicating from: ").append(currentPath).append(" at position: ")<a name="line.606"></a> +<span class="sourceLineNo">607</span> .append(position).append("\n");<a name="line.607"></a> +<span class="sourceLineNo">608</span> } else {<a name="line.608"></a> +<span class="sourceLineNo">609</span> sb.append("no replication ongoing, waiting for new log");<a name="line.609"></a> +<span class="sourceLineNo">610</span> }<a name="line.610"></a> +<span class="sourceLineNo">611</span> }<a name="line.611"></a> +<span class="sourceLineNo">612</span> return sb.toString();<a name="line.612"></a> <span class="sourceLineNo">613</span> }<a name="line.613"></a> <span class="sourceLineNo">614</span><a name="line.614"></a> <span class="sourceLineNo">615</span> @Override<a name="line.615"></a> -<span class="sourceLineNo">616</span> public ServerName getServerWALsBelongTo() {<a name="line.616"></a> -<span class="sourceLineNo">617</span> return server.getServerName();<a name="line.617"></a> +<span class="sourceLineNo">616</span> public MetricsSource getSourceMetrics() {<a name="line.616"></a> +<span class="sourceLineNo">617</span> return this.metrics;<a name="line.617"></a> <span class="sourceLineNo">618</span> }<a name="line.618"></a> <span class="sourceLineNo">619</span><a name="line.619"></a> -<span class="sourceLineNo">620</span> Server getServer() {<a name="line.620"></a> -<span class="sourceLineNo">621</span> return server;<a name="line.621"></a> -<span class="sourceLineNo">622</span> }<a name="line.622"></a> -<span class="sourceLineNo">623</span><a name="line.623"></a> -<span class="sourceLineNo">624</span> ReplicationQueueStorage getQueueStorage() {<a name="line.624"></a> -<span class="sourceLineNo">625</span> return queueStorage;<a name="line.625"></a> -<span class="sourceLineNo">626</span> }<a name="line.626"></a> -<span class="sourceLineNo">627</span>}<a name="line.627"></a> +<span class="sourceLineNo">620</span> @Override<a name="line.620"></a> +<span class="sourceLineNo">621</span> //offsets totalBufferUsed by deducting shipped batchSize.<a name="line.621"></a> +<span class="sourceLineNo">622</span> public void postShipEdits(List<Entry> entries, int batchSize) {<a name="line.622"></a> +<span class="sourceLineNo">623</span> if (throttler.isEnabled()) {<a name="line.623"></a> +<span class="sourceLineNo">624</span> throttler.addPushSize(batchSize);<a name="line.624"></a> +<span class="sourceLineNo">625</span> }<a name="line.625"></a> +<span class="sourceLineNo">626</span> totalReplicatedEdits.addAndGet(entries.size());<a name="line.626"></a> +<span class="sourceLineNo">627</span> totalBufferUsed.addAndGet(-batchSize);<a name="line.627"></a> +<span class="sourceLineNo">628</span> }<a name="line.628"></a> +<span class="sourceLineNo">629</span><a name="line.629"></a> +<span class="sourceLineNo">630</span> @Override<a name="line.630"></a> +<span class="sourceLineNo">631</span> public WALFileLengthProvider getWALFileLengthProvider() {<a name="line.631"></a> +<span class="sourceLineNo">632</span> return walFileLengthProvider;<a name="line.632"></a> +<span class="sourceLineNo">633</span> }<a name="line.633"></a> +<span class="sourceLineNo">634</span><a name="line.634"></a> +<span class="sourceLineNo">635</span> @Override<a name="line.635"></a> +<span class="sourceLineNo">636</span> public ServerName getServerWALsBelongTo() {<a name="line.636"></a> +<span class="sourceLineNo">637</span> return server.getServerName();<a name="line.637"></a> +<span class="sourceLineNo">638</span> }<a name="line.638"></a> +<span class="sourceLineNo">639</span><a name="line.639"></a> +<span class="sourceLineNo">640</span> Server getServer() {<a name="line.640"></a> +<span class="sourceLineNo">641</span> return server;<a name="line.641"></a> +<span class="sourceLineNo">642</span> }<a name="line.642"></a> +<span class="sourceLineNo">643</span><a name="line.643"></a> +<span class="sourceLineNo">644</span> ReplicationQueueStorage getQueueStorage() {<a name="line.644"></a> +<span class="sourceLineNo">645</span> return queueStorage;<a name="line.645"></a> +<span class="sourceLineNo">646</span> }<a name="line.646"></a> +<span class="sourceLineNo">647</span>}<a name="line.647"></a> http://git-wip-us.apache.org/repos/asf/hbase-site/blob/bb78f24d/devapidocs/src-html/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.html b/devapidocs/src-html/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.html index 2dc2148..dd02cc6 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.html @@ -507,132 +507,152 @@ <span class="sourceLineNo">499</span> Collection<ReplicationSourceShipper> workers = workerThreads.values();<a name="line.499"></a> <span class="sourceLineNo">500</span> for (ReplicationSourceShipper worker : workers) {<a name="line.500"></a> <span class="sourceLineNo">501</span> worker.stopWorker();<a name="line.501"></a> -<span class="sourceLineNo">502</span> worker.entryReader.interrupt();<a name="line.502"></a> -<span class="sourceLineNo">503</span> worker.interrupt();<a name="line.503"></a> -<span class="sourceLineNo">504</span> }<a name="line.504"></a> -<span class="sourceLineNo">505</span> if (this.replicationEndpoint != null) {<a name="line.505"></a> -<span class="sourceLineNo">506</span> this.replicationEndpoint.stop();<a name="line.506"></a> -<span class="sourceLineNo">507</span> }<a name="line.507"></a> -<span class="sourceLineNo">508</span> if (join) {<a name="line.508"></a> -<span class="sourceLineNo">509</span> for (ReplicationSourceShipper worker : workers) {<a name="line.509"></a> -<span class="sourceLineNo">510</span> Threads.shutdown(worker, this.sleepForRetries);<a name="line.510"></a> -<span class="sourceLineNo">511</span> LOG.info("ReplicationSourceWorker " + worker.getName() + " terminated");<a name="line.511"></a> -<span class="sourceLineNo">512</span> }<a name="line.512"></a> -<span class="sourceLineNo">513</span> if (this.replicationEndpoint != null) {<a name="line.513"></a> -<span class="sourceLineNo">514</span> try {<a name="line.514"></a> -<span class="sourceLineNo">515</span> this.replicationEndpoint.awaitTerminated(sleepForRetries * maxRetriesMultiplier,<a name="line.515"></a> -<span class="sourceLineNo">516</span> TimeUnit.MILLISECONDS);<a name="line.516"></a> -<span class="sourceLineNo">517</span> } catch (TimeoutException te) {<a name="line.517"></a> -<span class="sourceLineNo">518</span> LOG.warn("Got exception while waiting for endpoint to shutdown for replication source :" +<a name="line.518"></a> -<span class="sourceLineNo">519</span> this.queueId, te);<a name="line.519"></a> -<span class="sourceLineNo">520</span> }<a name="line.520"></a> -<span class="sourceLineNo">521</span> }<a name="line.521"></a> -<span class="sourceLineNo">522</span> }<a name="line.522"></a> -<span class="sourceLineNo">523</span> this.metrics.clear();<a name="line.523"></a> -<span class="sourceLineNo">524</span> }<a name="line.524"></a> -<span class="sourceLineNo">525</span><a name="line.525"></a> -<span class="sourceLineNo">526</span> @Override<a name="line.526"></a> -<span class="sourceLineNo">527</span> public String getQueueId() {<a name="line.527"></a> -<span class="sourceLineNo">528</span> return this.queueId;<a name="line.528"></a> -<span class="sourceLineNo">529</span> }<a name="line.529"></a> -<span class="sourceLineNo">530</span><a name="line.530"></a> -<span class="sourceLineNo">531</span> @Override<a name="line.531"></a> -<span class="sourceLineNo">532</span> public String getPeerId() {<a name="line.532"></a> -<span class="sourceLineNo">533</span> return this.peerId;<a name="line.533"></a> -<span class="sourceLineNo">534</span> }<a name="line.534"></a> -<span class="sourceLineNo">535</span><a name="line.535"></a> -<span class="sourceLineNo">536</span> @Override<a name="line.536"></a> -<span class="sourceLineNo">537</span> public Path getCurrentPath() {<a name="line.537"></a> -<span class="sourceLineNo">538</span> // only for testing<a name="line.538"></a> -<span class="sourceLineNo">539</span> for (ReplicationSourceShipper worker : workerThreads.values()) {<a name="line.539"></a> -<span class="sourceLineNo">540</span> if (worker.getCurrentPath() != null) {<a name="line.540"></a> -<span class="sourceLineNo">541</span> return worker.getCurrentPath();<a name="line.541"></a> -<span class="sourceLineNo">542</span> }<a name="line.542"></a> -<span class="sourceLineNo">543</span> }<a name="line.543"></a> -<span class="sourceLineNo">544</span> return null;<a name="line.544"></a> -<span class="sourceLineNo">545</span> }<a name="line.545"></a> -<span class="sourceLineNo">546</span><a name="line.546"></a> -<span class="sourceLineNo">547</span> @Override<a name="line.547"></a> -<span class="sourceLineNo">548</span> public boolean isSourceActive() {<a name="line.548"></a> -<span class="sourceLineNo">549</span> return !this.server.isStopped() && this.sourceRunning;<a name="line.549"></a> -<span class="sourceLineNo">550</span> }<a name="line.550"></a> -<span class="sourceLineNo">551</span><a name="line.551"></a> -<span class="sourceLineNo">552</span> /**<a name="line.552"></a> -<span class="sourceLineNo">553</span> * Comparator used to compare logs together based on their start time<a name="line.553"></a> -<span class="sourceLineNo">554</span> */<a name="line.554"></a> -<span class="sourceLineNo">555</span> public static class LogsComparator implements Comparator<Path> {<a name="line.555"></a> -<span class="sourceLineNo">556</span><a name="line.556"></a> -<span class="sourceLineNo">557</span> @Override<a name="line.557"></a> -<span class="sourceLineNo">558</span> public int compare(Path o1, Path o2) {<a name="line.558"></a> -<span class="sourceLineNo">559</span> return Long.compare(getTS(o1), getTS(o2));<a name="line.559"></a> -<span class="sourceLineNo">560</span> }<a name="line.560"></a> -<span class="sourceLineNo">561</span><a name="line.561"></a> -<span class="sourceLineNo">562</span> /**<a name="line.562"></a> -<span class="sourceLineNo">563</span> * Split a path to get the start time<a name="line.563"></a> -<span class="sourceLineNo">564</span> * For example: 10.20.20.171%3A60020.1277499063250<a name="line.564"></a> -<span class="sourceLineNo">565</span> * @param p path to split<a name="line.565"></a> -<span class="sourceLineNo">566</span> * @return start time<a name="line.566"></a> -<span class="sourceLineNo">567</span> */<a name="line.567"></a> -<span class="sourceLineNo">568</span> private static long getTS(Path p) {<a name="line.568"></a> -<span class="sourceLineNo">569</span> int tsIndex = p.getName().lastIndexOf('.') + 1;<a name="line.569"></a> -<span class="sourceLineNo">570</span> return Long.parseLong(p.getName().substring(tsIndex));<a name="line.570"></a> -<span class="sourceLineNo">571</span> }<a name="line.571"></a> -<span class="sourceLineNo">572</span> }<a name="line.572"></a> -<span class="sourceLineNo">573</span><a name="line.573"></a> -<span class="sourceLineNo">574</span> @Override<a name="line.574"></a> -<span class="sourceLineNo">575</span> public String getStats() {<a name="line.575"></a> -<span class="sourceLineNo">576</span> StringBuilder sb = new StringBuilder();<a name="line.576"></a> -<span class="sourceLineNo">577</span> sb.append("Total replicated edits: ").append(totalReplicatedEdits)<a name="line.577"></a> -<span class="sourceLineNo">578</span> .append(", current progress: \n");<a name="line.578"></a> -<span class="sourceLineNo">579</span> for (Map.Entry<String, ReplicationSourceShipper> entry : workerThreads.entrySet()) {<a name="line.579"></a> -<span class="sourceLineNo">580</span> String walGroupId = entry.getKey();<a name="line.580"></a> -<span class="sourceLineNo">581</span> ReplicationSourceShipper worker = entry.getValue();<a name="line.581"></a> -<span class="sourceLineNo">582</span> long position = worker.getCurrentPosition();<a name="line.582"></a> -<span class="sourceLineNo">583</span> Path currentPath = worker.getCurrentPath();<a name="line.583"></a> -<span class="sourceLineNo">584</span> sb.append("walGroup [").append(walGroupId).append("]: ");<a name="line.584"></a> -<span class="sourceLineNo">585</span> if (currentPath != null) {<a name="line.585"></a> -<span class="sourceLineNo">586</span> sb.append("currently replicating from: ").append(currentPath).append(" at position: ")<a name="line.586"></a> -<span class="sourceLineNo">587</span> .append(position).append("\n");<a name="line.587"></a> -<span class="sourceLineNo">588</span> } else {<a name="line.588"></a> -<span class="sourceLineNo">589</span> sb.append("no replication ongoing, waiting for new log");<a name="line.589"></a> -<span class="sourceLineNo">590</span> }<a name="line.590"></a> +<span class="sourceLineNo">502</span> worker.entryReader.setReaderRunning(false);<a name="line.502"></a> +<span class="sourceLineNo">503</span> }<a name="line.503"></a> +<span class="sourceLineNo">504</span><a name="line.504"></a> +<span class="sourceLineNo">505</span> for (ReplicationSourceShipper worker : workers) {<a name="line.505"></a> +<span class="sourceLineNo">506</span> if (worker.isAlive() || worker.entryReader.isAlive()) {<a name="line.506"></a> +<span class="sourceLineNo">507</span> try {<a name="line.507"></a> +<span class="sourceLineNo">508</span> // Wait worker to stop<a name="line.508"></a> +<span class="sourceLineNo">509</span> Thread.sleep(this.sleepForRetries);<a name="line.509"></a> +<span class="sourceLineNo">510</span> } catch (InterruptedException e) {<a name="line.510"></a> +<span class="sourceLineNo">511</span> LOG.info("Interrupted while waiting " + worker.getName() + " to stop");<a name="line.511"></a> +<span class="sourceLineNo">512</span> Thread.currentThread().interrupt();<a name="line.512"></a> +<span class="sourceLineNo">513</span> }<a name="line.513"></a> +<span class="sourceLineNo">514</span> // If worker still is alive after waiting, interrupt it<a name="line.514"></a> +<span class="sourceLineNo">515</span> if (worker.isAlive()) {<a name="line.515"></a> +<span class="sourceLineNo">516</span> worker.interrupt();<a name="line.516"></a> +<span class="sourceLineNo">517</span> }<a name="line.517"></a> +<span class="sourceLineNo">518</span> // If entry reader is alive after waiting, interrupt it<a name="line.518"></a> +<span class="sourceLineNo">519</span> if (worker.entryReader.isAlive()) {<a name="line.519"></a> +<span class="sourceLineNo">520</span> worker.entryReader.interrupt();<a name="line.520"></a> +<span class="sourceLineNo">521</span> }<a name="line.521"></a> +<span class="sourceLineNo">522</span> }<a name="line.522"></a> +<span class="sourceLineNo">523</span> }<a name="line.523"></a> +<span class="sourceLineNo">524</span><a name="line.524"></a> +<span class="sourceLineNo">525</span> if (this.replicationEndpoint != null) {<a name="line.525"></a> +<span class="sourceLineNo">526</span> this.replicationEndpoint.stop();<a name="line.526"></a> +<span class="sourceLineNo">527</span> }<a name="line.527"></a> +<span class="sourceLineNo">528</span> if (join) {<a name="line.528"></a> +<span class="sourceLineNo">529</span> for (ReplicationSourceShipper worker : workers) {<a name="line.529"></a> +<span class="sourceLineNo">530</span> Threads.shutdown(worker, this.sleepForRetries);<a name="line.530"></a> +<span class="sourceLineNo">531</span> LOG.info("ReplicationSourceWorker " + worker.getName() + " terminated");<a name="line.531"></a> +<span class="sourceLineNo">532</span> }<a name="line.532"></a> +<span class="sourceLineNo">533</span> if (this.replicationEndpoint != null) {<a name="line.533"></a> +<span class="sourceLineNo">534</span> try {<a name="line.534"></a> +<span class="sourceLineNo">535</span> this.replicationEndpoint.awaitTerminated(sleepForRetries * maxRetriesMultiplier,<a name="line.535"></a> +<span class="sourceLineNo">536</span> TimeUnit.MILLISECONDS);<a name="line.536"></a> +<span class="sourceLineNo">537</span> } catch (TimeoutException te) {<a name="line.537"></a> +<span class="sourceLineNo">538</span> LOG.warn("Got exception while waiting for endpoint to shutdown for replication source :" +<a name="line.538"></a> +<span class="sourceLineNo">539</span> this.queueId, te);<a name="line.539"></a> +<span class="sourceLineNo">540</span> }<a name="line.540"></a> +<span class="sourceLineNo">541</span> }<a name="line.541"></a> +<span class="sourceLineNo">542</span> }<a name="line.542"></a> +<span class="sourceLineNo">543</span> this.metrics.clear();<a name="line.543"></a> +<span class="sourceLineNo">544</span> }<a name="line.544"></a> +<span class="sourceLineNo">545</span><a name="line.545"></a> +<span class="sourceLineNo">546</span> @Override<a name="line.546"></a> +<span class="sourceLineNo">547</span> public String getQueueId() {<a name="line.547"></a> +<span class="sourceLineNo">548</span> return this.queueId;<a name="line.548"></a> +<span class="sourceLineNo">549</span> }<a name="line.549"></a> +<span class="sourceLineNo">550</span><a name="line.550"></a> +<span class="sourceLineNo">551</span> @Override<a name="line.551"></a> +<span class="sourceLineNo">552</span> public String getPeerId() {<a name="line.552"></a> +<span class="sourceLineNo">553</span> return this.peerId;<a name="line.553"></a> +<span class="sourceLineNo">554</span> }<a name="line.554"></a> +<span class="sourceLineNo">555</span><a name="line.555"></a> +<span class="sourceLineNo">556</span> @Override<a name="line.556"></a> +<span class="sourceLineNo">557</span> public Path getCurrentPath() {<a name="line.557"></a> +<span class="sourceLineNo">558</span> // only for testing<a name="line.558"></a> +<span class="sourceLineNo">559</span> for (ReplicationSourceShipper worker : workerThreads.values()) {<a name="line.559"></a> +<span class="sourceLineNo">560</span> if (worker.getCurrentPath() != null) {<a name="line.560"></a> +<span class="sourceLineNo">561</span> return worker.getCurrentPath();<a name="line.561"></a> +<span class="sourceLineNo">562</span> }<a name="line.562"></a> +<span class="sourceLineNo">563</span> }<a name="line.563"></a> +<span class="sourceLineNo">564</span> return null;<a name="line.564"></a> +<span class="sourceLineNo">565</span> }<a name="line.565"></a> +<span class="sourceLineNo">566</span><a name="line.566"></a> +<span class="sourceLineNo">567</span> @Override<a name="line.567"></a> +<span class="sourceLineNo">568</span> public boolean isSourceActive() {<a name="line.568"></a> +<span class="sourceLineNo">569</span> return !this.server.isStopped() && this.sourceRunning;<a name="line.569"></a> +<span class="sourceLineNo">570</span> }<a name="line.570"></a> +<span class="sourceLineNo">571</span><a name="line.571"></a> +<span class="sourceLineNo">572</span> /**<a name="line.572"></a> +<span class="sourceLineNo">573</span> * Comparator used to compare logs together based on their start time<a name="line.573"></a> +<span class="sourceLineNo">574</span> */<a name="line.574"></a> +<span class="sourceLineNo">575</span> public static class LogsComparator implements Comparator<Path> {<a name="line.575"></a> +<span class="sourceLineNo">576</span><a name="line.576"></a> +<span class="sourceLineNo">577</span> @Override<a name="line.577"></a> +<span class="sourceLineNo">578</span> public int compare(Path o1, Path o2) {<a name="line.578"></a> +<span class="sourceLineNo">579</span> return Long.compare(getTS(o1), getTS(o2));<a name="line.579"></a> +<span class="sourceLineNo">580</span> }<a name="line.580"></a> +<span class="sourceLineNo">581</span><a name="line.581"></a> +<span class="sourceLineNo">582</span> /**<a name="line.582"></a> +<span class="sourceLineNo">583</span> * Split a path to get the start time<a name="line.583"></a> +<span class="sourceLineNo">584</span> * For example: 10.20.20.171%3A60020.1277499063250<a name="line.584"></a> +<span class="sourceLineNo">585</span> * @param p path to split<a name="line.585"></a> +<span class="sourceLineNo">586</span> * @return start time<a name="line.586"></a> +<span class="sourceLineNo">587</span> */<a name="line.587"></a> +<span class="sourceLineNo">588</span> private static long getTS(Path p) {<a name="line.588"></a> +<span class="sourceLineNo">589</span> int tsIndex = p.getName().lastIndexOf('.') + 1;<a name="line.589"></a> +<span class="sourceLineNo">590</span> return Long.parseLong(p.getName().substring(tsIndex));<a name="line.590"></a> <span class="sourceLineNo">591</span> }<a name="line.591"></a> -<span class="sourceLineNo">592</span> return sb.toString();<a name="line.592"></a> -<span class="sourceLineNo">593</span> }<a name="line.593"></a> -<span class="sourceLineNo">594</span><a name="line.594"></a> -<span class="sourceLineNo">595</span> @Override<a name="line.595"></a> -<span class="sourceLineNo">596</span> public MetricsSource getSourceMetrics() {<a name="line.596"></a> -<span class="sourceLineNo">597</span> return this.metrics;<a name="line.597"></a> -<span class="sourceLineNo">598</span> }<a name="line.598"></a> -<span class="sourceLineNo">599</span><a name="line.599"></a> -<span class="sourceLineNo">600</span> @Override<a name="line.600"></a> -<span class="sourceLineNo">601</span> //offsets totalBufferUsed by deducting shipped batchSize.<a name="line.601"></a> -<span class="sourceLineNo">602</span> public void postShipEdits(List<Entry> entries, int batchSize) {<a name="line.602"></a> -<span class="sourceLineNo">603</span> if (throttler.isEnabled()) {<a name="line.603"></a> -<span class="sourceLineNo">604</span> throttler.addPushSize(batchSize);<a name="line.604"></a> -<span class="sourceLineNo">605</span> }<a name="line.605"></a> -<span class="sourceLineNo">606</span> totalReplicatedEdits.addAndGet(entries.size());<a name="line.606"></a> -<span class="sourceLineNo">607</span> totalBufferUsed.addAndGet(-batchSize);<a name="line.607"></a> -<span class="sourceLineNo">608</span> }<a name="line.608"></a> -<span class="sourceLineNo">609</span><a name="line.609"></a> -<span class="sourceLineNo">610</span> @Override<a name="line.610"></a> -<span class="sourceLineNo">611</span> public WALFileLengthProvider getWALFileLengthProvider() {<a name="line.611"></a> -<span class="sourceLineNo">612</span> return walFileLengthProvider;<a name="line.612"></a> +<span class="sourceLineNo">592</span> }<a name="line.592"></a> +<span class="sourceLineNo">593</span><a name="line.593"></a> +<span class="sourceLineNo">594</span> @Override<a name="line.594"></a> +<span class="sourceLineNo">595</span> public String getStats() {<a name="line.595"></a> +<span class="sourceLineNo">596</span> StringBuilder sb = new StringBuilder();<a name="line.596"></a> +<span class="sourceLineNo">597</span> sb.append("Total replicated edits: ").append(totalReplicatedEdits)<a name="line.597"></a> +<span class="sourceLineNo">598</span> .append(", current progress: \n");<a name="line.598"></a> +<span class="sourceLineNo">599</span> for (Map.Entry<String, ReplicationSourceShipper> entry : workerThreads.entrySet()) {<a name="line.599"></a> +<span class="sourceLineNo">600</span> String walGroupId = entry.getKey();<a name="line.600"></a> +<span class="sourceLineNo">601</span> ReplicationSourceShipper worker = entry.getValue();<a name="line.601"></a> +<span class="sourceLineNo">602</span> long position = worker.getCurrentPosition();<a name="line.602"></a> +<span class="sourceLineNo">603</span> Path currentPath = worker.getCurrentPath();<a name="line.603"></a> +<span class="sourceLineNo">604</span> sb.append("walGroup [").append(walGroupId).append("]: ");<a name="line.604"></a> +<span class="sourceLineNo">605</span> if (currentPath != null) {<a name="line.605"></a> +<span class="sourceLineNo">606</span> sb.append("currently replicating from: ").append(currentPath).append(" at position: ")<a name="line.606"></a> +<span class="sourceLineNo">607</span> .append(position).append("\n");<a name="line.607"></a> +<span class="sourceLineNo">608</span> } else {<a name="line.608"></a> +<span class="sourceLineNo">609</span> sb.append("no replication ongoing, waiting for new log");<a name="line.609"></a> +<span class="sourceLineNo">610</span> }<a name="line.610"></a> +<span class="sourceLineNo">611</span> }<a name="line.611"></a> +<span class="sourceLineNo">612</span> return sb.toString();<a name="line.612"></a> <span class="sourceLineNo">613</span> }<a name="line.613"></a> <span class="sourceLineNo">614</span><a name="line.614"></a> <span class="sourceLineNo">615</span> @Override<a name="line.615"></a> -<span class="sourceLineNo">616</span> public ServerName getServerWALsBelongTo() {<a name="line.616"></a> -<span class="sourceLineNo">617</span> return server.getServerName();<a name="line.617"></a> +<span class="sourceLineNo">616</span> public MetricsSource getSourceMetrics() {<a name="line.616"></a> +<span class="sourceLineNo">617</span> return this.metrics;<a name="line.617"></a> <span class="sourceLineNo">618</span> }<a name="line.618"></a> <span class="sourceLineNo">619</span><a name="line.619"></a> -<span class="sourceLineNo">620</span> Server getServer() {<a name="line.620"></a> -<span class="sourceLineNo">621</span> return server;<a name="line.621"></a> -<span class="sourceLineNo">622</span> }<a name="line.622"></a> -<span class="sourceLineNo">623</span><a name="line.623"></a> -<span class="sourceLineNo">624</span> ReplicationQueueStorage getQueueStorage() {<a name="line.624"></a> -<span class="sourceLineNo">625</span> return queueStorage;<a name="line.625"></a> -<span class="sourceLineNo">626</span> }<a name="line.626"></a> -<span class="sourceLineNo">627</span>}<a name="line.627"></a> +<span class="sourceLineNo">620</span> @Override<a name="line.620"></a> +<span class="sourceLineNo">621</span> //offsets totalBufferUsed by deducting shipped batchSize.<a name="line.621"></a> +<span class="sourceLineNo">622</span> public void postShipEdits(List<Entry> entries, int batchSize) {<a name="line.622"></a> +<span class="sourceLineNo">623</span> if (throttler.isEnabled()) {<a name="line.623"></a> +<span class="sourceLineNo">624</span> throttler.addPushSize(batchSize);<a name="line.624"></a> +<span class="sourceLineNo">625</span> }<a name="line.625"></a> +<span class="sourceLineNo">626</span> totalReplicatedEdits.addAndGet(entries.size());<a name="line.626"></a> +<span class="sourceLineNo">627</span> totalBufferUsed.addAndGet(-batchSize);<a name="line.627"></a> +<span class="sourceLineNo">628</span> }<a name="line.628"></a> +<span class="sourceLineNo">629</span><a name="line.629"></a> +<span class="sourceLineNo">630</span> @Override<a name="line.630"></a> +<span class="sourceLineNo">631</span> public WALFileLengthProvider getWALFileLengthProvider() {<a name="line.631"></a> +<span class="sourceLineNo">632</span> return walFileLengthProvider;<a name="line.632"></a> +<span class="sourceLineNo">633</span> }<a name="line.633"></a> +<span class="sourceLineNo">634</span><a name="line.634"></a> +<span class="sourceLineNo">635</span> @Override<a name="line.635"></a> +<span class="sourceLineNo">636</span> public ServerName getServerWALsBelongTo() {<a name="line.636"></a> +<span class="sourceLineNo">637</span> return server.getServerName();<a name="line.637"></a> +<span class="sourceLineNo">638</span> }<a name="line.638"></a> +<span class="sourceLineNo">639</span><a name="line.639"></a> +<span class="sourceLineNo">640</span> Server getServer() {<a name="line.640"></a> +<span class="sourceLineNo">641</span> return server;<a name="line.641"></a> +<span class="sourceLineNo">642</span> }<a name="line.642"></a> +<span class="sourceLineNo">643</span><a name="line.643"></a> +<span class="sourceLineNo">644</span> ReplicationQueueStorage getQueueStorage() {<a name="line.644"></a> +<span class="sourceLineNo">645</span> return queueStorage;<a name="line.645"></a> +<span class="sourceLineNo">646</span> }<a name="line.646"></a> +<span class="sourceLineNo">647</span>}<a name="line.647"></a>