[ https://issues.apache.org/jira/browse/HDFS-7854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14356094#comment-14356094 ]
Jing Zhao commented on HDFS-7854:
---------------------------------

Some comments so far:
# Can we keep {{currentSeqno}}, {{lastQueuedSeqno}} and {{bytesCurBlock}} in DFSOutputStream?
# Some fields in DataStreamer can be declared as final, e.g., dfsClient, src, checksum4WriteBlock, progress, stat, blockSize, fileId, etc.
# We can move the following code into a single function.
{code}
    long usedInLastBlock = stat.getLen() % blockSize;
    int freeInLastBlock = (int)(blockSize - usedInLastBlock);

    // calculate the amount of free space in the pre-existing
    // last crc chunk
    int usedInCksum = (int)(stat.getLen() % bytesPerChecksum);
    int freeInCksum = bytesPerChecksum - usedInCksum;

    // if there is space in the last block, then we have to
    // append to that block
    if (freeInLastBlock == blockSize) {
      throw new IOException("The last block for file " +
          src + " is full.");
    }

    if (usedInCksum > 0 && freeInCksum > 0) {
      // if there is space in the last partial chunk, then
      // setup in such a way that the next packet will have only
      // one chunk that fills up the partial chunk.
      //
      computePacketChunkSize(0, freeInCksum);
      setChecksumBufSize(freeInCksum);
      streamer.setAppendChunk(true);
    } else {
      // if the remaining space in the block is smaller than
      // that expected size of of a packet, then create
      // smaller size packet.
      //
      computePacketChunkSize(Math.min(dfsClient.getConf().writePacketSize, freeInLastBlock),
          bytesPerChecksum);
    }
{code}
# We do not need to compute the checksumSize for a heartbeat packet since it does not contain real data.
{code}
  private DFSPacket createHeartbeatPacket() throws InterruptedIOException {
    int checksumSize = checksum4WriteBlock.getChecksumSize();
    if (isLazyPersist(stat) && stat.getReplication() == 1) {
      checksumSize = 0;
    }
    final byte[] buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN];
    return new DFSPacket(buf, 0, 0, DFSPacket.HEART_BEAT_SEQNO,
        checksumSize, false);
  }
{code}
# Instead of directly acquiring the monitor of {{DataStreamer#dataQueue}} and modifying the queue in DFSOutputStream, we should wrap this logic in a DataStreamer method.
{code}
  private void queueCurrentPacket() {
    synchronized (streamer.dataQueue) {
      if (currentPacket == null) return;
      streamer.dataQueue.addLast(currentPacket);
      streamer.setLastQueuedSeqno(currentPacket.getSeqno());
      if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("Queued packet " + currentPacket.getSeqno());
      }
      currentPacket = null;
      streamer.dataQueue.notifyAll();
    }
  }
{code}
# The same applies to {{DFSOutputStream#waitAndQueueCurrentPacket}}. In general, it is not very clean for DFSOutputStream to access the streamer's dataQueue directly, use its monitor, or call its {{wait}} method.
# What is {{toTerminate}} used for? Can we use only {{streamerClosed}}?

> Separate class DataStreamer out of DFSOutputStream
> --------------------------------------------------
>
>                 Key: HDFS-7854
>                 URL: https://issues.apache.org/jira/browse/HDFS-7854
>             Project: Hadoop HDFS
>          Issue Type: Sub-task
>            Reporter: Li Bo
>            Assignee: Li Bo
>         Attachments: HDFS-7854-001.patch, HDFS-7854-002.patch, HDFS-7854-003.patch
>
>
> This sub-task separates DataStreamer from DFSOutputStream. The new DataStreamer
> will accept packets and write them to remote datanodes.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
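
For illustration only, here is a minimal sketch of the suggestion about wrapping {{dataQueue}} access in a DataStreamer method (comment 5 in the review). It is not taken from any HDFS-7854 patch. {{Packet}}, {{Streamer}} and {{OutputStreamSketch}} are simplified, hypothetical stand-ins for DFSPacket, DataStreamer and DFSOutputStream, and the method name {{queuePacket}} is an assumption. The point is that the queue and its monitor stay private to the streamer, so the output stream never synchronizes on the queue or calls {{notifyAll}} itself.

```java
import java.util.LinkedList;

// Hypothetical stand-in for DFSPacket; only the sequence number matters here.
class Packet {
    private final long seqno;

    Packet(long seqno) {
        this.seqno = seqno;
    }

    long getSeqno() {
        return seqno;
    }
}

// Simplified stand-in for DataStreamer: the queue and its monitor are private.
class Streamer {
    private final LinkedList<Packet> dataQueue = new LinkedList<>();
    private long lastQueuedSeqno = -1;

    // Hypothetical method: enqueue a packet, record its seqno, and wake up
    // any thread waiting on the queue. Callers never touch the monitor.
    void queuePacket(Packet packet) {
        if (packet == null) {
            return;
        }
        synchronized (dataQueue) {
            dataQueue.addLast(packet);
            lastQueuedSeqno = packet.getSeqno();
            dataQueue.notifyAll();
        }
    }

    long getLastQueuedSeqno() {
        synchronized (dataQueue) {
            return lastQueuedSeqno;
        }
    }

    int queuedCount() {
        synchronized (dataQueue) {
            return dataQueue.size();
        }
    }
}

// Simplified stand-in for DFSOutputStream: it delegates queueing to the streamer.
class OutputStreamSketch {
    private final Streamer streamer = new Streamer();
    private Packet currentPacket;

    void setCurrentPacket(Packet packet) {
        currentPacket = packet;
    }

    // Counterpart of queueCurrentPacket() without direct access to dataQueue.
    void queueCurrentPacket() {
        if (currentPacket == null) {
            return;
        }
        streamer.queuePacket(currentPacket);
        currentPacket = null;
    }

    Streamer getStreamer() {
        return streamer;
    }
}
```

A {{waitAndQueueCurrentPacket}} counterpart could follow the same pattern, with the {{wait}} loop living inside the streamer class rather than in the output stream.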