Author: shv
Date: Sat Feb  2 02:07:26 2013
New Revision: 1441681

URL: http://svn.apache.org/viewvc?rev=1441681&view=rev
Log:
HDFS-4452. getAdditionalBlock() can create multiple blocks if the client times 
out and retries. Contributed by Konstantin Shvachko.

Added:
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java
   (with props)
Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1441681&r1=1441680&r2=1441681&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Sat Feb  2 02:07:26 2013
@@ -744,6 +744,9 @@ Release 2.0.3-alpha - Unreleased 
     HDFS-4428. FsDatasetImpl should disclose what the error is when a rename
     fails. (Colin Patrick McCabe via atm)
 
+    HDFS-4452. getAdditionalBlock() can create multiple blocks if the client
+    times out and retries. (shv)
+
   BREAKDOWN OF HDFS-3077 SUBTASKS
 
     HDFS-3077. Quorum-based protocol for reading and writing edit logs.

Modified: 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1441681&r1=1441680&r2=1441681&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Sat Feb  2 02:07:26 2013
@@ -2200,12 +2200,9 @@ public class FSNamesystem implements Nam
       throws LeaseExpiredException, NotReplicatedYetException,
       QuotaExceededException, SafeModeException, UnresolvedLinkException,
       IOException {
-    checkBlock(previous);
-    Block previousBlock = ExtendedBlock.getLocalBlock(previous);
-    long fileLength, blockSize;
+    long blockSize;
     int replication;
     DatanodeDescriptor clientNode = null;
-    Block newBlock = null;
 
     if(NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug(
@@ -2213,119 +2210,61 @@ public class FSNamesystem implements Nam
           +src+" for "+clientName);
     }
 
-    writeLock();
+    // Part I. Analyze the state of the file with respect to the input data.
+    readLock();
     try {
-      checkOperation(OperationCategory.WRITE);
-
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot add block to " + src, safeMode);
-      }
-
-      // have we exceeded the configured limit of fs objects.
-      checkFsObjectLimit();
-
-      INodeFileUnderConstruction pendingFile = checkLease(src, clientName);
-      BlockInfo lastBlockInFile = pendingFile.getLastBlock();
-      if (!Block.matchingIdAndGenStamp(previousBlock, lastBlockInFile)) {
-        // The block that the client claims is the current last block
-        // doesn't match up with what we think is the last block. There are
-        // three possibilities:
-        // 1) This is the first block allocation of an append() pipeline
-        //    which started appending exactly at a block boundary.
-        //    In this case, the client isn't passed the previous block,
-        //    so it makes the allocateBlock() call with previous=null.
-        //    We can distinguish this since the last block of the file
-        //    will be exactly a full block.
-        // 2) This is a retry from a client that missed the response of a
-        //    prior getAdditionalBlock() call, perhaps because of a network
-        //    timeout, or because of an HA failover. In that case, we know
-        //    by the fact that the client is re-issuing the RPC that it
-        //    never began to write to the old block. Hence it is safe to
-        //    abandon it and allocate a new one.
-        // 3) This is an entirely bogus request/bug -- we should error out
-        //    rather than potentially appending a new block with an empty
-        //    one in the middle, etc
-
-        BlockInfo penultimateBlock = pendingFile.getPenultimateBlock();
-        if (previous == null &&
-            lastBlockInFile != null &&
-            lastBlockInFile.getNumBytes() == pendingFile.getPreferredBlockSize() &&
-            lastBlockInFile.isComplete()) {
-          // Case 1
-          if (NameNode.stateChangeLog.isDebugEnabled()) {
-             NameNode.stateChangeLog.debug(
-                 "BLOCK* NameSystem.allocateBlock: handling block allocation" +
-                 " writing to a file with a complete previous block: src=" +
-                 src + " lastBlock=" + lastBlockInFile);
-          }
-        } else if (Block.matchingIdAndGenStamp(penultimateBlock, previousBlock)) {
-          // Case 2
-          if (lastBlockInFile.getNumBytes() != 0) {
-            throw new IOException(
-                "Request looked like a retry to allocate block " +
-                lastBlockInFile + " but it already contains " +
-                lastBlockInFile.getNumBytes() + " bytes");
-          }
-
-          // The retry case ("b" above) -- abandon the old block.
-          NameNode.stateChangeLog.info("BLOCK* allocateBlock: " +
-              "caught retry for allocation of a new block in " +
-              src + ". Abandoning old block " + lastBlockInFile);
-          dir.removeBlock(src, pendingFile, lastBlockInFile);
-          dir.persistBlocks(src, pendingFile);
-        } else {
-          
-          throw new IOException("Cannot allocate block in " + src + ": " +
-              "passed 'previous' block " + previous + " does not match actual " +
-              "last block in file " + lastBlockInFile);
-        }
+      LocatedBlock[] onRetryBlock = new LocatedBlock[1];
+      final INode[] inodes = analyzeFileState(
+          src, clientName, previous, onRetryBlock).getINodes();
+      final INodeFileUnderConstruction pendingFile =
+          (INodeFileUnderConstruction) inodes[inodes.length - 1];
+
+      if(onRetryBlock[0] != null) {
+        // This is a retry. Just return the last block.
+        return onRetryBlock[0];
       }
 
-      // commit the last block and complete it if it has minimum replicas
-      commitOrCompleteLastBlock(pendingFile, previousBlock);
-
-      //
-      // If we fail this, bad things happen!
-      //
-      if (!checkFileProgress(pendingFile, false)) {
-        throw new NotReplicatedYetException("Not replicated yet:" + src);
-      }
-      fileLength = pendingFile.computeContentSummary().getLength();
       blockSize = pendingFile.getPreferredBlockSize();
       clientNode = pendingFile.getClientNode();
       replication = pendingFile.getBlockReplication();
     } finally {
-      writeUnlock();
+      readUnlock();
     }
 
     // choose targets for the new block to be allocated.
-    final DatanodeDescriptor targets[] = blockManager.chooseTarget(
+    final DatanodeDescriptor targets[] = getBlockManager().chooseTarget(
         src, replication, clientNode, excludedNodes, blockSize);
 
-    // Allocate a new block and record it in the INode. 
+    // Part II.
+    // Allocate a new block, add it to the INode and the BlocksMap. 
+    Block newBlock = null;
+    long offset;
     writeLock();
     try {
-      checkOperation(OperationCategory.WRITE);
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot add block to " + src, safeMode);
+      // Run the full analysis again, since things could have changed
+      // while chooseTarget() was executing.
+      LocatedBlock[] onRetryBlock = new LocatedBlock[1];
+      INodesInPath inodesInPath =
+          analyzeFileState(src, clientName, previous, onRetryBlock);
+      INode[] inodes = inodesInPath.getINodes();
+      final INodeFileUnderConstruction pendingFile =
+          (INodeFileUnderConstruction) inodes[inodes.length - 1];
+
+      if(onRetryBlock[0] != null) {
+        // This is a retry. Just return the last block.
+        return onRetryBlock[0];
       }
 
-      final INodesInPath inodesInPath = dir.rootDir.getExistingPathINodes(src, true);
-      final INode[] inodes = inodesInPath.getINodes();
-      final INodeFileUnderConstruction pendingFile
-          = checkLease(src, clientName, inodes[inodes.length - 1]);
-                                                           
-      if (!checkFileProgress(pendingFile, false)) {
-        throw new NotReplicatedYetException("Not replicated yet:" + src);
-      }
+      // commit the last block and complete it if it has minimum replicas
+      commitOrCompleteLastBlock(pendingFile,
+                                ExtendedBlock.getLocalBlock(previous));
+
+      // allocate new block, record block locations in INode.
+      newBlock = createNewBlock();
+      saveAllocatedBlock(src, inodesInPath, newBlock, targets);
 
-      // allocate new block record block locations in INode.
-      newBlock = allocateBlock(src, inodesInPath, targets);
-      
-      for (DatanodeDescriptor dn : targets) {
-        dn.incBlocksScheduled();
-      }
       dir.persistBlocks(src, pendingFile);
+      offset = pendingFile.computeFileSize(true);
     } finally {
       writeUnlock();
     }
@@ -2333,10 +2272,114 @@ public class FSNamesystem implements Nam
       getEditLog().logSync();
     }
 
-    // Create next block
-    LocatedBlock b = new LocatedBlock(getExtendedBlock(newBlock), targets, fileLength);
-    blockManager.setBlockToken(b, BlockTokenSecretManager.AccessMode.WRITE);
-    return b;
+    // Return located block
+    return makeLocatedBlock(newBlock, targets, offset);
+  }
+
+  INodesInPath analyzeFileState(String src,
+                                String clientName,
+                                ExtendedBlock previous,
+                                LocatedBlock[] onRetryBlock)
+          throws IOException  {
+    assert hasReadOrWriteLock();
+
+    checkBlock(previous);
+    onRetryBlock[0] = null;
+    checkOperation(OperationCategory.WRITE);
+    if (isInSafeMode()) {
+      throw new SafeModeException("Cannot add block to " + src, safeMode);
+    }
+
+    // have we exceeded the configured limit of fs objects.
+    checkFsObjectLimit();
+
+    Block previousBlock = ExtendedBlock.getLocalBlock(previous);
+    final INodesInPath inodesInPath =
+        dir.rootDir.getExistingPathINodes(src, true);
+    final INode[] inodes = inodesInPath.getINodes();
+    final INodeFileUnderConstruction pendingFile
+        = checkLease(src, clientName, inodes[inodes.length - 1]);
+    BlockInfo lastBlockInFile = pendingFile.getLastBlock();
+    if (!Block.matchingIdAndGenStamp(previousBlock, lastBlockInFile)) {
+      // The block that the client claims is the current last block
+      // doesn't match up with what we think is the last block. There are
+      // four possibilities:
+      // 1) This is the first block allocation of an append() pipeline
+      //    which started appending exactly at a block boundary.
+      //    In this case, the client isn't passed the previous block,
+      //    so it makes the allocateBlock() call with previous=null.
+      //    We can distinguish this since the last block of the file
+      //    will be exactly a full block.
+      // 2) This is a retry from a client that missed the response of a
+      //    prior getAdditionalBlock() call, perhaps because of a network
+      //    timeout, or because of an HA failover. In that case, we know
+      //    by the fact that the client is re-issuing the RPC that it
+      //    never began to write to the old block. Hence it is safe to
+      //    return the existing block.
+      // 3) This is an entirely bogus request/bug -- we should error out
+      //    rather than potentially appending a new block with an empty
+      //    one in the middle, etc
+      // 4) This is a retry from a client that timed out while
+      //    the prior getAdditionalBlock() is still being processed,
+      //    currently working on chooseTarget(). 
+      //    There are no means to distinguish between the first and 
+      //    the second attempts in Part I, because the first one hasn't
+      //    changed the namesystem state yet.
+      //    We run this analysis again in Part II where case 4 is impossible.
+
+      BlockInfo penultimateBlock = pendingFile.getPenultimateBlock();
+      if (previous == null &&
+          lastBlockInFile != null &&
+          lastBlockInFile.getNumBytes() == pendingFile.getPreferredBlockSize() &&
+          lastBlockInFile.isComplete()) {
+        // Case 1
+        if (NameNode.stateChangeLog.isDebugEnabled()) {
+           NameNode.stateChangeLog.debug(
+               "BLOCK* NameSystem.allocateBlock: handling block allocation" +
+               " writing to a file with a complete previous block: src=" +
+               src + " lastBlock=" + lastBlockInFile);
+        }
+      } else if (Block.matchingIdAndGenStamp(penultimateBlock, previousBlock)) {
+        if (lastBlockInFile.getNumBytes() != 0) {
+          throw new IOException(
+              "Request looked like a retry to allocate block " +
+              lastBlockInFile + " but it already contains " +
+              lastBlockInFile.getNumBytes() + " bytes");
+        }
+
+        // Case 2
+        // Return the last block.
+        NameNode.stateChangeLog.info("BLOCK* allocateBlock: " +
+            "caught retry for allocation of a new block in " +
+            src + ". Returning previously allocated block " + lastBlockInFile);
+        long offset = pendingFile.computeFileSize(true);
+        onRetryBlock[0] = makeLocatedBlock(lastBlockInFile,
+            ((BlockInfoUnderConstruction)lastBlockInFile).getExpectedLocations(),
+            offset);
+        return inodesInPath;
+      } else {
+        // Case 3
+        throw new IOException("Cannot allocate block in " + src + ": " +
+            "passed 'previous' block " + previous + " does not match actual " +
+            "last block in file " + lastBlockInFile);
+      }
+    }
+
+    // Check if the penultimate block is minimally replicated
+    if (!checkFileProgress(pendingFile, false)) {
+      throw new NotReplicatedYetException("Not replicated yet: " + src);
+    }
+    return inodesInPath;
+  }
+
+  LocatedBlock makeLocatedBlock(Block blk,
+                                        DatanodeInfo[] locs,
+                                        long offset) throws IOException {
+    LocatedBlock lBlk = new LocatedBlock(
+        getExtendedBlock(blk), locs, offset);
+    getBlockManager().setBlockToken(
+        lBlk, BlockTokenSecretManager.AccessMode.WRITE);
+    return lBlk;
   }
 
   /** @see NameNode#getAdditionalDatanode(String, ExtendedBlock, DatanodeInfo[], DatanodeInfo[], int, String) */
@@ -2528,22 +2571,33 @@ public class FSNamesystem implements Nam
   }
 
   /**
-   * Allocate a block at the given pending filename
+   * Save allocated block at the given pending filename
    * 
    * @param src path to the file
    * @param inodesInPath representing each of the components of src. 
    *                     The last INode is the INode for the file.
    * @throws QuotaExceededException If addition of block exceeds space quota
    */
-  private Block allocateBlock(String src, INodesInPath inodesInPath,
-      DatanodeDescriptor targets[]) throws IOException {
+  BlockInfo saveAllocatedBlock(String src, INodesInPath inodesInPath,
+      Block newBlock, DatanodeDescriptor targets[]) throws IOException {
+    assert hasWriteLock();
+    BlockInfo b = dir.addBlock(src, inodesInPath, newBlock, targets);
+    NameNode.stateChangeLog.info("BLOCK* allocateBlock: " + src + ". "
+        + getBlockPoolId() + " " + b);
+    for (DatanodeDescriptor dn : targets) {
+      dn.incBlocksScheduled();
+    }
+    return b;
+  }
+
+  /**
+   * Create new block with a unique block id and a new generation stamp.
+   */
+  Block createNewBlock() throws IOException {
     assert hasWriteLock();
     Block b = new Block(getFSImage().getUniqueBlockId(), 0, 0); 
     // Increment the generation stamp for every new block.
     b.setGenerationStamp(nextGenerationStamp());
-    b = dir.addBlock(src, inodesInPath, b, targets);
-    NameNode.stateChangeLog.info("BLOCK* allocateBlock: " + src + ". "
-        + blockPoolId + " " + b);
     return b;
   }
 

Added: 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java?rev=1441681&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java Sat Feb  2 02:07:26 2013
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+
+import java.lang.reflect.Field;
+import java.util.EnumSet;
+import java.util.HashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.io.EnumSetWritable;
+import org.apache.hadoop.net.Node;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Race between two threads simultaneously calling
+ * FSNamesystem.getAdditionalBlock().
+ */
+public class TestAddBlockRetry {
+  public static final Log LOG = LogFactory.getLog(TestAddBlockRetry.class);
+
+  private static final short REPLICATION = 3;
+
+  private Configuration conf;
+  private MiniDFSCluster cluster;
+
+  private int count = 0;
+  private LocatedBlock lb1;
+  private LocatedBlock lb2;
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new Configuration();
+    cluster = new MiniDFSCluster.Builder(conf)
+      .numDataNodes(REPLICATION)
+      .build();
+    cluster.waitActive();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Retry addBlock() while another thread is in chooseTarget().
+   * See HDFS-4452.
+   */
+  @Test
+  public void testRetryAddBlockWhileInChooseTarget() throws Exception {
+    final String src = "/testRetryAddBlockWhileInChooseTarget";
+
+    FSNamesystem ns = cluster.getNamesystem();
+    BlockManager spyBM = spy(ns.getBlockManager());
+    final NamenodeProtocols nn = cluster.getNameNodeRpc();
+
+    // substitute mocked BlockManager into FSNamesystem
+    Class<? extends FSNamesystem> nsClass = ns.getClass();
+    Field bmField = nsClass.getDeclaredField("blockManager");
+    bmField.setAccessible(true);
+    bmField.set(ns, spyBM);
+
+    doAnswer(new Answer<DatanodeDescriptor[]>() {
+      @Override
+      public DatanodeDescriptor[] answer(InvocationOnMock invocation)
+          throws Throwable {
+        LOG.info("chooseTarget for " + src);
+        DatanodeDescriptor[] ret =
+            (DatanodeDescriptor[]) invocation.callRealMethod();
+        count++;
+        if(count == 1) { // run second addBlock()
+          LOG.info("Starting second addBlock for " + src);
+          nn.addBlock(src, "clientName", null, null);
+          LocatedBlocks lbs = nn.getBlockLocations(src, 0, Long.MAX_VALUE);
+          assertEquals("Must be one block", 1, lbs.getLocatedBlocks().size());
+          lb2 = lbs.get(0);
+          assertEquals("Wrong replication",
+              REPLICATION, lb2.getLocations().length);
+        }
+        return ret;
+      }
+    }).when(spyBM).chooseTarget(Mockito.anyString(), Mockito.anyInt(),
+        Mockito.<DatanodeDescriptor>any(), Mockito.<HashMap<Node, Node>>any(),
+        Mockito.anyLong());
+
+    // create file
+    nn.create(src, FsPermission.getFileDefault(),
+        "clientName",
+        new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)),
+        true, (short)3, 1024);
+
+    // start first addBlock()
+    LOG.info("Starting first addBlock for " + src);
+    nn.addBlock(src, "clientName", null, null);
+
+    // check locations
+    LocatedBlocks lbs = nn.getBlockLocations(src, 0, Long.MAX_VALUE);
+    assertEquals("Must be one block", 1, lbs.getLocatedBlocks().size());
+    lb1 = lbs.get(0);
+    assertEquals("Wrong replication", REPLICATION, lb1.getLocations().length);
+    assertEquals("Blocks are not equal", lb1.getBlock(), lb2.getBlock());
+  }
+}

Propchange: 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain


Reply via email to