This is an automated email from the ASF dual-hosted git repository.
pankajkumar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new 9130913dea3 HBASE-29473 Obtain target cluster's token for cross clusters job (#7175)
9130913dea3 is described below
commit 9130913dea309e9fabfbaddeacc05c0fab1a2ca8
Author: mokai <[email protected]>
AuthorDate: Mon Aug 18 23:58:55 2025 +0800
HBASE-29473 Obtain target cluster's token for cross clusters job (#7175)
Signed-off-by: Nihal Jain <[email protected]>
Signed-off-by: Junegunn Choi <[email protected]>
Signed-off-by: Pankaj Kumar <[email protected]>
---
.../hadoop/hbase/mapreduce/HFileOutputFormat2.java | 4 +-
.../TestHFileOutputFormat2WithSecurity.java | 131 +++++++++++++++++++++
.../hbase/mapreduce/TestTableMapReduceUtil.java | 44 +------
.../org/apache/hadoop/hbase/HBaseTestingUtil.java | 39 ++++++
4 files changed, 179 insertions(+), 39 deletions(-)
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
index cc09904c826..be68d457596 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
@@ -753,7 +753,7 @@ public class HFileOutputFormat2 extends FileOutputFormat<ImmutableBytesWritable,
* @see #REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY
* @see #REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY
*/
- public static void configureRemoteCluster(Job job, Configuration clusterConf) {
+ public static void configureRemoteCluster(Job job, Configuration clusterConf) throws IOException {
Configuration conf = job.getConfiguration();
if (!conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY,
DEFAULT_LOCALITY_SENSITIVE)) {
@@ -770,6 +770,8 @@ public class HFileOutputFormat2 extends FileOutputFormat<ImmutableBytesWritable,
conf.setInt(REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY, clientPort);
conf.set(REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY, parent);
+ TableMapReduceUtil.initCredentialsForCluster(job, clusterConf);
+
LOG.info("ZK configs for remote cluster of bulkload is configured: " +
quorum + ":" + clientPort
+ "/" + parent);
}
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2WithSecurity.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2WithSecurity.java
new file mode 100644
index 00000000000..ac767f23775
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2WithSecurity.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static
org.apache.hadoop.security.UserGroupInformation.loginUserFromKeytab;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.Closeable;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests for {@link HFileOutputFormat2} with secure mode.
+ */
+@Category({ VerySlowMapReduceTests.class, LargeTests.class })
+public class TestHFileOutputFormat2WithSecurity extends
HFileOutputFormat2TestBase {
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestHFileOutputFormat2WithSecurity.class);
+
+ private static final byte[] FAMILIES = Bytes.toBytes("test_cf");
+
+ private static final String HTTP_PRINCIPAL = "HTTP/localhost";
+
+ private HBaseTestingUtil utilA;
+
+ private Configuration confA;
+
+ private HBaseTestingUtil utilB;
+
+ private MiniKdc kdc;
+
+ private List<Closeable> clusters = new ArrayList<>();
+
+ @Before
+ public void setupSecurityClusters() throws Exception {
+ utilA = new HBaseTestingUtil();
+ confA = utilA.getConfiguration();
+
+ utilB = new HBaseTestingUtil();
+
+ // Prepare security configs.
+ File keytab = new File(utilA.getDataTestDir("keytab").toUri().getPath());
+ kdc = utilA.setupMiniKdc(keytab);
+ String username = UserGroupInformation.getLoginUser().getShortUserName();
+ String userPrincipal = username + "/localhost";
+ kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
+ loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(),
keytab.getAbsolutePath());
+
+ // Start security clusterA
+ clusters.add(utilA.startSecureMiniCluster(kdc, userPrincipal,
HTTP_PRINCIPAL));
+
+ // Start security clusterB
+ clusters.add(utilB.startSecureMiniCluster(kdc, userPrincipal,
HTTP_PRINCIPAL));
+ }
+
+ @After
+ public void teardownSecurityClusters() {
+ IOUtils.closeQuietly(clusters);
+ clusters.clear();
+ if (kdc != null) {
+ kdc.stop();
+ }
+ }
+
+ @Test
+ public void testIncrementalLoadInMultiClusterWithSecurity() throws Exception
{
+ TableName tableName =
TableName.valueOf("testIncrementalLoadInMultiClusterWithSecurity");
+
+ // Create table in clusterB
+ try (Table table = utilB.createTable(tableName, FAMILIES);
+ RegionLocator r = utilB.getConnection().getRegionLocator(tableName)) {
+
+ // Create job in clusterA
+ Job job = Job.getInstance(confA,
"testIncrementalLoadInMultiClusterWithSecurity");
+ job.setWorkingDirectory(
+
utilA.getDataTestDirOnTestFS("testIncrementalLoadInMultiClusterWithSecurity"));
+ setupRandomGeneratorMapper(job, false);
+ HFileOutputFormat2.configureIncrementalLoad(job, table, r);
+
+ Map<Text, Token<? extends TokenIdentifier>> tokenMap =
job.getCredentials().getTokenMap();
+ assertEquals(2, tokenMap.size());
+
+ String remoteClusterId =
utilB.getHBaseClusterInterface().getClusterMetrics().getClusterId();
+ assertTrue(tokenMap.containsKey(new Text(remoteClusterId)));
+ } finally {
+ if (utilB.getAdmin().tableExists(tableName)) {
+ utilB.deleteTable(tableName);
+ }
+ }
+ }
+}
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java
index 8c88d9bb4ee..17a94c99956 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java
@@ -30,15 +30,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
-import org.apache.hadoop.hbase.security.access.AccessController;
-import org.apache.hadoop.hbase.security.access.PermissionStorage;
-import org.apache.hadoop.hbase.security.access.SecureTestUtil;
import
org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProviders;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
-import org.apache.hadoop.hbase.security.token.TokenProvider;
-import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -134,31 +127,6 @@ public class TestTableMapReduceUtil {
assertEquals("Table",
job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
}
- private static Closeable startSecureMiniCluster(HBaseTestingUtil util,
MiniKdc kdc,
- String principal) throws Exception {
- Configuration conf = util.getConfiguration();
-
- SecureTestUtil.enableSecurity(conf);
- VisibilityTestUtil.enableVisiblityLabels(conf);
- SecureTestUtil.verifyConfiguration(conf);
-
- conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
- AccessController.class.getName() + ',' + TokenProvider.class.getName());
-
- HBaseKerberosUtils.setSecuredConfiguration(conf, principal + '@' +
kdc.getRealm(),
- HTTP_PRINCIPAL + '@' + kdc.getRealm());
-
- util.startMiniCluster();
- try {
- util.waitUntilAllRegionsAssigned(PermissionStorage.ACL_TABLE_NAME);
- } catch (Exception e) {
- util.shutdownMiniCluster();
- throw e;
- }
-
- return util::shutdownMiniCluster;
- }
-
@Test
public void testInitCredentialsForCluster1() throws Exception {
HBaseTestingUtil util1 = new HBaseTestingUtil();
@@ -198,8 +166,8 @@ public class TestTableMapReduceUtil {
kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(),
keytab.getAbsolutePath());
- try (Closeable ignored1 = startSecureMiniCluster(util1, kdc,
userPrincipal);
- Closeable ignored2 = startSecureMiniCluster(util2, kdc,
userPrincipal)) {
+ try (Closeable ignored1 = util1.startSecureMiniCluster(kdc,
userPrincipal, HTTP_PRINCIPAL);
+ Closeable ignored2 = util2.startSecureMiniCluster(kdc, userPrincipal,
HTTP_PRINCIPAL)) {
Configuration conf1 = util1.getConfiguration();
Job job = Job.getInstance(conf1);
@@ -232,7 +200,7 @@ public class TestTableMapReduceUtil {
kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(),
keytab.getAbsolutePath());
- try (Closeable ignored1 = startSecureMiniCluster(util1, kdc,
userPrincipal)) {
+ try (Closeable ignored1 = util1.startSecureMiniCluster(kdc,
userPrincipal, HTTP_PRINCIPAL)) {
HBaseTestingUtil util2 = new HBaseTestingUtil();
// Assume util2 is insecure cluster
// Do not start util2 because cannot boot secured mini cluster and
insecure mini cluster at
@@ -268,7 +236,7 @@ public class TestTableMapReduceUtil {
kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(),
keytab.getAbsolutePath());
- try (Closeable ignored2 = startSecureMiniCluster(util2, kdc,
userPrincipal)) {
+ try (Closeable ignored2 = util2.startSecureMiniCluster(kdc,
userPrincipal, HTTP_PRINCIPAL)) {
Configuration conf1 = util1.getConfiguration();
Job job = Job.getInstance(conf1);
@@ -303,8 +271,8 @@ public class TestTableMapReduceUtil {
kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(),
keytab.getAbsolutePath());
- try (Closeable ignored1 = startSecureMiniCluster(util1, kdc,
userPrincipal);
- Closeable ignored2 = startSecureMiniCluster(util2, kdc,
userPrincipal)) {
+ try (Closeable ignored1 = util1.startSecureMiniCluster(kdc,
userPrincipal, HTTP_PRINCIPAL);
+ Closeable ignored2 = util2.startSecureMiniCluster(kdc, userPrincipal,
HTTP_PRINCIPAL)) {
Configuration conf1 = util1.getConfiguration();
Job job = Job.getInstance(conf1);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtil.java
index 6b659161d1c..93bde45a9d5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtil.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtil.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import edu.umd.cs.findbugs.annotations.Nullable;
+import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
@@ -88,6 +89,7 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableState;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
@@ -119,7 +121,12 @@ import
org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.access.AccessController;
+import org.apache.hadoop.hbase.security.access.PermissionStorage;
+import org.apache.hadoop.hbase.security.access.SecureTestUtil;
+import org.apache.hadoop.hbase.security.token.TokenProvider;
import org.apache.hadoop.hbase.security.visibility.VisibilityLabelsCache;
+import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -345,6 +352,38 @@ public class HBaseTestingUtil extends HBaseZKTestingUtil {
r.getWAL().close();
}
+ /**
+ * Start mini secure cluster with given kdc and principals.
+ * @param kdc Mini kdc server
+ * @param servicePrincipal Service principal without realm.
+ * @param spnegoPrincipal Spnego principal without realm.
+ * @return Handler to shutdown the cluster
+ */
+ public Closeable startSecureMiniCluster(MiniKdc kdc, String servicePrincipal,
+ String spnegoPrincipal) throws Exception {
+ Configuration conf = getConfiguration();
+
+ SecureTestUtil.enableSecurity(conf);
+ VisibilityTestUtil.enableVisiblityLabels(conf);
+ SecureTestUtil.verifyConfiguration(conf);
+
+ conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+ AccessController.class.getName() + ',' + TokenProvider.class.getName());
+
+ HBaseKerberosUtils.setSecuredConfiguration(conf, servicePrincipal + '@' +
kdc.getRealm(),
+ spnegoPrincipal + '@' + kdc.getRealm());
+
+ startMiniCluster();
+ try {
+ waitUntilAllRegionsAssigned(PermissionStorage.ACL_TABLE_NAME);
+ } catch (Exception e) {
+ shutdownMiniCluster();
+ throw e;
+ }
+
+ return this::shutdownMiniCluster;
+ }
+
/**
* Returns this classes's instance of {@link Configuration}. Be careful how
you use the returned
* Configuration since {@link Connection} instances can be shared. The Map
of Connections is keyed