Repository: hadoop
Updated Branches:
  refs/heads/branch-2.6 b9a6f9aa1 -> 238458b25


HDFS-9289. Make DataStreamer#block thread safe and verify genStamp in 
commitBlock. Contributed by Chang Li.

Conflicts:
        hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
        hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java

Change-Id: Ibd44ff1bf92bad7262db724990a6a64c1975ffb6


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/238458b2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/238458b2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/238458b2

Branch: refs/heads/branch-2.6
Commit: 238458b25921a652eefead2cebd797c1b9de0343
Parents: b9a6f9a
Author: Kihwal Lee <kih...@apache.org>
Authored: Wed Nov 4 12:10:59 2015 -0600
Committer: Zhe Zhang <z...@apache.org>
Committed: Tue Nov 24 09:44:50 2015 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  3 +
 .../org/apache/hadoop/hdfs/DFSOutputStream.java |  2 +-
 .../BlockInfoUnderConstruction.java             |  2 +-
 .../server/blockmanagement/BlockManager.java    |  4 +
 .../org/apache/hadoop/hdfs/DFSTestUtil.java     | 67 +++++++++++++
 .../TestCommitBlockWithInvalidGenStamp.java     | 98 ++++++++++++++++++++
 6 files changed, 174 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/238458b2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index cc7bae8..5e683e3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -20,6 +20,9 @@ Release 2.6.3 - UNRELEASED
 
     HDFS-9083. Replication violates block placement policy (Rushabh Shah)
 
+    HDFS-9289. Make DataStreamer#block thread safe and verify genStamp in
+    commitBlock. (Chang Li via zhz)
+
 Release 2.6.2 - 2015-10-28
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/238458b2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 92dbc8e..21e4d4e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -363,7 +363,7 @@ public class DFSOutputStream extends FSOutputSummer
   //
   class DataStreamer extends Daemon {
     private volatile boolean streamerClosed = false;
-    private ExtendedBlock block; // its length is number of bytes acked
+    private volatile ExtendedBlock block; // its length is number of bytes acked
     private Token<BlockTokenIdentifier> accessToken;
     private DataOutputStream blockStream;
     private DataInputStream blockReplyStream;
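
For context: the streamer thread replaces the block reference during block
allocation and pipeline recovery while other threads read it concurrently,
and without volatile the Java memory model permits a reader to see a stale
reference indefinitely. A minimal standalone sketch of the visibility
guarantee (not part of the patch; field name and values are illustrative
only):

    public class VolatileVisibilitySketch {
      // Stand-in for DataStreamer#block; volatile makes writes visible
      // to other threads without further synchronization.
      private volatile String block = "blk_1_gs1001";

      public static void main(String[] args) throws InterruptedException {
        final VolatileVisibilitySketch s = new VolatileVisibilitySketch();
        Thread reader = new Thread(new Runnable() {
          @Override
          public void run() {
            while ("blk_1_gs1001".equals(s.block)) {
              // spin until the writer's update becomes visible
            }
            System.out.println("observed: " + s.block);
          }
        });
        reader.start();
        Thread.sleep(100);
        s.block = "blk_1_gs1002"; // e.g. stamp bumped by pipeline recovery
        reader.join();
      }
    }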

http://git-wip-us.apache.org/repos/asf/hadoop/blob/238458b2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
index dd3593f..703373e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
@@ -262,7 +262,7 @@ public class BlockInfoUnderConstruction extends BlockInfo {
       throw new IOException("Trying to commit inconsistent block: id = "
           + block.getBlockId() + ", expected id = " + getBlockId());
     blockUCState = BlockUCState.COMMITTED;
-    this.set(getBlockId(), block.getNumBytes(), block.getGenerationStamp());
+    this.setNumBytes(block.getNumBytes());
     // Sort out invalid replicas.
     setGenerationStampAndVerifyReplicas(block.getGenerationStamp());
   }
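
For context: the old call this.set(...) copied the client's generation
stamp into the stored block before any verification could run, so a stale
stamp silently overwrote the NameNode's. The revised code copies only the
length and routes the stamp through setGenerationStampAndVerifyReplicas.
A minimal standalone sketch of that ordering (hypothetical class and
setter names that merely mirror the ones above):

    public class CommitOrderingSketch {
      private long numBytes;
      private long genStamp = 1002L; // the NameNode's current stamp

      void setNumBytes(long n) { numBytes = n; }

      // stand-in for setGenerationStampAndVerifyReplicas(..); the real
      // method also prunes replicas reported under a different stamp
      void setGenStampAndVerify(long gs) { genStamp = gs; }

      void commit(long clientNumBytes, long clientGenStamp) {
        setNumBytes(clientNumBytes);           // length: copied directly
        setGenStampAndVerify(clientGenStamp);  // stamp: via verification
      }

      public static void main(String[] args) {
        CommitOrderingSketch b = new CommitOrderingSketch();
        b.commit(100L, 1002L);
        System.out.println(b.numBytes + " bytes @ gs " + b.genStamp);
      }
    }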

http://git-wip-us.apache.org/repos/asf/hadoop/blob/238458b2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 1febc53..feaf843 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -591,6 +591,10 @@ public class BlockManager {
     assert block.getNumBytes() <= commitBlock.getNumBytes() :
       "commitBlock length is less than the stored one "
       + commitBlock.getNumBytes() + " vs. " + block.getNumBytes();
+    if (block.getGenerationStamp() != commitBlock.getGenerationStamp()) {
+      throw new IOException("Commit block with mismatching GS. NN has " +
+        block + ", client submits " + commitBlock);
+    }
     block.commitBlock(commitBlock);
     return true;
   }
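
Combined with the BlockInfoUnderConstruction change above, the effect is
that a commit carrying a stale generation stamp (e.g. a client holding a
pre-pipeline-recovery copy of the block) is now rejected instead of
silently winning. A minimal standalone sketch of the check, with
hypothetical stamp values (the authoritative check is the one added
above):

    import java.io.IOException;

    public class GenStampCheckSketch {
      // Stand-in for the commit-time comparison of the stored block's
      // stamp against the one the client submits.
      static void commitBlock(long storedGs, long clientGs) throws IOException {
        if (storedGs != clientGs) {
          throw new IOException("Commit block with mismatching GS. NN has gs="
              + storedGs + ", client submits gs=" + clientGs);
        }
        // ... otherwise proceed: copy length, verify replicas, commit
      }

      public static void main(String[] args) {
        try {
          // the client's stamp is stale: recovery bumped the NN's to 1002
          commitBlock(1002L, 1001L);
        } catch (IOException e) {
          System.out.println("rejected: " + e.getMessage());
        }
      }
    }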

http://git-wip-us.apache.org/repos/asf/hadoop/blob/238458b2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index c728b2b..c012f67 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -63,11 +63,15 @@ import org.apache.hadoop.hdfs.server.datanode.DataNodeLayoutVersion;
 import org.apache.hadoop.hdfs.server.datanode.TestTransferRbw;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.ha
         .ConfiguredFailoverProxyProvider;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
+import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.nativeio.NativeIO;
@@ -1636,4 +1640,67 @@ public class DFSTestUtil {
     LayoutVersion.updateMap(DataNodeLayoutVersion.FEATURES,
                             new LayoutVersion.LayoutFeature[] { feature });
   }
+
+  public static StorageReceivedDeletedBlocks[] makeReportForReceivedBlock(
+      Block block, BlockStatus blockStatus, DatanodeStorage storage) {
+    ReceivedDeletedBlockInfo[] receivedBlocks = new ReceivedDeletedBlockInfo[1];
+    receivedBlocks[0] = new ReceivedDeletedBlockInfo(block, blockStatus, null);
+    StorageReceivedDeletedBlocks[] reports = new StorageReceivedDeletedBlocks[1];
+    reports[0] = new StorageReceivedDeletedBlocks(storage, receivedBlocks);
+    return reports;
+  }
+
+  /**
+   * Adds a block to a file.
+   * This method only manipulates the NameNode's state for the file and the
+   * block, without injecting any data into the DataNodes.
+   * It does, however, mimic incremental block reports.
+   * You should disable the periodic heartbeat before using this.
+   * @param dataNodes List of DataNodes to host the block
+   * @param previous Previous block in the file
+   * @param len block size
+   * @return The added block
+   */
+  public static Block addBlockToFile(
+      List<DataNode> dataNodes, DistributedFileSystem fs, FSNamesystem ns,
+      String file, INodeFile fileNode,
+      String clientName, ExtendedBlock previous, int len)
+      throws Exception {
+    fs.getClient().namenode.addBlock(file, clientName, previous, null,
+        fileNode.getId(), null);
+
+    final BlockInfo lastBlock =
+        fileNode.getLastBlock();
+    final int groupSize = fileNode.getBlockReplication();
+    assert dataNodes.size() >= groupSize;
+    // 1. RECEIVING_BLOCK IBR
+    for (int i = 0; i < groupSize; i++) {
+      DataNode dn = dataNodes.get(i);
+      final Block block = new Block(lastBlock.getBlockId() + i, 0,
+          lastBlock.getGenerationStamp());
+      DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
+      StorageReceivedDeletedBlocks[] reports = DFSTestUtil
+          .makeReportForReceivedBlock(block,
+              ReceivedDeletedBlockInfo.BlockStatus.RECEIVING_BLOCK, storage);
+      for (StorageReceivedDeletedBlocks report : reports) {
+        ns.processIncrementalBlockReport(dn.getDatanodeId(), report);
+      }
+    }
+
+    // 2. RECEIVED_BLOCK IBR
+    for (int i = 0; i < groupSize; i++) {
+      DataNode dn = dataNodes.get(i);
+      final Block block = new Block(lastBlock.getBlockId() + i,
+          len, lastBlock.getGenerationStamp());
+      DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
+      StorageReceivedDeletedBlocks[] reports = DFSTestUtil
+          .makeReportForReceivedBlock(block,
+              ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage);
+      for (StorageReceivedDeletedBlocks report : reports) {
+        ns.processIncrementalBlockReport(dn.getDatanodeId(), report);
+      }
+    }
+    lastBlock.setNumBytes(len);
+    return lastBlock;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/238458b2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockWithInvalidGenStamp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockWithInvalidGenStamp.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockWithInvalidGenStamp.java
new file mode 100644
index 0000000..5f8abc5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockWithInvalidGenStamp.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.io.IOUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+
+public class TestCommitBlockWithInvalidGenStamp {
+  private static final int BLOCK_SIZE = 1024;
+  private MiniDFSCluster cluster;
+  private FSDirectory dir;
+  private DistributedFileSystem dfs;
+
+  @Before
+  public void setUp() throws IOException {
+    final Configuration conf = new Configuration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    cluster = new MiniDFSCluster.Builder(conf).build();
+    cluster.waitActive();
+
+    dir = cluster.getNamesystem().getFSDirectory();
+    dfs = cluster.getFileSystem();
+  }
+
+  @After
+  public void tearDown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testCommitWithInvalidGenStamp() throws Exception {
+    final Path file = new Path("/file");
+    FSDataOutputStream out = null;
+
+    try {
+      out = dfs.create(file, (short) 1);
+      INodeFile fileNode = dir.getINode4Write(file.toString()).asFile();
+      ExtendedBlock previous = null;
+
+      Block newBlock = DFSTestUtil.addBlockToFile(cluster.getDataNodes(),
+          dfs, cluster.getNamesystem(), file.toString(), fileNode,
+          dfs.getClient().getClientName(), previous, 100);
+      Block newBlockClone = new Block(newBlock);
+      previous = new ExtendedBlock(cluster.getNamesystem().getBlockPoolId(),
+          newBlockClone);
+
+      previous.setGenerationStamp(123);
+      try {
+        dfs.getClient().getNamenode().complete(file.toString(),
+            dfs.getClient().getClientName(), previous, fileNode.getId());
+        Assert.fail("should have thrown an exception due to an invalid genStamp");
+      } catch (IOException e) {
+        Assert.assertTrue(e.toString().contains(
+            "Commit block with mismatching GS. NN has " +
+            newBlock + ", client submits " + newBlockClone));
+      }
+      previous = new ExtendedBlock(cluster.getNamesystem().getBlockPoolId(),
+          newBlock);
+      boolean complete = dfs.getClient().getNamenode().complete(file.toString(),
+          dfs.getClient().getClientName(), previous, fileNode.getId());
+      Assert.assertTrue("should complete successfully", complete);
+    } finally {
+      IOUtils.cleanup(null, out);
+    }
+  }
+}
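
To run just the new test from the hadoop-hdfs module, a standard Maven
surefire invocation should work (assuming a normal source checkout):

    cd hadoop-hdfs-project/hadoop-hdfs
    mvn test -Dtest=TestCommitBlockWithInvalidGenStamp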
