[partial-ns] Implement fsync().

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7f09c483
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7f09c483
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7f09c483

Branch: refs/heads/feature-HDFS-8286
Commit: 7f09c483706c75b9207e008b7891add133e76da4
Parents: 5fe5b9a
Author: Haohui Mai <whe...@apache.org>
Authored: Mon May 25 19:51:53 2015 -0700
Committer: Haohui Mai <whe...@apache.org>
Committed: Fri Jun 12 13:56:58 2015 -0700

----------------------------------------------------------------------
 .../server/blockmanagement/BlockManager.java    | 12 ++++++
 .../hdfs/server/namenode/FSDirWriteFileOp.java  | 28 +++++++------
 .../hadoop/hdfs/server/namenode/FSEditLog.java  |  6 +--
 .../hdfs/server/namenode/FSNamesystem.java      | 41 +++++++++++++-------
 .../namenode/FileUnderConstructionFeature.java  | 18 ---------
 .../hdfs/server/namenode/RWTransaction.java     | 10 +++++
 6 files changed, 68 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f09c483/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index afd8bb1..e6eb635 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -360,6 +360,18 @@ public class BlockManager {
     LOG.info("maxNumBlocksToLog          = " + maxNumBlocksToLog);
   }
 
+  public void updateLastBlockLength(Block block, long blockLength) {
+    BlockInfoContiguous lastBlock = getStoredBlock(block);
+    assert (lastBlock != null) : "The last block " + block + " is null when updating its length";
+    assert (lastBlock instanceof BlockInfoContiguousUnderConstruction)
+        : "The last block " + block
+        + " is not a BlockInfoUnderConstruction when updating its length";
+    assert !lastBlock.isComplete();
+    BlockInfoContiguousUnderConstruction uc =
+        (BlockInfoContiguousUnderConstruction) lastBlock;
+    uc.setNumBytes(blockLength);
+  }
+
   private static BlockTokenSecretManager createBlockTokenSecretManager(
       final Configuration conf) throws IOException {
     final boolean isEnabled = conf.getBoolean(

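The new updateLastBlockLength() helper takes over the length-update logic removed from FileUnderConstructionFeature further down: it looks up the stored copy of the client-reported block, asserts that the block is still under construction, and records the flushed length. A minimal, self-contained sketch of that pattern follows; every type is a simplified stand-in, not the real HDFS class:

import java.util.HashMap;
import java.util.Map;

// Stand-ins for BlockInfoContiguous and its under-construction subclass.
class StoredBlock {
  final long id;
  long numBytes;
  StoredBlock(long id) { this.id = id; }
  boolean isComplete() { return false; }
}

class UnderConstructionBlock extends StoredBlock {
  UnderConstructionBlock(long id) { super(id); }
}

class BlockMapSketch {
  private final Map<Long, StoredBlock> blocksMap = new HashMap<>();

  void add(StoredBlock b) { blocksMap.put(b.id, b); }

  // Mirrors updateLastBlockLength: look up the stored copy of the block,
  // check that it is still under construction, then record the length the
  // client reported on fsync.
  void updateLastBlockLength(long blockId, long blockLength) {
    StoredBlock last = blocksMap.get(blockId);
    assert last != null : "last block " + blockId + " not in blocks map";
    assert last instanceof UnderConstructionBlock
        : "last block " + blockId + " is not under construction";
    last.numBytes = blockLength;
  }
}

Hoisting this logic into BlockManager appears to fit the flat-namespace design on this branch: the new FlatINode is immutable, so mutable block state has to live in the block map rather than on an inode feature.
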
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f09c483/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
index 4cb7f2c..9e9aa93 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
@@ -100,13 +100,18 @@ class FSDirWriteFileOp {
    */
   static void persistBlocks(
       FSDirectory fsd, String path, INodeFile file, boolean logRetryCache) {
-    assert fsd.getFSNamesystem().hasWriteLock();
-    Preconditions.checkArgument(file.isUnderConstruction());
-    fsd.getEditLog().logUpdateBlocks(path, file, logRetryCache);
+    throw new IllegalStateException("Unimplemented");
+  }
+
+  static void persistBlocks(
+      RWTransaction tx, String path, FlatINode inode) {
+    FlatINodeFileFeature f = inode.feature(FlatINodeFileFeature.class);
+    Preconditions.checkArgument(f != null && f.inConstruction());
+    tx.logUpdateBlocks(path, f);
     if(NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("persistBlocks: " + path
-              + " with " + file.getBlocks().length + " blocks is persisted to" +
-              " the file system");
+      NameNode.stateChangeLog.debug(
+          "persistBlocks: " + path + " with " + f.numBlocks() + " " +
+              "blocks is persisted to the file system");
     }
   }
 
@@ -528,8 +533,7 @@ class FSDirWriteFileOp {
     final INodeFile newNode;
     assert fsd.hasWriteLock();
     if (underConstruction) {
-      newNode = newINodeFile(id, permissions, modificationTime,
-                                              modificationTime, replication,
+      newNode = newINodeFile(id, permissions, modificationTime, modificationTime, replication,
                                               preferredBlockSize,
                                               storagePolicyId);
       newNode.toUnderConstruction(clientName, clientMachine);
@@ -553,7 +557,7 @@ class FSDirWriteFileOp {
         return newNode;
       }
     } catch (IOException e) {
-      if(NameNode.stateChangeLog.isDebugEnabled()) {
+      if (NameNode.stateChangeLog.isDebugEnabled()) {
         NameNode.stateChangeLog.debug(
             "DIR* FSDirectory.unprotectedAddFile: exception when add "
                 + existing.getPath() + " to the file system", e);
@@ -685,9 +689,9 @@ class FSDirWriteFileOp {
     return new FileState(inode, src);
   }
 
-  static boolean completeFile(FSNamesystem fsn, FSPermissionChecker pc,
-      final String srcArg, String holder, ExtendedBlock last, long fileId)
-      throws IOException {
+  static boolean completeFile(
+      FSNamesystem fsn, FSPermissionChecker pc, final String srcArg,
+      String holder, ExtendedBlock last, long fileId) throws IOException {
     String src = srcArg;
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " +

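The FSDirectory-based persistBlocks() is stubbed out and a transaction-based overload replaces it: the inode's file feature must exist and report inConstruction() before the block list is journaled. A hedged sketch of that guard-then-log shape, using hypothetical stand-in types (FlatINode, FlatINodeFileFeature, and RWTransaction are prototype classes on this branch):

// Stand-in for FlatINodeFileFeature: just the block count and the
// under-construction flag that the precondition inspects.
class FileFeatureSketch {
  private final int numBlocks;
  private final boolean inConstruction;

  FileFeatureSketch(int numBlocks, boolean inConstruction) {
    this.numBlocks = numBlocks;
    this.inConstruction = inConstruction;
  }

  int numBlocks() { return numBlocks; }
  boolean inConstruction() { return inConstruction; }
}

class PersistBlocksSketch {
  // Mirrors the new overload: enforce the precondition, journal the block
  // list (tx.logUpdateBlocks(path, f) in the real code), then emit the
  // debug message.
  static void persistBlocks(String path, FileFeatureSketch f) {
    if (f == null || !f.inConstruction()) {
      throw new IllegalArgumentException(
          "file must be under construction: " + path);
    }
    System.out.println("persistBlocks: " + path + " with " + f.numBlocks()
        + " blocks is persisted to the file system");
  }
}
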
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f09c483/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
index 739b2d4..a6df981 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
@@ -799,11 +799,11 @@ public class FSEditLog implements LogsPurgeable {
     logEdit(op);
   }
 
-  public void logUpdateBlocks(String path, INodeFile file, boolean toLogRpcIds) {
-    Preconditions.checkArgument(file.isUnderConstruction());
+  public void logUpdateBlocks(String path, Block[] blocks, boolean
+      toLogRpcIds) {
     UpdateBlocksOp op = UpdateBlocksOp.getInstance(cache.get())
       .setPath(path)
-      .setBlocks(file.getBlocks());
+      .setBlocks(blocks);
     logRpcIds(op, toLogRpcIds);
     logEdit(op);
   }

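Switching the parameter from INodeFile to Block[] removes the edit log's dependency on the inode representation, so the flat-namespace code can journal block updates without materializing an INodeFile. A rough sketch of the fluent op-building pattern the method uses, with simplified stand-in types in place of the real UpdateBlocksOp (block IDs substitute for Block objects):

// Simplified stand-in for UpdateBlocksOp with its fluent setters.
class UpdateBlocksOpSketch {
  private String path;
  private long[] blockIds;

  UpdateBlocksOpSketch setPath(String path) {
    this.path = path;
    return this;
  }

  UpdateBlocksOpSketch setBlocks(long[] blockIds) {
    this.blockIds = blockIds;
    return this;
  }

  // Stands in for logEdit(op): serialize the op into the journal.
  void logEdit() {
    System.out.println("OP_UPDATE_BLOCKS path=" + path
        + " numBlocks=" + blockIds.length);
  }
}

class EditLogSketch {
  // Shape of the new signature: a path plus a raw block array, no inode.
  static void logUpdateBlocks(String path, long[] blocks) {
    new UpdateBlocksOpSketch().setPath(path).setBlocks(blocks).logEdit();
  }
}
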
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f09c483/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index abb316e..9432bc3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -143,7 +143,6 @@ import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsServerDefaults;
@@ -3354,31 +3353,45 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       throws IOException {
    NameNode.stateChangeLog.info("BLOCK* fsync: " + src + " for " + clientName);
     checkOperation(OperationCategory.WRITE);
-    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
 
     FSPermissionChecker pc = getPermissionChecker();
     waitForLoadingFSImage();
     writeLock();
-    try {
+    try (RWTransaction tx = dir.newRWTransaction()) {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot fsync file " + src);
-      src = dir.resolvePath(pc, src, pathComponents);
-      final INode inode;
-      if (fileId == HdfsConstants.GRANDFATHER_INODE_ID) {
+      tx.begin();
+      Resolver.Result paths = Resolver.resolve(tx, src);
+      if (true || fileId == HdfsConstants.GRANDFATHER_INODE_ID) {
         // Older clients may not have given us an inode ID to work with.
         // In this case, we have to try to resolve the path and hope it
         // hasn't changed or been deleted since the file was opened for write.
-        inode = dir.getINode(src);
+        paths = Resolver.resolve(tx, src);
       } else {
-        inode = dir.getInode(fileId);
-        if (inode != null) src = inode.getFullPathName();
-      }
-      final INodeFile pendingFile = checkLease(src, clientName, inode, fileId);
+        // Newer clients pass the inode ID, so we can just get the inode
+        // directly.
+        paths = Resolver.resolveById(tx, fileId);
+      }
+      if (paths.invalidPath()) {
+        throw new InvalidPathException(src);
+      } else if (paths.notFound()) {
+        throw new FileNotFoundException(src);
+      }
+      FlatINode inode = paths.inodesInPath().getLastINode();
+      checkLease(src, clientName, inode);
+      FlatINodeFileFeature f = inode.feature(FlatINodeFileFeature.class);
+      FlatINodeFileFeature.Builder newFile =
+          new FlatINodeFileFeature.Builder().mergeFrom(f);
       if (lastBlockLength > 0) {
-        pendingFile.getFileUnderConstructionFeature().updateLengthOfLastBlock(
-            pendingFile, lastBlockLength);
+        blockManager.updateLastBlockLength(f.lastBlock(), lastBlockLength);
+        Block newLastBlock = f.lastBlock();
+        newLastBlock.setNumBytes(lastBlockLength);
+        newFile.block(f.numBlocks() - 1, newLastBlock);
       }
-      FSDirWriteFileOp.persistBlocks(dir, src, pendingFile, false);
+
+      ByteString newINode = new FlatINode.Builder().mergeFrom(inode)
+          .replaceFeature(FlatINodeFileFeature.wrap(newFile.build())).build();
+      FSDirWriteFileOp.persistBlocks(tx, src, FlatINode.wrap(newINode));
     } finally {
       writeUnlock();
     }

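Taken together, the rewritten fsync() resolves the target inode inside a transaction (by path for clients that predate inode IDs, by ID otherwise), validates the path and lease, pushes the client-reported length into the block manager, and rebuilds the immutable inode with the updated last block before persisting its block list. A condensed, self-contained sketch of that control flow; every type here is a hypothetical stand-in, and the GRANDFATHER_INODE_ID value is assumed:

import java.io.FileNotFoundException;
import java.io.IOException;

class FsyncFlowSketch {
  // Stand-in for HdfsConstants.GRANDFATHER_INODE_ID.
  static final long GRANDFATHER_INODE_ID = 0;

  // Immutable stand-in for FlatINode; "rebuilding" it models the
  // Builder().mergeFrom(...) chain in the diff above.
  static final class Inode {
    final long id;
    final long lastBlockLength;
    Inode(long id, long lastBlockLength) {
      this.id = id;
      this.lastBlockLength = lastBlockLength;
    }
  }

  // Stand-in for the Resolver used inside the transaction.
  interface Resolver {
    Inode byPath(String src);
    Inode byId(long fileId);
  }

  static Inode fsync(Resolver r, String src, long fileId,
      long lastBlockLength) throws IOException {
    // Older clients may not have given us an inode ID, so fall back to
    // path resolution and hope the file has not moved or been deleted.
    Inode inode = fileId == GRANDFATHER_INODE_ID
        ? r.byPath(src) : r.byId(fileId);
    if (inode == null) {
      throw new FileNotFoundException(src);
    }
    // A positive length means the client flushed bytes into the last
    // block: record it by building a new immutable inode, then persist
    // the block list (FSDirWriteFileOp.persistBlocks in the real code).
    return lastBlockLength > 0
        ? new Inode(inode.id, lastBlockLength)
        : inode;
  }
}
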
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f09c483/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileUnderConstructionFeature.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileUnderConstructionFeature.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileUnderConstructionFeature.java
index 1ebdde6..603dfc0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileUnderConstructionFeature.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileUnderConstructionFeature.java
@@ -50,24 +50,6 @@ public class FileUnderConstructionFeature implements INode.Feature {
   }
 
   /**
-   * Update the length for the last block
-   *
-   * @param lastBlockLength
-   *          The length of the last block reported from client
-   * @throws IOException
-   */
-  void updateLengthOfLastBlock(INodeFile f, long lastBlockLength)
-      throws IOException {
-    BlockInfoContiguous lastBlock = f.getLastBlock();
-    assert (lastBlock != null) : "The last block for path "
-        + f.getFullPathName() + " is null when updating its length";
-    assert (lastBlock instanceof BlockInfoContiguousUnderConstruction)
-        : "The last block for path " + f.getFullPathName()
-            + " is not a BlockInfoUnderConstruction when updating its length";
-    lastBlock.setNumBytes(lastBlockLength);
-  }
-
-  /**
   * When deleting a file in the current fs directory, and the file is contained
    * in a snapshot, we should delete the last block if it's under construction
    * and its size is 0.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f09c483/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RWTransaction.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RWTransaction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RWTransaction.java
index b0952a8..9b28625 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RWTransaction.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RWTransaction.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 import com.google.protobuf.ByteString;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.protocol.Block;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -172,4 +173,13 @@ class RWTransaction extends Transaction {
   public void logSetPermissions(String src, FsPermission permission) {
     fsd.getEditLog().logSetPermissions(src, permission);
   }
+
+  public void logUpdateBlocks(String path, FlatINodeFileFeature file) {
+    Block[] blocks = new Block[file.numBlocks()];
+    int i = 0;
+    for (Block b : file.blocks()) {
+      blocks[i++] = b;
+    }
+    fsd.getEditLog().logUpdateBlocks(path, blocks, false);
+  }
 }

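The new RWTransaction.logUpdateBlocks() materializes the feature's block iterable into the Block[] that FSEditLog now expects, with the retry-cache flag hard-wired to false. The same copy pattern in isolation, as a hedged sketch over plain Java types (block IDs stand in for Block objects):

import java.util.Arrays;

class BlockArraySketch {
  // Mirrors the copy loop in logUpdateBlocks: the feature exposes its
  // blocks only as an Iterable plus a count, so they are copied into the
  // fixed-size array the edit-log op expects.
  static long[] toBlockArray(Iterable<Long> blocks, int numBlocks) {
    long[] out = new long[numBlocks];
    int i = 0;
    for (long b : blocks) {
      out[i++] = b;
    }
    return out;
  }

  public static void main(String[] args) {
    long[] copied = toBlockArray(Arrays.asList(1L, 2L, 3L), 3);
    System.out.println(copied.length + " blocks copied"); // 3 blocks copied
  }
}

Copying into a fixed-size array relies on numBlocks() matching the iterable's length; a mismatch would overflow the array or leave zeroed slots, which the real feature presumably guarantees against.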