HBASE-19926 Use a separated class to implement the WALActionListener for Replication


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3b603d2c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3b603d2c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3b603d2c

Branch: refs/heads/HBASE-19397-branch-2
Commit: 3b603d2c08c1f1905a589597737412b43970a304
Parents: 0ca7a2e
Author: zhangduo <zhang...@apache.org>
Authored: Sun Feb 4 10:42:33 2018 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Sun Feb 4 20:32:14 2018 +0800

----------------------------------------------------------------------
 .../replication/regionserver/Replication.java   | 22 +----
 .../regionserver/ReplicationSourceManager.java  | 47 +---------
 .../ReplicationSourceWALActionListener.java     | 98 ++++++++++++++++++++
 .../TestReplicationSourceManager.java           | 30 ++----
 4 files changed, 108 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/3b603d2c/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 0274b0a..ad12c66 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
 import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
-import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
@@ -44,8 +43,6 @@ import 
org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
 import org.apache.hadoop.hbase.replication.ReplicationTracker;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.wal.WALEdit;
-import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.hbase.wal.WALProvider;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -130,23 +127,8 @@ public class Replication implements 
ReplicationSourceService, ReplicationSinkSer
         replicationTracker, conf, this.server, fs, logDir, oldLogDir, 
clusterId,
         walProvider != null ? walProvider.getWALFileLengthProvider() : p -> 
OptionalLong.empty());
     if (walProvider != null) {
-      walProvider.addWALActionsListener(new WALActionsListener() {
-
-        @Override
-        public void preLogRoll(Path oldPath, Path newPath) throws IOException {
-          replicationManager.preLogRoll(newPath);
-        }
-
-        @Override
-        public void postLogRoll(Path oldPath, Path newPath) throws IOException 
{
-          replicationManager.postLogRoll(newPath);
-        }
-
-        @Override
-        public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) 
throws IOException {
-          replicationManager.scopeWALEdits(logKey, logEdit);
-        }
-      });
+      walProvider
+        .addWALActionsListener(new ReplicationSourceWALActionListener(conf, 
replicationManager));
     }
     this.statsThreadPeriod =
         this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);

http://git-wip-us.apache.org/repos/asf/hbase/blob/3b603d2c/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 8543896..cbbfca0 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -40,12 +40,9 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.Server;
@@ -64,21 +61,16 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.replication.ReplicationTracker;
-import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
-import org.apache.hadoop.hbase.wal.WALEdit;
-import org.apache.hadoop.hbase.wal.WALKey;
-import org.apache.hadoop.hbase.wal.WALKeyImpl;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import 
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import 
org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
-
 /**
  * This class is responsible to manage all the replication
  * sources. There are two classes of sources:
@@ -471,43 +463,6 @@ public class ReplicationSourceManager implements 
ReplicationListener {
     return totalBufferUsed;
   }
 
-  void scopeWALEdits(WALKey logKey, WALEdit logEdit) throws IOException {
-    scopeWALEdits(logKey, logEdit, this.conf);
-  }
-
-  /**
-   * Utility method used to set the correct scopes on each log key. Doesn't 
set a scope on keys from
-   * compaction WAL edits and if the scope is local.
-   * @param logKey Key that may get scoped according to its edits
-   * @param logEdit Edits used to lookup the scopes
-   * @throws IOException If failed to parse the WALEdit
-   */
-  @VisibleForTesting
-  static void scopeWALEdits(WALKey logKey, WALEdit logEdit, Configuration 
conf) throws IOException {
-    boolean replicationForBulkLoadEnabled =
-      ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf);
-    boolean foundOtherEdits = false;
-    for (Cell cell : logEdit.getCells()) {
-      if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
-        foundOtherEdits = true;
-        break;
-      }
-    }
-
-    if (!foundOtherEdits && logEdit.getCells().size() > 0) {
-      WALProtos.RegionEventDescriptor maybeEvent =
-        WALEdit.getRegionEventDescriptor(logEdit.getCells().get(0));
-      if (maybeEvent != null &&
-        (maybeEvent.getEventType() == 
WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE)) {
-        // In serially replication, we use scopes when reading close marker.
-        foundOtherEdits = true;
-      }
-    }
-    if ((!replicationForBulkLoadEnabled && !foundOtherEdits) || 
logEdit.isReplay()) {
-      ((WALKeyImpl) logKey).serializeReplicationScope(false);
-    }
-  }
-
   /**
    * Factory method to create a replication source
    * @param conf the configuration to use

http://git-wip-us.apache.org/repos/asf/hbase/blob/3b603d2c/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java
new file mode 100644
index 0000000..eb12614
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import 
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
+
+/**
+ * Used to receive new wals.
+ */
+@InterfaceAudience.Private
+class ReplicationSourceWALActionListener implements WALActionsListener {
+
+  private final Configuration conf;
+
+  private final ReplicationSourceManager manager;
+
+  public ReplicationSourceWALActionListener(Configuration conf, 
ReplicationSourceManager manager) {
+    this.conf = conf;
+    this.manager = manager;
+  }
+
+  @Override
+  public void preLogRoll(Path oldPath, Path newPath) throws IOException {
+    manager.preLogRoll(newPath);
+  }
+
+  @Override
+  public void postLogRoll(Path oldPath, Path newPath) throws IOException {
+    manager.postLogRoll(newPath);
+  }
+
+  @Override
+  public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws 
IOException {
+    scopeWALEdits(logKey, logEdit, conf);
+  }
+
+  /**
+   * Utility method used to set the correct scopes on each log key. Doesn't 
set a scope on keys from
+   * compaction WAL edits and if the scope is local.
+   * @param logKey Key that may get scoped according to its edits
+   * @param logEdit Edits used to lookup the scopes
+   * @throws IOException If failed to parse the WALEdit
+   */
+  @VisibleForTesting
+  static void scopeWALEdits(WALKey logKey, WALEdit logEdit, Configuration 
conf) throws IOException {
+    boolean replicationForBulkLoadEnabled =
+        ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf);
+    boolean foundOtherEdits = false;
+    for (Cell cell : logEdit.getCells()) {
+      if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
+        foundOtherEdits = true;
+        break;
+      }
+    }
+
+    if (!foundOtherEdits && logEdit.getCells().size() > 0) {
+      WALProtos.RegionEventDescriptor maybeEvent =
+          WALEdit.getRegionEventDescriptor(logEdit.getCells().get(0));
+      if (maybeEvent != null &&
+        (maybeEvent.getEventType() == 
WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE)) {
+        // In serially replication, we use scopes when reading close marker.
+        foundOtherEdits = true;
+      }
+    }
+    if ((!replicationForBulkLoadEnabled && !foundOtherEdits) || 
logEdit.isReplay()) {
+      ((WALKeyImpl) logKey).serializeReplicationScope(false);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/3b603d2c/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 647e12d..eec8e8a 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -40,7 +40,6 @@ import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.stream.Collectors;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -65,7 +64,6 @@ import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
-import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
@@ -84,7 +82,6 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALFactory;
-import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
@@ -100,8 +97,10 @@ import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
 import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
+
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
@@ -270,23 +269,8 @@ public abstract class TestReplicationSourceManager {
     WALFactory wals =
       new WALFactory(utility.getConfiguration(), 
URLEncoder.encode("regionserver:60020", "UTF8"));
     ReplicationSourceManager replicationManager = 
replication.getReplicationManager();
-    wals.getWALProvider().addWALActionsListener(new WALActionsListener() {
-
-      @Override
-      public void preLogRoll(Path oldPath, Path newPath) throws IOException {
-        replicationManager.preLogRoll(newPath);
-      }
-
-      @Override
-      public void postLogRoll(Path oldPath, Path newPath) throws IOException {
-        replicationManager.postLogRoll(newPath);
-      }
-
-      @Override
-      public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) 
throws IOException {
-        replicationManager.scopeWALEdits(logKey, logEdit);
-      }
-    });
+    wals.getWALProvider()
+      .addWALActionsListener(new ReplicationSourceWALActionListener(conf, 
replicationManager));
     final WAL wal = wals.getWAL(hri);
     manager.init();
     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("tableame"));
@@ -459,7 +443,7 @@ public abstract class TestReplicationSourceManager {
     RegionInfo hri = 
RegionInfoBuilder.newBuilder(tableName).setStartKey(HConstants.EMPTY_START_ROW)
         .setEndKey(HConstants.EMPTY_END_ROW).build();
     WALEdit edit = WALEdit.createCompaction(hri, compactionDescriptor);
-    ReplicationSourceManager.scopeWALEdits(new WALKeyImpl(), edit, conf);
+    ReplicationSourceWALActionListener.scopeWALEdits(new WALKeyImpl(), edit, 
conf);
   }
 
   @Test
@@ -471,7 +455,7 @@ public abstract class TestReplicationSourceManager {
     WALKeyImpl logKey = new WALKeyImpl(scope);
 
     // 3. Get the scopes for the key
-    ReplicationSourceManager.scopeWALEdits(logKey, logEdit, conf);
+    ReplicationSourceWALActionListener.scopeWALEdits(logKey, logEdit, conf);
 
     // 4. Assert that no bulk load entry scopes are added if bulk load hfile 
replication is disabled
     assertNull("No bulk load entries scope should be added if bulk load 
replication is disabled.",
@@ -490,7 +474,7 @@ public abstract class TestReplicationSourceManager {
     bulkLoadConf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
 
     // 4. Get the scopes for the key
-    ReplicationSourceManager.scopeWALEdits(logKey, logEdit, bulkLoadConf);
+    ReplicationSourceWALActionListener.scopeWALEdits(logKey, logEdit, 
bulkLoadConf);
 
     NavigableMap<byte[], Integer> scopes = logKey.getReplicationScopes();
     // Assert family with replication scope global is present in the key scopes

Reply via email to