Author: atm
Date: Fri Dec 16 04:18:58 2011
New Revision: 1215036

URL: http://svn.apache.org/viewvc?rev=1215036&view=rev
Log:
HDFS-2602. NN should log newly-allocated blocks without losing BlockInfo. 
Contributed by Aaron T. Myers

Added:
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPersistBlocks.java
Modified:
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/PendingDataNodeMessages.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt?rev=1215036&r1=1215035&r2=1215036&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt Fri Dec 16 04:18:58 2011
@@ -57,3 +57,5 @@ HDFS-2680. DFSClient should construct fa
 HDFS-2683. Authority-based lookup of proxy provider fails if path becomes canonicalized (todd)
 
 HDFS-2689. HA: BookKeeperEditLogInputStream doesn't implement isInProgress() (atm)
+
+HDFS-2602. NN should log newly-allocated blocks without losing BlockInfo (atm)

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1215036&r1=1215035&r2=1215036&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Fri Dec 16 04:18:58 2011
@@ -125,6 +125,8 @@ public class DFSConfigKeys extends Commo
   public static final boolean DFS_WEBHDFS_ENABLED_DEFAULT = false;
  public static final String  DFS_PERMISSIONS_ENABLED_KEY = "dfs.permissions.enabled";
  public static final boolean DFS_PERMISSIONS_ENABLED_DEFAULT = true;
+  public static final String  DFS_PERSIST_BLOCKS_KEY = "dfs.persist.blocks";
+  public static final boolean DFS_PERSIST_BLOCKS_DEFAULT = false;
  public static final String  DFS_PERMISSIONS_SUPERUSERGROUP_KEY = "dfs.permissions.superusergroup";
  public static final String  DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT = "supergroup";
   public static final String  DFS_ADMIN = "dfs.cluster.administrators";

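The new key follows the usual DFSConfigKeys pattern: a String key constant paired with a typed default. A minimal sketch of how a client or test might enable it, using only the two constants added above:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class PersistBlocksConfigSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Ask the NN to log block allocations as they happen, not only at close.
        conf.setBoolean(DFSConfigKeys.DFS_PERSIST_BLOCKS_KEY, true);
        System.out.println(conf.getBoolean(
            DFSConfigKeys.DFS_PERSIST_BLOCKS_KEY,
            DFSConfigKeys.DFS_PERSIST_BLOCKS_DEFAULT)); // prints "true"
      }
    }
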
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java?rev=1215036&r1=1215035&r2=1215036&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java Fri Dec 16 04:18:58 2011
@@ -45,6 +45,16 @@ public class HAUtil {
   }
 
   /**
+   * Returns true if HA is using a shared edits directory.
+   *
+   * @param conf Configuration
+   * @return true if HA config is using a shared edits dir, false otherwise.
+   */
+  public static boolean usesSharedEditsDir(Configuration conf) {
+    return null != conf.get(DFS_NAMENODE_SHARED_EDITS_DIR_KEY);
+  }
+
+  /**
    * Get the namenode Id by matching the {@code addressKey}
   * with the address of the local node.
    * 

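The new helper simply reports whether a shared edits directory is configured; FSNamesystem (further down) ORs that with the HA check to force block persistence on. A short usage sketch, with a hypothetical directory and assuming the key string "dfs.namenode.shared.edits.dir" behind DFS_NAMENODE_SHARED_EDITS_DIR_KEY:

    Configuration conf = new Configuration();
    // Hypothetical shared-edits location for illustration only.
    conf.set("dfs.namenode.shared.edits.dir", "file:///mnt/shared/edits");
    boolean shared = HAUtil.usesSharedEditsDir(conf); // true once the key is set
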
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1215036&r1=1215035&r2=1215036&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Fri Dec 16 04:18:58 2011
@@ -425,7 +425,7 @@ public class BlockManager {
     
    final boolean b = commitBlock((BlockInfoUnderConstruction)lastBlock, commitBlock);
     if(countNodes(lastBlock).liveReplicas() >= minReplication)
-      completeBlock(fileINode,fileINode.numBlocks()-1);
+      completeBlock(fileINode,fileINode.numBlocks()-1, false);
     return b;
   }
 
@@ -437,14 +437,14 @@ public class BlockManager {
    * of replicas reported from data-nodes.
    */
   private BlockInfo completeBlock(final INodeFile fileINode,
-      final int blkIndex) throws IOException {
+      final int blkIndex, boolean force) throws IOException {
     if(blkIndex < 0)
       return null;
     BlockInfo curBlock = fileINode.getBlocks()[blkIndex];
     if(curBlock.isComplete())
       return curBlock;
     BlockInfoUnderConstruction ucBlock = (BlockInfoUnderConstruction)curBlock;
-    if(ucBlock.numNodes() < minReplication)
+    if (!force && ucBlock.numNodes() < minReplication)
       throw new IOException("Cannot complete block: " +
           "block does not satisfy minimal replication requirement.");
     BlockInfo completeBlock = ucBlock.convertToCompleteBlock();
@@ -455,15 +455,27 @@ public class BlockManager {
   }
 
   private BlockInfo completeBlock(final INodeFile fileINode,
-      final BlockInfo block) throws IOException {
+      final BlockInfo block, boolean force) throws IOException {
     BlockInfo[] fileBlocks = fileINode.getBlocks();
     for(int idx = 0; idx < fileBlocks.length; idx++)
       if(fileBlocks[idx] == block) {
-        return completeBlock(fileINode, idx);
+        return completeBlock(fileINode, idx, force);
       }
     return block;
   }
+  
+  /**
+   * Force the given block in the given file to be marked as complete,
+   * regardless of whether enough replicas are present. This is necessary
+   * when tailing edit logs as a Standby.
+   */
+  public BlockInfo forceCompleteBlock(final INodeFile fileINode,
+      final BlockInfoUnderConstruction block) throws IOException {
+    block.commitBlock(block);
+    return completeBlock(fileINode, block, true);
+  }
 
+  
   /**
    * Convert the last block of the file to an under construction block.<p>
    * The block is converted only if the file has blocks and the last one
@@ -590,8 +602,8 @@ public class BlockManager {
     final boolean isCorrupt = numCorruptNodes == numNodes;
     final int numMachines = isCorrupt ? numNodes: numNodes - numCorruptNodes;
     final DatanodeDescriptor[] machines = new DatanodeDescriptor[numMachines];
+    int j = 0;
     if (numMachines > 0) {
-      int j = 0;
       for(Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(blk);
           it.hasNext();) {
         final DatanodeDescriptor d = it.next();
@@ -600,6 +612,12 @@ public class BlockManager {
           machines[j++] = d;
       }
     }
+    assert j == machines.length :
+      "isCorrupt: " + isCorrupt + 
+      " numMachines: " + numMachines +
+      " numNodes: " + numNodes +
+      " numCorrupt: " + numCorruptNodes +
+      " numCorruptRepls: " + numCorruptReplicas;
    final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
     return new LocatedBlock(eb, machines, pos, isCorrupt);
   }
@@ -1608,7 +1626,7 @@ public class BlockManager {
     int numCurrentReplica = countLiveNodes(storedBlock);
     if (storedBlock.getBlockUCState() == BlockUCState.COMMITTED
         && numCurrentReplica >= minReplication)
-      storedBlock = completeBlock(storedBlock.getINode(), storedBlock);
+      storedBlock = completeBlock(storedBlock.getINode(), storedBlock, false);
 
     // check whether safe replication is reached for the block
     // only complete blocks are counted towards that
@@ -1673,7 +1691,7 @@ public class BlockManager {
 
     if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
         numLiveReplicas >= minReplication)
-      storedBlock = completeBlock(fileINode, storedBlock);
+      storedBlock = completeBlock(fileINode, storedBlock, false);
 
     // check whether safe replication is reached for the block
     // only complete blocks are counted towards that

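The force flag threaded through completeBlock() exists so that edit-log replay can finish a block even though the Standby has not yet received any block reports for it. A self-contained toy model of that pattern (ToyBlock/ToyBlockManager are illustrative stand-ins, not HDFS types):

    import java.io.IOException;

    class ToyBlock {
      int reportedReplicas;   // replicas the NN has heard about so far
      boolean complete;
    }

    class ToyBlockManager {
      private final int minReplication = 1;

      ToyBlock completeBlock(ToyBlock b, boolean force) throws IOException {
        // Replay on the Standby passes force=true: no DN has reported yet.
        if (!force && b.reportedReplicas < minReplication) {
          throw new IOException("Cannot complete block: " +
              "block does not satisfy minimal replication requirement.");
        }
        b.complete = true;
        return b;
      }
    }
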
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1215036&r1=1215035&r2=1215036&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Fri Dec 16 04:18:58 2011
@@ -263,34 +263,19 @@ public class FSDirectory implements Clos
    */
   INode unprotectedAddFile( String path, 
                             PermissionStatus permissions,
-                            BlockInfo[] blocks, 
                             short replication,
                             long modificationTime,
                             long atime,
                             long preferredBlockSize) 
       throws UnresolvedLinkException {
     INode newNode;
-    long diskspace = UNKNOWN_DISK_SPACE;
     assert hasWriteLock();
-    if (blocks == null)
-      newNode = new INodeDirectory(permissions, modificationTime);
-    else {
-      newNode = new INodeFile(permissions, blocks.length, replication,
-                              modificationTime, atime, preferredBlockSize);
-      diskspace = ((INodeFile)newNode).diskspaceConsumed(blocks);
-    }
+    newNode = new INodeFile(permissions, new BlockInfo[0], replication,
+                            modificationTime, atime, preferredBlockSize);
     writeLock();
     try {
       try {
-        newNode = addNode(path, newNode, diskspace);
-        if(newNode != null && blocks != null) {
-          int nrBlocks = blocks.length;
-          // Add file->block mapping
-          INodeFile newF = (INodeFile)newNode;
-          for (int i = 0; i < nrBlocks; i++) {
-            newF.setBlock(i, getBlockManager().addINode(blocks[i], newF));
-          }
-        }
+        newNode = addNode(path, newNode, 0);
       } catch (IOException e) {
         return null;
       }
@@ -391,7 +376,7 @@ public class FSDirectory implements Clos
       writeUnlock();
     }
   }
-
+  
   /**
    * Close file.
    */
@@ -414,7 +399,7 @@ public class FSDirectory implements Clos
   }
 
   /**
-   * Remove a block to the file.
+   * Remove a block from the file.
    */
   boolean removeBlock(String path, INodeFileUnderConstruction fileNode, 
                       Block block) throws IOException {
@@ -422,27 +407,32 @@ public class FSDirectory implements Clos
 
     writeLock();
     try {
-      // modify file-> block and blocksMap
-      fileNode.removeLastBlock(block);
-      getBlockManager().removeBlockFromMap(block);
-
+      unprotectedRemoveBlock(path, fileNode, block);
       // write modified block locations to log
       fsImage.getEditLog().logOpenFile(path, fileNode);
-      if(NameNode.stateChangeLog.isDebugEnabled()) {
-        NameNode.stateChangeLog.debug("DIR* FSDirectory.removeBlock: "
-            +path+" with "+block
-            +" block is removed from the file system");
-      }
-
-      // update space consumed
-      INode[] pathINodes = getExistingPathINodes(path);
-      updateCount(pathINodes, pathINodes.length-1, 0,
-          -fileNode.getPreferredBlockSize()*fileNode.getReplication(), true);
     } finally {
       writeUnlock();
     }
     return true;
   }
+  
+  void unprotectedRemoveBlock(String path,
+      INodeFileUnderConstruction fileNode, Block block) throws IOException {
+    // modify file-> block and blocksMap
+    fileNode.removeLastBlock(block);
+    getBlockManager().removeBlockFromMap(block);
+
+    if(NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* FSDirectory.removeBlock: "
+          +path+" with "+block
+          +" block is removed from the file system");
+    }
+
+    // update space consumed
+    INode[] pathINodes = getExistingPathINodes(path);
+    updateCount(pathINodes, pathINodes.length - 1, 0,
+        - fileNode.getPreferredBlockSize()*fileNode.getReplication(), true);
+  }
 
   /**
    * @see #unprotectedRenameTo(String, String, long)

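Splitting removeBlock() into a locked wrapper plus an "unprotected" helper is what lets FSEditLogLoader.updateBlocks() (below) reuse the same logic while it already holds the namespace lock and must not re-log the edit. A small sketch of the pattern, with invented names:

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class ToyDirectory {
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

      // Public entry point: takes the lock and logs the edit.
      boolean removeBlock(String path) {
        lock.writeLock().lock();
        try {
          unprotectedRemoveBlock(path);
          // ... append the modified file to the edit log here ...
        } finally {
          lock.writeLock().unlock();
        }
        return true;
      }

      // "Unprotected": assumes the caller already holds the write lock,
      // so edit-log replay can call it without double-locking or re-logging.
      void unprotectedRemoveBlock(String path) {
        // ... mutate the in-memory file -> block mapping ...
      }
    }
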
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1215036&r1=1215035&r2=1215036&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Fri Dec 16 04:18:58 2011
@@ -28,6 +28,7 @@ import java.util.EnumMap;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
@@ -57,6 +58,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
 import org.apache.hadoop.hdfs.util.Holder;
+
 import com.google.common.base.Joiner;
 
 @InterfaceAudience.Private
@@ -137,82 +139,84 @@ public class FSEditLogLoader {
           numEdits++;
           incrOpCount(op.opCode, opCounts);
           switch (op.opCode) {
-          case OP_ADD:
-          case OP_CLOSE: {
+          case OP_ADD: {
             AddCloseOp addCloseOp = (AddCloseOp)op;
 
-            // versions > 0 support per file replication
-            // get name and replication
-            final short replication  = fsNamesys.getBlockManager(
-                ).adjustReplication(addCloseOp.replication);
-
-            long blockSize = addCloseOp.blockSize;
-            BlockInfo blocks[] = new BlockInfo[addCloseOp.blocks.length];
-            for (int i = 0; i < addCloseOp.blocks.length; i++) {
-              if(addCloseOp.opCode == FSEditLogOpCodes.OP_ADD
-                 && i == addCloseOp.blocks.length-1) {
-                blocks[i] = new BlockInfoUnderConstruction(addCloseOp.blocks[i],
-                                                           replication);
-              } else {
-                blocks[i] = new BlockInfo(addCloseOp.blocks[i], replication);
+            // See if the file already exists (persistBlocks call)
+            INodeFile oldFile = getINodeFile(fsDir, addCloseOp.path);
+            if (oldFile == null) { // this is OP_ADD on a new file
+              // versions > 0 support per file replication
+              // get name and replication
+              final short replication  = fsNamesys.getBlockManager(
+                  ).adjustReplication(addCloseOp.replication);
+              PermissionStatus permissions = fsNamesys.getUpgradePermission();
+              if (addCloseOp.permissions != null) {
+                permissions = addCloseOp.permissions;
+              }
+              long blockSize = addCloseOp.blockSize;
+              
+              if (FSNamesystem.LOG.isDebugEnabled()) {
+                FSNamesystem.LOG.debug(op.opCode + ": " + addCloseOp.path +
+                    " numblocks : " + addCloseOp.blocks.length +
+                    " clientHolder " + addCloseOp.clientName +
+                    " clientMachine " + addCloseOp.clientMachine);
               }
-            }
-
-            PermissionStatus permissions = fsNamesys.getUpgradePermission();
-            if (addCloseOp.permissions != null) {
-              permissions = addCloseOp.permissions;
-            }
-
 
-            // Older versions of HDFS do not store the block size in inode.
-            // If the file has more than one block, use the size of the
-            // first block as the blocksize. Otherwise use the default
-            // block size.
-            if (-8 <= logVersion && blockSize == 0) {
-              if (blocks.length > 1) {
-                blockSize = blocks[0].getNumBytes();
-              } else {
-                long first = ((blocks.length == 1)? blocks[0].getNumBytes(): 0);
-                blockSize = Math.max(fsNamesys.getDefaultBlockSize(), first);
+              // Older versions of HDFS do not store the block size in inode.
+              // If the file has more than one block, use the size of the
+              // first block as the blocksize. Otherwise use the default
+              // block size.
+              if (-8 <= logVersion && blockSize == 0) {
+                if (addCloseOp.blocks.length > 1) {
+                  blockSize = addCloseOp.blocks[0].getNumBytes();
+                } else {
+                  long first = ((addCloseOp.blocks.length == 1)?
+                      addCloseOp.blocks[0].getNumBytes(): 0);
+                  blockSize = Math.max(fsNamesys.getDefaultBlockSize(), first);
+                }
               }
-            }
 
+              // TODO: We should do away with this add-then-replace dance.
 
-            // The open lease transaction re-creates a file if necessary.
-            // Delete the file if it already exists.
-            if (FSNamesystem.LOG.isDebugEnabled()) {
-              FSNamesystem.LOG.debug(op.opCode + ": " + addCloseOp.path +
-                  " numblocks : " + blocks.length +
-                  " clientHolder " + addCloseOp.clientName +
-                  " clientMachine " + addCloseOp.clientMachine);
+              // add to the file tree
+              INodeFile node = (INodeFile)fsDir.unprotectedAddFile(
+                  addCloseOp.path, permissions,
+                  replication, addCloseOp.mtime,
+                  addCloseOp.atime, blockSize);
+
+              fsNamesys.prepareFileForWrite(addCloseOp.path, node,
+                  addCloseOp.clientName, addCloseOp.clientMachine, null);
+            } else { // This is OP_ADD on an existing file
+              if (!oldFile.isUnderConstruction()) {
+                // This is a call to append() on an already-closed file.
+                fsNamesys.prepareFileForWrite(addCloseOp.path, oldFile,
+                    addCloseOp.clientName, addCloseOp.clientMachine, null);
+                oldFile = getINodeFile(fsDir, addCloseOp.path);
+              }
+              
+              updateBlocks(fsDir, addCloseOp, oldFile);
             }
-
-            fsDir.unprotectedDelete(addCloseOp.path, addCloseOp.mtime);
-
-            // add to the file tree
-            INodeFile node = (INodeFile)fsDir.unprotectedAddFile(
-                addCloseOp.path, permissions,
-                blocks, replication,
-                addCloseOp.mtime, addCloseOp.atime, blockSize);
-            if (addCloseOp.opCode == FSEditLogOpCodes.OP_ADD) {
-              //
-              // Replace current node with a INodeUnderConstruction.
-              // Recreate in-memory lease record.
-              //
-              INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
-                                        node.getLocalNameBytes(),
-                                        node.getReplication(),
-                                        node.getModificationTime(),
-                                        node.getPreferredBlockSize(),
-                                        node.getBlocks(),
-                                        node.getPermissionStatus(),
-                                        addCloseOp.clientName,
-                                        addCloseOp.clientMachine,
-                                        null);
-              fsDir.replaceNode(addCloseOp.path, node, cons);
-              fsNamesys.leaseManager.addLease(cons.getClientName(),
-                                              addCloseOp.path);
+            break;
+          }
+          case OP_CLOSE: {
+            AddCloseOp addCloseOp = (AddCloseOp)op;
+            
+            INodeFile oldFile = getINodeFile(fsDir, addCloseOp.path);
+            if (oldFile == null) {
+              throw new IOException("Operation trying to close non-existent file " +
+                  addCloseOp.path);
             }
+            
+            // Update in-memory data structures
+            updateBlocks(fsDir, addCloseOp, oldFile);
+
+            // Now close the file
+            INodeFileUnderConstruction ucFile = (INodeFileUnderConstruction) oldFile;
+            // TODO: we could use removeLease(holder, path) here, but OP_CLOSE
+            // doesn't seem to serialize the holder... unclear why!
+            fsNamesys.leaseManager.removeLeaseWithPrefixPath(addCloseOp.path);
+            INodeFile newFile = ucFile.convertToInodeFile();
+            fsDir.replaceNode(addCloseOp.path, ucFile, newFile);
             break;
           }
           case OP_SET_REPLICATION: {
@@ -404,7 +408,88 @@ public class FSEditLogLoader {
     }
     return numEdits;
   }
-
+  
+  private static INodeFile getINodeFile(FSDirectory fsDir, String path)
+      throws IOException {
+    INode inode = fsDir.getINode(path);
+    if (inode != null) {
+      if (!(inode instanceof INodeFile)) {
+        throw new IOException("Operation trying to get non-file " + path);
+      }
+    }
+    return (INodeFile)inode;
+  }
+  
+  /**
+   * Update in-memory data structures with new block information.
+   * @throws IOException
+   */
+  private void updateBlocks(FSDirectory fsDir, AddCloseOp addCloseOp,
+      INodeFile file) throws IOException {
+    
+    // Update the salient file attributes.
+    file.setAccessTime(addCloseOp.atime);
+    file.setModificationTimeForce(addCloseOp.mtime);
+    
+    // Update its block list
+    BlockInfo[] oldBlocks = file.getBlocks();
+    
+    // Are we only updating the last block's gen stamp.
+    boolean isGenStampUpdate = oldBlocks.length == addCloseOp.blocks.length;
+    
+    // First, update blocks in common
+    for (int i = 0; i < oldBlocks.length && i < addCloseOp.blocks.length; i++) {
+      BlockInfo oldBlock = oldBlocks[i];
+      Block newBlock = addCloseOp.blocks[i];
+      
+      boolean isLastBlock = i == oldBlocks.length - 1;
+      if (oldBlock.getBlockId() != newBlock.getBlockId() ||
+          (oldBlock.getGenerationStamp() != newBlock.getGenerationStamp() && 
+              !(isGenStampUpdate && isLastBlock))) {
+        throw new IOException("Mismatched block IDs or generation stamps, " + 
+            "attempting to replace block " + oldBlock + " with " + newBlock +
+            " as block # " + i + "/" + addCloseOp.blocks.length + " of " +
+            addCloseOp.path);
+      }
+      
+      oldBlock.setNumBytes(newBlock.getNumBytes());
+      oldBlock.setGenerationStamp(newBlock.getGenerationStamp());
+      
+      if (oldBlock instanceof BlockInfoUnderConstruction &&
+          (!isLastBlock || addCloseOp.opCode == FSEditLogOpCodes.OP_CLOSE)) {
+        fsNamesys.getBlockManager().forceCompleteBlock(
+            (INodeFileUnderConstruction)file,
+            (BlockInfoUnderConstruction)oldBlock);
+      }
+    }
+    
+    if (addCloseOp.blocks.length < oldBlocks.length) {
+      // We're removing a block from the file, e.g. abandonBlock(...)
+      if (!file.isUnderConstruction()) {
+        throw new IOException("Trying to remove a block from file " +
+            addCloseOp.path + " which is not under construction.");
+      }
+      if (addCloseOp.blocks.length != oldBlocks.length - 1) {
+        throw new IOException("Trying to remove more than one block from file "
+            + addCloseOp.path);
+      }
+      fsDir.unprotectedRemoveBlock(addCloseOp.path,
+          (INodeFileUnderConstruction)file, oldBlocks[oldBlocks.length - 1]);
+    } else if (addCloseOp.blocks.length > oldBlocks.length) {
+      // We're adding blocks
+      for (int i = oldBlocks.length; i < addCloseOp.blocks.length; i++) {
+        Block newBlock = addCloseOp.blocks[i];
+        BlockInfo newBI = new BlockInfoUnderConstruction(newBlock, file.getReplication());
+        fsNamesys.getBlockManager().addINode(newBI, file);
+        file.addBlock(newBI);
+      }
+    }
+    
+    if (addCloseOp.blocks.length > 0) {
+      fsNamesys.notifyGenStampUpdate(
+          addCloseOp.blocks[addCloseOp.blocks.length - 1].getGenerationStamp());
+    }
+  }
 
   private static void dumpOpCounts(
       EnumMap<FSEditLogOpCodes, Holder<Integer>> opCounts) {

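updateBlocks() reconciles the inode's current block list against the list carried by an OP_ADD/OP_CLOSE record: equal lengths mean an in-place update (typically a gen-stamp bump on the last block), one fewer block means an abandonBlock(), and extra blocks are new allocations. A toy version of just that case analysis, using plain long[] block IDs rather than HDFS types:

    class ToyBlockReconciler {
      static String reconcile(long[] oldBlocks, long[] opBlocks) {
        if (opBlocks.length == oldBlocks.length) {
          return "update sizes/gen stamps in place";
        } else if (opBlocks.length == oldBlocks.length - 1) {
          return "remove the last block (abandonBlock)";
        } else if (opBlocks.length > oldBlocks.length) {
          return "add the trailing blocks as under-construction";
        }
        // Mirrors the IOException above: an op may remove at most one block.
        throw new IllegalStateException("op removes more than one block");
      }
    }
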
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1215036&r1=1215035&r2=1215036&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Dec 16 04:18:58 2011
@@ -52,6 +52,8 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_UPGRADE_PERMISSION_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERSIST_BLOCKS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERSIST_BLOCKS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
@@ -203,7 +205,7 @@ import com.google.common.base.Preconditi
 @Metrics(context="dfs")
 public class FSNamesystem implements Namesystem, FSClusterStats,
     FSNamesystemMBean, NameNodeMXBean {
-  static final Log LOG = LogFactory.getLog(FSNamesystem.class);
+  public static final Log LOG = LogFactory.getLog(FSNamesystem.class);
 
   private static final ThreadLocal<StringBuilder> auditBuffer =
     new ThreadLocal<StringBuilder>() {
@@ -252,6 +254,7 @@ public class FSNamesystem implements Nam
   static final int DEFAULT_MAX_CORRUPT_FILEBLOCKS_RETURNED = 100;
   static int BLOCK_DELETION_INCREMENT = 1000;
   private boolean isPermissionEnabled;
+  private boolean persistBlocks;
   private UserGroupInformation fsOwner;
   private String supergroup;
   private PermissionStatus defaultPermission;
@@ -669,6 +672,15 @@ public class FSNamesystem implements Nam
                                                DFS_PERMISSIONS_ENABLED_DEFAULT);
     LOG.info("supergroup=" + supergroup);
     LOG.info("isPermissionEnabled=" + isPermissionEnabled);
+
+    this.persistBlocks = conf.getBoolean(DFS_PERSIST_BLOCKS_KEY,
+                                         DFS_PERSIST_BLOCKS_DEFAULT);
+    // block allocation has to be persisted in HA using a shared edits directory
+    // so that the standby has up-to-date namespace information
+    String nameserviceId = DFSUtil.getNamenodeNameServiceId(conf);
+    this.persistBlocks |= HAUtil.isHAEnabled(conf, nameserviceId) &&
+        HAUtil.usesSharedEditsDir(conf);
+
    short filePermission = (short)conf.getInt(DFS_NAMENODE_UPGRADE_PERMISSION_KEY,
                                              DFS_NAMENODE_UPGRADE_PERMISSION_DEFAULT);
     this.defaultPermission = PermissionStatus.createImmutable(
@@ -1403,26 +1415,7 @@ public class FSNamesystem implements Nam
           blockManager.getDatanodeManager().getDatanodeByHost(clientMachine);
 
       if (append && myFile != null) {
-        //
-        // Replace current node with a INodeUnderConstruction.
-        // Recreate in-memory lease record.
-        //
-        INodeFile node = (INodeFile) myFile;
-        INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
-                                        node.getLocalNameBytes(),
-                                        node.getReplication(),
-                                        node.getModificationTime(),
-                                        node.getPreferredBlockSize(),
-                                        node.getBlocks(),
-                                        node.getPermissionStatus(),
-                                        holder,
-                                        clientMachine,
-                                        clientNode);
-        dir.replaceNode(src, node, cons);
-        leaseManager.addLease(cons.getClientName(), src);
-
-        // convert last block to under-construction
-        return blockManager.convertLastBlockToUnderConstruction(cons);
+        return prepareFileForWrite(src, myFile, holder, clientMachine, clientNode);
       } else {
        // Now we can add the name to the filesystem. This file has no
        // blocks associated with it.
@@ -1450,6 +1443,39 @@ public class FSNamesystem implements Nam
     }
     return null;
   }
+  
+  /**
+   * Replace current node with a INodeUnderConstruction.
+   * Recreate in-memory lease record.
+   * 
+   * @param src path to the file
+   * @param file existing file object
+   * @param leaseHolder identifier of the lease holder on this file
+   * @param clientMachine identifier of the client machine
+   * @param clientNode if the client is collocated with a DN, that DN's descriptor
+   * @return the last block locations if the block is partial or null otherwise
+   * @throws UnresolvedLinkException
+   * @throws IOException
+   */
+  public LocatedBlock prepareFileForWrite(String src, INode file,
+      String leaseHolder, String clientMachine, DatanodeDescriptor clientNode)
+      throws UnresolvedLinkException, IOException {
+    INodeFile node = (INodeFile) file;
+    INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
+                                    node.getLocalNameBytes(),
+                                    node.getReplication(),
+                                    node.getModificationTime(),
+                                    node.getPreferredBlockSize(),
+                                    node.getBlocks(),
+                                    node.getPermissionStatus(),
+                                    leaseHolder,
+                                    clientMachine,
+                                    clientNode);
+    dir.replaceNode(src, node, cons);
+    leaseManager.addLease(cons.getClientName(), src);
+
+    return blockManager.convertLastBlockToUnderConstruction(cons);
+  }
 
   /**
    * Recover lease;
@@ -1700,10 +1726,14 @@ public class FSNamesystem implements Nam
       
       for (DatanodeDescriptor dn : targets) {
         dn.incBlocksScheduled();
-      }      
+      }
+      dir.persistBlocks(src, pendingFile);
     } finally {
       writeUnlock();
     }
+    if (persistBlocks) {
+      getEditLog().logSync();
+    }
 
     // Create next block
    LocatedBlock b = new LocatedBlock(getExtendedBlock(newBlock), targets, fileLength);
@@ -1782,10 +1812,15 @@ public class FSNamesystem implements Nam
         NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
                                       + b + " is removed from pendingCreates");
       }
-      return true;
+      dir.persistBlocks(src, file);
     } finally {
       writeUnlock();
     }
+    if (persistBlocks) {
+      getEditLog().logSync();
+    }
+
+    return true;
   }
   
   // make sure that we still have the lease on this file.
@@ -2594,8 +2629,8 @@ public class FSNamesystem implements Nam
         //remove lease, close file
         finalizeINodeFileUnderConstruction(src, pendingFile);
       } else if (supportAppends) {
-        // If this commit does not want to close the file, persist
-        // blocks only if append is supported 
+        // If this commit does not want to close the file, persist blocks
+        // only if append is supported or we're explicitly told to
         dir.persistBlocks(src, pendingFile);
       }
     } finally {
@@ -3565,7 +3600,8 @@ public class FSNamesystem implements Nam
           }
           assert node != null : "Found a lease for nonexisting file.";
           assert node.isUnderConstruction() :
-            "Found a lease for file that is not under construction.";
+            "Found a lease for file " + path + " that is not under 
construction." +
+            " lease=" + lease;
           INodeFileUnderConstruction cons = (INodeFileUnderConstruction) node;
           BlockInfo[] blocks = cons.getBlocks();
           if(blocks == null)
@@ -3881,7 +3917,6 @@ public class FSNamesystem implements Nam
    */
   void setGenerationStamp(long stamp) {
     generationStamp.setStamp(stamp);
-    notifyGenStampUpdate(stamp);
   }
 
   /**
@@ -4000,7 +4035,7 @@ public class FSNamesystem implements Nam
     } finally {
       writeUnlock();
     }
-    if (supportAppends) {
+    if (supportAppends || persistBlocks) {
       getEditLog().logSync();
     }
     LOG.info("updatePipeline(" + oldBlock + ") successfully to " + newBlock);

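Note the locking discipline in getAdditionalBlock()/abandonBlock() above: dir.persistBlocks() records the edit while the namesystem write lock is held, but getEditLog().logSync() runs only after writeUnlock(), so the durable flush never blocks other namespace operations. A self-contained sketch of that log-then-sync pattern (all names invented):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class ToyNamesystem {
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
      private final List<String> editBuffer = new ArrayList<String>();
      private final boolean persistBlocks = true;

      void allocateBlock(String path) {
        lock.writeLock().lock();
        try {
          // Mutate namespace state and buffer the edit while locked...
          editBuffer.add("OP_ADD " + path);
        } finally {
          lock.writeLock().unlock();
        }
        // ...but pay the (slow) durability cost outside the lock.
        if (persistBlocks) {
          logSync();
        }
      }

      private synchronized void logSync() {
        // Stand-in for flushing editBuffer to stable storage.
        editBuffer.clear();
      }
    }
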
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java?rev=1215036&r1=1215035&r2=1215036&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java Fri Dec 16 04:18:58 2011
@@ -153,6 +153,9 @@ public class LeaseManager {
     Lease lease = getLease(holder);
     if (lease != null) {
       removeLease(lease, src);
+    } else {
+      LOG.warn("Removing non-existent lease! holder=" + holder +
+          " src=" + src);
     }
   }
 

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/PendingDataNodeMessages.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/PendingDataNodeMessages.java?rev=1215036&r1=1215035&r2=1215036&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/PendingDataNodeMessages.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/PendingDataNodeMessages.java Fri Dec 16 04:18:58 2011
@@ -188,7 +188,7 @@ public class PendingDataNodeMessages {
    */
   synchronized DataNodeMessage take(long gs) {
     DataNodeMessage m = queue.peek();
-    if (m != null && m.getTargetGs() < gs) {
+    if (m != null && m.getTargetGs() <= gs) {
       return queue.remove();
     } else {
       return null;

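The one-character change from < to <= matters: a queued datanode message whose target generation stamp exactly equals the stamp just replayed is now deliverable, instead of being stuck until some later stamp arrives. A toy queue showing the corrected comparison (illustrative only, not the HDFS class):

    import java.util.PriorityQueue;

    class ToyPendingMessages {
      private final PriorityQueue<Long> queue = new PriorityQueue<Long>();

      synchronized void enqueue(long targetGs) { queue.add(targetGs); }

      /** Deliver the head message once the namespace has caught up to it. */
      synchronized Long take(long currentGs) {
        Long head = queue.peek();
        // "<=" (not "<"): a message targeting exactly currentGs is ready now.
        if (head != null && head <= currentGs) {
          return queue.remove();
        }
        return null;
      }
    }
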
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java?rev=1215036&r1=1215035&r2=1215036&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java Fri Dec 16 04:18:58 2011
@@ -152,4 +152,5 @@ public class EditLogTailer {
       }
     }
   }
+
 }

Added: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPersistBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPersistBlocks.java?rev=1215036&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPersistBlocks.java (added)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPersistBlocks.java Fri Dec 16 04:18:58 2011
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.namenode.FSImage;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.log4j.Level;
+
+import java.io.IOException;
+import java.util.Random;
+import static org.junit.Assert.*;
+import org.junit.Test;
+
+/**
+ * A JUnit test for checking if restarting DFS preserves the
+ * blocks that are part of an unclosed file.
+ */
+public class TestPersistBlocks {
+  static {
+    ((Log4JLogger)FSImage.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
+  }
+  
+  private static final int BLOCK_SIZE = 4096;
+  private static final int NUM_BLOCKS = 5;
+
+  private static final String FILE_NAME = "/data";
+  private static final Path FILE_PATH = new Path(FILE_NAME);
+  
+  static final byte[] DATA_BEFORE_RESTART = new byte[BLOCK_SIZE * NUM_BLOCKS];
+  static final byte[] DATA_AFTER_RESTART = new byte[BLOCK_SIZE * NUM_BLOCKS];
+  static {
+    Random rand = new Random();
+    rand.nextBytes(DATA_BEFORE_RESTART);
+    rand.nextBytes(DATA_AFTER_RESTART);
+  }
+  
+  /** check if DFS remains in proper condition after a restart */
+  @Test
+  public void testRestartDfs() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    // Turn off persistent IPC, so that the DFSClient can survive NN restart
+    conf.setInt(
+        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
+        0);
+    conf.setBoolean(DFSConfigKeys.DFS_PERSIST_BLOCKS_KEY, true);
+    MiniDFSCluster cluster = null;
+
+    long len = 0;
+    FSDataOutputStream stream;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+      FileSystem fs = cluster.getFileSystem();
+      // Creating a file with 4096 blockSize to write multiple blocks
+      stream = fs.create(FILE_PATH, true, BLOCK_SIZE, (short) 1, BLOCK_SIZE);
+      stream.write(DATA_BEFORE_RESTART);
+      stream.hflush();
+      
+      // Wait for at least a few blocks to get through
+      while (len <= BLOCK_SIZE) {
+        FileStatus status = fs.getFileStatus(FILE_PATH);
+        len = status.getLen();
+        Thread.sleep(100);
+      }
+      
+      // explicitly do NOT close the file.
+      cluster.restartNameNode();
+      
+      // Check that the file has no less bytes than before the restart
+      // This would mean that blocks were successfully persisted to the log
+      FileStatus status = fs.getFileStatus(FILE_PATH);
+      assertTrue("Length too short: " + status.getLen(),
+          status.getLen() >= len);
+      
+      // And keep writing (ensures that leases are also persisted correctly)
+      stream.write(DATA_AFTER_RESTART);
+      stream.close();
+      
+      // Verify that the data showed up, both from before and after the restart.
+      FSDataInputStream readStream = fs.open(FILE_PATH);
+      try {
+        byte[] verifyBuf = new byte[DATA_BEFORE_RESTART.length];
+        IOUtils.readFully(readStream, verifyBuf, 0, verifyBuf.length);
+        assertArrayEquals(DATA_BEFORE_RESTART, verifyBuf);
+        
+        IOUtils.readFully(readStream, verifyBuf, 0, verifyBuf.length);
+        assertArrayEquals(DATA_AFTER_RESTART, verifyBuf);
+      } finally {
+        IOUtils.closeStream(readStream);
+      }
+    } finally {
+      if (cluster != null) { cluster.shutdown(); }
+    }
+  }
+  
+  @Test
+  public void testRestartDfsWithAbandonedBlock() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    // Turn off persistent IPC, so that the DFSClient can survive NN restart
+    conf.setInt(
+        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
+        0);
+    conf.setBoolean(DFSConfigKeys.DFS_PERSIST_BLOCKS_KEY, true);
+    MiniDFSCluster cluster = null;
+
+    long len = 0;
+    FSDataOutputStream stream;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+      FileSystem fs = cluster.getFileSystem();
+      // Creating a file with 4096 blockSize to write multiple blocks
+      stream = fs.create(FILE_PATH, true, BLOCK_SIZE, (short) 1, BLOCK_SIZE);
+      stream.write(DATA_BEFORE_RESTART);
+      stream.hflush();
+      
+      // Wait for all of the blocks to get through
+      while (len < BLOCK_SIZE * (NUM_BLOCKS - 1)) {
+        FileStatus status = fs.getFileStatus(FILE_PATH);
+        len = status.getLen();
+        Thread.sleep(100);
+      }
+      
+      // Abandon the last block
+      DFSClient dfsclient = DFSClientAdapter.getDFSClient((DistributedFileSystem)fs);
+      LocatedBlocks blocks = dfsclient.getNamenode().getBlockLocations(
+          FILE_NAME, 0, BLOCK_SIZE * NUM_BLOCKS);
+      assertEquals(NUM_BLOCKS, blocks.getLocatedBlocks().size());
+      LocatedBlock b = blocks.getLastLocatedBlock();
+      dfsclient.getNamenode().abandonBlock(b.getBlock(), FILE_NAME,
+          dfsclient.clientName);
+      
+      // explicitly do NOT close the file.
+      cluster.restartNameNode();
+      
+      // Check that the file has no less bytes than before the restart
+      // This would mean that blocks were successfully persisted to the log
+      FileStatus status = fs.getFileStatus(FILE_PATH);
+      assertTrue("Length incorrect: " + status.getLen(),
+          status.getLen() != len - BLOCK_SIZE);
+
+      // Verify the data showed up from before restart, sans abandoned block.
+      FSDataInputStream readStream = fs.open(FILE_PATH);
+      try {
+        byte[] verifyBuf = new byte[DATA_BEFORE_RESTART.length - BLOCK_SIZE];
+        IOUtils.readFully(readStream, verifyBuf, 0, verifyBuf.length);
+        byte[] expectedBuf = new byte[DATA_BEFORE_RESTART.length - BLOCK_SIZE];
+        System.arraycopy(DATA_BEFORE_RESTART, 0,
+            expectedBuf, 0, expectedBuf.length);
+        assertArrayEquals(expectedBuf, verifyBuf);
+      } finally {
+        IOUtils.closeStream(readStream);
+      }
+    } finally {
+      if (cluster != null) { cluster.shutdown(); }
+    }
+  }
+  
+  @Test
+  public void testRestartWithPartialBlockHflushed() throws IOException {
+    final Configuration conf = new HdfsConfiguration();
+    // Turn off persistent IPC, so that the DFSClient can survive NN restart
+    conf.setInt(
+        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
+        0);
+    conf.setBoolean(DFSConfigKeys.DFS_PERSIST_BLOCKS_KEY, true);
+    MiniDFSCluster cluster = null;
+
+    FSDataOutputStream stream;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+      FileSystem fs = cluster.getFileSystem();
+      NameNode.getAddress(conf).getPort();
+      // Creating a file with 4096 blockSize to write multiple blocks
+      stream = fs.create(FILE_PATH, true, BLOCK_SIZE, (short) 1, BLOCK_SIZE);
+      stream.write(DATA_BEFORE_RESTART);
+      stream.write((byte)1);
+      stream.hflush();
+      
+      // explicitly do NOT close the file before restarting the NN.
+      cluster.restartNameNode();
+      
+      // this will fail if the final block of the file is prematurely COMPLETEd
+      stream.write((byte)2);
+      stream.hflush();
+      stream.close();
+      
+      assertEquals(DATA_BEFORE_RESTART.length + 2,
+          fs.getFileStatus(FILE_PATH).getLen());
+      
+      FSDataInputStream readStream = fs.open(FILE_PATH);
+      try {
+        byte[] verifyBuf = new byte[DATA_BEFORE_RESTART.length + 2];
+        IOUtils.readFully(readStream, verifyBuf, 0, verifyBuf.length);
+        byte[] expectedBuf = new byte[DATA_BEFORE_RESTART.length + 2];
+        System.arraycopy(DATA_BEFORE_RESTART, 0, expectedBuf, 0,
+            DATA_BEFORE_RESTART.length);
+        System.arraycopy(new byte[]{1, 2}, 0, expectedBuf,
+            DATA_BEFORE_RESTART.length, 2);
+        assertArrayEquals(expectedBuf, verifyBuf);
+      } finally {
+        IOUtils.closeStream(readStream);
+      }
+    } finally {
+      if (cluster != null) { cluster.shutdown(); }
+    }
+  }
+  
+  @Test
+  public void testRestartWithAppend() throws IOException {
+    final Configuration conf = new HdfsConfiguration();
+    // Turn off persistent IPC, so that the DFSClient can survive NN restart
+    conf.setInt(
+        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
+        0);
+    conf.setBoolean(DFSConfigKeys.DFS_PERSIST_BLOCKS_KEY, true);
+    MiniDFSCluster cluster = null;
+
+    FSDataOutputStream stream;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+      FileSystem fs = cluster.getFileSystem();
+      NameNode.getAddress(conf).getPort();
+      // Creating a file with 4096 blockSize to write multiple blocks
+      stream = fs.create(FILE_PATH, true, BLOCK_SIZE, (short) 1, BLOCK_SIZE);
+      stream.write(DATA_BEFORE_RESTART, 0, DATA_BEFORE_RESTART.length / 2);
+      stream.close();
+      stream = fs.append(FILE_PATH, BLOCK_SIZE);
+      stream.write(DATA_BEFORE_RESTART, DATA_BEFORE_RESTART.length / 2,
+          DATA_BEFORE_RESTART.length / 2);
+      stream.close();
+      
+      assertEquals(DATA_BEFORE_RESTART.length,
+          fs.getFileStatus(FILE_PATH).getLen());
+      
+      cluster.restartNameNode();
+      
+      assertEquals(DATA_BEFORE_RESTART.length,
+          fs.getFileStatus(FILE_PATH).getLen());
+      
+      FSDataInputStream readStream = fs.open(FILE_PATH);
+      try {
+        byte[] verifyBuf = new byte[DATA_BEFORE_RESTART.length];
+        IOUtils.readFully(readStream, verifyBuf, 0, verifyBuf.length);
+        assertArrayEquals(DATA_BEFORE_RESTART, verifyBuf);
+      } finally {
+        IOUtils.closeStream(readStream);
+      }
+    } finally {
+      if (cluster != null) { cluster.shutdown(); }
+    }
+  }
+}

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java?rev=1215036&r1=1215035&r2=1215036&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java Fri Dec 16 04:18:58 2011
@@ -116,10 +116,12 @@ public class TestEditLog extends TestCas
     int numTransactions;
     short replication = 3;
     long blockSize = 64;
+    final int id;
 
-    Transactions(FSNamesystem ns, int num) {
+    Transactions(FSNamesystem ns, int num, int id) {
       namesystem = ns;
       numTransactions = num;
+      this.id = id;
     }
 
     // add a bunch of transactions.
@@ -131,8 +133,9 @@ public class TestEditLog extends TestCas
       for (int i = 0; i < numTransactions; i++) {
         INodeFileUnderConstruction inode = new INodeFileUnderConstruction(
                             p, replication, blockSize, 0, "", "", null);
-        editLog.logOpenFile("/filename" + i, inode);
-        editLog.logCloseFile("/filename" + i, inode);
+        String fileName = "/filename-" + id + "-" + i;
+        editLog.logOpenFile(fileName, inode);
+        editLog.logCloseFile(fileName, inode);
         editLog.logSync();
       }
     }
@@ -280,7 +283,7 @@ public class TestEditLog extends TestCas
       // Create threads and make them run transactions concurrently.
       Thread threadId[] = new Thread[NUM_THREADS];
       for (int i = 0; i < NUM_THREADS; i++) {
-        Transactions trans = new Transactions(namesystem, NUM_TRANSACTIONS);
+        Transactions trans = new Transactions(namesystem, NUM_TRANSACTIONS, i);
         threadId[i] = new Thread(trans, "TransactionThread-" + i);
         threadId[i].start();
       }

