[ 
https://issues.apache.org/jira/browse/HDFS-7854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14356094#comment-14356094
 ] 

Jing Zhao commented on HDFS-7854:
---------------------------------

Some comments so far:
# Can we keep {{currentSeqno}}, {{lastQueuedSeqno}} and {{bytesCurBlock}} in 
DFSOutputStream?
# Some fields in DataStreamer can be declared as final, e.g., dfsClient, src, 
checksum4WriteBlock, progress, stat, blockSize, fileId, etc.
# We can move the following code into a single function.
{code}
    long usedInLastBlock = stat.getLen() % blockSize;
    int freeInLastBlock = (int)(blockSize - usedInLastBlock);

    // calculate the amount of free space in the pre-existing
    // last crc chunk
    int usedInCksum = (int)(stat.getLen() % bytesPerChecksum);
    int freeInCksum = bytesPerChecksum - usedInCksum;

    // if there is space in the last block, then we have to
    // append to that block
    if (freeInLastBlock == blockSize) {
      throw new IOException("The last block for file " +
          src + " is full.");
    }

    if (usedInCksum > 0 && freeInCksum > 0) {
      // if there is space in the last partial chunk, then
      // setup in such a way that the next packet will have only
      // one chunk that fills up the partial chunk.
      //
      computePacketChunkSize(0, freeInCksum);
      setChecksumBufSize(freeInCksum);
      streamer.setAppendChunk(true);
    } else {
      // if the remaining space in the block is smaller than
      // the expected size of a packet, then create
      // a smaller size packet.
      //
      computePacketChunkSize(
          Math.min(dfsClient.getConf().writePacketSize, freeInLastBlock),
          bytesPerChecksum);
    }
{code}
# We do not need to compute the checksumSize for a heartbeat packet since it 
does not contain real data.
{code}
  private DFSPacket createHeartbeatPacket() throws InterruptedIOException {
    int checksumSize = checksum4WriteBlock.getChecksumSize();
    if (isLazyPersist(stat) && stat.getReplication() == 1) {
      checksumSize = 0;
    }
    final byte[] buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN];
    return new DFSPacket(buf, 0, 0, DFSPacket.HEART_BEAT_SEQNO, checksumSize,
        false);
  }
{code}
# Instead of directly acquiring {{DataStreamer#dataQueue}}'s monitor and 
modifying the queue in DFSOutputStream, we should wrap this logic in a 
DataStreamer method.
{code}
  private void queueCurrentPacket() {
    synchronized (streamer.dataQueue) {
      if (currentPacket == null) return;
      streamer.dataQueue.addLast(currentPacket);
      streamer.setLastQueuedSeqno(currentPacket.getSeqno());
      if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("Queued packet " + currentPacket.getSeqno());
      }
      currentPacket = null;
      streamer.dataQueue.notifyAll();
    }
  }
{code}
# The same applies to {{DFSOutputStream#waitAndQueueCurrentPacket}}. In general, 
it is not very clean for DFSOutputStream to access the streamer's dataQueue 
directly, synchronize on its monitor, or call its {{wait}} method.
# What is {{toTerminate}} used for? Can we use only {{streamerClosed}}?
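
For the two dataQueue points above, a rough sketch of the kind of encapsulation I have in mind. This is not the patch code: {{PacketSketch}}, {{StreamerSketch}}, {{queuePacket}}, {{waitAndQueuePacket}}, {{pollPacket}} and {{maxQueued}} are all hypothetical names, and the full-queue condition is simplified to the data queue alone. The idea is only that the queue, its monitor, and the {{wait}}/{{notifyAll}} calls stay inside the streamer.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical stand-in for DFSPacket; only the sequence number matters here. */
final class PacketSketch {
  private final long seqno;
  PacketSketch(long seqno) { this.seqno = seqno; }
  long getSeqno() { return seqno; }
}

/** Hypothetical stand-in for DataStreamer that owns its queue and its monitor. */
final class StreamerSketch {
  private final Deque<PacketSketch> dataQueue = new ArrayDeque<>();
  private final int maxQueued;       // assumed limit on queued packets
  private long lastQueuedSeqno = -1;

  StreamerSketch(int maxQueued) { this.maxQueued = maxQueued; }

  /** Enqueue without blocking; corresponds to the queueCurrentPacket case. */
  void queuePacket(PacketSketch packet) {
    if (packet == null) return;
    synchronized (dataQueue) {
      dataQueue.addLast(packet);
      lastQueuedSeqno = packet.getSeqno();
      dataQueue.notifyAll();         // wake the sender thread
    }
  }

  /** Block while the queue is full, then enqueue. */
  void waitAndQueuePacket(PacketSketch packet) throws InterruptedException {
    if (packet == null) return;
    synchronized (dataQueue) {
      while (dataQueue.size() >= maxQueued) {
        dataQueue.wait();
      }
      queuePacket(packet);           // monitor is re-entrant, already held
    }
  }

  /** Sender side: take the next packet and wake any blocked writer. */
  PacketSketch pollPacket() {
    synchronized (dataQueue) {
      PacketSketch p = dataQueue.pollFirst();
      dataQueue.notifyAll();
      return p;
    }
  }

  long getLastQueuedSeqno() {
    synchronized (dataQueue) { return lastQueuedSeqno; }
  }

  int queuedCount() {
    synchronized (dataQueue) { return dataQueue.size(); }
  }
}
```

With something like this, DFSOutputStream would call {{streamer.queuePacket(currentPacket)}} and then set {{currentPacket = null}} itself, without ever touching {{dataQueue}}.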

> Separate class DataStreamer out of DFSOutputStream
> --------------------------------------------------
>
>                 Key: HDFS-7854
>                 URL: https://issues.apache.org/jira/browse/HDFS-7854
>             Project: Hadoop HDFS
>          Issue Type: Sub-task
>            Reporter: Li Bo
>            Assignee: Li Bo
>         Attachments: HDFS-7854-001.patch, HDFS-7854-002.patch, 
> HDFS-7854-003.patch
>
>
> This sub-task separates DataStreamer from DFSOutputStream. The new DataStreamer 
> will accept packets and write them to remote datanodes.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
