Author: arp
Date: Thu Aug 21 01:13:49 2014
New Revision: 1619275

URL: http://svn.apache.org/r1619275
Log:
HDFS-6758. Block writer should pass the expected block size to DataXceiverServer (Arpit Agarwal)

Added:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java
Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1619275&r1=1619274&r2=1619275&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Aug 21 01:13:49 2014
@@ -500,7 +500,10 @@ Release 2.6.0 - UNRELEASED
     hadoop.security.saslproperties.resolver.class. (Benoy Antony via cnauroth)
 
     HDFS-6878. Change MiniDFSCluster to support StorageType configuration
-    for individual directories (Arpit Agarwal)
+    for individual directories. (Arpit Agarwal)
+
+    HDFS-6758. Block writer should pass the expected block size to
+    DataXceiverServer. (Arpit Agarwal)
 
   OPTIMIZATIONS
 

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1619275&r1=1619274&r2=1619275&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Thu Aug 21 01:13:49 2014
@@ -1342,8 +1342,14 @@ public class DFSOutputStream extends FSO
           //
   
          BlockConstructionStage bcs = recoveryFlag? stage.getRecoveryStage(): stage;
+
+          // We cannot change the block length in 'block' as it counts the number
+          // of bytes ack'ed.
+          ExtendedBlock blockCopy = new ExtendedBlock(block);
+          blockCopy.setNumBytes(blockSize);
+
           // send the request
-          new Sender(out).writeBlock(block, nodeStorageTypes[0], accessToken,
+          new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
               dfsClient.clientName, nodes, nodeStorageTypes, null, bcs, 
               nodes.length, block.getNumBytes(), bytesSent, newGS, checksum,
               cachingStrategy.get());
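
The copy above matters because 'block' doubles as the writer's record of bytes
acknowledged by the pipeline, so overwriting its length in place would corrupt
the ack count. A minimal standalone sketch of the defensive-copy pattern, with
a hypothetical BlockInfo class standing in for ExtendedBlock:

    // Hypothetical stand-in for ExtendedBlock; not the HDFS class itself.
    class BlockInfo {
      private final long blockId;
      private long numBytes;   // the writer uses this to track acked bytes

      BlockInfo(long blockId, long numBytes) {
        this.blockId = blockId;
        this.numBytes = numBytes;
      }

      // Copy constructor, analogous to ExtendedBlock(ExtendedBlock).
      BlockInfo(BlockInfo other) {
        this(other.blockId, other.numBytes);
      }

      void setNumBytes(long n) { numBytes = n; }
      long getNumBytes() { return numBytes; }
    }

    public class BlockCopyDemo {
      public static void main(String[] args) {
        BlockInfo block = new BlockInfo(1L, 512L);   // 512 bytes acked so far

        // Send the expected full block size downstream without touching
        // the writer's ack-tracking state.
        BlockInfo blockCopy = new BlockInfo(block);
        blockCopy.setNumBytes(128L * 1024 * 1024);

        System.out.println(block.getNumBytes());     // still 512
        System.out.println(blockCopy.getNumBytes()); // 134217728
      }
    }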

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1619275&r1=1619274&r2=1619275&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Thu Aug 21 01:13:49 2014
@@ -576,7 +576,9 @@ class DataXceiver extends Receiver imple
     // forward the original version of the block to downstream mirrors, so
     // make a copy here.
     final ExtendedBlock originalBlock = new ExtendedBlock(block);
-    block.setNumBytes(dataXceiverServer.estimateBlockSize);
+    if (block.getNumBytes() == 0) {
+      block.setNumBytes(dataXceiverServer.estimateBlockSize);
+    }
     LOG.info("Receiving " + block + " src: " + remoteAddress + " dest: "
         + localAddress);
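
The net effect on the DataNode side: a block length of zero now means "no
hint", which preserves behavior for older clients that do not send the
expected size. A minimal sketch of that fallback rule (the method name is
hypothetical, not DataXceiver's actual API):

    public class BlockSizeHintDemo {
      // Zero means the client sent no hint (pre-HDFS-6758 writers), so
      // fall back to the server-side estimate.
      static long effectiveBlockSize(long clientHint, long serverDefault) {
        return clientHint == 0 ? serverDefault : clientHint;
      }

      public static void main(String[] args) {
        long serverDefault = 128L * 1024 * 1024;
        System.out.println(effectiveBlockSize(0, serverDefault));    // old client: default
        System.out.println(effectiveBlockSize(2048, serverDefault)); // new client: its hint
      }
    }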
 

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java?rev=1619275&r1=1619274&r2=1619275&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java Thu Aug 21 01:13:49 2014
@@ -100,11 +100,8 @@ class DataXceiverServer implements Runna
   
   /**
    * We need an estimate for block size to check if the disk partition has
-   * enough space. For now we set it to be the default block size set
-   * in the server side configuration, which is not ideal because the
-   * default block size should be a client-size configuration. 
-   * A better solution is to include in the header the estimated block size,
-   * i.e. either the actual block size or the default block size.
+   * enough space. Newer clients pass the expected block size to the DataNode.
+   * For older clients we just use the server-side default block size.
    */
   final long estimateBlockSize;
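
For reference, estimateBlockSize is only the server-side fallback, typically
derived from the dfs.blocksize setting. A sketch of reading that default (an
illustration of the idea, not the exact DataXceiverServer constructor):

    import org.apache.hadoop.conf.Configuration;

    public class FallbackBlockSizeDemo {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // getLongBytes accepts size suffixes such as "128m"; the 128 MB
        // fallback here is assumed for illustration.
        long estimateBlockSize =
            conf.getLongBytes("dfs.blocksize", 128L * 1024 * 1024);
        System.out.println("fallback block size estimate: " + estimateBlockSize);
      }
    }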
   

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java?rev=1619275&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java Thu Aug 21 01:13:49 2014
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.*;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Test;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.*;
+
+
+/**
+ * Test to verify that the DFSClient passes the expected block length to
+ * the DataNode via DataTransferProtocol.
+ */
+public class TestWriteBlockGetsBlockLengthHint {
+  static final long DEFAULT_BLOCK_LENGTH = 1024;
+  static final long EXPECTED_BLOCK_LENGTH = DEFAULT_BLOCK_LENGTH * 2;
+
+  @Test
+  public void blockLengthHintIsPropagated() throws IOException {
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    final Path path = new Path("/" + METHOD_NAME + ".dat");
+
+    Configuration conf = new HdfsConfiguration();
+    FsDatasetChecker.setFactory(conf);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_LENGTH);
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+
+    try {
+      cluster.waitActive();
+
+      // FsDatasetChecker#createRbw asserts during block creation if the test
+      // fails.
+      DFSTestUtil.createFile(
+          cluster.getFileSystem(),
+          path,
+          4096,  // Buffer size.
+          EXPECTED_BLOCK_LENGTH,
+          EXPECTED_BLOCK_LENGTH,
+          (short) 1,
+          0x1BAD5EED);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  static class FsDatasetChecker extends SimulatedFSDataset {
+    static class Factory extends FsDatasetSpi.Factory<SimulatedFSDataset> {
+      @Override
+      public SimulatedFSDataset newInstance(DataNode datanode,
+          DataStorage storage, Configuration conf) throws IOException {
+        return new FsDatasetChecker(storage, conf);
+      }
+
+      @Override
+      public boolean isSimulated() {
+        return true;
+      }
+    }
+
+    public static void setFactory(Configuration conf) {
+      conf.set(DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY,
+               Factory.class.getName());
+    }
+
+    public FsDatasetChecker(DataStorage storage, Configuration conf) {
+      super(storage, conf);
+    }
+
+    /**
+     * Override createRbw to verify that the block length that is passed
+     * is correct. This requires both DFSOutputStream and BlockReceiver to
+     * correctly propagate the hint to FsDatasetSpi.
+     */
+    @Override
+    public synchronized ReplicaInPipelineInterface createRbw(
+        StorageType storageType, ExtendedBlock b) throws IOException {
+      assertThat(b.getLocalBlock().getNumBytes(), is(EXPECTED_BLOCK_LENGTH));
+      return super.createRbw(storageType, b);
+    }
+  }
+}

