PARQUET-1142: Add alternatives to Hadoop classes in the API

This updates the read and write paths to avoid using Hadoop classes where possible.
* Adds a generic compression interface, `CompressionCodecFactory`
* Adds `OutputFile` and `PositionOutputStream`
* Adds classes to help implementations wrap input and output streams: `DelegatingSeekableInputStream` and `DelegatingPositionOutputStream`
* Adds `ParquetReadOptions` to avoid passing options with `Configuration`
* Updates the read and write APIs to use new abstractions instead of Hadoop

Author: Ryan Blue <b...@apache.org>

Closes #429 from rdblue/PARQUET-1142-add-hadoop-alternatives and squashes the following commits:

21500337b [Ryan Blue] PARQUET-1142: Fix NPE when not filtering with new read API.
35eddd735 [Ryan Blue] PARQUET-1142: Fix problems from Gabor's review.
da391b0d4 [Ryan Blue] PARQUET-1142: Fix binary incompatibilities.
2e3d693ab [Ryan Blue] PARQUET-1142: Update the read and write paths to use new files and streams.
8d57e089f [Ryan Blue] PARQUET-1142: Add OutputFile and PositionOutputStream.
42908a95e [Ryan Blue] PARQUET-1142: Extract non-Hadoop API from CodecFactory.
Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/8bfd9b4d Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/8bfd9b4d Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/8bfd9b4d Branch: refs/heads/master Commit: 8bfd9b4d8f4fb0a2b522c9328f67eb642066306b Parents: 81f4801 Author: Ryan Blue <b...@apache.org> Authored: Wed Dec 13 11:27:54 2017 -0800 Committer: Ryan Blue <b...@apache.org> Committed: Wed Dec 13 11:27:54 2017 -0800 ---------------------------------------------------------------------- parquet-common/pom.xml | 6 + .../org/apache/parquet/bytes/BytesInput.java | 486 +++++++++++ .../bytes/CapacityByteArrayOutputStream.java | 337 ++++++++ .../bytes/ConcatenatingByteArrayCollector.java | 63 ++ .../bytes/LittleEndianDataInputStream.java | 424 +++++++++ .../bytes/LittleEndianDataOutputStream.java | 220 +++++ .../compression/CompressionCodecFactory.java | 47 + .../CompressionCodecNotSupportedException.java | 38 + .../hadoop/metadata/CompressionCodecName.java | 98 +++ .../io/DelegatingPositionOutputStream.java | 63 ++ .../io/DelegatingSeekableInputStream.java | 171 ++++ .../java/org/apache/parquet/io/InputFile.java | 9 +- .../java/org/apache/parquet/io/OutputFile.java | 34 + .../apache/parquet/io/PositionOutputStream.java | 39 + .../org/apache/parquet/io/MockInputStream.java | 56 ++ .../io/TestDelegatingSeekableInputStream.java | 861 +++++++++++++++++++ .../org/apache/parquet/bytes/BytesInput.java | 486 ----------- .../bytes/CapacityByteArrayOutputStream.java | 337 -------- .../bytes/ConcatenatingByteArrayCollector.java | 63 -- .../bytes/LittleEndianDataInputStream.java | 424 --------- .../bytes/LittleEndianDataOutputStream.java | 220 ----- .../org/apache/parquet/HadoopReadOptions.java | 98 +++ .../org/apache/parquet/ParquetReadOptions.java | 232 +++++ .../parquet/filter2/compat/RowGroupFilter.java | 4 + .../converter/ParquetMetadataConverter.java | 22 +- 
.../org/apache/parquet/hadoop/CodecFactory.java | 26 +- .../hadoop/ColumnChunkPageReadStore.java | 6 +- .../parquet/hadoop/DirectCodecFactory.java | 12 +- .../hadoop/InternalParquetRecordReader.java | 34 +- .../parquet/hadoop/ParquetFileReader.java | 254 +++--- .../parquet/hadoop/ParquetFileWriter.java | 147 ++-- .../parquet/hadoop/ParquetInputFormat.java | 3 - .../parquet/hadoop/ParquetOutputFormat.java | 5 +- .../apache/parquet/hadoop/ParquetReader.java | 174 +++- .../parquet/hadoop/ParquetRecordReader.java | 26 +- .../apache/parquet/hadoop/ParquetWriter.java | 50 +- .../hadoop/UnmaterializableRecordCounter.java | 15 + .../CompressionCodecNotSupportedException.java | 36 - .../hadoop/metadata/CompressionCodecName.java | 98 --- .../hadoop/util/H1SeekableInputStream.java | 101 +-- .../hadoop/util/H2SeekableInputStream.java | 20 +- .../parquet/hadoop/util/HadoopCodecs.java | 39 + .../parquet/hadoop/util/HadoopOutputFile.java | 100 +++ .../hadoop/util/HadoopPositionOutputStream.java | 66 ++ .../parquet/hadoop/util/HadoopStreams.java | 15 + .../TestInputOutputFormatWithPadding.java | 6 +- .../parquet/hadoop/TestParquetFileWriter.java | 1 + .../hadoop/util/MockHadoopInputStream.java | 87 ++ .../parquet/hadoop/util/MockInputStream.java | 87 -- .../hadoop/util/TestHadoop1ByteBufferReads.java | 761 ---------------- .../hadoop/util/TestHadoop2ByteBufferReads.java | 30 +- .../parquet/tools/command/MergeCommand.java | 3 +- pom.xml | 9 +- 53 files changed, 4158 insertions(+), 2891 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-common/pom.xml ---------------------------------------------------------------------- diff --git a/parquet-common/pom.xml b/parquet-common/pom.xml index b0357ba..7ae6068 100644 --- a/parquet-common/pom.xml +++ b/parquet-common/pom.xml @@ -37,6 +37,12 @@ <dependencies> <dependency> + <groupId>org.apache.parquet</groupId> + 
<artifactId>parquet-format</artifactId> + <version>${parquet.format.version}</version> + </dependency> + + <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>${slf4j.version}</version> http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java ---------------------------------------------------------------------- diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java new file mode 100644 index 0000000..6e593c2 --- /dev/null +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java @@ -0,0 +1,486 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.bytes; + +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.List; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * A source of bytes capable of writing itself to an output. + * A BytesInput should be consumed right away. + * It is not a container. + * For example if it is referring to a stream, + * subsequent BytesInput reads from the stream will be incorrect + * if the previous has not been consumed. + * + * @author Julien Le Dem + * + */ +abstract public class BytesInput { + private static final Logger LOG = LoggerFactory.getLogger(BytesInput.class); + private static final boolean DEBUG = false;//Log.DEBUG; + private static final EmptyBytesInput EMPTY_BYTES_INPUT = new EmptyBytesInput(); + + /** + * logically concatenate the provided inputs + * @param inputs the inputs to concatenate + * @return a concatenated input + */ + public static BytesInput concat(BytesInput... 
inputs) { + return new SequenceBytesIn(Arrays.asList(inputs)); + } + + /** + * logically concatenate the provided inputs + * @param inputs the inputs to concatenate + * @return a concatenated input + */ + public static BytesInput concat(List<BytesInput> inputs) { + return new SequenceBytesIn(inputs); + } + + /** + * @param in + * @param bytes number of bytes to read + * @return a BytesInput that will read that number of bytes from the stream + */ + public static BytesInput from(InputStream in, int bytes) { + return new StreamBytesInput(in, bytes); + } + + /** + * @param buffer + * @param length number of bytes to read + * @return a BytesInput that will read the given bytes from the ByteBuffer + */ + public static BytesInput from(ByteBuffer buffer, int offset, int length) { + return new ByteBufferBytesInput(buffer, offset, length); + } + + /** + * + * @param in + * @return a Bytes input that will write the given bytes + */ + public static BytesInput from(byte[] in) { + LOG.debug("BytesInput from array of {} bytes", in.length); + return new ByteArrayBytesInput(in, 0 , in.length); + } + + public static BytesInput from(byte[] in, int offset, int length) { + LOG.debug("BytesInput from array of {} bytes", length); + return new ByteArrayBytesInput(in, offset, length); + } + + /** + * @param intValue the int to write + * @return a BytesInput that will write 4 bytes in little endian + */ + public static BytesInput fromInt(int intValue) { + return new IntBytesInput(intValue); + } + + /** + * @param intValue the int to write + * @return a BytesInput that will write var int + */ + public static BytesInput fromUnsignedVarInt(int intValue) { + return new UnsignedVarIntBytesInput(intValue); + } + + /** + * + * @param intValue the int to write + */ + public static BytesInput fromZigZagVarInt(int intValue) { + int zigZag = (intValue << 1) ^ (intValue >> 31); + return new UnsignedVarIntBytesInput(zigZag); + } + + /** + * @param longValue the long to write + * @return a BytesInput 
that will write var long + */ + public static BytesInput fromUnsignedVarLong(long longValue) { + return new UnsignedVarLongBytesInput(longValue); + } + + /** + * + * @param longValue the long to write + */ + public static BytesInput fromZigZagVarLong(long longValue) { + long zigZag = (longValue << 1) ^ (longValue >> 63); + return new UnsignedVarLongBytesInput(zigZag); + } + + /** + * @param arrayOut + * @return a BytesInput that will write the content of the buffer + */ + public static BytesInput from(CapacityByteArrayOutputStream arrayOut) { + return new CapacityBAOSBytesInput(arrayOut); + } + + /** + * @param baos - stream to wrap into a BytesInput + * @return a BytesInput that will write the content of the buffer + */ + public static BytesInput from(ByteArrayOutputStream baos) { + return new BAOSBytesInput(baos); + } + + /** + * @return an empty bytes input + */ + public static BytesInput empty() { + return EMPTY_BYTES_INPUT; + } + + /** + * copies the input into a new byte array + * @param bytesInput + * @return + * @throws IOException + */ + public static BytesInput copy(BytesInput bytesInput) throws IOException { + return from(bytesInput.toByteArray()); + } + + /** + * writes the bytes into a stream + * @param out + * @throws IOException + */ + abstract public void writeAllTo(OutputStream out) throws IOException; + + /** + * + * @return a new byte array materializing the contents of this input + * @throws IOException + */ + public byte[] toByteArray() throws IOException { + BAOS baos = new BAOS((int)size()); + this.writeAllTo(baos); + LOG.debug("converted {} to byteArray of {} bytes", size() , baos.size()); + return baos.getBuf(); + } + + /** + * + * @return a new ByteBuffer materializing the contents of this input + * @throws IOException + */ + public ByteBuffer toByteBuffer() throws IOException { + return ByteBuffer.wrap(toByteArray()); + } + + /** + * + * @return a new InputStream materializing the contents of this input + * @throws IOException + */ + 
public InputStream toInputStream() throws IOException { + return new ByteBufferInputStream(toByteBuffer()); + } + + /** + * + * @return the size in bytes that would be written + */ + abstract public long size(); + + private static final class BAOS extends ByteArrayOutputStream { + private BAOS(int size) { + super(size); + } + + public byte[] getBuf() { + return this.buf; + } + } + + private static class StreamBytesInput extends BytesInput { + private static final Logger LOG = LoggerFactory.getLogger(BytesInput.StreamBytesInput.class); + private final InputStream in; + private final int byteCount; + + private StreamBytesInput(InputStream in, int byteCount) { + super(); + this.in = in; + this.byteCount = byteCount; + } + + @Override + public void writeAllTo(OutputStream out) throws IOException { + LOG.debug("write All {} bytes", byteCount); + // TODO: more efficient + out.write(this.toByteArray()); + } + + public byte[] toByteArray() throws IOException { + LOG.debug("read all {} bytes", byteCount); + byte[] buf = new byte[byteCount]; + new DataInputStream(in).readFully(buf); + return buf; + } + + @Override + public long size() { + return byteCount; + } + + } + + private static class SequenceBytesIn extends BytesInput { + private static final Logger LOG = LoggerFactory.getLogger(BytesInput.SequenceBytesIn.class); + + private final List<BytesInput> inputs; + private final long size; + + private SequenceBytesIn(List<BytesInput> inputs) { + this.inputs = inputs; + long total = 0; + for (BytesInput input : inputs) { + total += input.size(); + } + this.size = total; + } + + @SuppressWarnings("unused") + @Override + public void writeAllTo(OutputStream out) throws IOException { + for (BytesInput input : inputs) { + + LOG.debug("write {} bytes to out", input.size()); + if (input instanceof SequenceBytesIn) LOG.debug("{"); + input.writeAllTo(out); + if (input instanceof SequenceBytesIn) LOG.debug("}"); + } + } + + @Override + public long size() { + return size; + } + + } + + 
private static class IntBytesInput extends BytesInput { + + private final int intValue; + + public IntBytesInput(int intValue) { + this.intValue = intValue; + } + + @Override + public void writeAllTo(OutputStream out) throws IOException { + BytesUtils.writeIntLittleEndian(out, intValue); + } + + public ByteBuffer toByteBuffer() throws IOException { + return ByteBuffer.allocate(4).putInt(0, intValue); + } + + @Override + public long size() { + return 4; + } + + } + + private static class UnsignedVarIntBytesInput extends BytesInput { + + private final int intValue; + + public UnsignedVarIntBytesInput(int intValue) { + this.intValue = intValue; + } + + @Override + public void writeAllTo(OutputStream out) throws IOException { + BytesUtils.writeUnsignedVarInt(intValue, out); + } + + public ByteBuffer toByteBuffer() throws IOException { + ByteBuffer ret = ByteBuffer.allocate((int) size()); + BytesUtils.writeUnsignedVarInt(intValue, ret); + return ret; + } + + @Override + public long size() { + int s = (38 - Integer.numberOfLeadingZeros(intValue)) / 7; + return s == 0 ? 1 : s; + } + } + + private static class UnsignedVarLongBytesInput extends BytesInput { + + private final long longValue; + + public UnsignedVarLongBytesInput(long longValue) { + this.longValue = longValue; + } + + @Override + public void writeAllTo(OutputStream out) throws IOException { + BytesUtils.writeUnsignedVarLong(longValue, out); + } + + @Override + public long size() { + int s = (70 - Long.numberOfLeadingZeros(longValue)) / 7; + return s == 0 ? 
1 : s; + } + } + + private static class EmptyBytesInput extends BytesInput { + + @Override + public void writeAllTo(OutputStream out) throws IOException { + } + + @Override + public long size() { + return 0; + } + + public ByteBuffer toByteBuffer() throws IOException { + return ByteBuffer.allocate(0); + } + + } + + private static class CapacityBAOSBytesInput extends BytesInput { + + private final CapacityByteArrayOutputStream arrayOut; + + private CapacityBAOSBytesInput(CapacityByteArrayOutputStream arrayOut) { + this.arrayOut = arrayOut; + } + + @Override + public void writeAllTo(OutputStream out) throws IOException { + arrayOut.writeTo(out); + } + + @Override + public long size() { + return arrayOut.size(); + } + + } + + private static class BAOSBytesInput extends BytesInput { + + private final ByteArrayOutputStream arrayOut; + + private BAOSBytesInput(ByteArrayOutputStream arrayOut) { + this.arrayOut = arrayOut; + } + + @Override + public void writeAllTo(OutputStream out) throws IOException { + arrayOut.writeTo(out); + } + + @Override + public long size() { + return arrayOut.size(); + } + + } + + private static class ByteArrayBytesInput extends BytesInput { + + private final byte[] in; + private final int offset; + private final int length; + + private ByteArrayBytesInput(byte[] in, int offset, int length) { + this.in = in; + this.offset = offset; + this.length = length; + } + + @Override + public void writeAllTo(OutputStream out) throws IOException { + out.write(in, offset, length); + } + + public ByteBuffer toByteBuffer() throws IOException { + return ByteBuffer.wrap(in, offset, length); + } + + @Override + public long size() { + return length; + } + + } + + private static class ByteBufferBytesInput extends BytesInput { + + private final ByteBuffer byteBuf; + private final int length; + private final int offset; + + private ByteBufferBytesInput(ByteBuffer byteBuf, int offset, int length) { + this.byteBuf = byteBuf; + this.offset = offset; + this.length = 
length; + } + + @Override + public void writeAllTo(OutputStream out) throws IOException { + final WritableByteChannel outputChannel = Channels.newChannel(out); + byteBuf.position(offset); + ByteBuffer tempBuf = byteBuf.slice(); + tempBuf.limit(length); + outputChannel.write(tempBuf); + } + + @Override + public ByteBuffer toByteBuffer() throws IOException { + byteBuf.position(offset); + ByteBuffer buf = byteBuf.slice(); + buf.limit(length); + return buf; + } + + @Override + public long size() { + return length; + } + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java ---------------------------------------------------------------------- diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java new file mode 100644 index 0000000..92674d4 --- /dev/null +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.bytes; + +import static java.lang.Math.max; +import static java.lang.Math.pow; +import static java.lang.String.format; +import static org.apache.parquet.Preconditions.checkArgument; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import org.apache.parquet.OutputStreamCloseException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Similar to a {@link ByteArrayOutputStream}, but uses a different strategy for growing that does not involve copying. + * Where ByteArrayOutputStream is backed by a single array that "grows" by copying into a new larger array, this output + * stream grows by allocating a new array (slab) and adding it to a list of previous arrays. + * + * Each new slab is allocated to be the same size as all the previous slabs combined, so these allocations become + * exponentially less frequent, just like ByteArrayOutputStream, with one difference. This output stream accepts a + * max capacity hint, which is a hint describing the max amount of data that will be written to this stream. As the + * total size of this stream nears this max, this stream starts to grow linearly instead of exponentially. + * So new slabs are allocated to be 1/5th of the max capacity hint, + * instead of equal to the total previous size of all slabs. This is useful because it prevents allocating roughly + * twice the needed space when a new slab is added just before the stream is done being used. + * + * When reusing this stream it will adjust the initial slab size based on the previous data size, aiming for fewer + * allocations, with the assumption that a similar amount of data will be written to this stream on re-use. + * See ({@link CapacityByteArrayOutputStream#reset()}). 
+ * + * @author Julien Le Dem + * + */ +public class CapacityByteArrayOutputStream extends OutputStream { + private static final Logger LOG = LoggerFactory.getLogger(CapacityByteArrayOutputStream.class); + private static final ByteBuffer EMPTY_SLAB = ByteBuffer.wrap(new byte[0]); + + private int initialSlabSize; + private final int maxCapacityHint; + private final List<ByteBuffer> slabs = new ArrayList<ByteBuffer>(); + + private ByteBuffer currentSlab; + private int currentSlabIndex; + private int bytesAllocated = 0; + private int bytesUsed = 0; + private ByteBufferAllocator allocator; + + /** + * Return an initial slab size such that a CapacityByteArrayOutputStream constructed with it + * will end up allocating targetNumSlabs in order to reach targetCapacity. This aims to be + * a balance between the overhead of creating new slabs and wasting memory by eagerly making + * initial slabs too big. + * + * Note that targetCapacity here need not match maxCapacityHint in the constructor of + * CapacityByteArrayOutputStream, though often that would make sense. + * + * @param minSlabSize no matter what we shouldn't make slabs any smaller than this + * @param targetCapacity after we've allocated targetNumSlabs how much capacity should we have? + * @param targetNumSlabs how many slabs should it take to reach targetCapacity? + */ + public static int initialSlabSizeHeuristic(int minSlabSize, int targetCapacity, int targetNumSlabs) { + // initialSlabSize = (targetCapacity / (2^targetNumSlabs)) means we double targetNumSlabs times + // before reaching the targetCapacity + // eg for page size of 1MB we start at 1024 bytes. + // we also don't want to start too small, so we also apply a minimum. 
+ return max(minSlabSize, ((int) (targetCapacity / pow(2, targetNumSlabs)))); + } + + public static CapacityByteArrayOutputStream withTargetNumSlabs( + int minSlabSize, int maxCapacityHint, int targetNumSlabs) { + return withTargetNumSlabs(minSlabSize, maxCapacityHint, targetNumSlabs, new HeapByteBufferAllocator()); + } + + /** + * Construct a CapacityByteArrayOutputStream configured such that its initial slab size is + * determined by {@link #initialSlabSizeHeuristic}, with targetCapacity == maxCapacityHint + */ + public static CapacityByteArrayOutputStream withTargetNumSlabs( + int minSlabSize, int maxCapacityHint, int targetNumSlabs, ByteBufferAllocator allocator) { + + return new CapacityByteArrayOutputStream( + initialSlabSizeHeuristic(minSlabSize, maxCapacityHint, targetNumSlabs), + maxCapacityHint, allocator); + } + + /** + * Defaults maxCapacityHint to 1MB + * @param initialSlabSize + * @deprecated use {@link CapacityByteArrayOutputStream#CapacityByteArrayOutputStream(int, int, ByteBufferAllocator)} + */ + @Deprecated + public CapacityByteArrayOutputStream(int initialSlabSize) { + this(initialSlabSize, 1024 * 1024, new HeapByteBufferAllocator()); + } + + /** + * Defaults maxCapacityHint to 1MB + * @param initialSlabSize + * @deprecated use {@link CapacityByteArrayOutputStream#CapacityByteArrayOutputStream(int, int, ByteBufferAllocator)} + */ + @Deprecated + public CapacityByteArrayOutputStream(int initialSlabSize, ByteBufferAllocator allocator) { + this(initialSlabSize, 1024 * 1024, allocator); + } + + /** + * @param initialSlabSize the size to make the first slab + * @param maxCapacityHint a hint (not guarantee) of the max amount of data written to this stream + * @deprecated use {@link CapacityByteArrayOutputStream#CapacityByteArrayOutputStream(int, int, ByteBufferAllocator)} + */ + @Deprecated + public CapacityByteArrayOutputStream(int initialSlabSize, int maxCapacityHint) { + this(initialSlabSize, maxCapacityHint, new HeapByteBufferAllocator()); + } + + 
/** + * @param initialSlabSize the size to make the first slab + * @param maxCapacityHint a hint (not guarantee) of the max amount of data written to this stream + */ + public CapacityByteArrayOutputStream(int initialSlabSize, int maxCapacityHint, ByteBufferAllocator allocator) { + checkArgument(initialSlabSize > 0, "initialSlabSize must be > 0"); + checkArgument(maxCapacityHint > 0, "maxCapacityHint must be > 0"); + checkArgument(maxCapacityHint >= initialSlabSize, String.format("maxCapacityHint can't be less than initialSlabSize %d %d", initialSlabSize, maxCapacityHint)); + this.initialSlabSize = initialSlabSize; + this.maxCapacityHint = maxCapacityHint; + this.allocator = allocator; + reset(); + } + + /** + * the new slab is guaranteed to be at least minimumSize + * @param minimumSize the size of the data we want to copy in the new slab + */ + private void addSlab(int minimumSize) { + int nextSlabSize; + + if (bytesUsed == 0) { + nextSlabSize = initialSlabSize; + } else if (bytesUsed > maxCapacityHint / 5) { + // to avoid an overhead of up to twice the needed size, we get linear when approaching target page size + nextSlabSize = maxCapacityHint / 5; + } else { + // double the size every time + nextSlabSize = bytesUsed; + } + + if (nextSlabSize < minimumSize) { + LOG.debug("slab size {} too small for value of size {}. 
Bumping up slab size", nextSlabSize, minimumSize); + nextSlabSize = minimumSize; + } + + LOG.debug("used {} slabs, adding new slab of size {}", slabs.size(), nextSlabSize); + + this.currentSlab = allocator.allocate(nextSlabSize); + this.slabs.add(currentSlab); + this.bytesAllocated += nextSlabSize; + this.currentSlabIndex = 0; + } + + @Override + public void write(int b) { + if (!currentSlab.hasRemaining()) { + addSlab(1); + } + currentSlab.put(currentSlabIndex, (byte) b); + currentSlabIndex += 1; + currentSlab.position(currentSlabIndex); + bytesUsed += 1; + } + + @Override + public void write(byte b[], int off, int len) { + if ((off < 0) || (off > b.length) || (len < 0) || + ((off + len) - b.length > 0)) { + throw new IndexOutOfBoundsException( + String.format("Given byte array of size %d, with requested length(%d) and offset(%d)", b.length, len, off)); + } + if (len >= currentSlab.remaining()) { + final int length1 = currentSlab.remaining(); + currentSlab.put(b, off, length1); + bytesUsed += length1; + currentSlabIndex += length1; + final int length2 = len - length1; + addSlab(length2); + currentSlab.put(b, off + length1, length2); + currentSlabIndex = length2; + bytesUsed += length2; + } else { + currentSlab.put(b, off, len); + currentSlabIndex += len; + bytesUsed += len; + } + } + + private void writeToOutput(OutputStream out, ByteBuffer buf, int len) throws IOException { + if (buf.hasArray()) { + out.write(buf.array(), buf.arrayOffset(), len); + } else { + // The OutputStream interface only takes a byte[], unfortunately this means that a ByteBuffer + // not backed by a byte array must be copied to fulfil this interface + byte[] copy = new byte[len]; + buf.flip(); + buf.get(copy); + out.write(copy); + } + } + + /** + * Writes the complete contents of this buffer to the specified output stream argument. the output + * stream's write method <code>out.write(slab, 0, slab.length)</code>) will be called once per slab. 
+ * + * @param out the output stream to which to write the data. + * @exception IOException if an I/O error occurs. + */ + public void writeTo(OutputStream out) throws IOException { + for (int i = 0; i < slabs.size() - 1; i++) { + writeToOutput(out, slabs.get(i), slabs.get(i).position()); + } + writeToOutput(out, currentSlab, currentSlabIndex); + } + + /** + * @return The total size in bytes of data written to this stream. + */ + public long size() { + return bytesUsed; + } + + /** + * + * @return The total size in bytes currently allocated for this stream. + */ + public int getCapacity() { + return bytesAllocated; + } + + /** + * When re-using an instance with reset, it will adjust slab size based on previous data size. + * The intent is to reuse the same instance for the same type of data (for example, the same column). + * The assumption is that the size in the buffer will be consistent. + */ + public void reset() { + // readjust slab size. + // 7 = 2^3 - 1 so that doubling the initial size 3 times will get to the same size + this.initialSlabSize = max(bytesUsed / 7, initialSlabSize); + LOG.debug("initial slab of size {}", initialSlabSize); + for (ByteBuffer slab : slabs) { + allocator.release(slab); + } + this.slabs.clear(); + this.bytesAllocated = 0; + this.bytesUsed = 0; + this.currentSlab = EMPTY_SLAB; + this.currentSlabIndex = 0; + } + + /** + * @return the index of the last value written to this stream, which + * can be passed to {@link #setByte(long, byte)} in order to change it + */ + public long getCurrentIndex() { + checkArgument(bytesUsed > 0, "This is an empty stream"); + return bytesUsed - 1; + } + + /** + * Replace the byte stored at position index in this stream with value + * + * @param index which byte to replace + * @param value the value to replace it with + */ + public void setByte(long index, byte value) { + checkArgument(index < bytesUsed, "Index: " + index + " is >= the current size of: " + bytesUsed); + + long seen = 0; + for (int i = 0; 
i < slabs.size(); i++) { + ByteBuffer slab = slabs.get(i); + if (index < seen + slab.limit()) { + // ok found index + slab.put((int)(index-seen), value); + break; + } + seen += slab.limit(); + } + } + + /** + * @param prefix a prefix to be used for every new line in the string + * @return a text representation of the memory usage of this structure + */ + public String memUsageString(String prefix) { + return format("%s %s %d slabs, %,d bytes", prefix, getClass().getSimpleName(), slabs.size(), getCapacity()); + } + + /** + * @return the total number of allocated slabs + */ + int getSlabCount() { + return slabs.size(); + } + + @Override + public void close() { + for (ByteBuffer slab : slabs) { + allocator.release(slab); + } + try { + super.close(); + }catch(IOException e){ + throw new OutputStreamCloseException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-common/src/main/java/org/apache/parquet/bytes/ConcatenatingByteArrayCollector.java ---------------------------------------------------------------------- diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/ConcatenatingByteArrayCollector.java b/parquet-common/src/main/java/org/apache/parquet/bytes/ConcatenatingByteArrayCollector.java new file mode 100644 index 0000000..d333168 --- /dev/null +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/ConcatenatingByteArrayCollector.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.bytes; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; + +import static java.lang.String.format; + +public class ConcatenatingByteArrayCollector extends BytesInput { + private final List<byte[]> slabs = new ArrayList<byte[]>(); + private long size = 0; + + public void collect(BytesInput bytesInput) throws IOException { + byte[] bytes = bytesInput.toByteArray(); + slabs.add(bytes); + size += bytes.length; + } + + public void reset() { + size = 0; + slabs.clear(); + } + + @Override + public void writeAllTo(OutputStream out) throws IOException { + for (byte[] slab : slabs) { + out.write(slab); + } + } + + @Override + public long size() { + return size; + } + + /** + * @param prefix a prefix to be used for every new line in the string + * @return a text representation of the memory usage of this structure + */ + public String memUsageString(String prefix) { + return format("%s %s %d slabs, %,d bytes", prefix, getClass().getSimpleName(), slabs.size(), size); + } + +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataInputStream.java ---------------------------------------------------------------------- diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataInputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataInputStream.java new file mode 100644 index 0000000..a092753 --- /dev/null +++ 
/**
 * Based on DataInputStream but little endian and without the String/char methods.
 * <p>
 * All plain {@link InputStream} operations are forwarded unchanged to the wrapped stream;
 * the {@code readXxx} methods decode multi-byte values with the least significant byte first.
 * {@link #readLong()} uses a scratch buffer owned by the instance, so concurrent calls on
 * one instance would interfere with each other.
 *
 * @author Julien Le Dem
 */
public final class LittleEndianDataInputStream extends InputStream {

  private final InputStream in;

  /** Scratch space for {@link #readLong()}; reused across calls to avoid allocation. */
  private final byte[] readBuffer = new byte[8];

  /**
   * Creates a LittleEndianDataInputStream that uses the specified
   * underlying InputStream.
   *
   * @param in the specified input stream
   */
  public LittleEndianDataInputStream(InputStream in) {
    this.in = in;
  }

  /**
   * Reads exactly {@code b.length} bytes from the contained input stream into {@code b}.
   *
   * @param b the buffer into which the data is read.
   * @throws EOFException if this input stream reaches the end before reading all the bytes.
   * @throws IOException if the contained input stream fails
   */
  public final void readFully(byte b[]) throws IOException {
    readFully(b, 0, b.length);
  }

  /**
   * Reads exactly {@code len} bytes from the contained input stream into {@code b},
   * starting at offset {@code off}, looping until all of them have arrived.
   *
   * @param b the buffer into which the data is read.
   * @param off the start offset of the data.
   * @param len the number of bytes to read.
   * @throws IndexOutOfBoundsException if {@code len} is negative
   * @throws EOFException if this input stream reaches the end before reading all the bytes.
   * @throws IOException if the contained input stream fails
   */
  public final void readFully(byte b[], int off, int len) throws IOException {
    if (len < 0) {
      throw new IndexOutOfBoundsException();
    }
    int n = 0;
    while (n < len) {
      int count = in.read(b, off + n, len - n);
      if (count < 0) {
        throw new EOFException();
      }
      n += count;
    }
  }

  /**
   * Skips up to {@code n} bytes of the contained input stream, retrying until either
   * {@code n} bytes were skipped or the stream reports that it skipped nothing.
   *
   * @param n the number of bytes to be skipped.
   * @return the actual number of bytes skipped.
   * @throws IOException if the contained input stream fails
   */
  public final int skipBytes(int n) throws IOException {
    int total = 0;
    int cur = 0;

    while ((total < n) && ((cur = (int) in.skip(n - total)) > 0)) {
      total += cur;
    }

    return total;
  }

  /**
   * @return the next byte as a value in 0..255, or -1 at end of stream
   * @throws IOException if the contained input stream fails
   * @see java.io.InputStream#read()
   */
  @Override
  public int read() throws IOException {
    return in.read();
  }

  /**
   * @return the hash code of the contained input stream
   * @see java.lang.Object#hashCode()
   */
  @Override
  public int hashCode() {
    return in.hashCode();
  }

  /**
   * @param b the buffer into which the data is read
   * @return the number of bytes read, as reported by the contained input stream
   * @throws IOException if the contained input stream fails
   * @see java.io.InputStream#read(byte[])
   */
  @Override
  public int read(byte[] b) throws IOException {
    return in.read(b);
  }

  /**
   * A stream is equal to itself and to whatever the contained input stream
   * considers itself equal to.
   * <p>
   * The identity check keeps the comparison reflexive: purely delegating to the
   * contained stream would make {@code x.equals(x)} false whenever that stream
   * uses identity equality.
   *
   * @param obj the object to compare with
   * @return true if {@code obj} is this stream or equals the contained input stream
   * @see java.lang.Object#equals(java.lang.Object)
   */
  @Override
  public boolean equals(Object obj) {
    return this == obj || in.equals(obj);
  }

  /**
   * @param b the buffer into which the data is read
   * @param off the start offset in {@code b}
   * @param len the maximum number of bytes to read
   * @return the number of bytes read, as reported by the contained input stream
   * @throws IOException if the contained input stream fails
   * @see java.io.InputStream#read(byte[], int, int)
   */
  @Override
  public int read(byte[] b, int off, int len) throws IOException {
    return in.read(b, off, len);
  }

  /**
   * @param n the number of bytes to skip
   * @return the number of bytes skipped, as reported by the contained input stream
   * @throws IOException if the contained input stream fails
   * @see java.io.InputStream#skip(long)
   */
  @Override
  public long skip(long n) throws IOException {
    return in.skip(n);
  }

  /**
   * @return the value reported by the contained input stream
   * @throws IOException if the contained input stream fails
   * @see java.io.InputStream#available()
   */
  @Override
  public int available() throws IOException {
    return in.available();
  }

  /**
   * Closes the contained input stream.
   *
   * @throws IOException if the contained input stream fails to close
   * @see java.io.InputStream#close()
   */
  @Override
  public void close() throws IOException {
    in.close();
  }

  /**
   * @param readlimit forwarded to the contained input stream
   * @see java.io.InputStream#mark(int)
   */
  @Override
  public void mark(int readlimit) {
    in.mark(readlimit);
  }

  /**
   * @throws IOException if the contained input stream fails to reset
   * @see java.io.InputStream#reset()
   */
  @Override
  public void reset() throws IOException {
    in.reset();
  }

  /**
   * @return whether the contained input stream supports mark/reset
   * @see java.io.InputStream#markSupported()
   */
  @Override
  public boolean markSupported() {
    return in.markSupported();
  }

  /**
   * Reads one byte and interprets any non-zero value as {@code true}.
   *
   * @return the <code>boolean</code> value read.
   * @throws EOFException if this input stream has reached the end.
   * @throws IOException if the contained input stream fails
   */
  public final boolean readBoolean() throws IOException {
    int ch = in.read();
    if (ch < 0) {
      throw new EOFException();
    }
    return (ch != 0);
  }

  /**
   * @return the next byte of this input stream as a signed 8-bit <code>byte</code>.
   * @throws EOFException if this input stream has reached the end.
   * @throws IOException if the contained input stream fails
   */
  public final byte readByte() throws IOException {
    int ch = in.read();
    if (ch < 0) {
      throw new EOFException();
    }
    return (byte) (ch);
  }

  /**
   * @return the next byte of this input stream, interpreted as an unsigned 8-bit number.
   * @throws EOFException if this input stream has reached the end.
   * @throws IOException if the contained input stream fails
   */
  public final int readUnsignedByte() throws IOException {
    int ch = in.read();
    if (ch < 0) {
      throw new EOFException();
    }
    return ch;
  }

  /**
   * @return the next two bytes of this input stream, low byte first, interpreted as a
   *         signed 16-bit number.
   * @throws EOFException if this input stream reaches the end before reading two bytes.
   * @throws IOException if the contained input stream fails
   */
  public final short readShort() throws IOException {
    // The first byte read is the least significant one.
    int ch2 = in.read();
    int ch1 = in.read();
    // read() returns -1 at end of stream, so the OR is negative if either read hit EOF.
    if ((ch1 | ch2) < 0) {
      throw new EOFException();
    }
    return (short) ((ch1 << 8) + (ch2 << 0));
  }

  /**
   * @return the next two bytes of this input stream, low byte first, interpreted as an
   *         unsigned 16-bit integer.
   * @throws EOFException if this input stream reaches the end before reading two bytes.
   * @throws IOException if the contained input stream fails
   */
  public final int readUnsignedShort() throws IOException {
    int ch2 = in.read();
    int ch1 = in.read();
    if ((ch1 | ch2) < 0) {
      throw new EOFException();
    }
    return (ch1 << 8) + (ch2 << 0);
  }

  /**
   * @return the next four bytes of this input stream, low byte first, interpreted as an
   *         <code>int</code>.
   * @throws EOFException if this input stream reaches the end before reading four bytes.
   * @throws IOException if the contained input stream fails
   */
  public final int readInt() throws IOException {
    // TODO: has this been benchmarked against two alternate implementations?
    // 1) reading the value big-endian and applying Integer.reverseBytes
    // 2) keep a member byte[4], wrapped by an IntBuffer with appropriate endianness set,
    //    and call IntBuffer.get()
    // Both seem like they might be faster.
    int ch4 = in.read();
    int ch3 = in.read();
    int ch2 = in.read();
    int ch1 = in.read();
    if ((ch1 | ch2 | ch3 | ch4) < 0) {
      throw new EOFException();
    }
    return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
  }

  /**
   * @return the next eight bytes of this input stream, low byte first, interpreted as a
   *         <code>long</code>.
   * @throws EOFException if this input stream reaches the end before reading eight bytes.
   * @throws IOException if the contained input stream fails
   */
  public final long readLong() throws IOException {
    // TODO: see perf question above in readInt
    readFully(readBuffer, 0, 8);
    // Fold from the most significant byte (index 7) down to the least significant (index 0).
    long value = 0;
    for (int i = 7; i >= 0; i--) {
      value = (value << 8) | (readBuffer[i] & 0xFFL);
    }
    return value;
  }

  /**
   * @return the next four bytes of this input stream, interpreted as a <code>float</code>.
   * @throws EOFException if this input stream reaches the end before reading four bytes.
   * @throws IOException if the contained input stream fails
   * @see java.lang.Float#intBitsToFloat(int)
   */
  public final float readFloat() throws IOException {
    return Float.intBitsToFloat(readInt());
  }

  /**
   * @return the next eight bytes of this input stream, interpreted as a <code>double</code>.
   * @throws EOFException if this input stream reaches the end before reading eight bytes.
   * @throws IOException if the contained input stream fails
   * @see java.lang.Double#longBitsToDouble(long)
   */
  public final double readDouble() throws IOException {
    return Double.longBitsToDouble(readLong());
  }

}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.bytes; + +import org.apache.parquet.IOExceptionUtils; +import org.apache.parquet.ParquetRuntimeException; + +import java.io.IOException; +import java.io.OutputStream; + +/** + * Based on DataOutputStream but in little endian and without the String/char methods + * + * @author Julien Le Dem + * + */ +public class LittleEndianDataOutputStream extends OutputStream { + + private final OutputStream out; + + /** + * Creates a new data output stream to write data to the specified + * underlying output stream. The counter <code>written</code> is + * set to zero. + * + * @param out the underlying output stream, to be saved for later + * use. + * @see java.io.FilterOutputStream#out + */ + public LittleEndianDataOutputStream(OutputStream out) { + this.out = out; + } + + /** + * Writes the specified byte (the low eight bits of the argument + * <code>b</code>) to the underlying output stream. If no exception + * is thrown, the counter <code>written</code> is incremented by + * <code>1</code>. + * <p> + * Implements the <code>write</code> method of <code>OutputStream</code>. + * + * @param b the <code>byte</code> to be written. + * @exception IOException if an I/O error occurs. + * @see java.io.FilterOutputStream#out + */ + public void write(int b) throws IOException { + out.write(b); + } + + /** + * Writes <code>len</code> bytes from the specified byte array + * starting at offset <code>off</code> to the underlying output stream. 
+ * If no exception is thrown, the counter <code>written</code> is + * incremented by <code>len</code>. + * + * @param b the data. + * @param off the start offset in the data. + * @param len the number of bytes to write. + * @exception IOException if an I/O error occurs. + * @see java.io.FilterOutputStream#out + */ + public void write(byte b[], int off, int len) throws IOException { + out.write(b, off, len); + } + + /** + * Flushes this data output stream. This forces any buffered output + * bytes to be written out to the stream. + * <p> + * The <code>flush</code> method of <code>DataOutputStream</code> + * calls the <code>flush</code> method of its underlying output stream. + * + * @exception IOException if an I/O error occurs. + * @see java.io.FilterOutputStream#out + * @see java.io.OutputStream#flush() + */ + public void flush() throws IOException { + out.flush(); + } + + /** + * Writes a <code>boolean</code> to the underlying output stream as + * a 1-byte value. The value <code>true</code> is written out as the + * value <code>(byte)1</code>; the value <code>false</code> is + * written out as the value <code>(byte)0</code>. If no exception is + * thrown, the counter <code>written</code> is incremented by + * <code>1</code>. + * + * @param v a <code>boolean</code> value to be written. + * @exception IOException if an I/O error occurs. + * @see java.io.FilterOutputStream#out + */ + public final void writeBoolean(boolean v) throws IOException { + out.write(v ? 1 : 0); + } + + /** + * Writes out a <code>byte</code> to the underlying output stream as + * a 1-byte value. If no exception is thrown, the counter + * <code>written</code> is incremented by <code>1</code>. + * + * @param v a <code>byte</code> value to be written. + * @exception IOException if an I/O error occurs. 
+ * @see java.io.FilterOutputStream#out + */ + public final void writeByte(int v) throws IOException { + out.write(v); + } + + /** + * Writes a <code>short</code> to the underlying output stream as two + * bytes, low byte first. If no exception is thrown, the counter + * <code>written</code> is incremented by <code>2</code>. + * + * @param v a <code>short</code> to be written. + * @exception IOException if an I/O error occurs. + * @see java.io.FilterOutputStream#out + */ + public final void writeShort(int v) throws IOException { + out.write((v >>> 0) & 0xFF); + out.write((v >>> 8) & 0xFF); + } + + /** + * Writes an <code>int</code> to the underlying output stream as four + * bytes, low byte first. If no exception is thrown, the counter + * <code>written</code> is incremented by <code>4</code>. + * + * @param v an <code>int</code> to be written. + * @exception IOException if an I/O error occurs. + * @see java.io.FilterOutputStream#out + */ + public final void writeInt(int v) throws IOException { + // TODO: see note in LittleEndianDataInputStream: maybe faster + // to use Integer.reverseBytes() and then writeInt, or a ByteBuffer + // approach + out.write((v >>> 0) & 0xFF); + out.write((v >>> 8) & 0xFF); + out.write((v >>> 16) & 0xFF); + out.write((v >>> 24) & 0xFF); + } + + private byte writeBuffer[] = new byte[8]; + + /** + * Writes a <code>long</code> to the underlying output stream as eight + * bytes, low byte first. In no exception is thrown, the counter + * <code>written</code> is incremented by <code>8</code>. + * + * @param v a <code>long</code> to be written. + * @exception IOException if an I/O error occurs. 
+ * @see java.io.FilterOutputStream#out + */ + public final void writeLong(long v) throws IOException { + writeBuffer[7] = (byte)(v >>> 56); + writeBuffer[6] = (byte)(v >>> 48); + writeBuffer[5] = (byte)(v >>> 40); + writeBuffer[4] = (byte)(v >>> 32); + writeBuffer[3] = (byte)(v >>> 24); + writeBuffer[2] = (byte)(v >>> 16); + writeBuffer[1] = (byte)(v >>> 8); + writeBuffer[0] = (byte)(v >>> 0); + out.write(writeBuffer, 0, 8); + } + + /** + * Converts the float argument to an <code>int</code> using the + * <code>floatToIntBits</code> method in class <code>Float</code>, + * and then writes that <code>int</code> value to the underlying + * output stream as a 4-byte quantity, low byte first. If no + * exception is thrown, the counter <code>written</code> is + * incremented by <code>4</code>. + * + * @param v a <code>float</code> value to be written. + * @exception IOException if an I/O error occurs. + * @see java.io.FilterOutputStream#out + * @see java.lang.Float#floatToIntBits(float) + */ + public final void writeFloat(float v) throws IOException { + writeInt(Float.floatToIntBits(v)); + } + + /** + * Converts the double argument to a <code>long</code> using the + * <code>doubleToLongBits</code> method in class <code>Double</code>, + * and then writes that <code>long</code> value to the underlying + * output stream as an 8-byte quantity, low byte first. If no + * exception is thrown, the counter <code>written</code> is + * incremented by <code>8</code>. + * + * @param v a <code>double</code> value to be written. + * @exception IOException if an I/O error occurs. 
+ * @see java.io.FilterOutputStream#out + * @see java.lang.Double#doubleToLongBits(double) + */ + public final void writeDouble(double v) throws IOException { + writeLong(Double.doubleToLongBits(v)); + } + + public void close() { + IOExceptionUtils.closeQuietly(out); + } + +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-common/src/main/java/org/apache/parquet/compression/CompressionCodecFactory.java ---------------------------------------------------------------------- diff --git a/parquet-common/src/main/java/org/apache/parquet/compression/CompressionCodecFactory.java b/parquet-common/src/main/java/org/apache/parquet/compression/CompressionCodecFactory.java new file mode 100644 index 0000000..5b1b657 --- /dev/null +++ b/parquet-common/src/main/java/org/apache/parquet/compression/CompressionCodecFactory.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.parquet.compression; + +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import java.io.IOException; +import java.nio.ByteBuffer; + +public interface CompressionCodecFactory { + + BytesInputCompressor getCompressor(CompressionCodecName codecName); + + BytesInputDecompressor getDecompressor(CompressionCodecName codecName); + + void release(); + + interface BytesInputCompressor { + BytesInput compress(BytesInput bytes) throws IOException; + CompressionCodecName getCodecName(); + void release(); + } + + interface BytesInputDecompressor { + BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException; + void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize) throws IOException; + void release(); + } + +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-common/src/main/java/org/apache/parquet/hadoop/codec/CompressionCodecNotSupportedException.java ---------------------------------------------------------------------- diff --git a/parquet-common/src/main/java/org/apache/parquet/hadoop/codec/CompressionCodecNotSupportedException.java b/parquet-common/src/main/java/org/apache/parquet/hadoop/codec/CompressionCodecNotSupportedException.java new file mode 100644 index 0000000..bf2da32 --- /dev/null +++ b/parquet-common/src/main/java/org/apache/parquet/hadoop/codec/CompressionCodecNotSupportedException.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/**
 * This exception will be thrown when the codec is not supported by parquet, meaning there is no
 * matching codec defined in {@code org.apache.parquet.hadoop.metadata.CompressionCodecName}.
 */
public class CompressionCodecNotSupportedException extends RuntimeException {
  private final Class codecClass;

  /**
   * @param codecClass the codec class that has no parquet counterpart; its name becomes
   *                   part of the exception message
   */
  public CompressionCodecNotSupportedException(Class codecClass) {
    super(describe(codecClass));
    this.codecClass = codecClass;
  }

  /**
   * @return the codec class this exception was raised for
   */
  public Class getCodecClass() {
    return codecClass;
  }

  /** Builds the exception message naming the rejected codec class. */
  private static String describe(Class rejected) {
    return "codec not supported: " + rejected.getName();
  }
}
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.hadoop.metadata; + + +import org.apache.parquet.format.CompressionCodec; +import org.apache.parquet.hadoop.codec.CompressionCodecNotSupportedException; +import java.util.Locale; + +public enum CompressionCodecName { + UNCOMPRESSED(null, CompressionCodec.UNCOMPRESSED, ""), + SNAPPY("org.apache.parquet.hadoop.codec.SnappyCodec", CompressionCodec.SNAPPY, ".snappy"), + GZIP("org.apache.hadoop.io.compress.GzipCodec", CompressionCodec.GZIP, ".gz"), + LZO("com.hadoop.compression.lzo.LzoCodec", CompressionCodec.LZO, ".lzo"), + BROTLI("org.apache.hadoop.io.compress.BrotliCodec", CompressionCodec.BROTLI, ".br"), + LZ4("org.apache.hadoop.io.compress.Lz4Codec", CompressionCodec.LZ4, ".lz4"), + ZSTD("org.apache.hadoop.io.compress.ZStandardCodec", CompressionCodec.ZSTD, ".zstd"); + + public static CompressionCodecName fromConf(String name) { + if (name == null) { + return UNCOMPRESSED; + } + return valueOf(name.toUpperCase(Locale.ENGLISH)); + } + + public static CompressionCodecName fromCompressionCodec(Class<?> clazz) { + if (clazz == null) { + return UNCOMPRESSED; + } + String name = clazz.getName(); + for (CompressionCodecName codec : CompressionCodecName.values()) { + if (name.equals(codec.getHadoopCompressionCodecClassName())) { + return codec; + } + } + throw new CompressionCodecNotSupportedException(clazz); + } + + public static CompressionCodecName 
fromParquet(CompressionCodec codec) { + for (CompressionCodecName codecName : CompressionCodecName.values()) { + if (codec.equals(codecName.parquetCompressionCodec)) { + return codecName; + } + } + throw new IllegalArgumentException("Unknown compression codec " + codec); + } + + private final String hadoopCompressionCodecClass; + private final CompressionCodec parquetCompressionCodec; + private final String extension; + + private CompressionCodecName(String hadoopCompressionCodecClass, CompressionCodec parquetCompressionCodec, String extension) { + this.hadoopCompressionCodecClass = hadoopCompressionCodecClass; + this.parquetCompressionCodec = parquetCompressionCodec; + this.extension = extension; + } + + public String getHadoopCompressionCodecClassName() { + return hadoopCompressionCodecClass; + } + + public Class getHadoopCompressionCodecClass() { + String codecClassName = getHadoopCompressionCodecClassName(); + if (codecClassName==null) { + return null; + } + try { + return Class.forName(codecClassName); + } catch (ClassNotFoundException e) { + return null; + } + } + + public CompressionCodec getParquetCompressionCodec() { + return parquetCompressionCodec; + } + + public String getExtension() { + return extension; + } + +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-common/src/main/java/org/apache/parquet/io/DelegatingPositionOutputStream.java ---------------------------------------------------------------------- diff --git a/parquet-common/src/main/java/org/apache/parquet/io/DelegatingPositionOutputStream.java b/parquet-common/src/main/java/org/apache/parquet/io/DelegatingPositionOutputStream.java new file mode 100644 index 0000000..9e52428 --- /dev/null +++ b/parquet-common/src/main/java/org/apache/parquet/io/DelegatingPositionOutputStream.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.io; + +import java.io.IOException; +import java.io.OutputStream; + +public abstract class DelegatingPositionOutputStream extends PositionOutputStream { + private final OutputStream stream; + + public DelegatingPositionOutputStream(OutputStream stream) { + this.stream = stream; + } + + public OutputStream getStream() { + return stream; + } + + @Override + public void close() throws IOException { + stream.close(); + } + + @Override + public void flush() throws IOException { + stream.flush(); + } + + @Override + public abstract long getPos() throws IOException; + + @Override + public void write(int b) throws IOException { + stream.write(b); + } + + @Override + public void write(byte[] b) throws IOException { + stream.write(b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + stream.write(b, off, len); + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-common/src/main/java/org/apache/parquet/io/DelegatingSeekableInputStream.java ---------------------------------------------------------------------- diff --git a/parquet-common/src/main/java/org/apache/parquet/io/DelegatingSeekableInputStream.java 
b/parquet-common/src/main/java/org/apache/parquet/io/DelegatingSeekableInputStream.java new file mode 100644 index 0000000..bc4940c --- /dev/null +++ b/parquet-common/src/main/java/org/apache/parquet/io/DelegatingSeekableInputStream.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.io; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +/** + * Implements read methods required by {@link SeekableInputStream} for generic input streams. + * <p> + * Implementations must implement {@link #getPos()} and {@link #seek(long)} and may optionally + * implement other read methods to improve performance. 
+ */ +public abstract class DelegatingSeekableInputStream extends SeekableInputStream { + + private final int COPY_BUFFER_SIZE = 8192; + private final byte[] temp = new byte[COPY_BUFFER_SIZE]; + + private final InputStream stream; + + public DelegatingSeekableInputStream(InputStream stream) { + this.stream = stream; + } + + public InputStream getStream() { + return stream; + } + + @Override + public void close() throws IOException { + stream.close(); + } + + @Override + public abstract long getPos() throws IOException; + + @Override + public abstract void seek(long newPos) throws IOException; + + @Override + public int read() throws IOException { + return stream.read(); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + return stream.read(b, off, len); + } + + @Override + public void readFully(byte[] bytes) throws IOException { + readFully(stream, bytes, 0, bytes.length); + } + + @Override + public void readFully(byte[] bytes, int start, int len) throws IOException { + readFully(stream, bytes, start, len); + } + + @Override + public int read(ByteBuffer buf) throws IOException { + if (buf.hasArray()) { + return readHeapBuffer(stream, buf); + } else { + return readDirectBuffer(stream, buf, temp); + } + } + + @Override + public void readFully(ByteBuffer buf) throws IOException { + if (buf.hasArray()) { + readFullyHeapBuffer(stream, buf); + } else { + readFullyDirectBuffer(stream, buf, temp); + } + } + + // Visible for testing + static void readFully(InputStream f, byte[] bytes, int start, int len) throws IOException { + int offset = start; + int remaining = len; + while (remaining > 0) { + int bytesRead = f.read(bytes, offset, remaining); + if (bytesRead < 0) { + throw new EOFException( + "Reached the end of stream with " + remaining + " bytes left to read"); + } + + remaining -= bytesRead; + offset += bytesRead; + } + } + + // Visible for testing + static int readHeapBuffer(InputStream f, ByteBuffer buf) throws IOException { + int 
bytesRead = f.read(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining()); + if (bytesRead < 0) { + // if this resulted in EOF, don't update position + return bytesRead; + } else { + buf.position(buf.position() + bytesRead); + return bytesRead; + } + } + + // Visible for testing + static void readFullyHeapBuffer(InputStream f, ByteBuffer buf) throws IOException { + readFully(f, buf.array(), buf.arrayOffset() + buf.position(), buf.remaining()); + buf.position(buf.limit()); + } + + // Visible for testing + static int readDirectBuffer(InputStream f, ByteBuffer buf, byte[] temp) throws IOException { + // copy all the bytes that return immediately, stopping at the first + // read that doesn't return a full buffer. + int nextReadLength = Math.min(buf.remaining(), temp.length); + int totalBytesRead = 0; + int bytesRead; + + while ((bytesRead = f.read(temp, 0, nextReadLength)) == temp.length) { + buf.put(temp); + totalBytesRead += bytesRead; + nextReadLength = Math.min(buf.remaining(), temp.length); + } + + if (bytesRead < 0) { + // return -1 if nothing was read + return totalBytesRead == 0 ? 
-1 : totalBytesRead; + } else { + // copy the last partial buffer + buf.put(temp, 0, bytesRead); + totalBytesRead += bytesRead; + return totalBytesRead; + } + } + + // Visible for testing + static void readFullyDirectBuffer(InputStream f, ByteBuffer buf, byte[] temp) throws IOException { + int nextReadLength = Math.min(buf.remaining(), temp.length); + int bytesRead = 0; + + while (nextReadLength > 0 && (bytesRead = f.read(temp, 0, nextReadLength)) >= 0) { + buf.put(temp, 0, bytesRead); + nextReadLength = Math.min(buf.remaining(), temp.length); + } + + if (bytesRead < 0 && buf.remaining() > 0) { + throw new EOFException( + "Reached the end of stream with " + buf.remaining() + " bytes left to read"); + } + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-common/src/main/java/org/apache/parquet/io/InputFile.java ---------------------------------------------------------------------- diff --git a/parquet-common/src/main/java/org/apache/parquet/io/InputFile.java b/parquet-common/src/main/java/org/apache/parquet/io/InputFile.java index e2c7cc0..f910074 100644 --- a/parquet-common/src/main/java/org/apache/parquet/io/InputFile.java +++ b/parquet-common/src/main/java/org/apache/parquet/io/InputFile.java @@ -28,15 +28,16 @@ import java.io.IOException; public interface InputFile { /** - * Returns the total length of the file, in bytes. + * @return the total length of the file, in bytes. * @throws IOException if the length cannot be determined */ long getLength() throws IOException; /** - * Opens a new {@link SeekableInputStream} for the underlying - * data file. - * @throws IOException if the stream cannot be opened. + * Open a new {@link SeekableInputStream} for the underlying data file. 
+ * + * @return a new {@link SeekableInputStream} to read the file + * @throws IOException if the stream cannot be opened */ SeekableInputStream newStream() throws IOException; http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-common/src/main/java/org/apache/parquet/io/OutputFile.java ---------------------------------------------------------------------- diff --git a/parquet-common/src/main/java/org/apache/parquet/io/OutputFile.java b/parquet-common/src/main/java/org/apache/parquet/io/OutputFile.java new file mode 100644 index 0000000..2d6de44 --- /dev/null +++ b/parquet-common/src/main/java/org/apache/parquet/io/OutputFile.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.parquet.io; + +import java.io.IOException; + +public interface OutputFile { + + PositionOutputStream create(long blockSizeHint) throws IOException; + + PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException; + + boolean supportsBlockSize(); + + long defaultBlockSize(); + +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-common/src/main/java/org/apache/parquet/io/PositionOutputStream.java ---------------------------------------------------------------------- diff --git a/parquet-common/src/main/java/org/apache/parquet/io/PositionOutputStream.java b/parquet-common/src/main/java/org/apache/parquet/io/PositionOutputStream.java new file mode 100644 index 0000000..066c46b --- /dev/null +++ b/parquet-common/src/main/java/org/apache/parquet/io/PositionOutputStream.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.io; + +import java.io.IOException; +import java.io.OutputStream; + +/** + * {@code PositionOutputStream} is an interface with the methods needed by + * Parquet to write data to a file or Hadoop data stream. 
+ */ +public abstract class PositionOutputStream extends OutputStream { + + /** + * Reports the current position of this output stream. + * + * @return a long, the current position in bytes starting from 0 + * @throws IOException when the underlying stream throws IOException + */ + public abstract long getPos() throws IOException; + +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-common/src/test/java/org/apache/parquet/io/MockInputStream.java ---------------------------------------------------------------------- diff --git a/parquet-common/src/test/java/org/apache/parquet/io/MockInputStream.java b/parquet-common/src/test/java/org/apache/parquet/io/MockInputStream.java new file mode 100644 index 0000000..42e3a8a --- /dev/null +++ b/parquet-common/src/test/java/org/apache/parquet/io/MockInputStream.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.io; + +import java.io.ByteArrayInputStream; + +class MockInputStream extends ByteArrayInputStream { + + static final byte[] TEST_ARRAY = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }; + + private int[] lengths; + private int current = 0; + MockInputStream(int... 
actualReadLengths) { + super(TEST_ARRAY); + this.lengths = actualReadLengths; + } + + @Override + public synchronized int read(byte[] b, int off, int len) { + if (current < lengths.length) { + if (len <= lengths[current]) { + // when len == lengths[current], the next read will be 0 bytes + int bytesRead = super.read(b, off, len); + lengths[current] -= bytesRead; + return bytesRead; + } else { + int bytesRead = super.read(b, off, lengths[current]); + current += 1; + return bytesRead; + } + } else { + return super.read(b, off, len); + } + } + + public long getPos() { + return this.pos; + } +}