http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 0000000,92d117c..be346a4 mode 000000,100644..100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@@ -1,0 -1,3135 +1,3240 @@@ + /** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.hdfs; + + import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_CODEC_CLASSES_KEY_PREFIX; + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS; + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES; + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHE_READAHEAD; + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT; + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT; + + import java.io.BufferedOutputStream; + import java.io.DataInputStream; + import java.io.DataOutputStream; + import java.io.FileNotFoundException; + import java.io.IOException; + import java.io.InputStream; + import java.io.OutputStream; + import java.net.InetAddress; + import java.net.InetSocketAddress; + import java.net.Socket; + import java.net.SocketAddress; + import java.net.URI; + import java.net.UnknownHostException; + import java.security.GeneralSecurityException; + import java.util.ArrayList; + import java.util.EnumSet; + import java.util.HashMap; + import java.util.List; + import java.util.Map; + import java.util.Random; + import java.util.concurrent.SynchronousQueue; + import java.util.concurrent.ThreadLocalRandom; + import java.util.concurrent.ThreadPoolExecutor; + import java.util.concurrent.TimeUnit; + import java.util.concurrent.atomic.AtomicBoolean; + import java.util.concurrent.atomic.AtomicInteger; + + import javax.net.SocketFactory; + + import org.apache.hadoop.HadoopIllegalArgumentException; + import org.apache.hadoop.classification.InterfaceAudience; + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.crypto.CipherSuite; + import org.apache.hadoop.crypto.CryptoCodec; + import org.apache.hadoop.crypto.CryptoInputStream; + import org.apache.hadoop.crypto.CryptoOutputStream; + import org.apache.hadoop.crypto.CryptoProtocolVersion; + import org.apache.hadoop.crypto.key.KeyProvider; + import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion; + import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension; + import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion; + import org.apache.hadoop.fs.BlockLocation; + import org.apache.hadoop.fs.CacheFlag; + import org.apache.hadoop.fs.ContentSummary; + import org.apache.hadoop.fs.CreateFlag; + import org.apache.hadoop.fs.FileAlreadyExistsException; + import org.apache.hadoop.fs.FileEncryptionInfo; + import org.apache.hadoop.fs.FileSystem; + import org.apache.hadoop.fs.FsServerDefaults; + import org.apache.hadoop.fs.FsStatus; + import org.apache.hadoop.fs.FsTracer; + import org.apache.hadoop.fs.HdfsBlockLocation; + import org.apache.hadoop.fs.InvalidPathException; + import org.apache.hadoop.fs.MD5MD5CRC32CastagnoliFileChecksum; + import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum; + import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum; + import org.apache.hadoop.fs.Options; + import org.apache.hadoop.fs.Options.ChecksumOpt; + import org.apache.hadoop.fs.ParentNotDirectoryException; + import org.apache.hadoop.fs.Path; + import org.apache.hadoop.fs.RemoteIterator; + import org.apache.hadoop.fs.StorageType; + import org.apache.hadoop.fs.UnresolvedLinkException; + import org.apache.hadoop.fs.XAttr; + import org.apache.hadoop.fs.XAttrSetFlag; + import org.apache.hadoop.fs.permission.AclEntry; + import org.apache.hadoop.fs.permission.AclStatus; + import org.apache.hadoop.fs.permission.FsAction; + import org.apache.hadoop.fs.permission.FsPermission; + import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo; + import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; + import org.apache.hadoop.hdfs.client.HdfsDataInputStream; + import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; + import org.apache.hadoop.hdfs.client.impl.DfsClientConf; + import org.apache.hadoop.hdfs.client.impl.LeaseRenewer; + import org.apache.hadoop.hdfs.net.Peer; + import org.apache.hadoop.hdfs.protocol.AclException; + import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; + import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; + import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; + import org.apache.hadoop.hdfs.protocol.CacheDirectiveIterator; + import org.apache.hadoop.hdfs.protocol.CachePoolEntry; + import org.apache.hadoop.hdfs.protocol.CachePoolInfo; + import org.apache.hadoop.hdfs.protocol.CachePoolIterator; + import org.apache.hadoop.hdfs.protocol.ClientProtocol; + import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks; + import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; + import org.apache.hadoop.hdfs.protocol.DatanodeID; + import org.apache.hadoop.hdfs.protocol.DatanodeInfo; + import org.apache.hadoop.hdfs.protocol.DirectoryListing; + import org.apache.hadoop.hdfs.protocol.EncryptionZone; + import org.apache.hadoop.hdfs.protocol.EncryptionZoneIterator; + import org.apache.hadoop.hdfs.protocol.ExtendedBlock; + import org.apache.hadoop.hdfs.protocol.HdfsConstants; + import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; + import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction; + import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; + import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; + import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus; + import org.apache.hadoop.hdfs.protocol.LocatedBlock; + import org.apache.hadoop.hdfs.protocol.LocatedBlocks; + import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; + import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException; + import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; + import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException; + import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; + import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; + import org.apache.hadoop.hdfs.protocol.UnresolvedPathException; + import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; + import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; + import org.apache.hadoop.hdfs.protocol.datatransfer.Op; + import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure; + import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; + import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; + import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory; + import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil; + import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; + import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; + import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto; + import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; + import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; + import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; + import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; + import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; + import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; + import org.apache.hadoop.hdfs.server.namenode.SafeModeException; + import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; + import org.apache.hadoop.hdfs.util.IOUtilsClient; + import org.apache.hadoop.io.DataOutputBuffer; + import org.apache.hadoop.io.EnumSetWritable; + import org.apache.hadoop.io.IOUtils; + import org.apache.hadoop.io.MD5Hash; + import org.apache.hadoop.io.Text; ++import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; + import org.apache.hadoop.io.retry.LossyRetryInvocationHandler; + import org.apache.hadoop.ipc.RPC; + import org.apache.hadoop.ipc.RemoteException; + import org.apache.hadoop.net.DNS; + import org.apache.hadoop.net.NetUtils; + import org.apache.hadoop.security.AccessControlException; + import org.apache.hadoop.security.UserGroupInformation; + import org.apache.hadoop.security.token.SecretManager.InvalidToken; + import org.apache.hadoop.security.token.Token; + import org.apache.hadoop.security.token.TokenRenewer; + import org.apache.hadoop.util.Daemon; + import org.apache.hadoop.util.DataChecksum; + import org.apache.hadoop.util.DataChecksum.Type; + import org.apache.hadoop.util.Progressable; + import org.apache.hadoop.util.Time; + import org.apache.htrace.core.TraceScope; + + import com.google.common.annotations.VisibleForTesting; + import com.google.common.base.Joiner; + import com.google.common.base.Preconditions; + import com.google.common.collect.Lists; + import com.google.common.net.InetAddresses; + import org.apache.htrace.core.Tracer; + + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + /******************************************************** + * DFSClient can connect to a Hadoop Filesystem and + * perform basic file tasks. It uses the ClientProtocol + * to communicate with a NameNode daemon, and connects + * directly to DataNodes to read/write block data. + * + * Hadoop DFS users should obtain an instance of + * DistributedFileSystem, which uses DFSClient to handle + * filesystem tasks. + * + ********************************************************/ + @InterfaceAudience.Private + public class DFSClient implements java.io.Closeable, RemotePeerFactory, + DataEncryptionKeyFactory { + public static final Logger LOG = LoggerFactory.getLogger(DFSClient.class); + public static final long SERVER_DEFAULTS_VALIDITY_PERIOD = 60 * 60 * 1000L; // 1 hour + + private final Configuration conf; + private final Tracer tracer; + private final DfsClientConf dfsClientConf; + final ClientProtocol namenode; + /* The service used for delegation tokens */ + private Text dtService; + + final UserGroupInformation ugi; + volatile boolean clientRunning = true; + volatile long lastLeaseRenewal; + private volatile FsServerDefaults serverDefaults; + private volatile long serverDefaultsLastUpdate; + final String clientName; + final SocketFactory socketFactory; + final ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure; + final FileSystem.Statistics stats; + private final String authority; + private final Random r = new Random(); + private SocketAddress[] localInterfaceAddrs; + private DataEncryptionKey encryptionKey; + final SaslDataTransferClient saslClient; + private final CachingStrategy defaultReadCachingStrategy; + private final CachingStrategy defaultWriteCachingStrategy; + private final ClientContext clientContext; + + private static final DFSHedgedReadMetrics HEDGED_READ_METRIC = + new DFSHedgedReadMetrics(); + private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL; ++ private static volatile ThreadPoolExecutor STRIPED_READ_THREAD_POOL; + private final int smallBufferSize; + + public DfsClientConf getConf() { + return dfsClientConf; + } + + Configuration getConfiguration() { + return conf; + } + + /** + * A map from file names to {@link DFSOutputStream} objects + * that are currently being written by this client. + * Note that a file can only be written by a single client. + */ + private final Map<Long, DFSOutputStream> filesBeingWritten + = new HashMap<Long, DFSOutputStream>(); + + /** + * Same as this(NameNode.getNNAddress(conf), conf); + * @see #DFSClient(InetSocketAddress, Configuration) + * @deprecated Deprecated at 0.21 + */ + @Deprecated + public DFSClient(Configuration conf) throws IOException { + this(DFSUtilClient.getNNAddress(conf), conf); + } + + public DFSClient(InetSocketAddress address, Configuration conf) throws IOException { + this(DFSUtilClient.getNNUri(address), conf); + } + + /** + * Same as this(nameNodeUri, conf, null); + * @see #DFSClient(URI, Configuration, FileSystem.Statistics) + */ + public DFSClient(URI nameNodeUri, Configuration conf + ) throws IOException { + this(nameNodeUri, conf, null); + } + + /** + * Same as this(nameNodeUri, null, conf, stats); + * @see #DFSClient(URI, ClientProtocol, Configuration, FileSystem.Statistics) + */ + public DFSClient(URI nameNodeUri, Configuration conf, + FileSystem.Statistics stats) + throws IOException { + this(nameNodeUri, null, conf, stats); + } + + /** + * Create a new DFSClient connected to the given nameNodeUri or rpcNamenode. + * If HA is enabled and a positive value is set for + * {@link HdfsClientConfigKeys#DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY} + * in the configuration, the DFSClient will use + * {@link LossyRetryInvocationHandler} as its RetryInvocationHandler. + * Otherwise one of nameNodeUri or rpcNamenode must be null. + */ + @VisibleForTesting + public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, + Configuration conf, FileSystem.Statistics stats) + throws IOException { + // Copy only the required DFSClient configuration + this.tracer = FsTracer.get(conf); + this.dfsClientConf = new DfsClientConf(conf); + this.conf = conf; + this.stats = stats; + this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class); + this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf); + this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf); + + this.ugi = UserGroupInformation.getCurrentUser(); + + this.authority = nameNodeUri == null? "null": nameNodeUri.getAuthority(); + this.clientName = "DFSClient_" + dfsClientConf.getTaskId() + "_" + + ThreadLocalRandom.current().nextInt() + "_" + + Thread.currentThread().getId(); + int numResponseToDrop = conf.getInt( + HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY, + HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT); + ProxyAndInfo<ClientProtocol> proxyInfo = null; + AtomicBoolean nnFallbackToSimpleAuth = new AtomicBoolean(false); + + if (numResponseToDrop > 0) { + // This case is used for testing. + LOG.warn(HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY + + " is set to " + numResponseToDrop + + ", this hacked client will proactively drop responses"); + proxyInfo = NameNodeProxiesClient.createProxyWithLossyRetryHandler(conf, + nameNodeUri, ClientProtocol.class, numResponseToDrop, + nnFallbackToSimpleAuth); + } + + if (proxyInfo != null) { + this.dtService = proxyInfo.getDelegationTokenService(); + this.namenode = proxyInfo.getProxy(); + } else if (rpcNamenode != null) { + // This case is used for testing. + Preconditions.checkArgument(nameNodeUri == null); + this.namenode = rpcNamenode; + dtService = null; + } else { + Preconditions.checkArgument(nameNodeUri != null, + "null URI"); + proxyInfo = NameNodeProxiesClient.createProxyWithClientProtocol(conf, + nameNodeUri, nnFallbackToSimpleAuth); + this.dtService = proxyInfo.getDelegationTokenService(); + this.namenode = proxyInfo.getProxy(); + } + + String localInterfaces[] = + conf.getTrimmedStrings(HdfsClientConfigKeys.DFS_CLIENT_LOCAL_INTERFACES); + localInterfaceAddrs = getLocalInterfaceAddrs(localInterfaces); + if (LOG.isDebugEnabled() && 0 != localInterfaces.length) { + LOG.debug("Using local interfaces [" + + Joiner.on(',').join(localInterfaces)+ "] with addresses [" + + Joiner.on(',').join(localInterfaceAddrs) + "]"); + } + + Boolean readDropBehind = (conf.get(DFS_CLIENT_CACHE_DROP_BEHIND_READS) == null) ? + null : conf.getBoolean(DFS_CLIENT_CACHE_DROP_BEHIND_READS, false); + Long readahead = (conf.get(DFS_CLIENT_CACHE_READAHEAD) == null) ? + null : conf.getLong(DFS_CLIENT_CACHE_READAHEAD, 0); + Boolean writeDropBehind = (conf.get(DFS_CLIENT_CACHE_DROP_BEHIND_WRITES) == null) ? + null : conf.getBoolean(DFS_CLIENT_CACHE_DROP_BEHIND_WRITES, false); + this.defaultReadCachingStrategy = + new CachingStrategy(readDropBehind, readahead); + this.defaultWriteCachingStrategy = + new CachingStrategy(writeDropBehind, readahead); + this.clientContext = ClientContext.get( + conf.get(DFS_CLIENT_CONTEXT, DFS_CLIENT_CONTEXT_DEFAULT), + dfsClientConf); + + if (dfsClientConf.getHedgedReadThreadpoolSize() > 0) { - this.initThreadsNumForHedgedReads(dfsClientConf.getHedgedReadThreadpoolSize()); ++ this.initThreadsNumForHedgedReads(dfsClientConf. ++ getHedgedReadThreadpoolSize()); + } ++ ++ this.initThreadsNumForStripedReads(dfsClientConf. ++ getStripedReadThreadpoolSize()); + this.saslClient = new SaslDataTransferClient( + conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf), + TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth); + } + + /** + * Return the socket addresses to use with each configured + * local interface. Local interfaces may be specified by IP + * address, IP address range using CIDR notation, interface + * name (e.g. eth0) or sub-interface name (e.g. eth0:0). + * The socket addresses consist of the IPs for the interfaces + * and the ephemeral port (port 0). If an IP, IP range, or + * interface name matches an interface with sub-interfaces + * only the IP of the interface is used. Sub-interfaces can + * be used by specifying them explicitly (by IP or name). + * + * @return SocketAddresses for the configured local interfaces, + * or an empty array if none are configured + * @throws UnknownHostException if a given interface name is invalid + */ + private static SocketAddress[] getLocalInterfaceAddrs( + String interfaceNames[]) throws UnknownHostException { + List<SocketAddress> localAddrs = new ArrayList<SocketAddress>(); + for (String interfaceName : interfaceNames) { + if (InetAddresses.isInetAddress(interfaceName)) { + localAddrs.add(new InetSocketAddress(interfaceName, 0)); + } else if (NetUtils.isValidSubnet(interfaceName)) { + for (InetAddress addr : NetUtils.getIPs(interfaceName, false)) { + localAddrs.add(new InetSocketAddress(addr, 0)); + } + } else { + for (String ip : DNS.getIPs(interfaceName, false)) { + localAddrs.add(new InetSocketAddress(ip, 0)); + } + } + } + return localAddrs.toArray(new SocketAddress[localAddrs.size()]); + } + + /** + * Select one of the configured local interfaces at random. We use a random + * interface because other policies like round-robin are less effective + * given that we cache connections to datanodes. + * + * @return one of the local interface addresses at random, or null if no + * local interfaces are configured + */ + SocketAddress getRandomLocalInterfaceAddr() { + if (localInterfaceAddrs.length == 0) { + return null; + } + final int idx = r.nextInt(localInterfaceAddrs.length); + final SocketAddress addr = localInterfaceAddrs[idx]; + if (LOG.isDebugEnabled()) { + LOG.debug("Using local interface " + addr); + } + return addr; + } + + /** + * Return the timeout that clients should use when writing to datanodes. + * @param numNodes the number of nodes in the pipeline. + */ + int getDatanodeWriteTimeout(int numNodes) { + final int t = dfsClientConf.getDatanodeSocketWriteTimeout(); + return t > 0? t + HdfsConstants.WRITE_TIMEOUT_EXTENSION*numNodes: 0; + } + + int getDatanodeReadTimeout(int numNodes) { + final int t = dfsClientConf.getSocketTimeout(); + return t > 0? HdfsConstants.READ_TIMEOUT_EXTENSION*numNodes + t: 0; + } + + @VisibleForTesting + public String getClientName() { + return clientName; + } + + void checkOpen() throws IOException { + if (!clientRunning) { + IOException result = new IOException("Filesystem closed"); + throw result; + } + } + + /** Return the lease renewer instance. The renewer thread won't start + * until the first output stream is created. The same instance will + * be returned until all output streams are closed. + */ + public LeaseRenewer getLeaseRenewer() throws IOException { + return LeaseRenewer.getInstance(authority, ugi, this); + } + + /** Get a lease and start automatic renewal */ + private void beginFileLease(final long inodeId, final DFSOutputStream out) + throws IOException { + getLeaseRenewer().put(inodeId, out, this); + } + + /** Stop renewal of lease for the file. */ + void endFileLease(final long inodeId) throws IOException { + getLeaseRenewer().closeFile(inodeId, this); + } + + + /** Put a file. Only called from LeaseRenewer, where proper locking is + * enforced to consistently update its local dfsclients array and + * client's filesBeingWritten map. + */ + public void putFileBeingWritten(final long inodeId, final DFSOutputStream out) { + synchronized(filesBeingWritten) { + filesBeingWritten.put(inodeId, out); + // update the last lease renewal time only when there was no + // writes. once there is one write stream open, the lease renewer + // thread keeps it updated well with in anyone's expiration time. + if (lastLeaseRenewal == 0) { + updateLastLeaseRenewal(); + } + } + } + + /** Remove a file. Only called from LeaseRenewer. */ + public void removeFileBeingWritten(final long inodeId) { + synchronized(filesBeingWritten) { + filesBeingWritten.remove(inodeId); + if (filesBeingWritten.isEmpty()) { + lastLeaseRenewal = 0; + } + } + } + + /** Is file-being-written map empty? */ + public boolean isFilesBeingWrittenEmpty() { + synchronized(filesBeingWritten) { + return filesBeingWritten.isEmpty(); + } + } + + /** @return true if the client is running */ + public boolean isClientRunning() { + return clientRunning; + } + + long getLastLeaseRenewal() { + return lastLeaseRenewal; + } + + void updateLastLeaseRenewal() { + synchronized(filesBeingWritten) { + if (filesBeingWritten.isEmpty()) { + return; + } + lastLeaseRenewal = Time.monotonicNow(); + } + } + + /** + * Renew leases. + * @return true if lease was renewed. May return false if this + * client has been closed or has no files open. + **/ + public boolean renewLease() throws IOException { + if (clientRunning && !isFilesBeingWrittenEmpty()) { + try { + namenode.renewLease(clientName); + updateLastLeaseRenewal(); + return true; + } catch (IOException e) { + // Abort if the lease has already expired. + final long elapsed = Time.monotonicNow() - getLastLeaseRenewal(); + if (elapsed > HdfsConstants.LEASE_HARDLIMIT_PERIOD) { + LOG.warn("Failed to renew lease for " + clientName + " for " + + (elapsed/1000) + " seconds (>= hard-limit =" + + (HdfsConstants.LEASE_HARDLIMIT_PERIOD / 1000) + " seconds.) " + + "Closing all files being written ...", e); + closeAllFilesBeingWritten(true); + } else { + // Let the lease renewer handle it and retry. + throw e; + } + } + } + return false; + } + + /** + * Close connections the Namenode. + */ + void closeConnectionToNamenode() { + RPC.stopProxy(namenode); + } + + /** Close/abort all files being written. */ + public void closeAllFilesBeingWritten(final boolean abort) { + for(;;) { + final long inodeId; + final DFSOutputStream out; + synchronized(filesBeingWritten) { + if (filesBeingWritten.isEmpty()) { + return; + } + inodeId = filesBeingWritten.keySet().iterator().next(); + out = filesBeingWritten.remove(inodeId); + } + if (out != null) { + try { + if (abort) { + out.abort(); + } else { + out.close(); + } + } catch(IOException ie) { + LOG.error("Failed to " + (abort ? "abort" : "close") + " file: " + + out.getSrc() + " with inode: " + inodeId, ie); + } + } + } + } + + /** + * Close the file system, abandoning all of the leases and files being + * created and close connections to the namenode. + */ + @Override + public synchronized void close() throws IOException { + if(clientRunning) { + closeAllFilesBeingWritten(false); + clientRunning = false; + getLeaseRenewer().closeClient(this); + // close connections to the namenode + closeConnectionToNamenode(); + } + } + + /** + * Close all open streams, abandoning all of the leases and files being + * created. + * @param abort whether streams should be gracefully closed + */ + public void closeOutputStreams(boolean abort) { + if (clientRunning) { + closeAllFilesBeingWritten(abort); + } + } + + /** + * @see ClientProtocol#getPreferredBlockSize(String) + */ + public long getBlockSize(String f) throws IOException { + checkOpen(); + TraceScope scope = newPathTraceScope("getBlockSize", f); + try { + return namenode.getPreferredBlockSize(f); + } catch (IOException ie) { + LOG.warn("Problem getting block size", ie); + throw ie; + } finally { + scope.close(); + } + } + + /** + * Get server default values for a number of configuration params. + * @see ClientProtocol#getServerDefaults() + */ + public FsServerDefaults getServerDefaults() throws IOException { + checkOpen(); + long now = Time.monotonicNow(); + if ((serverDefaults == null) || + (now - serverDefaultsLastUpdate > SERVER_DEFAULTS_VALIDITY_PERIOD)) { + serverDefaults = namenode.getServerDefaults(); + serverDefaultsLastUpdate = now; + } + assert serverDefaults != null; + return serverDefaults; + } + + /** + * Get a canonical token service name for this client's tokens. Null should + * be returned if the client is not using tokens. + * @return the token service for the client + */ + @InterfaceAudience.LimitedPrivate( { "HDFS" }) + public String getCanonicalServiceName() { + return (dtService != null) ? dtService.toString() : null; + } + + /** + * @see ClientProtocol#getDelegationToken(Text) + */ + public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer) + throws IOException { + assert dtService != null; + TraceScope scope = tracer.newScope("getDelegationToken"); + try { + Token<DelegationTokenIdentifier> token = + namenode.getDelegationToken(renewer); + if (token != null) { + token.setService(this.dtService); + LOG.info("Created " + DelegationTokenIdentifier.stringifyToken(token)); + } else { + LOG.info("Cannot get delegation token from " + renewer); + } + return token; + } finally { + scope.close(); + } + } + + /** + * Renew a delegation token + * @param token the token to renew + * @return the new expiration time + * @throws InvalidToken + * @throws IOException + * @deprecated Use Token.renew instead. + */ + @Deprecated + public long renewDelegationToken(Token<DelegationTokenIdentifier> token) + throws InvalidToken, IOException { + LOG.info("Renewing " + DelegationTokenIdentifier.stringifyToken(token)); + try { + return token.renew(conf); + } catch (InterruptedException ie) { + throw new RuntimeException("caught interrupted", ie); + } catch (RemoteException re) { + throw re.unwrapRemoteException(InvalidToken.class, + AccessControlException.class); + } + } + + /** + * Cancel a delegation token + * @param token the token to cancel + * @throws InvalidToken + * @throws IOException + * @deprecated Use Token.cancel instead. + */ + @Deprecated + public void cancelDelegationToken(Token<DelegationTokenIdentifier> token) + throws InvalidToken, IOException { + LOG.info("Cancelling " + DelegationTokenIdentifier.stringifyToken(token)); + try { + token.cancel(conf); + } catch (InterruptedException ie) { + throw new RuntimeException("caught interrupted", ie); + } catch (RemoteException re) { + throw re.unwrapRemoteException(InvalidToken.class, + AccessControlException.class); + } + } + + @InterfaceAudience.Private + public static class Renewer extends TokenRenewer { + + static { + //Ensure that HDFS Configuration files are loaded before trying to use + // the renewer. + HdfsConfigurationLoader.init(); + } + + @Override + public boolean handleKind(Text kind) { + return DelegationTokenIdentifier.HDFS_DELEGATION_KIND.equals(kind); + } + + @SuppressWarnings("unchecked") + @Override + public long renew(Token<?> token, Configuration conf) throws IOException { + Token<DelegationTokenIdentifier> delToken = + (Token<DelegationTokenIdentifier>) token; + ClientProtocol nn = getNNProxy(delToken, conf); + try { + return nn.renewDelegationToken(delToken); + } catch (RemoteException re) { + throw re.unwrapRemoteException(InvalidToken.class, + AccessControlException.class); + } + } + + @SuppressWarnings("unchecked") + @Override + public void cancel(Token<?> token, Configuration conf) throws IOException { + Token<DelegationTokenIdentifier> delToken = + (Token<DelegationTokenIdentifier>) token; + LOG.info("Cancelling " + + DelegationTokenIdentifier.stringifyToken(delToken)); + ClientProtocol nn = getNNProxy(delToken, conf); + try { + nn.cancelDelegationToken(delToken); + } catch (RemoteException re) { + throw re.unwrapRemoteException(InvalidToken.class, + AccessControlException.class); + } + } + + private static ClientProtocol getNNProxy( + Token<DelegationTokenIdentifier> token, Configuration conf) + throws IOException { + URI uri = HAUtilClient.getServiceUriFromToken( + HdfsConstants.HDFS_URI_SCHEME, token); + if (HAUtilClient.isTokenForLogicalUri(token) && + !HAUtilClient.isLogicalUri(conf, uri)) { + // If the token is for a logical nameservice, but the configuration + // we have disagrees about that, we can't actually renew it. + // This can be the case in MR, for example, if the RM doesn't + // have all of the HA clusters configured in its configuration. + throw new IOException("Unable to map logical nameservice URI '" + + uri + "' to a NameNode. Local configuration does not have " + + "a failover proxy provider configured."); + } + + ProxyAndInfo<ClientProtocol> info = + NameNodeProxiesClient.createProxyWithClientProtocol(conf, uri, null); + assert info.getDelegationTokenService().equals(token.getService()) : + "Returned service '" + info.getDelegationTokenService().toString() + + "' doesn't match expected service '" + + token.getService().toString() + "'"; + + return info.getProxy(); + } + + @Override + public boolean isManaged(Token<?> token) throws IOException { + return true; + } + + } + + /** + * Report corrupt blocks that were discovered by the client. + * @see ClientProtocol#reportBadBlocks(LocatedBlock[]) + */ + public void reportBadBlocks(LocatedBlock[] blocks) throws IOException { + checkOpen(); + namenode.reportBadBlocks(blocks); + } + + public LocatedBlocks getLocatedBlocks(String src, long start) + throws IOException { + return getLocatedBlocks(src, start, dfsClientConf.getPrefetchSize()); + } + + /* + * This is just a wrapper around callGetBlockLocations, but non-static so that + * we can stub it out for tests. + */ + @VisibleForTesting + public LocatedBlocks getLocatedBlocks(String src, long start, long length) + throws IOException { + TraceScope scope = newPathTraceScope("getBlockLocations", src); + try { + return callGetBlockLocations(namenode, src, start, length); + } finally { + scope.close(); + } + } + + /** + * @see ClientProtocol#getBlockLocations(String, long, long) + */ + static LocatedBlocks callGetBlockLocations(ClientProtocol namenode, + String src, long start, long length) + throws IOException { + try { + return namenode.getBlockLocations(src, start, length); + } catch(RemoteException re) { + throw re.unwrapRemoteException(AccessControlException.class, + FileNotFoundException.class, + UnresolvedPathException.class); + } + } + + /** + * Recover a file's lease + * @param src a file's path + * @return true if the file is already closed + * @throws IOException + */ + boolean recoverLease(String src) throws IOException { + checkOpen(); + + TraceScope scope = newPathTraceScope("recoverLease", src); + try { + return namenode.recoverLease(src, clientName); + } catch (RemoteException re) { + throw re.unwrapRemoteException(FileNotFoundException.class, + AccessControlException.class, + UnresolvedPathException.class); + } finally { + scope.close(); + } + } + + /** + * Get block location info about file + * + * getBlockLocations() returns a list of hostnames that store + * data for a specific file region. It returns a set of hostnames + * for every block within the indicated region. + * + * This function is very useful when writing code that considers + * data-placement when performing operations. For example, the + * MapReduce system tries to schedule tasks on the same machines + * as the data-block the task processes. + */ + public BlockLocation[] getBlockLocations(String src, long start, + long length) throws IOException, UnresolvedLinkException { + checkOpen(); + TraceScope scope = newPathTraceScope("getBlockLocations", src); + try { + LocatedBlocks blocks = getLocatedBlocks(src, start, length); + BlockLocation[] locations = DFSUtilClient.locatedBlocks2Locations(blocks); + HdfsBlockLocation[] hdfsLocations = new HdfsBlockLocation[locations.length]; + for (int i = 0; i < locations.length; i++) { + hdfsLocations[i] = new HdfsBlockLocation(locations[i], blocks.get(i)); + } + return hdfsLocations; + } finally { + scope.close(); + } + } + + /** + * Decrypts a EDEK by consulting the KeyProvider. + */ + private KeyVersion decryptEncryptedDataEncryptionKey(FileEncryptionInfo + feInfo) throws IOException { + TraceScope scope = tracer.newScope("decryptEDEK"); + try { + KeyProvider provider = getKeyProvider(); + if (provider == null) { + throw new IOException("No KeyProvider is configured, cannot access" + + " an encrypted file"); + } + EncryptedKeyVersion ekv = EncryptedKeyVersion.createForDecryption( + feInfo.getKeyName(), feInfo.getEzKeyVersionName(), feInfo.getIV(), + feInfo.getEncryptedDataEncryptionKey()); + try { + KeyProviderCryptoExtension cryptoProvider = KeyProviderCryptoExtension + .createKeyProviderCryptoExtension(provider); + return cryptoProvider.decryptEncryptedKey(ekv); + } catch (GeneralSecurityException e) { + throw new IOException(e); + } + } finally { + scope.close(); + } + } + + /** + * Obtain the crypto protocol version from the provided FileEncryptionInfo, + * checking to see if this version is supported by. + * + * @param feInfo FileEncryptionInfo + * @return CryptoProtocolVersion from the feInfo + * @throws IOException if the protocol version is unsupported. + */ + private static CryptoProtocolVersion getCryptoProtocolVersion + (FileEncryptionInfo feInfo) throws IOException { + final CryptoProtocolVersion version = feInfo.getCryptoProtocolVersion(); + if (!CryptoProtocolVersion.supports(version)) { + throw new IOException("Client does not support specified " + + "CryptoProtocolVersion " + version.getDescription() + " version " + + "number" + version.getVersion()); + } + return version; + } + + /** + * Obtain a CryptoCodec based on the CipherSuite set in a FileEncryptionInfo + * and the available CryptoCodecs configured in the Configuration. + * + * @param conf Configuration + * @param feInfo FileEncryptionInfo + * @return CryptoCodec + * @throws IOException if no suitable CryptoCodec for the CipherSuite is + * available. + */ + private static CryptoCodec getCryptoCodec(Configuration conf, + FileEncryptionInfo feInfo) throws IOException { + final CipherSuite suite = feInfo.getCipherSuite(); + if (suite.equals(CipherSuite.UNKNOWN)) { + throw new IOException("NameNode specified unknown CipherSuite with ID " + + suite.getUnknownValue() + ", cannot instantiate CryptoCodec."); + } + final CryptoCodec codec = CryptoCodec.getInstance(conf, suite); + if (codec == null) { + throw new UnknownCipherSuiteException( + "No configuration found for the cipher suite " + + suite.getConfigSuffix() + " prefixed with " + + HADOOP_SECURITY_CRYPTO_CODEC_CLASSES_KEY_PREFIX + + ". Please see the example configuration " + + "hadoop.security.crypto.codec.classes.EXAMPLECIPHERSUITE " + + "at core-default.xml for details."); + } + return codec; + } + + /** + * Wraps the stream in a CryptoInputStream if the underlying file is + * encrypted. + */ + public HdfsDataInputStream createWrappedInputStream(DFSInputStream dfsis) + throws IOException { + final FileEncryptionInfo feInfo = dfsis.getFileEncryptionInfo(); + if (feInfo != null) { + // File is encrypted, wrap the stream in a crypto stream. + // Currently only one version, so no special logic based on the version # + getCryptoProtocolVersion(feInfo); + final CryptoCodec codec = getCryptoCodec(conf, feInfo); + final KeyVersion decrypted = decryptEncryptedDataEncryptionKey(feInfo); + final CryptoInputStream cryptoIn = + new CryptoInputStream(dfsis, codec, decrypted.getMaterial(), + feInfo.getIV()); + return new HdfsDataInputStream(cryptoIn); + } else { + // No FileEncryptionInfo so no encryption. + return new HdfsDataInputStream(dfsis); + } + } + + /** + * Wraps the stream in a CryptoOutputStream if the underlying file is + * encrypted. + */ + public HdfsDataOutputStream createWrappedOutputStream(DFSOutputStream dfsos, + FileSystem.Statistics statistics) throws IOException { + return createWrappedOutputStream(dfsos, statistics, 0); + } + + /** + * Wraps the stream in a CryptoOutputStream if the underlying file is + * encrypted. + */ + public HdfsDataOutputStream createWrappedOutputStream(DFSOutputStream dfsos, + FileSystem.Statistics statistics, long startPos) throws IOException { + final FileEncryptionInfo feInfo = dfsos.getFileEncryptionInfo(); + if (feInfo != null) { + // File is encrypted, wrap the stream in a crypto stream. + // Currently only one version, so no special logic based on the version # + getCryptoProtocolVersion(feInfo); + final CryptoCodec codec = getCryptoCodec(conf, feInfo); + KeyVersion decrypted = decryptEncryptedDataEncryptionKey(feInfo); + final CryptoOutputStream cryptoOut = + new CryptoOutputStream(dfsos, codec, + decrypted.getMaterial(), feInfo.getIV(), startPos); + return new HdfsDataOutputStream(cryptoOut, statistics, startPos); + } else { + // No FileEncryptionInfo present so no encryption. + return new HdfsDataOutputStream(dfsos, statistics, startPos); + } + } + + public DFSInputStream open(String src) + throws IOException, UnresolvedLinkException { + return open(src, dfsClientConf.getIoBufferSize(), true, null); + } + + /** + * Create an input stream that obtains a nodelist from the + * namenode, and then reads from all the right places. Creates + * inner subclass of InputStream that does the right out-of-band + * work. + * @deprecated Use {@link #open(String, int, boolean)} instead. + */ + @Deprecated + public DFSInputStream open(String src, int buffersize, boolean verifyChecksum, + FileSystem.Statistics stats) + throws IOException, UnresolvedLinkException { + return open(src, buffersize, verifyChecksum); + } + + + /** + * Create an input stream that obtains a nodelist from the + * namenode, and then reads from all the right places. Creates + * inner subclass of InputStream that does the right out-of-band + * work. + */ + public DFSInputStream open(String src, int buffersize, boolean verifyChecksum) + throws IOException, UnresolvedLinkException { + checkOpen(); + // Get block info from namenode + TraceScope scope = newPathTraceScope("newDFSInputStream", src); + try { - return new DFSInputStream(this, src, verifyChecksum, null); ++ LocatedBlocks locatedBlocks = getLocatedBlocks(src, 0); ++ if (locatedBlocks != null) { ++ ErasureCodingPolicy ecPolicy = locatedBlocks.getErasureCodingPolicy(); ++ if (ecPolicy != null) { ++ return new DFSStripedInputStream(this, src, verifyChecksum, ecPolicy, ++ locatedBlocks); ++ } ++ return new DFSInputStream(this, src, verifyChecksum, locatedBlocks); ++ } else { ++ throw new IOException("Cannot open filename " + src); ++ } + } finally { + scope.close(); + } + } + + /** + * Get the namenode associated with this DFSClient object + * @return the namenode associated with this DFSClient object + */ + public ClientProtocol getNamenode() { + return namenode; + } + + /** + * Call {@link #create(String, boolean, short, long, Progressable)} with + * default <code>replication</code> and <code>blockSize<code> and null <code> + * progress</code>. + */ + public OutputStream create(String src, boolean overwrite) + throws IOException { + return create(src, overwrite, dfsClientConf.getDefaultReplication(), + dfsClientConf.getDefaultBlockSize(), null); + } + + /** + * Call {@link #create(String, boolean, short, long, Progressable)} with + * default <code>replication</code> and <code>blockSize<code>. + */ + public OutputStream create(String src, + boolean overwrite, + Progressable progress) throws IOException { + return create(src, overwrite, dfsClientConf.getDefaultReplication(), + dfsClientConf.getDefaultBlockSize(), progress); + } + + /** + * Call {@link #create(String, boolean, short, long, Progressable)} with + * null <code>progress</code>. + */ + public OutputStream create(String src, + boolean overwrite, + short replication, + long blockSize) throws IOException { + return create(src, overwrite, replication, blockSize, null); + } + + /** + * Call {@link #create(String, boolean, short, long, Progressable, int)} + * with default bufferSize. + */ + public OutputStream create(String src, boolean overwrite, short replication, + long blockSize, Progressable progress) throws IOException { + return create(src, overwrite, replication, blockSize, progress, + dfsClientConf.getIoBufferSize()); + } + + /** + * Call {@link #create(String, FsPermission, EnumSet, short, long, + * Progressable, int, ChecksumOpt)} with default <code>permission</code> + * {@link FsPermission#getFileDefault()}. + * + * @param src File name + * @param overwrite overwrite an existing file if true + * @param replication replication factor for the file + * @param blockSize maximum block size + * @param progress interface for reporting client progress + * @param buffersize underlying buffersize + * + * @return output stream + */ + public OutputStream create(String src, + boolean overwrite, + short replication, + long blockSize, + Progressable progress, + int buffersize) + throws IOException { + return create(src, FsPermission.getFileDefault(), + overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE) + : EnumSet.of(CreateFlag.CREATE), replication, blockSize, progress, + buffersize, null); + } + + /** + * Call {@link #create(String, FsPermission, EnumSet, boolean, short, + * long, Progressable, int, ChecksumOpt)} with <code>createParent</code> + * set to true. + */ + public DFSOutputStream create(String src, + FsPermission permission, + EnumSet<CreateFlag> flag, + short replication, + long blockSize, + Progressable progress, + int buffersize, + ChecksumOpt checksumOpt) + throws IOException { + return create(src, permission, flag, true, + replication, blockSize, progress, buffersize, checksumOpt, null); + } + + /** + * Create a new dfs file with the specified block replication + * with write-progress reporting and return an output stream for writing + * into the file. + * + * @param src File name + * @param permission The permission of the directory being created. + * If null, use default permission {@link FsPermission#getFileDefault()} + * @param flag indicates create a new file or create/overwrite an + * existing file or append to an existing file + * @param createParent create missing parent directory if true + * @param replication block replication + * @param blockSize maximum block size + * @param progress interface for reporting client progress + * @param buffersize underlying buffer size + * @param checksumOpt checksum options + * + * @return output stream + * + * @see ClientProtocol#create for detailed description of exceptions thrown + */ + public DFSOutputStream create(String src, + FsPermission permission, + EnumSet<CreateFlag> flag, + boolean createParent, + short replication, + long blockSize, + Progressable progress, + int buffersize, + ChecksumOpt checksumOpt) throws IOException { - return create(src, permission, flag, createParent, replication, blockSize, ++ return create(src, permission, flag, createParent, replication, blockSize, + progress, buffersize, checksumOpt, null); + } + + private FsPermission applyUMask(FsPermission permission) { + if (permission == null) { + permission = FsPermission.getFileDefault(); + } + return permission.applyUMask(dfsClientConf.getUMask()); + } + + /** + * Same as {@link #create(String, FsPermission, EnumSet, boolean, short, long, + * Progressable, int, ChecksumOpt)} with the addition of favoredNodes that is + * a hint to where the namenode should place the file blocks. + * The favored nodes hint is not persisted in HDFS. Hence it may be honored + * at the creation time only. HDFS could move the blocks during balancing or + * replication, to move the blocks from favored nodes. A value of null means + * no favored nodes for this create + */ + public DFSOutputStream create(String src, + FsPermission permission, + EnumSet<CreateFlag> flag, + boolean createParent, + short replication, + long blockSize, + Progressable progress, + int buffersize, + ChecksumOpt checksumOpt, + InetSocketAddress[] favoredNodes) throws IOException { + checkOpen(); + final FsPermission masked = applyUMask(permission); + if(LOG.isDebugEnabled()) { + LOG.debug(src + ": masked=" + masked); + } + final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this, + src, masked, flag, createParent, replication, blockSize, progress, + buffersize, dfsClientConf.createChecksum(checksumOpt), + getFavoredNodesStr(favoredNodes)); + beginFileLease(result.getFileId(), result); + return result; + } + + private String[] getFavoredNodesStr(InetSocketAddress[] favoredNodes) { + String[] favoredNodeStrs = null; + if (favoredNodes != null) { + favoredNodeStrs = new String[favoredNodes.length]; + for (int i = 0; i < favoredNodes.length; i++) { + favoredNodeStrs[i] = + favoredNodes[i].getHostName() + ":" + + favoredNodes[i].getPort(); + } + } + return favoredNodeStrs; + } + + /** + * Append to an existing file if {@link CreateFlag#APPEND} is present + */ + private DFSOutputStream primitiveAppend(String src, EnumSet<CreateFlag> flag, + int buffersize, Progressable progress) throws IOException { + if (flag.contains(CreateFlag.APPEND)) { + HdfsFileStatus stat = getFileInfo(src); + if (stat == null) { // No file to append to + // New file needs to be created if create option is present + if (!flag.contains(CreateFlag.CREATE)) { + throw new FileNotFoundException("failed to append to non-existent file " + + src + " on client " + clientName); + } + return null; + } + return callAppend(src, buffersize, flag, progress, null); + } + return null; + } + + /** + * Same as {{@link #create(String, FsPermission, EnumSet, short, long, + * Progressable, int, ChecksumOpt)} except that the permission + * is absolute (ie has already been masked with umask. + */ + public DFSOutputStream primitiveCreate(String src, + FsPermission absPermission, + EnumSet<CreateFlag> flag, + boolean createParent, + short replication, + long blockSize, + Progressable progress, + int buffersize, + ChecksumOpt checksumOpt) + throws IOException, UnresolvedLinkException { + checkOpen(); + CreateFlag.validate(flag); + DFSOutputStream result = primitiveAppend(src, flag, buffersize, progress); + if (result == null) { + DataChecksum checksum = dfsClientConf.createChecksum(checksumOpt); + result = DFSOutputStream.newStreamForCreate(this, src, absPermission, + flag, createParent, replication, blockSize, progress, buffersize, + checksum, null); + } + beginFileLease(result.getFileId(), result); + return result; + } + + /** + * Creates a symbolic link. + * + * @see ClientProtocol#createSymlink(String, String,FsPermission, boolean) + */ + public void createSymlink(String target, String link, boolean createParent) + throws IOException { + checkOpen(); + TraceScope scope = newPathTraceScope("createSymlink", target); + try { + final FsPermission dirPerm = applyUMask(null); + namenode.createSymlink(target, link, dirPerm, createParent); + } catch (RemoteException re) { + throw re.unwrapRemoteException(AccessControlException.class, + FileAlreadyExistsException.class, + FileNotFoundException.class, + ParentNotDirectoryException.class, + NSQuotaExceededException.class, + DSQuotaExceededException.class, + QuotaByStorageTypeExceededException.class, + UnresolvedPathException.class, + SnapshotAccessControlException.class); + } finally { + scope.close(); + } + } + + /** + * Resolve the *first* symlink, if any, in the path. + * + * @see ClientProtocol#getLinkTarget(String) + */ + public String getLinkTarget(String path) throws IOException { + checkOpen(); + TraceScope scope = newPathTraceScope("getLinkTarget", path); + try { + return namenode.getLinkTarget(path); + } catch (RemoteException re) { + throw re.unwrapRemoteException(AccessControlException.class, + FileNotFoundException.class); + } finally { + scope.close(); + } + } + + /** Method to get stream returned by append call */ + private DFSOutputStream callAppend(String src, int buffersize, + EnumSet<CreateFlag> flag, Progressable progress, String[] favoredNodes) + throws IOException { + CreateFlag.validateForAppend(flag); + try { + LastBlockWithStatus blkWithStatus = namenode.append(src, clientName, + new EnumSetWritable<>(flag, CreateFlag.class)); + return DFSOutputStream.newStreamForAppend(this, src, flag, buffersize, + progress, blkWithStatus.getLastBlock(), + blkWithStatus.getFileStatus(), dfsClientConf.createChecksum(null), + favoredNodes); + } catch(RemoteException re) { + throw re.unwrapRemoteException(AccessControlException.class, + FileNotFoundException.class, + SafeModeException.class, + DSQuotaExceededException.class, + QuotaByStorageTypeExceededException.class, + UnsupportedOperationException.class, + UnresolvedPathException.class, + SnapshotAccessControlException.class); + } + } + + /** + * Append to an existing HDFS file. + * + * @param src file name + * @param buffersize buffer size + * @param flag indicates whether to append data to a new block instead of + * the last block + * @param progress for reporting write-progress; null is acceptable. + * @param statistics file system statistics; null is acceptable. + * @return an output stream for writing into the file + * + * @see ClientProtocol#append(String, String, EnumSetWritable) + */ + public HdfsDataOutputStream append(final String src, final int buffersize, + EnumSet<CreateFlag> flag, final Progressable progress, + final FileSystem.Statistics statistics) throws IOException { + final DFSOutputStream out = append(src, buffersize, flag, null, progress); + return createWrappedOutputStream(out, statistics, out.getInitialLen()); + } + + /** + * Append to an existing HDFS file. + * + * @param src file name + * @param buffersize buffer size + * @param flag indicates whether to append data to a new block instead of the + * last block + * @param progress for reporting write-progress; null is acceptable. + * @param statistics file system statistics; null is acceptable. + * @param favoredNodes FavoredNodes for new blocks + * @return an output stream for writing into the file + * @see ClientProtocol#append(String, String, EnumSetWritable) + */ + public HdfsDataOutputStream append(final String src, final int buffersize, + EnumSet<CreateFlag> flag, final Progressable progress, + final FileSystem.Statistics statistics, + final InetSocketAddress[] favoredNodes) throws IOException { + final DFSOutputStream out = append(src, buffersize, flag, + getFavoredNodesStr(favoredNodes), progress); + return createWrappedOutputStream(out, statistics, out.getInitialLen()); + } + + private DFSOutputStream append(String src, int buffersize, + EnumSet<CreateFlag> flag, String[] favoredNodes, Progressable progress) + throws IOException { + checkOpen(); + final DFSOutputStream result = callAppend(src, buffersize, flag, progress, + favoredNodes); + beginFileLease(result.getFileId(), result); + return result; + } + + /** + * Set replication for an existing file. + * @param src file name + * @param replication replication to set the file to + * + * @see ClientProtocol#setReplication(String, short) + */ + public boolean setReplication(String src, short replication) + throws IOException { + checkOpen(); + TraceScope scope = newPathTraceScope("setReplication", src); + try { + return namenode.setReplication(src, replication); + } catch(RemoteException re) { + throw re.unwrapRemoteException(AccessControlException.class, + FileNotFoundException.class, + SafeModeException.class, + DSQuotaExceededException.class, + QuotaByStorageTypeExceededException.class, + UnresolvedPathException.class, + SnapshotAccessControlException.class); + } finally { + scope.close(); + } + } + + /** + * Set storage policy for an existing file/directory + * @param src file/directory name + * @param policyName name of the storage policy + */ + public void setStoragePolicy(String src, String policyName) + throws IOException { + checkOpen(); + TraceScope scope = newPathTraceScope("setStoragePolicy", src); + try { + namenode.setStoragePolicy(src, policyName); + } catch (RemoteException e) { + throw e.unwrapRemoteException(AccessControlException.class, + FileNotFoundException.class, + SafeModeException.class, + NSQuotaExceededException.class, + UnresolvedPathException.class, + SnapshotAccessControlException.class); + } finally { + scope.close(); + } + } + + /** + * @param path file/directory name + * @return Get the storage policy for specified path + */ + public BlockStoragePolicy getStoragePolicy(String path) throws IOException { + checkOpen(); + TraceScope scope = newPathTraceScope("getStoragePolicy", path); + try { + return namenode.getStoragePolicy(path); + } catch (RemoteException e) { + throw e.unwrapRemoteException(AccessControlException.class, + FileNotFoundException.class, + SafeModeException.class, + UnresolvedPathException.class); + } finally { + scope.close(); + } + } + + /** + * @return All the existing storage policies + */ + public BlockStoragePolicy[] getStoragePolicies() throws IOException { + checkOpen(); + TraceScope scope = tracer.newScope("getStoragePolicies"); + try { + return namenode.getStoragePolicies(); + } finally { + scope.close(); + } + } + + /** + * Rename file or directory. + * @see ClientProtocol#rename(String, String) + * @deprecated Use {@link #rename(String, String, Options.Rename...)} instead. + */ + @Deprecated + public boolean rename(String src, String dst) throws IOException { + checkOpen(); + TraceScope scope = newSrcDstTraceScope("rename", src, dst); + try { + return namenode.rename(src, dst); + } catch(RemoteException re) { + throw re.unwrapRemoteException(AccessControlException.class, + NSQuotaExceededException.class, + DSQuotaExceededException.class, + QuotaByStorageTypeExceededException.class, + UnresolvedPathException.class, + SnapshotAccessControlException.class); + } finally { + scope.close(); + } + } + + /** + * Move blocks from src to trg and delete src + * See {@link ClientProtocol#concat}. + */ + public void concat(String trg, String [] srcs) throws IOException { + checkOpen(); + TraceScope scope = tracer.newScope("concat"); + try { + namenode.concat(trg, srcs); + } catch(RemoteException re) { + throw re.unwrapRemoteException(AccessControlException.class, + UnresolvedPathException.class, + SnapshotAccessControlException.class); + } finally { + scope.close(); + } + } + /** + * Rename file or directory. + * @see ClientProtocol#rename2(String, String, Options.Rename...) + */ + public void rename(String src, String dst, Options.Rename... options) + throws IOException { + checkOpen(); + TraceScope scope = newSrcDstTraceScope("rename2", src, dst); + try { + namenode.rename2(src, dst, options); + } catch(RemoteException re) { + throw re.unwrapRemoteException(AccessControlException.class, + DSQuotaExceededException.class, + QuotaByStorageTypeExceededException.class, + FileAlreadyExistsException.class, + FileNotFoundException.class, + ParentNotDirectoryException.class, + SafeModeException.class, + NSQuotaExceededException.class, + UnresolvedPathException.class, + SnapshotAccessControlException.class); + } finally { + scope.close(); + } + } + + /** + * Truncate a file to an indicated size + * See {@link ClientProtocol#truncate}. + */ + public boolean truncate(String src, long newLength) throws IOException { + checkOpen(); + if (newLength < 0) { + throw new HadoopIllegalArgumentException( + "Cannot truncate to a negative file size: " + newLength + "."); + } + TraceScope scope = newPathTraceScope("truncate", src); + try { + return namenode.truncate(src, newLength, clientName); + } catch (RemoteException re) { + throw re.unwrapRemoteException(AccessControlException.class, + UnresolvedPathException.class); + } finally { + scope.close(); + } + } + + /** + * Delete file or directory. + * See {@link ClientProtocol#delete(String, boolean)}. + */ + @Deprecated + public boolean delete(String src) throws IOException { + checkOpen(); + return delete(src, true); + } + + /** + * delete file or directory. + * delete contents of the directory if non empty and recursive + * set to true + * + * @see ClientProtocol#delete(String, boolean) + */ + public boolean delete(String src, boolean recursive) throws IOException { + checkOpen(); + TraceScope scope = newPathTraceScope("delete", src); + try { + return namenode.delete(src, recursive); + } catch(RemoteException re) { + throw re.unwrapRemoteException(AccessControlException.class, + FileNotFoundException.class, + SafeModeException.class, + UnresolvedPathException.class, + SnapshotAccessControlException.class); + } finally { + scope.close(); + } + } + + /** Implemented using getFileInfo(src) + */ + public boolean exists(String src) throws IOException { + checkOpen(); + return getFileInfo(src) != null; + } + + /** + * Get a partial listing of the indicated directory + * No block locations need to be fetched + */ + public DirectoryListing listPaths(String src, byte[] startAfter) + throws IOException { + return listPaths(src, startAfter, false); + } + + /** + * Get a partial listing of the indicated directory + * + * Recommend to use HdfsFileStatus.EMPTY_NAME as startAfter + * if the application wants to fetch a listing starting from + * the first entry in the directory + * + * @see ClientProtocol#getListing(String, byte[], boolean) + */ + public DirectoryListing listPaths(String src, byte[] startAfter, + boolean needLocation) throws IOException { + checkOpen(); + TraceScope scope = newPathTraceScope("listPaths", src); + try { + return namenode.getListing(src, startAfter, needLocation); + } catch(RemoteException re) { + throw re.unwrapRemoteException(AccessControlException.class, + FileNotFoundException.class, + UnresolvedPathException.class); + } finally { + scope.close(); + } + } + + /** + * Get the file info for a specific file or directory. + * @param src The string representation of the path to the file + * @return object containing information regarding the file + * or null if file not found + * + * @see ClientProtocol#getFileInfo(String) for description of exceptions + */ + public HdfsFileStatus getFileInfo(String src) throws IOException { + checkOpen(); + TraceScope scope = newPathTraceScope("getFileInfo", src); + try { + return namenode.getFileInfo(src); + } catch(RemoteException re) { + throw re.unwrapRemoteException(AccessControlException.class, + FileNotFoundException.class, + UnresolvedPathException.class); + } finally { + scope.close(); + } + } + + /** + * Close status of a file + * @return true if file is already closed + */ + public boolean isFileClosed(String src) throws IOException{ + checkOpen(); + TraceScope scope = newPathTraceScope("isFileClosed", src); + try { + return namenode.isFileClosed(src); + } catch(RemoteException re) { + throw re.unwrapRemoteException(AccessControlException.class, + FileNotFoundException.class, + UnresolvedPathException.class); + } finally { + scope.close(); + } + } + + /** + * Get the file info for a specific file or directory. If src + * refers to a symlink then the FileStatus of the link is returned. + * @param src path to a file or directory. + * + * For description of exceptions thrown + * @see ClientProtocol#getFileLinkInfo(String) + */ + public HdfsFileStatus getFileLinkInfo(String src) throws IOException { + checkOpen(); + TraceScope scope = newPathTraceScope("getFileLinkInfo", src); + try { + return namenode.getFileLinkInfo(src); + } catch(RemoteException re) { + throw re.unwrapRemoteException(AccessControlException.class, + UnresolvedPathException.class); + } finally { + scope.close(); + } + } + + @InterfaceAudience.Private + public void clearDataEncryptionKey() { + LOG.debug("Clearing encryption key"); + synchronized (this) { + encryptionKey = null; + } + } + + /** + * @return true if data sent between this client and DNs should be encrypted, + * false otherwise. + * @throws IOException in the event of error communicating with the NN + */ + boolean shouldEncryptData() throws IOException { + FsServerDefaults d = getServerDefaults(); + return d == null ? false : d.getEncryptDataTransfer(); + } + + @Override + public DataEncryptionKey newDataEncryptionKey() throws IOException { + if (shouldEncryptData()) { + synchronized (this) { + if (encryptionKey == null || + encryptionKey.expiryDate < Time.now()) { + LOG.debug("Getting new encryption token from NN"); + encryptionKey = namenode.getDataEncryptionKey(); + } + return encryptionKey; + } + } else { + return null; + } + } + + /** + * Get the checksum of the whole file of a range of the file. Note that the + * range always starts from the beginning of the file. + * @param src The file path + * @param length the length of the range, i.e., the range is [0, length] + * @return The checksum + * @see DistributedFileSystem#getFileChecksum(Path) + */ + public MD5MD5CRC32FileChecksum getFileChecksum(String src, long length) + throws IOException { + checkOpen(); + Preconditions.checkArgument(length >= 0); + //get block locations for the file range + LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0, + length); + if (null == blockLocations) { + throw new FileNotFoundException("File does not exist: " + src); + } + if (blockLocations.isUnderConstruction()) { + throw new IOException("Fail to get checksum, since file " + src + + " is under construction."); + } + List<LocatedBlock> locatedblocks = blockLocations.getLocatedBlocks(); + final DataOutputBuffer md5out = new DataOutputBuffer(); + int bytesPerCRC = -1; + DataChecksum.Type crcType = DataChecksum.Type.DEFAULT; + long crcPerBlock = 0; + boolean refetchBlocks = false; + int lastRetriedIndex = -1; + + // get block checksum for each block + long remaining = length; + if (src.contains(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR_SEPARATOR)) { + remaining = Math.min(length, blockLocations.getFileLength()); + } + for(int i = 0; i < locatedblocks.size() && remaining > 0; i++) { + if (refetchBlocks) { // refetch to get fresh tokens + blockLocations = callGetBlockLocations(namenode, src, 0, length); + if (null == blockLocations) { + throw new FileNotFoundException("File does not exist: " + src); + } + if (blockLocations.isUnderConstruction()) { + throw new IOException("Fail to get checksum, since file " + src + + " is under construction."); + } + locatedblocks = blockLocations.getLocatedBlocks(); + refetchBlocks = false; + } + LocatedBlock lb = locatedblocks.get(i); + final ExtendedBlock block = lb.getBlock(); + if (remaining < block.getNumBytes()) { + block.setNumBytes(remaining); + } + remaining -= block.getNumBytes(); + final DatanodeInfo[] datanodes = lb.getLocations(); + + //try each datanode location of the block + final int timeout = 3000*datanodes.length + dfsClientConf.getSocketTimeout(); + boolean done = false; + for(int j = 0; !done && j < datanodes.length; j++) { + DataOutputStream out = null; + DataInputStream in = null; + + try { + //connect to a datanode + IOStreamPair pair = connectToDN(datanodes[j], timeout, lb); + out = new DataOutputStream(new BufferedOutputStream(pair.out, + smallBufferSize)); + in = new DataInputStream(pair.in); + + if (LOG.isDebugEnabled()) { + LOG.debug("write to " + datanodes[j] + ": " + + Op.BLOCK_CHECKSUM + ", block=" + block); + } + // get block MD5 + new Sender(out).blockChecksum(block, lb.getBlockToken()); + + final BlockOpResponseProto reply = + BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(in)); + + String logInfo = "for block " + block + " from datanode " + datanodes[j]; + DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo); + + OpBlockChecksumResponseProto checksumData = + reply.getChecksumResponse(); + + //read byte-per-checksum + final int bpc = checksumData.getBytesPerCrc(); + if (i == 0) { //first block + bytesPerCRC = bpc; + } + else if (bpc != bytesPerCRC) { + throw new IOException("Byte-per-checksum not matched: bpc=" + bpc + + " but bytesPerCRC=" + bytesPerCRC); + } + + //read crc-per-block + final long cpb = checksumData.getCrcPerBlock(); + if (locatedblocks.size() > 1 && i == 0) { + crcPerBlock = cpb; + } + + //read md5 + final MD5Hash md5 = new MD5Hash( + checksumData.getMd5().toByteArray()); + md5.write(md5out); + + // read crc-type + final DataChecksum.Type ct; + if (checksumData.hasCrcType()) { + ct = PBHelperClient.convert(checksumData + .getCrcType()); + } else { + LOG.debug("Retrieving checksum from an earlier-version DataNode: " + + "inferring checksum by reading first byte"); + ct = inferChecksumTypeByReading(lb, datanodes[j]); + } + + if (i == 0) { // first block + crcType = ct; + } else if (crcType != DataChecksum.Type.MIXED + && crcType != ct) { + // if crc types are mixed in a file + crcType = DataChecksum.Type.MIXED; + } + + done = true; + + if (LOG.isDebugEnabled()) { + if (i == 0) { + LOG.debug("set bytesPerCRC=" + bytesPerCRC + + ", crcPerBlock=" + crcPerBlock); + } + LOG.debug("got reply from " + datanodes[j] + ": md5=" + md5); + } + } catch (InvalidBlockTokenException ibte) { + if (i > lastRetriedIndex) { + if (LOG.isDebugEnabled()) { + LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM " + + "for file " + src + " for block " + block + + " from datanode " + datanodes[j] + + ". Will retry the block once."); + } + lastRetriedIndex = i; + done = true; // actually it's not done; but we'll retry + i--; // repeat at i-th block + refetchBlocks = true; + break; + } + } catch (IOException ie) { + LOG.warn("src=" + src + ", datanodes["+j+"]=" + datanodes[j], ie); + } finally { + IOUtils.closeStream(in); + IOUtils.closeStream(out); + } + } + + if (!done) { + throw new IOException("Fail to get block MD5 for " + block); + } + } + + //compute file MD5 + final MD5Hash fileMD5 = MD5Hash.digest(md5out.getData()); + switch (crcType) { + case CRC32: + return new MD5MD5CRC32GzipFileChecksum(bytesPerCRC, + crcPerBlock, fileMD5); + case CRC32C: + return new MD5MD5CRC32CastagnoliFileChecksum(bytesPerCRC, + crcPerBlock, fileMD5); + default: + // If there is no block allocated for the file, + // return one with the magic entry that matches what previous + // hdfs versions return. + if (locatedblocks.size() == 0) { + return new MD5MD5CRC32GzipFileChecksum(0, 0, fileMD5); + } + + // we should never get here since the validity was checked + // when getCrcType() was called above. + return null; + } + } + + /** + * Connect to the given datanode's datantrasfer port, and return + * the resulting IOStreamPair. This includes encryption wrapping, etc. + */ + private IOStreamPair connectToDN(DatanodeInfo dn, int timeout, + LocatedBlock lb) throws IOException { + boolean success = false; + Socket sock = null; + try { + sock = socketFactory.createSocket(); + String dnAddr = dn.getXferAddr(getConf().isConnectToDnViaHostname()); + if (LOG.isDebugEnabled()) { + LOG.debug("Connecting to datanode " + dnAddr); + } + NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr), timeout); + sock.setSoTimeout(timeout); + + OutputStream unbufOut = NetUtils.getOutputStream(sock); + InputStream unbufIn = NetUtils.getInputStream(sock); + IOStreamPair ret = saslClient.newSocketSend(sock, unbufOut, unbufIn, this, + lb.getBlockToken(), dn); + success = true; + return ret; + } finally { + if (!success) { + IOUtils.closeSocket(sock); + } + } + } + + /** + * Infer the checksum type for a replica by sending an OP_READ_BLOCK + * for the first byte of that replica. This is used for compatibility + * with older HDFS versions which did not include the checksum type in + * OpBlockChecksumResponseProto. + * + * @param lb the located block + * @param dn the connected datanode + * @return the inferred checksum type + * @throws IOException if an error occurs + */ + private Type inferChecksumTypeByReading(LocatedBlock lb, DatanodeInfo dn) + throws IOException { + IOStreamPair pair = connectToDN(dn, dfsClientConf.getSocketTimeout(), lb); + + try { + DataOutputStream out = new DataOutputStream(new BufferedOutputStream(pair.out, + smallBufferSize)); + DataInputStream in = new DataInputStream(pair.in); + + new Sender(out).readBlock(lb.getBlock(), lb.getBlockToken(), clientName, + 0, 1, true, CachingStrategy.newDefaultStrategy()); + final BlockOpResponseProto reply = + BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(in)); + String logInfo = "trying to read " + lb.getBlock() + " from datanode " + dn; + DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo); + + return PBHelperClient.convert(reply.getReadOpChecksumInfo().getChecksum().getType()); + } finally { + IOUtilsClient.cleanup(null, pair.in, pair.out); + } + } + + /** + * Set permissions to a file or directory. + * @param src path name. + * @param permission permission to set to + * + * @see ClientProtocol#setPermission(String, FsPermission) + */ + public void setPermission(String src, FsPermission permission) + throws IOException { + checkOpen(); + TraceScope scope = newPathTraceScope("setPermission", src); + try { + namenode.setPermission(src, permission); + } catch(RemoteException re) { + throw re.unwrapRemoteException(AccessControlException.class, + FileNotFoundException.class, + SafeModeException.class, + UnresolvedPathException.class, + SnapshotAccessControlException.class); + } finally { + scope.close(); + } + } + + /** + * Set file or directory owner. + * @param src path name. + * @param username user id. + * @param groupname user group. + * + * @see ClientProtocol#setOwner(String, String, String) + */ + public void setOwner(String src, String username, String groupname) + throws IOException { + checkOpen(); + TraceScope scope = newPathTraceScope("setOwner", src); + try { + namenode.setOwner(src, username, groupname); + } catch(RemoteException re) { + throw re.unwrapRemoteException(AccessControlException.class, + FileNotFoundException.class, + SafeModeException.class, + UnresolvedPathException.class, + SnapshotAccessControlException.class); + } finally { + scope.close(); + } + } + + private long[] callGetStats() throws IOException { + checkOpen(); + TraceScope scope = tracer.newScope("getStats"); + try { + return namenode.getStats(); + } finally { + scope.close(); + } + } + + /** + * @see ClientProtocol#getStats() + */ + public FsStatus getDiskStatus() throws IOException { + long rawNums[] = callGetStats(); + return new FsStatus(rawNums[0], rawNums[1], rawNums[2]); + } + + /** + * Returns count of blocks with no good replicas left. Normally should be + * zero. + * @throws IOException + */ + public long getMissingBlocksCount() throws IOException { + return callGetStats()[ClientProtocol.GET_STATS_MISSING_BLOCKS_IDX]; + } + + /** + * Returns count of blocks with replication factor 1 and have + * lost the only replica. + * @throws IOException + */ + public long getMissingReplOneBlocksCount() throws IOException { + return callGetStats()[ClientProtocol. + GET_STATS_MISSING_REPL_ONE_BLOCKS_IDX]; + } + + /** + * Returns count of blocks with one of more replica missing. + * @throws IOException + */ + public long getUnderReplicatedBlocksCount() throws IOException { + return callGetStats()[ClientProtocol.GET_STATS_UNDER_REPLICATED_IDX]; + } + + /** + * Returns count of blocks with at least one replica marked corrupt. + * @throws IOException + */ + public long getCorruptBlocksCount() throws IOException { + return callGetStats()[ClientProtocol.GET_STATS_CORRUPT_BLOCKS_IDX]; + } + + /** + * @return a list in which each entry describes a corrupt file/block + * @throws IOException + */ + public CorruptFileBlocks listCorruptFileBlocks(String path, + String cookie) + throws IOException { + checkOpen(); + TraceScope scope = newPathTraceScope("listCorruptFileBlocks", path); + try { + return namenode.listCorruptFileBlocks(path, cookie); + } finally { + scope.close(); + } + } + + public DatanodeInfo[] datanodeReport(DatanodeReportType type) + throws IOException { + checkOpen(); + TraceScope scope = tracer.newScope("datanodeReport"); + try { + return namenode.getDatanodeReport(type); + } finally { + scope.close(); + } + } + + public DatanodeStorageReport[] getDatanodeStorageReport( + DatanodeReportType type) throws IOException { + checkOpen(); + TraceScope scope = + tracer.newScope("datanodeStorageReport"); + try { + return namenode.getDatanodeStorageReport(type); + } finally { + scope.close(); + } + } + + /** + * Enter, leave or get safe mode. + * + * @see ClientProtocol#setSafeMode(HdfsConstants.SafeModeAction,boolean) + */ + public boolean setSafeMode(SafeModeAction action) throws IOException { + checkOpen(); + return setSafeMode(action, false); + } + + /** + * Enter, leave or get safe mode. + * + * @param action + * One of SafeModeAction.GET, SafeModeAction.ENTER and + * SafeModeActiob.LEAVE + * @param isChecked + * If true, then check only active namenode's safemode status, else + * check first namenode's status. + * @see ClientProtocol#setSafeMode(HdfsConstants.SafeModeAction, boolean) + */ + public boolean setSafeMode(SafeModeAction action, boolean isChecked) throws IOException{ + TraceScope scope = + tracer.newScope("setSafeMode"); + try { + return namenode.setSafeMode(action, isChecked); + } finally { + scope.close(); + } + } + + /** + * Create one snapshot. + * + * @param snapshotRoot The directory where the snapshot is to be taken + * @param snapshotName Name of the snapshot + * @return the snapshot path. + * @see ClientProtocol#createSnapshot(String, String) + */ + public String createSnapshot(String snapshotRoot, String snapshotName) + throws IOException { + checkOpen(); + TraceScope scope = tracer.newScope("createSnapshot"); + try { + return namenode.createSnapshot(snapshotRoot, snapshotName); + } catch(RemoteException re) { + throw re.unwrapRemoteException(); + } finally { + scope.close(); + } + } + + /** + * Delete a snapshot of a snapshottable directory. + * + * @param snapshotRoot The snapshottable directory that the + * to-be-deleted snapshot belongs to + * @param snapshotName The name of the to-be-deleted snapshot + * @throws IOException + * @see ClientProtocol#deleteSnapshot(String, String) + */ + public void deleteSnapshot(String snapshotRoot, String snapshotName) + throws IOException { + checkOpen(); + TraceScope scope = tracer.newScope("deleteSnapshot"); + try { + namenode.deleteSnapshot(snapshotRoot, snapshotName); + } catch(RemoteException re) { + throw re.unwrapRemoteException(); + } finally { + scope.close(); + } + } + + /** + * Rename a snapshot. + * @param snapshotDir The directory path where the snapshot was taken + * @param snapshotOldName Old name of the snapshot + * @param snapshotNewName New name of the snapshot + * @throws IOException + * @see ClientProtocol#renameSnapshot(String, String, String) + */ + public void renameSnapshot(String snapshotDir, String snapshotOldName, + String snapshotNewName) throws IOException { + checkOpen(); + TraceScope scope = tracer.newScope("renameSnapshot"); + try { + namenode.renameSnapshot(snapshotDir, snapshotOldName, snapshotNewName); + } catch(RemoteException re) { + throw re.unwrapRemoteException(); + } finally { + scope.close(); + } + } + + /** + * Get all the current snapshottable directories. + * @return All the c
<TRUNCATED>