Repository: hbase Updated Branches: refs/heads/0.98 8ac090c6d -> 7f28fcf42
HBASE-11863 WAL files are not archived and stays in the WAL directory after splitting Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7f28fcf4 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7f28fcf4 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7f28fcf4 Branch: refs/heads/0.98 Commit: 7f28fcf429242c549219502bfb7da0ad28753f4c Parents: 8ac090c Author: Enis Soztutar <e...@apache.org> Authored: Tue Sep 2 18:24:15 2014 -0700 Committer: Enis Soztutar <e...@apache.org> Committed: Tue Sep 2 18:24:15 2014 -0700 ---------------------------------------------------------------------- .../hadoop/hbase/master/MasterFileSystem.java | 5 +- .../hadoop/hbase/master/SplitLogManager.java | 43 ++++------ .../master/TestDistributedLogSplitting.java | 19 ++--- .../hbase/master/TestSplitLogManager.java | 86 ++++++++++++++------ 4 files changed, 88 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/7f28fcf4/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java index 7a0a03f..842cd86 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java @@ -54,7 +54,6 @@ import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.wal.HLog; -import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter; import org.apache.hadoop.hbase.regionserver.wal.HLogUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -95,12 +94,14 @@ public class MasterFileSystem { private final MasterServices services; final static PathFilter META_FILTER = new PathFilter() { + @Override public boolean accept(Path p) { return HLogUtil.isMetaFile(p); } }; final static PathFilter NON_META_FILTER = new PathFilter() { + @Override public boolean accept(Path p) { return !HLogUtil.isMetaFile(p); } @@ -487,7 +488,7 @@ public class MasterFileSystem { org.apache.hadoop.hbase.util.FSTableDescriptorMigrationToSubdir .migrateFSTableDescriptorsIfNecessary(fs, rd); } - + // Create tableinfo-s for hbase:meta if not already there. new FSTableDescriptors(fs, rd).createTableDescriptor(HTableDescriptor.META_TABLEDESC); http://git-wip-us.apache.org/repos/asf/hbase/blob/7f28fcf4/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java index 2e98433..192cd9e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java @@ -78,6 +78,8 @@ import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.Stat; +import com.google.common.annotations.VisibleForTesting; + /** * Distributes the task of log splitting to the available region servers. * Coordination happens via zookeeper. For every log file that has to be split a @@ -156,26 +158,6 @@ public class SplitLogManager extends ZooKeeperListener { * Wrapper around {@link #SplitLogManager(ZooKeeperWatcher zkw, Configuration conf, * Stoppable stopper, MasterServices master, ServerName serverName, * boolean masterRecovery, TaskFinisher tf)} - * with masterRecovery = false, and tf = null. Used in unit tests. - * - * @param zkw the ZK watcher - * @param conf the HBase configuration - * @param stopper the stoppable in case anything is wrong - * @param master the master services - * @param serverName the master server name - * @throws KeeperException - * @throws InterruptedIOException - */ - public SplitLogManager(ZooKeeperWatcher zkw, final Configuration conf, - Stoppable stopper, MasterServices master, ServerName serverName) - throws InterruptedIOException, KeeperException { - this(zkw, conf, stopper, master, serverName, false, null); - } - - /** - * Wrapper around {@link #SplitLogManager(ZooKeeperWatcher zkw, Configuration conf, - * Stoppable stopper, MasterServices master, ServerName serverName, - * boolean masterRecovery, TaskFinisher tf)} * that provides a task finisher for copying recovered edits to their final destination. * The task finisher has to be robust because it can be arbitrarily restarted or called * multiple times. @@ -190,7 +172,7 @@ public class SplitLogManager extends ZooKeeperListener { * @throws InterruptedIOException */ public SplitLogManager(ZooKeeperWatcher zkw, final Configuration conf, - Stoppable stopper, MasterServices master, ServerName serverName, boolean masterRecovery) + Stoppable stopper, MasterServices master, ServerName serverName, boolean masterRecovery) throws InterruptedIOException, KeeperException { this(zkw, conf, stopper, master, serverName, masterRecovery, new TaskFinisher() { @Override @@ -223,7 +205,7 @@ public class SplitLogManager extends ZooKeeperListener { */ public SplitLogManager(ZooKeeperWatcher zkw, Configuration conf, Stoppable stopper, MasterServices master, - ServerName serverName, boolean masterRecovery, TaskFinisher tf) + ServerName serverName, boolean masterRecovery, TaskFinisher tf) throws InterruptedIOException, KeeperException { super(zkw); this.taskFinisher = tf; @@ -236,7 +218,7 @@ public class SplitLogManager extends ZooKeeperListener { this.unassignedTimeout = conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT); - // Determine recovery mode + // Determine recovery mode setRecoveryMode(true); LOG.info("Timeout=" + timeout + ", unassigned timeout=" + unassignedTimeout + @@ -465,6 +447,11 @@ public class SplitLogManager extends ZooKeeperListener { } } + @VisibleForTesting + ConcurrentMap<String, Task> getTasks() { + return tasks; + } + private int activeTasks(final TaskBatch batch) { int count = 0; for (Task t: tasks.values()) { @@ -1259,7 +1246,7 @@ public class SplitLogManager extends ZooKeeperListener { } return result; } - + /** * This function is to set recovery mode from outstanding split log tasks from before or * current configuration setting @@ -1282,7 +1269,7 @@ public class SplitLogManager extends ZooKeeperListener { boolean hasSplitLogTask = false; boolean hasRecoveringRegions = false; RecoveryMode previousRecoveryMode = RecoveryMode.UNKNOWN; - RecoveryMode recoveryModeInConfig = (isDistributedLogReplay(conf)) ? + RecoveryMode recoveryModeInConfig = (isDistributedLogReplay(conf)) ? RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING; // Firstly check if there are outstanding recovering regions @@ -1307,7 +1294,7 @@ public class SplitLogManager extends ZooKeeperListener { previousRecoveryMode = slt.getMode(); if (previousRecoveryMode == RecoveryMode.UNKNOWN) { // created by old code base where we don't set recovery mode in splitlogtask - // we can safely set to LOG_SPLITTING because we're in master initialization code + // we can safely set to LOG_SPLITTING because we're in master initialization code // before SSH is enabled & there is no outstanding recovering regions previousRecoveryMode = RecoveryMode.LOG_SPLITTING; } @@ -1332,7 +1319,7 @@ public class SplitLogManager extends ZooKeeperListener { // splitlogtask hasn't drained yet, keep existing recovery mode return; } - + if (previousRecoveryMode != RecoveryMode.UNKNOWN) { this.isDrainingDone = (previousRecoveryMode == recoveryModeInConfig); this.recoveryMode = previousRecoveryMode; @@ -1345,7 +1332,7 @@ public class SplitLogManager extends ZooKeeperListener { public RecoveryMode getRecoveryMode() { return this.recoveryMode; } - + /** * Returns if distributed log replay is turned on or not * @param conf http://git-wip-us.apache.org/repos/asf/hbase/blob/7f28fcf4/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java index c5f869d..3d9b1cf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java @@ -26,11 +26,6 @@ import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_done; import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_err; import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_resigned; import static org.junit.Assert.*; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -261,6 +256,10 @@ public class TestDistributedLogSplitting { } LOG.info(count + " edits in " + files.length + " recovered edits files."); } + + // check that the log file is moved + assertFalse(fs.exists(logDir)); + assertEquals(NUM_LOG_LINES, count); } @@ -466,9 +465,9 @@ public class TestDistributedLogSplitting { Thread.sleep(2000); LOG.info("Current Open Regions:" + getAllOnlineRegions(cluster).size()); - + startMasterAndWaitUntilLogSplit(cluster); - + // wait for abort completes TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() { @Override @@ -781,10 +780,10 @@ public class TestDistributedLogSplitting { } makeHLog(hrs.getWAL(), regions, "disableTable", "family", NUM_LOG_LINES, 100, false); makeHLog(hrs.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100); - + LOG.info("Disabling table\n"); TEST_UTIL.getHBaseAdmin().disableTable(Bytes.toBytes("disableTable")); - + // abort RS LOG.info("Aborting region server: " + hrs.getServerName()); hrs.abort("testing"); @@ -1229,7 +1228,7 @@ public class TestDistributedLogSplitting { WALEdit e = new WALEdit(); value++; e.add(new KeyValue(row, family, qualifier, timeStamp, Bytes.toBytes(value))); - hrs.getWAL().append(curRegionInfo, TableName.valueOf(tableName), e, + hrs.getWAL().append(curRegionInfo, TableName.valueOf(tableName), e, System.currentTimeMillis(), htd, sequenceId); } hrs.getWAL().sync(); http://git-wip-us.apache.org/repos/asf/hbase/blob/7f28fcf4/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java index 4988742..185c92e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java @@ -19,11 +19,8 @@ package org.apache.hadoop.hbase.master; import static org.apache.hadoop.hbase.SplitLogCounters.resetCounters; -import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_get_data_nonode; import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_heartbeat; -import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_log_split_batch_success; import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_node_create_queued; -import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_node_create_result; import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_orphan_task_acquired; import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan; import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan_deleted; @@ -40,6 +37,7 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; @@ -61,8 +59,6 @@ import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.master.SplitLogManager.Task; import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; -import org.apache.hadoop.hbase.regionserver.RegionServerServices; -import org.apache.hadoop.hbase.regionserver.SplitLogWorker; import org.apache.hadoop.hbase.regionserver.TestMasterAddressTracker.NodeCreationListener; import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; import org.apache.hadoop.hbase.zookeeper.ZKUtil; @@ -141,15 +137,15 @@ public class TestSplitLogManager { conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100); conf.setInt("hfile.format.version", 3); to = to + 4 * 100; - - this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? + + this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING); } @After public void teardown() throws IOException, KeeperException { stopper.stop(""); - slm.stop(); + if (slm != null) slm.stop(); TEST_UTIL.shutdownMiniZKCluster(); } @@ -160,6 +156,7 @@ public class TestSplitLogManager { private void waitForCounter(final AtomicLong ctr, long oldval, long newval, long timems) throws Exception { Expr e = new Expr() { + @Override public long eval() { return ctr.get(); } @@ -207,7 +204,7 @@ public class TestSplitLogManager { public void testTaskCreation() throws Exception { LOG.info("TestTaskCreation - test the creation of a task in zk"); - slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, false, null); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); @@ -227,7 +224,7 @@ public class TestSplitLogManager { zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, false, null); waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2); Task task = slm.findOrCreateOrphanTask(tasknode); assertTrue(task.isOrphan()); @@ -253,7 +250,7 @@ public class TestSplitLogManager { CreateMode.PERSISTENT); int version = ZKUtil.checkExists(zkw, tasknode); - slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, false, null); waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2); Task task = slm.findOrCreateOrphanTask(tasknode); assertTrue(task.isOrphan()); @@ -276,7 +273,7 @@ public class TestSplitLogManager { LOG.info("TestMultipleResbmits - no indefinite resubmissions"); conf.setInt("hbase.splitlog.max.resubmit", 2); - slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, false, null); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); @@ -308,7 +305,7 @@ public class TestSplitLogManager { public void testRescanCleanup() throws Exception { LOG.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up"); - slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, false, null); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); @@ -337,7 +334,7 @@ public class TestSplitLogManager { public void testTaskDone() throws Exception { LOG.info("TestTaskDone - cleanup task node once in DONE state"); - slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, false, null); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); final ServerName worker1 = ServerName.valueOf("worker1,1,1"); @@ -357,7 +354,7 @@ public class TestSplitLogManager { LOG.info("TestTaskErr - cleanup task node once in ERR state"); conf.setInt("hbase.splitlog.max.resubmit", 0); - slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, false, null); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); @@ -379,7 +376,7 @@ public class TestSplitLogManager { public void testTaskResigned() throws Exception { LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state"); assertEquals(tot_mgr_resubmit.get(), 0); - slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, false, null); assertEquals(tot_mgr_resubmit.get(), 0); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); @@ -413,7 +410,7 @@ public class TestSplitLogManager { zkw.getRecoverableZooKeeper().create(tasknode1, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, false, null); waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2); // submit another task which will stay in unassigned mode @@ -442,7 +439,7 @@ public class TestSplitLogManager { LOG.info("testDeadWorker"); conf.setLong("hbase.splitlog.max.resubmit", 0); - slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, false, null); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); @@ -467,7 +464,7 @@ public class TestSplitLogManager { @Test public void testWorkerCrash() throws Exception { - slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, false, null); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); @@ -492,7 +489,7 @@ public class TestSplitLogManager { @Test public void testEmptyLogDir() throws Exception { LOG.info("testEmptyLogDir"); - slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, false, null); FileSystem fs = TEST_UTIL.getTestFileSystem(); Path emptyLogDirPath = new Path(fs.getWorkingDirectory(), UUID.randomUUID().toString()); @@ -501,6 +498,45 @@ public class TestSplitLogManager { assertFalse(fs.exists(emptyLogDirPath)); } + @Test (timeout = 60000) + public void testLogFilesAreArchived() throws Exception { + LOG.info("testLogFilesAreArchived"); + final SplitLogManager slm = new SplitLogManager(zkw, conf, stopper, master, + DUMMY_MASTER, false); + FileSystem fs = TEST_UTIL.getTestFileSystem(); + Path dir = TEST_UTIL.getDataTestDirOnTestFS("testLogFilesAreArchived"); + conf.set(HConstants.HBASE_DIR, dir.toString()); + Path logDirPath = new Path(dir, UUID.randomUUID().toString()); + fs.mkdirs(logDirPath); + // create an empty log file + String logFile = ServerName.valueOf("foo", 1, 1).toString(); + fs.create(new Path(logDirPath, logFile)).close(); + + // spin up a thread mocking split done. + new Thread() { + @Override + public void run() { + boolean done = false; + while (!done) { + for (Map.Entry<String, Task> entry : slm.getTasks().entrySet()) { + final ServerName worker1 = ServerName.valueOf("worker1,1,1"); + SplitLogTask slt = new SplitLogTask.Done(worker1, RecoveryMode.LOG_SPLITTING); + try { + ZKUtil.setData(zkw, entry.getKey(), slt.toByteArray()); + } catch (KeeperException e) { + LOG.warn(e); + } + done = true; + } + } + }; + }.start(); + + slm.splitLogDistributed(logDirPath); + + assertFalse(fs.exists(logDirPath)); + } + /** * The following test case is aiming to test the situation when distributedLogReplay is turned off * and restart a cluster there should no recovery regions in ZK left. @@ -515,7 +551,7 @@ public class TestSplitLogManager { HRegionInfo.FIRST_META_REGIONINFO.getEncodedName()); ZKUtil.createSetData(zkw, nodePath, ZKUtil.positionToByteArray(0L)); - slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, false, null); slm.removeStaleRecoveringRegionsFromZK(null); List<String> recoveringRegions = @@ -523,7 +559,7 @@ public class TestSplitLogManager { assertTrue("Recovery regions isn't cleaned", recoveringRegions.isEmpty()); } - + @Test(timeout=60000) public void testGetPreviousRecoveryMode() throws Exception { LOG.info("testGetPreviousRecoveryMode"); @@ -536,12 +572,12 @@ public class TestSplitLogManager { ServerName.valueOf("mgr,1,1"), RecoveryMode.LOG_SPLITTING).toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - slm = new SplitLogManager(zkw, testConf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(zkw, testConf, stopper, master, DUMMY_MASTER, false, null); assertTrue(slm.getRecoveryMode() == RecoveryMode.LOG_SPLITTING); - + zkw.getRecoverableZooKeeper().delete(ZKSplitLog.getEncodedNodeName(zkw, "testRecovery"), -1); slm.setRecoveryMode(false); assertTrue(slm.getRecoveryMode() == RecoveryMode.LOG_REPLAY); } - + }