This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-3 by this push:
     new 18a2af493aa HBASE-28090 Make entryReader field final in ReplicationSourceShipper class (#5409)
18a2af493aa is described below

commit 18a2af493aa471a303a954cc8c4d2d36ac293bec
Author: Duo Zhang <zhang...@apache.org>
AuthorDate: Tue Sep 19 07:54:11 2023 +0800

    HBASE-28090 Make entryReader field final in ReplicationSourceShipper class (#5409)
    
    Signed-off-by: Wellington Chevreuil <wchevre...@apache.org>
    (cherry picked from commit 787d524d55c794d0703bcc3c15b4538ba96803e9)
---
 .../regionserver/RecoveredReplicationSource.java    |  5 +++--
 .../RecoveredReplicationSourceShipper.java          |  4 ++--
 .../replication/regionserver/ReplicationSource.java | 14 ++++++--------
 .../regionserver/ReplicationSourceShipper.java      | 21 ++++++++-------------
 .../regionserver/TestReplicationSource.java         |  3 +--
 5 files changed, 20 insertions(+), 27 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index e740a01dc4f..e9062472221 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -27,8 +27,9 @@ import org.apache.yetus.audience.InterfaceAudience;
 public class RecoveredReplicationSource extends ReplicationSource {
 
   @Override
-  protected RecoveredReplicationSourceShipper createNewShipper(String walGroupId) {
-    return new RecoveredReplicationSourceShipper(conf, walGroupId, logQueue, this, queueStorage,
+  protected RecoveredReplicationSourceShipper createNewShipper(String walGroupId,
+    ReplicationSourceWALReader walReader) {
+    return new RecoveredReplicationSourceShipper(conf, walGroupId, this, walReader, queueStorage,
       () -> {
         if (workerThreads.isEmpty()) {
           this.getSourceMetrics().clear();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
index 2bb3a7c3591..ece566d9600 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
@@ -30,9 +30,9 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
   private final Runnable tryFinish;
 
   public RecoveredReplicationSourceShipper(Configuration conf, String walGroupId,
-    ReplicationSourceLogQueue logQueue, RecoveredReplicationSource source,
+    RecoveredReplicationSource source, ReplicationSourceWALReader walReader,
     ReplicationQueueStorage queueStorage, Runnable tryFinish) {
-    super(conf, walGroupId, logQueue, source);
+    super(conf, walGroupId, source, walReader);
     this.tryFinish = tryFinish;
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index e4da44e9b13..00be66c5c0f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -367,14 +367,13 @@ public class ReplicationSource implements ReplicationSourceInterface {
         return value;
       } else {
         LOG.debug("{} starting shipping worker for walGroupId={}", logPeerId(), walGroupId);
-        ReplicationSourceShipper worker = createNewShipper(walGroupId);
         ReplicationSourceWALReader walReader =
           createNewWALReader(walGroupId, getStartOffset(walGroupId));
+        ReplicationSourceShipper worker = createNewShipper(walGroupId, walReader);
         Threads.setDaemonThreadRunning(
           walReader, Thread.currentThread().getName() + ".replicationSource.wal-reader."
             + walGroupId + "," + queueId,
           (t, e) -> this.uncaughtException(t, e, this.manager, this.getPeerId()));
-        worker.setWALReader(walReader);
         worker.startup((t, e) -> this.uncaughtException(t, e, this.manager, this.getPeerId()));
         return worker;
       }
@@ -428,8 +427,9 @@ public class ReplicationSource implements ReplicationSourceInterface {
     return fileSize;
   }
 
-  protected ReplicationSourceShipper createNewShipper(String walGroupId) {
-    return new ReplicationSourceShipper(conf, walGroupId, logQueue, this);
+  protected ReplicationSourceShipper createNewShipper(String walGroupId,
+    ReplicationSourceWALReader walReader) {
+    return new ReplicationSourceShipper(conf, walGroupId, this, walReader);
   }
 
   private ReplicationSourceWALReader createNewWALReader(String walGroupId, long startPosition) {
@@ -665,7 +665,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
     terminate(reason, cause, clearMetrics, true);
   }
 
-  public void terminate(String reason, Exception cause, boolean clearMetrics, boolean join) {
+  private void terminate(String reason, Exception cause, boolean clearMetrics, boolean join) {
     if (cause == null) {
       LOG.info("{} Closing source {} because: {}", logPeerId(), this.queueId, reason);
     } else {
@@ -684,9 +684,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
 
     for (ReplicationSourceShipper worker : workers) {
       worker.stopWorker();
-      if (worker.entryReader != null) {
-        worker.entryReader.setReaderRunning(false);
-      }
+      worker.entryReader.setReaderRunning(false);
     }
 
     if (this.replicationEndpoint != null) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 7b863dc35ae..6d0730d76b6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -55,9 +55,8 @@ public class ReplicationSourceShipper extends Thread {
   }
 
   private final Configuration conf;
-  protected final String walGroupId;
-  protected final ReplicationSourceLogQueue logQueue;
-  protected final ReplicationSource source;
+  final String walGroupId;
+  private final ReplicationSource source;
 
   // Last position in the log that we sent to ZooKeeper
   // It will be accessed by the stats thread so make it volatile
@@ -66,22 +65,22 @@ public class ReplicationSourceShipper extends Thread {
   private Path currentPath;
   // Current state of the worker thread
   private volatile WorkerState state;
-  protected ReplicationSourceWALReader entryReader;
+  final ReplicationSourceWALReader entryReader;
 
   // How long should we sleep for each retry
-  protected final long sleepForRetries;
+  private final long sleepForRetries;
   // Maximum number of retries before taking bold actions
-  protected final int maxRetriesMultiplier;
+  private final int maxRetriesMultiplier;
   private final int DEFAULT_TIMEOUT = 20000;
   private final int getEntriesTimeout;
   private final int shipEditsTimeout;
 
-  public ReplicationSourceShipper(Configuration conf, String walGroupId,
-    ReplicationSourceLogQueue logQueue, ReplicationSource source) {
+  public ReplicationSourceShipper(Configuration conf, String walGroupId, ReplicationSource source,
+    ReplicationSourceWALReader walReader) {
     this.conf = conf;
     this.walGroupId = walGroupId;
-    this.logQueue = logQueue;
     this.source = source;
+    this.entryReader = walReader;
     // 1 second
     this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
     // 5 minutes @ 1 sec per
@@ -295,10 +294,6 @@ public class ReplicationSourceShipper extends Thread {
     return currentPosition;
   }
 
-  void setWALReader(ReplicationSourceWALReader entryReader) {
-    this.entryReader = entryReader;
-  }
-
   protected boolean isActive() {
     return source.isSourceActive() && state == WorkerState.RUNNING && !isInterrupted();
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
index 707bab875d2..53996c37664 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
@@ -291,8 +291,7 @@ public class TestReplicationSource {
       mock(MetricsSource.class));
     ReplicationSourceWALReader reader =
       new ReplicationSourceWALReader(null, conf, null, 0, null, source, null);
-    ReplicationSourceShipper shipper = new ReplicationSourceShipper(conf, null, null, source);
-    shipper.entryReader = reader;
+    ReplicationSourceShipper shipper = new ReplicationSourceShipper(conf, null, source, reader);
     source.workerThreads.put("testPeer", shipper);
     WALEntryBatch batch = new WALEntryBatch(10, logDir);
     WAL.Entry mockEntry = mock(WAL.Entry.class);

Reply via email to