Repository: hbase
Updated Branches:
  refs/heads/0.98 8ac090c6d -> 7f28fcf42


HBASE-11863 WAL files are not archived and stays in the WAL directory after 
splitting


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7f28fcf4
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7f28fcf4
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7f28fcf4

Branch: refs/heads/0.98
Commit: 7f28fcf429242c549219502bfb7da0ad28753f4c
Parents: 8ac090c
Author: Enis Soztutar <e...@apache.org>
Authored: Tue Sep 2 18:24:15 2014 -0700
Committer: Enis Soztutar <e...@apache.org>
Committed: Tue Sep 2 18:24:15 2014 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/master/MasterFileSystem.java   |  5 +-
 .../hadoop/hbase/master/SplitLogManager.java    | 43 ++++------
 .../master/TestDistributedLogSplitting.java     | 19 ++---
 .../hbase/master/TestSplitLogManager.java       | 86 ++++++++++++++------
 4 files changed, 88 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/7f28fcf4/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
index 7a0a03f..842cd86 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
@@ -54,7 +54,6 @@ import org.apache.hadoop.hbase.fs.HFileSystem;
 import 
org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -95,12 +94,14 @@ public class MasterFileSystem {
   private final MasterServices services;
 
   final static PathFilter META_FILTER = new PathFilter() {
+    @Override
     public boolean accept(Path p) {
       return HLogUtil.isMetaFile(p);
     }
   };
 
   final static PathFilter NON_META_FILTER = new PathFilter() {
+    @Override
     public boolean accept(Path p) {
       return !HLogUtil.isMetaFile(p);
     }
@@ -487,7 +488,7 @@ public class MasterFileSystem {
       org.apache.hadoop.hbase.util.FSTableDescriptorMigrationToSubdir
         .migrateFSTableDescriptorsIfNecessary(fs, rd);
     }
-      
+
     // Create tableinfo-s for hbase:meta if not already there.
     new FSTableDescriptors(fs, 
rd).createTableDescriptor(HTableDescriptor.META_TABLEDESC);
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/7f28fcf4/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
index 2e98433..192cd9e 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
@@ -78,6 +78,8 @@ import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.data.Stat;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Distributes the task of log splitting to the available region servers.
  * Coordination happens via zookeeper. For every log file that has to be split 
a
@@ -156,26 +158,6 @@ public class SplitLogManager extends ZooKeeperListener {
    * Wrapper around {@link #SplitLogManager(ZooKeeperWatcher zkw, 
Configuration conf,
    *   Stoppable stopper, MasterServices master, ServerName serverName,
    *   boolean masterRecovery, TaskFinisher tf)}
-   * with masterRecovery = false, and tf = null.  Used in unit tests.
-   *
-   * @param zkw the ZK watcher
-   * @param conf the HBase configuration
-   * @param stopper the stoppable in case anything is wrong
-   * @param master the master services
-   * @param serverName the master server name
-   * @throws KeeperException
-   * @throws InterruptedIOException
-   */
-    public SplitLogManager(ZooKeeperWatcher zkw, final Configuration conf,
-       Stoppable stopper, MasterServices master, ServerName serverName)
-       throws InterruptedIOException, KeeperException {
-    this(zkw, conf, stopper, master, serverName, false, null);
-  }
-
-  /**
-   * Wrapper around {@link #SplitLogManager(ZooKeeperWatcher zkw, 
Configuration conf,
-   *   Stoppable stopper, MasterServices master, ServerName serverName,
-   *   boolean masterRecovery, TaskFinisher tf)}
    * that provides a task finisher for copying recovered edits to their final 
destination.
    * The task finisher has to be robust because it can be arbitrarily 
restarted or called
    * multiple times.
@@ -190,7 +172,7 @@ public class SplitLogManager extends ZooKeeperListener {
    * @throws InterruptedIOException
    */
   public SplitLogManager(ZooKeeperWatcher zkw, final Configuration conf,
-      Stoppable stopper, MasterServices master, ServerName serverName, boolean 
masterRecovery) 
+      Stoppable stopper, MasterServices master, ServerName serverName, boolean 
masterRecovery)
       throws InterruptedIOException, KeeperException {
     this(zkw, conf, stopper, master, serverName, masterRecovery, new 
TaskFinisher() {
       @Override
@@ -223,7 +205,7 @@ public class SplitLogManager extends ZooKeeperListener {
    */
   public SplitLogManager(ZooKeeperWatcher zkw, Configuration conf,
         Stoppable stopper, MasterServices master,
-        ServerName serverName, boolean masterRecovery, TaskFinisher tf) 
+        ServerName serverName, boolean masterRecovery, TaskFinisher tf)
       throws InterruptedIOException, KeeperException {
     super(zkw);
     this.taskFinisher = tf;
@@ -236,7 +218,7 @@ public class SplitLogManager extends ZooKeeperListener {
     this.unassignedTimeout =
       conf.getInt("hbase.splitlog.manager.unassigned.timeout", 
DEFAULT_UNASSIGNED_TIMEOUT);
 
-    // Determine recovery mode  
+    // Determine recovery mode
     setRecoveryMode(true);
 
     LOG.info("Timeout=" + timeout + ", unassigned timeout=" + 
unassignedTimeout +
@@ -465,6 +447,11 @@ public class SplitLogManager extends ZooKeeperListener {
     }
   }
 
+  @VisibleForTesting
+  ConcurrentMap<String, Task> getTasks() {
+    return tasks;
+  }
+
   private int activeTasks(final TaskBatch batch) {
     int count = 0;
     for (Task t: tasks.values()) {
@@ -1259,7 +1246,7 @@ public class SplitLogManager extends ZooKeeperListener {
     }
     return result;
   }
-  
+
   /**
    * This function is to set recovery mode from outstanding split log tasks 
from before or
    * current configuration setting
@@ -1282,7 +1269,7 @@ public class SplitLogManager extends ZooKeeperListener {
     boolean hasSplitLogTask = false;
     boolean hasRecoveringRegions = false;
     RecoveryMode previousRecoveryMode = RecoveryMode.UNKNOWN;
-    RecoveryMode recoveryModeInConfig = (isDistributedLogReplay(conf)) ? 
+    RecoveryMode recoveryModeInConfig = (isDistributedLogReplay(conf)) ?
       RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING;
 
     // Firstly check if there are outstanding recovering regions
@@ -1307,7 +1294,7 @@ public class SplitLogManager extends ZooKeeperListener {
               previousRecoveryMode = slt.getMode();
               if (previousRecoveryMode == RecoveryMode.UNKNOWN) {
                 // created by old code base where we don't set recovery mode 
in splitlogtask
-                // we can safely set to LOG_SPLITTING because we're in master 
initialization code 
+                // we can safely set to LOG_SPLITTING because we're in master 
initialization code
                 // before SSH is enabled & there is no outstanding recovering 
regions
                 previousRecoveryMode = RecoveryMode.LOG_SPLITTING;
               }
@@ -1332,7 +1319,7 @@ public class SplitLogManager extends ZooKeeperListener {
         // splitlogtask hasn't drained yet, keep existing recovery mode
         return;
       }
-  
+
       if (previousRecoveryMode != RecoveryMode.UNKNOWN) {
         this.isDrainingDone = (previousRecoveryMode == recoveryModeInConfig);
         this.recoveryMode = previousRecoveryMode;
@@ -1345,7 +1332,7 @@ public class SplitLogManager extends ZooKeeperListener {
   public RecoveryMode getRecoveryMode() {
     return this.recoveryMode;
   }
-  
+
   /**
    * Returns if distributed log replay is turned on or not
    * @param conf

http://git-wip-us.apache.org/repos/asf/hbase/blob/7f28fcf4/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
index c5f869d..3d9b1cf 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
@@ -26,11 +26,6 @@ import static 
org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_done;
 import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_err;
 import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_resigned;
 import static org.junit.Assert.*;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -261,6 +256,10 @@ public class TestDistributedLogSplitting {
       }
       LOG.info(count + " edits in " + files.length + " recovered edits 
files.");
     }
+
+    // check that the log file is moved
+    assertFalse(fs.exists(logDir));
+
     assertEquals(NUM_LOG_LINES, count);
   }
 
@@ -466,9 +465,9 @@ public class TestDistributedLogSplitting {
 
     Thread.sleep(2000);
     LOG.info("Current Open Regions:" + getAllOnlineRegions(cluster).size());
-    
+
     startMasterAndWaitUntilLogSplit(cluster);
-    
+
     // wait for abort completes
     TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
       @Override
@@ -781,10 +780,10 @@ public class TestDistributedLogSplitting {
     }
     makeHLog(hrs.getWAL(), regions, "disableTable", "family", NUM_LOG_LINES, 
100, false);
     makeHLog(hrs.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100);
-    
+
     LOG.info("Disabling table\n");
     TEST_UTIL.getHBaseAdmin().disableTable(Bytes.toBytes("disableTable"));
-    
+
     // abort RS
     LOG.info("Aborting region server: " + hrs.getServerName());
     hrs.abort("testing");
@@ -1229,7 +1228,7 @@ public class TestDistributedLogSplitting {
       WALEdit e = new WALEdit();
       value++;
       e.add(new KeyValue(row, family, qualifier, timeStamp, 
Bytes.toBytes(value)));
-      hrs.getWAL().append(curRegionInfo, TableName.valueOf(tableName), e, 
+      hrs.getWAL().append(curRegionInfo, TableName.valueOf(tableName), e,
         System.currentTimeMillis(), htd, sequenceId);
     }
     hrs.getWAL().sync();

http://git-wip-us.apache.org/repos/asf/hbase/blob/7f28fcf4/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
index 4988742..185c92e 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
@@ -19,11 +19,8 @@
 package org.apache.hadoop.hbase.master;
 
 import static org.apache.hadoop.hbase.SplitLogCounters.resetCounters;
-import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_get_data_nonode;
 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_heartbeat;
-import static 
org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_log_split_batch_success;
 import static 
org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_node_create_queued;
-import static 
org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_node_create_result;
 import static 
org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_orphan_task_acquired;
 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan;
 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan_deleted;
@@ -40,6 +37,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -61,8 +59,6 @@ import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.master.SplitLogManager.Task;
 import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
 import 
org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
-import org.apache.hadoop.hbase.regionserver.RegionServerServices;
-import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
 import 
org.apache.hadoop.hbase.regionserver.TestMasterAddressTracker.NodeCreationListener;
 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -141,15 +137,15 @@ public class TestSplitLogManager {
     conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
     conf.setInt("hfile.format.version", 3);
     to = to + 4 * 100;
-    
-    this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) 
? 
+
+    this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) 
?
         RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
   }
 
   @After
   public void teardown() throws IOException, KeeperException {
     stopper.stop("");
-    slm.stop();
+    if (slm != null) slm.stop();
     TEST_UTIL.shutdownMiniZKCluster();
   }
 
@@ -160,6 +156,7 @@ public class TestSplitLogManager {
   private void waitForCounter(final AtomicLong ctr, long oldval, long newval, 
long timems)
       throws Exception {
     Expr e = new Expr() {
+      @Override
       public long eval() {
         return ctr.get();
       }
@@ -207,7 +204,7 @@ public class TestSplitLogManager {
   public void testTaskCreation() throws Exception {
 
     LOG.info("TestTaskCreation - test the creation of a task in zk");
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, false, 
null);
     TaskBatch batch = new TaskBatch();
 
     String tasknode = submitTaskAndWait(batch, "foo/1");
@@ -227,7 +224,7 @@ public class TestSplitLogManager {
     zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), 
Ids.OPEN_ACL_UNSAFE,
         CreateMode.PERSISTENT);
 
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, false, 
null);
     waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
     Task task = slm.findOrCreateOrphanTask(tasknode);
     assertTrue(task.isOrphan());
@@ -253,7 +250,7 @@ public class TestSplitLogManager {
         CreateMode.PERSISTENT);
     int version = ZKUtil.checkExists(zkw, tasknode);
 
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, false, 
null);
     waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
     Task task = slm.findOrCreateOrphanTask(tasknode);
     assertTrue(task.isOrphan());
@@ -276,7 +273,7 @@ public class TestSplitLogManager {
     LOG.info("TestMultipleResbmits - no indefinite resubmissions");
 
     conf.setInt("hbase.splitlog.max.resubmit", 2);
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, false, 
null);
     TaskBatch batch = new TaskBatch();
 
     String tasknode = submitTaskAndWait(batch, "foo/1");
@@ -308,7 +305,7 @@ public class TestSplitLogManager {
   public void testRescanCleanup() throws Exception {
     LOG.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up");
 
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, false, 
null);
     TaskBatch batch = new TaskBatch();
 
     String tasknode = submitTaskAndWait(batch, "foo/1");
@@ -337,7 +334,7 @@ public class TestSplitLogManager {
   public void testTaskDone() throws Exception {
     LOG.info("TestTaskDone - cleanup task node once in DONE state");
 
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, false, 
null);
     TaskBatch batch = new TaskBatch();
     String tasknode = submitTaskAndWait(batch, "foo/1");
     final ServerName worker1 = ServerName.valueOf("worker1,1,1");
@@ -357,7 +354,7 @@ public class TestSplitLogManager {
     LOG.info("TestTaskErr - cleanup task node once in ERR state");
 
     conf.setInt("hbase.splitlog.max.resubmit", 0);
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, false, 
null);
     TaskBatch batch = new TaskBatch();
 
     String tasknode = submitTaskAndWait(batch, "foo/1");
@@ -379,7 +376,7 @@ public class TestSplitLogManager {
   public void testTaskResigned() throws Exception {
     LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state");
     assertEquals(tot_mgr_resubmit.get(), 0);
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, false, 
null);
     assertEquals(tot_mgr_resubmit.get(), 0);
     TaskBatch batch = new TaskBatch();
     String tasknode = submitTaskAndWait(batch, "foo/1");
@@ -413,7 +410,7 @@ public class TestSplitLogManager {
     zkw.getRecoverableZooKeeper().create(tasknode1, slt.toByteArray(), 
Ids.OPEN_ACL_UNSAFE,
         CreateMode.PERSISTENT);
 
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, false, 
null);
     waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
 
     // submit another task which will stay in unassigned mode
@@ -442,7 +439,7 @@ public class TestSplitLogManager {
     LOG.info("testDeadWorker");
 
     conf.setLong("hbase.splitlog.max.resubmit", 0);
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, false, 
null);
     TaskBatch batch = new TaskBatch();
 
     String tasknode = submitTaskAndWait(batch, "foo/1");
@@ -467,7 +464,7 @@ public class TestSplitLogManager {
 
   @Test
   public void testWorkerCrash() throws Exception {
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, false, 
null);
     TaskBatch batch = new TaskBatch();
 
     String tasknode = submitTaskAndWait(batch, "foo/1");
@@ -492,7 +489,7 @@ public class TestSplitLogManager {
   @Test
   public void testEmptyLogDir() throws Exception {
     LOG.info("testEmptyLogDir");
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, false, 
null);
     FileSystem fs = TEST_UTIL.getTestFileSystem();
     Path emptyLogDirPath = new Path(fs.getWorkingDirectory(),
         UUID.randomUUID().toString());
@@ -501,6 +498,45 @@ public class TestSplitLogManager {
     assertFalse(fs.exists(emptyLogDirPath));
   }
 
+  @Test (timeout = 60000)
+  public void testLogFilesAreArchived() throws Exception {
+    LOG.info("testLogFilesAreArchived");
+    final SplitLogManager slm = new SplitLogManager(zkw, conf, stopper, master,
+      DUMMY_MASTER, false);
+    FileSystem fs = TEST_UTIL.getTestFileSystem();
+    Path dir = TEST_UTIL.getDataTestDirOnTestFS("testLogFilesAreArchived");
+    conf.set(HConstants.HBASE_DIR, dir.toString());
+    Path logDirPath = new Path(dir, UUID.randomUUID().toString());
+    fs.mkdirs(logDirPath);
+    // create an empty log file
+    String logFile = ServerName.valueOf("foo", 1, 1).toString();
+    fs.create(new Path(logDirPath, logFile)).close();
+
+    // spin up a thread mocking split done.
+    new Thread() {
+      @Override
+      public void run() {
+        boolean done = false;
+        while (!done) {
+          for (Map.Entry<String, Task> entry : slm.getTasks().entrySet()) {
+            final ServerName worker1 = ServerName.valueOf("worker1,1,1");
+            SplitLogTask slt = new SplitLogTask.Done(worker1, 
RecoveryMode.LOG_SPLITTING);
+            try {
+              ZKUtil.setData(zkw, entry.getKey(), slt.toByteArray());
+            } catch (KeeperException e) {
+              LOG.warn(e);
+            }
+            done = true;
+          }
+        }
+      };
+    }.start();
+
+    slm.splitLogDistributed(logDirPath);
+
+    assertFalse(fs.exists(logDirPath));
+  }
+
   /**
    * The following test case is aiming to test the situation when 
distributedLogReplay is turned off
    * and restart a cluster there should no recovery regions in ZK left.
@@ -515,7 +551,7 @@ public class TestSplitLogManager {
           HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
     ZKUtil.createSetData(zkw, nodePath, ZKUtil.positionToByteArray(0L));
 
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, false, 
null);
     slm.removeStaleRecoveringRegionsFromZK(null);
 
     List<String> recoveringRegions =
@@ -523,7 +559,7 @@ public class TestSplitLogManager {
 
     assertTrue("Recovery regions isn't cleaned", recoveringRegions.isEmpty());
   }
-  
+
   @Test(timeout=60000)
   public void testGetPreviousRecoveryMode() throws Exception {
     LOG.info("testGetPreviousRecoveryMode");
@@ -536,12 +572,12 @@ public class TestSplitLogManager {
         ServerName.valueOf("mgr,1,1"), 
RecoveryMode.LOG_SPLITTING).toByteArray(),
         Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
 
-    slm = new SplitLogManager(zkw, testConf, stopper, master, DUMMY_MASTER);
+    slm = new SplitLogManager(zkw, testConf, stopper, master, DUMMY_MASTER, 
false, null);
     assertTrue(slm.getRecoveryMode() == RecoveryMode.LOG_SPLITTING);
-    
+
     zkw.getRecoverableZooKeeper().delete(ZKSplitLog.getEncodedNodeName(zkw, 
"testRecovery"), -1);
     slm.setRecoveryMode(false);
     assertTrue(slm.getRecoveryMode() == RecoveryMode.LOG_REPLAY);
   }
-  
+
 }

Reply via email to