Author: arp
Date: Thu Aug 21 01:13:49 2014
New Revision: 1619275
URL: http://svn.apache.org/r1619275
Log:
HDFS-6758. Block writer should pass the expected block size to
DataXceiverServer (Arpit Agarwal)
Added:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1619275&r1=1619274&r2=1619275&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Aug 21 01:13:49 2014
@@ -500,7 +500,10 @@ Release 2.6.0 - UNRELEASED
hadoop.security.saslproperties.resolver.class. (Benoy Antony via cnauroth)
HDFS-6878. Change MiniDFSCluster to support StorageType configuration
- for individual directories (Arpit Agarwal)
+ for individual directories. (Arpit Agarwal)
+
+ HDFS-6758. block writer should pass the expected block size to
+ DataXceiverServer. (Arpit Agarwal)
OPTIMIZATIONS
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1619275&r1=1619274&r2=1619275&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Thu Aug 21 01:13:49 2014
@@ -1342,8 +1342,14 @@ public class DFSOutputStream extends FSO
//
BlockConstructionStage bcs = recoveryFlag? stage.getRecoveryStage():
stage;
+
+ // We cannot change the block length in 'block' as it counts the number
+ // of bytes ack'ed.
+ ExtendedBlock blockCopy = new ExtendedBlock(block);
+ blockCopy.setNumBytes(blockSize);
+
// send the request
- new Sender(out).writeBlock(block, nodeStorageTypes[0], accessToken,
+ new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
nodes.length, block.getNumBytes(), bytesSent, newGS, checksum,
cachingStrategy.get());
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1619275&r1=1619274&r2=1619275&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Thu Aug 21 01:13:49 2014
@@ -576,7 +576,9 @@ class DataXceiver extends Receiver imple
// forward the original version of the block to downstream mirrors, so
// make a copy here.
final ExtendedBlock originalBlock = new ExtendedBlock(block);
- block.setNumBytes(dataXceiverServer.estimateBlockSize);
+ if (block.getNumBytes() == 0) {
+ block.setNumBytes(dataXceiverServer.estimateBlockSize);
+ }
LOG.info("Receiving " + block + " src: " + remoteAddress + " dest: "
+ localAddress);
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java?rev=1619275&r1=1619274&r2=1619275&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java Thu Aug 21 01:13:49 2014
@@ -100,11 +100,8 @@ class DataXceiverServer implements Runna
/**
* We need an estimate for block size to check if the disk partition has
- * enough space. For now we set it to be the default block size set
- * in the server side configuration, which is not ideal because the
- * default block size should be a client-size configuration.
- * A better solution is to include in the header the estimated block size,
- * i.e. either the actual block size or the default block size.
+ * enough space. Newer clients pass the expected block size to the DataNode.
+ * For older clients we just use the server-side default block size.
*/
final long estimateBlockSize;
Added:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java?rev=1619275&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java Thu Aug 21 01:13:49 2014
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.*;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Test;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.*;
+
+
+/**
+ * Test to verify that the DFSClient passes the expected block length to
+ * the DataNode via DataTransferProtocol.
+ */
+public class TestWriteBlockGetsBlockLengthHint {
+ static final long DEFAULT_BLOCK_LENGTH = 1024;
+ static final long EXPECTED_BLOCK_LENGTH = DEFAULT_BLOCK_LENGTH * 2;
+
+ @Test
+ public void blockLengthHintIsPropagated() throws IOException {
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+ final Path path = new Path("/" + METHOD_NAME + ".dat");
+
+ Configuration conf = new HdfsConfiguration();
+ FsDatasetChecker.setFactory(conf);
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_LENGTH);
+ conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+
+ try {
+ cluster.waitActive();
+
+ // FsDatasetChecker#createRbw asserts during block creation if the test
+ // fails.
+ DFSTestUtil.createFile(
+ cluster.getFileSystem(),
+ path,
+ 4096, // Buffer size.
+ EXPECTED_BLOCK_LENGTH,
+ EXPECTED_BLOCK_LENGTH,
+ (short) 1,
+ 0x1BAD5EED);
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ static class FsDatasetChecker extends SimulatedFSDataset {
+ static class Factory extends FsDatasetSpi.Factory<SimulatedFSDataset> {
+ @Override
+ public SimulatedFSDataset newInstance(DataNode datanode,
+ DataStorage storage, Configuration conf) throws IOException {
+ return new FsDatasetChecker(storage, conf);
+ }
+
+ @Override
+ public boolean isSimulated() {
+ return true;
+ }
+ }
+
+ public static void setFactory(Configuration conf) {
+ conf.set(DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY,
+ Factory.class.getName());
+ }
+
+ public FsDatasetChecker(DataStorage storage, Configuration conf) {
+ super(storage, conf);
+ }
+
+ /**
+ * Override createRbw to verify that the block length that is passed
+ * is correct. This requires both DFSOutputStream and BlockReceiver to
+ * correctly propagate the hint to FsDatasetSpi.
+ */
+ @Override
+ public synchronized ReplicaInPipelineInterface createRbw(
+ StorageType storageType, ExtendedBlock b) throws IOException {
+ assertThat(b.getLocalBlock().getNumBytes(), is(EXPECTED_BLOCK_LENGTH));
+ return super.createRbw(storageType, b);
+ }
+ }
+}