This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 3dfde543c75 HBASE-27157 Potential race condition in WorkerAssigner
(#4577)
3dfde543c75 is described below
commit 3dfde543c75312f95683b8fa48db41fb51267bac
Author: Ruan Hui <[email protected]>
AuthorDate: Wed Jul 6 10:59:13 2022 +0800
HBASE-27157 Potential race condition in WorkerAssigner (#4577)
Close #7299
Co-authored-by: Duo Zhang <[email protected]>
Signed-off-by: Duo Zhang <[email protected]>
Signed-off-by: Lijin Bin <[email protected]>
(cherry picked from commit 0d1ff8aa9bc21b73f2cf624d35fdcea1417de613)
---
.../hadoop/hbase/master/SplitWALManager.java | 18 +--
.../apache/hadoop/hbase/master/WorkerAssigner.java | 33 ++---
.../master/procedure/SnapshotVerifyProcedure.java | 3 +-
.../hbase/master/procedure/SplitWALProcedure.java | 2 +-
.../hbase/master/snapshot/SnapshotManager.java | 16 +--
.../hadoop/hbase/master/TestSplitWALManager.java | 136 +++++++++++----------
6 files changed, 100 insertions(+), 108 deletions(-)
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java
index 18dfc7d493b..32b2f4d21f2 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java
@@ -26,7 +26,6 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -35,7 +34,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
import org.apache.hadoop.hbase.master.procedure.SplitWALProcedure;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
@@ -153,25 +151,19 @@ public class SplitWALManager {
*/
public ServerName acquireSplitWALWorker(Procedure<?> procedure)
throws ProcedureSuspendedException {
- Optional<ServerName> worker = splitWorkerAssigner.acquire();
- if (worker.isPresent()) {
- LOG.debug("Acquired split WAL worker={}", worker.get());
- return worker.get();
- }
- splitWorkerAssigner.suspend(procedure);
- throw new ProcedureSuspendedException();
+ ServerName worker = splitWorkerAssigner.acquire(procedure);
+ LOG.debug("Acquired split WAL worker={}", worker);
+ return worker;
}
/**
* After the worker finished the split WAL task, it will release the worker,
and wake up all the
* suspend procedures in the ProcedureEvent
- * @param worker worker which is about to release
- * @param scheduler scheduler which is to wake up the procedure event
+ * @param worker worker which is about to release
*/
- public void releaseSplitWALWorker(ServerName worker,
MasterProcedureScheduler scheduler) {
+ public void releaseSplitWALWorker(ServerName worker) {
LOG.debug("Release split WAL worker={}", worker);
splitWorkerAssigner.release(worker);
- splitWorkerAssigner.wake(scheduler);
}
/**
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/WorkerAssigner.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/WorkerAssigner.java
index b6df41acee2..7b1ec80cab4 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/WorkerAssigner.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/WorkerAssigner.java
@@ -23,9 +23,9 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.yetus.audience.InterfaceAudience;
/**
@@ -51,36 +51,37 @@ public class WorkerAssigner implements ServerListener {
}
}
- public synchronized Optional<ServerName> acquire() {
+ public synchronized ServerName acquire(Procedure<?> proc) throws
ProcedureSuspendedException {
List<ServerName> serverList =
master.getServerManager().getOnlineServersList();
Collections.shuffle(serverList);
Optional<ServerName> worker = serverList.stream()
.filter(
serverName -> !currentWorkers.containsKey(serverName) ||
currentWorkers.get(serverName) > 0)
.findAny();
- worker.ifPresent(name -> currentWorkers.compute(name, (serverName,
- availableWorker) -> availableWorker == null ? maxTasks - 1 :
availableWorker - 1));
- return worker;
+ if (worker.isPresent()) {
+ ServerName sn = worker.get();
+ currentWorkers.compute(sn, (serverName,
+ availableWorker) -> availableWorker == null ? maxTasks - 1 :
availableWorker - 1);
+ return sn;
+ } else {
+ event.suspend();
+ event.suspendIfNotReady(proc);
+ throw new ProcedureSuspendedException();
+ }
}
public synchronized void release(ServerName serverName) {
currentWorkers.compute(serverName, (k, v) -> v == null ? null : v + 1);
- }
-
- public void suspend(Procedure<?> proc) {
- event.suspend();
- event.suspendIfNotReady(proc);
- }
-
- public void wake(MasterProcedureScheduler scheduler) {
if (!event.isReady()) {
- event.wake(scheduler);
+
event.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler());
}
}
@Override
- public void serverAdded(ServerName worker) {
-
this.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler());
+ public synchronized void serverAdded(ServerName worker) {
+ if (!event.isReady()) {
+
event.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler());
+ }
}
public synchronized void addUsedWorker(ServerName worker) {
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java
index a3e126484c3..34a12ed52b1 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java
@@ -109,8 +109,7 @@ public class SnapshotVerifyProcedure extends
ServerRemoteProcedure
setFailure("verify-snapshot", e);
} finally {
// release the worker
-
env.getMasterServices().getSnapshotManager().releaseSnapshotVerifyWorker(this,
targetServer,
- env.getProcedureScheduler());
+
env.getMasterServices().getSnapshotManager().releaseSnapshotVerifyWorker(this,
targetServer);
}
return isProcedureCompleted;
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java
index 699834f9c1d..98c2c0ec693 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java
@@ -90,7 +90,7 @@ public class SplitWALProcedure
skipPersistence();
throw new ProcedureSuspendedException();
}
- splitWALManager.releaseSplitWALWorker(worker,
env.getProcedureScheduler());
+ splitWALManager.releaseSplitWALWorker(worker);
if (!finished) {
LOG.warn("Failed to split wal {} by server {}, retry...", walPath,
worker);
setNextState(MasterProcedureProtos.SplitWALState.ACQUIRE_SPLIT_WAL_WORKER);
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java
index 8936fbfc7fa..bb64062cf1b 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java
@@ -26,7 +26,6 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
@@ -65,7 +64,6 @@ import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner;
import org.apache.hadoop.hbase.master.procedure.CloneSnapshotProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
-import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
import org.apache.hadoop.hbase.master.procedure.RestoreSnapshotProcedure;
import org.apache.hadoop.hbase.master.procedure.SnapshotProcedure;
@@ -1470,20 +1468,14 @@ public class SnapshotManager extends
MasterProcedureManager implements Stoppable
public ServerName acquireSnapshotVerifyWorker(SnapshotVerifyProcedure
procedure)
throws ProcedureSuspendedException {
- Optional<ServerName> worker = verifyWorkerAssigner.acquire();
- if (worker.isPresent()) {
- LOG.debug("{} Acquired verify snapshot worker={}", procedure,
worker.get());
- return worker.get();
- }
- verifyWorkerAssigner.suspend(procedure);
- throw new ProcedureSuspendedException();
+ ServerName worker = verifyWorkerAssigner.acquire(procedure);
+ LOG.debug("{} Acquired verify snapshot worker={}", procedure, worker);
+ return worker;
}
- public void releaseSnapshotVerifyWorker(SnapshotVerifyProcedure procedure,
ServerName worker,
- MasterProcedureScheduler scheduler) {
+ public void releaseSnapshotVerifyWorker(SnapshotVerifyProcedure procedure,
ServerName worker) {
LOG.debug("{} Release verify snapshot worker={}", procedure, worker);
verifyWorkerAssigner.release(worker);
- verifyWorkerAssigner.wake(scheduler);
}
private void restoreWorkers() {
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java
index 8818609c731..40af3f2f901 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java
@@ -17,9 +17,12 @@
*/
package org.apache.hadoop.hbase.master;
-import static
org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
-import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
import static
org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface.ServerOperationType.SPLIT_WAL;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
@@ -33,6 +36,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface;
import org.apache.hadoop.hbase.procedure2.Procedure;
@@ -46,10 +50,10 @@ import
org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.junit.After;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
@@ -63,7 +67,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
@Category({ MasterTests.class, LargeTests.class })
-
public class TestSplitWALManager {
@ClassRule
@@ -78,10 +81,11 @@ public class TestSplitWALManager {
private byte[] FAMILY;
@Before
- public void setup() throws Exception {
+ public void setUp() throws Exception {
TEST_UTIL = new HBaseTestingUtility();
- TEST_UTIL.getConfiguration().setBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
false);
- TEST_UTIL.getConfiguration().setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, 1);
+
TEST_UTIL.getConfiguration().setBoolean(HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
false);
+
TEST_UTIL.getConfiguration().setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS,
5);
+
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER, 1);
TEST_UTIL.startMiniCluster(3);
master = TEST_UTIL.getHBaseCluster().getMaster();
splitWALManager = master.getSplitWALManager();
@@ -90,7 +94,7 @@ public class TestSplitWALManager {
}
@After
- public void teardown() throws Exception {
+ public void tearDown() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
@@ -98,57 +102,61 @@ public class TestSplitWALManager {
public void testAcquireAndRelease() throws Exception {
List<FakeServerProcedure> testProcedures = new ArrayList<>();
for (int i = 0; i < 4; i++) {
- testProcedures
- .add(new
FakeServerProcedure(TEST_UTIL.getHBaseCluster().getServerHoldingMeta()));
+ testProcedures.add(new FakeServerProcedure(
+ ServerName.valueOf("server" + i, 12345,
EnvironmentEdgeManager.currentTime())));
}
- ServerName server =
splitWALManager.acquireSplitWALWorker(testProcedures.get(0));
- Assert.assertNotNull(server);
-
Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(1)));
-
Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(2)));
-
- Exception e = null;
- try {
- splitWALManager.acquireSplitWALWorker(testProcedures.get(3));
- } catch (ProcedureSuspendedException suspendException) {
- e = suspendException;
+ ProcedureExecutor<MasterProcedureEnv> procExec =
master.getMasterProcedureExecutor();
+ procExec.submitProcedure(testProcedures.get(0));
+ TEST_UTIL.waitFor(10000, () -> testProcedures.get(0).isWorkerAcquired());
+ procExec.submitProcedure(testProcedures.get(1));
+ procExec.submitProcedure(testProcedures.get(2));
+ TEST_UTIL.waitFor(10000,
+ () -> testProcedures.get(1).isWorkerAcquired() &&
testProcedures.get(2).isWorkerAcquired());
+
+ // should get a ProcedureSuspendedException, so it will try to acquire but
can not get a worker
+ procExec.submitProcedure(testProcedures.get(3));
+ TEST_UTIL.waitFor(10000, () -> testProcedures.get(3).isTriedToAcquire());
+ for (int i = 0; i < 3; i++) {
+ Thread.sleep(1000);
+ assertFalse(testProcedures.get(3).isWorkerAcquired());
}
- Assert.assertNotNull(e);
- Assert.assertTrue(e instanceof ProcedureSuspendedException);
- splitWALManager.releaseSplitWALWorker(server,
TEST_UTIL.getHBaseCluster().getMaster()
- .getMasterProcedureExecutor().getEnvironment().getProcedureScheduler());
-
Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(3)));
+ // release a worker, the last procedure should be able to get a worker
+ testProcedures.get(0).countDown();
+ TEST_UTIL.waitFor(10000, () -> testProcedures.get(3).isWorkerAcquired());
+
+ for (int i = 1; i < 4; i++) {
+ testProcedures.get(i).countDown();
+ }
+ for (int i = 0; i < 4; i++) {
+ final int index = i;
+ TEST_UTIL.waitFor(10000, () -> testProcedures.get(index).isFinished());
+ }
}
@Test
public void testAddNewServer() throws Exception {
List<FakeServerProcedure> testProcedures = new ArrayList<>();
for (int i = 0; i < 4; i++) {
- testProcedures
- .add(new
FakeServerProcedure(TEST_UTIL.getHBaseCluster().getServerHoldingMeta()));
+ testProcedures.add(
+ new
FakeServerProcedure(TEST_UTIL.getHBaseCluster().getRegionServer(1).getServerName()));
}
ServerName server =
splitWALManager.acquireSplitWALWorker(testProcedures.get(0));
- Assert.assertNotNull(server);
-
Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(1)));
-
Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(2)));
-
- Exception e = null;
- try {
- splitWALManager.acquireSplitWALWorker(testProcedures.get(3));
- } catch (ProcedureSuspendedException suspendException) {
- e = suspendException;
- }
- Assert.assertNotNull(e);
- Assert.assertTrue(e instanceof ProcedureSuspendedException);
+ assertNotNull(server);
+
assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(1)));
+
assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(2)));
+
+ assertThrows(ProcedureSuspendedException.class,
+ () -> splitWALManager.acquireSplitWALWorker(testProcedures.get(3)));
JVMClusterUtil.RegionServerThread newServer =
TEST_UTIL.getHBaseCluster().startRegionServer();
newServer.waitForServerOnline();
-
Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(3)));
+
assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(3)));
}
@Test
public void testCreateSplitWALProcedures() throws Exception {
- TEST_UTIL.createTable(TABLE_NAME, FAMILY,
TEST_UTIL.KEYS_FOR_HBA_CREATE_TABLE);
+ TEST_UTIL.createTable(TABLE_NAME, FAMILY,
HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
// load table
TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME),
FAMILY);
ProcedureExecutor<MasterProcedureEnv> masterPE =
master.getMasterProcedureExecutor();
@@ -158,21 +166,21 @@ public class TestSplitWALManager {
// Test splitting meta wal
FileStatus[] wals =
TEST_UTIL.getTestFileSystem().listStatus(metaWALDir,
MasterWalManager.META_FILTER);
- Assert.assertEquals(1, wals.length);
+ assertEquals(1, wals.length);
List<Procedure> testProcedures =
splitWALManager.createSplitWALProcedures(Lists.newArrayList(wals[0]),
metaServer);
- Assert.assertEquals(1, testProcedures.size());
+ assertEquals(1, testProcedures.size());
ProcedureTestingUtility.submitAndWait(masterPE, testProcedures.get(0));
-
Assert.assertFalse(TEST_UTIL.getTestFileSystem().exists(wals[0].getPath()));
+ assertFalse(TEST_UTIL.getTestFileSystem().exists(wals[0].getPath()));
// Test splitting wal
wals = TEST_UTIL.getTestFileSystem().listStatus(metaWALDir,
MasterWalManager.NON_META_FILTER);
- Assert.assertEquals(1, wals.length);
+ assertEquals(1, wals.length);
testProcedures =
splitWALManager.createSplitWALProcedures(Lists.newArrayList(wals[0]),
metaServer);
- Assert.assertEquals(1, testProcedures.size());
+ assertEquals(1, testProcedures.size());
ProcedureTestingUtility.submitAndWait(masterPE, testProcedures.get(0));
-
Assert.assertFalse(TEST_UTIL.getTestFileSystem().exists(wals[0].getPath()));
+ assertFalse(TEST_UTIL.getTestFileSystem().exists(wals[0].getPath()));
}
@Test
@@ -192,11 +200,11 @@ public class TestSplitWALManager {
ProcedureTestingUtility.submitProcedure(masterPE, failedProcedure,
HConstants.NO_NONCE,
HConstants.NO_NONCE);
TEST_UTIL.waitFor(20000, () -> failedProcedure.isTriedToAcquire());
- Assert.assertFalse(failedProcedure.isWorkerAcquired());
+ assertFalse(failedProcedure.isWorkerAcquired());
// let one procedure finish and release worker
testProcedures.get(0).countDown();
TEST_UTIL.waitFor(10000, () -> failedProcedure.isWorkerAcquired());
- Assert.assertTrue(testProcedures.get(0).isSuccess());
+ assertTrue(testProcedures.get(0).isSuccess());
}
@Test
@@ -206,14 +214,14 @@ public class TestSplitWALManager {
TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME),
FAMILY);
ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
List<FileStatus> metaWals = splitWALManager.getWALsToSplit(metaServer,
true);
- Assert.assertEquals(1, metaWals.size());
+ assertEquals(1, metaWals.size());
List<FileStatus> wals = splitWALManager.getWALsToSplit(metaServer, false);
- Assert.assertEquals(1, wals.size());
+ assertEquals(1, wals.size());
ServerName testServer =
TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
.map(rs -> rs.getRegionServer().getServerName()).filter(rs -> rs !=
metaServer).findAny()
.get();
metaWals = splitWALManager.getWALsToSplit(testServer, true);
- Assert.assertEquals(0, metaWals.size());
+ assertEquals(0, metaWals.size());
}
private void splitLogsTestHelper(HBaseTestingUtility testUtil) throws
Exception {
@@ -233,9 +241,9 @@ public class TestSplitWALManager {
.map(rs -> rs.getRegionServer().getServerName()).filter(rs -> rs !=
metaServer).findAny()
.get();
List<Procedure> procedures = splitWALManager.splitWALs(testServer, false);
- Assert.assertEquals(1, procedures.size());
+ assertEquals(1, procedures.size());
ProcedureTestingUtility.submitAndWait(masterPE, procedures.get(0));
- Assert.assertEquals(0, splitWALManager.getWALsToSplit(testServer,
false).size());
+ assertEquals(0, splitWALManager.getWALsToSplit(testServer, false).size());
// Validate the old WAL file archive dir
Path walRootDir = hmaster.getMasterFileSystem().getWALRootDir();
@@ -244,12 +252,12 @@ public class TestSplitWALManager {
int archiveFileCount = walFS.listStatus(walArchivePath).length;
procedures = splitWALManager.splitWALs(metaServer, true);
- Assert.assertEquals(1, procedures.size());
+ assertEquals(1, procedures.size());
ProcedureTestingUtility.submitAndWait(masterPE, procedures.get(0));
- Assert.assertEquals(0, splitWALManager.getWALsToSplit(metaServer,
true).size());
- Assert.assertEquals(1, splitWALManager.getWALsToSplit(metaServer,
false).size());
+ assertEquals(0, splitWALManager.getWALsToSplit(metaServer, true).size());
+ assertEquals(1, splitWALManager.getWALsToSplit(metaServer, false).size());
// There should be archiveFileCount + 1 WALs after SplitWALProcedure finish
- Assert.assertEquals("Splitted WAL files should be archived",
archiveFileCount + 1,
+ assertEquals("Splitted WAL files should be archived", archiveFileCount + 1,
walFS.listStatus(walArchivePath).length);
}
@@ -261,8 +269,8 @@ public class TestSplitWALManager {
@Test
public void testSplitLogsWithDifferentWalAndRootFS() throws Exception {
HBaseTestingUtility testUtil2 = new HBaseTestingUtility();
- testUtil2.getConfiguration().setBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
false);
- testUtil2.getConfiguration().setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, 1);
+
testUtil2.getConfiguration().setBoolean(HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
false);
+
testUtil2.getConfiguration().setInt(HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER, 1);
Path dir = TEST_UTIL.getDataTestDirOnTestFS("testWalDir");
testUtil2.getConfiguration().set(CommonFSUtils.HBASE_WAL_DIR,
dir.toString());
CommonFSUtils.setWALRootDir(testUtil2.getConfiguration(), dir);
@@ -295,7 +303,7 @@ public class TestSplitWALManager {
ProcedureTestingUtility.submitProcedure(master.getMasterProcedureExecutor(),
failedProcedure,
HConstants.NO_NONCE, HConstants.NO_NONCE);
TEST_UTIL.waitFor(20000, () -> failedProcedure.isTriedToAcquire());
- Assert.assertFalse(failedProcedure.isWorkerAcquired());
+ assertFalse(failedProcedure.isWorkerAcquired());
for (int i = 0; i < 3; i++) {
testProcedures.get(i).countDown();
}
@@ -307,9 +315,9 @@ public class TestSplitWALManager {
implements ServerProcedureInterface {
private ServerName serverName;
- private ServerName worker;
+ private volatile ServerName worker;
private CountDownLatch barrier = new CountDownLatch(1);
- private boolean triedToAcquire = false;
+ private volatile boolean triedToAcquire = false;
public FakeServerProcedure() {
}
@@ -348,7 +356,7 @@ public class TestSplitWALManager {
setNextState(MasterProcedureProtos.SplitWALState.RELEASE_SPLIT_WORKER);
return Flow.HAS_MORE_STATE;
case RELEASE_SPLIT_WORKER:
- splitWALManager.releaseSplitWALWorker(worker,
env.getProcedureScheduler());
+ splitWALManager.releaseSplitWALWorker(worker);
return Flow.NO_MORE_STATE;
default:
throw new UnsupportedOperationException("unhandled state=" + state);