This is an automated email from the ASF dual-hosted git repository. ferhui pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new 2c963570517 HDFS-15079. RBF: Namenode needs to use the actual client Id and callId when going through RBF proxy. (#4530) 2c963570517 is described below commit 2c96357051718a519af7efbe748269977351fa89 Author: xuzq <15040255...@163.com> AuthorDate: Sat Jul 23 22:19:37 2022 +0800 HDFS-15079. RBF: Namenode needs to use the actual client Id and callId when going through RBF proxy. (#4530) --- .../hadoop/io/retry/RetryInvocationHandler.java | 6 +- .../java/org/apache/hadoop/ipc/CallerContext.java | 2 + .../java/org/apache/hadoop/ipc/RetryCache.java | 50 ++++--- .../java/org/apache/hadoop/ipc/TestRetryCache.java | 23 ++-- .../server/federation/router/RouterRpcClient.java | 14 +- .../federation/router/TestRouterRetryCache.java | 144 +++++++++++++++++++++ .../server/federation/router/TestRouterRpc.java | 37 ++++++ .../hadoop/hdfs/server/namenode/FSEditLog.java | 13 +- .../hadoop/hdfs/server/namenode/NameNode.java | 91 +++++++++++++ .../hdfs/server/namenode/NameNodeRpcServer.java | 123 ++++++++---------- 10 files changed, 395 insertions(+), 108 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java index 3960b189665..9707ee388e1 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java @@ -46,6 +46,10 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler { public static final Logger LOG = LoggerFactory.getLogger( RetryInvocationHandler.class); + @VisibleForTesting + public static final ThreadLocal<Boolean> SET_CALL_ID_FOR_TEST = + ThreadLocal.withInitial(() -> true); + static class Call { private final Method method; private final Object[] args; @@ -159,7 +163,7 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler { } Object invokeMethod() throws Throwable { - if (isRpc) { + if (isRpc && SET_CALL_ID_FOR_TEST.get()) { Client.setCallIdAndRetryCount(callId, counters.retries, retryInvocationHandler.asyncCallHandler); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallerContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallerContext.java index dbd9184a2b9..98d7e82c70e 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallerContext.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallerContext.java @@ -47,6 +47,8 @@ public final class CallerContext { // field names public static final String CLIENT_IP_STR = "clientIp"; public static final String CLIENT_PORT_STR = "clientPort"; + public static final String CLIENT_ID_STR = "clientId"; + public static final String CLIENT_CALL_ID_STR = "clientCallId"; /** The caller context. * diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java index 3d64a84bfb4..624cc08ac25 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java @@ -55,14 +55,14 @@ public class RetryCache { /** * Processing state of the requests. */ - private static byte INPROGRESS = 0; - private static byte SUCCESS = 1; - private static byte FAILED = 2; + private static final byte INPROGRESS = 0; + private static final byte SUCCESS = 1; + private static final byte FAILED = 2; private byte state = INPROGRESS; // Store uuid as two long for better memory utilization - private final long clientIdMsb; // Most signficant bytes + private final long clientIdMsb; // Most significant bytes private final long clientIdLsb; // Least significant bytes private final int callId; @@ -140,8 +140,8 @@ public class RetryCache { @Override public String toString() { - return (new UUID(this.clientIdMsb, this.clientIdLsb)).toString() + ":" - + this.callId + ":" + this.state; + return String.format("%s:%s:%s", new UUID(this.clientIdMsb, this.clientIdLsb), + this.callId, this.state); } } @@ -183,7 +183,7 @@ public class RetryCache { private final LightWeightGSet<CacheEntry, CacheEntry> set; private final long expirationTime; - private String cacheName; + private final String cacheName; private final ReentrantLock lock = new ReentrantLock(); @@ -195,7 +195,7 @@ public class RetryCache { */ public RetryCache(String cacheName, double percentage, long expirationTime) { int capacity = LightWeightGSet.computeCapacity(percentage, cacheName); - capacity = capacity > MAX_CAPACITY ? capacity : MAX_CAPACITY; + capacity = Math.max(capacity, MAX_CAPACITY); this.set = new LightWeightCache<CacheEntry, CacheEntry>(capacity, capacity, expirationTime, 0); this.expirationTime = expirationTime; @@ -203,11 +203,11 @@ public class RetryCache { this.retryCacheMetrics = RetryCacheMetrics.create(this); } - private static boolean skipRetryCache() { + private static boolean skipRetryCache(byte[] clientId, int callId) { // Do not track non RPC invocation or RPC requests with // invalid callId or clientId in retry cache - return !Server.isRpcInvocation() || Server.getCallId() < 0 - || Arrays.equals(Server.getClientId(), RpcConstants.DUMMY_CLIENT_ID); + return !Server.isRpcInvocation() || callId < 0 + || Arrays.equals(clientId, RpcConstants.DUMMY_CLIENT_ID); } public void lock() { @@ -332,43 +332,51 @@ public class RetryCache { retryCacheMetrics.incrCacheUpdated(); } - private static CacheEntry newEntry(long expirationTime) { - return new CacheEntry(Server.getClientId(), Server.getCallId(), + private static CacheEntry newEntry(long expirationTime, + byte[] clientId, int callId) { + return new CacheEntry(clientId, callId, System.nanoTime() + expirationTime); } private static CacheEntryWithPayload newEntry(Object payload, - long expirationTime) { - return new CacheEntryWithPayload(Server.getClientId(), Server.getCallId(), + long expirationTime, byte[] clientId, int callId) { + return new CacheEntryWithPayload(clientId, callId, payload, System.nanoTime() + expirationTime); } /** * Static method that provides null check for retryCache. * @param cache input Cache. + * @param clientId client id of this request + * @param callId client call id of this request * @return CacheEntry. */ - public static CacheEntry waitForCompletion(RetryCache cache) { - if (skipRetryCache()) { + public static CacheEntry waitForCompletion(RetryCache cache, + byte[] clientId, int callId) { + if (skipRetryCache(clientId, callId)) { return null; } return cache != null ? cache - .waitForCompletion(newEntry(cache.expirationTime)) : null; + .waitForCompletion(newEntry(cache.expirationTime, + clientId, callId)) : null; } /** * Static method that provides null check for retryCache. * @param cache input cache. * @param payload input payload. + * @param clientId client id of this request + * @param callId client call id of this request * @return CacheEntryWithPayload. */ public static CacheEntryWithPayload waitForCompletion(RetryCache cache, - Object payload) { - if (skipRetryCache()) { + Object payload, byte[] clientId, int callId) { + if (skipRetryCache(clientId, callId)) { return null; } return (CacheEntryWithPayload) (cache != null ? cache - .waitForCompletion(newEntry(payload, cache.expirationTime)) : null); + .waitForCompletion(newEntry(payload, cache.expirationTime, + clientId, callId)) : null); } public static void setState(CacheEntry e, boolean success) { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRetryCache.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRetryCache.java index 64607deb908..b789ada5271 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRetryCache.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRetryCache.java @@ -50,14 +50,14 @@ public class TestRetryCache { static class TestServer { AtomicInteger retryCount = new AtomicInteger(); AtomicInteger operationCount = new AtomicInteger(); - private RetryCache retryCache = new RetryCache("TestRetryCache", 1, - 100 * 1000 * 1000 * 1000L); + private final RetryCache retryCache = new RetryCache( + "TestRetryCache", 1, 100 * 1000 * 1000 * 1000L); /** * A server method implemented using {@link RetryCache}. * * @param input is returned back in echo, if {@code success} is true. - * @param failureOuput returned on failure, if {@code success} is false. + * @param failureOutput returned on failure, if {@code success} is false. * @param methodTime time taken by the operation. By passing smaller/larger * value one can simulate an operation that takes short/long time. * @param success whether this operation completes successfully or not @@ -67,7 +67,7 @@ public class TestRetryCache { int echo(int input, int failureOutput, long methodTime, boolean success) throws InterruptedException { CacheEntryWithPayload entry = RetryCache.waitForCompletion(retryCache, - null); + null, Server.getClientId(), Server.getCallId()); if (entry != null && entry.isSuccess()) { System.out.println("retryCount incremented " + retryCount.get()); retryCount.incrementAndGet(); @@ -173,16 +173,13 @@ public class TestRetryCache { final int failureOutput = input + 1; ExecutorService executorService = Executors .newFixedThreadPool(numberOfThreads); - List<Future<Integer>> list = new ArrayList<Future<Integer>>(); + List<Future<Integer>> list = new ArrayList<>(); for (int i = 0; i < numberOfThreads; i++) { - Callable<Integer> worker = new Callable<Integer>() { - @Override - public Integer call() throws Exception { - Server.getCurCall().set(call); - Assert.assertEquals(Server.getCurCall().get(), call); - int randomPause = pause == 0 ? pause : r.nextInt(pause); - return testServer.echo(input, failureOutput, randomPause, success); - } + Callable<Integer> worker = () -> { + Server.getCurCall().set(call); + Assert.assertEquals(Server.getCurCall().get(), call); + int randomPause = pause == 0 ? pause : r.nextInt(pause); + return testServer.echo(input, failureOutput, randomPause, success); }; Future<Integer> submit = executorService.submit(worker); list.add(submit); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index ff90854ebb7..e90cc5fda41 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -80,6 +80,7 @@ import org.apache.hadoop.ipc.Server.Call; import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.net.ConnectTimeoutException; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.StringUtils; import org.eclipse.jetty.util.ajax.JSON; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -464,7 +465,7 @@ public class RouterRpcClient { + router.getRouterId()); } - addClientIpToCallerContext(); + addClientInfoToCallerContext(); Object ret = null; if (rpcMonitor != null) { @@ -584,12 +585,13 @@ public class RouterRpcClient { } /** - * For tracking which is the actual client address. - * It adds trace info "clientIp:ip" and "clientPort:port" + * For tracking some information about the actual client. + * It adds trace info "clientIp:ip", "clientPort:port", + * "clientId:id" and "clientCallId:callId" * in the caller context, removing the old values if they were * already present. */ - private void addClientIpToCallerContext() { + private void addClientInfoToCallerContext() { CallerContext ctx = CallerContext.getCurrent(); String origContext = ctx == null ? null : ctx.getContext(); byte[] origSignature = ctx == null ? null : ctx.getSignature(); @@ -598,6 +600,10 @@ public class RouterRpcClient { .append(CallerContext.CLIENT_IP_STR, Server.getRemoteAddress()) .append(CallerContext.CLIENT_PORT_STR, Integer.toString(Server.getRemotePort())) + .append(CallerContext.CLIENT_ID_STR, + StringUtils.byteToHexString(Server.getClientId())) + .append(CallerContext.CLIENT_CALL_ID_STR, + Integer.toString(Server.getCallId())) .setSignature(origSignature); // Append the original caller context if (origContext != null) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRetryCache.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRetryCache.java new file mode 100644 index 00000000000..46a23549797 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRetryCache.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.io.retry.RetryInvocationHandler; +import org.apache.hadoop.ipc.Client; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_IP_PROXY_USERS; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class TestRouterRetryCache { + /** Federated HDFS cluster. */ + private MiniRouterDFSCluster cluster; + + @Before + public void setup() throws Exception { + Configuration namenodeConf = new Configuration(); + namenodeConf.set(DFS_NAMENODE_IP_PROXY_USERS, "fake_joe"); + cluster = new MiniRouterDFSCluster(true, 1); + cluster.addNamenodeOverrides(namenodeConf); + + // Start NNs and DNs and wait until ready + cluster.startCluster(); + + // Start routers with only an RPC service + cluster.startRouters(); + + // Register and verify all NNs with all routers + cluster.registerNamenodes(); + cluster.waitNamenodeRegistration(); + + // Setup the mount table + cluster.installMockLocations(); + + // Making one Namenodes active per nameservice + if (cluster.isHighAvailability()) { + for (String ns : cluster.getNameservices()) { + cluster.switchToActive(ns, NAMENODES[0]); + cluster.switchToStandby(ns, NAMENODES[1]); + } + } + cluster.waitActiveNamespaces(); + } + + @After + public void teardown() throws IOException { + if (cluster != null) { + cluster.shutdown(); + cluster = null; + } + } + + @Test + public void testRetryCache() throws Exception { + RetryInvocationHandler.SET_CALL_ID_FOR_TEST.set(false); + FileSystem routerFS = cluster.getRandomRouter().getFileSystem(); + Path testDir = new Path("/target-ns0/testdir"); + routerFS.mkdirs(testDir); + routerFS.setPermission(testDir, FsPermission.getDefault()); + + // Run as fake joe to authorize the test + UserGroupInformation joe = + UserGroupInformation.createUserForTesting("fake_joe", + new String[]{"fake_group"}); + FileSystem joeFS = joe.doAs( + (PrivilegedExceptionAction<FileSystem>) () -> + FileSystem.newInstance(routerFS.getUri(), routerFS.getConf())); + + Path renameSrc = new Path(testDir, "renameSrc"); + Path renameDst = new Path(testDir, "renameDst"); + joeFS.mkdirs(renameSrc); + + assertEquals(HAServiceProtocol.HAServiceState.ACTIVE, + cluster.getCluster().getNamesystem(0).getState()); + + int callId = Client.nextCallId(); + Client.setCallIdAndRetryCount(callId, 0, null); + assertTrue(joeFS.rename(renameSrc, renameDst)); + + Client.setCallIdAndRetryCount(callId, 0, null); + assertTrue(joeFS.rename(renameSrc, renameDst)); + + String ns0 = cluster.getNameservices().get(0); + cluster.switchToStandby(ns0, NAMENODES[0]); + cluster.switchToActive(ns0, NAMENODES[1]); + + assertEquals(HAServiceProtocol.HAServiceState.ACTIVE, + cluster.getCluster().getNamesystem(1).getState()); + + Client.setCallIdAndRetryCount(callId, 0, null); + assertTrue(joeFS.rename(renameSrc, renameDst)); + } + + @Test + public void testParseSpecialValue() { + String mockContent = "mockContent,clientIp:127.0.0.1," + + "clientCallId:12345,clientId:mockClientId"; + String clientIp = NameNode.parseSpecialValue(mockContent, "clientIp:"); + assertEquals("127.0.0.1", clientIp); + + String clientCallId = NameNode.parseSpecialValue( + mockContent, "clientCallId:"); + assertEquals("12345", clientCallId); + + String clientId = NameNode.parseSpecialValue(mockContent, "clientId:"); + assertEquals("mockClientId", clientId); + + String clientRetryNum = NameNode.parseSpecialValue( + mockContent, "clientRetryNum:"); + assertNull(clientRetryNum); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java index f71145a4522..31cc18fc882 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java @@ -2053,6 +2053,8 @@ public class TestRouterRpc { final String logOutput = auditlog.getOutput(); assertTrue(logOutput.contains("callerContext=clientIp:")); assertTrue(logOutput.contains(",clientContext")); + assertTrue(logOutput.contains(",clientId")); + assertTrue(logOutput.contains(",clientCallId")); assertTrue(verifyFileExists(routerFS, dirPath)); } @@ -2103,6 +2105,41 @@ public class TestRouterRpc { assertFalse(auditLog.getOutput().contains("clientPort:1234")); } + @Test + public void testAddClientIdAndCallIdToCallerContext() throws IOException { + GenericTestUtils.LogCapturer auditLog = + GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.auditLog); + + // 1. ClientId and ClientCallId are not set on the client. + // Set client context. + CallerContext.setCurrent( + new CallerContext.Builder("clientContext").build()); + + // Create a directory via the router. + String dirPath = "/test"; + routerProtocol.mkdirs(dirPath, new FsPermission("755"), false); + + // The audit log should contains "clientId:" and "clientCallId:". + assertTrue(auditLog.getOutput().contains("clientId:")); + assertTrue(auditLog.getOutput().contains("clientCallId:")); + assertTrue(verifyFileExists(routerFS, dirPath)); + auditLog.clearOutput(); + + // 2. ClientId and ClientCallId are set on the client. + // Reset client context. + CallerContext.setCurrent( + new CallerContext.Builder( + "clientContext,clientId:mockClientId,clientCallId:4321").build()); + + // Create a directory via the router. + routerProtocol.getFileInfo(dirPath); + + // The audit log should not contain the original clientId and clientCallId + // set by client. + assertFalse(auditLog.getOutput().contains("clientId:mockClientId")); + assertFalse(auditLog.getOutput().contains("clientCallId:4321")); + } + @Test public void testContentSummaryWithSnapshot() throws Exception { DistributedFileSystem routerDFS = (DistributedFileSystem) routerFS; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java index 850b2fc5708..5bb6872e588 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs.server.namenode; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_IP_PROXY_USERS; import static org.apache.hadoop.util.ExitUtil.terminate; import static org.apache.hadoop.util.Time.monotonicNow; @@ -30,6 +31,7 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.atomic.LongAdder; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; @@ -107,7 +109,6 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.ipc.Server; import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.util.Lists; @@ -195,6 +196,9 @@ public class FSEditLog implements LogsPurgeable { protected final OpInstanceCache cache = new OpInstanceCache(); + // Users who can override the client ip + private final String[] ipProxyUsers; + /** * The edit directories that are shared between primary and secondary. */ @@ -246,6 +250,7 @@ public class FSEditLog implements LogsPurgeable { * @param editsDirs List of journals to use */ FSEditLog(Configuration conf, NNStorage storage, List<URI> editsDirs) { + ipProxyUsers = conf.getStrings(DFS_NAMENODE_IP_PROXY_USERS); isSyncRunning = false; this.conf = conf; this.storage = storage; @@ -799,8 +804,10 @@ public class FSEditLog implements LogsPurgeable { /** Record the RPC IDs if necessary */ private void logRpcIds(FSEditLogOp op, boolean toLogRpcIds) { if (toLogRpcIds) { - op.setRpcClientId(Server.getClientId()); - op.setRpcCallId(Server.getCallId()); + Pair<byte[], Integer> clientIdAndCallId = + NameNode.getClientIdAndCallId(this.ipProxyUsers); + op.setRpcClientId(clientIdAndCallId.getLeft()); + op.setRpcCallId(clientIdAndCallId.getRight()); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java index 0610048ccac..63c7721b749 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java @@ -17,7 +17,10 @@ */ package org.apache.hadoop.hdfs.server.namenode; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.ipc.CallerContext; import org.apache.hadoop.thirdparty.com.google.common.base.Joiner; import org.apache.hadoop.util.Preconditions; @@ -494,6 +497,94 @@ public class NameNode extends ReconfigurableBase implements return metrics; } + /** + * Try to obtain the actual client info according to the current user. + * @param ipProxyUsers Users who can override client infos + */ + private static String clientInfoFromContext( + final String[] ipProxyUsers) { + if (ipProxyUsers != null) { + UserGroupInformation user = + UserGroupInformation.getRealUserOrSelf(Server.getRemoteUser()); + if (user != null && + ArrayUtils.contains(ipProxyUsers, user.getShortUserName())) { + CallerContext context = CallerContext.getCurrent(); + if (context != null && context.isContextValid()) { + return context.getContext(); + } + } + } + return null; + } + + /** + * Try to obtain the value corresponding to the key by parsing the content. + * @param content the full content to be parsed. + * @param key trying to obtain the value of the key. + * @return the value corresponding to the key. + */ + @VisibleForTesting + public static String parseSpecialValue(String content, String key) { + int posn = content.indexOf(key); + if (posn != -1) { + posn += key.length(); + int end = content.indexOf(",", posn); + return end == -1 ? content.substring(posn) + : content.substring(posn, end); + } + return null; + } + + /** + * Try to obtain the actual client's machine according to the current user. + * @param ipProxyUsers Users who can override client infos. + * @return The actual client's machine. + */ + public static String getClientMachine(final String[] ipProxyUsers) { + String cc = clientInfoFromContext(ipProxyUsers); + if (cc != null) { + // if the rpc has a caller context of "clientIp:1.2.3.4,CLI", + // return "1.2.3.4" as the client machine. + String key = CallerContext.CLIENT_IP_STR + + CallerContext.Builder.KEY_VALUE_SEPARATOR; + return parseSpecialValue(cc, key); + } + + String clientMachine = Server.getRemoteAddress(); + if (clientMachine == null) { //not a RPC client + clientMachine = ""; + } + return clientMachine; + } + + /** + * Try to obtain the actual client's id and call id + * according to the current user. + * @param ipProxyUsers Users who can override client infos + * @return The actual client's id and call id. + */ + public static Pair<byte[], Integer> getClientIdAndCallId( + final String[] ipProxyUsers) { + byte[] clientId = Server.getClientId(); + int callId = Server.getCallId(); + String cc = clientInfoFromContext(ipProxyUsers); + if (cc != null) { + String clientIdKey = CallerContext.CLIENT_ID_STR + + CallerContext.Builder.KEY_VALUE_SEPARATOR; + String clientIdStr = parseSpecialValue(cc, clientIdKey); + if (clientIdStr != null) { + clientId = StringUtils.hexStringToByte(clientIdStr); + } + String callIdKey = CallerContext.CLIENT_CALL_ID_STR + + CallerContext.Builder.KEY_VALUE_SEPARATOR; + String callIdStr = parseSpecialValue(cc, callIdKey); + if (callIdStr != null) { + callId = Integer.parseInt(callIdStr); + } + } + return Pair.of(clientId, callId); + } + /** * Returns object used for reporting namenode startup progress. * diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 1d50bc5cb53..b64530337ee 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -46,8 +46,7 @@ import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.commons.lang3.ArrayUtils; -import org.apache.hadoop.ipc.CallerContext; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; @@ -271,7 +270,7 @@ public class NameNodeRpcServer implements NamenodeProtocols { private final String defaultECPolicyName; - // Users who can override the client ip + // Users who can override the client info private final String[] ipProxyUsers; public NameNodeRpcServer(Configuration conf, NameNode nn) @@ -711,8 +710,7 @@ public class NameNodeRpcServer implements NamenodeProtocols { if(!nn.isRole(NamenodeRole.NAMENODE)) throw new IOException("Only an ACTIVE node can invoke startCheckpoint."); - CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache, - null); + CacheEntryWithPayload cacheEntry = getCacheEntryWithPayload(null); if (cacheEntry != null && cacheEntry.isSuccess()) { return (NamenodeCommand) cacheEntry.getPayload(); } @@ -725,13 +723,33 @@ public class NameNodeRpcServer implements NamenodeProtocols { return ret; } + /** + * Return the current CacheEntry. + */ + private CacheEntry getCacheEntry() { + Pair<byte[], Integer> clientInfo = + NameNode.getClientIdAndCallId(this.ipProxyUsers); + return RetryCache.waitForCompletion( + retryCache, clientInfo.getLeft(), clientInfo.getRight()); + } + + /** + * Return the current CacheEntryWithPayload. + */ + private CacheEntryWithPayload getCacheEntryWithPayload(Object payload) { + Pair<byte[], Integer> clientInfo = + NameNode.getClientIdAndCallId(this.ipProxyUsers); + return RetryCache.waitForCompletion(retryCache, payload, + clientInfo.getLeft(), clientInfo.getRight()); + } + @Override // NamenodeProtocol public void endCheckpoint(NamenodeRegistration registration, CheckpointSignature sig) throws IOException { String operationName = "endCheckpoint"; checkNNStartup(); namesystem.checkSuperuserPrivilege(operationName); - CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); + CacheEntry cacheEntry = getCacheEntry(); if (cacheEntry != null && cacheEntry.isSuccess()) { return; // Return previous response } @@ -801,7 +819,7 @@ public class NameNodeRpcServer implements NamenodeProtocols { + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels."); } namesystem.checkOperation(OperationCategory.WRITE); - CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache, null); + CacheEntryWithPayload cacheEntry = getCacheEntryWithPayload(null); if (cacheEntry != null && cacheEntry.isSuccess()) { return (HdfsFileStatus) cacheEntry.getPayload(); } @@ -832,8 +850,7 @@ public class NameNodeRpcServer implements NamenodeProtocols { +src+" for "+clientName+" at "+clientMachine); } namesystem.checkOperation(OperationCategory.WRITE); - CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache, - null); + CacheEntryWithPayload cacheEntry = getCacheEntryWithPayload(null); if (cacheEntry != null && cacheEntry.isSuccess()) { return (LastBlockWithStatus) cacheEntry.getPayload(); } @@ -999,7 +1016,7 @@ public class NameNodeRpcServer implements NamenodeProtocols { throws IOException { checkNNStartup(); namesystem.checkOperation(OperationCategory.WRITE); - CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); + CacheEntry cacheEntry = getCacheEntry(); if (cacheEntry != null && cacheEntry.isSuccess()) { return; // Return previous response } @@ -1044,7 +1061,7 @@ public class NameNodeRpcServer implements NamenodeProtocols { + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels."); } namesystem.checkOperation(OperationCategory.WRITE); - CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); + CacheEntry cacheEntry = getCacheEntry(); if (cacheEntry != null && cacheEntry.isSuccess()) { return true; // Return previous response } @@ -1067,7 +1084,7 @@ public class NameNodeRpcServer implements NamenodeProtocols { stateChangeLog.debug("*DIR* NameNode.concat: src path {} to" + " target path {}", Arrays.toString(src), trg); namesystem.checkOperation(OperationCategory.WRITE); - CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); + CacheEntry cacheEntry = getCacheEntry(); if (cacheEntry != null && cacheEntry.isSuccess()) { return; // Return previous response } @@ -1093,7 +1110,7 @@ public class NameNodeRpcServer implements NamenodeProtocols { + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels."); } namesystem.checkOperation(OperationCategory.WRITE); - CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); + CacheEntry cacheEntry = getCacheEntry(); if (cacheEntry != null && cacheEntry.isSuccess()) { return; // Return previous response } @@ -1130,7 +1147,7 @@ public class NameNodeRpcServer implements NamenodeProtocols { + ", recursive=" + recursive); } namesystem.checkOperation(OperationCategory.WRITE); - CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); + CacheEntry cacheEntry = getCacheEntry(); if (cacheEntry != null && cacheEntry.isSuccess()) { return true; // Return previous response } @@ -1315,7 +1332,7 @@ public class NameNodeRpcServer implements NamenodeProtocols { @Override // ClientProtocol public boolean saveNamespace(long timeWindow, long txGap) throws IOException { checkNNStartup(); - CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); + CacheEntry cacheEntry = getCacheEntry(); if (cacheEntry != null && cacheEntry.isSuccess()) { return true; // Return previous response } @@ -1503,7 +1520,7 @@ public class NameNodeRpcServer implements NamenodeProtocols { public void satisfyStoragePolicy(String src) throws IOException { checkNNStartup(); namesystem.checkOperation(OperationCategory.WRITE); - CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); + CacheEntry cacheEntry = getCacheEntry(); if (cacheEntry != null && cacheEntry.isSuccess()) { return; // Return previous response } @@ -1550,7 +1567,7 @@ public class NameNodeRpcServer implements NamenodeProtocols { boolean createParent) throws IOException { checkNNStartup(); namesystem.checkOperation(OperationCategory.WRITE); - CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); + CacheEntry cacheEntry = getCacheEntry(); if (cacheEntry != null && cacheEntry.isSuccess()) { return; // Return previous response } @@ -1920,34 +1937,11 @@ public class NameNodeRpcServer implements NamenodeProtocols { } } + /** + * Get the actual client's machine. + */ private String getClientMachine() { - if (ipProxyUsers != null) { - // Get the real user (or effective if it isn't a proxy user) - UserGroupInformation user = - UserGroupInformation.getRealUserOrSelf(Server.getRemoteUser()); - if (user != null && - ArrayUtils.contains(ipProxyUsers, user.getShortUserName())) { - CallerContext context = CallerContext.getCurrent(); - if (context != null && context.isContextValid()) { - String cc = context.getContext(); - // if the rpc has a caller context of "clientIp:1.2.3.4,CLI", - // return "1.2.3.4" as the client machine. - String key = CallerContext.CLIENT_IP_STR + - CallerContext.Builder.KEY_VALUE_SEPARATOR; - int posn = cc.indexOf(key); - if (posn != -1) { - posn += key.length(); - int end = cc.indexOf(",", posn); - return end == -1 ? cc.substring(posn) : cc.substring(posn, end); - } - } - } - } - String clientMachine = Server.getRemoteAddress(); - if (clientMachine == null) { //not a RPC client - clientMachine = ""; - } - return clientMachine; + return NameNode.getClientMachine(this.ipProxyUsers); } @Override @@ -1967,8 +1961,7 @@ public class NameNodeRpcServer implements NamenodeProtocols { + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels."); } namesystem.checkOperation(OperationCategory.WRITE); - CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache, - null); + CacheEntryWithPayload cacheEntry = getCacheEntryWithPayload(null); if (cacheEntry != null && cacheEntry.isSuccess()) { return (String) cacheEntry.getPayload(); } @@ -1995,7 +1988,7 @@ public class NameNodeRpcServer implements NamenodeProtocols { } namesystem.checkOperation(OperationCategory.WRITE); metrics.incrDeleteSnapshotOps(); - CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); + CacheEntry cacheEntry = getCacheEntry(); if (cacheEntry != null && cacheEntry.isSuccess()) { return; // Return previous response } @@ -2037,7 +2030,7 @@ public class NameNodeRpcServer implements NamenodeProtocols { } namesystem.checkOperation(OperationCategory.WRITE); metrics.incrRenameSnapshotOps(); - CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); + CacheEntry cacheEntry = getCacheEntry(); if (cacheEntry != null && cacheEntry.isSuccess()) { return; // Return previous response } @@ -2098,8 +2091,7 @@ public class NameNodeRpcServer implements NamenodeProtocols { CacheDirectiveInfo path, EnumSet<CacheFlag> flags) throws IOException { checkNNStartup(); namesystem.checkOperation(OperationCategory.WRITE); - CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion - (retryCache, null); + CacheEntryWithPayload cacheEntry = getCacheEntryWithPayload(null); if (cacheEntry != null && cacheEntry.isSuccess()) { return (Long) cacheEntry.getPayload(); } @@ -2120,7 +2112,7 @@ public class NameNodeRpcServer implements NamenodeProtocols { CacheDirectiveInfo directive, EnumSet<CacheFlag> flags) throws IOException { checkNNStartup(); namesystem.checkOperation(OperationCategory.WRITE); - CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); + CacheEntry cacheEntry = getCacheEntry(); if (cacheEntry != null && cacheEntry.isSuccess()) { return; } @@ -2138,7 +2130,7 @@ public class NameNodeRpcServer implements NamenodeProtocols { public void removeCacheDirective(long id) throws IOException { checkNNStartup(); namesystem.checkOperation(OperationCategory.WRITE); - CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); + CacheEntry cacheEntry = getCacheEntry(); if (cacheEntry != null && cacheEntry.isSuccess()) { return; } @@ -2165,7 +2157,7 @@ public class NameNodeRpcServer implements NamenodeProtocols { public void addCachePool(CachePoolInfo info) throws IOException { checkNNStartup(); namesystem.checkOperation(OperationCategory.WRITE); - CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); + CacheEntry cacheEntry = getCacheEntry(); if (cacheEntry != null && cacheEntry.isSuccess()) { return; // Return previous response } @@ -2182,7 +2174,7 @@ public class NameNodeRpcServer implements NamenodeProtocols { public void modifyCachePool(CachePoolInfo info) throws IOException { checkNNStartup(); namesystem.checkOperation(OperationCategory.WRITE); - CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); + CacheEntry cacheEntry = getCacheEntry(); if (cacheEntry != null && cacheEntry.isSuccess()) { return; // Return previous response } @@ -2199,7 +2191,7 @@ public class NameNodeRpcServer implements NamenodeProtocols { public void removeCachePool(String cachePoolName) throws IOException { checkNNStartup(); namesystem.checkOperation(OperationCategory.WRITE); - CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); + CacheEntry cacheEntry = getCacheEntry(); if (cacheEntry != null && cacheEntry.isSuccess()) { return; } @@ -2262,7 +2254,7 @@ public class NameNodeRpcServer implements NamenodeProtocols { throws IOException { checkNNStartup(); namesystem.checkOperation(OperationCategory.WRITE); - final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); + final CacheEntry cacheEntry = getCacheEntry(); if (cacheEntry != null && cacheEntry.isSuccess()) { return; } @@ -2294,7 +2286,7 @@ public class NameNodeRpcServer implements NamenodeProtocols { final ReencryptAction action) throws IOException { checkNNStartup(); namesystem.checkOperation(OperationCategory.WRITE); - final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); + final CacheEntry cacheEntry = getCacheEntry(); if (cacheEntry != null && cacheEntry.isSuccess()) { return; } @@ -2318,7 +2310,7 @@ public class NameNodeRpcServer implements NamenodeProtocols { public void setErasureCodingPolicy(String src, String ecPolicyName) throws IOException { checkNNStartup(); - final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); + final CacheEntry cacheEntry = getCacheEntry(); if (cacheEntry != null && cacheEntry.isSuccess()) { return; } @@ -2342,7 +2334,7 @@ public class NameNodeRpcServer implements NamenodeProtocols { throws IOException { checkNNStartup(); namesystem.checkOperation(OperationCategory.WRITE); - CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); + CacheEntry cacheEntry = getCacheEntry(); if (cacheEntry != null && cacheEntry.isSuccess()) { return; // Return previous response } @@ -2372,7 +2364,7 @@ public class NameNodeRpcServer implements NamenodeProtocols { public void removeXAttr(String src, XAttr xAttr) throws IOException { checkNNStartup(); namesystem.checkOperation(OperationCategory.WRITE); - CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); + CacheEntry cacheEntry = getCacheEntry(); if (cacheEntry != null && cacheEntry.isSuccess()) { return; // Return previous response } @@ -2555,7 +2547,7 @@ public class NameNodeRpcServer implements NamenodeProtocols { @Override // ClientProtocol public void unsetErasureCodingPolicy(String src) throws IOException { checkNNStartup(); - final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); + final CacheEntry cacheEntry = getCacheEntry(); if (cacheEntry != null && cacheEntry.isSuccess()) { return; } @@ -2581,8 +2573,7 @@ public class NameNodeRpcServer implements NamenodeProtocols { String operationName = "addErasureCodingPolicies"; checkNNStartup(); namesystem.checkSuperuserPrivilege(operationName); - final CacheEntryWithPayload cacheEntry = - RetryCache.waitForCompletion(retryCache, null); + final CacheEntryWithPayload cacheEntry = getCacheEntryWithPayload(null); if (cacheEntry != null && cacheEntry.isSuccess()) { return (AddErasureCodingPolicyResponse[]) cacheEntry.getPayload(); } @@ -2605,7 +2596,7 @@ public class NameNodeRpcServer implements NamenodeProtocols { String operationName = "removeErasureCodingPolicy"; checkNNStartup(); namesystem.checkSuperuserPrivilege(operationName); - final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); + final CacheEntry cacheEntry = getCacheEntry(); if (cacheEntry != null && cacheEntry.isSuccess()) { return; } @@ -2624,7 +2615,7 @@ public class NameNodeRpcServer implements NamenodeProtocols { String operationName = "enableErasureCodingPolicy"; checkNNStartup(); namesystem.checkSuperuserPrivilege(operationName); - final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); + final CacheEntry cacheEntry = getCacheEntry(); if (cacheEntry != null && cacheEntry.isSuccess()) { return; } @@ -2643,7 +2634,7 @@ public class NameNodeRpcServer implements NamenodeProtocols { String operationName = "disableErasureCodingPolicy"; checkNNStartup(); namesystem.checkSuperuserPrivilege(operationName); - final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); + final CacheEntry cacheEntry = getCacheEntry(); if (cacheEntry != null && cacheEntry.isSuccess()) { return; } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org