This is an automated email from the ASF dual-hosted git repository. sunxin pushed a commit to branch HBASE-24666 in repository https://gitbox.apache.org/repos/asf/hbase.git
commit c9a01b2dab47bfa852e0bb6c66cb48c4d4e6f2bc Author: XinSun <ddu...@gmail.com> AuthorDate: Fri Sep 4 18:53:46 2020 +0800 HBASE-24683 Add a basic ReplicationServer which only implement ReplicationSink Service (#2111) Signed-off-by: Guanghao Zhang <zg...@apache.org> --- .../java/org/apache/hadoop/hbase/util/DNS.java | 3 +- .../hbase/replication/HReplicationServer.java | 391 ++++++++++++++++ .../replication/ReplicationServerRpcServices.java | 516 +++++++++++++++++++++ .../hbase/replication/TestReplicationServer.java | 151 ++++++ 4 files changed, 1060 insertions(+), 1 deletion(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/DNS.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/DNS.java index 5c23ddc..cf94b2a 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/DNS.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/DNS.java @@ -64,7 +64,8 @@ public final class DNS { public enum ServerType { MASTER("master"), - REGIONSERVER("regionserver"); + REGIONSERVER("regionserver"), + REPLICATIONSERVER("replicationserver"); private String name; ServerType(String name) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java new file mode 100644 index 0000000..31dec0c --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java @@ -0,0 +1,391 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ChoreService; +import org.apache.hadoop.hbase.CoordinatedStateManager; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.AsyncClusterConnection; +import org.apache.hadoop.hbase.client.ClusterConnectionFactory; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.log.HBaseMarkers; +import org.apache.hadoop.hbase.regionserver.ReplicationService; +import org.apache.hadoop.hbase.regionserver.ReplicationSinkService; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.trace.TraceUtil; +import org.apache.hadoop.hbase.util.Sleeper; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * HReplicationServer which is responsible to all replication stuff. It checks in with + * the HMaster. There are many HReplicationServers in a single HBase deployment. + */ +@InterfaceAudience.Private +@SuppressWarnings({ "deprecation"}) +public class HReplicationServer extends Thread implements Server { + + private static final Logger LOG = LoggerFactory.getLogger(HReplicationServer.class); + + /** replication server process name */ + public static final String REPLICATION_SERVER = "replicationserver"; + + /** + * This servers start code. + */ + protected final long startCode; + + private volatile boolean stopped = false; + + // Go down hard. Used if file system becomes unavailable and also in + // debugging and unit tests. + private AtomicBoolean abortRequested; + + // flag set after we're done setting up server threads + final AtomicBoolean online = new AtomicBoolean(false); + + /** + * The server name the Master sees us as. Its made from the hostname the + * master passes us, port, and server start code. Gets set after registration + * against Master. + */ + private ServerName serverName; + + protected final Configuration conf; + + private ReplicationSinkService replicationSinkService; + + final int msgInterval; + // A sleeper that sleeps for msgInterval. + protected final Sleeper sleeper; + + // zookeeper connection and watcher + protected final ZKWatcher zooKeeper; + + /** + * The asynchronous cluster connection to be shared by services. + */ + protected AsyncClusterConnection asyncClusterConnection; + + private UserProvider userProvider; + + protected final ReplicationServerRpcServices rpcServices; + + public HReplicationServer(final Configuration conf) throws IOException { + TraceUtil.initTracer(conf); + try { + this.startCode = System.currentTimeMillis(); + this.conf = conf; + + this.abortRequested = new AtomicBoolean(false); + + this.rpcServices = createRpcServices(); + + String hostName = this.rpcServices.isa.getHostName(); + serverName = ServerName.valueOf(hostName, this.rpcServices.isa.getPort(), this.startCode); + + this.userProvider = UserProvider.instantiate(conf); + + this.msgInterval = conf.getInt("hbase.replicationserver.msginterval", 3 * 1000); + this.sleeper = new Sleeper(this.msgInterval, this); + + // Some unit tests don't need a cluster, so no zookeeper at all + if (!conf.getBoolean("hbase.testing.nocluster", false)) { + // Open connection to zookeeper and set primary watcher + zooKeeper = new ZKWatcher(conf, getProcessName() + ":" + + rpcServices.isa.getPort(), this, false); + } else { + zooKeeper = null; + } + + this.rpcServices.start(zooKeeper); + } catch (Throwable t) { + // Make sure we log the exception. HReplicationServer is often started via reflection and the + // cause of failed startup is lost. + LOG.error("Failed construction ReplicationServer", t); + throw t; + } + } + + public String getProcessName() { + return REPLICATION_SERVER; + } + + @Override + public void run() { + if (isStopped()) { + LOG.info("Skipping run; stopped"); + return; + } + try { + // Do pre-registration initializations; zookeeper, lease threads, etc. + preRegistrationInitialization(); + } catch (Throwable e) { + abort("Fatal exception during initialization", e); + } + try { + setupReplication(); + startReplicationService(); + + online.set(true); + + long lastMsg = System.currentTimeMillis(); + // The main run loop. + while (!isStopped()) { + long now = System.currentTimeMillis(); + if ((now - lastMsg) >= msgInterval) { + lastMsg = System.currentTimeMillis(); + } + if (!isStopped() && !isAborted()) { + this.sleeper.sleep(); + } + } + + stopServiceThreads(); + + if (this.rpcServices != null) { + this.rpcServices.stop(); + } + } catch (Throwable t) { + abort(t.getMessage(), t); + } + + if (this.zooKeeper != null) { + this.zooKeeper.close(); + } + LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed."); + } + + private Configuration cleanupConfiguration() { + Configuration conf = this.conf; + conf.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, + HConstants.ZK_CONNECTION_REGISTRY_CLASS); + if (conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) { + // Use server ZK cluster for server-issued connections, so we clone + // the conf and unset the client ZK related properties + conf = new Configuration(this.conf); + conf.unset(HConstants.CLIENT_ZOOKEEPER_QUORUM); + } + return conf; + } + + /** + * All initialization needed before we go register with Master.<br> + * Do bare minimum. Do bulk of initializations AFTER we've connected to the Master.<br> + * In here we just put up the RpcServer, setup Connection, and ZooKeeper. + */ + private void preRegistrationInitialization() { + try { + setupClusterConnection(); + } catch (Throwable t) { + // Call stop if error or process will stick around for ever since server + // puts up non-daemon threads. + this.rpcServices.stop(); + abort("Initialization of RS failed. Hence aborting RS.", t); + } + } + + /** + * Setup our cluster connection if not already initialized. + */ + protected final synchronized void setupClusterConnection() throws IOException { + if (asyncClusterConnection == null) { + Configuration conf = cleanupConfiguration(); + InetSocketAddress localAddress = new InetSocketAddress(this.rpcServices.isa.getAddress(), 0); + User user = userProvider.getCurrent(); + asyncClusterConnection = + ClusterConnectionFactory.createAsyncClusterConnection(conf, localAddress, user); + } + } + + /** + * Wait on all threads to finish. Presumption is that all closes and stops + * have already been called. + */ + protected void stopServiceThreads() { + if (this.replicationSinkService != null) { + this.replicationSinkService.stopReplicationService(); + } + } + + @Override + public Configuration getConfiguration() { + return conf; + } + + @Override + public ZKWatcher getZooKeeper() { + return zooKeeper; + } + + @Override + public Connection getConnection() { + return getAsyncConnection().toConnection(); + } + + @Override + public Connection createConnection(Configuration conf) throws IOException { + throw new DoNotRetryIOException(new UnsupportedOperationException("This's ReplicationServer.")); + } + + @Override + public AsyncClusterConnection getAsyncClusterConnection() { + return this.asyncClusterConnection; + } + + @Override + public ServerName getServerName() { + return serverName; + } + + @Override + public CoordinatedStateManager getCoordinatedStateManager() { + return null; + } + + @Override + public ChoreService getChoreService() { + return null; + } + + @Override + public void abort(String why, Throwable cause) { + if (!setAbortRequested()) { + // Abort already in progress, ignore the new request. + LOG.debug( + "Abort already in progress. Ignoring the current request with reason: {}", why); + return; + } + String msg = "***** ABORTING replication server " + this + ": " + why + " *****"; + if (cause != null) { + LOG.error(HBaseMarkers.FATAL, msg, cause); + } else { + LOG.error(HBaseMarkers.FATAL, msg); + } + stop(why); + } + + @Override + public boolean isAborted() { + return abortRequested.get(); + } + + @Override + public void stop(final String msg) { + if (!this.stopped) { + LOG.info("***** STOPPING region server '" + this + "' *****"); + this.stopped = true; + LOG.info("STOPPED: " + msg); + // Wakes run() if it is sleeping + sleeper.skipSleepCycle(); + } + } + + @Override + public boolean isStopped() { + return this.stopped; + } + + /** + * Setup WAL log and replication if enabled. Replication setup is done in here because it wants to + * be hooked up to WAL. + */ + private void setupReplication() throws IOException { + // Instantiate replication if replication enabled. Pass it the log directories. + createNewReplicationInstance(conf, this); + } + + /** + * Load the replication executorService objects, if any + */ + private static void createNewReplicationInstance(Configuration conf, HReplicationServer server) + throws IOException { + // read in the name of the sink replication class from the config file. + String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME, + HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT); + + server.replicationSinkService = newReplicationInstance(sinkClassname, + ReplicationSinkService.class, conf, server); + } + + private static <T extends ReplicationService> T newReplicationInstance(String classname, + Class<T> xface, Configuration conf, HReplicationServer server) throws IOException { + final Class<? extends T> clazz; + try { + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + clazz = Class.forName(classname, true, classLoader).asSubclass(xface); + } catch (java.lang.ClassNotFoundException nfe) { + throw new IOException("Could not find class for " + classname); + } + T service = ReflectionUtils.newInstance(clazz, conf); + service.initialize(server, null, null, null, null); + return service; + } + + /** + * Start up replication source and sink handlers. + */ + private void startReplicationService() throws IOException { + if (this.replicationSinkService != null) { + this.replicationSinkService.startReplicationService(); + } + } + + /** + * @return Return the object that implements the replication sink executorService. + */ + public ReplicationSinkService getReplicationSinkService() { + return replicationSinkService; + } + + /** + * Report the status of the server. A server is online once all the startup is + * completed (setting up filesystem, starting executorService threads, etc.). This + * method is designed mostly to be useful in tests. + * + * @return true if online, false if not. + */ + public boolean isOnline() { + return online.get(); + } + + protected ReplicationServerRpcServices createRpcServices() throws IOException { + return new ReplicationServerRpcServices(this); + } + + /** + * Sets the abort state if not already set. + * @return True if abortRequested set to True successfully, false if an abort is already in + * progress. + */ + protected boolean setAbortRequested() { + return abortRequested.compareAndSet(false, true); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java new file mode 100644 index 0000000..1b9b699 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java @@ -0,0 +1,516 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.net.BindException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.LongAdder; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.client.ConnectionUtils; +import org.apache.hadoop.hbase.io.ByteBuffAllocator; +import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.ipc.PriorityFunction; +import org.apache.hadoop.hbase.ipc.QosPriority; +import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; +import org.apache.hadoop.hbase.ipc.RpcServerFactory; +import org.apache.hadoop.hbase.ipc.RpcServerInterface; +import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; +import org.apache.hadoop.hbase.log.HBaseMarkers; +import org.apache.hadoop.hbase.net.Address; +import org.apache.hadoop.hbase.regionserver.RSRpcServices; +import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException; +import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; +import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory; +import org.apache.hadoop.hbase.regionserver.SimpleRpcSchedulerFactory; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.access.AccessChecker; +import org.apache.hadoop.hbase.security.access.NoopAccessChecker; +import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher; +import org.apache.hadoop.hbase.util.DNS; +import org.apache.hadoop.hbase.util.DNS.ServerType; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponseRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponses; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponseRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponses; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; + +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; +import org.apache.hbase.thirdparty.com.google.protobuf.Message; +import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; +import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; + +/** + * Implements the regionserver RPC services for {@link HReplicationServer}. + */ +@InterfaceAudience.Private +@SuppressWarnings("deprecation") +public class ReplicationServerRpcServices implements HBaseRPCErrorHandler, + AdminService.BlockingInterface, PriorityFunction { + + protected static final Logger LOG = LoggerFactory.getLogger(ReplicationServerRpcServices.class); + + /** Parameter name for port replication server listens on. */ + public static final String REPLICATION_SERVER_PORT = "hbase.replicationserver.port"; + + /** Default port replication server listens on. */ + public static final int DEFAULT_REPLICATION_SERVER_PORT = 16040; + + /** default port for replication server web api */ + public static final int DEFAULT_REPLICATION_SERVER_INFOPORT = 16050; + + // Request counter. + final LongAdder requestCount = new LongAdder(); + + // Server to handle client requests. + final RpcServerInterface rpcServer; + final InetSocketAddress isa; + + protected final HReplicationServer replicationServer; + + // The reference to the priority extraction function + private final PriorityFunction priority; + + private AccessChecker accessChecker; + private ZKPermissionWatcher zkPermissionWatcher; + + public ReplicationServerRpcServices(final HReplicationServer rs) throws IOException { + final Configuration conf = rs.getConfiguration(); + replicationServer = rs; + + final RpcSchedulerFactory rpcSchedulerFactory; + try { + rpcSchedulerFactory = getRpcSchedulerFactoryClass().asSubclass(RpcSchedulerFactory.class) + .getDeclaredConstructor().newInstance(); + } catch (NoSuchMethodException | InvocationTargetException | + InstantiationException | IllegalAccessException e) { + throw new IllegalArgumentException(e); + } + // Server to handle client requests. + final InetSocketAddress initialIsa; + final InetSocketAddress bindAddress; + + String hostname = DNS.getHostname(conf, ServerType.REPLICATIONSERVER); + int port = conf.getInt(REPLICATION_SERVER_PORT, DEFAULT_REPLICATION_SERVER_PORT); + // Creation of a HSA will force a resolve. + initialIsa = new InetSocketAddress(hostname, port); + bindAddress = new InetSocketAddress( + conf.get("hbase.replicationserver.ipc.address", hostname), port); + + if (initialIsa.getAddress() == null) { + throw new IllegalArgumentException("Failed resolve of " + initialIsa); + } + priority = createPriority(); + // Using Address means we don't get the IP too. Shorten it more even to just the host name + // w/o the domain. + final String name = rs.getProcessName() + "/" + + Address.fromParts(initialIsa.getHostName(), initialIsa.getPort()).toStringWithoutDomain(); + // Set how many times to retry talking to another server over Connection. + ConnectionUtils.setServerSideHConnectionRetriesConfig(conf, name, LOG); + rpcServer = createRpcServer(rs, rpcSchedulerFactory, bindAddress, name); + + final InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } + // Set our address, however we need the final port that was given to rpcServer + isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort()); + rpcServer.setErrorHandler(this); + rs.setName(name); + } + + protected RpcServerInterface createRpcServer( + final Server server, + final RpcSchedulerFactory rpcSchedulerFactory, + final InetSocketAddress bindAddress, + final String name + ) throws IOException { + final Configuration conf = server.getConfiguration(); + boolean reservoirEnabled = conf.getBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true); + try { + return RpcServerFactory.createRpcServer(server, name, getServices(), + bindAddress, // use final bindAddress for this server. + conf, rpcSchedulerFactory.create(conf, this, server), reservoirEnabled); + } catch (BindException be) { + throw new IOException(be.getMessage() + ". To switch ports use the '" + + REPLICATION_SERVER_PORT + "' configuration property.", + be.getCause() != null ? be.getCause() : be); + } + } + + protected Class<?> getRpcSchedulerFactoryClass() { + final Configuration conf = replicationServer.getConfiguration(); + return conf.getClass(RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, + SimpleRpcSchedulerFactory.class); + } + + public PriorityFunction getPriority() { + return priority; + } + + public Configuration getConfiguration() { + return replicationServer.getConfiguration(); + } + + void start(ZKWatcher zkWatcher) { + if (AccessChecker.isAuthorizationSupported(getConfiguration())) { + accessChecker = new AccessChecker(getConfiguration()); + } else { + accessChecker = new NoopAccessChecker(getConfiguration()); + } + if (!getConfiguration().getBoolean("hbase.testing.nocluster", false) && zkWatcher != null) { + zkPermissionWatcher = + new ZKPermissionWatcher(zkWatcher, accessChecker.getAuthManager(), getConfiguration()); + try { + zkPermissionWatcher.start(); + } catch (KeeperException e) { + LOG.error("ZooKeeper permission watcher initialization failed", e); + } + } + rpcServer.start(); + } + + void stop() { + if (zkPermissionWatcher != null) { + zkPermissionWatcher.close(); + } + rpcServer.stop(); + } + + /** + * By default, put up an Admin Service. + * @return immutable list of blocking services and the security info classes that this server + * supports + */ + protected List<BlockingServiceAndInterface> getServices() { + List<BlockingServiceAndInterface> bssi = new ArrayList<>(); + bssi.add(new BlockingServiceAndInterface( + AdminService.newReflectiveBlockingService(this), + AdminService.BlockingInterface.class)); + return new ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build(); + } + + public InetSocketAddress getSocketAddress() { + return isa; + } + + @Override + public int getPriority(RequestHeader header, Message param, User user) { + return priority.getPriority(header, param, user); + } + + @Override + public long getDeadline(RequestHeader header, Message param) { + return priority.getDeadline(header, param); + } + + /* + * Check if an OOME and, if so, abort immediately to avoid creating more objects. + * + * @param e + * + * @return True if we OOME'd and are aborting. + */ + @Override + public boolean checkOOME(final Throwable e) { + return exitIfOOME(e); + } + + public static boolean exitIfOOME(final Throwable e) { + boolean stop = false; + try { + if (e instanceof OutOfMemoryError + || (e.getCause() != null && e.getCause() instanceof OutOfMemoryError) + || (e.getMessage() != null && e.getMessage().contains( + "java.lang.OutOfMemoryError"))) { + stop = true; + LOG.error(HBaseMarkers.FATAL, "Run out of memory; " + + ReplicationServerRpcServices.class.getSimpleName() + " will abort itself immediately", + e); + } + } finally { + if (stop) { + Runtime.getRuntime().halt(1); + } + } + return stop; + } + + /** + * Called to verify that this server is up and running. + */ + protected void checkOpen() throws IOException { + if (replicationServer.isAborted()) { + throw new RegionServerAbortedException("Server " + replicationServer.getServerName() + + " aborting"); + } + if (replicationServer.isStopped()) { + throw new RegionServerStoppedException("Server " + replicationServer.getServerName() + + " stopping"); + } + if (!replicationServer.isOnline()) { + throw new ServerNotRunningYetException("Server " + replicationServer.getServerName() + + " is not running yet"); + } + } + + @Override + public GetRegionInfoResponse getRegionInfo(RpcController controller, GetRegionInfoRequest request) + throws ServiceException { + throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); + } + + @Override + public GetStoreFileResponse getStoreFile(RpcController controller, GetStoreFileRequest request) + throws ServiceException { + throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); + } + + @Override + public GetOnlineRegionResponse getOnlineRegion(RpcController controller, + GetOnlineRegionRequest request) throws ServiceException { + throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); + } + + @Override + public OpenRegionResponse openRegion(RpcController controller, OpenRegionRequest request) + throws ServiceException { + throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); + } + + @Override + public WarmupRegionResponse warmupRegion(RpcController controller, WarmupRegionRequest request) + throws ServiceException { + throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); + } + + @Override + public CloseRegionResponse closeRegion(RpcController controller, CloseRegionRequest request) + throws ServiceException { + throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); + } + + @Override + public FlushRegionResponse flushRegion(RpcController controller, FlushRegionRequest request) + throws ServiceException { + throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); + } + + @Override + public CompactionSwitchResponse compactionSwitch(RpcController controller, + CompactionSwitchRequest request) throws ServiceException { + throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); + } + + @Override + public CompactRegionResponse compactRegion(RpcController controller, + CompactRegionRequest request) throws ServiceException { + throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); + } + + @Override + public ReplicateWALEntryResponse replay(RpcController controller, + ReplicateWALEntryRequest request) throws ServiceException { + throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); + } + + @Override + public RollWALWriterResponse rollWALWriter(RpcController controller, RollWALWriterRequest request) + throws ServiceException { + throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); + } + + @Override + public GetServerInfoResponse getServerInfo(RpcController controller, GetServerInfoRequest request) + throws ServiceException { + throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); + } + + /** + * Stop the replication server. + * + * @param controller the RPC controller + * @param request the request + */ + @Override + @QosPriority(priority=HConstants.ADMIN_QOS) + public StopServerResponse stopServer(final RpcController controller, + final StopServerRequest request) { + requestCount.increment(); + String reason = request.getReason(); + replicationServer.stop(reason); + return StopServerResponse.newBuilder().build(); + } + + @Override + public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller, + UpdateFavoredNodesRequest request) throws ServiceException { + throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); + } + + @Override + public UpdateConfigurationResponse updateConfiguration(RpcController controller, + UpdateConfigurationRequest request) throws ServiceException { + throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); + } + + @Override + public GetRegionLoadResponse getRegionLoad(RpcController controller, + GetRegionLoadRequest request) throws ServiceException { + throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); + } + + @Override + public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller, + ClearCompactionQueuesRequest request) throws ServiceException { + throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); + } + + @Override + public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller, + ClearRegionBlockCacheRequest request) throws ServiceException { + throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); + } + + @Override + public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(RpcController controller, + GetSpaceQuotaSnapshotsRequest request) throws ServiceException { + throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); + } + + @Override + public ExecuteProceduresResponse executeProcedures(RpcController controller, + ExecuteProceduresRequest request) throws ServiceException { + throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); + } + + @Override + public SlowLogResponses getSlowLogResponses(RpcController controller, + SlowLogResponseRequest request) throws ServiceException { + throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); + } + + @Override + public SlowLogResponses getLargeLogResponses(RpcController controller, + SlowLogResponseRequest request) throws ServiceException { + throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); + } + + @Override + public ClearSlowLogResponses clearSlowLogsResponses(RpcController controller, + ClearSlowLogResponseRequest request) throws ServiceException { + throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); + } + + protected AccessChecker getAccessChecker() { + return accessChecker; + } + + protected PriorityFunction createPriority() { + return new PriorityFunction() { + @Override + public int getPriority(RequestHeader header, Message param, User user) { + return 0; + } + + @Override + public long getDeadline(RequestHeader header, Message param) { + return 0; + } + }; + } + + @Override + public ReplicateWALEntryResponse replicateWALEntry(RpcController controller, + ReplicateWALEntryRequest request) throws ServiceException { + try { + checkOpen(); + if (replicationServer.getReplicationSinkService() != null) { + requestCount.increment(); + List<WALEntry> entries = request.getEntryList(); + CellScanner cellScanner = ((HBaseRpcController) controller).cellScanner(); + // TODO: CP pre + replicationServer.getReplicationSinkService().replicateLogEntries(entries, cellScanner, + request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(), + request.getSourceHFileArchiveDirPath()); + // TODO: CP post + return ReplicateWALEntryResponse.newBuilder().build(); + } else { + throw new ServiceException("Replication services are not initialized yet"); + } + } catch (IOException ie) { + throw new ServiceException(ie); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java new file mode 100644 index 0000000..6a0ef3d --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.AsyncClusterConnection; +import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALKeyImpl; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Category({ReplicationTests.class, MediumTests.class}) +public class TestReplicationServer { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestReplicationServer.class); + + private static final Logger LOG = LoggerFactory.getLogger(TestReplicationServer.class); + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static Configuration CONF = TEST_UTIL.getConfiguration(); + + private static HMaster MASTER; + + private static HReplicationServer replicationServer; + + private static Path baseNamespaceDir; + private static Path hfileArchiveDir; + private static String replicationClusterId; + + private static int BATCH_SIZE = 10; + + private static TableName TABLENAME = TableName.valueOf("t"); + private static String FAMILY = "C"; + + @BeforeClass + public static void beforeClass() throws Exception { + TEST_UTIL.startMiniCluster(); + MASTER = TEST_UTIL.getMiniHBaseCluster().getMaster(); + + replicationServer = new HReplicationServer(CONF); + replicationServer.start(); + + TEST_UTIL.getMiniHBaseCluster().waitForActiveAndReadyMaster(); + TEST_UTIL.waitFor(60000, () -> replicationServer.isOnline()); + + Path rootDir = CommonFSUtils.getRootDir(CONF); + baseNamespaceDir = new Path(rootDir, new Path(HConstants.BASE_NAMESPACE_DIR)); + hfileArchiveDir = new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY)); + replicationClusterId = "12345"; + } + + @AfterClass + public static void afterClass() throws IOException { + TEST_UTIL.shutdownMiniCluster(); + } + + @Before + public void before() throws Exception { + TEST_UTIL.createTable(TABLENAME, FAMILY); + TEST_UTIL.waitTableAvailable(TABLENAME); + } + + @After + public void after() throws IOException { + TEST_UTIL.deleteTableIfAny(TABLENAME); + } + + @Test + public void testReplicateWAL() throws Exception { + AsyncClusterConnection conn = TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().get(0) + .getRegionServer().getAsyncClusterConnection(); + AsyncRegionServerAdmin rsAdmin = conn.getRegionServerAdmin(replicationServer.getServerName()); + + Entry[] entries = new Entry[BATCH_SIZE]; + for(int i = 0; i < BATCH_SIZE; i++) { + entries[i] = generateEdit(i, TABLENAME, Bytes.toBytes(i)); + } + + ReplicationProtbufUtil.replicateWALEntry(rsAdmin, entries, replicationClusterId, + baseNamespaceDir, hfileArchiveDir, 1000); + + for (int i = 0; i < BATCH_SIZE; i++) { + Table table = TEST_UTIL.getConnection().getTable(TABLENAME); + Result result = table.get(new Get(Bytes.toBytes(i))); + Cell cell = result.getColumnLatestCell(Bytes.toBytes(FAMILY), Bytes.toBytes(FAMILY)); + assertNotNull(cell); + assertTrue(Bytes.equals(CellUtil.cloneValue(cell), Bytes.toBytes(i))); + } + } + + private static WAL.Entry generateEdit(int i, TableName tableName, byte[] row) { + Threads.sleep(1); + long timestamp = System.currentTimeMillis(); + WALKeyImpl key = new WALKeyImpl(new byte[32], tableName, i, timestamp, + HConstants.DEFAULT_CLUSTER_ID, null); + WALEdit edit = new WALEdit(); + edit.add(new KeyValue(row, Bytes.toBytes(FAMILY), Bytes.toBytes(FAMILY), timestamp, row)); + return new WAL.Entry(key, edit); + } +}