This is an automated email from the ASF dual-hosted git repository.

stack pushed a commit to branch HBASE-18070
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 4d0ab6eb5f03eea0e900b46b69e49f915e6eb649 Author: stack <st...@apache.org> AuthorDate: Fri Sep 18 17:29:23 2020 -0700 HBASE-25068 Pass WALFactory to Replication so it knows of all WALProviders, not just default/user-space Pass WALFactory to Replication instead of WALProvider. WALFactory has all WALProviders in it, not just the user-space WALProvider. Do this so ReplicationService has access to all WALProviders in the Server (To be exploited by the follow-on patch in HBASE-25055) --- .../apache/hadoop/hbase/regionserver/HRegionServer.java | 15 +++++++-------- .../hadoop/hbase/regionserver/ReplicationService.java | 11 ++++------- .../hbase/replication/regionserver/Replication.java | 7 ++++--- .../hbase/replication/regionserver/ReplicationSyncUp.java | 6 ++++-- .../hadoop/hbase/replication/TestReplicationBase.java | 2 +- .../regionserver/TestReplicationSourceManager.java | 3 ++- 6 files changed, 22 insertions(+), 22 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 74aec58..911deea 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1943,8 +1943,7 @@ public class HRegionServer extends Thread implements throw new IOException("Can not create wal directory " + logDir); } // Instantiate replication if replication enabled. Pass it the log directories. 
- createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, - factory.getWALProvider()); + createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, factory); } this.walFactory = factory; } @@ -3096,7 +3095,7 @@ public class HRegionServer extends Thread implements * Load the replication executorService objects, if any */ private static void createNewReplicationInstance(Configuration conf, HRegionServer server, - FileSystem walFs, Path walDir, Path oldWALDir, WALProvider walProvider) throws IOException { + FileSystem walFs, Path walDir, Path oldWALDir, WALFactory walFactory) throws IOException { // read in the name of the source replication class from the config file. String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME, HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT); @@ -3109,21 +3108,21 @@ public class HRegionServer extends Thread implements // only one object. if (sourceClassname.equals(sinkClassname)) { server.replicationSourceHandler = newReplicationInstance(sourceClassname, - ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider); + ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory); server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler; server.sameReplicationSourceAndSink = true; } else { server.replicationSourceHandler = newReplicationInstance(sourceClassname, - ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider); + ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory); server.replicationSinkHandler = newReplicationInstance(sinkClassname, - ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walProvider); + ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walFactory); server.sameReplicationSourceAndSink = false; } } private static <T extends ReplicationService> T newReplicationInstance(String classname, 
Class<T> xface, Configuration conf, HRegionServer server, FileSystem walFs, Path logDir, - Path oldLogDir, WALProvider walProvider) throws IOException { + Path oldLogDir, WALFactory walFactory) throws IOException { final Class<? extends T> clazz; try { ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); @@ -3132,7 +3131,7 @@ public class HRegionServer extends Thread implements throw new IOException("Could not find class for " + classname); } T service = ReflectionUtils.newInstance(clazz, conf); - service.initialize(server, walFs, logDir, oldLogDir, walProvider); + service.initialize(server, walFs, logDir, oldLogDir, walFactory); return service; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java index e9bbaea..33b3321 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -22,7 +22,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; -import org.apache.hadoop.hbase.wal.WALProvider; +import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.yetus.audience.InterfaceAudience; /** @@ -32,14 +32,11 @@ import org.apache.yetus.audience.InterfaceAudience; */ @InterfaceAudience.Private public interface ReplicationService { - /** * Initializes the replication service object. - * @param walProvider can be null if not initialized inside a live region server environment, for - * example, {@code ReplicationSyncUp}. 
*/ - void initialize(Server rs, FileSystem fs, Path logdir, Path oldLogDir, WALProvider walProvider) - throws IOException; + void initialize(Server rs, FileSystem fs, Path logdir, Path oldLogDir, WALFactory walFactory) + throws IOException; /** * Start replication services. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 9be7b9a..58206e0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider; +import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.yetus.audience.InterfaceAudience; @@ -81,7 +82,7 @@ public class Replication implements ReplicationSourceService { @Override public void initialize(Server server, FileSystem fs, Path logDir, Path oldLogDir, - WALProvider walProvider) throws IOException { + WALFactory walFactory) throws IOException { this.server = server; this.conf = this.server.getConfiguration(); this.isReplicationForBulkLoadDataEnabled = @@ -115,6 +116,7 @@ public class Replication implements ReplicationSourceService { SyncReplicationPeerMappingManager mapping = new SyncReplicationPeerMappingManager(); this.globalMetricsSource = CompatibilitySingletonFactory 
.getInstance(MetricsReplicationSourceFactory.class).getGlobalSource(); + WALProvider walProvider = walFactory.getWALProvider(); this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers, replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId, walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty(), @@ -165,7 +167,6 @@ public class Replication implements ReplicationSourceService { /** * If replication is enabled and this cluster is a master, * it starts - * @throws IOException */ @Override public void startReplicationService() throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java index 98490f1..b04c7eb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.AsyncClusterConnection; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; @@ -82,7 +83,8 @@ public class ReplicationSyncUp extends Configured implements Tool { System.out.println("Start Replication Server start"); Replication replication = new Replication(); - replication.initialize(new DummyServer(zkw), fs, logDir, oldLogDir, null); + replication.initialize(new DummyServer(zkw), fs, logDir, oldLogDir, + new WALFactory(conf, "test", false)); ReplicationSourceManager manager = replication.getReplicationManager(); manager.init().get(); while (manager.activeFailoverTaskCount() > 0) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java index 6e1692a..455b272 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 8e38114..4abb00f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -194,7 +194,8 @@ public abstract class TestReplicationSourceManager { logDir = utility.getDataTestDir(HConstants.HREGION_LOGDIR_NAME); remoteLogDir = utility.getDataTestDir(ReplicationUtils.REMOTE_WAL_DIR_NAME); replication = new Replication(); - replication.initialize(new DummyServer(), fs, logDir, oldLogDir, null); + replication.initialize(new DummyServer(), fs, logDir, oldLogDir, + new WALFactory(conf, "test", false)); managerOfCluster = getManagerFromCluster(); if (managerOfCluster != null) { // After replication procedure, we need to add peer by hand (other than by receiving