This is an automated email from the ASF dual-hosted git repository. zhangduo pushed a commit to branch branch-2.6 in repository https://gitbox.apache.org/repos/asf/hbase.git
commit e55413419ffee4b88affc957c99aa37a34e41801 Author: Duo Zhang <[email protected]> AuthorDate: Tue Nov 4 14:55:12 2025 +0800 Reapply "HBASE-29473 Obtain target cluster's token for cross clusters job (#7198)" This reverts commit 3bed95feb7d218fbca506f192b3271ab7f663aed. --- .../hadoop/hbase/mapreduce/HFileOutputFormat2.java | 4 +- .../TestHFileOutputFormat2WithSecurity.java | 132 +++++++++++++++++++++ .../hbase/mapreduce/TestTableMapReduceUtil.java | 46 ++----- .../apache/hadoop/hbase/HBaseTestingUtility.java | 44 +++++++ 4 files changed, 186 insertions(+), 40 deletions(-) diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java index 15a3a3ddeb2..2906238edc7 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java @@ -811,7 +811,7 @@ public class HFileOutputFormat2 extends FileOutputFormat<ImmutableBytesWritable, * @see #REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY * @see #REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY */ - public static void configureRemoteCluster(Job job, Configuration clusterConf) { + public static void configureRemoteCluster(Job job, Configuration clusterConf) throws IOException { Configuration conf = job.getConfiguration(); if (!conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) { @@ -828,6 +828,8 @@ public class HFileOutputFormat2 extends FileOutputFormat<ImmutableBytesWritable, conf.setInt(REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY, clientPort); conf.set(REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY, parent); + TableMapReduceUtil.initCredentialsForCluster(job, clusterConf); + LOG.info("ZK configs for remote cluster of bulkload is configured: " + quorum + ":" + clientPort + "/" + parent); } diff --git 
a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2WithSecurity.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2WithSecurity.java new file mode 100644 index 00000000000..b4cb6a8355f --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2WithSecurity.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapreduce; + +import static org.apache.hadoop.security.UserGroupInformation.loginUserFromKeytab; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.Closeable; +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.minikdc.MiniKdc; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Tests for {@link HFileOutputFormat2} with secure mode. 
+ */ +@Category({ VerySlowMapReduceTests.class, LargeTests.class }) +public class TestHFileOutputFormat2WithSecurity { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestHFileOutputFormat2WithSecurity.class); + + private static final byte[] FAMILIES = Bytes.toBytes("test_cf"); + + private static final String HTTP_PRINCIPAL = "HTTP/localhost"; + + private HBaseTestingUtility utilA; + + private Configuration confA; + + private HBaseTestingUtility utilB; + + private MiniKdc kdc; + + private List<Closeable> clusters = new ArrayList<>(); + + @Before + public void setupSecurityClusters() throws Exception { + utilA = new HBaseTestingUtility(); + confA = utilA.getConfiguration(); + + utilB = new HBaseTestingUtility(); + + // Prepare security configs. + File keytab = new File(utilA.getDataTestDir("keytab").toUri().getPath()); + kdc = utilA.setupMiniKdc(keytab); + String username = UserGroupInformation.getLoginUser().getShortUserName(); + String userPrincipal = username + "/localhost"; + kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL); + loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath()); + + // Start security clusterA + clusters.add(utilA.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL)); + + // Start security clusterB + clusters.add(utilB.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL)); + } + + @After + public void teardownSecurityClusters() { + IOUtils.closeQuietly(clusters); + clusters.clear(); + if (kdc != null) { + kdc.stop(); + } + } + + @Test + public void testIncrementalLoadInMultiClusterWithSecurity() throws Exception { + TableName tableName = TableName.valueOf("testIncrementalLoadInMultiClusterWithSecurity"); + + // Create table in clusterB + try (Table table = utilB.createTable(tableName, FAMILIES); + RegionLocator r = utilB.getConnection().getRegionLocator(tableName)) { + + // Create job in clusterA + Job job = Job.getInstance(confA, 
"testIncrementalLoadInMultiClusterWithSecurity"); + job.setWorkingDirectory( + utilA.getDataTestDirOnTestFS("testIncrementalLoadInMultiClusterWithSecurity")); + job.setInputFormatClass(NMapInputFormat.class); + job.setMapperClass(TestHFileOutputFormat2.RandomKVGeneratingMapper.class); + job.setMapOutputKeyClass(ImmutableBytesWritable.class); + job.setMapOutputValueClass(KeyValue.class); + HFileOutputFormat2.configureIncrementalLoad(job, table, r); + + assertEquals(2, job.getCredentials().getAllTokens().size()); + + String remoteClusterId = utilB.getHBaseClusterInterface().getClusterMetrics().getClusterId(); + assertTrue(job.getCredentials().getToken(new Text(remoteClusterId)) != null); + } finally { + if (utilB.getAdmin().tableExists(tableName)) { + utilB.deleteTable(tableName); + } + } + } +} diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java index 3b7392b3ae4..f661025ac06 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java @@ -29,15 +29,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; -import org.apache.hadoop.hbase.security.HBaseKerberosUtils; -import org.apache.hadoop.hbase.security.access.AccessController; -import org.apache.hadoop.hbase.security.access.PermissionStorage; -import org.apache.hadoop.hbase.security.access.SecureTestUtil; import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProviders; import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier; -import org.apache.hadoop.hbase.security.token.TokenProvider; -import 
org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil; import org.apache.hadoop.hbase.testclassification.MapReduceTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; @@ -48,7 +41,6 @@ import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.minikdc.MiniKdc; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.authentication.util.KerberosName; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.junit.After; @@ -134,33 +126,6 @@ public class TestTableMapReduceUtil { assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE)); } - private static Closeable startSecureMiniCluster(HBaseTestingUtility util, MiniKdc kdc, - String principal) throws Exception { - Configuration conf = util.getConfiguration(); - - SecureTestUtil.enableSecurity(conf); - VisibilityTestUtil.enableVisiblityLabels(conf); - SecureTestUtil.verifyConfiguration(conf); - - conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, - AccessController.class.getName() + ',' + TokenProvider.class.getName()); - - HBaseKerberosUtils.setSecuredConfiguration(conf, principal + '@' + kdc.getRealm(), - HTTP_PRINCIPAL + '@' + kdc.getRealm()); - - KerberosName.resetDefaultRealm(); - - util.startMiniCluster(); - try { - util.waitUntilAllRegionsAssigned(PermissionStorage.ACL_TABLE_NAME); - } catch (Exception e) { - util.shutdownMiniCluster(); - throw e; - } - - return util::shutdownMiniCluster; - } - @Test public void testInitCredentialsForCluster1() throws Exception { HBaseTestingUtility util1 = new HBaseTestingUtility(); @@ -199,8 +164,9 @@ public class TestTableMapReduceUtil { kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL); loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath()); - try (Closeable util1Closeable = 
startSecureMiniCluster(util1, kdc, userPrincipal); - Closeable util2Closeable = startSecureMiniCluster(util2, kdc, userPrincipal)) { + try ( + Closeable util1Closeable = util1.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL); + Closeable util2Closeable = util2.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL)) { try { Configuration conf1 = util1.getConfiguration(); Job job = Job.getInstance(conf1); @@ -233,7 +199,8 @@ public class TestTableMapReduceUtil { kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL); loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath()); - try (Closeable util1Closeable = startSecureMiniCluster(util1, kdc, userPrincipal)) { + try ( + Closeable util1Closeable = util1.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL)) { try { HBaseTestingUtility util2 = new HBaseTestingUtility(); // Assume util2 is insecure cluster @@ -269,7 +236,8 @@ public class TestTableMapReduceUtil { kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL); loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath()); - try (Closeable util2Closeable = startSecureMiniCluster(util2, kdc, userPrincipal)) { + try ( + Closeable util2Closeable = util2.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL)) { try { Configuration conf1 = util1.getConfiguration(); Job job = Job.getInstance(conf1); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index 7cb985aee96..48a7615b2a4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import edu.umd.cs.findbugs.annotations.Nullable; +import java.io.Closeable; import java.io.File; import java.io.IOException; import 
java.io.OutputStream; @@ -88,6 +89,7 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.client.TableState; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; @@ -122,7 +124,12 @@ import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; import org.apache.hadoop.hbase.security.HBaseKerberosUtils; import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.access.AccessController; +import org.apache.hadoop.hbase.security.access.PermissionStorage; +import org.apache.hadoop.hbase.security.access.SecureTestUtil; +import org.apache.hadoop.hbase.security.token.TokenProvider; import org.apache.hadoop.hbase.security.visibility.VisibilityLabelsCache; +import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -151,6 +158,7 @@ import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MiniMRCluster; import org.apache.hadoop.minikdc.MiniKdc; +import org.apache.hadoop.security.authentication.util.KerberosName; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.ZooKeeper; @@ -393,6 +401,42 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility { r.getWAL().close(); } + /** + * Start a secure mini cluster with the given kdc and principals. + * @param kdc Mini kdc server + * @param servicePrincipal Service principal without realm.
+ * @param spnegoPrincipal Spnego principal without realm. + * @return Closeable that shuts down the cluster when closed + */ + public Closeable startSecureMiniCluster(MiniKdc kdc, String servicePrincipal, + String spnegoPrincipal) throws Exception { + Configuration conf = getConfiguration(); + + SecureTestUtil.enableSecurity(conf); + VisibilityTestUtil.enableVisiblityLabels(conf); + SecureTestUtil.verifyConfiguration(conf); + + // Reset the static default realm forcibly for hadoop-2.0. + // It is harmless but not required for hadoop-3.0. + KerberosName.resetDefaultRealm(); + + conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, + AccessController.class.getName() + ',' + TokenProvider.class.getName()); + + HBaseKerberosUtils.setSecuredConfiguration(conf, servicePrincipal + '@' + kdc.getRealm(), + spnegoPrincipal + '@' + kdc.getRealm()); + + startMiniCluster(); + try { + waitUntilAllRegionsAssigned(PermissionStorage.ACL_TABLE_NAME); + } catch (Exception e) { + shutdownMiniCluster(); + throw e; + } + + return this::shutdownMiniCluster; + } + /** * Returns this classes's instance of {@link Configuration}. Be careful how you use the returned * Configuration since {@link Connection} instances can be shared. The Map of Connections is keyed
