http://git-wip-us.apache.org/repos/asf/hbase-site/blob/de18d468/devapidocs/src-html/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.html b/devapidocs/src-html/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.html index c2e0da8..16d68f4 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.html @@ -157,27 +157,28 @@ <span class="sourceLineNo">149</span> Threads.sleep(100);// wait a short while for other worker thread to fully exit<a name="line.149"></a> <span class="sourceLineNo">150</span> boolean allTasksDone = workerThreads.values().stream().allMatch(w -> w.isFinished());<a name="line.150"></a> <span class="sourceLineNo">151</span> if (allTasksDone) {<a name="line.151"></a> -<span class="sourceLineNo">152</span> manager.removeRecoveredSource(this);<a name="line.152"></a> -<span class="sourceLineNo">153</span> LOG.info("Finished recovering queue {} with the following stats: {}", queueId, getStats());<a name="line.153"></a> -<span class="sourceLineNo">154</span> }<a name="line.154"></a> -<span class="sourceLineNo">155</span> }<a name="line.155"></a> -<span class="sourceLineNo">156</span> }<a name="line.156"></a> -<span class="sourceLineNo">157</span><a name="line.157"></a> -<span class="sourceLineNo">158</span> @Override<a name="line.158"></a> -<span class="sourceLineNo">159</span> public String getPeerId() {<a name="line.159"></a> -<span class="sourceLineNo">160</span> return this.actualPeerId;<a name="line.160"></a> -<span class="sourceLineNo">161</span> }<a name="line.161"></a> -<span class="sourceLineNo">162</span><a name="line.162"></a> -<span class="sourceLineNo">163</span> @Override<a name="line.163"></a> -<span class="sourceLineNo">164</span> public ServerName getServerWALsBelongTo() {<a name="line.164"></a> -<span class="sourceLineNo">165</span> return this.replicationQueueInfo.getDeadRegionServers().get(0);<a name="line.165"></a> -<span class="sourceLineNo">166</span> }<a name="line.166"></a> -<span class="sourceLineNo">167</span><a name="line.167"></a> -<span class="sourceLineNo">168</span> @Override<a name="line.168"></a> -<span class="sourceLineNo">169</span> public boolean isRecovered() {<a name="line.169"></a> -<span class="sourceLineNo">170</span> return true;<a name="line.170"></a> -<span class="sourceLineNo">171</span> }<a name="line.171"></a> -<span class="sourceLineNo">172</span>}<a name="line.172"></a> +<span class="sourceLineNo">152</span> this.getSourceMetrics().clear();<a name="line.152"></a> +<span class="sourceLineNo">153</span> manager.removeRecoveredSource(this);<a name="line.153"></a> +<span class="sourceLineNo">154</span> LOG.info("Finished recovering queue {} with the following stats: {}", queueId, getStats());<a name="line.154"></a> +<span class="sourceLineNo">155</span> }<a name="line.155"></a> +<span class="sourceLineNo">156</span> }<a name="line.156"></a> +<span class="sourceLineNo">157</span> }<a name="line.157"></a> +<span class="sourceLineNo">158</span><a name="line.158"></a> +<span class="sourceLineNo">159</span> @Override<a name="line.159"></a> +<span class="sourceLineNo">160</span> public String getPeerId() {<a name="line.160"></a> +<span class="sourceLineNo">161</span> return this.actualPeerId;<a name="line.161"></a> +<span class="sourceLineNo">162</span> }<a name="line.162"></a> +<span class="sourceLineNo">163</span><a name="line.163"></a> +<span class="sourceLineNo">164</span> @Override<a name="line.164"></a> +<span class="sourceLineNo">165</span> public ServerName getServerWALsBelongTo() {<a name="line.165"></a> +<span class="sourceLineNo">166</span> return this.replicationQueueInfo.getDeadRegionServers().get(0);<a name="line.166"></a> +<span class="sourceLineNo">167</span> }<a name="line.167"></a> +<span class="sourceLineNo">168</span><a name="line.168"></a> +<span class="sourceLineNo">169</span> @Override<a name="line.169"></a> +<span class="sourceLineNo">170</span> public boolean isRecovered() {<a name="line.170"></a> +<span class="sourceLineNo">171</span> return true;<a name="line.171"></a> +<span class="sourceLineNo">172</span> }<a name="line.172"></a> +<span class="sourceLineNo">173</span>}<a name="line.173"></a>
http://git-wip-us.apache.org/repos/asf/hbase-site/blob/de18d468/devapidocs/src-html/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.LogsComparator.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.LogsComparator.html b/devapidocs/src-html/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.LogsComparator.html index 4eb9011..a99b4a7 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.LogsComparator.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.LogsComparator.html @@ -527,109 +527,110 @@ <span class="sourceLineNo">519</span> }<a name="line.519"></a> <span class="sourceLineNo">520</span> }<a name="line.520"></a> <span class="sourceLineNo">521</span> }<a name="line.521"></a> -<span class="sourceLineNo">522</span> }<a name="line.522"></a> -<span class="sourceLineNo">523</span><a name="line.523"></a> -<span class="sourceLineNo">524</span> @Override<a name="line.524"></a> -<span class="sourceLineNo">525</span> public String getQueueId() {<a name="line.525"></a> -<span class="sourceLineNo">526</span> return this.queueId;<a name="line.526"></a> -<span class="sourceLineNo">527</span> }<a name="line.527"></a> -<span class="sourceLineNo">528</span><a name="line.528"></a> -<span class="sourceLineNo">529</span> @Override<a name="line.529"></a> -<span class="sourceLineNo">530</span> public String getPeerId() {<a name="line.530"></a> -<span class="sourceLineNo">531</span> return this.peerId;<a name="line.531"></a> -<span class="sourceLineNo">532</span> }<a name="line.532"></a> -<span class="sourceLineNo">533</span><a name="line.533"></a> -<span class="sourceLineNo">534</span> @Override<a name="line.534"></a> -<span class="sourceLineNo">535</span> public Path getCurrentPath() {<a name="line.535"></a> -<span class="sourceLineNo">536</span> // only for testing<a name="line.536"></a> -<span class="sourceLineNo">537</span> for (ReplicationSourceShipper worker : workerThreads.values()) {<a name="line.537"></a> -<span class="sourceLineNo">538</span> if (worker.getCurrentPath() != null) {<a name="line.538"></a> -<span class="sourceLineNo">539</span> return worker.getCurrentPath();<a name="line.539"></a> -<span class="sourceLineNo">540</span> }<a name="line.540"></a> -<span class="sourceLineNo">541</span> }<a name="line.541"></a> -<span class="sourceLineNo">542</span> return null;<a name="line.542"></a> -<span class="sourceLineNo">543</span> }<a name="line.543"></a> -<span class="sourceLineNo">544</span><a name="line.544"></a> -<span class="sourceLineNo">545</span> @Override<a name="line.545"></a> -<span class="sourceLineNo">546</span> public boolean isSourceActive() {<a name="line.546"></a> -<span class="sourceLineNo">547</span> return !this.server.isStopped() && this.sourceRunning;<a name="line.547"></a> -<span class="sourceLineNo">548</span> }<a name="line.548"></a> -<span class="sourceLineNo">549</span><a name="line.549"></a> -<span class="sourceLineNo">550</span> /**<a name="line.550"></a> -<span class="sourceLineNo">551</span> * Comparator used to compare logs together based on their start time<a name="line.551"></a> -<span class="sourceLineNo">552</span> */<a name="line.552"></a> -<span class="sourceLineNo">553</span> public static class LogsComparator implements Comparator<Path> {<a name="line.553"></a> -<span class="sourceLineNo">554</span><a name="line.554"></a> -<span class="sourceLineNo">555</span> @Override<a name="line.555"></a> -<span class="sourceLineNo">556</span> public int compare(Path o1, Path o2) {<a name="line.556"></a> -<span class="sourceLineNo">557</span> return Long.compare(getTS(o1), getTS(o2));<a name="line.557"></a> -<span class="sourceLineNo">558</span> }<a name="line.558"></a> -<span class="sourceLineNo">559</span><a name="line.559"></a> -<span class="sourceLineNo">560</span> /**<a name="line.560"></a> -<span class="sourceLineNo">561</span> * Split a path to get the start time<a name="line.561"></a> -<span class="sourceLineNo">562</span> * For example: 10.20.20.171%3A60020.1277499063250<a name="line.562"></a> -<span class="sourceLineNo">563</span> * @param p path to split<a name="line.563"></a> -<span class="sourceLineNo">564</span> * @return start time<a name="line.564"></a> -<span class="sourceLineNo">565</span> */<a name="line.565"></a> -<span class="sourceLineNo">566</span> private static long getTS(Path p) {<a name="line.566"></a> -<span class="sourceLineNo">567</span> int tsIndex = p.getName().lastIndexOf('.') + 1;<a name="line.567"></a> -<span class="sourceLineNo">568</span> return Long.parseLong(p.getName().substring(tsIndex));<a name="line.568"></a> -<span class="sourceLineNo">569</span> }<a name="line.569"></a> -<span class="sourceLineNo">570</span> }<a name="line.570"></a> -<span class="sourceLineNo">571</span><a name="line.571"></a> -<span class="sourceLineNo">572</span> @Override<a name="line.572"></a> -<span class="sourceLineNo">573</span> public String getStats() {<a name="line.573"></a> -<span class="sourceLineNo">574</span> StringBuilder sb = new StringBuilder();<a name="line.574"></a> -<span class="sourceLineNo">575</span> sb.append("Total replicated edits: ").append(totalReplicatedEdits)<a name="line.575"></a> -<span class="sourceLineNo">576</span> .append(", current progress: \n");<a name="line.576"></a> -<span class="sourceLineNo">577</span> for (Map.Entry<String, ReplicationSourceShipper> entry : workerThreads.entrySet()) {<a name="line.577"></a> -<span class="sourceLineNo">578</span> String walGroupId = entry.getKey();<a name="line.578"></a> -<span class="sourceLineNo">579</span> ReplicationSourceShipper worker = entry.getValue();<a name="line.579"></a> -<span class="sourceLineNo">580</span> long position = worker.getCurrentPosition();<a name="line.580"></a> -<span class="sourceLineNo">581</span> Path currentPath = worker.getCurrentPath();<a name="line.581"></a> -<span class="sourceLineNo">582</span> sb.append("walGroup [").append(walGroupId).append("]: ");<a name="line.582"></a> -<span class="sourceLineNo">583</span> if (currentPath != null) {<a name="line.583"></a> -<span class="sourceLineNo">584</span> sb.append("currently replicating from: ").append(currentPath).append(" at position: ")<a name="line.584"></a> -<span class="sourceLineNo">585</span> .append(position).append("\n");<a name="line.585"></a> -<span class="sourceLineNo">586</span> } else {<a name="line.586"></a> -<span class="sourceLineNo">587</span> sb.append("no replication ongoing, waiting for new log");<a name="line.587"></a> -<span class="sourceLineNo">588</span> }<a name="line.588"></a> -<span class="sourceLineNo">589</span> }<a name="line.589"></a> -<span class="sourceLineNo">590</span> return sb.toString();<a name="line.590"></a> -<span class="sourceLineNo">591</span> }<a name="line.591"></a> -<span class="sourceLineNo">592</span><a name="line.592"></a> -<span class="sourceLineNo">593</span> @Override<a name="line.593"></a> -<span class="sourceLineNo">594</span> public MetricsSource getSourceMetrics() {<a name="line.594"></a> -<span class="sourceLineNo">595</span> return this.metrics;<a name="line.595"></a> -<span class="sourceLineNo">596</span> }<a name="line.596"></a> -<span class="sourceLineNo">597</span><a name="line.597"></a> -<span class="sourceLineNo">598</span> @Override<a name="line.598"></a> -<span class="sourceLineNo">599</span> public void postShipEdits(List<Entry> entries, int batchSize) {<a name="line.599"></a> -<span class="sourceLineNo">600</span> if (throttler.isEnabled()) {<a name="line.600"></a> -<span class="sourceLineNo">601</span> throttler.addPushSize(batchSize);<a name="line.601"></a> -<span class="sourceLineNo">602</span> }<a name="line.602"></a> -<span class="sourceLineNo">603</span> totalReplicatedEdits.addAndGet(entries.size());<a name="line.603"></a> -<span class="sourceLineNo">604</span> totalBufferUsed.addAndGet(-batchSize);<a name="line.604"></a> -<span class="sourceLineNo">605</span> }<a name="line.605"></a> -<span class="sourceLineNo">606</span><a name="line.606"></a> -<span class="sourceLineNo">607</span> @Override<a name="line.607"></a> -<span class="sourceLineNo">608</span> public WALFileLengthProvider getWALFileLengthProvider() {<a name="line.608"></a> -<span class="sourceLineNo">609</span> return walFileLengthProvider;<a name="line.609"></a> -<span class="sourceLineNo">610</span> }<a name="line.610"></a> -<span class="sourceLineNo">611</span><a name="line.611"></a> -<span class="sourceLineNo">612</span> @Override<a name="line.612"></a> -<span class="sourceLineNo">613</span> public ServerName getServerWALsBelongTo() {<a name="line.613"></a> -<span class="sourceLineNo">614</span> return server.getServerName();<a name="line.614"></a> -<span class="sourceLineNo">615</span> }<a name="line.615"></a> -<span class="sourceLineNo">616</span><a name="line.616"></a> -<span class="sourceLineNo">617</span> Server getServer() {<a name="line.617"></a> -<span class="sourceLineNo">618</span> return server;<a name="line.618"></a> -<span class="sourceLineNo">619</span> }<a name="line.619"></a> -<span class="sourceLineNo">620</span><a name="line.620"></a> -<span class="sourceLineNo">621</span> ReplicationQueueStorage getQueueStorage() {<a name="line.621"></a> -<span class="sourceLineNo">622</span> return queueStorage;<a name="line.622"></a> -<span class="sourceLineNo">623</span> }<a name="line.623"></a> -<span class="sourceLineNo">624</span>}<a name="line.624"></a> +<span class="sourceLineNo">522</span> this.metrics.clear();<a name="line.522"></a> +<span class="sourceLineNo">523</span> }<a name="line.523"></a> +<span class="sourceLineNo">524</span><a name="line.524"></a> +<span class="sourceLineNo">525</span> @Override<a name="line.525"></a> +<span class="sourceLineNo">526</span> public String getQueueId() {<a name="line.526"></a> +<span class="sourceLineNo">527</span> return this.queueId;<a name="line.527"></a> +<span class="sourceLineNo">528</span> }<a name="line.528"></a> +<span class="sourceLineNo">529</span><a name="line.529"></a> +<span class="sourceLineNo">530</span> @Override<a name="line.530"></a> +<span class="sourceLineNo">531</span> public String getPeerId() {<a name="line.531"></a> +<span class="sourceLineNo">532</span> return this.peerId;<a name="line.532"></a> +<span class="sourceLineNo">533</span> }<a name="line.533"></a> +<span class="sourceLineNo">534</span><a name="line.534"></a> +<span class="sourceLineNo">535</span> @Override<a name="line.535"></a> +<span class="sourceLineNo">536</span> public Path getCurrentPath() {<a name="line.536"></a> +<span class="sourceLineNo">537</span> // only for testing<a name="line.537"></a> +<span class="sourceLineNo">538</span> for (ReplicationSourceShipper worker : workerThreads.values()) {<a name="line.538"></a> +<span class="sourceLineNo">539</span> if (worker.getCurrentPath() != null) {<a name="line.539"></a> +<span class="sourceLineNo">540</span> return worker.getCurrentPath();<a name="line.540"></a> +<span class="sourceLineNo">541</span> }<a name="line.541"></a> +<span class="sourceLineNo">542</span> }<a name="line.542"></a> +<span class="sourceLineNo">543</span> return null;<a name="line.543"></a> +<span class="sourceLineNo">544</span> }<a name="line.544"></a> +<span class="sourceLineNo">545</span><a name="line.545"></a> +<span class="sourceLineNo">546</span> @Override<a name="line.546"></a> +<span class="sourceLineNo">547</span> public boolean isSourceActive() {<a name="line.547"></a> +<span class="sourceLineNo">548</span> return !this.server.isStopped() && this.sourceRunning;<a name="line.548"></a> +<span class="sourceLineNo">549</span> }<a name="line.549"></a> +<span class="sourceLineNo">550</span><a name="line.550"></a> +<span class="sourceLineNo">551</span> /**<a name="line.551"></a> +<span class="sourceLineNo">552</span> * Comparator used to compare logs together based on their start time<a name="line.552"></a> +<span class="sourceLineNo">553</span> */<a name="line.553"></a> +<span class="sourceLineNo">554</span> public static class LogsComparator implements Comparator<Path> {<a name="line.554"></a> +<span class="sourceLineNo">555</span><a name="line.555"></a> +<span class="sourceLineNo">556</span> @Override<a name="line.556"></a> +<span class="sourceLineNo">557</span> public int compare(Path o1, Path o2) {<a name="line.557"></a> +<span class="sourceLineNo">558</span> return Long.compare(getTS(o1), getTS(o2));<a name="line.558"></a> +<span class="sourceLineNo">559</span> }<a name="line.559"></a> +<span class="sourceLineNo">560</span><a name="line.560"></a> +<span class="sourceLineNo">561</span> /**<a name="line.561"></a> +<span class="sourceLineNo">562</span> * Split a path to get the start time<a name="line.562"></a> +<span class="sourceLineNo">563</span> * For example: 10.20.20.171%3A60020.1277499063250<a name="line.563"></a> +<span class="sourceLineNo">564</span> * @param p path to split<a name="line.564"></a> +<span class="sourceLineNo">565</span> * @return start time<a name="line.565"></a> +<span class="sourceLineNo">566</span> */<a name="line.566"></a> +<span class="sourceLineNo">567</span> private static long getTS(Path p) {<a name="line.567"></a> +<span class="sourceLineNo">568</span> int tsIndex = p.getName().lastIndexOf('.') + 1;<a name="line.568"></a> +<span class="sourceLineNo">569</span> return Long.parseLong(p.getName().substring(tsIndex));<a name="line.569"></a> +<span class="sourceLineNo">570</span> }<a name="line.570"></a> +<span class="sourceLineNo">571</span> }<a name="line.571"></a> +<span class="sourceLineNo">572</span><a name="line.572"></a> +<span class="sourceLineNo">573</span> @Override<a name="line.573"></a> +<span class="sourceLineNo">574</span> public String getStats() {<a name="line.574"></a> +<span class="sourceLineNo">575</span> StringBuilder sb = new StringBuilder();<a name="line.575"></a> +<span class="sourceLineNo">576</span> sb.append("Total replicated edits: ").append(totalReplicatedEdits)<a name="line.576"></a> +<span class="sourceLineNo">577</span> .append(", current progress: \n");<a name="line.577"></a> +<span class="sourceLineNo">578</span> for (Map.Entry<String, ReplicationSourceShipper> entry : workerThreads.entrySet()) {<a name="line.578"></a> +<span class="sourceLineNo">579</span> String walGroupId = entry.getKey();<a name="line.579"></a> +<span class="sourceLineNo">580</span> ReplicationSourceShipper worker = entry.getValue();<a name="line.580"></a> +<span class="sourceLineNo">581</span> long position = worker.getCurrentPosition();<a name="line.581"></a> +<span class="sourceLineNo">582</span> Path currentPath = worker.getCurrentPath();<a name="line.582"></a> +<span class="sourceLineNo">583</span> sb.append("walGroup [").append(walGroupId).append("]: ");<a name="line.583"></a> +<span class="sourceLineNo">584</span> if (currentPath != null) {<a name="line.584"></a> +<span class="sourceLineNo">585</span> sb.append("currently replicating from: ").append(currentPath).append(" at position: ")<a name="line.585"></a> +<span class="sourceLineNo">586</span> .append(position).append("\n");<a name="line.586"></a> +<span class="sourceLineNo">587</span> } else {<a name="line.587"></a> +<span class="sourceLineNo">588</span> sb.append("no replication ongoing, waiting for new log");<a name="line.588"></a> +<span class="sourceLineNo">589</span> }<a name="line.589"></a> +<span class="sourceLineNo">590</span> }<a name="line.590"></a> +<span class="sourceLineNo">591</span> return sb.toString();<a name="line.591"></a> +<span class="sourceLineNo">592</span> }<a name="line.592"></a> +<span class="sourceLineNo">593</span><a name="line.593"></a> +<span class="sourceLineNo">594</span> @Override<a name="line.594"></a> +<span class="sourceLineNo">595</span> public MetricsSource getSourceMetrics() {<a name="line.595"></a> +<span class="sourceLineNo">596</span> return this.metrics;<a name="line.596"></a> +<span class="sourceLineNo">597</span> }<a name="line.597"></a> +<span class="sourceLineNo">598</span><a name="line.598"></a> +<span class="sourceLineNo">599</span> @Override<a name="line.599"></a> +<span class="sourceLineNo">600</span> public void postShipEdits(List<Entry> entries, int batchSize) {<a name="line.600"></a> +<span class="sourceLineNo">601</span> if (throttler.isEnabled()) {<a name="line.601"></a> +<span class="sourceLineNo">602</span> throttler.addPushSize(batchSize);<a name="line.602"></a> +<span class="sourceLineNo">603</span> }<a name="line.603"></a> +<span class="sourceLineNo">604</span> totalReplicatedEdits.addAndGet(entries.size());<a name="line.604"></a> +<span class="sourceLineNo">605</span> totalBufferUsed.addAndGet(-batchSize);<a name="line.605"></a> +<span class="sourceLineNo">606</span> }<a name="line.606"></a> +<span class="sourceLineNo">607</span><a name="line.607"></a> +<span class="sourceLineNo">608</span> @Override<a name="line.608"></a> +<span class="sourceLineNo">609</span> public WALFileLengthProvider getWALFileLengthProvider() {<a name="line.609"></a> +<span class="sourceLineNo">610</span> return walFileLengthProvider;<a name="line.610"></a> +<span class="sourceLineNo">611</span> }<a name="line.611"></a> +<span class="sourceLineNo">612</span><a name="line.612"></a> +<span class="sourceLineNo">613</span> @Override<a name="line.613"></a> +<span class="sourceLineNo">614</span> public ServerName getServerWALsBelongTo() {<a name="line.614"></a> +<span class="sourceLineNo">615</span> return server.getServerName();<a name="line.615"></a> +<span class="sourceLineNo">616</span> }<a name="line.616"></a> +<span class="sourceLineNo">617</span><a name="line.617"></a> +<span class="sourceLineNo">618</span> Server getServer() {<a name="line.618"></a> +<span class="sourceLineNo">619</span> return server;<a name="line.619"></a> +<span class="sourceLineNo">620</span> }<a name="line.620"></a> +<span class="sourceLineNo">621</span><a name="line.621"></a> +<span class="sourceLineNo">622</span> ReplicationQueueStorage getQueueStorage() {<a name="line.622"></a> +<span class="sourceLineNo">623</span> return queueStorage;<a name="line.623"></a> +<span class="sourceLineNo">624</span> }<a name="line.624"></a> +<span class="sourceLineNo">625</span>}<a name="line.625"></a> http://git-wip-us.apache.org/repos/asf/hbase-site/blob/de18d468/devapidocs/src-html/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.html b/devapidocs/src-html/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.html index 4eb9011..a99b4a7 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.html @@ -527,109 +527,110 @@ <span class="sourceLineNo">519</span> }<a name="line.519"></a> <span class="sourceLineNo">520</span> }<a name="line.520"></a> <span class="sourceLineNo">521</span> }<a name="line.521"></a> -<span class="sourceLineNo">522</span> }<a name="line.522"></a> -<span class="sourceLineNo">523</span><a name="line.523"></a> -<span class="sourceLineNo">524</span> @Override<a name="line.524"></a> -<span class="sourceLineNo">525</span> public String getQueueId() {<a name="line.525"></a> -<span class="sourceLineNo">526</span> return this.queueId;<a name="line.526"></a> -<span class="sourceLineNo">527</span> }<a name="line.527"></a> -<span class="sourceLineNo">528</span><a name="line.528"></a> -<span class="sourceLineNo">529</span> @Override<a name="line.529"></a> -<span class="sourceLineNo">530</span> public String getPeerId() {<a name="line.530"></a> -<span class="sourceLineNo">531</span> return this.peerId;<a name="line.531"></a> -<span class="sourceLineNo">532</span> }<a name="line.532"></a> -<span class="sourceLineNo">533</span><a name="line.533"></a> -<span class="sourceLineNo">534</span> @Override<a name="line.534"></a> -<span class="sourceLineNo">535</span> public Path getCurrentPath() {<a name="line.535"></a> -<span class="sourceLineNo">536</span> // only for testing<a name="line.536"></a> -<span class="sourceLineNo">537</span> for (ReplicationSourceShipper worker : workerThreads.values()) {<a name="line.537"></a> -<span class="sourceLineNo">538</span> if (worker.getCurrentPath() != null) {<a name="line.538"></a> -<span class="sourceLineNo">539</span> return worker.getCurrentPath();<a name="line.539"></a> -<span class="sourceLineNo">540</span> }<a name="line.540"></a> -<span class="sourceLineNo">541</span> }<a name="line.541"></a> -<span class="sourceLineNo">542</span> return null;<a name="line.542"></a> -<span class="sourceLineNo">543</span> }<a name="line.543"></a> -<span class="sourceLineNo">544</span><a name="line.544"></a> -<span class="sourceLineNo">545</span> @Override<a name="line.545"></a> -<span class="sourceLineNo">546</span> public boolean isSourceActive() {<a name="line.546"></a> -<span class="sourceLineNo">547</span> return !this.server.isStopped() && this.sourceRunning;<a name="line.547"></a> -<span class="sourceLineNo">548</span> }<a name="line.548"></a> -<span class="sourceLineNo">549</span><a name="line.549"></a> -<span class="sourceLineNo">550</span> /**<a name="line.550"></a> -<span class="sourceLineNo">551</span> * Comparator used to compare logs together based on their start time<a name="line.551"></a> -<span class="sourceLineNo">552</span> */<a name="line.552"></a> -<span class="sourceLineNo">553</span> public static class LogsComparator implements Comparator<Path> {<a name="line.553"></a> -<span class="sourceLineNo">554</span><a name="line.554"></a> -<span class="sourceLineNo">555</span> @Override<a name="line.555"></a> -<span class="sourceLineNo">556</span> public int compare(Path o1, Path o2) {<a name="line.556"></a> -<span class="sourceLineNo">557</span> return Long.compare(getTS(o1), getTS(o2));<a name="line.557"></a> -<span class="sourceLineNo">558</span> }<a name="line.558"></a> -<span class="sourceLineNo">559</span><a name="line.559"></a> -<span class="sourceLineNo">560</span> /**<a name="line.560"></a> -<span class="sourceLineNo">561</span> * Split a path to get the start time<a name="line.561"></a> -<span class="sourceLineNo">562</span> * For example: 10.20.20.171%3A60020.1277499063250<a name="line.562"></a> -<span class="sourceLineNo">563</span> * @param p path to split<a name="line.563"></a> -<span class="sourceLineNo">564</span> * @return start time<a name="line.564"></a> -<span class="sourceLineNo">565</span> */<a name="line.565"></a> -<span class="sourceLineNo">566</span> private static long getTS(Path p) {<a name="line.566"></a> -<span class="sourceLineNo">567</span> int tsIndex = p.getName().lastIndexOf('.') + 1;<a name="line.567"></a> -<span class="sourceLineNo">568</span> return Long.parseLong(p.getName().substring(tsIndex));<a name="line.568"></a> -<span class="sourceLineNo">569</span> }<a name="line.569"></a> -<span class="sourceLineNo">570</span> }<a name="line.570"></a> -<span class="sourceLineNo">571</span><a name="line.571"></a> -<span class="sourceLineNo">572</span> @Override<a name="line.572"></a> -<span class="sourceLineNo">573</span> public String getStats() {<a name="line.573"></a> -<span class="sourceLineNo">574</span> StringBuilder sb = new StringBuilder();<a name="line.574"></a> -<span class="sourceLineNo">575</span> sb.append("Total replicated edits: ").append(totalReplicatedEdits)<a name="line.575"></a> -<span class="sourceLineNo">576</span> .append(", current progress: \n");<a name="line.576"></a> -<span class="sourceLineNo">577</span> for (Map.Entry<String, ReplicationSourceShipper> entry : workerThreads.entrySet()) {<a name="line.577"></a> -<span class="sourceLineNo">578</span> String walGroupId = entry.getKey();<a name="line.578"></a> -<span class="sourceLineNo">579</span> ReplicationSourceShipper worker = entry.getValue();<a name="line.579"></a> -<span class="sourceLineNo">580</span> long position = worker.getCurrentPosition();<a name="line.580"></a> -<span class="sourceLineNo">581</span> Path currentPath = worker.getCurrentPath();<a name="line.581"></a> -<span class="sourceLineNo">582</span> sb.append("walGroup [").append(walGroupId).append("]: ");<a name="line.582"></a> -<span class="sourceLineNo">583</span> if (currentPath != null) {<a name="line.583"></a> -<span class="sourceLineNo">584</span> sb.append("currently replicating from: ").append(currentPath).append(" at position: ")<a name="line.584"></a> -<span class="sourceLineNo">585</span> .append(position).append("\n");<a name="line.585"></a> -<span class="sourceLineNo">586</span> } else {<a name="line.586"></a> -<span class="sourceLineNo">587</span> sb.append("no replication ongoing, waiting for new log");<a name="line.587"></a> -<span class="sourceLineNo">588</span> }<a name="line.588"></a> -<span class="sourceLineNo">589</span> }<a name="line.589"></a> -<span class="sourceLineNo">590</span> return sb.toString();<a name="line.590"></a> -<span class="sourceLineNo">591</span> }<a name="line.591"></a> -<span class="sourceLineNo">592</span><a name="line.592"></a> -<span class="sourceLineNo">593</span> @Override<a name="line.593"></a> -<span class="sourceLineNo">594</span> public MetricsSource getSourceMetrics() {<a name="line.594"></a> -<span class="sourceLineNo">595</span> return this.metrics;<a name="line.595"></a> -<span class="sourceLineNo">596</span> }<a name="line.596"></a> -<span class="sourceLineNo">597</span><a name="line.597"></a> -<span class="sourceLineNo">598</span> @Override<a name="line.598"></a> -<span class="sourceLineNo">599</span> public void postShipEdits(List<Entry> entries, int batchSize) {<a name="line.599"></a> -<span class="sourceLineNo">600</span> if (throttler.isEnabled()) {<a name="line.600"></a> -<span class="sourceLineNo">601</span> throttler.addPushSize(batchSize);<a name="line.601"></a> -<span class="sourceLineNo">602</span> }<a name="line.602"></a> -<span class="sourceLineNo">603</span> totalReplicatedEdits.addAndGet(entries.size());<a name="line.603"></a> -<span class="sourceLineNo">604</span> totalBufferUsed.addAndGet(-batchSize);<a name="line.604"></a> -<span class="sourceLineNo">605</span> }<a name="line.605"></a> -<span class="sourceLineNo">606</span><a name="line.606"></a> -<span class="sourceLineNo">607</span> @Override<a name="line.607"></a> -<span class="sourceLineNo">608</span> public WALFileLengthProvider getWALFileLengthProvider() {<a name="line.608"></a> -<span class="sourceLineNo">609</span> return walFileLengthProvider;<a name="line.609"></a> -<span class="sourceLineNo">610</span> }<a name="line.610"></a> -<span class="sourceLineNo">611</span><a name="line.611"></a> -<span class="sourceLineNo">612</span> @Override<a name="line.612"></a> -<span class="sourceLineNo">613</span> public ServerName getServerWALsBelongTo() {<a name="line.613"></a> -<span class="sourceLineNo">614</span> return server.getServerName();<a name="line.614"></a> -<span class="sourceLineNo">615</span> }<a name="line.615"></a> -<span class="sourceLineNo">616</span><a name="line.616"></a> -<span class="sourceLineNo">617</span> Server getServer() {<a name="line.617"></a> -<span class="sourceLineNo">618</span> return server;<a name="line.618"></a> -<span class="sourceLineNo">619</span> }<a name="line.619"></a> -<span class="sourceLineNo">620</span><a name="line.620"></a> -<span class="sourceLineNo">621</span> ReplicationQueueStorage getQueueStorage() {<a name="line.621"></a> -<span class="sourceLineNo">622</span> return queueStorage;<a name="line.622"></a> -<span class="sourceLineNo">623</span> }<a name="line.623"></a> -<span class="sourceLineNo">624</span>}<a name="line.624"></a> +<span class="sourceLineNo">522</span> this.metrics.clear();<a name="line.522"></a> +<span class="sourceLineNo">523</span> }<a name="line.523"></a> +<span class="sourceLineNo">524</span><a name="line.524"></a> +<span class="sourceLineNo">525</span> @Override<a name="line.525"></a> +<span class="sourceLineNo">526</span> public String getQueueId() {<a name="line.526"></a> +<span class="sourceLineNo">527</span> return this.queueId;<a name="line.527"></a> +<span class="sourceLineNo">528</span> }<a name="line.528"></a> +<span class="sourceLineNo">529</span><a name="line.529"></a> +<span class="sourceLineNo">530</span> @Override<a name="line.530"></a> +<span class="sourceLineNo">531</span> public String getPeerId() {<a name="line.531"></a> +<span class="sourceLineNo">532</span> return this.peerId;<a name="line.532"></a> +<span class="sourceLineNo">533</span> }<a name="line.533"></a> +<span class="sourceLineNo">534</span><a name="line.534"></a> +<span class="sourceLineNo">535</span> @Override<a name="line.535"></a> +<span class="sourceLineNo">536</span> public Path getCurrentPath() {<a name="line.536"></a> +<span class="sourceLineNo">537</span> // only for testing<a name="line.537"></a> +<span class="sourceLineNo">538</span> for (ReplicationSourceShipper worker : workerThreads.values()) {<a name="line.538"></a> +<span class="sourceLineNo">539</span> if (worker.getCurrentPath() != null) {<a name="line.539"></a> +<span class="sourceLineNo">540</span> return worker.getCurrentPath();<a name="line.540"></a> +<span class="sourceLineNo">541</span> }<a name="line.541"></a> +<span class="sourceLineNo">542</span> }<a name="line.542"></a> +<span class="sourceLineNo">543</span> return null;<a name="line.543"></a> +<span class="sourceLineNo">544</span> }<a name="line.544"></a> +<span class="sourceLineNo">545</span><a name="line.545"></a> +<span class="sourceLineNo">546</span> @Override<a name="line.546"></a> +<span class="sourceLineNo">547</span> public boolean isSourceActive() {<a name="line.547"></a> +<span class="sourceLineNo">548</span> return !this.server.isStopped() && this.sourceRunning;<a name="line.548"></a> +<span class="sourceLineNo">549</span> }<a name="line.549"></a> +<span class="sourceLineNo">550</span><a name="line.550"></a> +<span class="sourceLineNo">551</span> /**<a name="line.551"></a> +<span class="sourceLineNo">552</span> * Comparator used to compare logs together based on their start time<a name="line.552"></a> +<span class="sourceLineNo">553</span> */<a name="line.553"></a> +<span class="sourceLineNo">554</span> public static class LogsComparator implements Comparator<Path> {<a name="line.554"></a> +<span class="sourceLineNo">555</span><a name="line.555"></a> +<span class="sourceLineNo">556</span> @Override<a name="line.556"></a> +<span class="sourceLineNo">557</span> public int compare(Path o1, Path o2) {<a name="line.557"></a> +<span class="sourceLineNo">558</span> return Long.compare(getTS(o1), getTS(o2));<a name="line.558"></a> +<span class="sourceLineNo">559</span> }<a name="line.559"></a> +<span class="sourceLineNo">560</span><a name="line.560"></a> +<span class="sourceLineNo">561</span> /**<a name="line.561"></a> +<span class="sourceLineNo">562</span> * Split a path to get the start time<a name="line.562"></a> +<span class="sourceLineNo">563</span> * For example: 10.20.20.171%3A60020.1277499063250<a name="line.563"></a> +<span class="sourceLineNo">564</span> * @param p path to split<a name="line.564"></a> +<span class="sourceLineNo">565</span> * @return start time<a name="line.565"></a> +<span class="sourceLineNo">566</span> */<a name="line.566"></a> +<span class="sourceLineNo">567</span> private static long getTS(Path p) {<a name="line.567"></a> +<span class="sourceLineNo">568</span> int tsIndex = p.getName().lastIndexOf('.') + 1;<a name="line.568"></a> +<span class="sourceLineNo">569</span> return Long.parseLong(p.getName().substring(tsIndex));<a name="line.569"></a> +<span class="sourceLineNo">570</span> }<a name="line.570"></a> +<span class="sourceLineNo">571</span> }<a name="line.571"></a> +<span class="sourceLineNo">572</span><a name="line.572"></a> +<span class="sourceLineNo">573</span> @Override<a name="line.573"></a> +<span class="sourceLineNo">574</span> public String getStats() {<a name="line.574"></a> +<span class="sourceLineNo">575</span> StringBuilder sb = new StringBuilder();<a name="line.575"></a> +<span class="sourceLineNo">576</span> sb.append("Total replicated edits: ").append(totalReplicatedEdits)<a name="line.576"></a> +<span class="sourceLineNo">577</span> .append(", current progress: \n");<a name="line.577"></a> +<span class="sourceLineNo">578</span> for (Map.Entry<String, ReplicationSourceShipper> entry : workerThreads.entrySet()) {<a name="line.578"></a> +<span class="sourceLineNo">579</span> String walGroupId = entry.getKey();<a name="line.579"></a> +<span class="sourceLineNo">580</span> ReplicationSourceShipper worker = entry.getValue();<a name="line.580"></a> +<span class="sourceLineNo">581</span> long position = worker.getCurrentPosition();<a name="line.581"></a> +<span class="sourceLineNo">582</span> Path currentPath = worker.getCurrentPath();<a name="line.582"></a> +<span class="sourceLineNo">583</span> sb.append("walGroup [").append(walGroupId).append("]: ");<a name="line.583"></a> +<span class="sourceLineNo">584</span> if (currentPath != null) {<a name="line.584"></a> +<span class="sourceLineNo">585</span> sb.append("currently replicating from: ").append(currentPath).append(" at position: ")<a name="line.585"></a> +<span class="sourceLineNo">586</span> .append(position).append("\n");<a name="line.586"></a> +<span class="sourceLineNo">587</span> } else {<a name="line.587"></a> +<span class="sourceLineNo">588</span> sb.append("no replication ongoing, waiting for new log");<a name="line.588"></a> +<span class="sourceLineNo">589</span> }<a name="line.589"></a> +<span class="sourceLineNo">590</span> }<a name="line.590"></a> +<span class="sourceLineNo">591</span> return sb.toString();<a name="line.591"></a> +<span class="sourceLineNo">592</span> }<a name="line.592"></a> +<span class="sourceLineNo">593</span><a name="line.593"></a> +<span class="sourceLineNo">594</span> @Override<a name="line.594"></a> +<span class="sourceLineNo">595</span> public MetricsSource getSourceMetrics() {<a name="line.595"></a> +<span class="sourceLineNo">596</span> return this.metrics;<a name="line.596"></a> +<span class="sourceLineNo">597</span> }<a name="line.597"></a> +<span class="sourceLineNo">598</span><a name="line.598"></a> +<span class="sourceLineNo">599</span> @Override<a name="line.599"></a> +<span class="sourceLineNo">600</span> public void postShipEdits(List<Entry> entries, int batchSize) {<a name="line.600"></a> +<span class="sourceLineNo">601</span> if (throttler.isEnabled()) {<a name="line.601"></a> +<span class="sourceLineNo">602</span> throttler.addPushSize(batchSize);<a name="line.602"></a> +<span class="sourceLineNo">603</span> }<a name="line.603"></a> +<span class="sourceLineNo">604</span> totalReplicatedEdits.addAndGet(entries.size());<a name="line.604"></a> +<span class="sourceLineNo">605</span> totalBufferUsed.addAndGet(-batchSize);<a name="line.605"></a> +<span class="sourceLineNo">606</span> }<a name="line.606"></a> +<span class="sourceLineNo">607</span><a name="line.607"></a> +<span class="sourceLineNo">608</span> @Override<a name="line.608"></a> +<span class="sourceLineNo">609</span> public WALFileLengthProvider getWALFileLengthProvider() {<a name="line.609"></a> +<span class="sourceLineNo">610</span> return walFileLengthProvider;<a name="line.610"></a> +<span class="sourceLineNo">611</span> }<a name="line.611"></a> +<span class="sourceLineNo">612</span><a name="line.612"></a> +<span class="sourceLineNo">613</span> @Override<a name="line.613"></a> +<span class="sourceLineNo">614</span> public ServerName getServerWALsBelongTo() {<a name="line.614"></a> +<span class="sourceLineNo">615</span> return server.getServerName();<a name="line.615"></a> +<span class="sourceLineNo">616</span> }<a name="line.616"></a> +<span class="sourceLineNo">617</span><a name="line.617"></a> +<span class="sourceLineNo">618</span> Server getServer() {<a name="line.618"></a> +<span class="sourceLineNo">619</span> return server;<a name="line.619"></a> +<span class="sourceLineNo">620</span> }<a name="line.620"></a> +<span class="sourceLineNo">621</span><a name="line.621"></a> +<span class="sourceLineNo">622</span> ReplicationQueueStorage getQueueStorage() {<a name="line.622"></a> +<span class="sourceLineNo">623</span> return queueStorage;<a name="line.623"></a> +<span class="sourceLineNo">624</span> }<a name="line.624"></a> +<span class="sourceLineNo">625</span>}<a name="line.625"></a>