This is an automated email from the ASF dual-hosted git repository.

sunxin pushed a commit to branch HBASE-24666
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit c9a01b2dab47bfa852e0bb6c66cb48c4d4e6f2bc
Author: XinSun <ddu...@gmail.com>
AuthorDate: Fri Sep 4 18:53:46 2020 +0800

    HBASE-24683 Add a basic ReplicationServer which only implement 
ReplicationSink Service (#2111)
    
    Signed-off-by: Guanghao Zhang <zg...@apache.org>
---
 .../java/org/apache/hadoop/hbase/util/DNS.java     |   3 +-
 .../hbase/replication/HReplicationServer.java      | 391 ++++++++++++++++
 .../replication/ReplicationServerRpcServices.java  | 516 +++++++++++++++++++++
 .../hbase/replication/TestReplicationServer.java   | 151 ++++++
 4 files changed, 1060 insertions(+), 1 deletion(-)

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/DNS.java 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/DNS.java
index 5c23ddc..cf94b2a 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/DNS.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/DNS.java
@@ -64,7 +64,8 @@ public final class DNS {
 
   public enum ServerType {
     MASTER("master"),
-    REGIONSERVER("regionserver");
+    REGIONSERVER("regionserver"),
+    REPLICATIONSERVER("replicationserver");
 
     private String name;
     ServerType(String name) {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
new file mode 100644
index 0000000..31dec0c
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
@@ -0,0 +1,391 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ChoreService;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.log.HBaseMarkers;
+import org.apache.hadoop.hbase.regionserver.ReplicationService;
+import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.trace.TraceUtil;
+import org.apache.hadoop.hbase.util.Sleeper;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * HReplicationServer is a server process dedicated to replication. For now it only puts up an
+ * RPC endpoint ({@link ReplicationServerRpcServices}) and runs a {@link ReplicationSinkService}.
+ */
+@InterfaceAudience.Private
+@SuppressWarnings({ "deprecation"})
+public class HReplicationServer extends Thread implements Server {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HReplicationServer.class);
+
+  /** replication server process name */
+  public static final String REPLICATION_SERVER = "replicationserver";
+
+  /**
+   * This server's start code: the wall-clock time, in milliseconds, taken at construction.
+   */
+  protected final long startCode;
+
+  // Set by stop(); the main loop in run() exits once this is true.
+  private volatile boolean stopped = false;
+
+  // Flipped exactly once, by the first abort() call; later abort requests are ignored.
+  private final AtomicBoolean abortRequested = new AtomicBoolean(false);
+
+  // flag set after we're done setting up server threads
+  final AtomicBoolean online = new AtomicBoolean(false);
+
+  /**
+   * The name of this server. It is built in the constructor from the host name and port of the
+   * RPC services plus the server start code.
+   */
+  private ServerName serverName;
+
+  protected final Configuration conf;
+
+  private ReplicationSinkService replicationSinkService;
+
+  final int msgInterval;
+  // A sleeper that sleeps for msgInterval.
+  protected final Sleeper sleeper;
+
+  // zookeeper connection and watcher; null when "hbase.testing.nocluster" is set
+  protected final ZKWatcher zooKeeper;
+
+  /**
+   * The asynchronous cluster connection to be shared by services.
+   */
+  protected AsyncClusterConnection asyncClusterConnection;
+
+  private UserProvider userProvider;
+
+  protected final ReplicationServerRpcServices rpcServices;
+
+  /**
+   * Creates the server: builds the RPC services, derives the server name from their address,
+   * opens the ZooKeeper watcher (unless "hbase.testing.nocluster" is set) and starts the RPC
+   * services. The main loop only starts once {@link #run()} is invoked.
+   *
+   * @param conf the configuration to run with
+   * @throws IOException if any part of the construction fails
+   */
+  public HReplicationServer(final Configuration conf) throws IOException {
+    TraceUtil.initTracer(conf);
+    try {
+      this.startCode = System.currentTimeMillis();
+      // conf must be assigned before createRpcServices(): the RPC services read it back via
+      // getConfiguration().
+      this.conf = conf;
+
+      this.rpcServices = createRpcServices();
+
+      String hostName = this.rpcServices.isa.getHostName();
+      serverName = ServerName.valueOf(hostName, this.rpcServices.isa.getPort(), this.startCode);
+
+      this.userProvider = UserProvider.instantiate(conf);
+
+      this.msgInterval = conf.getInt("hbase.replicationserver.msginterval", 3 * 1000);
+      this.sleeper = new Sleeper(this.msgInterval, this);
+
+      // Some unit tests don't need a cluster, so no zookeeper at all
+      if (!conf.getBoolean("hbase.testing.nocluster", false)) {
+        // Open connection to zookeeper and set primary watcher
+        zooKeeper = new ZKWatcher(conf, getProcessName() + ":" +
+            rpcServices.isa.getPort(), this, false);
+      } else {
+        zooKeeper = null;
+      }
+
+      this.rpcServices.start(zooKeeper);
+    } catch (Throwable t) {
+      // Make sure we log the exception. HReplicationServer is often started via reflection and
+      // the cause of failed startup is lost.
+      LOG.error("Failed construction ReplicationServer", t);
+      throw t;
+    }
+  }
+
+  /**
+   * @return the process name of this server, {@link #REPLICATION_SERVER}
+   */
+  public String getProcessName() {
+    return REPLICATION_SERVER;
+  }
+
+  /**
+   * Main body of the server thread: initializes the cluster connection, starts the replication
+   * sink service, then idles until {@link #stop(String)} or {@link #abort(String, Throwable)} is
+   * called, and finally shuts the services and the ZooKeeper connection down.
+   */
+  @Override
+  public void run() {
+    if (isStopped()) {
+      LOG.info("Skipping run; stopped");
+      return;
+    }
+    try {
+      // Do pre-registration initializations; currently only the cluster connection.
+      preRegistrationInitialization();
+    } catch (Throwable e) {
+      abort("Fatal exception during initialization", e);
+    }
+    try {
+      // A failed initialization aborts, and therefore stops, this server. Do not start the
+      // replication service (or report ourselves online) on a half-initialized server.
+      if (!isStopped()) {
+        setupReplication();
+        startReplicationService();
+
+        online.set(true);
+
+        // The main run loop: nothing to do but wait until we are asked to stop.
+        while (!isStopped()) {
+          if (!isStopped() && !isAborted()) {
+            this.sleeper.sleep();
+          }
+        }
+      }
+    } catch (Throwable t) {
+      abort(t.getMessage(), t);
+    }
+
+    // Shut the services down on every exit path, including after an abort, so that their
+    // threads do not outlive this one.
+    try {
+      stopServiceThreads();
+      this.rpcServices.stop();
+    } catch (Throwable t) {
+      // abort() only logs the first abort request, so log this failure explicitly as well.
+      LOG.error("Failed to cleanly shut down replication server services", t);
+      abort(t.getMessage(), t);
+    }
+
+    if (this.zooKeeper != null) {
+      this.zooKeeper.close();
+    }
+    LOG.info("Exiting; stopping={}; zookeeper connection closed.", this.serverName);
+  }
+
+  /**
+   * Prepares the configuration used for the server-side cluster connection. Note that the
+   * connection registry implementation is set on this server's own configuration. When a client
+   * ZooKeeper quorum is configured, a copy without that property is returned instead, so that
+   * server-issued connections use the server ZK cluster.
+   */
+  private Configuration cleanupConfiguration() {
+    Configuration conf = this.conf;
+    conf.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
+        HConstants.ZK_CONNECTION_REGISTRY_CLASS);
+    if (conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) {
+      // Use server ZK cluster for server-issued connections, so we clone
+      // the conf and unset the client ZK related properties
+      conf = new Configuration(this.conf);
+      conf.unset(HConstants.CLIENT_ZOOKEEPER_QUORUM);
+    }
+    return conf;
+  }
+
+  /**
+   * Initialization done at the start of {@link #run()}, before the replication service is set
+   * up: currently only the cluster connection. On failure the RPC services are stopped and the
+   * server is aborted.
+   */
+  private void preRegistrationInitialization() {
+    try {
+      setupClusterConnection();
+    } catch (Throwable t) {
+      // Call stop if error or process will stick around for ever since server
+      // puts up non-daemon threads.
+      this.rpcServices.stop();
+      abort("Initialization of RS failed.  Hence aborting RS.", t);
+    }
+  }
+
+  /**
+   * Setup our cluster connection if not already initialized.
+   */
+  protected final synchronized void setupClusterConnection() throws IOException {
+    if (asyncClusterConnection == null) {
+      Configuration conf = cleanupConfiguration();
+      // Port 0: bind to the RPC services' address but let the system pick the local port.
+      InetSocketAddress localAddress =
+          new InetSocketAddress(this.rpcServices.isa.getAddress(), 0);
+      User user = userProvider.getCurrent();
+      asyncClusterConnection =
+          ClusterConnectionFactory.createAsyncClusterConnection(conf, localAddress, user);
+    }
+  }
+
+  /**
+   * Stops the replication sink service, if one has been created.
+   */
+  protected void stopServiceThreads() {
+    if (this.replicationSinkService != null) {
+      this.replicationSinkService.stopReplicationService();
+    }
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return conf;
+  }
+
+  @Override
+  public ZKWatcher getZooKeeper() {
+    return zooKeeper;
+  }
+
+  @Override
+  public Connection getConnection() {
+    return getAsyncConnection().toConnection();
+  }
+
+  /**
+   * Not supported by the replication server.
+   *
+   * @throws IOException always, as a {@link DoNotRetryIOException}
+   */
+  @Override
+  public Connection createConnection(Configuration conf) throws IOException {
+    throw new DoNotRetryIOException(new UnsupportedOperationException("This's ReplicationServer."));
+  }
+
+  @Override
+  public AsyncClusterConnection getAsyncClusterConnection() {
+    return this.asyncClusterConnection;
+  }
+
+  @Override
+  public ServerName getServerName() {
+    return serverName;
+  }
+
+  /**
+   * @return always null; this server has no coordinated state manager
+   */
+  @Override
+  public CoordinatedStateManager getCoordinatedStateManager() {
+    return null;
+  }
+
+  /**
+   * @return always null; this server has no chore service
+   */
+  @Override
+  public ChoreService getChoreService() {
+    return null;
+  }
+
+  /**
+   * Aborts the server: logs the reason at FATAL and then stops the server. Only the first call
+   * has any effect.
+   *
+   * @param why the reason for the abort
+   * @param cause the exception that triggered the abort, may be null
+   */
+  @Override
+  public void abort(String why, Throwable cause) {
+    if (!setAbortRequested()) {
+      // Abort already in progress, ignore the new request.
+      LOG.debug("Abort already in progress. Ignoring the current request with reason: {}", why);
+      return;
+    }
+    String msg = "***** ABORTING replication server " + this + ": " + why + " *****";
+    if (cause != null) {
+      LOG.error(HBaseMarkers.FATAL, msg, cause);
+    } else {
+      LOG.error(HBaseMarkers.FATAL, msg);
+    }
+    stop(why);
+  }
+
+  @Override
+  public boolean isAborted() {
+    return abortRequested.get();
+  }
+
+  /**
+   * Asks the server to stop. The main loop in {@link #run()} is woken up if it is sleeping.
+   *
+   * @param msg the reason for stopping, only used for logging
+   */
+  @Override
+  public void stop(final String msg) {
+    if (!this.stopped) {
+      LOG.info("***** STOPPING replication server '{}' *****", this);
+      this.stopped = true;
+      LOG.info("STOPPED: {}", msg);
+      // Wakes run() if it is sleeping
+      sleeper.skipSleepCycle();
+    }
+  }
+
+  @Override
+  public boolean isStopped() {
+    return this.stopped;
+  }
+
+  /**
+   * Instantiates and initializes the replication sink service.
+   */
+  private void setupReplication() throws IOException {
+    createNewReplicationInstance(conf, this);
+  }
+
+  /**
+   * Creates the replication sink service configured for the given server and stores it on the
+   * server.
+   */
+  private static void createNewReplicationInstance(Configuration conf, HReplicationServer server)
+      throws IOException {
+    // read in the name of the sink replication class from the config file.
+    String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
+        HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
+
+    server.replicationSinkService = newReplicationInstance(sinkClassname,
+        ReplicationSinkService.class, conf, server);
+  }
+
+  /**
+   * Loads the named replication service class with the context class loader, instantiates it
+   * and initializes it against the given server.
+   *
+   * @param classname name of the class to load
+   * @param xface the interface the loaded class must implement
+   * @param conf configuration handed to the new instance
+   * @param server the server the service is initialized with
+   * @return the initialized service
+   * @throws IOException if the class cannot be found or the service fails to initialize
+   */
+  private static <T extends ReplicationService> T newReplicationInstance(String classname,
+      Class<T> xface, Configuration conf, HReplicationServer server) throws IOException {
+    final Class<? extends T> clazz;
+    try {
+      ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+      clazz = Class.forName(classname, true, classLoader).asSubclass(xface);
+    } catch (java.lang.ClassNotFoundException nfe) {
+      // Keep the original exception as the cause so the class loading failure stays diagnosable.
+      throw new IOException("Could not find class for " + classname, nfe);
+    }
+    T service = ReflectionUtils.newInstance(clazz, conf);
+    service.initialize(server, null, null, null, null);
+    return service;
+  }
+
+  /**
+   * Starts the replication sink service, if one has been created.
+   */
+  private void startReplicationService() throws IOException {
+    if (this.replicationSinkService != null) {
+      this.replicationSinkService.startReplicationService();
+    }
+  }
+
+  /**
+   * @return Return the object that implements the replication sink executorService.
+   */
+  public ReplicationSinkService getReplicationSinkService() {
+    return replicationSinkService;
+  }
+
+  /**
+   * Report the status of the server. A server is online once the replication service has been
+   * started by {@link #run()}. This method is designed mostly to be useful in tests.
+   *
+   * @return true if online, false if not.
+   */
+  public boolean isOnline() {
+    return online.get();
+  }
+
+  /**
+   * Creates the RPC services of this server. Called from the constructor.
+   */
+  protected ReplicationServerRpcServices createRpcServices() throws IOException {
+    return new ReplicationServerRpcServices(this);
+  }
+
+  /**
+   * Sets the abort state if not already set.
+   * @return True if abortRequested set to True successfully, false if an abort is already in
+   *   progress.
+   */
+  protected boolean setAbortRequested() {
+    return abortRequested.compareAndSet(false, true);
+  }
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java
new file mode 100644
index 0000000..1b9b699
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java
@@ -0,0 +1,516 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.net.BindException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.LongAdder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.client.ConnectionUtils;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.ipc.PriorityFunction;
+import org.apache.hadoop.hbase.ipc.QosPriority;
+import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
+import org.apache.hadoop.hbase.ipc.RpcServerFactory;
+import org.apache.hadoop.hbase.ipc.RpcServerInterface;
+import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
+import org.apache.hadoop.hbase.log.HBaseMarkers;
+import org.apache.hadoop.hbase.net.Address;
+import org.apache.hadoop.hbase.regionserver.RSRpcServices;
+import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
+import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
+import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory;
+import org.apache.hadoop.hbase.regionserver.SimpleRpcSchedulerFactory;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.access.AccessChecker;
+import org.apache.hadoop.hbase.security.access.NoopAccessChecker;
+import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
+import org.apache.hadoop.hbase.util.DNS;
+import org.apache.hadoop.hbase.util.DNS.ServerType;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponseRequest;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponses;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponseRequest;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponses;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.hbase.thirdparty.com.google.protobuf.Message;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+
+/**
+ * Implements the RPC services ({@link AdminService} blocking interface) of
+ * {@link HReplicationServer}.
+ */
+@InterfaceAudience.Private
+@SuppressWarnings("deprecation")
+public class ReplicationServerRpcServices implements HBaseRPCErrorHandler,
+    AdminService.BlockingInterface, PriorityFunction {
+
+  protected static final Logger LOG = 
LoggerFactory.getLogger(ReplicationServerRpcServices.class);
+
+  /** Parameter name for port replication server listens on. */
+  public static final String REPLICATION_SERVER_PORT = 
"hbase.replicationserver.port";
+
+  /** Default port replication server listens on. */
+  public static final int DEFAULT_REPLICATION_SERVER_PORT = 16040;
+
+  /** default port for replication server web api */
+  public static final int DEFAULT_REPLICATION_SERVER_INFOPORT = 16050;
+
+  // Request counter.
+  final LongAdder requestCount = new LongAdder();
+
+  // Server to handle client requests.
+  final RpcServerInterface rpcServer;
+  final InetSocketAddress isa;
+
+  protected final HReplicationServer replicationServer;
+
+  // The reference to the priority extraction function
+  private final PriorityFunction priority;
+
+  private AccessChecker accessChecker;
+  private ZKPermissionWatcher zkPermissionWatcher;
+
+  /**
+   * Builds the RPC services of the given replication server: instantiates the RPC scheduler
+   * factory, resolves the host name and port to serve on, creates the RPC server and records
+   * the address it actually listens on. The server thread is renamed as a side effect.
+   *
+   * @param rs the replication server these services belong to
+   * @throws IOException if the RPC server cannot be created or its listener channel is closed
+   */
+  public ReplicationServerRpcServices(final HReplicationServer rs) throws IOException {
+    replicationServer = rs;
+    final Configuration conf = rs.getConfiguration();
+
+    final RpcSchedulerFactory schedulerFactory;
+    try {
+      Class<? extends RpcSchedulerFactory> factoryClass =
+          getRpcSchedulerFactoryClass().asSubclass(RpcSchedulerFactory.class);
+      schedulerFactory = factoryClass.getDeclaredConstructor().newInstance();
+    } catch (ReflectiveOperationException e) {
+      throw new IllegalArgumentException(e);
+    }
+
+    final String hostname = DNS.getHostname(conf, ServerType.REPLICATIONSERVER);
+    final int port = conf.getInt(REPLICATION_SERVER_PORT, DEFAULT_REPLICATION_SERVER_PORT);
+    // Creation of a HSA will force a resolve.
+    final InetSocketAddress resolvedIsa = new InetSocketAddress(hostname, port);
+    final String bindHost = conf.get("hbase.replicationserver.ipc.address", hostname);
+    final InetSocketAddress bindAddress = new InetSocketAddress(bindHost, port);
+    if (resolvedIsa.getAddress() == null) {
+      throw new IllegalArgumentException("Failed resolve of " + resolvedIsa);
+    }
+
+    priority = createPriority();
+
+    // Using Address means we don't get the IP too. Shorten it more even to just the host name
+    // w/o the domain.
+    final String processName = rs.getProcessName();
+    final Address hostAndPort =
+        Address.fromParts(resolvedIsa.getHostName(), resolvedIsa.getPort());
+    final String name = processName + "/" + hostAndPort.toStringWithoutDomain();
+
+    // Set how many times to retry talking to another server over Connection.
+    ConnectionUtils.setServerSideHConnectionRetriesConfig(conf, name, LOG);
+    rpcServer = createRpcServer(rs, schedulerFactory, bindAddress, name);
+
+    final InetSocketAddress listenerAddress = rpcServer.getListenerAddress();
+    if (listenerAddress == null) {
+      throw new IOException("Listener channel is closed");
+    }
+    // Set our address, however we need the final port that was given to rpcServer
+    isa = new InetSocketAddress(resolvedIsa.getHostName(), listenerAddress.getPort());
+    rpcServer.setErrorHandler(this);
+    rs.setName(name);
+  }
+
+  /**
+   * Creates the RPC server that serves {@link #getServices()} on the given bind address.
+   *
+   * @param server the server the RPC server belongs to; also the source of the configuration
+   * @param rpcSchedulerFactory factory for the RPC scheduler handed to the RPC server
+   * @param bindAddress the address to bind to
+   * @param name the name given to the RPC server
+   * @return the newly created RPC server
+   * @throws IOException if the RPC server cannot be created; a {@link BindException} is
+   *   rewrapped with a hint on how to switch ports
+   */
+  protected RpcServerInterface createRpcServer(
+      final Server server,
+      final RpcSchedulerFactory rpcSchedulerFactory,
+      final InetSocketAddress bindAddress,
+      final String name
+  ) throws IOException {
+    final Configuration conf = server.getConfiguration();
+    final boolean poolEnabled =
+        conf.getBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true);
+    try {
+      final List<BlockingServiceAndInterface> services = getServices();
+      return RpcServerFactory.createRpcServer(server, name, services, bindAddress, conf,
+          rpcSchedulerFactory.create(conf, this, server), poolEnabled);
+    } catch (BindException be) {
+      final Throwable cause;
+      if (be.getCause() != null) {
+        cause = be.getCause();
+      } else {
+        cause = be;
+      }
+      final String hint = be.getMessage() + ". To switch ports use the '"
+          + REPLICATION_SERVER_PORT + "' configuration property.";
+      throw new IOException(hint, cause);
+    }
+  }
+
+  /**
+   * @return the RPC scheduler factory class named by
+   *   {@link RSRpcServices#REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS} in the configuration, or
+   *   {@link SimpleRpcSchedulerFactory} when that key is not set
+   */
+  protected Class<?> getRpcSchedulerFactoryClass() {
+    return replicationServer.getConfiguration().getClass(
+        RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, SimpleRpcSchedulerFactory.class);
+  }
+
+  /**
+   * @return the priority function that {@link #getPriority(RequestHeader, Message, User)} and
+   *   {@link #getDeadline(RequestHeader, Message)} delegate to
+   */
+  public PriorityFunction getPriority() {
+    return this.priority;
+  }
+
+  /**
+   * @return the configuration of the replication server these services belong to
+   */
+  public Configuration getConfiguration() {
+    final HReplicationServer owner = this.replicationServer;
+    return owner.getConfiguration();
+  }
+
+  void start(ZKWatcher zkWatcher) {
+    if (AccessChecker.isAuthorizationSupported(getConfiguration())) {
+      accessChecker = new AccessChecker(getConfiguration());
+    } else {
+      accessChecker = new NoopAccessChecker(getConfiguration());
+    }
+    if (!getConfiguration().getBoolean("hbase.testing.nocluster", false) && 
zkWatcher != null) {
+      zkPermissionWatcher =
+          new ZKPermissionWatcher(zkWatcher, accessChecker.getAuthManager(), 
getConfiguration());
+      try {
+        zkPermissionWatcher.start();
+      } catch (KeeperException e) {
+        LOG.error("ZooKeeper permission watcher initialization failed", e);
+      }
+    }
+    rpcServer.start();
+  }
+
+  void stop() {
+    if (zkPermissionWatcher != null) {
+      zkPermissionWatcher.close();
+    }
+    rpcServer.stop();
+  }
+
+  /**
+   * By default, put up an Admin Service.
+   * @return immutable list of blocking services and the security info classes 
that this server
+   *   supports
+   */
+  protected List<BlockingServiceAndInterface> getServices() {
+    List<BlockingServiceAndInterface> bssi = new ArrayList<>();
+    bssi.add(new BlockingServiceAndInterface(
+      AdminService.newReflectiveBlockingService(this),
+      AdminService.BlockingInterface.class));
+    return new 
ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build();
+  }
+
+  /**
+   * @return the address of these services: the resolved host name combined with the port the
+   *   RPC server listens on
+   */
+  public InetSocketAddress getSocketAddress() {
+    return this.isa;
+  }
+
+  /**
+   * Delegates to the priority function created at construction time.
+   */
+  @Override
+  public int getPriority(RequestHeader header, Message param, User user) {
+    final PriorityFunction delegate = this.priority;
+    return delegate.getPriority(header, param, user);
+  }
+
+  /**
+   * Delegates to the priority function created at construction time.
+   */
+  @Override
+  public long getDeadline(RequestHeader header, Message param) {
+    final PriorityFunction delegate = this.priority;
+    return delegate.getDeadline(header, param);
+  }
+
+  /**
+   * Checks whether the given throwable is an out-of-memory condition and, if so, halts the
+   * process immediately to avoid creating more objects. See {@link #exitIfOOME(Throwable)}.
+   *
+   * @param e the throwable to inspect
+   * @return True if we OOME'd and are aborting.
+   */
+  @Override
+  public boolean checkOOME(final Throwable e) {
+    final boolean outOfMemory = exitIfOOME(e);
+    return outOfMemory;
+  }
+
+  /**
+   * Halts the JVM with status 1 when the given throwable is an {@link OutOfMemoryError}, is
+   * caused by one, or mentions "java.lang.OutOfMemoryError" in its message.
+   *
+   * @param e the throwable to inspect
+   * @return false when {@code e} is not an out-of-memory condition; otherwise the JVM is halted
+   */
+  public static boolean exitIfOOME(final Throwable e) {
+    final boolean outOfMemory = e instanceof OutOfMemoryError
+        || e.getCause() instanceof OutOfMemoryError
+        || (e.getMessage() != null && e.getMessage().contains("java.lang.OutOfMemoryError"));
+    if (!outOfMemory) {
+      return false;
+    }
+    try {
+      LOG.error(HBaseMarkers.FATAL, "Run out of memory; "
+        + ReplicationServerRpcServices.class.getSimpleName() + " will abort itself immediately",
+        e);
+    } finally {
+      // Halt even if logging itself fails.
+      Runtime.getRuntime().halt(1);
+    }
+    return true;
+  }
+
+  /**
+   * Called to verify that this server is up and running.
+   */
+  protected void checkOpen() throws IOException {
+    if (replicationServer.isAborted()) {
+      throw new RegionServerAbortedException("Server " + 
replicationServer.getServerName()
+          + " aborting");
+    }
+    if (replicationServer.isStopped()) {
+      throw new RegionServerStoppedException("Server " + 
replicationServer.getServerName()
+          + " stopping");
+    }
+    if (!replicationServer.isOnline()) {
+      throw new ServerNotRunningYetException("Server " + 
replicationServer.getServerName()
+          + " is not running yet");
+    }
+  }
+
+  @Override
+  public GetRegionInfoResponse getRegionInfo(RpcController controller, 
GetRegionInfoRequest request)
+      throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This's 
Replication Server"));
+  }
+
+  @Override
+  public GetStoreFileResponse getStoreFile(RpcController controller, 
GetStoreFileRequest request)
+      throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This's 
Replication Server"));
+  }
+
+  @Override
+  public GetOnlineRegionResponse getOnlineRegion(RpcController controller,
+      GetOnlineRegionRequest request) throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This's 
Replication Server"));
+  }
+
+  @Override
+  public OpenRegionResponse openRegion(RpcController controller, 
OpenRegionRequest request)
+      throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This's 
Replication Server"));
+  }
+
+  @Override
+  public WarmupRegionResponse warmupRegion(RpcController controller, 
WarmupRegionRequest request)
+      throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This's 
Replication Server"));
+  }
+
+  @Override
+  public CloseRegionResponse closeRegion(RpcController controller, 
CloseRegionRequest request)
+      throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This's 
Replication Server"));
+  }
+
+  @Override
+  public FlushRegionResponse flushRegion(RpcController controller, 
FlushRegionRequest request)
+      throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This's 
Replication Server"));
+  }
+
+  @Override
+  public CompactionSwitchResponse compactionSwitch(RpcController controller,
+      CompactionSwitchRequest request) throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This's 
Replication Server"));
+  }
+
+  @Override
+  public CompactRegionResponse compactRegion(RpcController controller,
+      CompactRegionRequest request) throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This's 
Replication Server"));
+  }
+
+  @Override
+  public ReplicateWALEntryResponse replay(RpcController controller,
+      ReplicateWALEntryRequest request) throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This's 
Replication Server"));
+  }
+
+  @Override
+  public RollWALWriterResponse rollWALWriter(RpcController controller, 
RollWALWriterRequest request)
+      throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This's 
Replication Server"));
+  }
+
+  @Override
+  public GetServerInfoResponse getServerInfo(RpcController controller, 
GetServerInfoRequest request)
+      throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This's 
Replication Server"));
+  }
+
+  /**
+   * Handles a stop request: counts the call, then asks the replication server to stop, passing
+   * along the reason carried by the request.
+   *
+   * @param controller the RPC controller; not consulted by this method
+   * @param request the stop request whose reason is forwarded to the server
+   * @return a response built without setting any field
+   */
+  @Override
+  @QosPriority(priority=HConstants.ADMIN_QOS)
+  public StopServerResponse stopServer(final RpcController controller,
+      final StopServerRequest request) {
+    requestCount.increment();
+    replicationServer.stop(request.getReason());
+    final StopServerResponse emptyResponse = StopServerResponse.newBuilder().build();
+    return emptyResponse;
+  }
+
+  @Override
+  public UpdateFavoredNodesResponse updateFavoredNodes(RpcController 
controller,
+      UpdateFavoredNodesRequest request) throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This's 
Replication Server"));
+  }
+
+  @Override
+  public UpdateConfigurationResponse updateConfiguration(RpcController 
controller,
+      UpdateConfigurationRequest request) throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This's 
Replication Server"));
+  }
+
+  @Override
+  public GetRegionLoadResponse getRegionLoad(RpcController controller,
+      GetRegionLoadRequest request) throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This's 
Replication Server"));
+  }
+
+  @Override
+  public ClearCompactionQueuesResponse clearCompactionQueues(RpcController 
controller,
+      ClearCompactionQueuesRequest request) throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This's 
Replication Server"));
+  }
+
+  @Override
+  public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController 
controller,
+      ClearRegionBlockCacheRequest request) throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This's 
Replication Server"));
+  }
+
+  @Override
+  public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(RpcController 
controller,
+      GetSpaceQuotaSnapshotsRequest request) throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This's 
Replication Server"));
+  }
+
+  @Override
+  public ExecuteProceduresResponse executeProcedures(RpcController controller,
+      ExecuteProceduresRequest request) throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This's 
Replication Server"));
+  }
+
+  @Override
+  public SlowLogResponses getSlowLogResponses(RpcController controller,
+      SlowLogResponseRequest request) throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This's 
Replication Server"));
+  }
+
+  @Override
+  public SlowLogResponses getLargeLogResponses(RpcController controller,
+      SlowLogResponseRequest request) throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This's 
Replication Server"));
+  }
+
+  @Override
+  public ClearSlowLogResponses clearSlowLogsResponses(RpcController controller,
+      ClearSlowLogResponseRequest request) throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This's 
Replication Server"));
+  }
+
+  /**
+   * Exposes the access checker held by this object.
+   *
+   * @return the current value of the {@code accessChecker} field
+   */
+  protected AccessChecker getAccessChecker() {
+    return this.accessChecker;
+  }
+
+  protected PriorityFunction createPriority() {
+    return new PriorityFunction() {
+      @Override
+      public int getPriority(RequestHeader header, Message param, User user) {
+        return 0;
+      }
+
+      @Override
+      public long getDeadline(RequestHeader header, Message param) {
+        return 0;
+      }
+    };
+  }
+
+  @Override
+  public ReplicateWALEntryResponse replicateWALEntry(RpcController controller,
+      ReplicateWALEntryRequest request) throws ServiceException {
+    try {
+      checkOpen();
+      if (replicationServer.getReplicationSinkService() != null) {
+        requestCount.increment();
+        List<WALEntry> entries = request.getEntryList();
+        CellScanner cellScanner = ((HBaseRpcController) 
controller).cellScanner();
+        // TODO: CP pre
+        
replicationServer.getReplicationSinkService().replicateLogEntries(entries, 
cellScanner,
+            request.getReplicationClusterId(), 
request.getSourceBaseNamespaceDirPath(),
+            request.getSourceHFileArchiveDirPath());
+        // TODO: CP post
+        return ReplicateWALEntryResponse.newBuilder().build();
+      } else {
+        throw new ServiceException("Replication services are not initialized 
yet");
+      }
+    } catch (IOException ie) {
+      throw new ServiceException(ie);
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java
new file mode 100644
index 0000000..6a0ef3d
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Starts an {@link HReplicationServer} with the mini cluster's configuration, sends it a batch of
+ * WAL entries over RPC and checks that the written cells can be read back from the cluster.
+ */
+@Category({ReplicationTests.class, MediumTests.class})
+public class TestReplicationServer {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestReplicationServer.class);
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationServer.class);
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static final Configuration CONF = TEST_UTIL.getConfiguration();
+
+  private static HMaster MASTER;
+
+  private static HReplicationServer replicationServer;
+
+  private static Path baseNamespaceDir;
+  private static Path hfileArchiveDir;
+  private static String replicationClusterId;
+
+  /** Number of WAL entries sent, and rows verified, by {@link #testReplicateWAL()}. */
+  private static final int BATCH_SIZE = 10;
+
+  private static final TableName TABLENAME = TableName.valueOf("t");
+  /** Used both as the column family and as the column qualifier of every test cell. */
+  private static final String FAMILY = "C";
+
+  /**
+   * Starts the mini cluster and the replication server, waits until the master is active and the
+   * replication server reports itself online, then derives the directories and the cluster id
+   * that are sent along with each replication request.
+   */
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster();
+    MASTER = TEST_UTIL.getMiniHBaseCluster().getMaster();
+
+    replicationServer = new HReplicationServer(CONF);
+    replicationServer.start();
+
+    TEST_UTIL.getMiniHBaseCluster().waitForActiveAndReadyMaster();
+    TEST_UTIL.waitFor(60000, () -> replicationServer.isOnline());
+
+    Path rootDir = CommonFSUtils.getRootDir(CONF);
+    baseNamespaceDir = new Path(rootDir, new Path(HConstants.BASE_NAMESPACE_DIR));
+    hfileArchiveDir = new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY));
+    replicationClusterId = "12345";
+  }
+
+  /** Shuts the mini cluster down once all tests have run. */
+  @AfterClass
+  public static void afterClass() throws IOException {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /** Creates the test table and waits for it to become available. */
+  @Before
+  public void before() throws Exception {
+    TEST_UTIL.createTable(TABLENAME, FAMILY);
+    TEST_UTIL.waitTableAvailable(TABLENAME);
+  }
+
+  /** Drops the test table if it exists. */
+  @After
+  public void after() throws IOException {
+    TEST_UTIL.deleteTableIfAny(TABLENAME);
+  }
+
+  /**
+   * Sends {@link #BATCH_SIZE} generated WAL entries to the replication server and asserts that
+   * every row can afterwards be read from the test table with the expected value.
+   */
+  @Test
+  public void testReplicateWAL() throws Exception {
+    AsyncClusterConnection conn = TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().get(0)
+        .getRegionServer().getAsyncClusterConnection();
+    // The admin stub is addressed at the replication server, not at a region server.
+    AsyncRegionServerAdmin rsAdmin = conn.getRegionServerAdmin(replicationServer.getServerName());
+
+    Entry[] entries = new Entry[BATCH_SIZE];
+    for (int i = 0; i < BATCH_SIZE; i++) {
+      entries[i] = generateEdit(i, TABLENAME, Bytes.toBytes(i));
+    }
+
+    ReplicationProtbufUtil.replicateWALEntry(rsAdmin, entries, replicationClusterId,
+        baseNamespaceDir, hfileArchiveDir, 1000);
+
+    // One Table handle serves every lookup and is closed when the checks finish.
+    try (Table table = TEST_UTIL.getConnection().getTable(TABLENAME)) {
+      for (int i = 0; i < BATCH_SIZE; i++) {
+        Result result = table.get(new Get(Bytes.toBytes(i)));
+        Cell cell = result.getColumnLatestCell(Bytes.toBytes(FAMILY), Bytes.toBytes(FAMILY));
+        assertNotNull(cell);
+        assertTrue(Bytes.equals(CellUtil.cloneValue(cell), Bytes.toBytes(i)));
+      }
+    }
+  }
+
+  /**
+   * Builds a WAL entry holding a single cell whose row and value are both {@code row}.
+   *
+   * @param i passed to the WAL key as its third constructor argument
+   * @param tableName the table the entry is addressed to
+   * @param row bytes used as both the row key and the cell value
+   * @return the new entry
+   */
+  private static WAL.Entry generateEdit(int i, TableName tableName, byte[] row) {
+    // NOTE(review): presumably keeps timestamps of consecutive entries distinct; confirm.
+    Threads.sleep(1);
+    long timestamp = System.currentTimeMillis();
+    WALKeyImpl key = new WALKeyImpl(new byte[32], tableName, i, timestamp,
+        HConstants.DEFAULT_CLUSTER_ID, null);
+    WALEdit edit = new WALEdit();
+    edit.add(new KeyValue(row, Bytes.toBytes(FAMILY), Bytes.toBytes(FAMILY), timestamp, row));
+    return new WAL.Entry(key, edit);
+  }
+}

Reply via email to