Apache9 commented on a change in pull request #2255:
URL: https://github.com/apache/hbase/pull/2255#discussion_r472636537



##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
##########
@@ -290,7 +290,22 @@ private boolean updateLogPosition(WALEntryBatch batch) {
   public void startup(UncaughtExceptionHandler handler) {
     String name = Thread.currentThread().getName();
     Threads.setDaemonThreadRunning(this,
-      name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(), handler);
+      name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(),
+      (t,e) -> {

Review comment:
       OK, the code is almost the same... Then I think we could move the logic 
into the uncaughtException method? If abortOnError is true, we abort; otherwise 
we try to refresh the source.
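
       A rough sketch of what I mean, not the actual patch. SourceSketch is a 
hypothetical stand-in for ReplicationSource, and the "abort" and "refresh" 
strings are placeholders for the real abort and refresh calls; only the 
uncaughtException and abortOnError names come from the code under review.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ReplicationSource; only uncaughtException and
// abortOnError are names taken from the review, the rest is illustrative.
class SourceSketch {
  final boolean abortOnError;
  final List<String> actions = new ArrayList<>();

  SourceSketch(boolean abortOnError) {
    this.abortOnError = abortOnError;
  }

  // One handler shared by every worker thread, so callers do not need
  // their own (t, e) -> { ... } lambdas.
  void uncaughtException(Thread t, Throwable e) {
    if (abortOnError) {
      actions.add("abort");   // placeholder for aborting the region server
    } else {
      actions.add("refresh"); // placeholder for refreshing this source
    }
  }

  public static void main(String[] args) throws InterruptedException {
    SourceSketch source = new SourceSketch(false);
    Thread shipper = new Thread(() -> {
      throw new IllegalStateException("simulated shipper failure");
    });
    // The method reference matches Thread.UncaughtExceptionHandler.
    shipper.setUncaughtExceptionHandler(source::uncaughtException);
    shipper.start();
    shipper.join();
    System.out.println(source.actions);
  }
}
```

       With this shape, both call sites can keep passing the same method 
reference and the branching lives in one place.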

##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
##########
@@ -373,7 +389,21 @@ private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> q
         Threads.setDaemonThreadRunning(
             walReader, Thread.currentThread().getName()
                 + ".replicationSource.wal-reader." + walGroupId + "," + queueId,
-            this::uncaughtException);
+                (t,e) -> {

Review comment:
       So this one is for the wal reader. I think calling refreshSources and 
retrying is an acceptable approach. Then let's just test the abortOnError flag 
here? If it is true, we call uncaughtException; otherwise we try to refresh the 
replication source.

##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
##########
@@ -583,16 +617,27 @@ private void initialize() {
       PriorityBlockingQueue<Path> queue = entry.getValue();
       tryStartNewShipper(walGroupId, queue);
     }
+    this.startupOngoing.set(false);
   }
 
   @Override
   public void startup() {
     // mark we are running now
     this.sourceRunning = true;
-    initThread = new Thread(this::initialize);
-    Threads.setDaemonThreadRunning(initThread,
-      Thread.currentThread().getName() + ".replicationSource," + this.queueId,
-      this::uncaughtException);
+    this.retryStartup.set(true);

Review comment:
       Is this flag only used in this method? If so, let's use a local var 
instead of a class member field.
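
       Something along these lines, as a sketch only. StartupSketch, the init 
parameter and the maxAttempts guard are hypothetical, and joining the thread is 
a simplification to keep the example deterministic; the point is just that the 
retry flag can be a local captured by the handler lambda.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: the retry flag is a local variable, not a field.
class StartupSketch {
  // Runs init on a fresh thread and retries while the handler requests it.
  // maxAttempts is an illustrative guard so the sketch always terminates.
  static int startup(Runnable init, int maxAttempts) throws InterruptedException {
    AtomicBoolean retry = new AtomicBoolean(true); // local, captured below
    int attempts = 0;
    while (retry.getAndSet(false) && attempts < maxAttempts) {
      attempts++;
      Thread t = new Thread(init);
      // On failure, ask the loop for another attempt.
      t.setUncaughtExceptionHandler((thread, e) -> retry.set(true));
      t.start();
      t.join(); // simplification: wait for this attempt to finish
    }
    return attempts;
  }
}
```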

##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
##########
@@ -583,16 +617,27 @@ private void initialize() {
       PriorityBlockingQueue<Path> queue = entry.getValue();
       tryStartNewShipper(walGroupId, queue);
     }
+    this.startupOngoing.set(false);
   }
 
   @Override
   public void startup() {
     // mark we are running now
     this.sourceRunning = true;
-    initThread = new Thread(this::initialize);
-    Threads.setDaemonThreadRunning(initThread,
-      Thread.currentThread().getName() + ".replicationSource," + this.queueId,
-      this::uncaughtException);
+    this.retryStartup.set(true);
+    do {
+      if(retryStartup.get()) {
+        retryStartup.set(false);
+        startupOngoing.set(true);

Review comment:
       So this one is exactly the same as source.isActive? Can we just use 
that flag instead of introducing a new one?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

