Revert "HDFS-10224. Implement asynchronous rename for DistributedFileSystem. Contributed by Xiaobing Zhou"
This reverts commit bdc45bef646cafdc04a59c19e23dcba3bb16b20c. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d740a902 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d740a902 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d740a902 Branch: refs/heads/branch-2.8 Commit: d740a90260d9d26a67688b004e5fcda60cc30723 Parents: 8a07026 Author: Andrew Wang <w...@apache.org> Authored: Fri Jun 3 18:14:02 2016 -0700 Committer: Andrew Wang <w...@apache.org> Committed: Fri Jun 3 18:17:33 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/hadoop/fs/FileSystem.java | 1 + .../main/java/org/apache/hadoop/ipc/Client.java | 11 +- .../apache/hadoop/ipc/ProtobufRpcEngine.java | 34 +-- .../org/apache/hadoop/ipc/TestAsyncIPC.java | 2 +- .../hadoop/hdfs/AsyncDistributedFileSystem.java | 110 -------- .../hadoop/hdfs/DistributedFileSystem.java | 22 +- .../ClientNamenodeProtocolTranslatorPB.java | 39 +-- .../apache/hadoop/hdfs/TestAsyncDFSRename.java | 258 ------------------- 8 files changed, 17 insertions(+), 460 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/d740a902/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java index 6558d98..75d468c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java @@ -1247,6 +1247,7 @@ public abstract class FileSystem extends Configured implements Closeable { /** * Renames Path src to Path dst * <ul> + * <li * <li>Fails if src is a file and dst is a directory. * <li>Fails if src is a directory and dst is a file. * <li>Fails if the parent of dst does not exist or is a file. http://git-wip-us.apache.org/repos/asf/hadoop/blob/d740a902/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index 35e5f21..befab73 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -119,8 +119,7 @@ public class Client implements AutoCloseable { private static final ThreadLocal<Integer> callId = new ThreadLocal<Integer>(); private static final ThreadLocal<Integer> retryCount = new ThreadLocal<Integer>(); - private static final ThreadLocal<Future<?>> - RETURN_RPC_RESPONSE = new ThreadLocal<>(); + private static final ThreadLocal<Future<?>> returnValue = new ThreadLocal<>(); private static final ThreadLocal<Boolean> asynchronousMode = new ThreadLocal<Boolean>() { @Override @@ -131,8 +130,8 @@ public class Client implements AutoCloseable { @SuppressWarnings("unchecked") @Unstable - public static <T> Future<T> getReturnRpcResponse() { - return (Future<T>) RETURN_RPC_RESPONSE.get(); + public static <T> Future<T> getReturnValue() { + return (Future<T>) returnValue.get(); } /** Set call id and retry count for the next call. */ @@ -1398,7 +1397,7 @@ public class Client implements AutoCloseable { } }; - RETURN_RPC_RESPONSE.set(returnFuture); + returnValue.set(returnFuture); return null; } else { return getRpcResponse(call, connection); @@ -1412,7 +1411,7 @@ public class Client implements AutoCloseable { * synchronous mode. */ @Unstable - public static boolean isAsynchronousMode() { + static boolean isAsynchronousMode() { return asynchronousMode.get(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d740a902/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java index 350e041..88e2e2e 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java @@ -26,9 +26,7 @@ import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.net.InetSocketAddress; import java.util.Map; -import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import javax.net.SocketFactory; @@ -37,7 +35,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DataOutputOutputStream; import org.apache.hadoop.io.Writable; @@ -70,9 +67,7 @@ import com.google.protobuf.TextFormat; @InterfaceStability.Evolving public class ProtobufRpcEngine implements RpcEngine { public static final Log LOG = LogFactory.getLog(ProtobufRpcEngine.class); - private static final ThreadLocal<Callable<?>> - RETURN_MESSAGE_CALLBACK = new ThreadLocal<>(); - + static { // Register the rpcRequest deserializer for WritableRpcEngine org.apache.hadoop.ipc.Server.registerProtocolEngine( RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcRequestWrapper.class, @@ -81,12 +76,6 @@ public class ProtobufRpcEngine implements RpcEngine { private static final ClientCache CLIENTS = new ClientCache(); - @SuppressWarnings("unchecked") - @Unstable - public static <T> Callable<T> getReturnMessageCallback() { - return (Callable<T>) RETURN_MESSAGE_CALLBACK.get(); - } - public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException { @@ -200,7 +189,7 @@ public class ProtobufRpcEngine implements RpcEngine { * the server. */ @Override - public Object invoke(Object proxy, final Method method, Object[] args) + public Object invoke(Object proxy, Method method, Object[] args) throws ServiceException { long startTime = 0; if (LOG.isDebugEnabled()) { @@ -262,23 +251,6 @@ public class ProtobufRpcEngine implements RpcEngine { LOG.debug("Call: " + method.getName() + " took " + callTime + "ms"); } - if (Client.isAsynchronousMode()) { - final Future<RpcResponseWrapper> frrw = Client.getReturnRpcResponse(); - Callable<Message> callback = new Callable<Message>() { - @Override - public Message call() throws Exception { - return getReturnMessage(method, frrw.get()); - } - }; - RETURN_MESSAGE_CALLBACK.set(callback); - return null; - } else { - return getReturnMessage(method, val); - } - } - - private Message getReturnMessage(final Method method, - final RpcResponseWrapper rrw) throws ServiceException { Message prototype = null; try { prototype = getReturnProtoType(method); @@ -288,7 +260,7 @@ public class ProtobufRpcEngine implements RpcEngine { Message returnMessage; try { returnMessage = prototype.newBuilderForType() - .mergeFrom(rrw.theResponseRead).build(); + .mergeFrom(val.theResponseRead).build(); if (LOG.isTraceEnabled()) { LOG.trace(Thread.currentThread().getId() + ": Response <- " + http://git-wip-us.apache.org/repos/asf/hadoop/blob/d740a902/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java index 6cf75c7..de4395e 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java @@ -84,7 +84,7 @@ public class TestAsyncIPC { try { final long param = TestIPC.RANDOM.nextLong(); TestIPC.call(client, param, server, conf); - Future<LongWritable> returnFuture = Client.getReturnRpcResponse(); + Future<LongWritable> returnFuture = Client.getReturnValue(); returnFutures.put(i, returnFuture); expectedValues.put(i, param); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/d740a902/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java deleted file mode 100644 index 37899aa..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java +++ /dev/null @@ -1,110 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdfs; - -import java.io.IOException; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; - -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.fs.Options; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB; -import org.apache.hadoop.ipc.Client; - -import com.google.common.util.concurrent.AbstractFuture; - -/**************************************************************** - * Implementation of the asynchronous distributed file system. - * This instance of this class is the way end-user code interacts - * with a Hadoop DistributedFileSystem in an asynchronous manner. - * - *****************************************************************/ -@Unstable -public class AsyncDistributedFileSystem { - - private final DistributedFileSystem dfs; - - AsyncDistributedFileSystem(final DistributedFileSystem dfs) { - this.dfs = dfs; - } - - static <T> Future<T> getReturnValue() { - final Callable<T> returnValueCallback = ClientNamenodeProtocolTranslatorPB - .getReturnValueCallback(); - Future<T> returnFuture = new AbstractFuture<T>() { - public T get() throws InterruptedException, ExecutionException { - try { - set(returnValueCallback.call()); - } catch (Exception e) { - setException(e); - } - return super.get(); - } - }; - return returnFuture; - } - - /** - * Renames Path src to Path dst - * <ul> - * <li>Fails if src is a file and dst is a directory. - * <li>Fails if src is a directory and dst is a file. - * <li>Fails if the parent of dst does not exist or is a file. - * </ul> - * <p> - * If OVERWRITE option is not passed as an argument, rename fails if the dst - * already exists. - * <p> - * If OVERWRITE option is passed as an argument, rename overwrites the dst if - * it is a file or an empty directory. Rename fails if dst is a non-empty - * directory. - * <p> - * Note that atomicity of rename is dependent on the file system - * implementation. Please refer to the file system documentation for details. - * This default implementation is non atomic. - * - * @param src - * path to be renamed - * @param dst - * new path after rename - * @throws IOException - * on failure - * @return an instance of Future, #get of which is invoked to wait for - * asynchronous call being finished. - */ - public Future<Void> rename(Path src, Path dst, - final Options.Rename... options) throws IOException { - dfs.getFsStatistics().incrementWriteOps(1); - - final Path absSrc = dfs.fixRelativePart(src); - final Path absDst = dfs.fixRelativePart(dst); - - final boolean isAsync = Client.isAsynchronousMode(); - Client.setAsynchronousMode(true); - try { - dfs.getClient().rename(dfs.getPathName(absSrc), dfs.getPathName(absDst), - options); - return getReturnValue(); - } finally { - Client.setAsynchronousMode(isAsync); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d740a902/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index 2ffe11a..27881d7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -31,7 +31,6 @@ import java.util.Map; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.BlockStorageLocation; @@ -206,7 +205,7 @@ public class DistributedFileSystem extends FileSystem { * @return path component of {file} * @throws IllegalArgumentException if URI does not belong to this DFS */ - String getPathName(Path file) { + private String getPathName(Path file) { checkPath(file); String result = file.toUri().getPath(); if (!DFSUtilClient.isValidName(result)) { @@ -2480,23 +2479,4 @@ public class DistributedFileSystem extends FileSystem { } return ret; } - - private final AsyncDistributedFileSystem adfs = - new AsyncDistributedFileSystem(this); - - /** @return an {@link AsyncDistributedFileSystem} object. */ - @Unstable - public AsyncDistributedFileSystem getAsyncDistributedFileSystem() { - return adfs; - } - - @Override - protected Path fixRelativePart(Path p) { - return super.fixRelativePart(p); - } - - Statistics getFsStatistics() { - return statistics; - } - } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d740a902/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java index 75fba21..6aeed28 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@ -24,14 +24,11 @@ import java.util.EnumSet; import java.util.List; import com.google.common.collect.Lists; -import java.util.concurrent.Callable; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.crypto.CryptoProtocolVersion; import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries; -import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries; import org.apache.hadoop.fs.CacheFlag; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.CreateFlag; @@ -155,14 +152,13 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetPer import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetQuotaRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetReplicationRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetSafeModeRequestProto; -import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetStoragePolicyRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.TruncateRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetStoragePolicyRequestProto; import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos; import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncryptionZoneRequestProto; -import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.EncryptionZoneProto; import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathRequestProto; import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesRequestProto; import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsRequestProto; @@ -174,9 +170,7 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifie import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.Text; -import org.apache.hadoop.ipc.Client; import org.apache.hadoop.ipc.ProtobufHelper; -import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.ProtocolMetaInterface; import org.apache.hadoop.ipc.ProtocolTranslator; import org.apache.hadoop.ipc.RPC; @@ -188,9 +182,12 @@ import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenReque import org.apache.hadoop.security.token.Token; import com.google.protobuf.ByteString; -import com.google.protobuf.Message; import com.google.protobuf.ServiceException; +import static org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries; +import static org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos + .EncryptionZoneProto; + /** * This class forwards NN's ClientProtocol calls as RPC calls to the NN server * while translating from the parameter types used in ClientProtocol to the @@ -201,8 +198,6 @@ import com.google.protobuf.ServiceException; public class ClientNamenodeProtocolTranslatorPB implements ProtocolMetaInterface, ClientProtocol, Closeable, ProtocolTranslator { final private ClientNamenodeProtocolPB rpcProxy; - private static final ThreadLocal<Callable<?>> - RETURN_VALUE_CALLBACK = new ThreadLocal<>(); static final GetServerDefaultsRequestProto VOID_GET_SERVER_DEFAULT_REQUEST = GetServerDefaultsRequestProto.newBuilder().build(); @@ -235,12 +230,6 @@ public class ClientNamenodeProtocolTranslatorPB implements rpcProxy = proxy; } - @SuppressWarnings("unchecked") - @Unstable - public static <T> Callable<T> getReturnValueCallback() { - return (Callable<T>) RETURN_VALUE_CALLBACK.get(); - } - @Override public void close() { RPC.stopProxy(rpcProxy); @@ -476,7 +465,6 @@ public class ClientNamenodeProtocolTranslatorPB implements RenameRequestProto req = RenameRequestProto.newBuilder() .setSrc(src) .setDst(dst).build(); - try { return rpcProxy.rename(null, req).getResult(); } catch (ServiceException e) { @@ -501,22 +489,7 @@ public class ClientNamenodeProtocolTranslatorPB implements setDst(dst).setOverwriteDest(overwrite). build(); try { - if (Client.isAsynchronousMode()) { - rpcProxy.rename2(null, req); - - final Callable<Message> returnMessageCallback = ProtobufRpcEngine - .getReturnMessageCallback(); - Callable<Void> callBack = new Callable<Void>() { - @Override - public Void call() throws Exception { - returnMessageCallback.call(); - return null; - } - }; - RETURN_VALUE_CALLBACK.set(callBack); - } else { - rpcProxy.rename2(null, req); - } + rpcProxy.rename2(null, req); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d740a902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java deleted file mode 100644 index 9322e1a..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java +++ /dev/null @@ -1,258 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.io.DataOutputStream; -import java.io.IOException; -import java.security.PrivilegedExceptionAction; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.Options.Rename; -import org.apache.hadoop.hdfs.protocol.LocatedBlocks; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; -import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; -import org.apache.hadoop.security.UserGroupInformation; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -public class TestAsyncDFSRename { - final Path asyncRenameDir = new Path("/test/async_rename/"); - public static final Log LOG = LogFactory.getLog(TestAsyncDFSRename.class); - final private static Configuration CONF = new HdfsConfiguration(); - - final private static String GROUP1_NAME = "group1"; - final private static String GROUP2_NAME = "group2"; - final private static String USER1_NAME = "user1"; - private static final UserGroupInformation USER1; - - private MiniDFSCluster gCluster; - - static { - // explicitly turn on permission checking - CONF.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true); - - // create fake mapping for the groups - Map<String, String[]> u2g_map = new HashMap<String, String[]>(1); - u2g_map.put(USER1_NAME, new String[] { GROUP1_NAME, GROUP2_NAME }); - DFSTestUtil.updateConfWithFakeGroupMapping(CONF, u2g_map); - - // Initiate all four users - USER1 = UserGroupInformation.createUserForTesting(USER1_NAME, new String[] { - GROUP1_NAME, GROUP2_NAME }); - } - - @Before - public void setUp() throws IOException { - gCluster = new MiniDFSCluster.Builder(CONF).numDataNodes(3).build(); - gCluster.waitActive(); - } - - @After - public void tearDown() throws IOException { - if (gCluster != null) { - gCluster.shutdown(); - gCluster = null; - } - } - - static int countLease(MiniDFSCluster cluster) { - return TestDFSRename.countLease(cluster); - } - - void list(DistributedFileSystem dfs, String name) throws IOException { - FileSystem.LOG.info("\n\n" + name); - for (FileStatus s : dfs.listStatus(asyncRenameDir)) { - FileSystem.LOG.info("" + s.getPath()); - } - } - - static void createFile(DistributedFileSystem dfs, Path f) throws IOException { - DataOutputStream a_out = dfs.create(f); - a_out.writeBytes("something"); - a_out.close(); - } - - /** - * Check the blocks of dst file are cleaned after rename with overwrite - * Restart NN to check the rename successfully - */ - @Test - public void testAsyncRenameWithOverwrite() throws Exception { - final short replFactor = 2; - final long blockSize = 512; - Configuration conf = new Configuration(); - MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes( - replFactor).build(); - cluster.waitActive(); - DistributedFileSystem dfs = cluster.getFileSystem(); - AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem(); - - try { - - long fileLen = blockSize * 3; - String src = "/foo/src"; - String dst = "/foo/dst"; - String src2 = "/foo/src2"; - String dst2 = "/foo/dst2"; - Path srcPath = new Path(src); - Path dstPath = new Path(dst); - Path srcPath2 = new Path(src2); - Path dstPath2 = new Path(dst2); - - DFSTestUtil.createFile(dfs, srcPath, fileLen, replFactor, 1); - DFSTestUtil.createFile(dfs, dstPath, fileLen, replFactor, 1); - DFSTestUtil.createFile(dfs, srcPath2, fileLen, replFactor, 1); - DFSTestUtil.createFile(dfs, dstPath2, fileLen, replFactor, 1); - - LocatedBlocks lbs = NameNodeAdapter.getBlockLocations( - cluster.getNameNode(), dst, 0, fileLen); - LocatedBlocks lbs2 = NameNodeAdapter.getBlockLocations( - cluster.getNameNode(), dst2, 0, fileLen); - BlockManager bm = NameNodeAdapter.getNamesystem(cluster.getNameNode()) - .getBlockManager(); - assertTrue(bm.getStoredBlock(lbs.getLocatedBlocks().get(0).getBlock() - .getLocalBlock()) != null); - assertTrue(bm.getStoredBlock(lbs2.getLocatedBlocks().get(0).getBlock() - .getLocalBlock()) != null); - - Future<Void> retVal1 = adfs.rename(srcPath, dstPath, Rename.OVERWRITE); - Future<Void> retVal2 = adfs.rename(srcPath2, dstPath2, Rename.OVERWRITE); - retVal1.get(); - retVal2.get(); - - assertTrue(bm.getStoredBlock(lbs.getLocatedBlocks().get(0).getBlock() - .getLocalBlock()) == null); - assertTrue(bm.getStoredBlock(lbs2.getLocatedBlocks().get(0).getBlock() - .getLocalBlock()) == null); - - // Restart NN and check the rename successfully - cluster.restartNameNodes(); - assertFalse(dfs.exists(srcPath)); - assertTrue(dfs.exists(dstPath)); - assertFalse(dfs.exists(srcPath2)); - assertTrue(dfs.exists(dstPath2)); - } finally { - if (dfs != null) { - dfs.close(); - } - if (cluster != null) { - cluster.shutdown(); - } - } - } - - @Test - public void testConcurrentAsyncRenameWithOverwrite() throws Exception { - final short replFactor = 2; - final long blockSize = 512; - final Path renameDir = new Path( - "/test/concurrent_reanme_with_overwrite_dir/"); - Configuration conf = new HdfsConfiguration(); - MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2) - .build(); - cluster.waitActive(); - DistributedFileSystem dfs = cluster.getFileSystem(); - AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem(); - int count = 1000; - - try { - long fileLen = blockSize * 3; - assertTrue(dfs.mkdirs(renameDir)); - - Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>(); - - // concurrently invoking many rename - for (int i = 0; i < count; i++) { - Path src = new Path(renameDir, "src" + i); - Path dst = new Path(renameDir, "dst" + i); - DFSTestUtil.createFile(dfs, src, fileLen, replFactor, 1); - DFSTestUtil.createFile(dfs, dst, fileLen, replFactor, 1); - Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE); - returnFutures.put(i, returnFuture); - } - - // wait for completing the calls - for (int i = 0; i < count; i++) { - returnFutures.get(i).get(); - } - - // Restart NN and check the rename successfully - cluster.restartNameNodes(); - - // very the src dir should not exist, dst should - for (int i = 0; i < count; i++) { - Path src = new Path(renameDir, "src" + i); - Path dst = new Path(renameDir, "dst" + i); - assertFalse(dfs.exists(src)); - assertTrue(dfs.exists(dst)); - } - } finally { - dfs.delete(renameDir, true); - if (cluster != null) { - cluster.shutdown(); - } - } - } - - @Test - public void testAsyncRenameWithException() throws Exception { - FileSystem rootFs = FileSystem.get(CONF); - final Path renameDir = new Path("/test/async_rename_exception/"); - final Path src = new Path(renameDir, "src"); - final Path dst = new Path(renameDir, "dst"); - rootFs.mkdirs(src); - - AsyncDistributedFileSystem adfs = USER1 - .doAs(new PrivilegedExceptionAction<AsyncDistributedFileSystem>() { - @Override - public AsyncDistributedFileSystem run() throws Exception { - return gCluster.getFileSystem().getAsyncDistributedFileSystem(); - } - }); - - try { - Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE); - returnFuture.get(); - } catch (ExecutionException e) { - checkPermissionDenied(e, src); - } - } - - private void checkPermissionDenied(final Exception e, final Path dir) { - assertTrue(e.getCause() instanceof ExecutionException); - assertTrue("Permission denied messages must carry AccessControlException", - e.getMessage().contains("AccessControlException")); - assertTrue("Permission denied messages must carry the username", e - .getMessage().contains(USER1_NAME)); - assertTrue("Permission denied messages must carry the path parent", e - .getMessage().contains(dir.getParent().toUri().getPath())); - } -} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org