HBASE-15537 Make multi WAL work with WALs other than FSHLog

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/394b89d1
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/394b89d1
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/394b89d1

Branch: refs/heads/HBASE-14850
Commit: 394b89d153a9bef67a84633f4ff68aff26d53832
Parents: 2dcd08b
Author: zhangduo <zhang...@apache.org>
Authored: Wed Apr 6 17:04:28 2016 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Fri Apr 8 10:36:16 2016 +0800

----------------------------------------------------------------------
 .../hbase/regionserver/wal/AbstractFSWAL.java   |   4 +
 .../hadoop/hbase/wal/AbstractFSWALProvider.java |   4 +-
 .../hbase/wal/RegionGroupingProvider.java       | 138 ++++++++-----------
 .../org/apache/hadoop/hbase/wal/WALFactory.java |  30 ++--
 ...ReplicationEndpointWithMultipleAsyncWAL.java |  36 +++++
 .../TestReplicationEndpointWithMultipleWAL.java |   2 +
 ...lMasterRSCompressedWithMultipleAsyncWAL.java |  37 +++++
 ...onKillMasterRSCompressedWithMultipleWAL.java |   2 +
 ...plicationSyncUpToolWithMultipleAsyncWAL.java |  37 +++++
 ...estReplicationSyncUpToolWithMultipleWAL.java |   2 +
 .../wal/TestBoundedRegionGroupingStrategy.java  | 131 ++++++++++--------
 11 files changed, 273 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/394b89d1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index b89488a..e4c4eb3 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -678,6 +678,10 @@ public abstract class AbstractFSWAL<W> implements WAL {
         // NewPath could be equal to oldPath if replaceWriter fails.
         newPath = replaceWriter(oldPath, newPath, nextWriter);
         tellListenersAboutPostLogRoll(oldPath, newPath);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Create new " + getClass().getSimpleName() + " writer with 
pipeline: "
+              + Arrays.toString(getPipeline()));
+        }
         // Can we delete any of the old log files?
         if (getNumRolledLogFiles() > 0) {
           cleanOldLogs();

http://git-wip-us.apache.org/repos/asf/hbase/blob/394b89d1/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
index 2f5c299..e495e99 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
@@ -82,7 +82,7 @@ public abstract class AbstractFSWALProvider<T extends 
AbstractFSWAL<?>> implemen
    * @param factory factory that made us, identity used for FS layout. may not 
be null
    * @param conf may not be null
    * @param listeners may be null
-   * @param providerId differentiate between providers from one facotry, used 
for FS layout. may be
+   * @param providerId differentiate between providers from one factory, used 
for FS layout. may be
    *          null
    */
   @Override
@@ -109,7 +109,7 @@ public abstract class AbstractFSWALProvider<T extends 
AbstractFSWAL<?>> implemen
   }
 
   @Override
-  public WAL getWAL(byte[] identifier, byte[] namespace) throws IOException {
+  public T getWAL(byte[] identifier, byte[] namespace) throws IOException {
     T walCopy = wal;
     if (walCopy == null) {
       // only lock when need to create wal, and need to lock since

http://git-wip-us.apache.org/repos/asf/hbase/blob/394b89d1/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
index 0aeaccf..b447e94 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
@@ -18,34 +18,31 @@
  */
 package org.apache.hadoop.hbase.wal;
 
-import static 
org.apache.hadoop.hbase.wal.DefaultWALProvider.META_WAL_PROVIDER_ID;
-import static 
org.apache.hadoop.hbase.wal.DefaultWALProvider.WAL_FILE_NAME_DELIMITER;
+import static 
org.apache.hadoop.hbase.wal.AbstractFSWALProvider.META_WAL_PROVIDER_ID;
+import static 
org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
 
 import java.io.IOException;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.Lock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
-import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
-
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 // imports for classes still in regionserver.wal
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.IdReadWriteLock;
 
 /**
  * A WAL Provider that returns a WAL per group of regions.
  *
+ * This provider follows the decorator pattern and mainly holds the logic for 
WAL grouping.
+ * WAL creation/roll/close is delegated to {@link #DELEGATE_PROVIDER}
+ *
  * Region grouping is handled via {@link RegionGroupingStrategy} and can be 
configured via the
  * property "hbase.wal.regiongrouping.strategy". Current strategy choices are
  * <ul>
@@ -57,7 +54,7 @@ import org.apache.hadoop.hbase.util.FSUtils;
  * Optionally, a FQCN to a custom implementation may be given.
  */
 @InterfaceAudience.Private
-class RegionGroupingProvider implements WALProvider {
+public class RegionGroupingProvider implements WALProvider {
   private static final Log LOG = 
LogFactory.getLog(RegionGroupingProvider.class);
 
   /**
@@ -124,22 +121,23 @@ class RegionGroupingProvider implements WALProvider {
   public static final String REGION_GROUPING_STRATEGY = 
"hbase.wal.regiongrouping.strategy";
   public static final String DEFAULT_REGION_GROUPING_STRATEGY = 
Strategies.defaultStrategy.name();
 
+  /** delegate provider for WAL creation/roll/close */
+  public static final String DELEGATE_PROVIDER = 
"hbase.wal.regiongrouping.delegate.provider";
+  public static final String DEFAULT_DELEGATE_PROVIDER = 
WALFactory.Providers.defaultProvider
+      .name();
+
   private static final String META_WAL_GROUP_NAME = "meta";
 
-  /** A group-wal mapping, recommended to make sure one-one rather than 
many-one mapping */
-  protected final Map<String, FSHLog> cached = new HashMap<String, FSHLog>();
-  /** Stores unique wals generated by this RegionGroupingProvider */
-  private final Set<FSHLog> logs = Collections.synchronizedSet(new 
HashSet<FSHLog>());
+  /** A group-provider mapping, make sure one-one rather than many-one mapping 
*/
+  private final ConcurrentMap<String, WALProvider> cached = new 
ConcurrentHashMap<>();
 
-  /**
-   * we synchronize on walCacheLock to prevent wal recreation in different 
threads
-   */
-  final Object walCacheLock = new Object();
+  private final IdReadWriteLock createLock = new IdReadWriteLock();
 
-  protected RegionGroupingStrategy strategy = null;
+  private RegionGroupingStrategy strategy = null;
+  private WALFactory factory = null;
   private List<WALActionsListener> listeners = null;
   private String providerId = null;
-  private Configuration conf = null;
+  private Class<? extends WALProvider> providerClass;
 
   @Override
   public void init(final WALFactory factory, final Configuration conf,
@@ -147,6 +145,7 @@ class RegionGroupingProvider implements WALProvider {
     if (null != strategy) {
       throw new IllegalStateException("WALProvider.init should only be called 
once.");
     }
+    this.factory = factory;
     this.listeners = null == listeners ? null : 
Collections.unmodifiableList(listeners);
     StringBuilder sb = new StringBuilder().append(factory.factoryId);
     if (providerId != null) {
@@ -158,45 +157,33 @@ class RegionGroupingProvider implements WALProvider {
     }
     this.providerId = sb.toString();
     this.strategy = getStrategy(conf, REGION_GROUPING_STRATEGY, 
DEFAULT_REGION_GROUPING_STRATEGY);
-    this.conf = conf;
+    this.providerClass = factory.getProviderClass(DELEGATE_PROVIDER, 
DEFAULT_DELEGATE_PROVIDER);
   }
 
-  /**
-   * Populate the cache for this group.
-   */
-  FSHLog populateCache(String groupName) throws IOException {
-    boolean isMeta = META_WAL_PROVIDER_ID.equals(providerId);
-    String hlogPrefix;
-    List<WALActionsListener> listeners;
-    if (isMeta) {
-      hlogPrefix = this.providerId;
-      // don't watch log roll for meta
-      listeners = Collections.<WALActionsListener> singletonList(new 
MetricsWAL());
+  private WALProvider createProvider(String group) throws IOException {
+    if (META_WAL_PROVIDER_ID.equals(providerId)) {
+      return factory.createProvider(providerClass, listeners, 
META_WAL_PROVIDER_ID);
     } else {
-      hlogPrefix = groupName;
-      listeners = this.listeners;
+      return factory.createProvider(providerClass, listeners, group);
     }
-    FSHLog log = new FSHLog(FileSystem.get(conf), FSUtils.getRootDir(conf),
-        DefaultWALProvider.getWALDirectoryName(providerId), 
HConstants.HREGION_OLDLOGDIR_NAME,
-        conf, listeners, true, hlogPrefix, isMeta ? META_WAL_PROVIDER_ID : 
null);
-    cached.put(groupName, log);
-    logs.add(log);
-    return log;
   }
 
   private WAL getWAL(final String group) throws IOException {
-    WAL log = cached.get(group);
-    if (null == log) {
-      // only lock when need to create wal, and need to lock since
-      // creating hlog on fs is time consuming
-      synchronized (this.walCacheLock) {
-        log = cached.get(group);// check again
-        if (null == log) {
-          log = populateCache(group);
+    WALProvider provider = cached.get(group);
+    if (provider == null) {
+      Lock lock = createLock.getLock(group.hashCode()).writeLock();
+      lock.lock();
+      try {
+        provider = cached.get(group);
+        if (provider == null) {
+          provider = createProvider(group);
+          cached.put(group, provider);
         }
+      } finally {
+        lock.unlock();
       }
     }
-    return log;
+    return provider.getWAL(null, null);
   }
 
   @Override
@@ -214,15 +201,15 @@ class RegionGroupingProvider implements WALProvider {
   public void shutdown() throws IOException {
     // save the last exception and rethrow
     IOException failure = null;
-    synchronized (logs) {
-      for (FSHLog wal : logs) {
-        try {
-          wal.shutdown();
-        } catch (IOException exception) {
-          LOG.error("Problem shutting down log '" + wal + "': " + 
exception.getMessage());
-          LOG.debug("Details of problem shutting down log '" + wal + "'", 
exception);
-          failure = exception;
+    for (WALProvider provider: cached.values()) {
+      try {
+        provider.shutdown();
+      } catch (IOException e) {
+        LOG.error("Problem shutting down wal provider '" + provider + "': " + 
e.getMessage());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Details of problem shutting down wal provider '" + 
provider + "'", e);
         }
+        failure = e;
       }
     }
     if (failure != null) {
@@ -234,15 +221,15 @@ class RegionGroupingProvider implements WALProvider {
   public void close() throws IOException {
     // save the last exception and rethrow
     IOException failure = null;
-    synchronized (logs) {
-      for (FSHLog wal : logs) {
-        try {
-          wal.close();
-        } catch (IOException exception) {
-          LOG.error("Problem closing log '" + wal + "': " + 
exception.getMessage());
-          LOG.debug("Details of problem closing wal '" + wal + "'", exception);
-          failure = exception;
+    for (WALProvider provider : cached.values()) {
+      try {
+        provider.close();
+      } catch (IOException e) {
+        LOG.error("Problem closing wal provider '" + provider + "': " + 
e.getMessage());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Details of problem closing wal provider '" + provider + 
"'", e);
         }
+        failure = e;
       }
     }
     if (failure != null) {
@@ -262,10 +249,8 @@ class RegionGroupingProvider implements WALProvider {
   @Override
   public long getNumLogFiles() {
     long numLogFiles = 0;
-    synchronized (logs) {
-      for (FSHLog wal : logs) {
-        numLogFiles += wal.getNumLogFiles();
-      }
+    for (WALProvider provider : cached.values()) {
+      numLogFiles += provider.getNumLogFiles();
     }
     return numLogFiles;
   }
@@ -273,12 +258,9 @@ class RegionGroupingProvider implements WALProvider {
   @Override
   public long getLogFileSize() {
     long logFileSize = 0;
-    synchronized (logs) {
-      for (FSHLog wal : logs) {
-        logFileSize += wal.getLogFileSize();
-      }
+    for (WALProvider provider : cached.values()) {
+      logFileSize += provider.getLogFileSize();
     }
     return logFileSize;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/394b89d1/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
index a2761df..a9c17b5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
@@ -127,38 +127,46 @@ public class WALFactory {
     factoryId = SINGLETON_ID;
   }
 
-  /**
-   * instantiate a provider from a config property.
-   * requires conf to have already been set (as well as anything the provider 
might need to read).
-   */
-  WALProvider getProvider(final String key, final String defaultValue,
-      final List<WALActionsListener> listeners, final String providerId) 
throws IOException {
-    Class<? extends WALProvider> clazz;
+  Class<? extends WALProvider> getProviderClass(String key, String 
defaultValue) {
     try {
-      clazz = Providers.valueOf(conf.get(key, defaultValue)).clazz;
+      return Providers.valueOf(conf.get(key, defaultValue)).clazz;
     } catch (IllegalArgumentException exception) {
       // Fall back to them specifying a class name
       // Note that the passed default class shouldn't actually be used, since 
the above only fails
       // when there is a config value present.
-      clazz = conf.getClass(key, DefaultWALProvider.class, WALProvider.class);
+      return conf.getClass(key, DefaultWALProvider.class, WALProvider.class);
     }
+  }
+
+  WALProvider createProvider(Class<? extends WALProvider> clazz,
+      List<WALActionsListener> listeners, String providerId) throws 
IOException {
     LOG.info("Instantiating WALProvider of type " + clazz);
     try {
       final WALProvider result = clazz.newInstance();
       result.init(this, conf, listeners, providerId);
       return result;
     } catch (InstantiationException exception) {
-      LOG.error("couldn't set up WALProvider, check config key " + key);
+      LOG.error("couldn't set up WALProvider, the configured class is " + 
clazz);
       LOG.debug("Exception details for failure to load WALProvider.", 
exception);
       throw new IOException("couldn't set up WALProvider", exception);
     } catch (IllegalAccessException exception) {
-      LOG.error("couldn't set up WALProvider, check config key " + key);
+      LOG.error("couldn't set up WALProvider, the configured class is " + 
clazz);
       LOG.debug("Exception details for failure to load WALProvider.", 
exception);
       throw new IOException("couldn't set up WALProvider", exception);
     }
   }
 
   /**
+   * instantiate a provider from a config property.
+   * requires conf to have already been set (as well as anything the provider 
might need to read).
+   */
+  WALProvider getProvider(final String key, final String defaultValue,
+      final List<WALActionsListener> listeners, final String providerId) 
throws IOException {
+    Class<? extends WALProvider> clazz = getProviderClass(key, defaultValue);
+    return createProvider(clazz, listeners, providerId);
+  }
+
+  /**
    * @param conf must not be null, will keep a reference to read params in 
later reader/writer
    *     instances.
    * @param listeners may be null. will be given to all created wals (and not 
meta-wals)

http://git-wip-us.apache.org/repos/asf/hbase/blob/394b89d1/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationEndpointWithMultipleAsyncWAL.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationEndpointWithMultipleAsyncWAL.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationEndpointWithMultipleAsyncWAL.java
new file mode 100644
index 0000000..debe8a1
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationEndpointWithMultipleAsyncWAL.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.multiwal;
+
+import org.apache.hadoop.hbase.replication.TestReplicationEndpoint;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.wal.RegionGroupingProvider;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestReplicationEndpointWithMultipleAsyncWAL extends 
TestReplicationEndpoint {
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    conf1.set(WALFactory.WAL_PROVIDER, "multiwal");
+    conf1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "asyncfs");
+    TestReplicationEndpoint.setUpBeforeClass();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/394b89d1/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationEndpointWithMultipleWAL.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationEndpointWithMultipleWAL.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationEndpointWithMultipleWAL.java
index cd7873a..8f350f2 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationEndpointWithMultipleWAL.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationEndpointWithMultipleWAL.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication.multiwal;
 import org.apache.hadoop.hbase.replication.TestReplicationEndpoint;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.wal.RegionGroupingProvider;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.junit.BeforeClass;
 import org.junit.experimental.categories.Category;
@@ -29,6 +30,7 @@ public class TestReplicationEndpointWithMultipleWAL extends 
TestReplicationEndpo
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     conf1.set(WALFactory.WAL_PROVIDER, "multiwal");
+    conf1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "filesystem");
     TestReplicationEndpoint.setUpBeforeClass();
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/394b89d1/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationKillMasterRSCompressedWithMultipleAsyncWAL.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationKillMasterRSCompressedWithMultipleAsyncWAL.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationKillMasterRSCompressedWithMultipleAsyncWAL.java
new file mode 100644
index 0000000..416cda7
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationKillMasterRSCompressedWithMultipleAsyncWAL.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.multiwal;
+
+import 
org.apache.hadoop.hbase.replication.TestReplicationKillMasterRSCompressed;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.wal.RegionGroupingProvider;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+@Category({ReplicationTests.class, LargeTests.class})
+public class TestReplicationKillMasterRSCompressedWithMultipleAsyncWAL extends
+    TestReplicationKillMasterRSCompressed {
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    conf1.set(WALFactory.WAL_PROVIDER, "multiwal");
+    conf1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "asyncfs");
+    TestReplicationKillMasterRSCompressed.setUpBeforeClass();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/394b89d1/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationKillMasterRSCompressedWithMultipleWAL.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationKillMasterRSCompressedWithMultipleWAL.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationKillMasterRSCompressedWithMultipleWAL.java
index 0b2c2cc..1806776 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationKillMasterRSCompressedWithMultipleWAL.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationKillMasterRSCompressedWithMultipleWAL.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication.multiwal;
 import 
org.apache.hadoop.hbase.replication.TestReplicationKillMasterRSCompressed;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.wal.RegionGroupingProvider;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.junit.BeforeClass;
 import org.junit.experimental.categories.Category;
@@ -30,6 +31,7 @@ public class 
TestReplicationKillMasterRSCompressedWithMultipleWAL extends
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     conf1.set(WALFactory.WAL_PROVIDER, "multiwal");
+    conf1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "filesystem");
     TestReplicationKillMasterRSCompressed.setUpBeforeClass();
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/394b89d1/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleAsyncWAL.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleAsyncWAL.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleAsyncWAL.java
new file mode 100644
index 0000000..bb91aaf
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleAsyncWAL.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.multiwal;
+
+import org.apache.hadoop.hbase.replication.TestReplicationBase;
+import org.apache.hadoop.hbase.replication.TestReplicationSyncUpTool;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.wal.RegionGroupingProvider;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestReplicationSyncUpToolWithMultipleAsyncWAL extends 
TestReplicationSyncUpTool {
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    conf1.set(WALFactory.WAL_PROVIDER, "multiwal");
+    conf1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "asyncfs");
+    TestReplicationBase.setUpBeforeClass();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/394b89d1/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleWAL.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleWAL.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleWAL.java
index a8e455c..7329a90 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleWAL.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleWAL.java
@@ -21,6 +21,7 @@ import 
org.apache.hadoop.hbase.replication.TestReplicationBase;
 import org.apache.hadoop.hbase.replication.TestReplicationSyncUpTool;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.wal.RegionGroupingProvider;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.junit.BeforeClass;
 import org.junit.experimental.categories.Category;
@@ -30,6 +31,7 @@ public class TestReplicationSyncUpToolWithMultipleWAL extends 
TestReplicationSyn
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     conf1.set(WALFactory.WAL_PROVIDER, "multiwal");
+    conf1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "filesystem");
     TestReplicationBase.setUpBeforeClass();
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/394b89d1/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestBoundedRegionGroupingStrategy.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestBoundedRegionGroupingStrategy.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestBoundedRegionGroupingStrategy.java
index 2044f82..8523e69 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestBoundedRegionGroupingStrategy.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestBoundedRegionGroupingStrategy.java
@@ -18,80 +18,92 @@
  */
 package org.apache.hadoop.hbase.wal;
 
+import static 
org.apache.hadoop.hbase.wal.BoundedGroupingStrategy.DEFAULT_NUM_REGION_GROUPS;
+import static 
org.apache.hadoop.hbase.wal.BoundedGroupingStrategy.NUM_REGION_GROUPS;
+import static org.apache.hadoop.hbase.wal.RegionGroupingProvider.*;
+import static org.apache.hadoop.hbase.wal.WALFactory.WAL_PROVIDER;
+import static org.junit.Assert.assertEquals;
+
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Random;
 import java.util.Set;
 
-import static org.junit.Assert.assertEquals;
-import static 
org.apache.hadoop.hbase.wal.BoundedGroupingStrategy.NUM_REGION_GROUPS;
-import static 
org.apache.hadoop.hbase.wal.BoundedGroupingStrategy.DEFAULT_NUM_REGION_GROUPS;
-import static org.apache.hadoop.hbase.wal.WALFactory.WAL_PROVIDER;
-import static 
org.apache.hadoop.hbase.wal.RegionGroupingProvider.REGION_GROUPING_STRATEGY;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
-import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
 
-@Category({RegionServerTests.class, LargeTests.class})
+@RunWith(Parameterized.class)
+@Category({ RegionServerTests.class, LargeTests.class })
 public class TestBoundedRegionGroupingStrategy {
-  protected static final Log LOG = 
LogFactory.getLog(TestBoundedRegionGroupingStrategy.class);
+  private static final Log LOG = 
LogFactory.getLog(TestBoundedRegionGroupingStrategy.class);
+
+  private static final HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility();
+
+  private static Configuration CONF;
+  private static DistributedFileSystem FS;
 
-  @Rule
-  public TestName currentTest = new TestName();
-  protected static Configuration conf;
-  protected static FileSystem fs;
-  protected final static HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility();
+  @Parameter
+  public String walProvider;
+
+  @Parameters(name = "{index}: delegate-provider={0}")
+  public static Iterable<Object[]> data() {
+    return Arrays.asList(new Object[] { "defaultProvider" }, new Object[] { 
"asyncfs" });
+  }
 
   @Before
   public void setUp() throws Exception {
-    FileStatus[] entries = fs.listStatus(new Path("/"));
-    for (FileStatus dir : entries) {
-      fs.delete(dir.getPath(), true);
-    }
+    CONF.set(DELEGATE_PROVIDER, walProvider);
   }
 
   @After
   public void tearDown() throws Exception {
+    FileStatus[] entries = FS.listStatus(new Path("/"));
+    for (FileStatus dir : entries) {
+      FS.delete(dir.getPath(), true);
+    }
   }
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
-    conf = TEST_UTIL.getConfiguration();
+    CONF = TEST_UTIL.getConfiguration();
     // Make block sizes small.
-    conf.setInt("dfs.blocksize", 1024 * 1024);
+    CONF.setInt("dfs.blocksize", 1024 * 1024);
     // quicker heartbeat interval for faster DN death notification
-    conf.setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
-    conf.setInt("dfs.heartbeat.interval", 1);
-    conf.setInt("dfs.client.socket-timeout", 5000);
+    CONF.setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
+    CONF.setInt("dfs.heartbeat.interval", 1);
+    CONF.setInt("dfs.client.socket-timeout", 5000);
 
     // faster failover with cluster.shutdown();fs.close() idiom
-    conf.setInt("hbase.ipc.client.connect.max.retries", 1);
-    conf.setInt("dfs.client.block.recovery.retries", 1);
-    conf.setInt("hbase.ipc.client.connection.maxidletime", 500);
+    CONF.setInt("hbase.ipc.client.connect.max.retries", 1);
+    CONF.setInt("dfs.client.block.recovery.retries", 1);
+    CONF.setInt("hbase.ipc.client.connection.maxidletime", 500);
 
-    conf.setClass(WAL_PROVIDER, RegionGroupingProvider.class, 
WALProvider.class);
-    conf.set(REGION_GROUPING_STRATEGY, 
RegionGroupingProvider.Strategies.bounded.name());
+    CONF.setClass(WAL_PROVIDER, RegionGroupingProvider.class, 
WALProvider.class);
+    CONF.set(REGION_GROUPING_STRATEGY, 
RegionGroupingProvider.Strategies.bounded.name());
 
     TEST_UTIL.startMiniDFSCluster(3);
 
-    fs = TEST_UTIL.getDFSCluster().getFileSystem();
+    FS = TEST_UTIL.getDFSCluster().getFileSystem();
   }
 
   @AfterClass
@@ -106,8 +118,8 @@ public class TestBoundedRegionGroupingStrategy {
   public void testConcurrentWrites() throws Exception {
     // Run the WPE tool with three threads writing 3000 edits each 
concurrently.
     // When done, verify that all edits were written.
-    int errCode = WALPerformanceEvaluation.innerMain(new Configuration(conf),
-        new String [] {"-threads", "3", "-verify", "-noclosefs", 
"-iterations", "3000"});
+    int errCode = WALPerformanceEvaluation.innerMain(new Configuration(CONF),
+      new String[] { "-threads", "3", "-verify", "-noclosefs", "-iterations", 
"3000" });
     assertEquals(0, errCode);
   }
 
@@ -117,39 +129,39 @@ public class TestBoundedRegionGroupingStrategy {
   @Test
   public void testMoreRegionsThanBound() throws Exception {
     final String parallelism = Integer.toString(DEFAULT_NUM_REGION_GROUPS * 2);
-    int errCode = WALPerformanceEvaluation.innerMain(new Configuration(conf),
-        new String [] {"-threads", parallelism, "-verify", "-noclosefs", 
"-iterations", "3000",
-            "-regions", parallelism});
+    int errCode = WALPerformanceEvaluation.innerMain(new Configuration(CONF),
+      new String[] { "-threads", parallelism, "-verify", "-noclosefs", 
"-iterations", "3000",
+          "-regions", parallelism });
     assertEquals(0, errCode);
   }
 
   @Test
   public void testBoundsGreaterThanDefault() throws Exception {
-    final int temp = conf.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS);
+    final int temp = CONF.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS);
     try {
-      conf.setInt(NUM_REGION_GROUPS, temp*4);
-      final String parallelism = Integer.toString(temp*4);
-      int errCode = WALPerformanceEvaluation.innerMain(new Configuration(conf),
-          new String [] {"-threads", parallelism, "-verify", "-noclosefs", 
"-iterations", "3000",
-              "-regions", parallelism});
+      CONF.setInt(NUM_REGION_GROUPS, temp * 4);
+      final String parallelism = Integer.toString(temp * 4);
+      int errCode = WALPerformanceEvaluation.innerMain(new Configuration(CONF),
+        new String[] { "-threads", parallelism, "-verify", "-noclosefs", 
"-iterations", "3000",
+            "-regions", parallelism });
       assertEquals(0, errCode);
     } finally {
-      conf.setInt(NUM_REGION_GROUPS, temp);
+      CONF.setInt(NUM_REGION_GROUPS, temp);
     }
   }
 
   @Test
   public void testMoreRegionsThanBoundWithBoundsGreaterThanDefault() throws 
Exception {
-    final int temp = conf.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS);
+    final int temp = CONF.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS);
     try {
-      conf.setInt(NUM_REGION_GROUPS, temp*4);
-      final String parallelism = Integer.toString(temp*4*2);
-      int errCode = WALPerformanceEvaluation.innerMain(new Configuration(conf),
-          new String [] {"-threads", parallelism, "-verify", "-noclosefs", 
"-iterations", "3000",
-              "-regions", parallelism});
+      CONF.setInt(NUM_REGION_GROUPS, temp * 4);
+      final String parallelism = Integer.toString(temp * 4 * 2);
+      int errCode = WALPerformanceEvaluation.innerMain(new Configuration(CONF),
+        new String[] { "-threads", parallelism, "-verify", "-noclosefs", 
"-iterations", "3000",
+            "-regions", parallelism });
       assertEquals(0, errCode);
     } finally {
-      conf.setInt(NUM_REGION_GROUPS, temp);
+      CONF.setInt(NUM_REGION_GROUPS, temp);
     }
   }
 
@@ -158,32 +170,33 @@ public class TestBoundedRegionGroupingStrategy {
    */
   @Test
   public void setMembershipDedups() throws IOException {
-    final int temp = conf.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS);
+    final int temp = CONF.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS);
     WALFactory wals = null;
     try {
-      conf.setInt(NUM_REGION_GROUPS, temp*4);
+      CONF.setInt(NUM_REGION_GROUPS, temp * 4);
       // Set HDFS root directory for storing WAL
-      FSUtils.setRootDir(conf, TEST_UTIL.getDataTestDirOnTestFS());
+      FSUtils.setRootDir(CONF, TEST_UTIL.getDataTestDirOnTestFS());
 
-      wals = new WALFactory(conf, null, currentTest.getMethodName());
-      final Set<WAL> seen = new HashSet<WAL>(temp*4);
+      wals = new WALFactory(CONF, null, "setMembershipDedups");
+      final Set<WAL> seen = new HashSet<WAL>(temp * 4);
       final Random random = new Random();
       int count = 0;
       // we know that this should see one of the wals more than once
-      for (int i = 0; i < temp*8; i++) {
+      for (int i = 0; i < temp * 8; i++) {
         final WAL maybeNewWAL = wals.getWAL(Bytes.toBytes(random.nextInt()), 
null);
         LOG.info("Iteration " + i + ", checking wal " + maybeNewWAL);
         if (seen.add(maybeNewWAL)) {
           count++;
         }
       }
-      assertEquals("received back a different number of WALs that are not 
equal() to each other " +
-          "than the bound we placed.", temp*4, count);
+      assertEquals("received back a different number of WALs that are not 
equal() to each other "
+          + "than the bound we placed.",
+        temp * 4, count);
     } finally {
       if (wals != null) {
         wals.close();
       }
-      conf.setInt(NUM_REGION_GROUPS, temp);
+      CONF.setInt(NUM_REGION_GROUPS, temp);
     }
   }
 }

Reply via email to