This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-3 by this push: new 18a2af493aa HBASE-28090 Make entryReader field final in ReplicationSourceShipper class (#5409) 18a2af493aa is described below commit 18a2af493aa471a303a954cc8c4d2d36ac293bec Author: Duo Zhang <zhang...@apache.org> AuthorDate: Tue Sep 19 07:54:11 2023 +0800 HBASE-28090 Make entryReader field final in ReplicationSourceShipper class (#5409) Signed-off-by: Wellington Chevreuil <wchevre...@apache.org> (cherry picked from commit 787d524d55c794d0703bcc3c15b4538ba96803e9) --- .../regionserver/RecoveredReplicationSource.java | 5 +++-- .../RecoveredReplicationSourceShipper.java | 4 ++-- .../replication/regionserver/ReplicationSource.java | 14 ++++++-------- .../regionserver/ReplicationSourceShipper.java | 21 ++++++++------------- .../regionserver/TestReplicationSource.java | 3 +-- 5 files changed, 20 insertions(+), 27 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java index e740a01dc4f..e9062472221 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java @@ -27,8 +27,9 @@ import org.apache.yetus.audience.InterfaceAudience; public class RecoveredReplicationSource extends ReplicationSource { @Override - protected RecoveredReplicationSourceShipper createNewShipper(String walGroupId) { - return new RecoveredReplicationSourceShipper(conf, walGroupId, logQueue, this, queueStorage, + protected RecoveredReplicationSourceShipper createNewShipper(String walGroupId, + ReplicationSourceWALReader walReader) { + return new RecoveredReplicationSourceShipper(conf, walGroupId, this, walReader, queueStorage, () -> { if (workerThreads.isEmpty()) { 
this.getSourceMetrics().clear(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java index 2bb3a7c3591..ece566d9600 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java @@ -30,9 +30,9 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper private final Runnable tryFinish; public RecoveredReplicationSourceShipper(Configuration conf, String walGroupId, - ReplicationSourceLogQueue logQueue, RecoveredReplicationSource source, + RecoveredReplicationSource source, ReplicationSourceWALReader walReader, ReplicationQueueStorage queueStorage, Runnable tryFinish) { - super(conf, walGroupId, logQueue, source); + super(conf, walGroupId, source, walReader); this.tryFinish = tryFinish; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index e4da44e9b13..00be66c5c0f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -367,14 +367,13 @@ public class ReplicationSource implements ReplicationSourceInterface { return value; } else { LOG.debug("{} starting shipping worker for walGroupId={}", logPeerId(), walGroupId); - ReplicationSourceShipper worker = createNewShipper(walGroupId); ReplicationSourceWALReader walReader = createNewWALReader(walGroupId, getStartOffset(walGroupId)); + ReplicationSourceShipper worker = createNewShipper(walGroupId, 
walReader); Threads.setDaemonThreadRunning( walReader, Thread.currentThread().getName() + ".replicationSource.wal-reader." + walGroupId + "," + queueId, (t, e) -> this.uncaughtException(t, e, this.manager, this.getPeerId())); - worker.setWALReader(walReader); worker.startup((t, e) -> this.uncaughtException(t, e, this.manager, this.getPeerId())); return worker; } @@ -428,8 +427,9 @@ public class ReplicationSource implements ReplicationSourceInterface { return fileSize; } - protected ReplicationSourceShipper createNewShipper(String walGroupId) { - return new ReplicationSourceShipper(conf, walGroupId, logQueue, this); + protected ReplicationSourceShipper createNewShipper(String walGroupId, + ReplicationSourceWALReader walReader) { + return new ReplicationSourceShipper(conf, walGroupId, this, walReader); } private ReplicationSourceWALReader createNewWALReader(String walGroupId, long startPosition) { @@ -665,7 +665,7 @@ public class ReplicationSource implements ReplicationSourceInterface { terminate(reason, cause, clearMetrics, true); } - public void terminate(String reason, Exception cause, boolean clearMetrics, boolean join) { + private void terminate(String reason, Exception cause, boolean clearMetrics, boolean join) { if (cause == null) { LOG.info("{} Closing source {} because: {}", logPeerId(), this.queueId, reason); } else { @@ -684,9 +684,7 @@ public class ReplicationSource implements ReplicationSourceInterface { for (ReplicationSourceShipper worker : workers) { worker.stopWorker(); - if (worker.entryReader != null) { - worker.entryReader.setReaderRunning(false); - } + worker.entryReader.setReaderRunning(false); } if (this.replicationEndpoint != null) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index 7b863dc35ae..6d0730d76b6 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -55,9 +55,8 @@ public class ReplicationSourceShipper extends Thread { } private final Configuration conf; - protected final String walGroupId; - protected final ReplicationSourceLogQueue logQueue; - protected final ReplicationSource source; + final String walGroupId; + private final ReplicationSource source; // Last position in the log that we sent to ZooKeeper // It will be accessed by the stats thread so make it volatile @@ -66,22 +65,22 @@ public class ReplicationSourceShipper extends Thread { private Path currentPath; // Current state of the worker thread private volatile WorkerState state; - protected ReplicationSourceWALReader entryReader; + final ReplicationSourceWALReader entryReader; // How long should we sleep for each retry - protected final long sleepForRetries; + private final long sleepForRetries; // Maximum number of retries before taking bold actions - protected final int maxRetriesMultiplier; + private final int maxRetriesMultiplier; private final int DEFAULT_TIMEOUT = 20000; private final int getEntriesTimeout; private final int shipEditsTimeout; - public ReplicationSourceShipper(Configuration conf, String walGroupId, - ReplicationSourceLogQueue logQueue, ReplicationSource source) { + public ReplicationSourceShipper(Configuration conf, String walGroupId, ReplicationSource source, + ReplicationSourceWALReader walReader) { this.conf = conf; this.walGroupId = walGroupId; - this.logQueue = logQueue; this.source = source; + this.entryReader = walReader; // 1 second this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); // 5 minutes @ 1 sec per @@ -295,10 +294,6 @@ public class ReplicationSourceShipper extends Thread { return currentPosition; } - void setWALReader(ReplicationSourceWALReader entryReader) 
{ - this.entryReader = entryReader; - } - protected boolean isActive() { return source.isSourceActive() && state == WorkerState.RUNNING && !isInterrupted(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java index 707bab875d2..53996c37664 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java @@ -291,8 +291,7 @@ public class TestReplicationSource { mock(MetricsSource.class)); ReplicationSourceWALReader reader = new ReplicationSourceWALReader(null, conf, null, 0, null, source, null); - ReplicationSourceShipper shipper = new ReplicationSourceShipper(conf, null, null, source); - shipper.entryReader = reader; + ReplicationSourceShipper shipper = new ReplicationSourceShipper(conf, null, source, reader); source.workerThreads.put("testPeer", shipper); WALEntryBatch batch = new WALEntryBatch(10, logDir); WAL.Entry mockEntry = mock(WAL.Entry.class);