Repository: hbase
Updated Branches:
  refs/heads/branch-1 c40d880a3 -> 280120ee1


HBASE-13121 Async wal replication for region replicas and dist log replay does not work together

Conflicts:
        hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
        hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
        hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/280120ee
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/280120ee
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/280120ee

Branch: refs/heads/branch-1
Commit: 280120ee1593ed65d288bfb1169150ee9e73a33f
Parents: c40d880
Author: Enis Soztutar <e...@apache.org>
Authored: Mon Mar 9 15:49:09 2015 -0700
Committer: Enis Soztutar <e...@apache.org>
Committed: Mon Mar 9 16:12:04 2015 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hbase/ipc/RpcClientImpl.java  |   6 +-
 .../ZkSplitLogWorkerCoordination.java           |  13 +-
 .../hadoop/hbase/regionserver/HRegion.java      | 126 +++++++++++++++----
 .../handler/FinishRegionRecoveringHandler.java  |  55 ++++++++
 .../hbase/replication/BaseWALEntryFilter.java   |  29 +++++
 .../RegionReplicaReplicationEndpoint.java       |  45 ++++++-
 .../hadoop/hbase/wal/WALPrettyPrinter.java      |   2 +-
 .../zookeeper/RecoveringRegionWatcher.java      |  11 +-
 .../hadoop/hbase/regionserver/TestHRegion.java  | 105 ++++++++++++++++
 .../regionserver/TestRegionReplicaFailover.java |   2 +-
 ...egionReplicaReplicationEndpointNoMaster.java |  67 ++++++++--
 11 files changed, 409 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/280120ee/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
index 9400a2c..7492474 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
@@ -262,7 +262,10 @@ public class RpcClientImpl extends AbstractRpcClient {
           try {
             Connection.this.tracedWriteRequest(cts.call, cts.priority, cts.span);
           } catch (IOException e) {
-            LOG.warn("call write error for call #" + cts.call.id + ", message =" + e.getMessage());
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("call write error for call #" + cts.call.id
+                + ", message =" + e.getMessage());
+            }
             cts.call.setException(e);
             markClosed(e);
           }
@@ -1132,6 +1135,7 @@ public class RpcClientImpl extends AbstractRpcClient {
    * @throws InterruptedException
    * @throws IOException
    */
+  @Override
   protected Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc, MethodDescriptor md,
       Message param, Message returnType, User ticket, InetSocketAddress addr)
       throws IOException, InterruptedException {

http://git-wip-us.apache.org/repos/asf/hbase/blob/280120ee/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
index a0addb0..32df146 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
 import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor;
+import org.apache.hadoop.hbase.regionserver.handler.FinishRegionRecoveringHandler;
 import org.apache.hadoop.hbase.regionserver.handler.WALSplitterHandler;
 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
@@ -455,11 +456,8 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
                 String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, region);
                 try {
                   if (ZKUtil.checkExists(watcher, nodePath) == -1) {
-                    HRegion r = recoveringRegions.remove(region);
-                    if (r != null) {
-                      r.setRecovering(false);
-                    }
-                    LOG.debug("Mark recovering region:" + region + " up.");
+                    server.getExecutorService().submit(
+                      new FinishRegionRecoveringHandler(server, region, nodePath));
                   } else {
                     // current check is a defensive(or redundant) mechanism to prevent us from
                     // having stale recovering regions in our internal RS memory state while
@@ -583,9 +581,8 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
    * Next part is related to WALSplitterHandler
    */
   /**
-   * endTask() can fail and the only way to recover out of it is for the 
-   * {@link org.apache.hadoop.hbase.master.SplitLogManager} to
-   * timeout the task node.
+   * endTask() can fail and the only way to recover out of it is for the
+   * {@link org.apache.hadoop.hbase.master.SplitLogManager} to timeout the task node.
    * @param slt
    * @param ctr
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/280120ee/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 9f192be..cb880c5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -272,6 +272,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
    * replication.
    */
   protected volatile long lastReplayedOpenRegionSeqId = -1L;
+  protected volatile long lastReplayedCompactionSeqId = -1L;
 
   /**
   * Operation enum is used in {@link HRegion#startRegionOperation} to provide operation context for
@@ -1158,6 +1159,46 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
    */
   public void setRecovering(boolean newState) {
     boolean wasRecovering = this.isRecovering;
+    // before we flip the recovering switch (enabling reads) we should write the region open
+    // event to WAL if needed
+    if (wal != null && getRegionServerServices() != null && !writestate.readOnly
+        && wasRecovering && !newState) {
+
+      // force a flush only if region replication is set up for this region. Otherwise no need.
+      boolean forceFlush = getTableDesc().getRegionReplication() > 1;
+
+      // force a flush first
+      MonitoredTask status = TaskMonitor.get().createStatus(
+        "Flushing region " + this + " because recovery is finished");
+      try {
+        if (forceFlush) {
+          internalFlushcache(status);
+        }
+
+        status.setStatus("Writing region open event marker to WAL because recovery is finished");
+        try {
+          long seqId = openSeqNum;
+          // obtain a new seqId because we possibly have writes and flushes on top of openSeqNum
+          if (wal != null) {
+            seqId = getNextSequenceId(wal);
+          }
+          writeRegionOpenMarker(wal, seqId);
+        } catch (IOException e) {
+          // We cannot rethrow this exception since we are being called from the zk thread. The
+          // region has already opened. In this case we log the error, but continue
+          LOG.warn(getRegionInfo().getEncodedName() + " : was not able to write region opening "
+              + "event to WAL, continuing", e);
+        }
+      } catch (IOException ioe) {
+        // Distributed log replay semantics do not necessarily require a flush, since the
+        // replayed data is already written again in the WAL. So a failed flush should be fine.
+        LOG.warn(getRegionInfo().getEncodedName() + " : was not able to flush "
+            + "event to WAL, continuing", ioe);
+      } finally {
+        status.cleanup();
+      }
+    }
+
     this.isRecovering = newState;
     if (wasRecovering && !isRecovering) {
       // Call only when wal replay is over.
@@ -2378,7 +2419,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
    * @return Next sequence number unassociated with any actual edit.
    * @throws IOException
    */
-  private long getNextSequenceId(final WAL wal) throws IOException {
+  @VisibleForTesting
+  protected long getNextSequenceId(final WAL wal) throws IOException {
     WALKey key = this.appendEmptyEdit(wal, null);
     return key.getSequenceId();
   }
@@ -4122,31 +4164,45 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     checkTargetRegion(compaction.getEncodedRegionName().toByteArray(),
       "Compaction marker from WAL ", compaction);
 
-    if (replaySeqId < lastReplayedOpenRegionSeqId) {
-      LOG.warn(getRegionInfo().getEncodedName() + " : "
-          + "Skipping replaying compaction event :" + TextFormat.shortDebugString(compaction)
-          + " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId "
-          + " of " + lastReplayedOpenRegionSeqId);
-      return;
-    }
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(getRegionInfo().getEncodedName() + " : "
-          + "Replaying compaction marker " + TextFormat.shortDebugString(compaction));
-    }
-
-    startRegionOperation(Operation.REPLAY_EVENT);
-    try {
-      Store store = this.getStore(compaction.getFamilyName().toByteArray());
-      if (store == null) {
+    synchronized (writestate) {
+      if (replaySeqId < lastReplayedOpenRegionSeqId) {
         LOG.warn(getRegionInfo().getEncodedName() + " : "
-            + "Found Compaction WAL edit for deleted family:"
-            + Bytes.toString(compaction.getFamilyName().toByteArray()));
+            + "Skipping replaying compaction event :" + TextFormat.shortDebugString(compaction)
+            + " because its sequence id " + replaySeqId + " is smaller than this region's "
+            + "lastReplayedOpenRegionSeqId of " + lastReplayedOpenRegionSeqId);
         return;
       }
-      store.replayCompactionMarker(compaction, pickCompactionFiles, removeFiles);
-    } finally {
-      closeRegionOperation(Operation.REPLAY_EVENT);
+      if (replaySeqId < lastReplayedCompactionSeqId) {
+        LOG.warn(getRegionInfo().getEncodedName() + " : "
+            + "Skipping replaying compaction event :" + TextFormat.shortDebugString(compaction)
+            + " because its sequence id " + replaySeqId + " is smaller than this region's "
+            + "lastReplayedCompactionSeqId of " + lastReplayedCompactionSeqId);
+        return;
+      } else {
+        lastReplayedCompactionSeqId = replaySeqId;
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(getRegionInfo().getEncodedName() + " : "
+            + "Replaying compaction marker " + TextFormat.shortDebugString(compaction)
+            + " with seqId=" + replaySeqId + " and lastReplayedOpenRegionSeqId="
+            + lastReplayedOpenRegionSeqId);
+      }
+
+      startRegionOperation(Operation.REPLAY_EVENT);
+      try {
+        Store store = this.getStore(compaction.getFamilyName().toByteArray());
+        if (store == null) {
+          LOG.warn(getRegionInfo().getEncodedName() + " : "
+              + "Found Compaction WAL edit for deleted family:"
+              + Bytes.toString(compaction.getFamilyName().toByteArray()));
+          return;
+        }
+        store.replayCompactionMarker(compaction, pickCompactionFiles, removeFiles);
+        logRegionFiles();
+      } finally {
+        closeRegionOperation(Operation.REPLAY_EVENT);
+      }
     }
   }
 
@@ -4185,6 +4241,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
           TextFormat.shortDebugString(flush));
         break;
       }
+
+      logRegionFiles();
     } finally {
       closeRegionOperation(Operation.REPLAY_EVENT);
     }
@@ -4645,6 +4703,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
           notifyAll(); // FindBugs NN_NAKED_NOTIFY
         }
       }
+      logRegionFiles();
     } finally {
       closeRegionOperation(Operation.REPLAY_EVENT);
     }
@@ -4850,6 +4909,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     }
   }
 
+  private void logRegionFiles() {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(getRegionInfo().getEncodedName() + " : Store files for region: ");
+      for (Store s : stores.values()) {
+        for (StoreFile sf : s.getStorefiles()) {
+          LOG.trace(getRegionInfo().getEncodedName() + " : " + sf);
+        }
+      }
+    }
+  }
+
   /** Checks whether the given regionName is either equal to our region, or that
    * the regionName is the primary region to our corresponding range for the secondary replica.
    */
@@ -4954,6 +5024,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
         for (StoreFile storeFile: store.getStorefiles()) {
           storeFileNames.add(storeFile.getPath().toString());
         }
+
+        logRegionFiles();
       }
     }
     return storeFileNames;
@@ -6220,7 +6292,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     checkClassLoading();
     this.openSeqNum = initialize(reporter);
     this.setSequenceId(openSeqNum);
-    if (wal != null && getRegionServerServices() != null && !writestate.readOnly) {
+    if (wal != null && getRegionServerServices() != null && !writestate.readOnly
+        && !isRecovering) {
+      // Only write the region open event marker to WAL if (1) we are not read-only
+      // (2) dist log replay is off or we are not recovering. In case region is
+      // recovering, the open event will be written at setRecovering(false)
       writeRegionOpenMarker(wal, openSeqNum);
     }
     return this;
@@ -7316,7 +7392,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       ClassSize.OBJECT +
       ClassSize.ARRAY +
       45 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
-      (12 * Bytes.SIZEOF_LONG) +
+      (13 * Bytes.SIZEOF_LONG) +
       5 * Bytes.SIZEOF_BOOLEAN);
 
   // woefully out of date - currently missing:

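For readers following the HRegion changes above: when distributed log replay finishes for a region that hosts read replicas, the region now flushes and then writes a fresh region-open marker so secondaries can pick up the replayed data from store files. A condensed, illustrative sketch of that sequence (the MonitoredTask bookkeeping and exception handling from the hunk are omitted; this is not the full method):

  // sketch only: runs inside HRegion when setRecovering(false) flips the switch
  if (getTableDesc().getRegionReplication() > 1) {
    internalFlushcache(status);          // make replayed edits durable as store files
  }
  long seqId = getNextSequenceId(wal);   // openSeqNum may be stale after replayed writes
  writeRegionOpenMarker(wal, seqId);     // secondaries refresh their file lists from this marker
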
http://git-wip-us.apache.org/repos/asf/hbase/blob/280120ee/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/FinishRegionRecoveringHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/FinishRegionRecoveringHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/FinishRegionRecoveringHandler.java
new file mode 100644
index 0000000..2ff3454
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/FinishRegionRecoveringHandler.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.handler;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.executor.EventHandler;
+import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+
+public class FinishRegionRecoveringHandler extends EventHandler {
+  private static final Log LOG = LogFactory.getLog(FinishRegionRecoveringHandler.class);
+
+  protected final RegionServerServices rss;
+  protected final String regionName;
+  protected final String path;
+
+  public FinishRegionRecoveringHandler(RegionServerServices rss,
+      String regionName, String path) {
+    // we are using the open region handlers, since this operation is in the region open lifecycle
+    super(rss, EventType.M_RS_OPEN_REGION);
+    this.rss = rss;
+    this.regionName = regionName;
+    this.path = path;
+  }
+
+  @Override
+  public void process() throws IOException {
+    HRegion region = this.rss.getRecoveringRegions().remove(regionName);
+    if (region != null) {
+      region.setRecovering(false);
+      LOG.info(path + " deleted; " + regionName + " recovered.");
+    }
+  }
+
+}

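Both call sites in this patch drive the new handler the same way, so that setRecovering(false) (which may now flush and write to the WAL) runs on an open-region worker thread instead of the ZooKeeper event thread. A minimal usage sketch, where rss, encodedName, and zkPath stand in for the values at the call site:

  // illustrative: submit from a ZK watcher/coordination callback;
  // process() then runs on an M_RS_OPEN_REGION worker thread
  rss.getExecutorService().submit(
    new FinishRegionRecoveringHandler(rss, encodedName, zkPath));
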
http://git-wip-us.apache.org/repos/asf/hbase/blob/280120ee/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseWALEntryFilter.java
new file mode 100644
index 0000000..42b3b7b
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseWALEntryFilter.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * A base class for WALEntryFilter implementations. Protects against changes in the interface signature.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+public abstract class BaseWALEntryFilter implements WALEntryFilter {
+}

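A subclass only needs to implement filter(Entry); extending this abstract base rather than implementing WALEntryFilter directly shields third-party filters if methods are later added to the interface. A minimal illustrative subclass (the class name and predicate are hypothetical, not part of this patch):

  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.wal.WAL.Entry;

  // hypothetical example: drop all entries for one table from replication
  public class SkipTableWALEntryFilter extends BaseWALEntryFilter {
    private final TableName skipped = TableName.valueOf("t1"); // hypothetical table

    @Override
    public Entry filter(Entry entry) {
      // returning null drops the entry from replication; returning it keeps it
      return entry.getKey().getTablename().equals(skipped) ? null : entry;
    }
  }
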
http://git-wip-us.apache.org/repos/asf/hbase/blob/280120ee/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
index 92c0faf..a008456 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
@@ -46,9 +46,9 @@ import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.RegionAdminServiceCallable;
 import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.client.RetryingCallable;
 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
@@ -66,6 +66,8 @@ import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALSplitter.EntryBuffers;
 import org.apache.hadoop.hbase.wal.WALSplitter.OutputSink;
+import org.apache.hadoop.hbase.replication.BaseWALEntryFilter;
+import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
 import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
@@ -76,6 +78,7 @@ import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Lists;
 import com.google.protobuf.ServiceException;
 
 /**
@@ -106,6 +109,44 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
 
   private ExecutorService pool;
 
+  /**
+   * Skips entries which have their original seqId set. Only entries persisted via distributed
+   * log replay have their original seqId fields set.
+   */
+  private class SkipReplayedEditsFilter extends BaseWALEntryFilter {
+    @Override
+    public Entry filter(Entry entry) {
+      // if orig seq id is set, skip replaying the entry
+      if (entry.getKey().getOrigLogSeqNum() > 0) {
+        return null;
+      }
+      return entry;
+    }
+  }
+
+  @Override
+  public WALEntryFilter getWALEntryfilter() {
+    WALEntryFilter superFilter = super.getWALEntryfilter();
+    WALEntryFilter skipReplayedEditsFilter = getSkipReplayedEditsFilter();
+
+    if (superFilter == null) {
+      return skipReplayedEditsFilter;
+    }
+
+    if (skipReplayedEditsFilter == null) {
+      return superFilter;
+    }
+
+    ArrayList<WALEntryFilter> filters = Lists.newArrayList();
+    filters.add(superFilter);
+    filters.add(skipReplayedEditsFilter);
+    return new ChainWALEntryFilter(filters);
+  }
+
+  protected WALEntryFilter getSkipReplayedEditsFilter() {
+    return new SkipReplayedEditsFilter();
+  }
+
   @Override
   public void init(Context context) throws IOException {
     super.init(context);
@@ -141,7 +182,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
   @Override
   protected void doStart() {
     try {
-      connection = (ClusterConnection) HConnectionManager.createConnection(ctx.getConfiguration());
+      connection = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
       this.pool = getDefaultThreadPool(conf);
       outputSink = new RegionReplicaOutputSink(controller, entryBuffers, connection, pool,
         numWriterThreads, operationTimeout);

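The getWALEntryfilter() override above composes the superclass's filter with the new one via ChainWALEntryFilter, which rejects an entry as soon as any filter in the chain returns null. The net effect, sketched with two already-constructed filters (ship() is a placeholder, not an actual method in this patch):

  // sketch of the chained-filter semantics, assuming superFilter and skipFilter exist
  WALEntryFilter chain = new ChainWALEntryFilter(Lists.newArrayList(superFilter, skipFilter));
  for (Entry entry : entries) {
    if (chain.filter(entry) != null) {
      ship(entry); // entries with origLogSeqNum > 0 never get here: dist log replay
    }              // already persisted them, so re-shipping would apply them twice
  }
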
http://git-wip-us.apache.org/repos/asf/hbase/blob/280120ee/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java
index 720cedc..f0d1e67 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java
@@ -350,7 +350,7 @@ public class WALPrettyPrinter {
     options.addOption("j", "json", false, "Output JSON");
     options.addOption("p", "printvals", false, "Print values");
     options.addOption("r", "region", true,
-        "Region to filter by. Pass region name; e.g. 'hbase:meta,,1'");
+        "Region to filter by. Pass encoded region name; e.g. 
'9192caead6a5a20acb4454ffbc79fa14'");
     options.addOption("s", "sequence", true,
         "Sequence to filter by. Pass sequence number.");
     options.addOption("w", "row", true, "Row to filter by. Pass row name.");

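The -r help text now reflects that the filter compares against the encoded region name carried in each WALKey, not the full region name. An illustrative invocation (the WAL path and encoded name below are placeholders only):

  bin/hbase org.apache.hadoop.hbase.wal.WALPrettyPrinter -p \
    -r 9192caead6a5a20acb4454ffbc79fa14 \
    /hbase/WALs/host1,16020,1425940000000/host1%2C16020%2C1425940000000.1425940001234
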
http://git-wip-us.apache.org/repos/asf/hbase/blob/280120ee/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoveringRegionWatcher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoveringRegionWatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoveringRegionWatcher.java
index a07bd2f..5fff9d2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoveringRegionWatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoveringRegionWatcher.java
@@ -21,8 +21,8 @@ package org.apache.hadoop.hbase.zookeeper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.handler.FinishRegionRecoveringHandler;
 import org.apache.zookeeper.KeeperException;
 
 /**
@@ -33,7 +33,7 @@ public class RecoveringRegionWatcher extends ZooKeeperListener {
   private static final Log LOG = LogFactory.getLog(RecoveringRegionWatcher.class);
 
   private HRegionServer server;
-  
+
   /**
    * Construct a ZooKeeper event listener.
    */
@@ -47,6 +47,7 @@ public class RecoveringRegionWatcher extends ZooKeeperListener {
    * Called when a node has been deleted
    * @param path full path of the deleted node
    */
+  @Override
   public void nodeDeleted(String path) {
     if (this.server.isStopped() || this.server.isStopping()) {
       return;
@@ -58,12 +59,8 @@ public class RecoveringRegionWatcher extends ZooKeeperListener {
     }
 
     String regionName = path.substring(parentPath.length() + 1);
-    HRegion region = this.server.getRecoveringRegions().remove(regionName);
-    if (region != null) {
-      region.setRecovering(false);
-    }
 
-    LOG.info(path + " deleted; " + regionName + " recovered.");
+    server.getExecutorService().submit(new FinishRegionRecoveringHandler(server, regionName, path));
   }
 
   @Override

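For context, nodeDeleted() derives the encoded region name from the deleted znode path before handing off to the executor. Illustratively, with the default recovering-regions parent znode (the path values below are examples only):

  // mirrors the substring logic in nodeDeleted() above
  String parentPath = "/hbase/recovering-regions";                 // watcher.recoveringRegionsZNode
  String path = parentPath + "/9192caead6a5a20acb4454ffbc79fa14";  // znode that was deleted
  String regionName = path.substring(parentPath.length() + 1);     // "9192caead6a5a20acb4454ffbc79fa14"
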
http://git-wip-us.apache.org/repos/asf/hbase/blob/280120ee/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 47b592f..ce11af7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -51,6 +51,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -131,6 +132,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion.RowLock;
 import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
 import org.apache.hadoop.hbase.regionserver.TestStore.FaultyFileSystem;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.handler.FinishRegionRecoveringHandler;
 import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
 import org.apache.hadoop.hbase.regionserver.wal.MetricsWALSource;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
@@ -166,6 +168,7 @@ import org.mockito.Mockito;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.protobuf.ByteString;
 
 /**
@@ -5778,6 +5781,108 @@ public class TestHRegion {
     }
   }
 
+  // Helper for test testOpenRegionWrittenToWALForLogReplay
+  static class HRegionWithSeqId extends HRegion {
+    public HRegionWithSeqId(final Path tableDir, final WAL wal, final FileSystem fs,
+        final Configuration confParam, final HRegionInfo regionInfo,
+        final HTableDescriptor htd, final RegionServerServices rsServices) {
+      super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
+    }
+    @Override
+    protected long getNextSequenceId(WAL wal) throws IOException {
+      return 42;
+    }
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testOpenRegionWrittenToWALForLogReplay() throws Exception {
+    // similar to the above test but with distributed log replay
+    final ServerName serverName = ServerName.valueOf("testOpenRegionWrittenToWALForLogReplay",
+      100, 42);
+    final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName));
+
+    HTableDescriptor htd
+        = new HTableDescriptor(TableName.valueOf("testOpenRegionWrittenToWALForLogReplay"));
+    htd.addFamily(new HColumnDescriptor(fam1));
+    htd.addFamily(new HColumnDescriptor(fam2));
+
+    HRegionInfo hri = new HRegionInfo(htd.getTableName(),
+      HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY);
+
+    // open the region w/o rss and wal and flush some files
+    HRegion region = HRegion.createHRegion(hri, TEST_UTIL.getDataTestDir(), TEST_UTIL
+             .getConfiguration(), htd);
+    assertNotNull(region);
+
+    // create a file in fam1 for the region before opening in OpenRegionHandler
+    region.put(new Put(Bytes.toBytes("a")).add(fam1, fam1, fam1));
+    region.flushcache();
+    HRegion.closeHRegion(region);
+
+    ArgumentCaptor<WALEdit> editCaptor = ArgumentCaptor.forClass(WALEdit.class);
+
+    // capture append() calls
+    WAL wal = mock(WAL.class);
+    when(rss.getWAL((HRegionInfo) any())).thenReturn(wal);
+
+    // add the region to recovering regions
+    HashMap<String, HRegion> recoveringRegions = Maps.newHashMap();
+    recoveringRegions.put(region.getRegionInfo().getEncodedName(), null);
+    when(rss.getRecoveringRegions()).thenReturn(recoveringRegions);
+
+    try {
+      Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+      conf.set(HConstants.REGION_IMPL, HRegionWithSeqId.class.getName());
+      region = HRegion.openHRegion(hri, htd, rss.getWAL(hri),
+        conf, rss, null);
+
+      // verify that we have not appended region open event to WAL because this region is still
+      // recovering
+      verify(wal, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any()
+        , editCaptor.capture(), (AtomicLong)any(), anyBoolean(), (List<Cell>)any());
+
+      // now put the region out of recovering state
+      new FinishRegionRecoveringHandler(rss, region.getRegionInfo().getEncodedName(), "/foo")
+        .prepare().process();
+
+      // now we should have put the entry
+      verify(wal, times(1)).append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any()
+        , editCaptor.capture(), (AtomicLong)any(), anyBoolean(), (List<Cell>)any());
+
+      WALEdit edit = editCaptor.getValue();
+      assertNotNull(edit);
+      assertNotNull(edit.getCells());
+      assertEquals(1, edit.getCells().size());
+      RegionEventDescriptor desc = WALEdit.getRegionEventDescriptor(edit.getCells().get(0));
+      assertNotNull(desc);
+
+      LOG.info("RegionEventDescriptor from WAL: " + desc);
+
+      assertEquals(RegionEventDescriptor.EventType.REGION_OPEN, desc.getEventType());
+      assertTrue(Bytes.equals(desc.getTableName().toByteArray(), htd.getName()));
+      assertTrue(Bytes.equals(desc.getEncodedRegionName().toByteArray(),
+        hri.getEncodedNameAsBytes()));
+      assertTrue(desc.getLogSequenceNumber() > 0);
+      assertEquals(serverName, ProtobufUtil.toServerName(desc.getServer()));
+      assertEquals(2, desc.getStoresCount());
+
+      StoreDescriptor store = desc.getStores(0);
+      assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam1));
+      assertEquals(store.getStoreHomeDir(), Bytes.toString(fam1));
+      assertEquals(1, store.getStoreFileCount()); // 1 store file
+      assertFalse(store.getStoreFile(0).contains("/")); // ensure path is relative
+
+      store = desc.getStores(1);
+      assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam2));
+      assertEquals(store.getStoreHomeDir(), Bytes.toString(fam2));
+      assertEquals(0, store.getStoreFileCount()); // no store files
+
+    } finally {
+      HRegion.closeHRegion(region);
+    }
+  }
+
   @Test
   @SuppressWarnings("unchecked")
   public void testCloseRegionWrittenToWAL() throws Exception {

http://git-wip-us.apache.org/repos/asf/hbase/blob/280120ee/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicaFailover.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicaFailover.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicaFailover.java
index 694d4ea..3e214c4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicaFailover.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicaFailover.java
@@ -95,7 +95,7 @@ public class TestRegionReplicaFailover {
   @Parameters
   public static Collection<Object[]> getParameters() {
     Object[][] params =
-        new Boolean[][] { {false} }; // TODO: enable dist log replay testing after HBASE-13121
+        new Boolean[][] { {true}, {false} };
     return Arrays.asList(params);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/280120ee/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
index 3b51772..5f2c737 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication.regionserver;
 
import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.closeRegion;
import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.openRegion;
+import static org.junit.Assert.*;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -39,7 +40,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
 import org.apache.hadoop.hbase.coprocessor.BaseWALObserver;
@@ -57,7 +58,11 @@ import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint.ReplicateContext;
+import org.apache.hadoop.hbase.replication.ReplicationPeer;
+import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint.RegionReplicaReplayCallable;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -159,12 +164,12 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
     }
   }
 
-  @Test
+  @Test (timeout = 240000)
   public void testReplayCallable() throws Exception {
     // tests replaying the edits to a secondary region replica using the Callable directly
     openRegion(HTU, rs0, hriSecondary);
     ClusterConnection connection =
-        (ClusterConnection) HConnectionManager.createConnection(HTU.getConfiguration());
+        (ClusterConnection) ConnectionFactory.createConnection(HTU.getConfiguration());
 
     //load some data to primary
     HTU.loadNumericRows(table, f, 0, 1000);
@@ -199,13 +204,13 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
     }
   }
 
-  @Test
+  @Test (timeout = 240000)
   public void testReplayCallableWithRegionMove() throws Exception {
     // tests replaying the edits to a secondary region replica using the Callable directly while
     // the region is moved to another location. It tests handling of RME.
     openRegion(HTU, rs0, hriSecondary);
     ClusterConnection connection =
-        (ClusterConnection) HConnectionManager.createConnection(HTU.getConfiguration());
+        (ClusterConnection) ConnectionFactory.createConnection(HTU.getConfiguration());
     //load some data to primary
     HTU.loadNumericRows(table, f, 0, 1000);
 
@@ -234,12 +239,12 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
     connection.close();
   }
 
-  @Test
+  @Test (timeout = 240000)
   public void testRegionReplicaReplicationEndpointReplicate() throws Exception {
     // tests replaying the edits to a secondary region replica using the RRRE.replicate()
     openRegion(HTU, rs0, hriSecondary);
     ClusterConnection connection =
-        (ClusterConnection) HConnectionManager.createConnection(HTU.getConfiguration());
+        (ClusterConnection) ConnectionFactory.createConnection(HTU.getConfiguration());
     RegionReplicaReplicationEndpoint replicator = new RegionReplicaReplicationEndpoint();
 
     ReplicationEndpoint.Context context = mock(ReplicationEndpoint.Context.class);
@@ -264,4 +269,52 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
     connection.close();
   }
 
+  @Test (timeout = 240000)
+  public void testReplayedEditsAreSkipped() throws Exception {
+    openRegion(HTU, rs0, hriSecondary);
+    ClusterConnection connection =
+        (ClusterConnection) ConnectionFactory.createConnection(HTU.getConfiguration());
+    RegionReplicaReplicationEndpoint replicator = new RegionReplicaReplicationEndpoint();
+
+    ReplicationEndpoint.Context context = mock(ReplicationEndpoint.Context.class);
+    when(context.getConfiguration()).thenReturn(HTU.getConfiguration());
+    when(context.getMetrics()).thenReturn(mock(MetricsSource.class));
+
+    ReplicationPeer mockPeer = mock(ReplicationPeer.class);
+    when(mockPeer.getTableCFs()).thenReturn(null);
+    when(context.getReplicationPeer()).thenReturn(mockPeer);
+
+    replicator.init(context);
+    replicator.start();
+
+    // test the filter for the RE, not actual replication
+    WALEntryFilter filter = replicator.getWALEntryfilter();
+
+    //load some data to primary
+    HTU.loadNumericRows(table, f, 0, 1000);
+
+    Assert.assertEquals(1000, entries.size());
+    for (Entry e: entries) {
+      if (Integer.parseInt(Bytes.toString(e.getEdit().getCells().get(0).getValue())) % 2 == 0) {
+        e.getKey().setOrigLogSeqNum(1); // simulate dist log replay by setting orig seq id
+      }
+    }
+
+    long skipped = 0, replayed = 0;
+    for (Entry e : entries) {
+      if (filter.filter(e) == null) {
+        skipped++;
+      } else {
+        replayed++;
+      }
+    }
+
+    assertEquals(500, skipped);
+    assertEquals(500, replayed);
+
+    HTU.deleteNumericRows(table, f, 0, 1000);
+    closeRegion(HTU, rs0, hriSecondary);
+    connection.close();
+  }
+
 }
