[ https://issues.apache.org/jira/browse/HDFS-3721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aaron T. Myers updated HDFS-3721:
---------------------------------

    Attachment: hdfs-3721.txt

Todd's been busy with some other work, so I'm going to take over working on 
this issue.

Attached is an updated patch which addresses Suresh's and my feedback.

During testing, I also discovered that Todd's original patch introduced some 
unfortunate interaction with Nagle's algorithm in TCP for small packets during 
writing. This was the cause of the TestFileConcurrentReader failure. So, in 
addition to addressing the feedback, this patch also changes 
DFSOutputStream.Packet#writeTo back to only ever call OutputStream#write once, 
by ensuring that the packet header, checksum data, and actual data are all in a 
single contiguous buffer, similarly to how it was done before this patch.

To make review easier, the functional difference between this patch and the 
previous one is the following:

{code}
commit 61fe911e60fe4db710813fc97a209456722ef1f7
Author: Aaron T. Myers <a...@cloudera.com>
Date:   Sat Aug 4 12:32:11 2012 -0700

    Send one buffer when writing to avoid nagling.

diff --git 
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
 
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index e460a3c..4582712 100644
--- 
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ 
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -147,10 +147,17 @@ public class DFSOutputStream extends FSOutputSummer 
implements Syncable {
      * buf is pointed into like follows:
      *  (C is checksum data, D is payload data)
      *
-     * [CCCCCCCCC________________DDDDDDDDDDDDDDDD___]
-     *           ^               ^               ^
-     *           checksumPos     dataStart       dataPos
+     * [_________CCCCCCCCC________________DDDDDDDDDDDDDDDD___]
+     *           ^        ^               ^               ^
+     *           |        checksumPos     dataStart       dataPos
+     *           checksumStart
+     * 
+     * Right before sending, we move the checksum data to immediately precede
+     * the actual data, and then insert the header into the buffer immediately
+     * preceding the checksum data, so we make sure to keep enough space in
+     * front of the checksum data to support the largest conceivable header. 
      */
+    int checksumStart;
     int checksumPos;
     int dataStart;
     int dataPos;
@@ -166,9 +173,9 @@ public class DFSOutputStream extends FSOutputSummer 
implements Syncable {
       this.offsetInBlock = 0;
       this.seqno = HEART_BEAT_SEQNO;
       
-      buf = new byte[0];
+      buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN];
       
-      checksumPos = dataPos = dataStart = 0;
+      checksumStart = checksumPos = dataPos = dataStart = 
PacketHeader.PKT_MAX_HEADER_LEN;
       maxChunks = 0;
     }
     
@@ -180,10 +187,11 @@ public class DFSOutputStream extends FSOutputSummer 
implements Syncable {
       this.seqno = currentSeqno;
       currentSeqno++;
       
-      buf = new byte[pktSize];
+      buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN + pktSize];
       
-      checksumPos = 0;
-      dataStart = chunksPerPkt * checksum.getChecksumSize();
+      checksumStart = PacketHeader.PKT_MAX_HEADER_LEN;
+      checksumPos = checksumStart;
+      dataStart = checksumStart + (chunksPerPkt * checksum.getChecksumSize());
       dataPos = dataStart;
       maxChunks = chunksPerPkt;
     }
@@ -205,19 +213,38 @@ public class DFSOutputStream extends FSOutputSummer 
implements Syncable {
     }
     
     /**
-     * Returns ByteBuffer that contains one full packet, including header.
+     * Write the full packet, including the header, to the given output stream.
      */
     void writeTo(DataOutputStream stm) throws IOException {
-      int dataLen = dataPos - dataStart;
-      int checksumLen = checksumPos;
-      int pktLen = HdfsConstants.BYTES_IN_INTEGER + dataLen + checksumLen;
+      final int dataLen = dataPos - dataStart;
+      final int checksumLen = checksumPos - checksumStart;
+      final int pktLen = HdfsConstants.BYTES_IN_INTEGER + dataLen + 
checksumLen;
 
       PacketHeader header = new PacketHeader(
         pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock);
       
-      header.write(stm);
-      stm.write(buf, 0, checksumLen);
-      stm.write(buf, dataStart, dataLen);
+      if (checksumPos != dataStart) {
+        // Move the checksum to cover the gap. This can happen for the last
+        // packet or during an hflush/hsync call.
+        System.arraycopy(buf, checksumStart, buf, 
+                         dataStart - checksumLen , checksumLen); 
+        checksumPos = dataStart;
+        checksumStart = checksumPos - checksumLen;
+      }
+      
+      final int headerStart = checksumStart - header.getSerializedSize();
+      assert checksumStart + 1 >= header.getSerializedSize();
+      assert checksumPos == dataStart;
+      assert headerStart >= 0;
+      assert headerStart + header.getSerializedSize() == checksumStart;
+      
+      // Copy the header data into the buffer immediately preceding the 
checksum
+      // data.
+      System.arraycopy(header.getBytes(), 0, buf, headerStart,
+          header.getSerializedSize());
+      
+      // Write the now contiguous full packet to the output stream.
+      stm.write(buf, headerStart, header.getSerializedSize() + checksumLen + 
dataLen);
     }
     
     // get the packet's last byte's offset in the block
diff --git 
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java
 
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java
index 1709101..dd56962 100644
--- 
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java
+++ 
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java
@@ -166,6 +166,12 @@ public class PacketHeader {
     out.writeShort(proto.getSerializedSize());
     proto.writeTo(out);
   }
+  
+  public byte[] getBytes() {
+    ByteBuffer buf = ByteBuffer.allocate(getSerializedSize());
+    putInBuffer(buf);
+    return buf.array();
+  }
 
   /**
    * Perform a sanity check on the packet, returning true if it is sane.
{code}
                
> hsync support broke wire compatibility
> --------------------------------------
>
>                 Key: HDFS-3721
>                 URL: https://issues.apache.org/jira/browse/HDFS-3721
>             Project: Hadoop HDFS
>          Issue Type: Bug
>          Components: data-node, hdfs client
>    Affects Versions: 2.1.0-alpha
>            Reporter: Todd Lipcon
>            Assignee: Todd Lipcon
>            Priority: Critical
>         Attachments: hdfs-3721.txt, hdfs-3721.txt
>
>
> HDFS-744 added support for hsync to the data transfer wire protocol. However, 
> it actually broke wire compatibility: if the client has hsync support but the 
> server does not, the client cannot read or write data on the old cluster.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

Reply via email to