http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java index 0000000,7509da5..6be94f3 mode 000000,100644..100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java @@@ -1,0 -1,512 +1,517 @@@ + /** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.hadoop.hdfs; + + import java.io.BufferedInputStream; + import java.io.BufferedOutputStream; + import java.io.DataInputStream; + import java.io.DataOutputStream; + import java.io.IOException; + import java.nio.ByteBuffer; + import java.util.EnumSet; + + import org.apache.hadoop.classification.InterfaceAudience; + import org.apache.hadoop.fs.FSInputChecker; + import org.apache.hadoop.fs.Path; + import org.apache.hadoop.fs.ReadOption; + import org.apache.hadoop.hdfs.net.Peer; + import org.apache.hadoop.hdfs.protocol.Block; + import org.apache.hadoop.hdfs.protocol.DatanodeID; + import org.apache.hadoop.hdfs.protocol.ExtendedBlock; + import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; + import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; + import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; + import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; + import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto; + import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; + import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; + import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; + import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; + import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; + import org.apache.hadoop.io.IOUtils; + import org.apache.hadoop.net.NetUtils; + import org.apache.hadoop.security.token.Token; + import org.apache.hadoop.util.DataChecksum; + import org.apache.htrace.Sampler; + import org.apache.htrace.Trace; + import org.apache.htrace.TraceScope; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + + /** + * @deprecated this is an old implementation that is being left around + * in case any issues spring up with the new {@link RemoteBlockReader2} implementation. + * It will be removed in the next release. 
+ */ + @InterfaceAudience.Private + @Deprecated + public class RemoteBlockReader extends FSInputChecker implements BlockReader { + static final Logger LOG = LoggerFactory.getLogger(FSInputChecker.class); + + private final Peer peer; + private final DatanodeID datanodeID; + private final DataInputStream in; + private DataChecksum checksum; + + /** offset in block of the last chunk received */ + private long lastChunkOffset = -1; + private long lastChunkLen = -1; + private long lastSeqNo = -1; + + /** offset in block where reader wants to actually read */ + private long startOffset; + + private final long blockId; + + /** offset in block of of first chunk - may be less than startOffset + if startOffset is not chunk-aligned */ + private final long firstChunkOffset; + + private final int bytesPerChecksum; + private final int checksumSize; + + /** + * The total number of bytes we need to transfer from the DN. + * This is the amount that the user has requested plus some padding + * at the beginning so that the read can begin on a chunk boundary. + */ + private final long bytesNeededToFinish; + + /** + * True if we are reading from a local DataNode. + */ + private final boolean isLocal; + + private boolean eos = false; + private boolean sentStatusCode = false; + + ByteBuffer checksumBytes = null; + /** Amount of unread data in the current received packet */ + int dataLeft = 0; + + private final PeerCache peerCache; + + /* FSInputChecker interface */ + + /* same interface as inputStream java.io.InputStream#read() + * used by DFSInputStream#read() + * This violates one rule when there is a checksum error: + * "Read should not modify user buffer before successful read" + * because it first reads the data to user buffer and then checks + * the checksum. 
+ */ + @Override + public synchronized int read(byte[] buf, int off, int len) + throws IOException { + + // This has to be set here, *before* the skip, since we can + // hit EOS during the skip, in the case that our entire read + // is smaller than the checksum chunk. + boolean eosBefore = eos; + + //for the first read, skip the extra bytes at the front. + if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) { + // Skip these bytes. But don't call this.skip()! + int toSkip = (int)(startOffset - firstChunkOffset); + if ( super.readAndDiscard(toSkip) != toSkip ) { + // should never happen + throw new IOException("Could not skip required number of bytes"); + } + } + + int nRead = super.read(buf, off, len); + + // if eos was set in the previous read, send a status code to the DN + if (eos && !eosBefore && nRead >= 0) { + if (needChecksum()) { + sendReadResult(peer, Status.CHECKSUM_OK); + } else { + sendReadResult(peer, Status.SUCCESS); + } + } + return nRead; + } + + @Override + public synchronized long skip(long n) throws IOException { + /* How can we make sure we don't throw a ChecksumException, at least + * in majority of the cases?. This one throws. */ + long nSkipped = 0; + while (nSkipped < n) { + int toSkip = (int)Math.min(n-nSkipped, Integer.MAX_VALUE); + int ret = readAndDiscard(toSkip); + if (ret <= 0) { + return nSkipped; + } + nSkipped += ret; + } + return nSkipped; + } + + @Override + public int read() throws IOException { + throw new IOException("read() is not expected to be invoked. " + + "Use read(buf, off, len) instead."); + } + + @Override + public boolean seekToNewSource(long targetPos) throws IOException { + /* Checksum errors are handled outside the BlockReader. + * DFSInputStream does not always call 'seekToNewSource'. In the + * case of pread(), it just tries a different replica without seeking. 
+ */ + return false; + } + + @Override + public void seek(long pos) throws IOException { + throw new IOException("Seek() is not supported in BlockInputChecker"); + } + + @Override + protected long getChunkPosition(long pos) { + throw new RuntimeException("getChunkPosition() is not supported, " + + "since seek is not required"); + } + + /** + * Makes sure that checksumBytes has enough capacity + * and limit is set to the number of checksum bytes needed + * to be read. + */ + private void adjustChecksumBytes(int dataLen) { + int requiredSize = + ((dataLen + bytesPerChecksum - 1)/bytesPerChecksum)*checksumSize; + if (checksumBytes == null || requiredSize > checksumBytes.capacity()) { + checksumBytes = ByteBuffer.wrap(new byte[requiredSize]); + } else { + checksumBytes.clear(); + } + checksumBytes.limit(requiredSize); + } + + @Override + protected synchronized int readChunk(long pos, byte[] buf, int offset, + int len, byte[] checksumBuf) + throws IOException { + TraceScope scope = + Trace.startSpan("RemoteBlockReader#readChunk(" + blockId + ")", + Sampler.NEVER); + try { + return readChunkImpl(pos, buf, offset, len, checksumBuf); + } finally { + scope.close(); + } + } + + private synchronized int readChunkImpl(long pos, byte[] buf, int offset, + int len, byte[] checksumBuf) + throws IOException { + // Read one chunk. + if (eos) { + // Already hit EOF + return -1; + } + + // Read one DATA_CHUNK. + long chunkOffset = lastChunkOffset; + if ( lastChunkLen > 0 ) { + chunkOffset += lastChunkLen; + } + + // pos is relative to the start of the first chunk of the read. + // chunkOffset is relative to the start of the block. + // This makes sure that the read passed from FSInputChecker is the + // for the same chunk we expect to be reading from the DN. 
+ if ( (pos + firstChunkOffset) != chunkOffset ) { + throw new IOException("Mismatch in pos : " + pos + " + " + + firstChunkOffset + " != " + chunkOffset); + } + + // Read next packet if the previous packet has been read completely. + if (dataLeft <= 0) { + //Read packet headers. + PacketHeader header = new PacketHeader(); + header.readFields(in); + + if (LOG.isDebugEnabled()) { + LOG.debug("DFSClient readChunk got header " + header); + } + + // Sanity check the lengths + if (!header.sanityCheck(lastSeqNo)) { + throw new IOException("BlockReader: error in packet header " + + header); + } + + lastSeqNo = header.getSeqno(); + dataLeft = header.getDataLen(); + adjustChecksumBytes(header.getDataLen()); + if (header.getDataLen() > 0) { + IOUtils.readFully(in, checksumBytes.array(), 0, + checksumBytes.limit()); + } + } + + // Sanity checks + assert len >= bytesPerChecksum; + assert checksum != null; + assert checksumSize == 0 || (checksumBuf.length % checksumSize == 0); + + + int checksumsToRead, bytesToRead; + + if (checksumSize > 0) { + + // How many chunks left in our packet - this is a ceiling + // since we may have a partial chunk at the end of the file + int chunksLeft = (dataLeft - 1) / bytesPerChecksum + 1; + + // How many chunks we can fit in databuffer + // - note this is a floor since we always read full chunks + int chunksCanFit = Math.min(len / bytesPerChecksum, + checksumBuf.length / checksumSize); + + // How many chunks should we read + checksumsToRead = Math.min(chunksLeft, chunksCanFit); + // How many bytes should we actually read + bytesToRead = Math.min( + checksumsToRead * bytesPerChecksum, // full chunks + dataLeft); // in case we have a partial + } else { + // no checksum + bytesToRead = Math.min(dataLeft, len); + checksumsToRead = 0; + } + + if ( bytesToRead > 0 ) { + // Assert we have enough space + assert bytesToRead <= len; + assert checksumBytes.remaining() >= checksumSize * checksumsToRead; + assert checksumBuf.length >= checksumSize * 
checksumsToRead; + IOUtils.readFully(in, buf, offset, bytesToRead); + checksumBytes.get(checksumBuf, 0, checksumSize * checksumsToRead); + } + + dataLeft -= bytesToRead; + assert dataLeft >= 0; + + lastChunkOffset = chunkOffset; + lastChunkLen = bytesToRead; + + // If there's no data left in the current packet after satisfying + // this read, and we have satisfied the client read, we expect + // an empty packet header from the DN to signify this. + // Note that pos + bytesToRead may in fact be greater since the + // DN finishes off the entire last chunk. + if (dataLeft == 0 && + pos + bytesToRead >= bytesNeededToFinish) { + + // Read header + PacketHeader hdr = new PacketHeader(); + hdr.readFields(in); + + if (!hdr.isLastPacketInBlock() || + hdr.getDataLen() != 0) { + throw new IOException("Expected empty end-of-read packet! Header: " + + hdr); + } + + eos = true; + } + + if ( bytesToRead == 0 ) { + return -1; + } + + return bytesToRead; + } + + private RemoteBlockReader(String file, String bpid, long blockId, + DataInputStream in, DataChecksum checksum, boolean verifyChecksum, + long startOffset, long firstChunkOffset, long bytesToRead, Peer peer, + DatanodeID datanodeID, PeerCache peerCache) { + // Path is used only for printing block and file information in debug + super(new Path("/" + Block.BLOCK_FILE_PREFIX + blockId + + ":" + bpid + ":of:"+ file)/*too non path-like?*/, + 1, verifyChecksum, + checksum.getChecksumSize() > 0? checksum : null, + checksum.getBytesPerChecksum(), + checksum.getChecksumSize()); + + this.isLocal = DFSUtilClient.isLocalAddress(NetUtils. 
+ createSocketAddr(datanodeID.getXferAddr())); + + this.peer = peer; + this.datanodeID = datanodeID; + this.in = in; + this.checksum = checksum; + this.startOffset = Math.max( startOffset, 0 ); + this.blockId = blockId; + + // The total number of bytes that we need to transfer from the DN is + // the amount that the user wants (bytesToRead), plus the padding at + // the beginning in order to chunk-align. Note that the DN may elect + // to send more than this amount if the read starts/ends mid-chunk. + this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset); + + this.firstChunkOffset = firstChunkOffset; + lastChunkOffset = firstChunkOffset; + lastChunkLen = -1; + + bytesPerChecksum = this.checksum.getBytesPerChecksum(); + checksumSize = this.checksum.getChecksumSize(); + this.peerCache = peerCache; + } + + /** + * Create a new BlockReader specifically to satisfy a read. + * This method also sends the OP_READ_BLOCK request. + * + * @param file File location + * @param block The block object + * @param blockToken The block token for security + * @param startOffset The read offset, relative to block head + * @param len The number of bytes to read + * @param bufferSize The IO buffer size (not the client buffer size) + * @param verifyChecksum Whether to verify checksum + * @param clientName Client name + * @return New BlockReader instance, or null on error. 
+ */ + public static RemoteBlockReader newBlockReader(String file, + ExtendedBlock block, + Token<BlockTokenIdentifier> blockToken, + long startOffset, long len, + int bufferSize, boolean verifyChecksum, + String clientName, Peer peer, + DatanodeID datanodeID, + PeerCache peerCache, + CachingStrategy cachingStrategy) + throws IOException { + // in and out will be closed when sock is closed (by the caller) + final DataOutputStream out = + new DataOutputStream(new BufferedOutputStream(peer.getOutputStream())); + new Sender(out).readBlock(block, blockToken, clientName, startOffset, len, + verifyChecksum, cachingStrategy); + + // + // Get bytes in block, set streams + // + + DataInputStream in = new DataInputStream( + new BufferedInputStream(peer.getInputStream(), bufferSize)); + + BlockOpResponseProto status = BlockOpResponseProto.parseFrom( + PBHelperClient.vintPrefixed(in)); + RemoteBlockReader2.checkSuccess(status, peer, block, file); + ReadOpChecksumInfoProto checksumInfo = + status.getReadOpChecksumInfo(); + DataChecksum checksum = DataTransferProtoUtil.fromProto( + checksumInfo.getChecksum()); + //Warning when we get CHECKSUM_NULL? + + // Read the first chunk offset. + long firstChunkOffset = checksumInfo.getChunkOffset(); + + if ( firstChunkOffset < 0 || firstChunkOffset > startOffset || + firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) { + throw new IOException("BlockReader: error in first chunk offset (" + + firstChunkOffset + ") startOffset is " + + startOffset + " for file " + file); + } + + return new RemoteBlockReader(file, block.getBlockPoolId(), block.getBlockId(), + in, checksum, verifyChecksum, startOffset, firstChunkOffset, len, + peer, datanodeID, peerCache); + } + + @Override + public synchronized void close() throws IOException { + startOffset = -1; + checksum = null; + if (peerCache != null & sentStatusCode) { + peerCache.put(datanodeID, peer); + } else { + peer.close(); + } + + // in will be closed when its Socket is closed. 
+ } + + @Override + public void readFully(byte[] buf, int readOffset, int amtToRead) + throws IOException { + IOUtils.readFully(this, buf, readOffset, amtToRead); + } + + @Override + public int readAll(byte[] buf, int offset, int len) throws IOException { + return readFully(this, buf, offset, len); + } + + /** + * When the reader reaches end of the read, it sends a status response + * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN + * closing our connection (which we will re-open), but won't affect + * data correctness. + */ + void sendReadResult(Peer peer, Status statusCode) { + assert !sentStatusCode : "already sent status code to " + peer; + try { + RemoteBlockReader2.writeReadResult(peer.getOutputStream(), statusCode); + sentStatusCode = true; + } catch (IOException e) { + // It's ok not to be able to send this. But something is probably wrong. + LOG.info("Could not send read status (" + statusCode + ") to datanode " + + peer.getRemoteAddressString() + ": " + e.getMessage()); + } + } + + @Override + public int read(ByteBuffer buf) throws IOException { + throw new UnsupportedOperationException("readDirect unsupported in RemoteBlockReader"); + } + + @Override + public int available() throws IOException { + // An optimistic estimate of how much data is available + // to us without doing network I/O. + return RemoteBlockReader2.TCP_WINDOW_SIZE; + } + + @Override + public boolean isLocal() { + return isLocal; + } + + @Override + public boolean isShortCircuit() { + return false; + } + + @Override + public ClientMmap getClientMmap(EnumSet<ReadOption> opts) { + return null; + } ++ ++ @Override ++ public DataChecksum getDataChecksum() { ++ return checksum; ++ } + }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java index 0000000,5541e6d..9699442 mode 000000,100644..100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java @@@ -1,0 -1,480 +1,485 @@@ + /** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.hadoop.hdfs; + + import java.io.BufferedOutputStream; + import java.io.DataInputStream; + import java.io.DataOutputStream; + import java.io.IOException; + import java.io.OutputStream; + import java.net.InetSocketAddress; + import java.nio.ByteBuffer; + import java.nio.channels.ReadableByteChannel; + import java.util.EnumSet; + import java.util.UUID; + + import org.apache.hadoop.classification.InterfaceAudience; + import org.apache.hadoop.fs.ReadOption; + import org.apache.hadoop.hdfs.net.Peer; + import org.apache.hadoop.hdfs.protocol.DatanodeID; + import org.apache.hadoop.hdfs.protocol.ExtendedBlock; + import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; + import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; + import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver; + import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; + import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; + import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto; + import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto; + import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; + import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; + import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; + import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; + import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; + import org.apache.hadoop.net.NetUtils; + import org.apache.hadoop.security.token.Token; + import org.apache.hadoop.util.DataChecksum; + import org.apache.htrace.Sampler; + import org.apache.htrace.Trace; + import org.apache.htrace.TraceScope; + + import com.google.common.annotations.VisibleForTesting; + + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + /** + * This is a wrapper around connection to datanode + * and understands checksum, offset etc. 
+ * + * Terminology: + * <dl> + * <dt>block</dt> + * <dd>The hdfs block, typically large (~64MB). + * </dd> + * <dt>chunk</dt> + * <dd>A block is divided into chunks, each comes with a checksum. + * We want transfers to be chunk-aligned, to be able to + * verify checksums. + * </dd> + * <dt>packet</dt> + * <dd>A grouping of chunks used for transport. It contains a + * header, followed by checksum data, followed by real data. + * </dd> + * </dl> + * Please see DataNode for the RPC specification. + * + * This is a new implementation introduced in Hadoop 0.23 which + * is more efficient and simpler than the older BlockReader + * implementation. It should be renamed to RemoteBlockReader + * once we are confident in it. + */ + @InterfaceAudience.Private + public class RemoteBlockReader2 implements BlockReader { + + static final Logger LOG = LoggerFactory.getLogger(RemoteBlockReader2.class); + static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB; + + final private Peer peer; + final private DatanodeID datanodeID; + final private PeerCache peerCache; + final private long blockId; + private final ReadableByteChannel in; + + private DataChecksum checksum; + private final PacketReceiver packetReceiver = new PacketReceiver(true); + + private ByteBuffer curDataSlice = null; + + /** offset in block of the last chunk received */ + private long lastSeqNo = -1; + + /** offset in block where reader wants to actually read */ + private long startOffset; + private final String filename; + + private final int bytesPerChecksum; + private final int checksumSize; + + /** + * The total number of bytes we need to transfer from the DN. + * This is the amount that the user has requested plus some padding + * at the beginning so that the read can begin on a chunk boundary. + */ + private long bytesNeededToFinish; + + /** + * True if we are reading from a local DataNode. 
+ */ + private final boolean isLocal; + + private final boolean verifyChecksum; + + private boolean sentStatusCode = false; + + @VisibleForTesting + public Peer getPeer() { + return peer; + } + + @Override + public synchronized int read(byte[] buf, int off, int len) + throws IOException { + + UUID randomId = null; + if (LOG.isTraceEnabled()) { + randomId = UUID.randomUUID(); + LOG.trace(String.format("Starting read #%s file %s from datanode %s", + randomId.toString(), this.filename, + this.datanodeID.getHostName())); + } + + if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) { + TraceScope scope = Trace.startSpan( + "RemoteBlockReader2#readNextPacket(" + blockId + ")", Sampler.NEVER); + try { + readNextPacket(); + } finally { + scope.close(); + } + } + + if (LOG.isTraceEnabled()) { + LOG.trace(String.format("Finishing read #" + randomId)); + } + + if (curDataSlice.remaining() == 0) { + // we're at EOF now + return -1; + } + + int nRead = Math.min(curDataSlice.remaining(), len); + curDataSlice.get(buf, off, nRead); + + return nRead; + } + + + @Override + public synchronized int read(ByteBuffer buf) throws IOException { + if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) { + TraceScope scope = Trace.startSpan( + "RemoteBlockReader2#readNextPacket(" + blockId + ")", Sampler.NEVER); + try { + readNextPacket(); + } finally { + scope.close(); + } + } + if (curDataSlice.remaining() == 0) { + // we're at EOF now + return -1; + } + + int nRead = Math.min(curDataSlice.remaining(), buf.remaining()); + ByteBuffer writeSlice = curDataSlice.duplicate(); + writeSlice.limit(writeSlice.position() + nRead); + buf.put(writeSlice); + curDataSlice.position(writeSlice.position()); + + return nRead; + } + + private void readNextPacket() throws IOException { + //Read packet headers. 
+ packetReceiver.receiveNextPacket(in); + + PacketHeader curHeader = packetReceiver.getHeader(); + curDataSlice = packetReceiver.getDataSlice(); + assert curDataSlice.capacity() == curHeader.getDataLen(); + + if (LOG.isTraceEnabled()) { + LOG.trace("DFSClient readNextPacket got header " + curHeader); + } + + // Sanity check the lengths + if (!curHeader.sanityCheck(lastSeqNo)) { + throw new IOException("BlockReader: error in packet header " + + curHeader); + } + + if (curHeader.getDataLen() > 0) { + int chunks = 1 + (curHeader.getDataLen() - 1) / bytesPerChecksum; + int checksumsLen = chunks * checksumSize; + + assert packetReceiver.getChecksumSlice().capacity() == checksumsLen : + "checksum slice capacity=" + packetReceiver.getChecksumSlice().capacity() + + " checksumsLen=" + checksumsLen; + + lastSeqNo = curHeader.getSeqno(); + if (verifyChecksum && curDataSlice.remaining() > 0) { + // N.B.: the checksum error offset reported here is actually + // relative to the start of the block, not the start of the file. + // This is slightly misleading, but preserves the behavior from + // the older BlockReader. + checksum.verifyChunkedSums(curDataSlice, + packetReceiver.getChecksumSlice(), + filename, curHeader.getOffsetInBlock()); + } + bytesNeededToFinish -= curHeader.getDataLen(); + } + + // First packet will include some data prior to the first byte + // the user requested. Skip it. 
+ if (curHeader.getOffsetInBlock() < startOffset) { + int newPos = (int) (startOffset - curHeader.getOffsetInBlock()); + curDataSlice.position(newPos); + } + + // If we've now satisfied the whole client read, read one last packet + // header, which should be empty + if (bytesNeededToFinish <= 0) { + readTrailingEmptyPacket(); + if (verifyChecksum) { + sendReadResult(Status.CHECKSUM_OK); + } else { + sendReadResult(Status.SUCCESS); + } + } + } + + @Override + public synchronized long skip(long n) throws IOException { + /* How can we make sure we don't throw a ChecksumException, at least + * in majority of the cases?. This one throws. */ + long skipped = 0; + while (skipped < n) { + long needToSkip = n - skipped; + if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) { + readNextPacket(); + } + if (curDataSlice.remaining() == 0) { + // we're at EOF now + break; + } + + int skip = (int)Math.min(curDataSlice.remaining(), needToSkip); + curDataSlice.position(curDataSlice.position() + skip); + skipped += skip; + } + return skipped; + } + + private void readTrailingEmptyPacket() throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("Reading empty packet at end of read"); + } + + packetReceiver.receiveNextPacket(in); + + PacketHeader trailer = packetReceiver.getHeader(); + if (!trailer.isLastPacketInBlock() || + trailer.getDataLen() != 0) { + throw new IOException("Expected empty end-of-read packet! Header: " + + trailer); + } + } + + protected RemoteBlockReader2(String file, String bpid, long blockId, + DataChecksum checksum, boolean verifyChecksum, + long startOffset, long firstChunkOffset, long bytesToRead, Peer peer, + DatanodeID datanodeID, PeerCache peerCache) { + this.isLocal = DFSUtilClient.isLocalAddress(NetUtils. 
+ createSocketAddr(datanodeID.getXferAddr())); + // Path is used only for printing block and file information in debug + this.peer = peer; + this.datanodeID = datanodeID; + this.in = peer.getInputStreamChannel(); + this.checksum = checksum; + this.verifyChecksum = verifyChecksum; + this.startOffset = Math.max( startOffset, 0 ); + this.filename = file; + this.peerCache = peerCache; + this.blockId = blockId; + + // The total number of bytes that we need to transfer from the DN is + // the amount that the user wants (bytesToRead), plus the padding at + // the beginning in order to chunk-align. Note that the DN may elect + // to send more than this amount if the read starts/ends mid-chunk. + this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset); + bytesPerChecksum = this.checksum.getBytesPerChecksum(); + checksumSize = this.checksum.getChecksumSize(); + } + + + @Override + public synchronized void close() throws IOException { + packetReceiver.close(); + startOffset = -1; + checksum = null; + if (peerCache != null && sentStatusCode) { + peerCache.put(datanodeID, peer); + } else { + peer.close(); + } + + // in will be closed when its Socket is closed. + } + + /** + * When the reader reaches end of the read, it sends a status response + * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN + * closing our connection (which we will re-open), but won't affect + * data correctness. + */ + void sendReadResult(Status statusCode) { + assert !sentStatusCode : "already sent status code to " + peer; + try { + writeReadResult(peer.getOutputStream(), statusCode); + sentStatusCode = true; + } catch (IOException e) { + // It's ok not to be able to send this. But something is probably wrong. + LOG.info("Could not send read status (" + statusCode + ") to datanode " + + peer.getRemoteAddressString() + ": " + e.getMessage()); + } + } + + /** + * Serialize the actual read result on the wire. 
+ */ + static void writeReadResult(OutputStream out, Status statusCode) + throws IOException { + + ClientReadStatusProto.newBuilder() + .setStatus(statusCode) + .build() + .writeDelimitedTo(out); + + out.flush(); + } + + /** + * File name to print when accessing a block directly (from servlets) + * @param s Address of the block location + * @param poolId Block pool ID of the block + * @param blockId Block ID of the block + * @return string that has a file name for debug purposes + */ + public static String getFileName(final InetSocketAddress s, + final String poolId, final long blockId) { + return s.toString() + ":" + poolId + ":" + blockId; + } + + @Override + public int readAll(byte[] buf, int offset, int len) throws IOException { + return BlockReaderUtil.readAll(this, buf, offset, len); + } + + @Override + public void readFully(byte[] buf, int off, int len) throws IOException { + BlockReaderUtil.readFully(this, buf, off, len); + } + + /** + * Create a new BlockReader specifically to satisfy a read. + * This method also sends the OP_READ_BLOCK request. + * + * @param file File location + * @param block The block object + * @param blockToken The block token for security + * @param startOffset The read offset, relative to block head + * @param len The number of bytes to read + * @param verifyChecksum Whether to verify checksum + * @param clientName Client name + * @param peer The Peer to use + * @param datanodeID The DatanodeID this peer is connected to + * @return New BlockReader instance, or null on error. 
+ */ + public static BlockReader newBlockReader(String file, + ExtendedBlock block, + Token<BlockTokenIdentifier> blockToken, + long startOffset, long len, + boolean verifyChecksum, + String clientName, + Peer peer, DatanodeID datanodeID, + PeerCache peerCache, + CachingStrategy cachingStrategy) throws IOException { + // in and out will be closed when sock is closed (by the caller) + final DataOutputStream out = new DataOutputStream(new BufferedOutputStream( + peer.getOutputStream())); + new Sender(out).readBlock(block, blockToken, clientName, startOffset, len, + verifyChecksum, cachingStrategy); + + // + // Get bytes in block + // + DataInputStream in = new DataInputStream(peer.getInputStream()); + + BlockOpResponseProto status = BlockOpResponseProto.parseFrom( + PBHelperClient.vintPrefixed(in)); + checkSuccess(status, peer, block, file); + ReadOpChecksumInfoProto checksumInfo = + status.getReadOpChecksumInfo(); + DataChecksum checksum = DataTransferProtoUtil.fromProto( + checksumInfo.getChecksum()); + //Warning when we get CHECKSUM_NULL? + + // Read the first chunk offset. 
+ long firstChunkOffset = checksumInfo.getChunkOffset(); + + if ( firstChunkOffset < 0 || firstChunkOffset > startOffset || + firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) { + throw new IOException("BlockReader: error in first chunk offset (" + + firstChunkOffset + ") startOffset is " + + startOffset + " for file " + file); + } + + return new RemoteBlockReader2(file, block.getBlockPoolId(), block.getBlockId(), + checksum, verifyChecksum, startOffset, firstChunkOffset, len, peer, + datanodeID, peerCache); + } + + static void checkSuccess( + BlockOpResponseProto status, Peer peer, + ExtendedBlock block, String file) + throws IOException { + String logInfo = "for OP_READ_BLOCK" + + ", self=" + peer.getLocalAddressString() + + ", remote=" + peer.getRemoteAddressString() + + ", for file " + file + + ", for pool " + block.getBlockPoolId() + + " block " + block.getBlockId() + "_" + block.getGenerationStamp(); + DataTransferProtoUtil.checkBlockOpStatus(status, logInfo); + } + + @Override + public int available() throws IOException { + // An optimistic estimate of how much data is available + // to us without doing network I/O. 
+ return TCP_WINDOW_SIZE; + } + + @Override + public boolean isLocal() { + return isLocal; + } + + @Override + public boolean isShortCircuit() { + return false; + } + + @Override + public ClientMmap getClientMmap(EnumSet<ReadOption> opts) { + return null; + } ++ ++ @Override ++ public DataChecksum getDataChecksum() { ++ return checksum; ++ } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java index 36da863,9f26ca3..97445a6 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java @@@ -961,8 -953,8 +961,8 @@@ public class ClientNamenodeProtocolServ RpcController controller, UpdateBlockForPipelineRequestProto req) throws ServiceException { try { - LocatedBlockProto result = PBHelper.convert(server - .updateBlockForPipeline(PBHelperClient.convert(req.getBlock()), + LocatedBlockProto result = PBHelper.convertLocatedBlock( - server.updateBlockForPipeline(PBHelper.convert(req.getBlock()), ++ server.updateBlockForPipeline(PBHelperClient.convert(req.getBlock()), req.getClientName())); return UpdateBlockForPipelineResponseProto.newBuilder().setBlock(result) .build(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index f292ee8,6f16d83..f419c46 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@@ -23,18 -23,11 +23,16 @@@ import static org.apache.hadoop.hdfs.pr import static org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CipherSuiteProto; import static org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CryptoProtocolVersionProto; - import java.io.EOFException; import java.io.IOException; - import 
java.io.InputStream; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.EnumSet; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; import org.apache.hadoop.fs.CacheFlag; import org.apache.hadoop.fs.ContentSummary; @@@ -134,13 -122,9 +131,14 @@@ import org.apache.hadoop.hdfs.protocol. import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto; -import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECRecoveryInfoProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.ErasureCodingZoneProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ECSchemaOptionEntryProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ECSchemaProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ErasureCodingPolicyProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto; ++import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockStoragePolicyProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockWithLocationsProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlocksWithLocationsProto; @@@ -233,15 -214,11 +230,12 @@@ import org.apache.hadoop.hdfs.server.pr import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; - import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId; import 
org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId; - import org.apache.hadoop.hdfs.util.ExactSizeInputStream; import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.erasurecode.ECSchema; import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; import org.apache.hadoop.security.token.Token; - import org.apache.hadoop.util.DataChecksum; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@@ -784,23 -726,9 +771,23 @@@ public class PBHelper } } - LocatedBlock lb = new LocatedBlock(PBHelperClient.convert(proto.getB()), targets, - storageIDs, storageTypes, proto.getOffset(), proto.getCorrupt(), - cachedLocs.toArray(new DatanodeInfo[0])); + final LocatedBlock lb; + if (indices == null) { - lb = new LocatedBlock(PBHelper.convert(proto.getB()), targets, storageIDs, - storageTypes, proto.getOffset(), proto.getCorrupt(), ++ lb = new LocatedBlock(PBHelperClient.convert(proto.getB()), targets, ++ storageIDs, storageTypes, proto.getOffset(), proto.getCorrupt(), + cachedLocs.toArray(new DatanodeInfo[cachedLocs.size()])); + } else { - lb = new LocatedStripedBlock(PBHelper.convert(proto.getB()), targets, ++ lb = new LocatedStripedBlock(PBHelperClient.convert(proto.getB()), targets, + storageIDs, storageTypes, indices, proto.getOffset(), + proto.getCorrupt(), + cachedLocs.toArray(new DatanodeInfo[cachedLocs.size()])); + List<TokenProto> tokenProtos = proto.getBlockTokensList(); + Token<BlockTokenIdentifier>[] blockTokens = new Token[indices.length]; + for (int i = 0; i < indices.length; i++) { + blockTokens[i] = PBHelper.convert(tokenProtos.get(i)); + } + ((LocatedStripedBlock) lb).setBlockTokens(blockTokens); + } lb.setBlockToken(PBHelper.convert(proto.getBlockToken())); return lb; @@@ -2954,192 -2860,4 +2935,192 @@@ setLeaseId(context.getLeaseId()). 
build(); } + + public static ECSchema convertECSchema(ECSchemaProto schema) { + List<ECSchemaOptionEntryProto> optionsList = schema.getOptionsList(); + Map<String, String> options = new HashMap<>(optionsList.size()); + for (ECSchemaOptionEntryProto option : optionsList) { + options.put(option.getKey(), option.getValue()); + } + return new ECSchema(schema.getCodecName(), schema.getDataUnits(), + schema.getParityUnits(), options); + } + + public static ECSchemaProto convertECSchema(ECSchema schema) { + ECSchemaProto.Builder builder = ECSchemaProto.newBuilder() + .setCodecName(schema.getCodecName()) + .setDataUnits(schema.getNumDataUnits()) + .setParityUnits(schema.getNumParityUnits()); + Set<Entry<String, String>> entrySet = schema.getExtraOptions().entrySet(); + for (Entry<String, String> entry : entrySet) { + builder.addOptions(ECSchemaOptionEntryProto.newBuilder() + .setKey(entry.getKey()).setValue(entry.getValue()).build()); + } + return builder.build(); + } + + public static ErasureCodingPolicy convertErasureCodingPolicy( + ErasureCodingPolicyProto policy) { + return new ErasureCodingPolicy(policy.getName(), + convertECSchema(policy.getSchema()), + policy.getCellSize()); + } + + public static ErasureCodingPolicyProto convertErasureCodingPolicy( + ErasureCodingPolicy policy) { + ErasureCodingPolicyProto.Builder builder = ErasureCodingPolicyProto + .newBuilder() + .setName(policy.getName()) + .setSchema(convertECSchema(policy.getSchema())) + .setCellSize(policy.getCellSize()); + return builder.build(); + } + + public static ErasureCodingZoneProto convertErasureCodingZone( + ErasureCodingZone ecZone) { + return ErasureCodingZoneProto.newBuilder().setDir(ecZone.getDir()) + .setEcPolicy(convertErasureCodingPolicy(ecZone.getErasureCodingPolicy())) + .build(); + } + + public static ErasureCodingZone convertErasureCodingZone( + ErasureCodingZoneProto ecZoneProto) { + return new ErasureCodingZone(ecZoneProto.getDir(), + 
convertErasureCodingPolicy(ecZoneProto.getEcPolicy())); + } + + public static BlockECRecoveryInfo convertBlockECRecoveryInfo( + BlockECRecoveryInfoProto blockEcRecoveryInfoProto) { + ExtendedBlockProto blockProto = blockEcRecoveryInfoProto.getBlock(); - ExtendedBlock block = convert(blockProto); ++ ExtendedBlock block = PBHelperClient.convert(blockProto); + + DatanodeInfosProto sourceDnInfosProto = blockEcRecoveryInfoProto + .getSourceDnInfos(); + DatanodeInfo[] sourceDnInfos = convert(sourceDnInfosProto); + + DatanodeInfosProto targetDnInfosProto = blockEcRecoveryInfoProto + .getTargetDnInfos(); + DatanodeInfo[] targetDnInfos = convert(targetDnInfosProto); + + StorageUuidsProto targetStorageUuidsProto = blockEcRecoveryInfoProto + .getTargetStorageUuids(); + String[] targetStorageUuids = convert(targetStorageUuidsProto); + + StorageTypesProto targetStorageTypesProto = blockEcRecoveryInfoProto + .getTargetStorageTypes(); + StorageType[] convertStorageTypes = convertStorageTypes( + targetStorageTypesProto.getStorageTypesList(), targetStorageTypesProto + .getStorageTypesList().size()); + + List<Integer> liveBlockIndicesList = blockEcRecoveryInfoProto + .getLiveBlockIndicesList(); + short[] liveBlkIndices = new short[liveBlockIndicesList.size()]; + for (int i = 0; i < liveBlockIndicesList.size(); i++) { + liveBlkIndices[i] = liveBlockIndicesList.get(i).shortValue(); + } + + ErasureCodingPolicy ecPolicy = + convertErasureCodingPolicy(blockEcRecoveryInfoProto.getEcPolicy()); + + return new BlockECRecoveryInfo(block, sourceDnInfos, targetDnInfos, + targetStorageUuids, convertStorageTypes, liveBlkIndices, ecPolicy); + } + + public static BlockECRecoveryInfoProto convertBlockECRecoveryInfo( + BlockECRecoveryInfo blockEcRecoveryInfo) { + BlockECRecoveryInfoProto.Builder builder = BlockECRecoveryInfoProto + .newBuilder(); + builder.setBlock(PBHelperClient.convert( + blockEcRecoveryInfo.getExtendedBlock())); + + DatanodeInfo[] sourceDnInfos = 
blockEcRecoveryInfo.getSourceDnInfos(); + builder.setSourceDnInfos(convertToDnInfosProto(sourceDnInfos)); + + DatanodeInfo[] targetDnInfos = blockEcRecoveryInfo.getTargetDnInfos(); + builder.setTargetDnInfos(convertToDnInfosProto(targetDnInfos)); + + String[] targetStorageIDs = blockEcRecoveryInfo.getTargetStorageIDs(); + builder.setTargetStorageUuids(convertStorageIDs(targetStorageIDs)); + + StorageType[] targetStorageTypes = blockEcRecoveryInfo + .getTargetStorageTypes(); + builder.setTargetStorageTypes(convertStorageTypesProto(targetStorageTypes)); + + short[] liveBlockIndices = blockEcRecoveryInfo.getLiveBlockIndices(); + builder.addAllLiveBlockIndices(convertIntArray(liveBlockIndices)); + + builder.setEcPolicy(convertErasureCodingPolicy(blockEcRecoveryInfo + .getErasureCodingPolicy())); + + return builder.build(); + } + + private static List<Integer> convertIntArray(short[] liveBlockIndices) { + List<Integer> liveBlockIndicesList = new ArrayList<Integer>(); + for (short s : liveBlockIndices) { + liveBlockIndicesList.add((int) s); + } + return liveBlockIndicesList; + } + + private static StorageTypesProto convertStorageTypesProto( + StorageType[] targetStorageTypes) { + StorageTypesProto.Builder builder = StorageTypesProto.newBuilder(); + for (StorageType storageType : targetStorageTypes) { + builder.addStorageTypes(PBHelperClient.convertStorageType(storageType)); + } + return builder.build(); + } + + private static StorageUuidsProto convertStorageIDs(String[] targetStorageIDs) { + StorageUuidsProto.Builder builder = StorageUuidsProto.newBuilder(); + for (String storageUuid : targetStorageIDs) { + builder.addStorageUuids(storageUuid); + } + return builder.build(); + } + + private static DatanodeInfosProto convertToDnInfosProto(DatanodeInfo[] dnInfos) { + DatanodeInfosProto.Builder builder = DatanodeInfosProto.newBuilder(); + for (DatanodeInfo datanodeInfo : dnInfos) { + builder.addDatanodes(PBHelperClient.convert(datanodeInfo)); + } + return builder.build(); + 
} + + private static String[] convert(StorageUuidsProto targetStorageUuidsProto) { + List<String> storageUuidsList = targetStorageUuidsProto + .getStorageUuidsList(); + String[] storageUuids = new String[storageUuidsList.size()]; + for (int i = 0; i < storageUuidsList.size(); i++) { + storageUuids[i] = storageUuidsList.get(i); + } + return storageUuids; + } + + public static BlockECRecoveryCommandProto convert( + BlockECRecoveryCommand blkECRecoveryCmd) { + BlockECRecoveryCommandProto.Builder builder = BlockECRecoveryCommandProto + .newBuilder(); + Collection<BlockECRecoveryInfo> blockECRecoveryInfos = blkECRecoveryCmd + .getECTasks(); + for (BlockECRecoveryInfo blkECRecoveryInfo : blockECRecoveryInfos) { + builder + .addBlockECRecoveryinfo(convertBlockECRecoveryInfo(blkECRecoveryInfo)); + } + return builder.build(); + } + + public static BlockECRecoveryCommand convert( + BlockECRecoveryCommandProto blkECRecoveryCmdProto) { + Collection<BlockECRecoveryInfo> blkECRecoveryInfos = new ArrayList<BlockECRecoveryInfo>(); + List<BlockECRecoveryInfoProto> blockECRecoveryinfoList = blkECRecoveryCmdProto + .getBlockECRecoveryinfoList(); + for (BlockECRecoveryInfoProto blockECRecoveryInfoProto : blockECRecoveryinfoList) { + blkECRecoveryInfos + .add(convertBlockECRecoveryInfo(blockECRecoveryInfoProto)); + } + return new BlockECRecoveryCommand(DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY, + blkECRecoveryInfos); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java index dc296ac,810784d..92a1135 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java @@@ -30,11 -31,11 +31,12 @@@ import org.apache.hadoop.util.LightWeig import static org.apache.hadoop.hdfs.server.namenode.INodeId.INVALID_INODE_ID; /** - * BlockInfo class maintains for a given block - * the {@link BlockCollection} it is part of and datanodes where the replicas of - * the block are stored. + * For a given block (or an erasure coding block group), BlockInfo class + * maintains 1) the {@link BlockCollection} it is part of, and 2) datanodes + * where the replicas of the block, or blocks belonging to the erasure coding + * block group, are stored. */ + @InterfaceAudience.Private public abstract class BlockInfo extends Block implements LightWeightGSet.LinkedElement { @@@ -203,17 -206,6 +205,11 @@@ */ abstract boolean removeStorage(DatanodeStorageInfo storage); - /** - * Replace the current BlockInfo with the new one in corresponding - * DatanodeStorageInfo's linked list - */ - abstract void replaceBlock(BlockInfo newBlock); - + public abstract boolean isStriped(); + + /** @return true if there is no datanode storage associated with the block */ + abstract boolean hasNoStorage(); + /** * Find specified DatanodeStorageInfo. * @return DatanodeStorageInfo or null if not found. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java index b9d8486,94fb222..746e298 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java @@@ -95,29 -104,4 +95,14 @@@ public class BlockInfoContiguous extend } return 0; } + + @Override - void replaceBlock(BlockInfo newBlock) { - assert newBlock instanceof BlockInfoContiguous; - for (int i = this.numNodes() - 1; i >= 0; i--) { - final DatanodeStorageInfo storage = this.getStorageInfo(i); - final boolean removed = storage.removeBlock(this); - assert removed : "currentBlock not found."; - - final DatanodeStorageInfo.AddBlockResult result = storage.addBlock( - newBlock, newBlock); - assert result == DatanodeStorageInfo.AddBlockResult.ADDED : - "newBlock already exists."; - } - } - - @Override + public final boolean isStriped() { + return false; + } + + @Override + final boolean hasNoStorage() { + return getStorageInfo(0) == null; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java index 7b21cbe,0000000..df48655 mode 100644,000000..100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java @@@ -1,253 -1,0 +1,234 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; +import org.apache.hadoop.hdfs.util.StripedBlockUtil; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; + +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE; + +/** + * Subclass of {@link BlockInfo}, presenting a block group in erasure coding. + * + * We still use triplets to store DatanodeStorageInfo for each block in the + * block group, as well as the previous/next block in the corresponding + * DatanodeStorageInfo. For a (m+k) block group, the first (m+k) triplet units + * are sorted and strictly mapped to the corresponding block. + * + * Normally each block belonging to group is stored in only one DataNode. + * However, it is possible that some block is over-replicated. Thus the triplet + * array's size can be larger than (m+k). 
Thus currently we use an extra byte + * array to record the block index for each triplet. + */ +public class BlockInfoStriped extends BlockInfo { + private final ErasureCodingPolicy ecPolicy; + /** + * Always the same size as triplets. Record the block index for each triplet + * TODO: actually this is only necessary for over-replicated blocks. Thus can + * be further optimized to save memory usage. + */ + private byte[] indices; + + public BlockInfoStriped(Block blk, ErasureCodingPolicy ecPolicy) { + super(blk, (short) (ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits())); + indices = new byte[ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits()]; + initIndices(); + this.ecPolicy = ecPolicy; + } + + public short getTotalBlockNum() { + return (short) (ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits()); + } + + public short getDataBlockNum() { + return (short) ecPolicy.getNumDataUnits(); + } + + public short getParityBlockNum() { + return (short) ecPolicy.getNumParityUnits(); + } + + /** + * If the block is committed/completed and its length is less than a full + * stripe, it returns the number of actual data blocks. + * Otherwise it returns the number of data units specified by erasure coding policy. 
+ */ + public short getRealDataBlockNum() { + if (isComplete() || getBlockUCState() == BlockUCState.COMMITTED) { + return (short) Math.min(getDataBlockNum(), + (getNumBytes() - 1) / BLOCK_STRIPED_CELL_SIZE + 1); + } else { + return getDataBlockNum(); + } + } + + public short getRealTotalBlockNum() { + return (short) (getRealDataBlockNum() + getParityBlockNum()); + } + + public ErasureCodingPolicy getErasureCodingPolicy() { + return ecPolicy; + } + + private void initIndices() { + for (int i = 0; i < indices.length; i++) { + indices[i] = -1; + } + } + + private int findSlot() { + int i = getTotalBlockNum(); + for (; i < getCapacity(); i++) { + if (getStorageInfo(i) == null) { + return i; + } + } + // need to expand the triplet size + ensureCapacity(i + 1, true); + return i; + } + + @Override + boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) { + int blockIndex = BlockIdManager.getBlockIndex(reportedBlock); + int index = blockIndex; + DatanodeStorageInfo old = getStorageInfo(index); + if (old != null && !old.equals(storage)) { // over replicated + // check if the storage has been stored + int i = findStorageInfo(storage); + if (i == -1) { + index = findSlot(); + } else { + return true; + } + } + addStorage(storage, index, blockIndex); + return true; + } + + private void addStorage(DatanodeStorageInfo storage, int index, + int blockIndex) { + setStorageInfo(index, storage); + setNext(index, null); + setPrevious(index, null); + indices[index] = (byte) blockIndex; + } + + private int findStorageInfoFromEnd(DatanodeStorageInfo storage) { + final int len = getCapacity(); + for(int idx = len - 1; idx >= 0; idx--) { + DatanodeStorageInfo cur = getStorageInfo(idx); + if (storage.equals(cur)) { + return idx; + } + } + return -1; + } + + int getStorageBlockIndex(DatanodeStorageInfo storage) { + int i = this.findStorageInfo(storage); + return i == -1 ? -1 : indices[i]; + } + + /** + * Identify the block stored in the given datanode storage. 
Note that + * the returned block has the same block Id with the one seen/reported by the + * DataNode. + */ + Block getBlockOnStorage(DatanodeStorageInfo storage) { + int index = getStorageBlockIndex(storage); + if (index < 0) { + return null; + } else { + Block block = new Block(this); + block.setBlockId(this.getBlockId() + index); + return block; + } + } + + @Override + boolean removeStorage(DatanodeStorageInfo storage) { + int dnIndex = findStorageInfoFromEnd(storage); + if (dnIndex < 0) { // the node is not found + return false; + } + assert getPrevious(dnIndex) == null && getNext(dnIndex) == null : + "Block is still in the list and must be removed first."; + // set the triplet to null + setStorageInfo(dnIndex, null); + setNext(dnIndex, null); + setPrevious(dnIndex, null); + indices[dnIndex] = -1; + return true; + } + + private void ensureCapacity(int totalSize, boolean keepOld) { + if (getCapacity() < totalSize) { + Object[] old = triplets; + byte[] oldIndices = indices; + triplets = new Object[totalSize * 3]; + indices = new byte[totalSize]; + initIndices(); + + if (keepOld) { + System.arraycopy(old, 0, triplets, 0, old.length); + System.arraycopy(oldIndices, 0, indices, 0, oldIndices.length); + } + } + } + - @Override - void replaceBlock(BlockInfo newBlock) { - assert newBlock instanceof BlockInfoStriped; - BlockInfoStriped newBlockGroup = (BlockInfoStriped) newBlock; - final int size = getCapacity(); - newBlockGroup.ensureCapacity(size, false); - for (int i = 0; i < size; i++) { - final DatanodeStorageInfo storage = this.getStorageInfo(i); - if (storage != null) { - final int blockIndex = indices[i]; - final boolean removed = storage.removeBlock(this); - assert removed : "currentBlock not found."; - - newBlockGroup.addStorage(storage, i, blockIndex); - storage.insertToList(newBlockGroup); - } - } - } - + public long spaceConsumed() { + // In case striped blocks, total usage by this striped blocks should + // be the total of data blocks and parity blocks 
because + // `getNumBytes` is the total of actual data block size. + return StripedBlockUtil.spaceConsumedByStripedBlock(getNumBytes(), + ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits(), + BLOCK_STRIPED_CELL_SIZE); + } + + @Override + public final boolean isStriped() { + return true; + } + + @Override + public int numNodes() { + assert this.triplets != null : "BlockInfo is not initialized"; + assert triplets.length % 3 == 0 : "Malformed BlockInfo"; + int num = 0; + for (int idx = getCapacity()-1; idx >= 0; idx--) { + if (getStorageInfo(idx) != null) { + num++; + } + } + return num; + } + + @Override + final boolean hasNoStorage() { + final int len = getCapacity(); + for(int idx = 0; idx < len; idx++) { + if (getStorageInfo(idx) != null) { + return false; + } + } + return true; + } +}
