Apache9 commented on a change in pull request #2255:
URL: https://github.com/apache/hbase/pull/2255#discussion_r472636537

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
##########
@@ -290,7 +290,22 @@ private boolean updateLogPosition(WALEntryBatch batch) {
   public void startup(UncaughtExceptionHandler handler) {
     String name = Thread.currentThread().getName();
     Threads.setDaemonThreadRunning(this,
-      name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(), handler);
+      name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(),
+      (t,e) -> {

Review comment:
   OK, the code is almost the same... Then I think we could move the logic into the uncaughtException method? If abortOnError is true, we abort; otherwise we try to refresh the source.

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
##########
@@ -373,7 +389,21 @@ private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> q
         Threads.setDaemonThreadRunning(
           walReader, Thread.currentThread().getName() + ".replicationSource.wal-reader." + walGroupId + "," + queueId,
-          this::uncaughtException);
+          (t,e) -> {

Review comment:
   So here it is for the wal reader. I think calling refreshSources and retrying is an acceptable approach. Then let's just test the abortOnError flag here? If it is true, we call uncaughtException; otherwise we try to refresh the replication source.
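
To make the suggestion for both the shipper and the wal reader concrete, here is a minimal sketch of a single handler that branches on the flag. `FakeSource`, `abort` and `refresh` are hypothetical stand-ins invented for illustration, not the real ReplicationSource API; only the `abortOnError` name comes from the discussion above.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class WorkerFailurePolicy {
  /** Hypothetical stand-in for the replication source; not the real HBase class. */
  public static class FakeSource {
    public final boolean abortOnError;
    public final AtomicInteger aborts = new AtomicInteger();
    public final AtomicInteger refreshes = new AtomicInteger();

    public FakeSource(boolean abortOnError) {
      this.abortOnError = abortOnError;
    }

    // Placeholder for aborting; here it only counts the call.
    void abort(Throwable cause) {
      aborts.incrementAndGet();
    }

    // Placeholder for refreshing the source; here it only counts the call.
    void refresh() {
      refreshes.incrementAndGet();
    }

    /** One handler that every worker thread can share. */
    public void uncaughtException(Thread t, Throwable e) {
      if (abortOnError) {
        abort(e);
      } else {
        refresh();
      }
    }
  }

  public static void main(String[] args) throws InterruptedException {
    FakeSource source = new FakeSource(false);
    Thread worker = new Thread(() -> {
      throw new IllegalStateException("simulated failure");
    });
    // The method reference matches Thread.UncaughtExceptionHandler.
    worker.setUncaughtExceptionHandler(source::uncaughtException);
    worker.start();
    worker.join();
    // prints "aborts=0 refreshes=1"
    System.out.println("aborts=" + source.aborts.get() + " refreshes=" + source.refreshes.get());
  }
}
```

With the branch living in one method, the call sites could keep passing a plain method reference instead of repeating the same lambda body in each place.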

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
##########
@@ -583,16 +617,27 @@ private void initialize() {
       PriorityBlockingQueue<Path> queue = entry.getValue();
       tryStartNewShipper(walGroupId, queue);
     }
+    this.startupOngoing.set(false);
   }
 
   @Override
   public void startup() {
     // mark we are running now
     this.sourceRunning = true;
-    initThread = new Thread(this::initialize);
-    Threads.setDaemonThreadRunning(initThread,
-      Thread.currentThread().getName() + ".replicationSource," + this.queueId,
-      this::uncaughtException);
+    this.retryStartup.set(true);

Review comment:
   Is this flag only used in this method? If so, let's use a local variable instead of a class member field.

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
##########
@@ -583,16 +617,27 @@ private void initialize() {
       PriorityBlockingQueue<Path> queue = entry.getValue();
       tryStartNewShipper(walGroupId, queue);
     }
+    this.startupOngoing.set(false);
   }
 
   @Override
   public void startup() {
     // mark we are running now
     this.sourceRunning = true;
-    initThread = new Thread(this::initialize);
-    Threads.setDaemonThreadRunning(initThread,
-      Thread.currentThread().getName() + ".replicationSource," + this.queueId,
-      this::uncaughtException);
+    this.retryStartup.set(true);
+    do {
+      if(retryStartup.get()) {
+        retryStartup.set(false);
+        startupOngoing.set(true);

Review comment:
   So this one is exactly the same as source.isActive? Can we just use that flag instead of introducing a new one?
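
A rough sketch of how the two flag suggestions could look together, assuming the retry flag really is only needed inside startup: the flag becomes a local AtomicBoolean captured by the handler lambda, and liveness is read from an existing check rather than a new field. The `isActive` supplier, the `maxAttempts` bound and the blocking `join()` are assumptions made to keep the example small and runnable; they are not taken from the PR.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

public class StartupRetrySketch {
  /**
   * Runs initialize on a fresh thread and retries when it throws, for as long
   * as isActive reports true. Returns the number of attempts made.
   */
  public static int startup(Runnable initialize, BooleanSupplier isActive, int maxAttempts) {
    // Local to this method: nothing outside startup() needs to see it.
    AtomicBoolean retryStartup = new AtomicBoolean(true);
    int attempts = 0;
    while (retryStartup.get() && isActive.getAsBoolean() && attempts < maxAttempts) {
      retryStartup.set(false);
      attempts++;
      Thread initThread = new Thread(initialize);
      // A failed attempt only flips the local flag so the loop goes around again.
      initThread.setUncaughtExceptionHandler((t, e) -> retryStartup.set(true));
      initThread.start();
      try {
        // Simplification for the sketch: wait for the attempt to finish.
        initThread.join();
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        break;
      }
    }
    return attempts;
  }

  public static void main(String[] args) {
    AtomicInteger calls = new AtomicInteger();
    // Hypothetical initializer: fails twice, then succeeds.
    Runnable flaky = () -> {
      if (calls.incrementAndGet() < 3) {
        throw new IllegalStateException("simulated startup failure");
      }
    };
    // prints "attempts=3"
    System.out.println("attempts=" + startup(flaky, () -> true, 10));
  }
}
```

Because the lambda can only capture an effectively final local, the flag stays an AtomicBoolean, but it no longer lives on the class.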