Repository: hadoop

Updated Branches:
  refs/heads/branch-2.7.2 109c7545d -> 82ec5dbb2
HDFS-9574. Reduce client failures during datanode restart. Contributed by Kihwal Lee.

(cherry picked from commit b06e39de4fc4f9c35afb472eef0bba2adf91954f)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/82ec5dbb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/82ec5dbb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/82ec5dbb

Branch: refs/heads/branch-2.7.2
Commit: 82ec5dbb2505066da8a6ed008d943b5ada027b15
Parents: 109c754
Author: Kihwal Lee <kih...@apache.org>
Authored: Thu Jan 14 09:55:48 2016 -0600
Committer: Kihwal Lee <kih...@apache.org>
Committed: Thu Jan 14 09:55:48 2016 -0600

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   2 +
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   2 +
 .../org/apache/hadoop/hdfs/DFSInputStream.java  |  60 +++++++--
 .../hadoop/hdfs/server/datanode/DNConf.java     |  12 ++
 .../hadoop/hdfs/server/datanode/DataNode.java   |  11 +-
 .../server/datanode/DataNodeFaultInjector.java  |   2 +
 .../hdfs/server/datanode/DataXceiver.java       | 122 +++++++++++--------
 .../src/main/resources/hdfs-default.xml         |  10 ++
 .../fsdataset/impl/TestDatanodeRestart.java     |  73 +++++++++++
 9 files changed, 231 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/82ec5dbb/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index d92cb5e..eadac75 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -16,6 +16,8 @@ Release 2.7.2 - 2015-12-16
     HDFS-9221. HdfsServerConstants#ReplicaState#getState should avoid calling
     values() since it creates a temporary array. (Staffan Friberg via yliu)
 
+    HDFS-9574. Reduce client failures during datanode restart (kihwal)
+
   OPTIMIZATIONS
 
     HDFS-8722.
Optimize datanode writes for small writes and flushes (kihwal) http://git-wip-us.apache.org/repos/asf/hadoop/blob/82ec5dbb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 3bb8a0a..c2a5296 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -492,6 +492,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_KEY = "dfs.datanode.min.supported.namenode.version"; public static final String DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_DEFAULT = "2.1.0-beta"; public static final String DFS_NAMENODE_INODE_ATTRIBUTES_PROVIDER_KEY = "dfs.namenode.inode.attributes.provider.class"; + public static final String DFS_DATANODE_BP_READY_TIMEOUT_KEY = "dfs.datanode.bp-ready.timeout"; + public static final long DFS_DATANODE_BP_READY_TIMEOUT_DEFAULT = 20; public static final String DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY = "dfs.block.access.token.enable"; public static final boolean DFS_BLOCK_ACCESS_TOKEN_ENABLE_DEFAULT = false; http://git-wip-us.apache.org/repos/asf/hadoop/blob/82ec5dbb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index 9f7b15c..19dde1f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -29,6 +29,7 @@ import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -71,10 +72,12 @@ import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; import org.apache.hadoop.io.ByteBufferPool; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.ipc.RetriableException; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.IdentityHashStore; +import org.apache.hadoop.util.StopWatch; import org.apache.htrace.Span; import org.apache.htrace.Trace; import org.apache.htrace.TraceScope; @@ -345,13 +348,19 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, private long readBlockLength(LocatedBlock locatedblock) throws IOException { assert locatedblock != null : "LocatedBlock cannot be null"; int replicaNotFoundCount = locatedblock.getLocations().length; - - for(DatanodeInfo datanode : locatedblock.getLocations()) { + + final int timeout = dfsClient.getConf().socketTimeout; + LinkedList<DatanodeInfo> nodeList = new LinkedList<DatanodeInfo>( + Arrays.asList(locatedblock.getLocations())); + LinkedList<DatanodeInfo> retryList = new LinkedList<DatanodeInfo>(); + boolean isRetry = false; + StopWatch sw = new 
StopWatch(); + while (nodeList.size() > 0) { + DatanodeInfo datanode = nodeList.pop(); ClientDatanodeProtocol cdp = null; - try { cdp = DFSUtil.createClientDatanodeProtocolProxy(datanode, - dfsClient.getConfiguration(), dfsClient.getConf().socketTimeout, + dfsClient.getConfiguration(), timeout, dfsClient.getConf().connectToDnViaHostname, locatedblock); final long n = cdp.getReplicaVisibleLength(locatedblock.getBlock()); @@ -359,13 +368,18 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, if (n >= 0) { return n; } - } - catch(IOException ioe) { - if (ioe instanceof RemoteException && - (((RemoteException) ioe).unwrapRemoteException() instanceof - ReplicaNotFoundException)) { - // special case : replica might not be on the DN, treat as 0 length - replicaNotFoundCount--; + } catch (IOException ioe) { + if (ioe instanceof RemoteException) { + if (((RemoteException) ioe).unwrapRemoteException() instanceof + ReplicaNotFoundException) { + // replica is not on the DN. We will treat it as 0 length + // if no one actually has a replica. + replicaNotFoundCount--; + } else if (((RemoteException) ioe).unwrapRemoteException() instanceof + RetriableException) { + // add to the list to be retried if necessary. + retryList.add(datanode); + } } if (DFSClient.LOG.isDebugEnabled()) { @@ -377,6 +391,30 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, RPC.stopProxy(cdp); } } + + // Ran out of nodes, but there are retriable nodes. + if (nodeList.size() == 0 && retryList.size() > 0) { + nodeList.addAll(retryList); + retryList.clear(); + isRetry = true; + } + + if (isRetry) { + // start the stop watch if not already running. + if (!sw.isRunning()) { + sw.start(); + } + try { + Thread.sleep(500); // delay between retries. + } catch (InterruptedException e) { + throw new IOException("Interrupted while getting the length."); + } + } + + // see if we ran out of retry time + if (sw.isRunning() && sw.now(TimeUnit.MILLISECONDS) > timeout) { + break; + } } // Namenode told us about these locations, but none know about the replica http://git-wip-us.apache.org/repos/asf/hadoop/blob/82ec5dbb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java index 67cd1ce..e59f23f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java @@ -50,6 +50,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RESTART_REPLICA_ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RESTART_REPLICA_EXPIRY_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BP_READY_TIMEOUT_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BP_READY_TIMEOUT_DEFAULT; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; @@ -100,6 +102,8 @@ public class DNConf { final long maxLockedMemory; + private final long bpReadyTimeout; + public DNConf(Configuration conf) { this.conf = conf; 
socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, @@ -193,6 +197,10 @@ public class DNConf { this.restartReplicaExpiry = conf.getLong( DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY, DFS_DATANODE_RESTART_REPLICA_EXPIRY_DEFAULT) * 1000L; + + this.bpReadyTimeout = conf.getLong( + DFS_DATANODE_BP_READY_TIMEOUT_KEY, + DFS_DATANODE_BP_READY_TIMEOUT_DEFAULT); } // We get minimumNameNodeVersion via a method so it can be mocked out in tests. @@ -266,4 +274,8 @@ public class DNConf { public boolean getIgnoreSecurePortsForTesting() { return ignoreSecurePortsForTesting; } + + public long getBpReadyTimeout() { + return bpReadyTimeout; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/82ec5dbb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 238ffad..54445af 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -1429,6 +1429,7 @@ public class DataNode extends ReconfigurableBase @VisibleForTesting public DatanodeRegistration getDNRegistrationForBP(String bpid) throws IOException { + DataNodeFaultInjector.get().noRegistration(); BPOfferService bpos = blockPoolManager.get(bpid); if(bpos==null || bpos.bpRegistration==null) { throw new IOException("cannot find BPOfferService for bpid="+bpid); @@ -1556,7 +1557,6 @@ public class DataNode extends ReconfigurableBase throw new ShortCircuitFdsUnsupportedException( fileDescriptorPassingDisabledReason); } - checkBlockToken(blk, token, BlockTokenSecretManager.AccessMode.READ); int blkVersion = CURRENT_BLOCK_FORMAT_VERSION; if (maxVersion < blkVersion) { throw new ShortCircuitFdsVersionException("Your client is too old " + @@ -2798,6 +2798,15 @@ public class DataNode extends ReconfigurableBase } private void checkReadAccess(final ExtendedBlock block) throws IOException { + // Make sure this node has registered for the block pool. + try { + getDNRegistrationForBP(block.getBlockPoolId()); + } catch (IOException e) { + // if it has not registered with the NN, throw an exception back. + throw new org.apache.hadoop.ipc.RetriableException( + "Datanode not registered. 
Try again later."); + } + if (isBlockTokenEnabled) { Set<TokenIdentifier> tokenIds = UserGroupInformation.getCurrentUser() .getTokenIdentifiers(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/82ec5dbb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java index 46ec3ae..0e38694 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java @@ -49,4 +49,6 @@ public class DataNodeFaultInjector { public boolean dropHeartbeatPacket() { return false; } + + public void noRegistration() throws IOException { } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/82ec5dbb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index 01ff32d..f27c28b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -45,6 +45,7 @@ import java.net.SocketTimeoutException; import java.nio.channels.ClosedChannelException; import java.security.MessageDigest; import java.util.Arrays; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.hadoop.fs.StorageType; @@ -85,6 +86,7 @@ import org.apache.hadoop.net.unix.DomainSocket; import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.DataChecksum; +import org.apache.hadoop.util.StopWatch; import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; @@ -296,6 +298,9 @@ class DataXceiver extends Receiver implements Runnable { SlotId slotId, int maxVersion, boolean supportsReceiptVerification) throws IOException { updateCurrentThreadName("Passing file descriptors for block " + blk); + DataOutputStream out = getBufferedOutputStream(); + checkAccess(out, true, blk, token, + Op.REQUEST_SHORT_CIRCUIT_FDS, BlockTokenSecretManager.AccessMode.READ); BlockOpResponseProto.Builder bld = BlockOpResponseProto.newBuilder(); FileInputStream fis[] = null; SlotId registeredSlotId = null; @@ -324,9 +329,6 @@ class DataXceiver extends Receiver implements Runnable { } catch (ShortCircuitFdsUnsupportedException e) { bld.setStatus(ERROR_UNSUPPORTED); bld.setMessage(e.getMessage()); - } catch (InvalidToken e) { - bld.setStatus(ERROR_ACCESS_TOKEN); - bld.setMessage(e.getMessage()); } catch (IOException e) { bld.setStatus(ERROR); bld.setMessage(e.getMessage()); @@ -513,9 +515,9 @@ class DataXceiver extends Receiver implements Runnable { final CachingStrategy cachingStrategy) throws IOException { previousOpClientName = clientName; long read = 0; + updateCurrentThreadName("Sending block " + block); 
OutputStream baseStream = getOutputStream(); - DataOutputStream out = new DataOutputStream(new BufferedOutputStream( - baseStream, HdfsConstants.SMALL_BUFFER_SIZE)); + DataOutputStream out = getBufferedOutputStream(); checkAccess(out, true, block, blockToken, Op.READ_BLOCK, BlockTokenSecretManager.AccessMode.READ); @@ -531,7 +533,6 @@ class DataXceiver extends Receiver implements Runnable { : dnR + " Served block " + block + " to " + remoteAddress; - updateCurrentThreadName("Sending block " + block); try { try { blockSender = new BlockSender(block, blockOffset, length, @@ -625,6 +626,10 @@ class DataXceiver extends Receiver implements Runnable { final boolean isTransfer = stage == BlockConstructionStage.TRANSFER_RBW || stage == BlockConstructionStage.TRANSFER_FINALIZED; long size = 0; + // reply to upstream datanode or client + final DataOutputStream replyOut = getBufferedOutputStream(); + checkAccess(replyOut, isClient, block, blockToken, + Op.WRITE_BLOCK, BlockTokenSecretManager.AccessMode.WRITE); // check single target for transfer-RBW/Finalized if (isTransfer && targets.length > 0) { throw new IOException(stage + " does not support multiple targets " @@ -655,14 +660,6 @@ class DataXceiver extends Receiver implements Runnable { LOG.info("Receiving " + block + " src: " + remoteAddress + " dest: " + localAddress); - // reply to upstream datanode or client - final DataOutputStream replyOut = new DataOutputStream( - new BufferedOutputStream( - getOutputStream(), - HdfsConstants.SMALL_BUFFER_SIZE)); - checkAccess(replyOut, isClient, block, blockToken, - Op.WRITE_BLOCK, BlockTokenSecretManager.AccessMode.WRITE); - DataOutputStream mirrorOut = null; // stream to next target DataInputStream mirrorIn = null; // reply from next target Socket mirrorSock = null; // socket to next target @@ -858,13 +855,13 @@ class DataXceiver extends Receiver implements Runnable { final String clientName, final DatanodeInfo[] targets, final StorageType[] targetStorageTypes) throws IOException { - checkAccess(socketOut, true, blk, blockToken, - Op.TRANSFER_BLOCK, BlockTokenSecretManager.AccessMode.COPY); previousOpClientName = clientName; updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk); final DataOutputStream out = new DataOutputStream( getOutputStream()); + checkAccess(out, true, blk, blockToken, + Op.TRANSFER_BLOCK, BlockTokenSecretManager.AccessMode.COPY); try { datanode.transferReplicaForPipelineRecovery(blk, targets, targetStorageTypes, clientName); @@ -918,6 +915,7 @@ class DataXceiver extends Receiver implements Runnable { @Override public void blockChecksum(final ExtendedBlock block, final Token<BlockTokenIdentifier> blockToken) throws IOException { + updateCurrentThreadName("Getting checksum for block " + block); final DataOutputStream out = new DataOutputStream( getOutputStream()); checkAccess(out, true, block, blockToken, @@ -928,13 +926,11 @@ class DataXceiver extends Receiver implements Runnable { long visibleLength = datanode.data.getReplicaVisibleLength(block); boolean partialBlk = requestLength < visibleLength; - updateCurrentThreadName("Reading metadata for block " + block); final LengthInputStream metadataIn = datanode.data .getMetaDataInputStream(block); final DataInputStream checksumIn = new DataInputStream( new BufferedInputStream(metadataIn, HdfsConstants.IO_FILE_BUFFER_SIZE)); - updateCurrentThreadName("Getting checksum for block " + block); try { //read metadata file final BlockMetadataHeader header = BlockMetadataHeader @@ -982,21 +978,10 @@ class DataXceiver extends Receiver 
implements Runnable { public void copyBlock(final ExtendedBlock block, final Token<BlockTokenIdentifier> blockToken) throws IOException { updateCurrentThreadName("Copying block " + block); - // Read in the header - if (datanode.isBlockTokenEnabled) { - try { - datanode.blockPoolTokenSecretManager.checkAccess(blockToken, null, block, - BlockTokenSecretManager.AccessMode.COPY); - } catch (InvalidToken e) { - LOG.warn("Invalid access token in request from " + remoteAddress - + " for OP_COPY_BLOCK for block " + block + " : " - + e.getLocalizedMessage()); - sendResponse(ERROR_ACCESS_TOKEN, "Invalid access token"); - return; - } + DataOutputStream reply = getBufferedOutputStream(); + checkAccess(reply, true, block, blockToken, + Op.COPY_BLOCK, BlockTokenSecretManager.AccessMode.COPY); - } - if (datanode.data.getPinning(block)) { String msg = "Not able to copy block " + block.getBlockId() + " " + "to " + peer.getRemoteAddressString() + " because it's pinned "; @@ -1014,7 +999,6 @@ class DataXceiver extends Receiver implements Runnable { } BlockSender blockSender = null; - DataOutputStream reply = null; boolean isOpSuccess = true; try { @@ -1022,10 +1006,7 @@ class DataXceiver extends Receiver implements Runnable { blockSender = new BlockSender(block, 0, -1, false, false, true, datanode, null, CachingStrategy.newDropBehind()); - // set up response stream OutputStream baseStream = getOutputStream(); - reply = new DataOutputStream(new BufferedOutputStream( - baseStream, HdfsConstants.SMALL_BUFFER_SIZE)); // send status first writeSuccessWithChecksumInfo(blockSender, reply); @@ -1069,20 +1050,9 @@ class DataXceiver extends Receiver implements Runnable { final String delHint, final DatanodeInfo proxySource) throws IOException { updateCurrentThreadName("Replacing block " + block + " from " + delHint); - - /* read header */ - if (datanode.isBlockTokenEnabled) { - try { - datanode.blockPoolTokenSecretManager.checkAccess(blockToken, null, block, - BlockTokenSecretManager.AccessMode.REPLACE); - } catch (InvalidToken e) { - LOG.warn("Invalid access token in request from " + remoteAddress - + " for OP_REPLACE_BLOCK for block " + block + " : " - + e.getLocalizedMessage()); - sendResponse(ERROR_ACCESS_TOKEN, "Invalid access token"); - return; - } - } + DataOutputStream replyOut = new DataOutputStream(getOutputStream()); + checkAccess(replyOut, true, block, blockToken, + Op.REPLACE_BLOCK, BlockTokenSecretManager.AccessMode.REPLACE); if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start String msg = "Not able to receive block " + block.getBlockId() + @@ -1099,7 +1069,6 @@ class DataXceiver extends Receiver implements Runnable { String errMsg = null; BlockReceiver blockReceiver = null; DataInputStream proxyReply = null; - DataOutputStream replyOut = new DataOutputStream(getOutputStream()); boolean IoeDuringCopyBlockOperation = false; try { // Move the block to different storage in the same datanode @@ -1211,6 +1180,16 @@ class DataXceiver extends Receiver implements Runnable { datanode.metrics.addReplaceBlockOp(elapsed()); } + /** + * Separated for testing. 
+ * @return + */ + DataOutputStream getBufferedOutputStream() { + return new DataOutputStream( + new BufferedOutputStream(getOutputStream(), + HdfsConstants.SMALL_BUFFER_SIZE)); + } + private long elapsed() { return monotonicNow() - opStartTime; } @@ -1257,11 +1236,52 @@ class DataXceiver extends Receiver implements Runnable { datanode.incrDatanodeNetworkErrors(remoteAddressWithoutPort); } + /** + * Wait until the BP is registered, upto the configured amount of time. + * Throws an exception if times out, which should fail the client request. + * @param the requested block + */ + void checkAndWaitForBP(final ExtendedBlock block) + throws IOException { + String bpId = block.getBlockPoolId(); + + // The registration is only missing in relatively short time window. + // Optimistically perform this first. + try { + datanode.getDNRegistrationForBP(bpId); + return; + } catch (IOException ioe) { + // not registered + } + + // retry + long bpReadyTimeout = dnConf.getBpReadyTimeout(); + StopWatch sw = new StopWatch(); + sw.start(); + while (sw.now(TimeUnit.SECONDS) <= bpReadyTimeout) { + try { + datanode.getDNRegistrationForBP(bpId); + return; + } catch (IOException ioe) { + // not registered + } + // sleep before trying again + try { + Thread.sleep(1000); + } catch (InterruptedException ie) { + throw new IOException("Interrupted while serving request. Aborting."); + } + } + // failed to obtain registration. + throw new IOException("Not ready to serve the block pool, " + bpId + "."); + } + private void checkAccess(OutputStream out, final boolean reply, final ExtendedBlock blk, final Token<BlockTokenIdentifier> t, final Op op, final BlockTokenSecretManager.AccessMode mode) throws IOException { + checkAndWaitForBP(blk); if (datanode.isBlockTokenEnabled) { if (LOG.isDebugEnabled()) { LOG.debug("Checking block access token for block '" + blk.getBlockId() http://git-wip-us.apache.org/repos/asf/hadoop/blob/82ec5dbb/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 4199798..fad10cc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -2320,4 +2320,14 @@ <description>Whether pin blocks on favored DataNode.</description> </property> +<property> + <name>dfs.datanode.bp-ready.timeout</name> + <value>20</value> + <description> + The maximum wait time for datanode to be ready before failing the + received request. Setting this to 0 fails requests right away if the + datanode is not yet registered with the namenode. This wait time + reduces initial request failures after datanode restart. 
+ </description> +</property> </configuration> http://git-wip-us.apache.org/repos/asf/hadoop/blob/82ec5dbb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestDatanodeRestart.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestDatanodeRestart.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestDatanodeRestart.java index a91baec..376c72d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestDatanodeRestart.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestDatanodeRestart.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil; import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; @@ -211,4 +212,76 @@ public class TestDatanodeRestart { } } } + + @Test + public void testWaitForRegistrationOnRestart() throws Exception { + Configuration conf = new HdfsConfiguration(); + conf.setLong(DFSConfigKeys.DFS_DATANODE_BP_READY_TIMEOUT_KEY, 5); + conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 5000); + + // This makes the datanode appear registered to the NN, but it won't be + // able to get to the saved dn reg internally. + DataNodeFaultInjector dnFaultInjector = new DataNodeFaultInjector() { + @Override + public void noRegistration() throws IOException { + throw new IOException("no reg found for testing"); + } + }; + DataNodeFaultInjector oldDnInjector = DataNodeFaultInjector.get(); + DataNodeFaultInjector.set(dnFaultInjector); + MiniDFSCluster cluster = null; + long start = 0; + Path file = new Path("/reg"); + try { + int numDNs = 1; + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build(); + cluster.waitActive(); + + start = System.currentTimeMillis(); + FileSystem fileSys = cluster.getFileSystem(); + try { + DFSTestUtil.createFile(fileSys, file, 10240L, (short)1, 0L); + // It is a bug if this does not fail. + throw new IOException("Did not fail!"); + } catch (org.apache.hadoop.ipc.RemoteException e) { + long elapsed = System.currentTimeMillis() - start; + // timers have at-least semantics, so it should be at least 5 seconds. + if (elapsed < 5000 || elapsed > 10000) { + throw new IOException(elapsed + " seconds passed.", e); + } + } + DataNodeFaultInjector.set(oldDnInjector); + // this should succeed now. + DFSTestUtil.createFile(fileSys, file, 10240L, (short)1, 0L); + + // turn it back to under-construction, so that the client calls + // getReplicaVisibleLength() rpc method against the datanode. + fileSys.append(file); + // back to simulating unregistered node. 
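      // Illustrative note, not part of the original commit: once the injector
      // is switched back on (next line), the datanode answers
      // getReplicaVisibleLength() calls with RetriableException, so the
      // positional read below exercises the new retry loop in
      // DFSInputStream#readBlockLength(), which keeps retrying for roughly
      // the 5000 ms socket timeout configured above before failing.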
+ DataNodeFaultInjector.set(dnFaultInjector); + byte[] buffer = new byte[8]; + start = System.currentTimeMillis(); + try { + fileSys.open(file).read(0L, buffer, 0, 1); + throw new IOException("Did not fail!"); + } catch (IOException e) { + long elapsed = System.currentTimeMillis() - start; + if (e.getMessage().contains("readBlockLength")) { + throw new IOException("Failed, but with unexpected exception:", e); + } + // timers have at-least semantics, so it should be at least 5 seconds. + if (elapsed < 5000 || elapsed > 10000) { + throw new IOException(elapsed + " seconds passed.", e); + } + } + DataNodeFaultInjector.set(oldDnInjector); + fileSys.open(file).read(0L, buffer, 0, 1); + } finally { + DataNodeFaultInjector.set(oldDnInjector); + if (cluster != null) { + cluster.shutdown(); + } + } + } + }
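----------------------------------------------------------------------

The new dfs.datanode.bp-ready.timeout key is expressed in seconds and defaults
to 20. As a hedged illustration of tuning it programmatically with the
constants added to DFSConfigKeys above (the surrounding class and the chosen
value are hypothetical, not part of this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class BpReadyTimeoutExample {
      public static void main(String[] args) {
        Configuration conf = new HdfsConfiguration();
        // Wait up to 10 seconds (instead of the default 20) for the block
        // pool to register before failing client requests on a restarting
        // datanode; 0 would fail such requests immediately.
        conf.setLong(DFSConfigKeys.DFS_DATANODE_BP_READY_TIMEOUT_KEY, 10L);

        long timeout = conf.getLong(
            DFSConfigKeys.DFS_DATANODE_BP_READY_TIMEOUT_KEY,
            DFSConfigKeys.DFS_DATANODE_BP_READY_TIMEOUT_DEFAULT);
        System.out.println("bp-ready timeout = " + timeout + " s");
      }
    }

With the default of 20 seconds and the 1-second sleep in checkAndWaitForBP(),
a not-yet-registered datanode re-checks its registration roughly 20 times
before failing a streaming request.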
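On the client side, DFSInputStream#readBlockLength() now treats a
RetriableException from a datanode as "up but not ready yet": that datanode is
re-queued and retried every 500 ms, but only for about one socket-timeout
interval. A minimal, self-contained sketch of that pattern, assuming a
hypothetical Probe callback (only the imported Hadoop classes are real; the
rest is illustrative):

    import java.io.IOException;
    import java.util.LinkedList;
    import java.util.concurrent.TimeUnit;

    import org.apache.hadoop.ipc.RemoteException;
    import org.apache.hadoop.ipc.RetriableException;
    import org.apache.hadoop.util.StopWatch;

    public class RetriableProbeSketch {
      interface Probe<T, R> {
        R call(T target) throws IOException;
      }

      static <T, R> R firstSuccessful(Iterable<T> targets, Probe<T, R> probe,
          long timeoutMs) throws IOException {
        LinkedList<T> pending = new LinkedList<T>();
        for (T t : targets) {
          pending.add(t);
        }
        LinkedList<T> retryList = new LinkedList<T>();
        StopWatch sw = new StopWatch();
        while (!pending.isEmpty()) {
          T target = pending.pop();
          try {
            return probe.call(target);
          } catch (IOException ioe) {
            if (ioe instanceof RemoteException && ((RemoteException) ioe)
                .unwrapRemoteException() instanceof RetriableException) {
              // The target is alive but not ready; try it again later.
              retryList.add(target);
            }
            // Other failures: fall through and try the next target.
          }
          if (pending.isEmpty() && !retryList.isEmpty()) {
            pending.addAll(retryList);
            retryList.clear();
            if (!sw.isRunning()) {
              sw.start(); // start the retry clock on the first retry round
            }
            try {
              Thread.sleep(500); // delay between retry rounds
            } catch (InterruptedException e) {
              throw new IOException("Interrupted while retrying.");
            }
          }
          if (sw.isRunning() && sw.now(TimeUnit.MILLISECONDS) > timeoutMs) {
            break; // ran out of retry time
          }
        }
        throw new IOException("No target succeeded within " + timeoutMs + " ms");
      }
    }

The real method additionally counts ReplicaNotFoundException responses so that
a block whose replica is on none of the reported datanodes can still be
treated as zero length.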
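The datanode signals "not ready" with org.apache.hadoop.ipc.RetriableException,
the exception type Hadoop callers conventionally use to mean "try the same
call again". This commit hand-rolls its own retry loop, but as a hedged sketch
of how a retry policy could be keyed on the same exception (the retry count
and sleep are made-up values, not from the commit):

    import java.util.Collections;
    import java.util.concurrent.TimeUnit;

    import org.apache.hadoop.io.retry.RetryPolicies;
    import org.apache.hadoop.io.retry.RetryPolicy;
    import org.apache.hadoop.ipc.RetriableException;

    public class BpNotReadyRetryPolicySketch {
      // Retry up to 5 times, 500 ms apart, but only for RetriableException;
      // every other exception fails on the first attempt.
      public static RetryPolicy bpNotReadyPolicy() {
        RetryPolicy onRetriable =
            RetryPolicies.retryUpToMaximumCountWithFixedSleep(
                5, 500, TimeUnit.MILLISECONDS);
        return RetryPolicies.retryByException(
            RetryPolicies.TRY_ONCE_THEN_FAIL,
            Collections.<Class<? extends Exception>, RetryPolicy>singletonMap(
                RetriableException.class, onRetriable));
      }
    }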