This is an automated email from the ASF dual-hosted git repository.

pankajkumar pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 4307f21bcbc HBASE-29473 Obtain target cluster's token for cross clusters job (#7198)
4307f21bcbc is described below

commit 4307f21bcbc30fd7c36460d3fb81b575a122a41e
Author: mokai <[email protected]>
AuthorDate: Tue Aug 19 00:06:33 2025 +0800

    HBASE-29473 Obtain target cluster's token for cross clusters job (#7198)
    
    Signed-off-by: Nihal Jain <[email protected]>
    Signed-off-by: Junegunn Choi <[email protected]>
    Signed-off-by: Pankaj Kumar <[email protected]>
    Reviewed-by: chaijunjie <[email protected]>
---
 .../hadoop/hbase/mapreduce/HFileOutputFormat2.java |   4 +-
 .../TestHFileOutputFormat2WithSecurity.java        | 132 +++++++++++++++++++++
 .../hbase/mapreduce/TestTableMapReduceUtil.java    |  46 ++-----
 .../apache/hadoop/hbase/HBaseTestingUtility.java   |  44 +++++++
 4 files changed, 186 insertions(+), 40 deletions(-)

diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
index 3f9da73aad7..dbd6421aa7c 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
@@ -788,7 +788,7 @@ public class HFileOutputFormat2 extends FileOutputFormat<ImmutableBytesWritable,
    * @see #REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY
    * @see #REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY
    */
-  public static void configureRemoteCluster(Job job, Configuration clusterConf) {
+  public static void configureRemoteCluster(Job job, Configuration clusterConf) throws IOException {
     Configuration conf = job.getConfiguration();
 
     if (!conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
@@ -805,6 +805,8 @@ public class HFileOutputFormat2 extends FileOutputFormat<ImmutableBytesWritable,
     conf.setInt(REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY, clientPort);
     conf.set(REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY, parent);
 
+    TableMapReduceUtil.initCredentialsForCluster(job, clusterConf);
+
     LOG.info("ZK configs for remote cluster of bulkload is configured: " + 
quorum + ":" + clientPort
       + "/" + parent);
   }
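
As a quick illustration of the caller-side flow this change affects, here is a minimal, hedged sketch of preparing a bulkload job on a local cluster whose HFiles target a remote, secured cluster. The quorum host, table name, output path, and class name below are placeholder assumptions, not values from this commit; only configureIncrementalLoad, configureRemoteCluster, and the new initCredentialsForCluster call inside it come from the patched code.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical caller-side sketch (not part of this commit).
public class CrossClusterBulkloadSketch {
  public static void main(String[] args) throws Exception {
    Configuration localConf = HBaseConfiguration.create();
    // remoteConf describes the target (secured) cluster; the quorum host is an assumption.
    Configuration remoteConf = HBaseConfiguration.create();
    remoteConf.set(HConstants.ZOOKEEPER_QUORUM, "remote-zk");

    Job job = Job.getInstance(localConf, "cross-cluster-bulkload");
    TableName tableName = TableName.valueOf("my_table");
    try (Connection remoteConn = ConnectionFactory.createConnection(remoteConf);
      Table table = remoteConn.getTable(tableName);
      RegionLocator locator = remoteConn.getRegionLocator(tableName)) {
      FileOutputFormat.setOutputPath(job, new Path("/tmp/bulkload-out"));
      HFileOutputFormat2.configureIncrementalLoad(job, table, locator);
      // The method changed by this commit: besides copying the remote ZooKeeper
      // quorum/port/znode into the job configuration, it now also calls
      // TableMapReduceUtil.initCredentialsForCluster(job, remoteConf), so the remote
      // cluster's delegation token lands in job.getCredentials() and the job can
      // write HFiles against the secured target cluster.
      HFileOutputFormat2.configureRemoteCluster(job, remoteConf);
    }
    // job.waitForCompletion(true) would then run the map tasks that write the HFiles.
  }
}
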
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2WithSecurity.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2WithSecurity.java
new file mode 100644
index 00000000000..b4cb6a8355f
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2WithSecurity.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.apache.hadoop.security.UserGroupInformation.loginUserFromKeytab;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.Closeable;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests for {@link HFileOutputFormat2} with secure mode.
+ */
+@Category({ VerySlowMapReduceTests.class, LargeTests.class })
+public class TestHFileOutputFormat2WithSecurity {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestHFileOutputFormat2WithSecurity.class);
+
+  private static final byte[] FAMILIES = Bytes.toBytes("test_cf");
+
+  private static final String HTTP_PRINCIPAL = "HTTP/localhost";
+
+  private HBaseTestingUtility utilA;
+
+  private Configuration confA;
+
+  private HBaseTestingUtility utilB;
+
+  private MiniKdc kdc;
+
+  private List<Closeable> clusters = new ArrayList<>();
+
+  @Before
+  public void setupSecurityClusters() throws Exception {
+    utilA = new HBaseTestingUtility();
+    confA = utilA.getConfiguration();
+
+    utilB = new HBaseTestingUtility();
+
+    // Prepare security configs.
+    File keytab = new File(utilA.getDataTestDir("keytab").toUri().getPath());
+    kdc = utilA.setupMiniKdc(keytab);
+    String username = UserGroupInformation.getLoginUser().getShortUserName();
+    String userPrincipal = username + "/localhost";
+    kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
+    loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath());
+
+    // Start security clusterA
+    clusters.add(utilA.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL));
+
+    // Start security clusterB
+    clusters.add(utilB.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL));
+  }
+
+  @After
+  public void teardownSecurityClusters() {
+    IOUtils.closeQuietly(clusters);
+    clusters.clear();
+    if (kdc != null) {
+      kdc.stop();
+    }
+  }
+
+  @Test
+  public void testIncrementalLoadInMultiClusterWithSecurity() throws Exception {
+    TableName tableName = TableName.valueOf("testIncrementalLoadInMultiClusterWithSecurity");
+
+    // Create table in clusterB
+    try (Table table = utilB.createTable(tableName, FAMILIES);
+      RegionLocator r = utilB.getConnection().getRegionLocator(tableName)) {
+
+      // Create job in clusterA
+      Job job = Job.getInstance(confA, "testIncrementalLoadInMultiClusterWithSecurity");
+      job.setWorkingDirectory(
+        utilA.getDataTestDirOnTestFS("testIncrementalLoadInMultiClusterWithSecurity"));
+      job.setInputFormatClass(NMapInputFormat.class);
+      job.setMapperClass(TestHFileOutputFormat2.RandomKVGeneratingMapper.class);
+      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+      job.setMapOutputValueClass(KeyValue.class);
+      HFileOutputFormat2.configureIncrementalLoad(job, table, r);
+
+      assertEquals(2, job.getCredentials().getAllTokens().size());
+
+      String remoteClusterId = utilB.getHBaseClusterInterface().getClusterMetrics().getClusterId();
+      assertTrue(job.getCredentials().getToken(new Text(remoteClusterId)) != null);
+    } finally {
+      if (utilB.getAdmin().tableExists(tableName)) {
+        utilB.deleteTable(tableName);
+      }
+    }
+  }
+}
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java
index 3b7392b3ae4..f661025ac06 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java
@@ -29,15 +29,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
-import org.apache.hadoop.hbase.security.access.AccessController;
-import org.apache.hadoop.hbase.security.access.PermissionStorage;
-import org.apache.hadoop.hbase.security.access.SecureTestUtil;
 import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProviders;
 import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
-import org.apache.hadoop.hbase.security.token.TokenProvider;
-import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil;
 import org.apache.hadoop.hbase.testclassification.MapReduceTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -48,7 +41,6 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.minikdc.MiniKdc;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authentication.util.KerberosName;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.junit.After;
@@ -134,33 +126,6 @@ public class TestTableMapReduceUtil {
     assertEquals("Table", 
job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
   }
 
-  private static Closeable startSecureMiniCluster(HBaseTestingUtility util, MiniKdc kdc,
-    String principal) throws Exception {
-    Configuration conf = util.getConfiguration();
-
-    SecureTestUtil.enableSecurity(conf);
-    VisibilityTestUtil.enableVisiblityLabels(conf);
-    SecureTestUtil.verifyConfiguration(conf);
-
-    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
-      AccessController.class.getName() + ',' + TokenProvider.class.getName());
-
-    HBaseKerberosUtils.setSecuredConfiguration(conf, principal + '@' + kdc.getRealm(),
-      HTTP_PRINCIPAL + '@' + kdc.getRealm());
-
-    KerberosName.resetDefaultRealm();
-
-    util.startMiniCluster();
-    try {
-      util.waitUntilAllRegionsAssigned(PermissionStorage.ACL_TABLE_NAME);
-    } catch (Exception e) {
-      util.shutdownMiniCluster();
-      throw e;
-    }
-
-    return util::shutdownMiniCluster;
-  }
-
   @Test
   public void testInitCredentialsForCluster1() throws Exception {
     HBaseTestingUtility util1 = new HBaseTestingUtility();
@@ -199,8 +164,9 @@ public class TestTableMapReduceUtil {
     kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
     loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath());
 
-    try (Closeable util1Closeable = startSecureMiniCluster(util1, kdc, userPrincipal);
-      Closeable util2Closeable = startSecureMiniCluster(util2, kdc, userPrincipal)) {
+    try (
+      Closeable util1Closeable = util1.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL);
+      Closeable util2Closeable = util2.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL)) {
       try {
         Configuration conf1 = util1.getConfiguration();
         Job job = Job.getInstance(conf1);
@@ -233,7 +199,8 @@ public class TestTableMapReduceUtil {
     kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
     loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath());
 
-    try (Closeable util1Closeable = startSecureMiniCluster(util1, kdc, userPrincipal)) {
+    try (
+      Closeable util1Closeable = util1.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL)) {
       try {
         HBaseTestingUtility util2 = new HBaseTestingUtility();
         // Assume util2 is insecure cluster
@@ -269,7 +236,8 @@ public class TestTableMapReduceUtil {
     kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
     loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath());
 
-    try (Closeable util2Closeable = startSecureMiniCluster(util2, kdc, userPrincipal)) {
+    try (
+      Closeable util2Closeable = util2.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL)) {
       try {
         Configuration conf1 = util1.getConfiguration();
         Job job = Job.getInstance(conf1);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index 01a9c4f06fb..3db73e1ad5f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import edu.umd.cs.findbugs.annotations.Nullable;
+import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -89,6 +90,7 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.client.TableState;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
@@ -121,7 +123,12 @@ import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
 import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.access.AccessController;
+import org.apache.hadoop.hbase.security.access.PermissionStorage;
+import org.apache.hadoop.hbase.security.access.SecureTestUtil;
+import org.apache.hadoop.hbase.security.token.TokenProvider;
 import org.apache.hadoop.hbase.security.visibility.VisibilityLabelsCache;
+import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -152,6 +159,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapred.TaskLog;
 import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.hadoop.security.authentication.util.KerberosName;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.ZooKeeper;
@@ -394,6 +402,42 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
     r.getWAL().close();
   }
 
+  /**
+   * Start a secure mini cluster with the given KDC and principals.
+   * @param kdc              Mini KDC server
+   * @param servicePrincipal Service principal, without realm.
+   * @param spnegoPrincipal  SPNEGO principal, without realm.
+   * @return Closeable handle that shuts the cluster down
+   */
+  public Closeable startSecureMiniCluster(MiniKdc kdc, String servicePrincipal,
+    String spnegoPrincipal) throws Exception {
+    Configuration conf = getConfiguration();
+
+    SecureTestUtil.enableSecurity(conf);
+    VisibilityTestUtil.enableVisiblityLabels(conf);
+    SecureTestUtil.verifyConfiguration(conf);
+
+    // Reset the static default realm forcibly for hadoop-2.0.
+    // The call is not required on hadoop-3.0, where it has no impact.
+    KerberosName.resetDefaultRealm();
+
+    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+      AccessController.class.getName() + ',' + TokenProvider.class.getName());
+
+    HBaseKerberosUtils.setSecuredConfiguration(conf, servicePrincipal + '@' + kdc.getRealm(),
+      spnegoPrincipal + '@' + kdc.getRealm());
+
+    startMiniCluster();
+    try {
+      waitUntilAllRegionsAssigned(PermissionStorage.ACL_TABLE_NAME);
+    } catch (Exception e) {
+      shutdownMiniCluster();
+      throw e;
+    }
+
+    return this::shutdownMiniCluster;
+  }
+
   /**
   * Returns this classes's instance of {@link Configuration}. Be careful how you use the returned
   * Configuration since {@link Connection} instances can be shared. The Map of Connections is keyed
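
The tests above exercise the new HBaseTestingUtility#startSecureMiniCluster helper through try-with-resources; for reference, a self-contained, hedged sketch of that pattern follows. The class name, principal, and keytab values are placeholder assumptions, and the flow mirrors the setup code in TestHFileOutputFormat2WithSecurity rather than prescribing a canonical recipe.

import java.io.Closeable;
import java.io.File;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.UserGroupInformation;

// Hypothetical sketch of a secure-mini-cluster test setup using the new helper.
public class SecureMiniClusterSketch {
  public static void main(String[] args) throws Exception {
    HBaseTestingUtility util = new HBaseTestingUtility();
    // Create a keytab under the test data dir and boot a MiniKdc against it.
    File keytab = new File(util.getDataTestDir("keytab").toUri().getPath());
    MiniKdc kdc = util.setupMiniKdc(keytab);
    String user = UserGroupInformation.getLoginUser().getShortUserName() + "/localhost";
    kdc.createPrincipal(keytab, user, "HTTP/localhost");
    UserGroupInformation.loginUserFromKeytab(user + '@' + kdc.getRealm(), keytab.getAbsolutePath());

    // The helper enables security, registers AccessController/TokenProvider coprocessors,
    // starts the mini cluster, waits for the ACL table, and returns a Closeable that
    // shuts the cluster down when the try block exits.
    try (Closeable cluster = util.startSecureMiniCluster(kdc, user, "HTTP/localhost")) {
      // ... run Kerberos-dependent assertions against util.getConnection() here ...
    } finally {
      kdc.stop();
    }
  }
}
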
