This is an automated email from the ASF dual-hosted git repository.

stack pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 63f1580  HBASE-25671 Backport HBASE-25608 to branch-2 (#3058)
63f1580 is described below

commit 63f158001537adfe655796526fe85d2a27ab5e3d
Author: bitterfox <bitterf...@gmail.com>
AuthorDate: Fri Mar 19 04:00:25 2021 +0900

    HBASE-25671 Backport HBASE-25608 to branch-2 (#3058)
    
    Signed-off-by: stack <st...@apache.org>
---
 .../hadoop/hbase/mapreduce/HFileOutputFormat2.java |  72 +++++++-
 .../hbase/mapreduce/TestHFileOutputFormat2.java    | 192 +++++++++++++++++++++
 2 files changed, 263 insertions(+), 1 deletion(-)

diff --git 
a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
 
b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
index d9ba4bd..02b5768 100644
--- 
a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++ 
b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
@@ -176,6 +176,13 @@ public class HFileOutputFormat2
   static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
           "hbase.mapreduce.use.multi.table.hfileoutputformat";
 
+  public static final String REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY =
+    "hbase.hfileoutputformat.remote.cluster.zookeeper.quorum";
+  public static final String REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY =
+    "hbase.hfileoutputformat.remote.cluster.zookeeper." + 
HConstants.CLIENT_PORT_STR;
+  public static final String REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY =
+    "hbase.hfileoutputformat.remote.cluster." + 
HConstants.ZOOKEEPER_ZNODE_PARENT;
+
   public static final String STORAGE_POLICY_PROPERTY = 
HStore.BLOCK_STORAGE_POLICY_KEY;
   public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = 
STORAGE_POLICY_PROPERTY + ".";
 
@@ -288,7 +295,8 @@ public class HFileOutputFormat2
 
             String tableName = Bytes.toString(tableNameBytes);
             if (tableName != null) {
-              try (Connection connection = 
ConnectionFactory.createConnection(conf);
+              try (Connection connection = ConnectionFactory.createConnection(
+                createRemoteClusterConf(conf));
                      RegionLocator locator =
                        
connection.getRegionLocator(TableName.valueOf(tableName))) {
                 loc = locator.getRegionLocation(rowKey);
@@ -358,6 +366,22 @@ public class HFileOutputFormat2
         wl.written = 0;
       }
 
+      private Configuration createRemoteClusterConf(Configuration conf) {
+        final Configuration newConf = new Configuration(conf);
+
+        final String quorum = 
conf.get(REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY);
+        final String clientPort = 
conf.get(REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY);
+        final String parent = 
conf.get(REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY);
+
+        if (quorum != null && clientPort != null && parent != null) {
+          newConf.set(HConstants.ZOOKEEPER_QUORUM, quorum);
+          newConf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, 
Integer.parseInt(clientPort));
+          newConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parent);
+        }
+
+        return newConf;
+      }
+
       /*
        * Create a new StoreFile.Writer.
        * @return A WriterLength, containing a new StoreFile.Writer.
@@ -536,6 +560,7 @@ public class HFileOutputFormat2
    *   <li>Sets the output key/value class to match HFileOutputFormat2's 
requirements</li>
    *   <li>Sets the reducer up to perform the appropriate sorting (either 
KeyValueSortReducer or
    *     PutSortReducer)</li>
+   *   <li>Sets the HBase cluster key to load region locations for 
locality-sensitive</li>
    * </ul>
    * The user should be sure to set the map output value class to either 
KeyValue or Put before
    * running this function.
@@ -543,6 +568,7 @@ public class HFileOutputFormat2
   public static void configureIncrementalLoad(Job job, Table table, 
RegionLocator regionLocator)
       throws IOException {
     configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
+    configureRemoteCluster(job, table.getConfiguration());
   }
 
   /**
@@ -673,6 +699,50 @@ public class HFileOutputFormat2
   }
 
   /**
+   * Configure HBase cluster key for remote cluster to load region location 
for locality-sensitive
+   * if it's enabled.
+   * It's not necessary to call this method explicitly when the cluster key 
for HBase cluster to be
+   * used to load region location is configured in the job configuration.
+   * Call this method when another HBase cluster key is configured in the job 
configuration.
+   * For example, you should call when you load data from HBase cluster A using
+   * {@link TableInputFormat} and generate hfiles for HBase cluster B.
+   * Otherwise, HFileOutputFormat2 fetch location from cluster A and 
locality-sensitive won't
+   * working correctly.
+   * {@link #configureIncrementalLoad(Job, Table, RegionLocator)} calls this 
method using
+   * {@link Table#getConfiguration} as clusterConf.
+   * See HBASE-25608.
+   *
+   * @param job which has configuration to be updated
+   * @param clusterConf which contains cluster key of the HBase cluster to be 
locality-sensitive
+   *
+   * @see #configureIncrementalLoad(Job, Table, RegionLocator)
+   * @see #LOCALITY_SENSITIVE_CONF_KEY
+   * @see #REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY
+   * @see #REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY
+   * @see #REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY
+   */
+  public static void configureRemoteCluster(Job job, Configuration 
clusterConf) {
+    Configuration conf = job.getConfiguration();
+
+    if (!conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, 
DEFAULT_LOCALITY_SENSITIVE)) {
+      return;
+    }
+
+    final String quorum = clusterConf.get(HConstants.ZOOKEEPER_QUORUM);
+    final int clientPort = clusterConf.getInt(
+      HConstants.ZOOKEEPER_CLIENT_PORT, 
HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT);
+    final String parent = clusterConf.get(
+      HConstants.ZOOKEEPER_ZNODE_PARENT, 
HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
+
+    conf.set(REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY, quorum);
+    conf.setInt(REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY, clientPort);
+    conf.set(REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY, parent);
+
+    LOG.info("ZK configs for remote cluster of bulkload is configured: " +
+      quorum + ":" + clientPort + "/" + parent);
+  }
+
+  /**
    * Runs inside the task to deserialize column family to compression algorithm
    * map from the configuration.
    *
diff --git 
a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
 
b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
index 3d7ce19..f94df25 100644
--- 
a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
+++ 
b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.mapreduce;
 
+import static 
org.apache.hadoop.hbase.client.ConnectionFactory.createConnection;
 import static 
org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -35,7 +36,11 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Random;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.hadoop.conf.Configuration;
@@ -59,19 +64,25 @@ import org.apache.hadoop.hbase.HadoopShims;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.PerformanceEvaluation;
 import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.StartMiniClusterOption;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.BufferedMutatorParams;
+import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Hbck;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableBuilder;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
@@ -85,6 +96,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.TestHRegionFileSystem;
 import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
 import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
@@ -1555,5 +1567,185 @@ public class TestHFileOutputFormat2  {
     }
 
   }
+
+  @Test
+  public void testMRIncrementalLoadWithLocalityMultiCluster() throws Exception 
{
+    // Start cluster A
+    util = new HBaseTestingUtility();
+    Configuration confA = util.getConfiguration();
+    int hostCount = 3;
+    int regionNum = 20;
+    String[] hostnames = new String[hostCount];
+    for (int i = 0; i < hostCount; ++i) {
+      hostnames[i] = "datanode_" + i;
+    }
+    StartMiniClusterOption option = StartMiniClusterOption.builder()
+      .numRegionServers(hostCount).dataNodeHosts(hostnames).build();
+    util.startMiniCluster(option);
+
+    // Start cluster B
+    HBaseTestingUtility utilB = new HBaseTestingUtility();
+    Configuration confB = utilB.getConfiguration();
+    utilB.startMiniCluster(option);
+
+    Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
+
+    byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
+    TableName tableName = TableName.valueOf("table");
+    // Create table in cluster B
+    try (Table table = utilB.createTable(tableName, FAMILIES, splitKeys);
+      RegionLocator r = utilB.getConnection().getRegionLocator(tableName)) {
+      // Generate the bulk load files
+      // Job has zookeeper configuration for cluster A
+      // Assume reading from cluster A by TableInputFormat and creating hfiles 
to cluster B
+      Job job = new Job(confA, "testLocalMRIncrementalLoad");
+      Configuration jobConf = job.getConfiguration();
+      final UUID key = 
ConfigurationCaptorConnection.configureConnectionImpl(jobConf);
+      
job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
+      setupRandomGeneratorMapper(job, false);
+      HFileOutputFormat2.configureIncrementalLoad(job, table, r);
+
+      assertEquals(confB.get(HConstants.ZOOKEEPER_QUORUM),
+        
jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY));
+      assertEquals(confB.get(HConstants.ZOOKEEPER_CLIENT_PORT),
+        
jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY));
+      assertEquals(confB.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
+        
jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY));
+
+      FileOutputFormat.setOutputPath(job, testDir);
+
+      assertFalse(util.getTestFileSystem().exists(testDir));
+
+      assertTrue(job.waitForCompletion(true));
+
+      final List<Configuration> configs =
+        ConfigurationCaptorConnection.getCapturedConfigarutions(key);
+
+      assertFalse(configs.isEmpty());
+      for (Configuration config : configs) {
+        assertEquals(confB.get(HConstants.ZOOKEEPER_QUORUM),
+          config.get(HConstants.ZOOKEEPER_QUORUM));
+        assertEquals(confB.get(HConstants.ZOOKEEPER_CLIENT_PORT),
+          config.get(HConstants.ZOOKEEPER_CLIENT_PORT));
+        assertEquals(confB.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
+          config.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
+      }
+    } finally {
+      utilB.deleteTable(tableName);
+      testDir.getFileSystem(confA).delete(testDir, true);
+      util.shutdownMiniCluster();
+      utilB.shutdownMiniCluster();
+    }
+  }
+
+  private static class ConfigurationCaptorConnection implements Connection {
+    private static final String UUID_KEY = 
"ConfigurationCaptorConnection.uuid";
+
+    private static final Map<UUID, List<Configuration>> confs = new 
ConcurrentHashMap<>();
+
+    private final Connection delegate;
+
+    public ConfigurationCaptorConnection(Configuration conf, ExecutorService 
es, User user)
+      throws IOException {
+      Configuration confForDelegate = new Configuration(conf);
+      confForDelegate.unset(ClusterConnection.HBASE_CLIENT_CONNECTION_IMPL);
+      delegate = createConnection(confForDelegate, es, user);
+
+      final String uuid = conf.get(UUID_KEY);
+      if (uuid != null) {
+        confs.computeIfAbsent(UUID.fromString(uuid), u -> new 
CopyOnWriteArrayList<>()).add(conf);
+      }
+    }
+
+    static UUID configureConnectionImpl(Configuration conf) {
+      conf.setClass(ClusterConnection.HBASE_CLIENT_CONNECTION_IMPL,
+        ConfigurationCaptorConnection.class, Connection.class);
+
+      final UUID uuid = UUID.randomUUID();
+      conf.set(UUID_KEY, uuid.toString());
+      return uuid;
+    }
+
+    static List<Configuration> getCapturedConfigarutions(UUID key) {
+      return confs.get(key);
+    }
+
+    @Override
+    public Configuration getConfiguration() {
+      return delegate.getConfiguration();
+    }
+
+    @Override
+    public Table getTable(TableName tableName) throws IOException {
+      return delegate.getTable(tableName);
+    }
+
+    @Override
+    public Table getTable(TableName tableName, ExecutorService pool) throws 
IOException {
+      return delegate.getTable(tableName, pool);
+    }
+
+    @Override
+    public BufferedMutator getBufferedMutator(TableName tableName) throws 
IOException {
+      return delegate.getBufferedMutator(tableName);
+    }
+
+    @Override
+    public BufferedMutator getBufferedMutator(BufferedMutatorParams params) 
throws IOException {
+      return delegate.getBufferedMutator(params);
+    }
+
+    @Override
+    public RegionLocator getRegionLocator(TableName tableName) throws 
IOException {
+      return delegate.getRegionLocator(tableName);
+    }
+
+    @Override
+    public void clearRegionLocationCache() {
+      delegate.clearRegionLocationCache();
+    }
+
+    @Override
+    public Admin getAdmin() throws IOException {
+      return delegate.getAdmin();
+    }
+
+    @Override
+    public void close() throws IOException {
+      delegate.close();
+    }
+
+    @Override
+    public boolean isClosed() {
+      return delegate.isClosed();
+    }
+
+    @Override
+    public TableBuilder getTableBuilder(TableName tableName, ExecutorService 
pool) {
+      return delegate.getTableBuilder(tableName, pool);
+    }
+
+    @Override
+    public Hbck getHbck()
+      throws IOException {
+      return delegate.getHbck();
+    }
+
+    @Override
+    public Hbck getHbck(ServerName masterServer) throws IOException {
+      return delegate.getHbck(masterServer);
+    }
+
+    @Override
+    public void abort(String why, Throwable e) {
+      delegate.abort(why, e);
+    }
+
+    @Override
+    public boolean isAborted() {
+      return delegate.isAborted();
+    }
+  }
+
 }
 

Reply via email to