[
https://issues.apache.org/jira/browse/TAJO-1465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14642633#comment-14642633
]
ASF GitHub Bot commented on TAJO-1465:
--------------------------------------
Github user hyunsik commented on a diff in the pull request:
https://github.com/apache/tajo/pull/652#discussion_r35528682
--- Diff:
tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/WriterImpl.java
---
@@ -0,0 +1,2251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.thirdparty.orc;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Longs;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedOutputStream;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.tajo.datum.*;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.thirdparty.orc.CompressionCodec.Modifier;
+import org.apache.tajo.storage.thirdparty.orc.OrcProto.RowIndexEntry;
+import org.apache.tajo.storage.thirdparty.orc.OrcProto.StripeStatistics;
+import org.apache.tajo.storage.thirdparty.orc.OrcProto.Type;
+import org.apache.tajo.storage.thirdparty.orc.OrcProto.UserMetadataItem;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.hadoop.hive.serde2.objectinspector.*;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.*;
+import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
+import org.apache.hadoop.io.Text;
+import org.apache.tajo.util.datetime.DateTimeUtil;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.management.ManagementFactory;
+import java.nio.ByteBuffer;
+import java.sql.Timestamp;
+import java.util.*;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * An ORC file writer. The file is divided into stripes, which is the natural
+ * unit of work when reading. Each stripe is buffered in memory until the
+ * memory reaches the stripe size and then it is written out broken down by
+ * columns. Each column is written by a TreeWriter that is specific to that
+ * type of column. TreeWriters may have children TreeWriters that handle the
+ * sub-types. Each of the TreeWriters writes the column's data as a set of
+ * streams.
+ *
+ * This class is unsynchronized like most Stream objects, so from the creation
+ * of an OrcFile onward, all access to a single instance has to be from a
+ * single thread. There are no known cases today where a single instance is
+ * accessed from different threads.
+ *
+ * Caveat: the MemoryManager is created when the WriterOptions are created, so
+ * it has to be confined to a single thread as well.
+ */
+public class WriterImpl implements Writer, MemoryManager.Callback {
+
+ private static final Log LOG = LogFactory.getLog(WriterImpl.class);
+
+ private static final int HDFS_BUFFER_SIZE = 256 * 1024;
+ private static final int MIN_ROW_INDEX_STRIDE = 1000;
+
+ // column-count threshold above which the buffer size will be automatically resized
+ private static final int COLUMN_COUNT_THRESHOLD = 1000;
+
+ private final FileSystem fs;
+ private final Path path;
+ private final long defaultStripeSize;
+ private long adjustedStripeSize;
+ private final int rowIndexStride;
+ private final CompressionKind compress;
+ private final CompressionCodec codec;
+ private final boolean addBlockPadding;
+ private final int bufferSize;
+ private final long blockSize;
+ private final float paddingTolerance;
+ // the streams that make up the current stripe
+ private final Map<StreamName, BufferedStream> streams =
+ new TreeMap<StreamName, BufferedStream>();
+
+ private FSDataOutputStream rawWriter = null;
+ // the compressed metadata information outStream
+ private OutStream writer = null;
+ // a protobuf outStream around the metadata outStream above
+ private CodedOutputStream protobufWriter = null;
+ private long headerLength;
+ private int columnCount;
+ private long rowCount = 0;
+ private long rowsInStripe = 0;
+ private long rawDataSize = 0;
+ private int rowsInIndex = 0;
+ private int stripesAtLastFlush = -1;
+ private final List<OrcProto.StripeInformation> stripes =
+ new ArrayList<OrcProto.StripeInformation>();
+ private final Map<String, ByteString> userMetadata =
+ new TreeMap<String, ByteString>();
+ private final TreeWriter treeWriter;
+ private final boolean buildIndex;
+ private final MemoryManager memoryManager;
+ private final OrcFile.Version version;
+ private final Configuration conf;
+ private final OrcFile.WriterCallback callback;
+ private final OrcFile.WriterContext callbackContext;
+ private final OrcFile.EncodingStrategy encodingStrategy;
+ private final OrcFile.CompressionStrategy compressionStrategy;
+ private final boolean[] bloomFilterColumns;
+ private final double bloomFilterFpp;
+ private boolean writeTimeZone;
+
+ WriterImpl(FileSystem fs,
+ Path path,
+ Configuration conf,
+ ObjectInspector inspector,
+ long stripeSize,
+ CompressionKind compress,
+ int bufferSize,
+ int rowIndexStride,
+ MemoryManager memoryManager,
+ boolean addBlockPadding,
+ OrcFile.Version version,
+ OrcFile.WriterCallback callback,
+ OrcFile.EncodingStrategy encodingStrategy,
+ OrcFile.CompressionStrategy compressionStrategy,
+ float paddingTolerance,
+ long blockSizeValue,
+ String bloomFilterColumnNames,
+ double bloomFilterFpp) throws IOException {
+ this.fs = fs;
+ this.path = path;
+ this.conf = conf;
+ this.callback = callback;
+ if (callback != null) {
+ callbackContext = new OrcFile.WriterContext(){
+
+ @Override
+ public Writer getWriter() {
+ return WriterImpl.this;
+ }
+ };
+ } else {
+ callbackContext = null;
+ }
+ this.adjustedStripeSize = stripeSize;
+ this.defaultStripeSize = stripeSize;
+ this.version = version;
+ this.encodingStrategy = encodingStrategy;
+ this.compressionStrategy = compressionStrategy;
+ this.addBlockPadding = addBlockPadding;
+ this.blockSize = blockSizeValue;
+ this.paddingTolerance = paddingTolerance;
+ this.compress = compress;
+ this.rowIndexStride = rowIndexStride;
+ this.memoryManager = memoryManager;
+ buildIndex = rowIndexStride > 0;
+ codec = createCodec(compress);
+ String allColumns = conf.get(IOConstants.COLUMNS);
+ if (allColumns == null) {
+ allColumns = getColumnNamesFromInspector(inspector);
+ }
+ this.bufferSize = getEstimatedBufferSize(allColumns, bufferSize);
+ if (version == OrcFile.Version.V_0_11) {
+ /* do not write bloom filters for ORC v11 */
+ this.bloomFilterColumns =
+ OrcUtils.includeColumns(null, allColumns, inspector);
+ } else {
+ this.bloomFilterColumns =
+ OrcUtils.includeColumns(bloomFilterColumnNames, allColumns, inspector);
+ }
+ this.bloomFilterFpp = bloomFilterFpp;
+ treeWriter = createTreeWriter(inspector, new StreamFactory(), false);
+ if (buildIndex && rowIndexStride < MIN_ROW_INDEX_STRIDE) {
+ throw new IllegalArgumentException("Row stride must be at least " +
+ MIN_ROW_INDEX_STRIDE);
+ }
+
+ // ensure that we are able to handle callbacks before we register ourselves
+ memoryManager.addWriter(path, stripeSize, this);
+ }
+
+ private String getColumnNamesFromInspector(ObjectInspector inspector) {
+ List<String> fieldNames = Lists.newArrayList();
+ Joiner joiner = Joiner.on(",");
+ if (inspector instanceof StructObjectInspector) {
+ StructObjectInspector soi = (StructObjectInspector) inspector;
+ List<? extends StructField> fields = soi.getAllStructFieldRefs();
+ for(StructField sf : fields) {
+ fieldNames.add(sf.getFieldName());
+ }
+ }
+ return joiner.join(fieldNames);
+ }
+
+ @VisibleForTesting
+ int getEstimatedBufferSize(int bs) {
+ return getEstimatedBufferSize(conf.get(IOConstants.COLUMNS), bs);
+ }
+
+ int getEstimatedBufferSize(String colNames, int bs) {
+ long availableMem = getMemoryAvailableForORC();
+ if (colNames != null) {
+ final int numCols = colNames.split(",").length;
+ if (numCols > COLUMN_COUNT_THRESHOLD) {
+ // In BufferedStream, there are 3 outstream buffers (compressed,
+ // uncompressed, and overflow) plus a list of previously compressed buffers.
+ // Since the overflow buffer is rarely used, let's count only 2 allocations.
+ // Also, initially, the list of compression buffers will be empty.
+ final int outStreamBuffers = codec == null ? 1 : 2;
+
+ // The max possible number of streams per column is 5. For string columns,
+ // there are ROW_INDEX, PRESENT, DATA, LENGTH, and DICTIONARY_DATA streams.
+ final int maxStreams = 5;
+
+ // Let's assume 10% of memory goes to holding dictionaries in memory and
+ // other object allocations.
+ final long miscAllocation = (long) (0.1f * availableMem);
+
+ // compute the available memory
+ final long remainingMem = availableMem - miscAllocation;
+
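+ // Illustrative arithmetic (hypothetical numbers, not from this patch):
+ // with ~5 GB available to ORC, the 10% misc overhead leaves ~4.5 GB; for
+ // 2,000 columns with 5 streams and 2 buffers each, 4.5 GB / (5 * 2 * 2000)
+ // is roughly 236 KB per stream, which getClosestBufferSize then snaps to a
+ // power-of-two bucket (here 256 KB) before capping at the configured size.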
+ int estBufferSize = (int) (remainingMem /
+ (maxStreams * outStreamBuffers * numCols));
+ estBufferSize = getClosestBufferSize(estBufferSize, bs);
+ if (estBufferSize > bs) {
+ estBufferSize = bs;
+ }
+
+ LOG.info("WIDE TABLE - Number of columns: " + numCols +
+ " Chosen compression buffer size: " + estBufferSize);
+ return estBufferSize;
+ }
+ }
+ return bs;
+ }
+
+ private int getClosestBufferSize(int estBufferSize, int bs) {
+ final int kb4 = 4 * 1024;
+ final int kb8 = 8 * 1024;
+ final int kb16 = 16 * 1024;
+ final int kb32 = 32 * 1024;
+ final int kb64 = 64 * 1024;
+ final int kb128 = 128 * 1024;
+ final int kb256 = 256 * 1024;
+ if (estBufferSize <= kb4) {
+ return kb4;
+ } else if (estBufferSize > kb4 && estBufferSize <= kb8) {
+ return kb8;
+ } else if (estBufferSize > kb8 && estBufferSize <= kb16) {
+ return kb16;
+ } else if (estBufferSize > kb16 && estBufferSize <= kb32) {
+ return kb32;
+ } else if (estBufferSize > kb32 && estBufferSize <= kb64) {
+ return kb64;
+ } else if (estBufferSize > kb64 && estBufferSize <= kb128) {
+ return kb128;
+ } else {
+ return kb256;
+ }
+ }
+
+ // the assumption is that only one ORC writer is open at a time, which holds
+ // true for most cases. HIVE-6455 forces the single-writer case.
+ private long getMemoryAvailableForORC() {
+ OrcConf.ConfVars poolVar = OrcConf.ConfVars.HIVE_ORC_FILE_MEMORY_POOL;
+ double maxLoad = conf.getFloat(poolVar.varname, poolVar.defaultFloatVal);
+ long totalMemoryPool = Math.round(ManagementFactory.getMemoryMXBean().
+ getHeapMemoryUsage().getMax() * maxLoad);
+ return totalMemoryPool;
+ }
+
+ public static CompressionCodec createCodec(CompressionKind kind) {
+ switch (kind) {
+ case NONE:
+ return null;
+ case ZLIB:
+ return new ZlibCodec();
+ case SNAPPY:
+ return new SnappyCodec();
+ case LZO:
+ try {
+ Class<? extends CompressionCodec> lzo =
+ (Class<? extends CompressionCodec>)
+ Class.forName("org.apache.hadoop.hive.ql.io.orc.LzoCodec");
+ return lzo.newInstance();
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException("LZO is not available.", e);
+ } catch (InstantiationException e) {
+ throw new IllegalArgumentException("Problem initializing LZO",
e);
+ } catch (IllegalAccessException e) {
+ throw new IllegalArgumentException("Insufficient access to LZO",
e);
+ }
+ default:
+ throw new IllegalArgumentException("Unknown compression codec: " +
+ kind);
+ }
+ }
+
+ @Override
+ public boolean checkMemory(double newScale) throws IOException {
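+ // newScale is the fraction of its configured stripe size this writer may
+ // currently use, as apportioned by the MemoryManager across all open
+ // writers; crossing the scaled limit flushes the stripe early.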
+ long limit = (long) Math.round(adjustedStripeSize * newScale);
+ long size = estimateStripeSize();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("ORC writer " + path + " size = " + size + " limit = " +
+ limit);
+ }
+ if (size > limit) {
+ flushStripe();
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * This class is used to hold the contents of streams as they are buffered.
+ * The TreeWriters write to the outStream and the codec compresses the
+ * data as buffers fill up and stores them in the output list. When the
+ * stripe is being written, the whole stream is written to the file.
+ */
+ private class BufferedStream implements OutStream.OutputReceiver {
+ private final OutStream outStream;
+ private final List<ByteBuffer> output = new ArrayList<ByteBuffer>();
+
+ BufferedStream(String name, int bufferSize,
+ CompressionCodec codec) throws IOException {
+ outStream = new OutStream(name, bufferSize, codec, this);
+ }
+
+ /**
+ * Receive a buffer from the compression codec.
+ * @param buffer the buffer to save
+ * @throws IOException
+ */
+ @Override
+ public void output(ByteBuffer buffer) {
+ output.add(buffer);
+ }
+
+ /**
+ * Get the number of bytes in buffers that are allocated to this stream.
+ * @return number of bytes in buffers
+ */
+ public long getBufferSize() {
+ long result = 0;
+ for(ByteBuffer buf: output) {
+ result += buf.capacity();
+ }
+ return outStream.getBufferSize() + result;
+ }
+
+ /**
+ * Flush the stream to the codec.
+ * @throws IOException
+ */
+ public void flush() throws IOException {
+ outStream.flush();
+ }
+
+ /**
+ * Clear all of the buffers.
+ * @throws IOException
+ */
+ public void clear() throws IOException {
+ outStream.clear();
+ output.clear();
+ }
+
+ /**
+ * Check the state of suppress flag in output stream
+ * @return value of suppress flag
+ */
+ public boolean isSuppressed() {
+ return outStream.isSuppressed();
+ }
+
+ /**
+ * Get the number of bytes that will be written to the output. Assumes
+ * the stream has already been flushed.
+ * @return the number of bytes
+ */
+ public long getOutputSize() {
+ long result = 0;
+ for(ByteBuffer buffer: output) {
+ result += buffer.remaining();
+ }
+ return result;
+ }
+
+ /**
+ * Write the saved compressed buffers to the OutputStream.
+ * @param out the stream to write to
+ * @throws IOException
+ */
+ void spillTo(OutputStream out) throws IOException {
+ for(ByteBuffer buffer: output) {
+ out.write(buffer.array(), buffer.arrayOffset() + buffer.position(),
+ buffer.remaining());
+ }
+ }
+
+ @Override
+ public String toString() {
+ return outStream.toString();
+ }
+ }
+
+ /**
+ * An output receiver that writes the ByteBuffers to the output stream
+ * as they are received.
+ */
+ private class DirectStream implements OutStream.OutputReceiver {
+ private final FSDataOutputStream output;
+
+ DirectStream(FSDataOutputStream output) {
+ this.output = output;
+ }
+
+ @Override
+ public void output(ByteBuffer buffer) throws IOException {
+ output.write(buffer.array(), buffer.arrayOffset() + buffer.position(),
+ buffer.remaining());
+ }
+ }
+
+ private static class RowIndexPositionRecorder implements PositionRecorder {
+ private final OrcProto.RowIndexEntry.Builder builder;
+
+ RowIndexPositionRecorder(OrcProto.RowIndexEntry.Builder builder) {
+ this.builder = builder;
+ }
+
+ @Override
+ public void addPosition(long position) {
+ builder.addPositions(position);
+ }
+ }
+
+ /**
+ * Interface from the Writer to the TreeWriters. This limits the visibility
+ * that the TreeWriters have into the Writer.
+ */
+ private class StreamFactory {
+ /**
+ * Create a stream to store part of a column.
+ * @param column the column id for the stream
+ * @param kind the kind of stream
+ * @return The output outStream that the section needs to be written to.
+ * @throws IOException
+ */
+ public OutStream createStream(int column,
+ OrcProto.Stream.Kind kind
+ ) throws IOException {
+ final StreamName name = new StreamName(column, kind);
+ final EnumSet<CompressionCodec.Modifier> modifiers;
+
+ switch (kind) {
+ case BLOOM_FILTER:
+ case DATA:
+ case DICTIONARY_DATA:
+ if (getCompressionStrategy() == OrcFile.CompressionStrategy.SPEED) {
+ modifiers = EnumSet.of(Modifier.FAST, Modifier.TEXT);
+ } else {
+ modifiers = EnumSet.of(Modifier.DEFAULT, Modifier.TEXT);
+ }
+ break;
+ case LENGTH:
+ case DICTIONARY_COUNT:
+ case PRESENT:
+ case ROW_INDEX:
+ case SECONDARY:
+ // easily compressed using the fastest modes
+ modifiers = EnumSet.of(Modifier.FASTEST, Modifier.BINARY);
+ break;
+ default:
+ LOG.warn("Missing ORC compression modifiers for " + kind);
+ modifiers = null;
+ break;
+ }
+
+ BufferedStream result = streams.get(name);
+ if (result == null) {
+ result = new BufferedStream(name.toString(), bufferSize,
+ codec == null ? codec : codec.modify(modifiers));
+ streams.put(name, result);
+ }
+ return result.outStream;
+ }
+
+ /**
+ * Get the next column id.
+ * @return a number from 0 to the number of columns - 1
+ */
+ public int getNextColumnId() {
+ return columnCount++;
+ }
+
+ /**
+ * Get the current column id. After creating all tree writers, this count
+ * should tell how many columns (including columns within nested complex
+ * objects) were created in total.
+ * @return current column id
+ */
+ public int getCurrentColumnId() {
+ return columnCount;
+ }
+
+ /**
+ * Get the stride rate of the row index.
+ */
+ public int getRowIndexStride() {
+ return rowIndexStride;
+ }
+
+ /**
+ * Should we be building the row index?
+ * @return true if we are building the index
+ */
+ public boolean buildIndex() {
+ return buildIndex;
+ }
+
+ /**
+ * Is the ORC file compressed?
+ * @return are the streams compressed
+ */
+ public boolean isCompressed() {
+ return codec != null;
+ }
+
+ /**
+ * Get the encoding strategy to use.
+ * @return encoding strategy
+ */
+ public OrcFile.EncodingStrategy getEncodingStrategy() {
+ return encodingStrategy;
+ }
+
+ /**
+ * Get the compression strategy to use.
+ * @return compression strategy
+ */
+ public OrcFile.CompressionStrategy getCompressionStrategy() {
+ return compressionStrategy;
+ }
+
+ /**
+ * Get the bloom filter columns
+ * @return bloom filter columns
+ */
+ public boolean[] getBloomFilterColumns() {
+ return bloomFilterColumns;
+ }
+
+ /**
+ * Get bloom filter false positive percentage.
+ * @return fpp
+ */
+ public double getBloomFilterFPP() {
+ return bloomFilterFpp;
+ }
+
+ /**
+ * Get the writer's configuration.
+ * @return configuration
+ */
+ public Configuration getConfiguration() {
+ return conf;
+ }
+
+ /**
+ * Get the version of the file to write.
+ */
+ public OrcFile.Version getVersion() {
+ return version;
+ }
+
+ public void useWriterTimeZone(boolean val) {
+ writeTimeZone = val;
+ }
+
+ public boolean hasWriterTimeZone() {
+ return writeTimeZone;
+ }
+ }
+
+ /**
+ * The parent class of all of the writers for each column. Each column
+ * is written by an instance of this class. The compound types (struct,
+ * list, map, and union) have children tree writers that write the children
+ * types.
+ */
+ private abstract static class TreeWriter {
+ protected final int id;
+ protected final ObjectInspector inspector;
+ private final BitFieldWriter isPresent;
+ private final boolean isCompressed;
+ protected final ColumnStatisticsImpl indexStatistics;
+ protected final ColumnStatisticsImpl stripeColStatistics;
+ private final ColumnStatisticsImpl fileStatistics;
+ protected TreeWriter[] childrenWriters;
+ protected final RowIndexPositionRecorder rowIndexPosition;
+ private final OrcProto.RowIndex.Builder rowIndex;
+ private final OrcProto.RowIndexEntry.Builder rowIndexEntry;
+ private final PositionedOutputStream rowIndexStream;
+ private final PositionedOutputStream bloomFilterStream;
+ protected final BloomFilterIO bloomFilter;
+ protected final boolean createBloomFilter;
+ private final OrcProto.BloomFilterIndex.Builder bloomFilterIndex;
+ private final OrcProto.BloomFilter.Builder bloomFilterEntry;
+ private boolean foundNulls;
+ private OutStream isPresentOutStream;
+ private final List<StripeStatistics.Builder> stripeStatsBuilders;
+ private final StreamFactory streamFactory;
+
+ /**
+ * Create a tree writer.
+ * @param columnId the column id of the column to write
+ * @param inspector the object inspector to use
+ * @param streamFactory limited access to the Writer's data.
+ * @param nullable can the value be null?
+ * @throws IOException
+ */
+ TreeWriter(int columnId, ObjectInspector inspector,
+ StreamFactory streamFactory,
+ boolean nullable) throws IOException {
+ this.streamFactory = streamFactory;
+ this.isCompressed = streamFactory.isCompressed();
+ this.id = columnId;
+ this.inspector = inspector;
+ if (nullable) {
+ isPresentOutStream = streamFactory.createStream(id,
+ OrcProto.Stream.Kind.PRESENT);
+ isPresent = new BitFieldWriter(isPresentOutStream, 1);
+ } else {
+ isPresent = null;
+ }
+ this.foundNulls = false;
+ createBloomFilter = streamFactory.getBloomFilterColumns()[columnId];
+ indexStatistics = ColumnStatisticsImpl.create(inspector);
+ stripeColStatistics = ColumnStatisticsImpl.create(inspector);
+ fileStatistics = ColumnStatisticsImpl.create(inspector);
+ childrenWriters = new TreeWriter[0];
+ rowIndex = OrcProto.RowIndex.newBuilder();
+ rowIndexEntry = OrcProto.RowIndexEntry.newBuilder();
+ rowIndexPosition = new RowIndexPositionRecorder(rowIndexEntry);
+ stripeStatsBuilders = Lists.newArrayList();
+ if (streamFactory.buildIndex()) {
+ rowIndexStream = streamFactory.createStream(id, OrcProto.Stream.Kind.ROW_INDEX);
+ } else {
+ rowIndexStream = null;
+ }
+ if (createBloomFilter) {
+ bloomFilterEntry = OrcProto.BloomFilter.newBuilder();
+ bloomFilterIndex = OrcProto.BloomFilterIndex.newBuilder();
+ bloomFilterStream = streamFactory.createStream(id, OrcProto.Stream.Kind.BLOOM_FILTER);
+ bloomFilter = new BloomFilterIO(streamFactory.getRowIndexStride(),
+ streamFactory.getBloomFilterFPP());
+ } else {
+ bloomFilterEntry = null;
+ bloomFilterIndex = null;
+ bloomFilterStream = null;
+ bloomFilter = null;
+ }
+ }
+
+ protected OrcProto.RowIndex.Builder getRowIndex() {
+ return rowIndex;
+ }
+
+ protected ColumnStatisticsImpl getStripeStatistics() {
+ return stripeColStatistics;
+ }
+
+ protected ColumnStatisticsImpl getFileStatistics() {
+ return fileStatistics;
+ }
+
+ protected OrcProto.RowIndexEntry.Builder getRowIndexEntry() {
+ return rowIndexEntry;
+ }
+
+ IntegerWriter createIntegerWriter(PositionedOutputStream output,
+ boolean signed, boolean isDirectV2,
+ StreamFactory writer) {
+ if (isDirectV2) {
+ boolean alignedBitpacking = false;
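+ // SPEED trades a little space for decode speed: aligned bitpacking pads
+ // bit widths up to byte-aligned sizes so readers can unpack faster.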
+ if (writer.getEncodingStrategy().equals(OrcFile.EncodingStrategy.SPEED)) {
+ alignedBitpacking = true;
+ }
+ return new RunLengthIntegerWriterV2(output, signed, alignedBitpacking);
+ } else {
+ return new RunLengthIntegerWriter(output, signed);
+ }
+ }
+
+ boolean isNewWriteFormat(StreamFactory writer) {
+ return writer.getVersion() != OrcFile.Version.V_0_11;
+ }
+
+ /**
+ * Add a new value to the column.
+ * @param datum the datum to add to the column
+ * @throws IOException
+ */
+ void write(Datum datum) throws IOException {
+ if (datum != null && datum.isNotNull()) {
+ indexStatistics.increment();
+ } else {
+ indexStatistics.setNull();
+ }
+ if (isPresent != null) {
+ if(datum == null || datum.isNull()) {
+ foundNulls = true;
+ isPresent.write(0);
+ }
+ else {
+ isPresent.write(1);
+ }
+ }
+ }
+
+ void write(Tuple tuple) throws IOException {
+ if (tuple != null) {
+ indexStatistics.increment();
+ } else {
+ indexStatistics.setNull();
+ }
+ if (isPresent != null) {
+ if (tuple == null) {
+ foundNulls = true;
+ isPresent.write(0);
+ } else {
+ isPresent.write(1);
+ }
+ }
+ }
+
+ private void removeIsPresentPositions() {
+ for(int i=0; i < rowIndex.getEntryCount(); ++i) {
+ RowIndexEntry.Builder entry = rowIndex.getEntryBuilder(i);
+ List<Long> positions = entry.getPositionsList();
+ // bit streams use 3 positions if uncompressed, 4 if compressed
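+ // (uncompressed: byte offset, RLE literal count, bit offset; compression
+ // prepends the compressed-chunk offset, hence the extra position)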
+ positions = positions.subList(isCompressed ? 4 : 3, positions.size());
+ entry.clearPositions();
+ entry.addAllPositions(positions);
+ }
+ }
+
+ /**
+ * Write the stripe out to the file.
+ * @param builder the stripe footer that contains the information about the
+ * layout of the stripe. The TreeWriter is required to update the footer
+ * with its information.
+ * @param requiredIndexEntries the number of index entries that are required.
+ * This is to check to make sure the row index is well formed.
+ * @throws IOException
+ */
+ void writeStripe(OrcProto.StripeFooter.Builder builder,
+ int requiredIndexEntries) throws IOException {
+ if (isPresent != null) {
+ isPresent.flush();
+
+ // if no nulls are found in a stream, then suppress the stream
+ if(!foundNulls) {
+ isPresentOutStream.suppress();
+ // since isPresent bitstream is suppressed, update the index to
+ // remove the positions of the isPresent stream
+ if (rowIndexStream != null) {
+ removeIsPresentPositions();
+ }
+ }
+ }
+
+ // merge stripe-level column statistics into the file statistics and write
+ // them to the stripe statistics
+ OrcProto.StripeStatistics.Builder stripeStatsBuilder =
+ OrcProto.StripeStatistics.newBuilder();
+ writeStripeStatistics(stripeStatsBuilder, this);
+ stripeStatsBuilders.add(stripeStatsBuilder);
+
+ // reset the flag for next stripe
+ foundNulls = false;
+
+ builder.addColumns(getEncoding());
+ if (streamFactory.hasWriterTimeZone()) {
--- End diff --
This timezone policy may differ from Tajo's. Could you check the timezone
handling? The following code would be helpful for this work:
https://github.com/apache/tajo/blob/master/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java#L56
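For illustration, here is a minimal, JDK-only sketch of the mismatch in
question (the class name and the "Asia/Seoul" zone are placeholders, not
Tajo code). ORC's WriterImpl records the writer's local timezone for
timestamp reconstruction, while Tajo resolves a per-table timezone from the
table metadata, so the same instant can surface as different wall-clock
values:

    import java.util.TimeZone;

    public class TimeZoneCheck {
      public static void main(String[] args) {
        // the zone the ORC writer would pick up implicitly
        TimeZone writerZone = TimeZone.getDefault();
        // a table-level zone like the one Tajo resolves from table meta
        TimeZone tableZone = TimeZone.getTimeZone("Asia/Seoul");
        long instant = 1438041600000L; // 2015-07-28 00:00:00 UTC
        // the same instant has different UTC offsets under each zone, so
        // serializing local wall-clock time silently shifts the value
        System.out.println("writer offset (ms) = " + writerZone.getOffset(instant));
        System.out.println("table  offset (ms) = " + tableZone.getOffset(instant));
      }
    }

If the table timezone differs from the JVM default, the writer should either
convert before writing or record the table zone so readers can reconstruct
the same instants.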
> Add ORCFileAppender to write into ORCFile table
> -----------------------------------------------
>
> Key: TAJO-1465
> URL: https://issues.apache.org/jira/browse/TAJO-1465
> Project: Tajo
> Issue Type: Sub-task
> Components: Storage
> Affects Versions: 0.10.0
> Reporter: Dongjoon Hyun
> Assignee: Jongyoung Park
> Labels: orc
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)