http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/PhysicalFsWriter.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/PhysicalFsWriter.java b/orc/src/java/org/apache/orc/impl/PhysicalFsWriter.java deleted file mode 100644 index ba8c13f..0000000 --- a/orc/src/java/org/apache/orc/impl/PhysicalFsWriter.java +++ /dev/null @@ -1,529 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.orc.impl; - -import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.EnumSet; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; - -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.orc.CompressionCodec; -import org.apache.orc.CompressionCodec.Modifier; -import org.apache.orc.CompressionKind; -import org.apache.orc.OrcFile; -import org.apache.orc.OrcFile.CompressionStrategy; -import org.apache.orc.OrcProto; -import org.apache.orc.OrcProto.BloomFilterIndex; -import org.apache.orc.OrcProto.Footer; -import org.apache.orc.OrcProto.Metadata; -import org.apache.orc.OrcProto.PostScript; -import org.apache.orc.OrcProto.Stream.Kind; -import org.apache.orc.OrcProto.StripeFooter; -import org.apache.orc.OrcProto.StripeInformation; -import org.apache.orc.OrcProto.RowIndex.Builder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.CodedOutputStream; - -public class PhysicalFsWriter implements PhysicalWriter { - private static final Logger LOG = LoggerFactory.getLogger(PhysicalFsWriter.class); - - private static final int HDFS_BUFFER_SIZE = 256 * 1024; - - private FSDataOutputStream rawWriter = null; - // the compressed metadata information outStream - private OutStream writer = null; - // a protobuf outStream around streamFactory - private CodedOutputStream protobufWriter = null; - - private final FileSystem fs; - private final Path path; - private final long blockSize; - private final int bufferSize; - private final CompressionCodec codec; - private final double paddingTolerance; - private final long defaultStripeSize; - private final CompressionKind compress; - private final boolean addBlockPadding; - private final CompressionStrategy compressionStrategy; - - // the 
streams that make up the current stripe - private final Map<StreamName, BufferedStream> streams = - new TreeMap<StreamName, BufferedStream>(); - - private long adjustedStripeSize; - private long headerLength; - private long stripeStart; - private int metadataLength; - private int footerLength; - - public PhysicalFsWriter(FileSystem fs, Path path, int numColumns, OrcFile.WriterOptions opts) { - this.fs = fs; - this.path = path; - this.defaultStripeSize = this.adjustedStripeSize = opts.getStripeSize(); - this.addBlockPadding = opts.getBlockPadding(); - if (opts.isEnforceBufferSize()) { - this.bufferSize = opts.getBufferSize(); - } else { - this.bufferSize = getEstimatedBufferSize(defaultStripeSize, numColumns, opts.getBufferSize()); - } - this.compress = opts.getCompress(); - this.compressionStrategy = opts.getCompressionStrategy(); - codec = createCodec(compress); - this.paddingTolerance = opts.getPaddingTolerance(); - this.blockSize = opts.getBlockSize(); - LOG.info("ORC writer created for path: {} with stripeSize: {} blockSize: {}" + - " compression: {} bufferSize: {}", path, defaultStripeSize, blockSize, - compress, bufferSize); - } - - @Override - public void initialize() throws IOException { - if (rawWriter != null) return; - rawWriter = fs.create(path, false, HDFS_BUFFER_SIZE, - fs.getDefaultReplication(path), blockSize); - rawWriter.writeBytes(OrcFile.MAGIC); - headerLength = rawWriter.getPos(); - writer = new OutStream("metadata", bufferSize, codec, - new DirectStream(rawWriter)); - protobufWriter = CodedOutputStream.newInstance(writer); - } - - private void padStripe(long indexSize, long dataSize, int footerSize) throws IOException { - this.stripeStart = rawWriter.getPos(); - final long currentStripeSize = indexSize + dataSize + footerSize; - final long available = blockSize - (stripeStart % blockSize); - final long overflow = currentStripeSize - adjustedStripeSize; - final float availRatio = (float) available / (float) defaultStripeSize; - - if (availRatio 
> 0.0f && availRatio < 1.0f - && availRatio > paddingTolerance) { - // adjust default stripe size to fit into remaining space, also adjust - // the next stripe for correction based on the current stripe size - // and user specified padding tolerance. Since stripe size can overflow - // the default stripe size we should apply this correction to avoid - // writing portion of last stripe to next hdfs block. - double correction = overflow > 0 ? (double) overflow - / (double) adjustedStripeSize : 0.0; - - // correction should not be greater than user specified padding - // tolerance - correction = correction > paddingTolerance ? paddingTolerance - : correction; - - // adjust next stripe size based on current stripe estimate correction - adjustedStripeSize = (long) ((1.0f - correction) * (availRatio * defaultStripeSize)); - } else if (availRatio >= 1.0) { - adjustedStripeSize = defaultStripeSize; - } - - if (availRatio < paddingTolerance && addBlockPadding) { - long padding = blockSize - (stripeStart % blockSize); - byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, padding)]; - LOG.info(String.format("Padding ORC by %d bytes (<= %.2f * %d)", - padding, availRatio, defaultStripeSize)); - stripeStart += padding; - while (padding > 0) { - int writeLen = (int) Math.min(padding, pad.length); - rawWriter.write(pad, 0, writeLen); - padding -= writeLen; - } - adjustedStripeSize = defaultStripeSize; - } else if (currentStripeSize < blockSize - && (stripeStart % blockSize) + currentStripeSize > blockSize) { - // even if you don't pad, reset the default stripe size when crossing a - // block boundary - adjustedStripeSize = defaultStripeSize; - } - } - - /** - * An output receiver that writes the ByteBuffers to the output stream - * as they are received. 
- */ - private class DirectStream implements OutStream.OutputReceiver { - private final FSDataOutputStream output; - - DirectStream(FSDataOutputStream output) { - this.output = output; - } - - @Override - public void output(ByteBuffer buffer) throws IOException { - output.write(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining()); - } - } - - @Override - public long getPhysicalStripeSize() { - return adjustedStripeSize; - } - - @Override - public boolean isCompressed() { - return codec != null; - } - - - public static CompressionCodec createCodec(CompressionKind kind) { - switch (kind) { - case NONE: - return null; - case ZLIB: - return new ZlibCodec(); - case SNAPPY: - return new SnappyCodec(); - case LZO: - try { - ClassLoader loader = Thread.currentThread().getContextClassLoader(); - if (loader == null) { - loader = WriterImpl.class.getClassLoader(); - } - @SuppressWarnings("unchecked") - Class<? extends CompressionCodec> lzo = - (Class<? extends CompressionCodec>) - loader.loadClass("org.apache.hadoop.hive.ql.io.orc.LzoCodec"); - return lzo.newInstance(); - } catch (ClassNotFoundException e) { - throw new IllegalArgumentException("LZO is not available.", e); - } catch (InstantiationException e) { - throw new IllegalArgumentException("Problem initializing LZO", e); - } catch (IllegalAccessException e) { - throw new IllegalArgumentException("Insufficient access to LZO", e); - } - default: - throw new IllegalArgumentException("Unknown compression codec: " + - kind); - } - } - - private void writeStripeFooter(StripeFooter footer, long dataSize, long indexSize, - StripeInformation.Builder dirEntry) throws IOException { - footer.writeTo(protobufWriter); - protobufWriter.flush(); - writer.flush(); - dirEntry.setOffset(stripeStart); - dirEntry.setFooterLength(rawWriter.getPos() - stripeStart - dataSize - indexSize); - } - - @VisibleForTesting - public static int getEstimatedBufferSize(long stripeSize, int numColumns, - int bs) { - // The worst 
case is that there are 2 big streams per a column and - // we want to guarantee that each stream gets ~10 buffers. - // This keeps buffers small enough that we don't get really small stripe - // sizes. - int estBufferSize = (int) (stripeSize / (20 * numColumns)); - estBufferSize = getClosestBufferSize(estBufferSize); - return estBufferSize > bs ? bs : estBufferSize; - } - - private static int getClosestBufferSize(int estBufferSize) { - final int kb4 = 4 * 1024; - final int kb8 = 8 * 1024; - final int kb16 = 16 * 1024; - final int kb32 = 32 * 1024; - final int kb64 = 64 * 1024; - final int kb128 = 128 * 1024; - final int kb256 = 256 * 1024; - if (estBufferSize <= kb4) { - return kb4; - } else if (estBufferSize > kb4 && estBufferSize <= kb8) { - return kb8; - } else if (estBufferSize > kb8 && estBufferSize <= kb16) { - return kb16; - } else if (estBufferSize > kb16 && estBufferSize <= kb32) { - return kb32; - } else if (estBufferSize > kb32 && estBufferSize <= kb64) { - return kb64; - } else if (estBufferSize > kb64 && estBufferSize <= kb128) { - return kb128; - } else { - return kb256; - } - } - - @Override - public void writeFileMetadata(Metadata.Builder builder) throws IOException { - long startPosn = rawWriter.getPos(); - Metadata metadata = builder.build(); - metadata.writeTo(protobufWriter); - protobufWriter.flush(); - writer.flush(); - this.metadataLength = (int) (rawWriter.getPos() - startPosn); - } - - @Override - public void writeFileFooter(Footer.Builder builder) throws IOException { - long bodyLength = rawWriter.getPos() - metadataLength; - builder.setContentLength(bodyLength); - builder.setHeaderLength(headerLength); - long startPosn = rawWriter.getPos(); - Footer footer = builder.build(); - footer.writeTo(protobufWriter); - protobufWriter.flush(); - writer.flush(); - this.footerLength = (int) (rawWriter.getPos() - startPosn); - } - - @Override - public void writePostScript(PostScript.Builder builder) throws IOException { - 
builder.setCompression(writeCompressionKind(compress)); - builder.setFooterLength(footerLength); - builder.setMetadataLength(metadataLength); - if (compress != CompressionKind.NONE) { - builder.setCompressionBlockSize(bufferSize); - } - PostScript ps = builder.build(); - // need to write this uncompressed - long startPosn = rawWriter.getPos(); - ps.writeTo(rawWriter); - long length = rawWriter.getPos() - startPosn; - if (length > 255) { - throw new IllegalArgumentException("PostScript too large at " + length); - } - rawWriter.writeByte((int)length); - } - - @Override - public void close() throws IOException { - rawWriter.close(); - } - - private OrcProto.CompressionKind writeCompressionKind(CompressionKind kind) { - switch (kind) { - case NONE: return OrcProto.CompressionKind.NONE; - case ZLIB: return OrcProto.CompressionKind.ZLIB; - case SNAPPY: return OrcProto.CompressionKind.SNAPPY; - case LZO: return OrcProto.CompressionKind.LZO; - default: - throw new IllegalArgumentException("Unknown compression " + kind); - } - } - - @Override - public void flush() throws IOException { - rawWriter.hflush(); - // TODO: reset? 
- } - - @Override - public long getRawWriterPosition() throws IOException { - return rawWriter.getPos(); - } - - @Override - public void appendRawStripe(byte[] stripe, int offset, int length, - StripeInformation.Builder dirEntry) throws IOException { - long start = rawWriter.getPos(); - long availBlockSpace = blockSize - (start % blockSize); - - // see if stripe can fit in the current hdfs block, else pad the remaining - // space in the block - if (length < blockSize && length > availBlockSpace && - addBlockPadding) { - byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, availBlockSpace)]; - LOG.info(String.format("Padding ORC by %d bytes while merging..", - availBlockSpace)); - start += availBlockSpace; - while (availBlockSpace > 0) { - int writeLen = (int) Math.min(availBlockSpace, pad.length); - rawWriter.write(pad, 0, writeLen); - availBlockSpace -= writeLen; - } - } - - rawWriter.write(stripe); - dirEntry.setOffset(start); - } - - - /** - * This class is used to hold the contents of streams as they are buffered. - * The TreeWriters write to the outStream and the codec compresses the - * data as buffers fill up and stores them in the output list. When the - * stripe is being written, the whole stream is written to the file. - */ - private class BufferedStream implements OutStream.OutputReceiver { - private final OutStream outStream; - private final List<ByteBuffer> output = new ArrayList<ByteBuffer>(); - - BufferedStream(String name, int bufferSize, - CompressionCodec codec) throws IOException { - outStream = new OutStream(name, bufferSize, codec, this); - } - - /** - * Receive a buffer from the compression codec. - * @param buffer the buffer to save - */ - @Override - public void output(ByteBuffer buffer) { - output.add(buffer); - } - - /** - * @return the number of bytes in buffers that are allocated to this stream. 
- */ - public long getBufferSize() { - long result = 0; - for (ByteBuffer buf: output) { - result += buf.capacity(); - } - return outStream.getBufferSize() + result; - } - - /** - * Write any saved buffers to the OutputStream if needed, and clears all the buffers. - */ - public void spillToDiskAndClear() throws IOException { - if (!outStream.isSuppressed()) { - for (ByteBuffer buffer: output) { - rawWriter.write(buffer.array(), buffer.arrayOffset() + buffer.position(), - buffer.remaining()); - } - } - outStream.clear(); - output.clear(); - } - - /** - * @return The number of bytes that will be written to the output. Assumes the stream writing - * into this receiver has already been flushed. - */ - public long getOutputSize() { - long result = 0; - for (ByteBuffer buffer: output) { - result += buffer.remaining(); - } - return result; - } - - @Override - public String toString() { - return outStream.toString(); - } - } - - @Override - public OutStream getOrCreatePhysicalStream(StreamName name) throws IOException { - BufferedStream result = streams.get(name); - if (result == null) { - EnumSet<Modifier> modifiers = createCompressionModifiers(name.getKind()); - result = new BufferedStream(name.toString(), bufferSize, - codec == null ? null : codec.modify(modifiers)); - streams.put(name, result); - } - return result.outStream; - } - - private EnumSet<Modifier> createCompressionModifiers(Kind kind) { - switch (kind) { - case BLOOM_FILTER: - case DATA: - case DICTIONARY_DATA: - return EnumSet.of(Modifier.TEXT, - compressionStrategy == CompressionStrategy.SPEED ? 
Modifier.FAST : Modifier.DEFAULT); - case LENGTH: - case DICTIONARY_COUNT: - case PRESENT: - case ROW_INDEX: - case SECONDARY: - // easily compressed using the fastest modes - return EnumSet.of(CompressionCodec.Modifier.FASTEST, CompressionCodec.Modifier.BINARY); - default: - LOG.warn("Missing ORC compression modifiers for " + kind); - return null; - } - } - - @Override - public void finalizeStripe(StripeFooter.Builder footerBuilder, - StripeInformation.Builder dirEntry) throws IOException { - long indexSize = 0; - long dataSize = 0; - for (Map.Entry<StreamName, BufferedStream> pair: streams.entrySet()) { - BufferedStream receiver = pair.getValue(); - OutStream outStream = receiver.outStream; - if (!outStream.isSuppressed()) { - outStream.flush(); - long streamSize = receiver.getOutputSize(); - StreamName name = pair.getKey(); - footerBuilder.addStreams(OrcProto.Stream.newBuilder().setColumn(name.getColumn()) - .setKind(name.getKind()).setLength(streamSize)); - if (StreamName.Area.INDEX == name.getArea()) { - indexSize += streamSize; - } else { - dataSize += streamSize; - } - } - } - dirEntry.setIndexLength(indexSize).setDataLength(dataSize); - - OrcProto.StripeFooter footer = footerBuilder.build(); - // Do we need to pad the file so the stripe doesn't straddle a block boundary? - padStripe(indexSize, dataSize, footer.getSerializedSize()); - - // write out the data streams - for (Map.Entry<StreamName, BufferedStream> pair : streams.entrySet()) { - pair.getValue().spillToDiskAndClear(); - } - // Write out the footer. 
- writeStripeFooter(footer, dataSize, indexSize, dirEntry); - } - - @Override - public long estimateMemory() { - long result = 0; - for (BufferedStream stream: streams.values()) { - result += stream.getBufferSize(); - } - return result; - } - - @Override - public void writeIndexStream(StreamName name, Builder rowIndex) throws IOException { - OutStream stream = getOrCreatePhysicalStream(name); - rowIndex.build().writeTo(stream); - stream.flush(); - } - - @Override - public void writeBloomFilterStream( - StreamName name, BloomFilterIndex.Builder bloomFilterIndex) throws IOException { - OutStream stream = getOrCreatePhysicalStream(name); - bloomFilterIndex.build().writeTo(stream); - stream.flush(); - } - - @VisibleForTesting - public OutputStream getStream() throws IOException { - initialize(); - return rawWriter; - } -}
http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/PhysicalWriter.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/PhysicalWriter.java b/orc/src/java/org/apache/orc/impl/PhysicalWriter.java deleted file mode 100644 index 5ba1b9b..0000000 --- a/orc/src/java/org/apache/orc/impl/PhysicalWriter.java +++ /dev/null @@ -1,122 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.orc.impl; - -import java.io.IOException; - -import org.apache.orc.OrcProto.BloomFilterIndex; -import org.apache.orc.OrcProto.Footer; -import org.apache.orc.OrcProto.Metadata; -import org.apache.orc.OrcProto.PostScript; -import org.apache.orc.OrcProto.RowIndex; -import org.apache.orc.OrcProto.StripeFooter; -import org.apache.orc.OrcProto.StripeInformation; - -public interface PhysicalWriter { - - /** - * Creates all the streams/connections/etc. necessary to write. - */ - void initialize() throws IOException; - - /** - * Writes out the file metadata. - * @param builder Metadata builder to finalize and write. 
- */ - void writeFileMetadata(Metadata.Builder builder) throws IOException; - - /** - * Writes out the file footer. - * @param builder Footer builder to finalize and write. - */ - void writeFileFooter(Footer.Builder builder) throws IOException; - - /** - * Writes out the postscript (including the size byte if needed). - * @param builder Postscript builder to finalize and write. - */ - void writePostScript(PostScript.Builder builder) throws IOException; - - /** - * Creates physical stream to write data to. - * @param name Stream name. - * @return The output stream. - */ - OutStream getOrCreatePhysicalStream(StreamName name) throws IOException; - - /** - * Flushes the data in all the streams, spills them to disk, write out stripe footer. - * @param footer Stripe footer to be updated with relevant data and written out. - * @param dirEntry File metadata entry for the stripe, to be updated with relevant data. - */ - void finalizeStripe(StripeFooter.Builder footer, - StripeInformation.Builder dirEntry) throws IOException; - - /** - * Writes out the index for the stripe column. - * @param streamName Stream name. - * @param rowIndex Row index entries to write. - */ - void writeIndexStream(StreamName name, RowIndex.Builder rowIndex) throws IOException; - - /** - * Writes out the index for the stripe column. - * @param streamName Stream name. - * @param bloomFilterIndex Bloom filter index to write. - */ - void writeBloomFilterStream(StreamName streamName, - BloomFilterIndex.Builder bloomFilterIndex) throws IOException; - - /** - * Closes the writer. - */ - void close() throws IOException; - - /** - * Force-flushes the writer. - */ - void flush() throws IOException; - - /** - * @return the physical writer position (e.g. for updater). - */ - long getRawWriterPosition() throws IOException; - - /** @return physical stripe size, taking padding into account. */ - long getPhysicalStripeSize(); - - /** @return whether the writer is compressed. 
*/ - boolean isCompressed(); - - /** - * Appends raw stripe data (e.g. for file merger). - * @param stripe Stripe data buffer. - * @param offset Stripe data buffer offset. - * @param length Stripe data buffer length. - * @param dirEntry File metadata entry for the stripe, to be updated with relevant data. - * @throws IOException - */ - void appendRawStripe(byte[] stripe, int offset, int length, - StripeInformation.Builder dirEntry) throws IOException; - - /** - * @return the estimated memory usage for the stripe. - */ - long estimateMemory(); -} http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/PositionProvider.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/PositionProvider.java b/orc/src/java/org/apache/orc/impl/PositionProvider.java deleted file mode 100644 index 47cf481..0000000 --- a/orc/src/java/org/apache/orc/impl/PositionProvider.java +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.orc.impl; - -/** - * An interface used for seeking to a row index. 
- */ -public interface PositionProvider { - long getNext(); -} http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/PositionRecorder.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/PositionRecorder.java b/orc/src/java/org/apache/orc/impl/PositionRecorder.java deleted file mode 100644 index 1fff760..0000000 --- a/orc/src/java/org/apache/orc/impl/PositionRecorder.java +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.orc.impl; - -/** - * An interface for recording positions in a stream. 
- */ -public interface PositionRecorder { - void addPosition(long offset); -} http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/PositionedOutputStream.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/PositionedOutputStream.java b/orc/src/java/org/apache/orc/impl/PositionedOutputStream.java deleted file mode 100644 index d412939..0000000 --- a/orc/src/java/org/apache/orc/impl/PositionedOutputStream.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.orc.impl; - -import java.io.IOException; -import java.io.OutputStream; - -public abstract class PositionedOutputStream extends OutputStream { - - /** - * Record the current position to the recorder. - * @param recorder the object that receives the position - * @throws IOException - */ - public abstract void getPosition(PositionRecorder recorder - ) throws IOException; - - /** - * Get the memory size currently allocated as buffer associated with this - * stream. - * @return the number of bytes used by buffers. 
- */ - public abstract long getBufferSize(); -} http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/ReaderImpl.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/ReaderImpl.java b/orc/src/java/org/apache/orc/impl/ReaderImpl.java deleted file mode 100644 index 70fa628..0000000 --- a/orc/src/java/org/apache/orc/impl/ReaderImpl.java +++ /dev/null @@ -1,764 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
*/

package org.apache.orc.impl;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.fs.FileStatus;
import org.apache.orc.CompressionKind;
import org.apache.orc.FileMetadata;
import org.apache.orc.OrcFile;
import org.apache.orc.OrcUtils;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import org.apache.orc.ColumnStatistics;
import org.apache.orc.CompressionCodec;
import org.apache.orc.FileFormatException;
import org.apache.orc.StripeInformation;
import org.apache.orc.StripeStatistics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.io.DiskRange;
import org.apache.hadoop.hive.ql.util.JavaDataModel;
import org.apache.hadoop.io.Text;
import org.apache.orc.OrcProto;

import com.google.common.collect.Lists;
import com.google.protobuf.CodedInputStream;

/**
 * Reader for a single ORC file. The file-level information (postscript,
 * footer, stripe list, statistics) is taken either from a cached
 * {@link FileMetadata} / {@link OrcTail} supplied through the reader options,
 * or by reading and parsing the tail of the file.
 */
public class ReaderImpl implements Reader {

  private static final Logger LOG = LoggerFactory.getLogger(ReaderImpl.class);

  /** Number of bytes read from the end of the file on the first attempt. */
  private static final int DIRECTORY_SIZE_GUESS = 16 * 1024;

  protected final FileSystem fileSystem;
  private final long maxLength;
  protected final Path path;
  protected final org.apache.orc.CompressionKind compressionKind;
  protected CompressionCodec codec;
  protected int bufferSize;
  protected OrcProto.Metadata metadata;
  private List<OrcProto.StripeStatistics> stripeStats;
  private final int metadataSize;
  protected final List<OrcProto.Type> types;
  private TypeDescription schema;
  // null when the reader was built from a cached FileMetadata
  private final List<OrcProto.UserMetadataItem> userMetadata;
  private final List<OrcProto.ColumnStatistics> fileStats;
  private final List<StripeInformation> stripes;
  protected final int rowIndexStride;
  private final long contentLength, numberOfRows;

  // -1 until getRawDataSize() has computed it
  private long deserializedSize = -1;
  protected final Configuration conf;
  private final List<Integer> versionList;
  private final OrcFile.WriterVersion writerVersion;

  // only set when the reader was NOT built from a cached FileMetadata
  protected OrcTail tail;

  /**
   * {@link StripeInformation} view over the protobuf stripe description.
   */
  public static class StripeInformationImpl
      implements StripeInformation {
    private final OrcProto.StripeInformation stripe;

    public StripeInformationImpl(OrcProto.StripeInformation stripe) {
      this.stripe = stripe;
    }

    @Override
    public long getOffset() {
      return stripe.getOffset();
    }

    /**
     * @return the sum of the data, index and footer lengths of the stripe
     */
    @Override
    public long getLength() {
      return stripe.getDataLength() + getIndexLength() + getFooterLength();
    }

    @Override
    public long getDataLength() {
      return stripe.getDataLength();
    }

    @Override
    public long getFooterLength() {
      return stripe.getFooterLength();
    }

    @Override
    public long getIndexLength() {
      return stripe.getIndexLength();
    }

    @Override
    public long getNumberOfRows() {
      return stripe.getNumberOfRows();
    }

    @Override
    public String toString() {
      return "offset: " + getOffset() + " data: " + getDataLength() +
          " rows: " + getNumberOfRows() + " tail: " + getFooterLength() +
          " index: " + getIndexLength();
    }
  }

  @Override
  public long getNumberOfRows() {
    return numberOfRows;
  }

  /**
   * @return the names of all user metadata items; empty when no user
   *   metadata is available (e.g. the reader was built from cached
   *   file metadata)
   */
  @Override
  public List<String> getMetadataKeys() {
    List<String> result = new ArrayList<>();
    if (userMetadata != null) {
      for (OrcProto.UserMetadataItem item : userMetadata) {
        result.add(item.getName());
      }
    }
    return result;
  }

  /**
   * Look up a user metadata value by key.
   * @param key the name of the metadata item
   * @return a read-only view of the value
   * @throws IllegalArgumentException if no item with that name exists
   */
  @Override
  public ByteBuffer getMetadataValue(String key) {
    if (userMetadata != null) {
      for (OrcProto.UserMetadataItem item : userMetadata) {
        if (item.hasName() && item.getName().equals(key)) {
          return item.getValue().asReadOnlyByteBuffer();
        }
      }
    }
    throw new IllegalArgumentException("Can't find user metadata " + key);
  }

  /**
   * @param key the name of the metadata item
   * @return true if a user metadata item with that name exists
   */
  public boolean hasMetadataValue(String key) {
    if (userMetadata != null) {
      for (OrcProto.UserMetadataItem item : userMetadata) {
        if (item.hasName() && item.getName().equals(key)) {
          return true;
        }
      }
    }
    return false;
  }

  @Override
  public org.apache.orc.CompressionKind getCompressionKind() {
    return compressionKind;
  }

  @Override
  public int getCompressionSize() {
    return bufferSize;
  }

  @Override
  public List<StripeInformation> getStripes() {
    return stripes;
  }

  @Override
  public long getContentLength() {
    return contentLength;
  }

  @Override
  public List<OrcProto.Type> getTypes() {
    return types;
  }

  /**
   * Map the version list from the postscript onto a known file version.
   * A missing minor component is treated as 0, the same way
   * {@link #checkOrcVersion} treats it.
   * @return the matching version, or {@code V_0_11} when the list is
   *   missing, empty, or matches no known version
   */
  @Override
  public OrcFile.Version getFileVersion() {
    if (versionList == null || versionList.isEmpty()) {
      return OrcFile.Version.V_0_11;
    }
    int major = versionList.get(0);
    int minor = versionList.size() >= 2 ? versionList.get(1) : 0;
    for (OrcFile.Version version : OrcFile.Version.values()) {
      if (version.getMajor() == major && version.getMinor() == minor) {
        return version;
      }
    }
    return OrcFile.Version.V_0_11;
  }

  @Override
  public OrcFile.WriterVersion getWriterVersion() {
    return writerVersion;
  }

  @Override
  public OrcProto.FileTail getFileTail() {
    return tail.getFileTail();
  }

  @Override
  public int getRowIndexStride() {
    return rowIndexStride;
  }

  /**
   * @return the file-level statistics, one deserialized entry per type
   */
  @Override
  public ColumnStatistics[] getStatistics() {
    ColumnStatistics[] result = new ColumnStatistics[types.size()];
    for (int i = 0; i < result.length; ++i) {
      result[i] = ColumnStatisticsImpl.deserialize(fileStats.get(i));
    }
    return result;
  }

  @Override
  public TypeDescription getSchema() {
    return schema;
  }

  /**
   * Ensure this is an ORC file to prevent users from trying to read text
   * files or RC files as ORC files.
   * @param in the file being read
   * @param path the filename for error messages
   * @param psLen the postscript length
   * @param buffer the tail of the file
   * @throws IOException if the magic string is found neither at the end of
   *   the postscript nor at the start of the file
   */
  protected static void ensureOrcFooter(FSDataInputStream in,
                                        Path path,
                                        int psLen,
                                        ByteBuffer buffer) throws IOException {
    int magicLength = OrcFile.MAGIC.length();
    int fullLength = magicLength + 1;
    if (psLen < fullLength || buffer.remaining() < fullLength) {
      throw new FileFormatException("Malformed ORC file " + path +
          ". Invalid postscript length " + psLen);
    }
    // limit() is already an absolute index into the buffer, so the buffer's
    // position must not be added when locating its last fullLength bytes.
    int offset = buffer.arrayOffset() + buffer.limit() - fullLength;
    byte[] array = buffer.array();
    // now look for the magic string at the end of the postscript.
    if (!Text.decode(array, offset, magicLength).equals(OrcFile.MAGIC)) {
      // If it isn't there, this may be the 0.11.0 version of ORC.
      // Read the first 3 bytes of the file to check for the header
      byte[] header = new byte[magicLength];
      in.readFully(0, header, 0, magicLength);
      // if it isn't there, this isn't an ORC file
      if (!Text.decode(header, 0, magicLength).equals(OrcFile.MAGIC)) {
        throw new FileFormatException("Malformed ORC file " + path +
            ". Invalid postscript.");
      }
    }
  }

  /**
   * Ensure this is an ORC file to prevent users from trying to read text
   * files or RC files as ORC files.
   * @param buffer the tail of the file
   * @param psLen the postscript length
   * @throws IOException if the magic string is found neither at the end of
   *   the postscript nor at the start of the buffer
   */
  protected static void ensureOrcFooter(ByteBuffer buffer, int psLen) throws IOException {
    int magicLength = OrcFile.MAGIC.length();
    int fullLength = magicLength + 1;
    if (psLen < fullLength || buffer.remaining() < fullLength) {
      throw new FileFormatException("Malformed ORC file. Invalid postscript length " + psLen);
    }

    // limit() is already an absolute index into the buffer, so the buffer's
    // position must not be added when locating its last fullLength bytes.
    int offset = buffer.arrayOffset() + buffer.limit() - fullLength;
    byte[] array = buffer.array();
    // now look for the magic string at the end of the postscript.
    if (!Text.decode(array, offset, magicLength).equals(OrcFile.MAGIC)) {
      // if it isn't there, this may be 0.11.0 version of the ORC file.
      // Read the first 3 bytes from the buffer to check for the header
      if (!Text.decode(array, buffer.arrayOffset(), magicLength).equals(OrcFile.MAGIC)) {
        throw new FileFormatException("Malformed ORC file. Invalid postscript length " + psLen);
      }
    }
  }

  /**
   * Build a version string out of an array.
   * @param version the version number as a list
   * @return the human readable form of the version string
   */
  private static String versionString(List<Integer> version) {
    StringBuilder buffer = new StringBuilder();
    for (int i = 0; i < version.size(); ++i) {
      if (i != 0) {
        buffer.append('.');
      }
      buffer.append(version.get(i));
    }
    return buffer.toString();
  }

  /**
   * Check to see if this ORC file is from a future version and if so,
   * warn the user that we may not be able to read all of the column encodings.
   * @param log the logger to write any error message to
   * @param path the data source path for error messages
   * @param version the version of hive that wrote the file.
   */
  protected static void checkOrcVersion(Logger log, Path path,
                                        List<Integer> version) {
    if (version.size() >= 1) {
      int major = version.get(0);
      int minor = 0;
      if (version.size() >= 2) {
        minor = version.get(1);
      }
      if (major > OrcFile.Version.CURRENT.getMajor() ||
          (major == OrcFile.Version.CURRENT.getMajor() &&
              minor > OrcFile.Version.CURRENT.getMinor())) {
        log.warn("{} was written by a future Hive version {}" +
            ". This file may not be readable by this version of Hive.",
            path, versionString(version));
      }
    }
  }

  /**
   * Constructor that lets the user specify additional options.
   * When the options carry cached file metadata it is used directly;
   * otherwise the file tail is taken from the options or read from the file.
   * @param path pathname for file
   * @param options options for reading
   * @throws IOException if the file tail cannot be read or parsed
   */
  public ReaderImpl(Path path, OrcFile.ReaderOptions options) throws IOException {
    FileSystem fs = options.getFilesystem();
    if (fs == null) {
      fs = path.getFileSystem(options.getConfiguration());
    }
    this.fileSystem = fs;
    this.path = path;
    this.conf = options.getConfiguration();
    this.maxLength = options.getMaxLength();
    FileMetadata fileMetadata = options.getFileMetadata();
    if (fileMetadata != null) {
      this.compressionKind = fileMetadata.getCompressionKind();
      this.bufferSize = fileMetadata.getCompressionBufferSize();
      this.codec = PhysicalFsWriter.createCodec(compressionKind);
      this.metadataSize = fileMetadata.getMetadataSize();
      this.stripeStats = fileMetadata.getStripeStats();
      this.versionList = fileMetadata.getVersionList();
      this.writerVersion =
          OrcFile.WriterVersion.from(fileMetadata.getWriterVersionNum());
      this.types = fileMetadata.getTypes();
      this.rowIndexStride = fileMetadata.getRowIndexStride();
      this.contentLength = fileMetadata.getContentLength();
      this.numberOfRows = fileMetadata.getNumberOfRows();
      this.fileStats = fileMetadata.getFileStats();
      this.stripes = fileMetadata.getStripes();
      this.userMetadata = null; // not cached and not needed here
    } else {
      OrcTail orcTail = options.getOrcTail();
      if (orcTail == null) {
        tail = extractFileTail(fs, path, options.getMaxLength());
        options.orcTail(tail);
      } else {
        tail = orcTail;
      }
      this.compressionKind = tail.getCompressionKind();
      this.codec = tail.getCompressionCodec();
      this.bufferSize = tail.getCompressionBufferSize();
      this.metadataSize = tail.getMetadataSize();
      this.versionList = tail.getPostScript().getVersionList();
      this.types = tail.getFooter().getTypesList();
      this.rowIndexStride = tail.getFooter().getRowIndexStride();
      this.contentLength = tail.getFooter().getContentLength();
      this.numberOfRows = tail.getFooter().getNumberOfRows();
      this.userMetadata = tail.getFooter().getMetadataList();
      this.fileStats = tail.getFooter().getStatisticsList();
      this.writerVersion = tail.getWriterVersion();
      this.stripes = tail.getStripes();
      this.stripeStats = tail.getStripeStatisticsProto();
    }
    this.schema = OrcUtils.convertTypeFromProtobuf(this.types, 0);
  }

  /**
   * Get the WriterVersion based on the ORC file postscript.
   * @param writerVersion the integer writer version
   * @return the version of the software that produced the file
   */
  public static OrcFile.WriterVersion getWriterVersion(int writerVersion) {
    for (OrcFile.WriterVersion version : OrcFile.WriterVersion.values()) {
      if (version.getId() == writerVersion) {
        return version;
      }
    }
    return OrcFile.WriterVersion.FUTURE;
  }

  /**
   * Parse the footer out of a buffer. The buffer's position and limit are
   * changed to delimit the footer bytes.
   */
  private static OrcProto.Footer extractFooter(ByteBuffer bb, int footerAbsPos,
      int footerSize, CompressionCodec codec, int bufferSize) throws IOException {
    bb.position(footerAbsPos);
    bb.limit(footerAbsPos + footerSize);
    return OrcProto.Footer.parseFrom(InStream.createCodedInputStream("footer",
        Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)), footerSize, codec, bufferSize));
  }

  /**
   * Parse the metadata section out of a buffer. The buffer's position and
   * limit are changed to delimit the metadata bytes.
   */
  public static OrcProto.Metadata extractMetadata(ByteBuffer bb, int metadataAbsPos,
      int metadataSize, CompressionCodec codec, int bufferSize) throws IOException {
    bb.position(metadataAbsPos);
    bb.limit(metadataAbsPos + metadataSize);
    return OrcProto.Metadata.parseFrom(InStream.createCodedInputStream("metadata",
        Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)), metadataSize, codec, bufferSize));
  }

  /**
   * Parse the postscript out of an array-backed buffer, warn about future
   * file versions and reject unknown compression kinds.
   */
  private static OrcProto.PostScript extractPostScript(ByteBuffer bb, Path path,
      int psLen, int psAbsOffset) throws IOException {
    // TODO: when PB is upgraded to 2.6, newInstance(ByteBuffer) method should be used here.
    assert bb.hasArray();
    CodedInputStream in = CodedInputStream.newInstance(
        bb.array(), bb.arrayOffset() + psAbsOffset, psLen);
    OrcProto.PostScript ps = OrcProto.PostScript.parseFrom(in);
    checkOrcVersion(LOG, path, ps.getVersionList());

    // Check compression codec.
    switch (ps.getCompression()) {
      case NONE:
      case ZLIB:
      case SNAPPY:
      case LZO:
        break;
      default:
        throw new IllegalArgumentException("Unknown compression");
    }
    return ps;
  }

  /**
   * Parse a serialized file tail, using -1 for both the file length and the
   * modification time.
   */
  public static OrcTail extractFileTail(ByteBuffer buffer)
      throws IOException {
    return extractFileTail(buffer, -1, -1);
  }

  /**
   * Parse a serialized file tail held in an array-backed buffer.
   * @param buffer the tail bytes; the last byte before the limit is the
   *   postscript length
   * @param fileLength the file length to record in the tail
   * @param modificationTime the modification time to record in the tail
   * @return the parsed tail
   * @throws IOException if the buffer does not hold a valid ORC tail
   */
  public static OrcTail extractFileTail(ByteBuffer buffer, long fileLength, long modificationTime)
      throws IOException {
    int readSize = buffer.limit();
    int psLen = buffer.get(readSize - 1) & 0xff;
    int psOffset = readSize - 1 - psLen;
    ensureOrcFooter(buffer, psLen);
    byte[] psBuffer = new byte[psLen];
    // psOffset is relative to the buffer, so account for the array offset
    System.arraycopy(buffer.array(), buffer.arrayOffset() + psOffset, psBuffer, 0, psLen);
    OrcProto.PostScript ps = OrcProto.PostScript.parseFrom(psBuffer);
    int footerSize = (int) ps.getFooterLength();
    CompressionCodec codec = PhysicalFsWriter
        .createCodec(CompressionKind.valueOf(ps.getCompression().name()));
    OrcProto.Footer footer = extractFooter(buffer,
        (int) (buffer.position() + ps.getMetadataLength()),
        footerSize, codec, (int) ps.getCompressionBlockSize());
    OrcProto.FileTail.Builder fileTailBuilder = OrcProto.FileTail.newBuilder()
        .setPostscriptLength(psLen)
        .setPostscript(ps)
        .setFooter(footer)
        .setFileLength(fileLength);
    // clear does not clear the contents but sets position to 0 and limit = capacity
    buffer.clear();
    return new OrcTail(fileTailBuilder.build(), buffer.slice(), modificationTime);
  }

  /**
   * Read and parse the tail (metadata, footer, postscript) of an ORC file.
   * As a side effect the {@code bufferSize} and {@code codec} fields are set
   * from the postscript.
   * @param fs the file system to read from
   * @param path the file to read
   * @param maxFileLength the length to treat as the end of the file, or
   *   {@code Long.MAX_VALUE} to ask the file system
   * @return the parsed tail together with its serialized bytes
   * @throws IOException if the file cannot be read or is not a valid ORC file
   */
  protected OrcTail extractFileTail(FileSystem fs, Path path,
      long maxFileLength) throws IOException {
    FSDataInputStream file = fs.open(path);
    ByteBuffer buffer;
    OrcProto.FileTail.Builder fileTailBuilder = OrcProto.FileTail.newBuilder();
    long modificationTime;
    try {
      // figure out the size of the file using the option or filesystem
      long size;
      if (maxFileLength == Long.MAX_VALUE) {
        FileStatus fileStatus = fs.getFileStatus(path);
        size = fileStatus.getLen();
        modificationTime = fileStatus.getModificationTime();
      } else {
        size = maxFileLength;
        modificationTime = -1;
      }
      if (size <= 0) {
        // there is not even a postscript length byte to read
        throw new FileFormatException("Malformed ORC file " + path +
            ". File is empty.");
      }
      fileTailBuilder.setFileLength(size);

      //read last bytes into buffer to get PostScript
      int readSize = (int) Math.min(size, DIRECTORY_SIZE_GUESS);
      buffer = ByteBuffer.allocate(readSize);
      assert buffer.position() == 0;
      file.readFully((size - readSize),
          buffer.array(), buffer.arrayOffset(), readSize);
      buffer.position(0);

      //read the PostScript
      //get length of PostScript
      int psLen = buffer.get(readSize - 1) & 0xff;
      ensureOrcFooter(file, path, psLen, buffer);
      int psOffset = readSize - 1 - psLen;
      OrcProto.PostScript ps = extractPostScript(buffer, path, psLen, psOffset);
      bufferSize = (int) ps.getCompressionBlockSize();
      codec = PhysicalFsWriter.createCodec(CompressionKind.valueOf(ps.getCompression().name()));
      fileTailBuilder.setPostscriptLength(psLen).setPostscript(ps);

      int footerSize = (int) ps.getFooterLength();
      int metadataSize = (int) ps.getMetadataLength();

      //check if extra bytes need to be read
      int extra = Math.max(0, psLen + 1 + footerSize + metadataSize - readSize);
      int tailSize = 1 + psLen + footerSize + metadataSize;
      if (extra > 0) {
        //more bytes need to be read, seek back to the right place and read extra bytes
        ByteBuffer extraBuf = ByteBuffer.allocate(extra + readSize);
        file.readFully((size - readSize - extra), extraBuf.array(),
            extraBuf.arrayOffset() + extraBuf.position(), extra);
        extraBuf.position(extra);
        //append with already read bytes
        extraBuf.put(buffer);
        buffer = extraBuf;
        buffer.position(0);
        buffer.limit(tailSize);
        readSize += extra;
        psOffset = readSize - 1 - psLen;
      } else {
        //footer is already in the bytes in buffer, just adjust position, length
        buffer.position(psOffset - footerSize - metadataSize);
        buffer.limit(buffer.position() + tailSize);
      }

      // slice out the footer without disturbing the buffer's position
      buffer.mark();
      int footerOffset = psOffset - footerSize;
      buffer.position(footerOffset);
      ByteBuffer footerBuffer = buffer.slice();
      buffer.reset();
      OrcProto.Footer footer = extractFooter(footerBuffer, 0, footerSize,
          codec, bufferSize);
      fileTailBuilder.setFooter(footer);
    } finally {
      try {
        file.close();
      } catch (IOException ex) {
        LOG.error("Failed to close the file after another error", ex);
      }
    }

    // copy the tail bytes so the returned buffer starts at position 0
    ByteBuffer serializedTail = ByteBuffer.allocate(buffer.remaining());
    serializedTail.put(buffer.slice());
    serializedTail.rewind();
    return new OrcTail(fileTailBuilder.build(), serializedTail, modificationTime);
  }

  @Override
  public ByteBuffer getSerializedFileFooter() {
    return tail.getSerializedTail();
  }

  @Override
  public RecordReader rows() throws IOException {
    return rows(new Options());
  }

  @Override
  public RecordReader rows(Options options) throws IOException {
    LOG.info("Reading ORC rows from {} with {}", path, options);
    return new RecordReaderImpl(this, options);
  }


  @Override
  public long getRawDataSize() {
    // if the deserializedSize is not computed, then compute it, else
    // return the already computed size. since we are reading from the footer
    // we don't have to compute deserialized size repeatedly
    if (deserializedSize == -1) {
      List<Integer> indices = Lists.newArrayList();
      for (int i = 0; i < fileStats.size(); ++i) {
        indices.add(i);
      }
      deserializedSize = getRawDataSizeFromColIndices(indices);
    }
    return deserializedSize;
  }

  @Override
  public long getRawDataSizeFromColIndices(List<Integer> colIndices) {
    return getRawDataSizeFromColIndices(colIndices, types, fileStats);
  }

  /**
   * Sum the estimated raw data size of the given columns.
   * @param colIndices the column ids to include
   * @param types the file's type list, indexed by column id
   * @param stats the file-level column statistics, indexed by column id
   * @return the total estimated size
   */
  public static long getRawDataSizeFromColIndices(
      List<Integer> colIndices, List<OrcProto.Type> types,
      List<OrcProto.ColumnStatistics> stats) {
    long result = 0;
    for (int colIdx : colIndices) {
      result += getRawDataSizeOfColumn(colIdx, types, stats);
    }
    return result;
  }

  /**
   * Estimate the raw data size of one column from its statistics.
   * Kinds not listed in the switch contribute 0.
   */
  private static long getRawDataSizeOfColumn(int colIdx, List<OrcProto.Type> types,
      List<OrcProto.ColumnStatistics> stats) {
    OrcProto.ColumnStatistics colStat = stats.get(colIdx);
    long numVals = colStat.getNumberOfValues();
    OrcProto.Type type = types.get(colIdx);

    switch (type.getKind()) {
    case BINARY:
      // old orc format doesn't support binary statistics. checking for binary
      // statistics is not required as protocol buffers takes care of it.
      return colStat.getBinaryStatistics().getSum();
    case STRING:
    case CHAR:
    case VARCHAR:
      // old orc format doesn't support sum for string statistics. checking for
      // existence is not required as protocol buffers takes care of it.

      // ORC strings are deserialized to java strings. so use java data model's
      // string size
      numVals = numVals == 0 ? 1 : numVals;
      int avgStrLen = (int) (colStat.getStringStatistics().getSum() / numVals);
      return numVals * JavaDataModel.get().lengthForStringOfLength(avgStrLen);
    case TIMESTAMP:
      return numVals * JavaDataModel.get().lengthOfTimestamp();
    case DATE:
      return numVals * JavaDataModel.get().lengthOfDate();
    case DECIMAL:
      return numVals * JavaDataModel.get().lengthOfDecimal();
    case DOUBLE:
    case LONG:
      return numVals * JavaDataModel.get().primitive2();
    case FLOAT:
    case INT:
    case SHORT:
    case BOOLEAN:
    case BYTE:
      return numVals * JavaDataModel.get().primitive1();
    default:
      LOG.debug("Unknown primitive category: {}", type.getKind());
      break;
    }

    return 0;
  }

  @Override
  public long getRawDataSizeOfColumns(List<String> colNames) {
    List<Integer> colIndices = getColumnIndicesFromNames(colNames);
    return getRawDataSizeFromColIndices(colIndices);
  }

  /**
   * Translate top-level field names into the column ids they cover.
   * Names that are not fields of the top-level struct are logged and skipped.
   */
  private List<Integer> getColumnIndicesFromNames(List<String> colNames) {
    // top level struct
    OrcProto.Type type = types.get(0);
    List<Integer> colIndices = Lists.newArrayList();
    List<String> fieldNames = type.getFieldNamesList();
    for (String colName : colNames) {
      int fieldIdx = fieldNames.indexOf(colName);
      if (fieldIdx < 0) {
        StringBuilder msg = new StringBuilder("Cannot find field for: ")
            .append(colName).append(" in ");
        for (String fn : fieldNames) {
          msg.append(fn).append(", ");
        }
        LOG.warn(msg.toString());
        continue;
      }

      // a single field may span multiple columns. find start and end column
      // index for the requested field
      int idxStart = type.getSubtypes(fieldIdx);

      int idxEnd;

      // if the specified is the last field and then end index will be last
      // column index
      if (fieldIdx + 1 > fieldNames.size() - 1) {
        idxEnd = getLastIdx() + 1;
      } else {
        idxEnd = type.getSubtypes(fieldIdx + 1);
      }

      // if start index and end index are same then the field is a primitive
      // field else complex field (like map, list, struct, union)
      if (idxStart == idxEnd) {
        // simple field
        colIndices.add(idxStart);
      } else {
        // complex fields spans multiple columns
        for (int i = idxStart; i < idxEnd; i++) {
          colIndices.add(i);
        }
      }
    }
    return colIndices;
  }

  /**
   * @return the largest column id referenced as a subtype by any type
   */
  private int getLastIdx() {
    Set<Integer> indices = new HashSet<>();
    for (OrcProto.Type type : types) {
      indices.addAll(type.getSubtypesList());
    }
    return Collections.max(indices);
  }

  @Override
  public List<OrcProto.StripeStatistics> getOrcProtoStripeStatistics() {
    return stripeStats;
  }

  @Override
  public List<OrcProto.ColumnStatistics> getOrcProtoFileStatistics() {
    return fileStats;
  }

  /**
   * @return the statistics of every stripe; when they are not yet available
   *   the metadata section is parsed from the serialized tail first
   * @throws IOException if the metadata section cannot be parsed
   */
  @Override
  public List<StripeStatistics> getStripeStatistics() throws IOException {
    if (stripeStats == null && metadata == null) {
      metadata = extractMetadata(tail.getSerializedTail(), 0, metadataSize, codec, bufferSize);
      stripeStats = metadata.getStripeStatsList();
    }
    List<StripeStatistics> result = new ArrayList<>();
    for (OrcProto.StripeStatistics ss : stripeStats) {
      result.add(new StripeStatistics(ss.getColStatsList()));
    }
    return result;
  }

  /**
   * @return the raw user metadata items; null when the reader was built from
   *   cached file metadata
   */
  public List<OrcProto.UserMetadataItem> getOrcProtoUserMetadata() {
    return userMetadata;
  }

  @Override
  public List<Integer> getVersionList() {
    return versionList;
  }

  @Override
  public int getMetadataSize() {
    return metadataSize;
  }

  @Override
  public String toString() {
    StringBuilder buffer = new StringBuilder();
    buffer.append("ORC Reader(");
    buffer.append(path);
    if (maxLength != -1) {
      buffer.append(", ");
      buffer.append(maxLength);
    }
    buffer.append(")");
    return buffer.toString();
  }
}
