Repository: hadoop

Updated Branches:
  refs/heads/branch-2.6 9cb288e9f -> 04b8a19f8
HDFS-9574. Reduce client failures during datanode restart. Contributed by Kihwal Lee.
(cherry picked from commit b06e39de4fc4f9c35afb472eef0bba2adf91954f)

Conflicts:
    hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
    hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/04b8a19f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/04b8a19f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/04b8a19f

Branch: refs/heads/branch-2.6
Commit: 04b8a19f81ee616c315eec639642439b3a18ad9c
Parents: 9cb288e
Author: Kihwal Lee <kih...@apache.org>
Authored: Fri Jan 8 12:26:05 2016 -0600
Committer: Kihwal Lee <kih...@apache.org>
Committed: Fri Jan 8 12:26:05 2016 -0600

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   2 +
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   2 +
 .../org/apache/hadoop/hdfs/DFSInputStream.java  |  60 ++++++++--
 .../hadoop/hdfs/server/datanode/DNConf.java     |  12 ++
 .../hadoop/hdfs/server/datanode/DataNode.java   |  11 +-
 .../server/datanode/DataNodeFaultInjector.java  |   2 +
 .../hdfs/server/datanode/DataXceiver.java       | 120 +++++++++++--------
 .../src/main/resources/hdfs-default.xml         |  10 ++
 .../fsdataset/impl/TestDatanodeRestart.java     |  73 +++++++++++
 9 files changed, 230 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/04b8a19f/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index f760d36..3ddcfab 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -20,6 +20,8 @@ Release 2.6.4 - UNRELEASED
     HDFS-6945. BlockManager should remove a block from excessReplicateMap and
     decrement ExcessBlocks metric when the block is removed. (aajisaka)

+    HDFS-9574. 
Reduce client failures during datanode restart (kihwal) + OPTIMIZATIONS BUG FIXES http://git-wip-us.apache.org/repos/asf/hadoop/blob/04b8a19f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 85b740e..3f26105 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -470,6 +470,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_DATANODE_IPC_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_DATANODE_IPC_DEFAULT_PORT; public static final String DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_KEY = "dfs.datanode.min.supported.namenode.version"; public static final String DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_DEFAULT = "2.1.0-beta"; + public static final String DFS_DATANODE_BP_READY_TIMEOUT_KEY = "dfs.datanode.bp-ready.timeout"; + public static final long DFS_DATANODE_BP_READY_TIMEOUT_DEFAULT = 20; public static final String DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY = "dfs.block.access.token.enable"; public static final boolean DFS_BLOCK_ACCESS_TOKEN_ENABLE_DEFAULT = false; http://git-wip-us.apache.org/repos/asf/hadoop/blob/04b8a19f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index db06d3b..506b485 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -29,6 +29,7 @@ import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -69,10 +70,12 @@ import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; import org.apache.hadoop.io.ByteBufferPool; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.ipc.RetriableException; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.IdentityHashStore; +import org.apache.hadoop.util.Time; import com.google.common.annotations.VisibleForTesting; @@ -314,13 +317,19 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, private long readBlockLength(LocatedBlock locatedblock) throws IOException { assert locatedblock != null : "LocatedBlock cannot be null"; int replicaNotFoundCount = locatedblock.getLocations().length; - - for(DatanodeInfo datanode : locatedblock.getLocations()) { + + final int timeout = dfsClient.getConf().socketTimeout; + LinkedList<DatanodeInfo> nodeList = new LinkedList<DatanodeInfo>( + Arrays.asList(locatedblock.getLocations())); + LinkedList<DatanodeInfo> retryList = new LinkedList<DatanodeInfo>(); + boolean isRetry = false; + long startTime = 0; + while (nodeList.size() > 0) { + DatanodeInfo 
datanode = nodeList.pop(); ClientDatanodeProtocol cdp = null; - try { cdp = DFSUtil.createClientDatanodeProtocolProxy(datanode, - dfsClient.getConfiguration(), dfsClient.getConf().socketTimeout, + dfsClient.getConfiguration(), timeout, dfsClient.getConf().connectToDnViaHostname, locatedblock); final long n = cdp.getReplicaVisibleLength(locatedblock.getBlock()); @@ -328,13 +337,18 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, if (n >= 0) { return n; } - } - catch(IOException ioe) { - if (ioe instanceof RemoteException && - (((RemoteException) ioe).unwrapRemoteException() instanceof - ReplicaNotFoundException)) { - // special case : replica might not be on the DN, treat as 0 length - replicaNotFoundCount--; + } catch (IOException ioe) { + if (ioe instanceof RemoteException) { + if (((RemoteException) ioe).unwrapRemoteException() instanceof + ReplicaNotFoundException) { + // replica is not on the DN. We will treat it as 0 length + // if no one actually has a replica. + replicaNotFoundCount--; + } else if (((RemoteException) ioe).unwrapRemoteException() instanceof + RetriableException) { + // add to the list to be retried if necessary. + retryList.add(datanode); + } } if (DFSClient.LOG.isDebugEnabled()) { @@ -346,6 +360,30 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, RPC.stopProxy(cdp); } } + + // Ran out of nodes, but there are retriable nodes. + if (nodeList.size() == 0 && retryList.size() > 0) { + nodeList.addAll(retryList); + retryList.clear(); + isRetry = true; + } + + if (isRetry) { + // start tracking the time + if (startTime == 0) { + startTime = Time.monotonicNow(); + } + try { + Thread.sleep(500); // delay between retries. + } catch (InterruptedException e) { + throw new IOException("Interrupted while getting the length."); + } + } + + // see if we ran out of retry time + if (startTime > 0 && (Time.monotonicNow() - startTime > timeout)) { + break; + } } // Namenode told us about these locations, but none know about the replica http://git-wip-us.apache.org/repos/asf/hadoop/blob/04b8a19f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java index 67cd1ce..e59f23f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java @@ -50,6 +50,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RESTART_REPLICA_ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RESTART_REPLICA_EXPIRY_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BP_READY_TIMEOUT_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BP_READY_TIMEOUT_DEFAULT; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; @@ -100,6 +102,8 @@ public class DNConf { final long maxLockedMemory; + private final long bpReadyTimeout; + public DNConf(Configuration conf) { this.conf = conf; socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, @@ 
-193,6 +197,10 @@ public class DNConf { this.restartReplicaExpiry = conf.getLong( DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY, DFS_DATANODE_RESTART_REPLICA_EXPIRY_DEFAULT) * 1000L; + + this.bpReadyTimeout = conf.getLong( + DFS_DATANODE_BP_READY_TIMEOUT_KEY, + DFS_DATANODE_BP_READY_TIMEOUT_DEFAULT); } // We get minimumNameNodeVersion via a method so it can be mocked out in tests. @@ -266,4 +274,8 @@ public class DNConf { public boolean getIgnoreSecurePortsForTesting() { return ignoreSecurePortsForTesting; } + + public long getBpReadyTimeout() { + return bpReadyTimeout; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/04b8a19f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index c46cd8b..a10f2ef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -1409,6 +1409,7 @@ public class DataNode extends ReconfigurableBase @VisibleForTesting public DatanodeRegistration getDNRegistrationForBP(String bpid) throws IOException { + DataNodeFaultInjector.get().noRegistration(); BPOfferService bpos = blockPoolManager.get(bpid); if(bpos==null || bpos.bpRegistration==null) { throw new IOException("cannot find BPOfferService for bpid="+bpid); @@ -1536,7 +1537,6 @@ public class DataNode extends ReconfigurableBase throw new ShortCircuitFdsUnsupportedException( fileDescriptorPassingDisabledReason); } - checkBlockToken(blk, token, BlockTokenSecretManager.AccessMode.READ); int blkVersion = CURRENT_BLOCK_FORMAT_VERSION; if (maxVersion < blkVersion) { throw new ShortCircuitFdsVersionException("Your client is too old " + @@ -2744,6 +2744,15 @@ public class DataNode extends ReconfigurableBase } private void checkReadAccess(final ExtendedBlock block) throws IOException { + // Make sure this node has registered for the block pool. + try { + getDNRegistrationForBP(block.getBlockPoolId()); + } catch (IOException e) { + // if it has not registered with the NN, throw an exception back. + throw new org.apache.hadoop.ipc.RetriableException( + "Datanode not registered. 
Try again later."); + } + if (isBlockTokenEnabled) { Set<TokenIdentifier> tokenIds = UserGroupInformation.getCurrentUser() .getTokenIdentifiers(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/04b8a19f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java index a2d127f..fb81763 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java @@ -39,4 +39,6 @@ public class DataNodeFaultInjector { public void getHdfsBlocksMetadata() {} public void sendShortCircuitShmResponse() throws IOException {} + + public void noRegistration() throws IOException { } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/04b8a19f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index f7efff0..d7551cc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -44,6 +44,7 @@ import java.net.SocketException; import java.nio.channels.ClosedChannelException; import java.security.MessageDigest; import java.util.Arrays; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.hadoop.hdfs.ExtendedBlockId; @@ -84,6 +85,7 @@ import org.apache.hadoop.net.unix.DomainSocket; import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.DataChecksum; +import org.apache.hadoop.util.Time; import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; @@ -273,6 +275,9 @@ class DataXceiver extends Receiver implements Runnable { SlotId slotId, int maxVersion, boolean supportsReceiptVerification) throws IOException { updateCurrentThreadName("Passing file descriptors for block " + blk); + DataOutputStream out = getBufferedOutputStream(); + checkAccess(out, true, blk, token, + Op.REQUEST_SHORT_CIRCUIT_FDS, BlockTokenSecretManager.AccessMode.READ); BlockOpResponseProto.Builder bld = BlockOpResponseProto.newBuilder(); FileInputStream fis[] = null; SlotId registeredSlotId = null; @@ -301,9 +306,6 @@ class DataXceiver extends Receiver implements Runnable { } catch (ShortCircuitFdsUnsupportedException e) { bld.setStatus(ERROR_UNSUPPORTED); bld.setMessage(e.getMessage()); - } catch (InvalidToken e) { - bld.setStatus(ERROR_ACCESS_TOKEN); - bld.setMessage(e.getMessage()); } catch (IOException e) { bld.setStatus(ERROR); bld.setMessage(e.getMessage()); @@ -489,10 +491,9 @@ class DataXceiver extends Receiver implements Runnable { final boolean sendChecksum, final CachingStrategy cachingStrategy) throws IOException { previousOpClientName = clientName; - + 
updateCurrentThreadName("Sending block " + block); OutputStream baseStream = getOutputStream(); - DataOutputStream out = new DataOutputStream(new BufferedOutputStream( - baseStream, HdfsConstants.SMALL_BUFFER_SIZE)); + DataOutputStream out = getBufferedOutputStream(); checkAccess(out, true, block, blockToken, Op.READ_BLOCK, BlockTokenSecretManager.AccessMode.READ); @@ -508,7 +509,6 @@ class DataXceiver extends Receiver implements Runnable { : dnR + " Served block " + block + " to " + remoteAddress; - updateCurrentThreadName("Sending block " + block); try { try { blockSender = new BlockSender(block, blockOffset, length, @@ -594,6 +594,10 @@ class DataXceiver extends Receiver implements Runnable { final boolean isTransfer = stage == BlockConstructionStage.TRANSFER_RBW || stage == BlockConstructionStage.TRANSFER_FINALIZED; + // reply to upstream datanode or client + final DataOutputStream replyOut = getBufferedOutputStream(); + checkAccess(replyOut, isClient, block, blockToken, + Op.WRITE_BLOCK, BlockTokenSecretManager.AccessMode.WRITE); // check single target for transfer-RBW/Finalized if (isTransfer && targets.length > 0) { throw new IOException(stage + " does not support multiple targets " @@ -624,14 +628,6 @@ class DataXceiver extends Receiver implements Runnable { LOG.info("Receiving " + block + " src: " + remoteAddress + " dest: " + localAddress); - // reply to upstream datanode or client - final DataOutputStream replyOut = new DataOutputStream( - new BufferedOutputStream( - getOutputStream(), - HdfsConstants.SMALL_BUFFER_SIZE)); - checkAccess(replyOut, isClient, block, blockToken, - Op.WRITE_BLOCK, BlockTokenSecretManager.AccessMode.WRITE); - DataOutputStream mirrorOut = null; // stream to next target DataInputStream mirrorIn = null; // reply from next target Socket mirrorSock = null; // socket to next target @@ -812,13 +808,13 @@ class DataXceiver extends Receiver implements Runnable { final String clientName, final DatanodeInfo[] targets, final StorageType[] targetStorageTypes) throws IOException { - checkAccess(socketOut, true, blk, blockToken, - Op.TRANSFER_BLOCK, BlockTokenSecretManager.AccessMode.COPY); previousOpClientName = clientName; updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk); final DataOutputStream out = new DataOutputStream( getOutputStream()); + checkAccess(out, true, blk, blockToken, + Op.TRANSFER_BLOCK, BlockTokenSecretManager.AccessMode.COPY); try { datanode.transferReplicaForPipelineRecovery(blk, targets, targetStorageTypes, clientName); @@ -868,6 +864,7 @@ class DataXceiver extends Receiver implements Runnable { @Override public void blockChecksum(final ExtendedBlock block, final Token<BlockTokenIdentifier> blockToken) throws IOException { + updateCurrentThreadName("Getting checksum for block " + block); final DataOutputStream out = new DataOutputStream( getOutputStream()); checkAccess(out, true, block, blockToken, @@ -878,13 +875,11 @@ class DataXceiver extends Receiver implements Runnable { long visibleLength = datanode.data.getReplicaVisibleLength(block); boolean partialBlk = requestLength < visibleLength; - updateCurrentThreadName("Reading metadata for block " + block); final LengthInputStream metadataIn = datanode.data .getMetaDataInputStream(block); final DataInputStream checksumIn = new DataInputStream( new BufferedInputStream(metadataIn, HdfsConstants.IO_FILE_BUFFER_SIZE)); - updateCurrentThreadName("Getting checksum for block " + block); try { //read metadata file final BlockMetadataHeader header = BlockMetadataHeader @@ -928,20 +923,9 @@ class 
DataXceiver extends Receiver implements Runnable { public void copyBlock(final ExtendedBlock block, final Token<BlockTokenIdentifier> blockToken) throws IOException { updateCurrentThreadName("Copying block " + block); - // Read in the header - if (datanode.isBlockTokenEnabled) { - try { - datanode.blockPoolTokenSecretManager.checkAccess(blockToken, null, block, - BlockTokenSecretManager.AccessMode.COPY); - } catch (InvalidToken e) { - LOG.warn("Invalid access token in request from " + remoteAddress - + " for OP_COPY_BLOCK for block " + block + " : " - + e.getLocalizedMessage()); - sendResponse(ERROR_ACCESS_TOKEN, "Invalid access token"); - return; - } - - } + DataOutputStream reply = getBufferedOutputStream(); + checkAccess(reply, true, block, blockToken, + Op.COPY_BLOCK, BlockTokenSecretManager.AccessMode.COPY); if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start String msg = "Not able to copy block " + block.getBlockId() + " " + @@ -953,7 +937,6 @@ class DataXceiver extends Receiver implements Runnable { } BlockSender blockSender = null; - DataOutputStream reply = null; boolean isOpSuccess = true; try { @@ -961,10 +944,7 @@ class DataXceiver extends Receiver implements Runnable { blockSender = new BlockSender(block, 0, -1, false, false, true, datanode, null, CachingStrategy.newDropBehind()); - // set up response stream OutputStream baseStream = getOutputStream(); - reply = new DataOutputStream(new BufferedOutputStream( - baseStream, HdfsConstants.SMALL_BUFFER_SIZE)); // send status first writeSuccessWithChecksumInfo(blockSender, reply); @@ -1004,21 +984,12 @@ class DataXceiver extends Receiver implements Runnable { final String delHint, final DatanodeInfo proxySource) throws IOException { updateCurrentThreadName("Replacing block " + block + " from " + delHint); + DataOutputStream replyOut = new DataOutputStream(getOutputStream()); + checkAccess(replyOut, true, block, blockToken, + Op.REPLACE_BLOCK, BlockTokenSecretManager.AccessMode.REPLACE); /* read header */ block.setNumBytes(dataXceiverServer.estimateBlockSize); - if (datanode.isBlockTokenEnabled) { - try { - datanode.blockPoolTokenSecretManager.checkAccess(blockToken, null, block, - BlockTokenSecretManager.AccessMode.REPLACE); - } catch (InvalidToken e) { - LOG.warn("Invalid access token in request from " + remoteAddress - + " for OP_REPLACE_BLOCK for block " + block + " : " - + e.getLocalizedMessage()); - sendResponse(ERROR_ACCESS_TOKEN, "Invalid access token"); - return; - } - } if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start String msg = "Not able to receive block " + block.getBlockId() + @@ -1035,7 +1006,6 @@ class DataXceiver extends Receiver implements Runnable { String errMsg = null; BlockReceiver blockReceiver = null; DataInputStream proxyReply = null; - DataOutputStream replyOut = new DataOutputStream(getOutputStream()); try { // get the output stream to the proxy final String dnAddr = proxySource.getXferAddr(connectToDnViaHostname); @@ -1135,6 +1105,16 @@ class DataXceiver extends Receiver implements Runnable { datanode.metrics.addReplaceBlockOp(elapsed()); } + /** + * Separated for testing. 
+ * @return + */ + DataOutputStream getBufferedOutputStream() { + return new DataOutputStream( + new BufferedOutputStream(getOutputStream(), + HdfsConstants.SMALL_BUFFER_SIZE)); + } + private long elapsed() { return now() - opStartTime; } @@ -1178,11 +1158,51 @@ class DataXceiver extends Receiver implements Runnable { } + /** + * Wait until the BP is registered, upto the configured amount of time. + * Throws an exception if times out, which should fail the client request. + * @param the requested block + */ + void checkAndWaitForBP(final ExtendedBlock block) + throws IOException { + String bpId = block.getBlockPoolId(); + + // The registration is only missing in relatively short time window. + // Optimistically perform this first. + try { + datanode.getDNRegistrationForBP(bpId); + return; + } catch (IOException ioe) { + // not registered + } + + // retry + long bpReadyTimeout = dnConf.getBpReadyTimeout() * 1000; + long startTime = Time.monotonicNow(); + while (Time.monotonicNow() - startTime <= bpReadyTimeout) { + try { + datanode.getDNRegistrationForBP(bpId); + return; + } catch (IOException ioe) { + // not registered + } + // sleep before trying again + try { + Thread.sleep(1000); + } catch (InterruptedException ie) { + throw new IOException("Interrupted while serving request. Aborting."); + } + } + // failed to obtain registration. + throw new IOException("Not ready to serve the block pool, " + bpId + "."); + } + private void checkAccess(OutputStream out, final boolean reply, final ExtendedBlock blk, final Token<BlockTokenIdentifier> t, final Op op, final BlockTokenSecretManager.AccessMode mode) throws IOException { + checkAndWaitForBP(blk); if (datanode.isBlockTokenEnabled) { if (LOG.isDebugEnabled()) { LOG.debug("Checking block access token for block '" + blk.getBlockId() http://git-wip-us.apache.org/repos/asf/hadoop/blob/04b8a19f/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 1678ee4..925ead8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -2224,4 +2224,14 @@ </description> </property> +<property> + <name>dfs.datanode.bp-ready.timeout</name> + <value>20</value> + <description> + The maximum wait time for datanode to be ready before failing the + received request. Setting this to 0 fails requests right away if the + datanode is not yet registered with the namenode. This wait time + reduces initial request failures after datanode restart. 
+ </description> +</property> </configuration> http://git-wip-us.apache.org/repos/asf/hadoop/blob/04b8a19f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestDatanodeRestart.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestDatanodeRestart.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestDatanodeRestart.java index a91baec..f9e5f87 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestDatanodeRestart.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestDatanodeRestart.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil; import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; @@ -211,4 +212,76 @@ public class TestDatanodeRestart { } } } + + @Test + public void testWaitForRegistrationOnRestart() throws Exception { + Configuration conf = new HdfsConfiguration(); + conf.setLong(DFSConfigKeys.DFS_DATANODE_BP_READY_TIMEOUT_KEY, 5); + conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 5000); + + // This makes the datanode appear registered to the NN, but it won't be + // able to get to the saved dn reg internally. + DataNodeFaultInjector dnFaultInjector = new DataNodeFaultInjector() { + @Override + public void noRegistration() throws IOException { + throw new IOException("no reg found for testing"); + } + }; + DataNodeFaultInjector oldDnInjector = DataNodeFaultInjector.get(); + DataNodeFaultInjector.instance = dnFaultInjector; + MiniDFSCluster cluster = null; + long start = 0; + Path file = new Path("/reg"); + try { + int numDNs = 1; + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build(); + cluster.waitActive(); + + start = System.currentTimeMillis(); + FileSystem fileSys = cluster.getFileSystem(); + try { + DFSTestUtil.createFile(fileSys, file, 10240L, (short)1, 0L); + // It is a bug if this does not fail. + throw new IOException("Did not fail!"); + } catch (org.apache.hadoop.ipc.RemoteException e) { + long elapsed = System.currentTimeMillis() - start; + // timers have at-least semantics, so it should be at least 5 seconds. + if (elapsed < 5000 || elapsed > 10000) { + throw new IOException(elapsed + " seconds passed.", e); + } + } + DataNodeFaultInjector.instance = oldDnInjector; + // this should succeed now. + DFSTestUtil.createFile(fileSys, file, 10240L, (short)1, 0L); + + // turn it back to under-construction, so that the client calls + // getReplicaVisibleLength() rpc method against the datanode. + fileSys.append(file); + // back to simulating unregistered node. 
+ DataNodeFaultInjector.instance = dnFaultInjector; + byte[] buffer = new byte[8]; + start = System.currentTimeMillis(); + try { + fileSys.open(file).read(0L, buffer, 0, 1); + throw new IOException("Did not fail!"); + } catch (IOException e) { + long elapsed = System.currentTimeMillis() - start; + if (e.getMessage().contains("readBlockLength")) { + throw new IOException("Failed, but with unexpected exception:", e); + } + // timers have at-least semantics, so it should be at least 5 seconds. + if (elapsed < 5000 || elapsed > 10000) { + throw new IOException(elapsed + " seconds passed.", e); + } + } + DataNodeFaultInjector.instance = oldDnInjector; + fileSys.open(file).read(0L, buffer, 0, 1); + } finally { + DataNodeFaultInjector.instance = oldDnInjector; + if (cluster != null) { + cluster.shutdown(); + } + } + } + }
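
The net effect of the patch: a data transfer request that reaches a freshly restarted datanode before it has re-registered its block pool now waits up to dfs.datanode.bp-ready.timeout seconds (checkAndWaitForBP) instead of failing outright, and getReplicaVisibleLength() signals the unregistered state with a RetriableException that DFSInputStream#readBlockLength retries until the client socket timeout (DFS_CLIENT_SOCKET_TIMEOUT_KEY) elapses. Sites that want a shorter or longer grace period can override the default in hdfs-site.xml; a minimal sketch, using an illustrative 10-second value rather than the shipped default of 20:

<property>
  <name>dfs.datanode.bp-ready.timeout</name>
  <value>10</value>
  <description>
    Illustrative override; 20 is the default added by this patch, and 0
    makes the datanode fail requests immediately while it is not yet
    registered with the namenode.
  </description>
</property>

The client-side retry budget is still bounded by the socket timeout, so raising only the datanode-side value does not extend how long readBlockLength keeps polling.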