Repository: hadoop Updated Branches: refs/heads/branch-2 87d013370 -> a727c6db0
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/**
 * An InputStream implementation which reads from some other InputStream
 * but expects an exact number of bytes. Any attempt to read past the
 * specified number of bytes returns as if the end of the stream
 * was reached. If the end of the underlying stream is reached prior to
 * the specified number of bytes, an EOFException is thrown.
 *
 * <p>Mark/reset is not supported: {@link #markSupported()} returns false
 * and {@link #mark(int)} throws.
 *
 * <p>NOTE(review): the archived diff shows two class-level annotations at
 * this position, but the mail archive's address obfuscation replaced them
 * with "[email protected]". Given the InterfaceAudience / InterfaceStability
 * imports they are presumably an audience and a stability marker; restore
 * them from the repository copy of this file -- TODO confirm.
 */
public class ExactSizeInputStream extends FilterInputStream {
  /** Number of bytes this stream is still expected to deliver. */
  private int remaining;

  /**
   * Construct an input stream that will read no more than
   * 'numBytes' bytes.
   *
   * If an EOF occurs on the underlying stream before numBytes
   * bytes have been read, an EOFException will be thrown.
   *
   * @param in the inputstream to wrap
   * @param numBytes the number of bytes to read
   * @throws IllegalArgumentException if numBytes is negative
   */
  public ExactSizeInputStream(InputStream in, int numBytes) {
    super(in);
    // Explicit check instead of a message template without a placeholder,
    // so the offending value is rendered as part of the message.
    if (numBytes < 0) {
      throw new IllegalArgumentException(
          "Negative expected bytes: " + numBytes);
    }
    this.remaining = numBytes;
  }

  /**
   * @return the underlying stream's estimate, capped at the number of
   *         bytes still expected
   */
  @Override
  public int available() throws IOException {
    return Math.min(super.available(), remaining);
  }

  /**
   * Read a single byte.
   *
   * @return the byte read, or -1 once the expected number of bytes has
   *         been consumed
   * @throws EOFException if the underlying stream ends early
   */
  @Override
  public int read() throws IOException {
    // EOF if we reached our limit
    if (remaining <= 0) {
      return -1;
    }
    final int result = super.read();
    if (result < 0) {
      // Underlying stream reached EOF but we haven't read the expected
      // number of bytes.
      throw prematureEof();
    }
    --remaining;
    return result;
  }

  /**
   * Read up to {@code len} bytes, never more than are still expected.
   *
   * @return the number of bytes read, or -1 once the expected number of
   *         bytes has been consumed
   * @throws EOFException if the underlying stream ends early
   */
  @Override
  public int read(final byte[] b, final int off, int len)
      throws IOException {
    if (remaining <= 0) {
      return -1;
    }
    len = Math.min(len, remaining);
    final int result = super.read(b, off, len);
    if (result < 0) {
      // Underlying stream reached EOF but we haven't read the expected
      // number of bytes.
      throw prematureEof();
    }
    remaining -= result;
    return result;
  }

  /**
   * Skip up to {@code n} bytes, never more than are still expected.
   *
   * @return the number of bytes actually skipped; 0 if {@code n} is not
   *         positive or the expected number of bytes was already consumed
   * @throws EOFException if the underlying stream ends early
   */
  @Override
  public long skip(final long n) throws IOException {
    // Nothing requested or nothing left: not an error, just no progress.
    if (n <= 0 || remaining <= 0) {
      return 0;
    }
    final long skipped = super.skip(Math.min(n, remaining));
    if (skipped > 0) {
      // skipped <= remaining, so the narrowing cast cannot overflow.
      remaining -= (int) skipped;
      return skipped;
    }
    // A zero return from skip() does not by itself mean end of stream, so
    // probe with a one-byte read to tell "no progress" from a real EOF.
    if (super.read() < 0) {
      throw prematureEof();
    }
    --remaining;
    return 1;
  }

  /** @return false; this stream does not support mark/reset */
  @Override
  public boolean markSupported() {
    return false;
  }

  /**
   * Always fails because marking is unsupported.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public void mark(int readlimit) {
    throw new UnsupportedOperationException();
  }

  /** Build the exception reporting how many expected bytes are missing. */
  private EOFException prematureEof() {
    return new EOFException(
        "Premature EOF. Expected " + remaining + " more bytes");
  }
}
PBHelperClient.convert(b); + bProto = PBHelper.convert(b); b1 = PBHelper.convert(bProto); assertEquals(b, b1); } @@ -398,7 +398,7 @@ public class TestPBHelper { Token<BlockTokenIdentifier> token = new Token<BlockTokenIdentifier>( "identifier".getBytes(), "password".getBytes(), new Text("kind"), new Text("service")); - TokenProto tokenProto = PBHelperClient.convert(token); + TokenProto tokenProto = PBHelper.convert(token); Token<BlockTokenIdentifier> token2 = PBHelper.convert(tokenProto); compare(token, token2); } @@ -592,16 +592,16 @@ public class TestPBHelper { @Test public void testChecksumTypeProto() { assertEquals(DataChecksum.Type.NULL, - PBHelperClient.convert(HdfsProtos.ChecksumTypeProto.CHECKSUM_NULL)); + PBHelper.convert(HdfsProtos.ChecksumTypeProto.CHECKSUM_NULL)); assertEquals(DataChecksum.Type.CRC32, - PBHelperClient.convert(HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32)); + PBHelper.convert(HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32)); assertEquals(DataChecksum.Type.CRC32C, - PBHelperClient.convert(HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32C)); - assertEquals(PBHelperClient.convert(DataChecksum.Type.NULL), + PBHelper.convert(HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32C)); + assertEquals(PBHelper.convert(DataChecksum.Type.NULL), HdfsProtos.ChecksumTypeProto.CHECKSUM_NULL); - assertEquals(PBHelperClient.convert(DataChecksum.Type.CRC32), + assertEquals(PBHelper.convert(DataChecksum.Type.CRC32), HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32); - assertEquals(PBHelperClient.convert(DataChecksum.Type.CRC32C), + assertEquals(PBHelper.convert(DataChecksum.Type.CRC32C), HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32C); }
