This is an automated email from the ASF dual-hosted git repository.

sunxin pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new a3edcc2  HBASE-25562 ReplicationSourceWALReader log and handle exception immediately without retrying (#2943)
a3edcc2 is described below

commit a3edcc285481357446ed02effbb83a4142402c8f
Author: XinSun <ddu...@gmail.com>
AuthorDate: Sat Feb 20 10:20:54 2021 +0800

    HBASE-25562 ReplicationSourceWALReader log and handle exception immediately without retrying (#2943)
    
    Signed-off-by: Wellington Chevreuil <wchevre...@apache.org>
    Signed-off-by: stack <st...@apache.org>
    Signed-off-by: shahrs87
---
 .../regionserver/ReplicationSourceWALReader.java   | 30 ++++++++++++----------
 1 file changed, 17 insertions(+), 13 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 52ac144..f52a83a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -150,14 +150,13 @@ class ReplicationSourceWALReader extends Thread {
           }
         }
       } catch (IOException e) { // stream related
-        if (sleepMultiplier < maxRetriesMultiplier) {
-          LOG.debug("Failed to read stream of replication entries: " + e);
-          sleepMultiplier++;
-        } else {
-          LOG.error("Failed to read stream of replication entries", e);
-          handleEofException(e);
+        if (!handleEofException(e)) {
+          LOG.warn("Failed to read stream of replication entries", e);
+          if (sleepMultiplier < maxRetriesMultiplier) {
+            sleepMultiplier ++;
+          }
+          Threads.sleep(sleepForRetries * sleepMultiplier);
         }
-        Threads.sleep(sleepForRetries * sleepMultiplier);
       } catch (InterruptedException e) {
         LOG.trace("Interrupted while sleeping between WAL reads");
         Thread.currentThread().interrupt();
@@ -244,10 +243,13 @@ class ReplicationSourceWALReader extends Thread {
     }
   }
 
-  // if we get an EOF due to a zero-length log, and there are other logs in queue
-  // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is
-  // enabled, then dump the log
-  private void handleEofException(IOException e) {
+  /**
+   * if we get an EOF due to a zero-length log, and there are other logs in queue
+   * (highly likely we've closed the current log), and autorecovery is
+   * enabled, then dump the log
+   * @return true only the IOE can be handled
+   */
+  private boolean handleEofException(IOException e) {
     PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
     // Dump the log even if logQueue size is 1 if the source is from recovered Source
     // since we don't add current log to recovered source queue so it is safe to remove.
@@ -255,14 +257,16 @@ class ReplicationSourceWALReader extends Thread {
       (source.isRecovered() || queue.size() > 1) && this.eofAutoRecovery) {
       try {
         if (fs.getFileStatus(queue.peek()).getLen() == 0) {
-          LOG.warn("Forcing removal of 0 length log in queue: " + queue.peek());
+          LOG.warn("Forcing removal of 0 length log in queue: {}", queue.peek());
           logQueue.remove(walGroupId);
           currentPosition = 0;
+          return true;
         }
       } catch (IOException ioe) {
-        LOG.warn("Couldn't get file length information about log " + queue.peek());
+        LOG.warn("Couldn't get file length information about log {}", queue.peek());
       }
     }
+    return false;
   }
 
   public Path getCurrentPath() {

Reply via email to