This is an automated email from the ASF dual-hosted git repository. sunxin pushed a commit to branch branch-2 in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push: new a3edcc2 HBASE-25562 ReplicationSourceWALReader log and handle exception immediately without retrying (#2943) a3edcc2 is described below commit a3edcc285481357446ed02effbb83a4142402c8f Author: XinSun <ddu...@gmail.com> AuthorDate: Sat Feb 20 10:20:54 2021 +0800 HBASE-25562 ReplicationSourceWALReader log and handle exception immediately without retrying (#2943) Signed-off-by: Wellington Chevreuil <wchevre...@apache.org> Signed-off-by: stack <st...@apache.org> Signed-off-by: shahrs87 --- .../regionserver/ReplicationSourceWALReader.java | 30 ++++++++++++---------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index 52ac144..f52a83a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -150,14 +150,13 @@ class ReplicationSourceWALReader extends Thread { } } } catch (IOException e) { // stream related - if (sleepMultiplier < maxRetriesMultiplier) { - LOG.debug("Failed to read stream of replication entries: " + e); - sleepMultiplier++; - } else { - LOG.error("Failed to read stream of replication entries", e); - handleEofException(e); + if (!handleEofException(e)) { + LOG.warn("Failed to read stream of replication entries", e); + if (sleepMultiplier < maxRetriesMultiplier) { + sleepMultiplier ++; + } + Threads.sleep(sleepForRetries * sleepMultiplier); } - Threads.sleep(sleepForRetries * sleepMultiplier); } catch (InterruptedException e) { LOG.trace("Interrupted while sleeping between WAL reads"); Thread.currentThread().interrupt(); @@ -244,10 +243,13 @@ class ReplicationSourceWALReader extends Thread { } } - // if we get an EOF due to a zero-length log, and there are other logs in queue - // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is - // enabled, then dump the log - private void handleEofException(IOException e) { + /** + * if we get an EOF due to a zero-length log, and there are other logs in queue + * (highly likely we've closed the current log), and autorecovery is + * enabled, then dump the log + * @return true only the IOE can be handled + */ + private boolean handleEofException(IOException e) { PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId); // Dump the log even if logQueue size is 1 if the source is from recovered Source // since we don't add current log to recovered source queue so it is safe to remove. @@ -255,14 +257,16 @@ class ReplicationSourceWALReader extends Thread { (source.isRecovered() || queue.size() > 1) && this.eofAutoRecovery) { try { if (fs.getFileStatus(queue.peek()).getLen() == 0) { - LOG.warn("Forcing removal of 0 length log in queue: " + queue.peek()); + LOG.warn("Forcing removal of 0 length log in queue: {}", queue.peek()); logQueue.remove(walGroupId); currentPosition = 0; + return true; } } catch (IOException ioe) { - LOG.warn("Couldn't get file length information about log " + queue.peek()); + LOG.warn("Couldn't get file length information about log {}", queue.peek()); } } + return false; } public Path getCurrentPath() {