1458451310 commented on code in PR #2546: URL: https://github.com/apache/hbase/pull/2546#discussion_r1574592005
########## hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java: ########## @@ -325,4 +327,53 @@ void stopWorker() { public boolean isFinished() { return state == WorkerState.FINISHED; } + + /** + * Attempts to properly update <code>ReplicationSourceManager.totalBufferUser</code>, + * in case there were unprocessed entries batched by the reader to the shipper, + * but the shipper didn't manage to ship those because the replication source is being terminated. + * In that case, it iterates through the batched entries and decrease the pending + * entries size from <code>ReplicationSourceManager.totalBufferUser</code> + * <p/> + * <b>NOTES</b> + * 1) This method should only be called upon replication source termination. + * It blocks waiting for both shipper and reader threads termination, + * to make sure no race conditions + * when updating <code>ReplicationSourceManager.totalBufferUser</code>. + * + * 2) It <b>does not</b> attempt to terminate reader and shipper threads. Those <b>must</b> + * have been triggered interruption/termination prior to calling this method. + */ + void clearWALEntryBatch() { + long timeout = System.currentTimeMillis() + this.shipEditsTimeout; + while(this.isAlive() || this.entryReader.isAlive()){ + try { + if (System.currentTimeMillis() >= timeout) { + LOG.warn("Interrupting source thread for peer {} without cleaning buffer usage " + + "because clearWALEntryBatch method timed out whilst waiting reader/shipper " + + "thread to stop.", this.source.getPeerId()); + Thread.currentThread().interrupt(); Review Comment: If we return here, the batch is never cleaned, so the replication quota will be leaked. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@hbase.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org