[ 
https://issues.apache.org/jira/browse/TAJO-1465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14642633#comment-14642633
 ] 

ASF GitHub Bot commented on TAJO-1465:
--------------------------------------

Github user hyunsik commented on a diff in the pull request:

    https://github.com/apache/tajo/pull/652#discussion_r35528682
  
    --- Diff: 
tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/WriterImpl.java
 ---
    @@ -0,0 +1,2251 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.tajo.storage.thirdparty.orc;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import com.google.common.base.Joiner;
    +import com.google.common.collect.Lists;
    +import com.google.common.primitives.Longs;
    +import com.google.protobuf.ByteString;
    +import com.google.protobuf.CodedOutputStream;
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataOutputStream;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.hive.ql.io.IOConstants;
    +import org.apache.hadoop.hive.serde2.io.DateWritable;
    +import org.apache.hadoop.hive.shims.ShimLoader;
    +import org.apache.tajo.datum.*;
    +import org.apache.tajo.storage.Tuple;
    +import org.apache.tajo.storage.thirdparty.orc.CompressionCodec.Modifier;
    +import org.apache.tajo.storage.thirdparty.orc.OrcProto.RowIndexEntry;
    +import org.apache.tajo.storage.thirdparty.orc.OrcProto.StripeStatistics;
    +import org.apache.tajo.storage.thirdparty.orc.OrcProto.Type;
    +import org.apache.tajo.storage.thirdparty.orc.OrcProto.UserMetadataItem;
    +import org.apache.hadoop.hive.ql.util.JavaDataModel;
    +import org.apache.hadoop.hive.serde2.objectinspector.*;
    +import org.apache.hadoop.hive.serde2.objectinspector.primitive.*;
    +import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
    +import org.apache.hadoop.io.Text;
    +import org.apache.tajo.util.datetime.DateTimeUtil;
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.lang.management.ManagementFactory;
    +import java.nio.ByteBuffer;
    +import java.sql.Timestamp;
    +import java.util.*;
    +
    +import static com.google.common.base.Preconditions.checkArgument;
    +
    +/**
    + * An ORC file writer. The file is divided into stripes, which is the 
natural
    + * unit of work when reading. Each stripe is buffered in memory until the
    + * memory reaches the stripe size and then it is written out broken down by
    + * columns. Each column is written by a TreeWriter that is specific to that
    + * type of column. TreeWriters may have children TreeWriters that handle 
the
    + * sub-types. Each of the TreeWriters writes the column's data as a set of
    + * streams.
    + *
    + * This class is unsynchronized like most Stream objects, so from the 
creation of an OrcFile and all
    + * access to a single instance has to be from a single thread.
    + *
    + * There are no known cases where these happen between different threads 
today.
    + *
    + * Caveat: the MemoryManager is created during WriterOptions create, that 
has to be confined to a single
    + * thread as well.
    + *
    + */
    +public class WriterImpl implements Writer, MemoryManager.Callback {
    +
    +  private static final Log LOG = LogFactory.getLog(WriterImpl.class);
    +
    +  private static final int HDFS_BUFFER_SIZE = 256 * 1024;
    +  private static final int MIN_ROW_INDEX_STRIDE = 1000;
    +
    +  // threshold above which buffer size will be automatically resized
    +  private static final int COLUMN_COUNT_THRESHOLD = 1000;
    +
    +  private final FileSystem fs;
    +  private final Path path;
    +  private final long defaultStripeSize;
    +  private long adjustedStripeSize;
    +  private final int rowIndexStride;
    +  private final CompressionKind compress;
    +  private final CompressionCodec codec;
    +  private final boolean addBlockPadding;
    +  private final int bufferSize;
    +  private final long blockSize;
    +  private final float paddingTolerance;
    +  // the streams that make up the current stripe
    +  private final Map<StreamName, BufferedStream> streams =
    +    new TreeMap<StreamName, BufferedStream>();
    +
    +  private FSDataOutputStream rawWriter = null;
    +  // the compressed metadata information outStream
    +  private OutStream writer = null;
    +  // a protobuf outStream around streamFactory
    +  private CodedOutputStream protobufWriter = null;
    +  private long headerLength;
    +  private int columnCount;
    +  private long rowCount = 0;
    +  private long rowsInStripe = 0;
    +  private long rawDataSize = 0;
    +  private int rowsInIndex = 0;
    +  private int stripesAtLastFlush = -1;
    +  private final List<OrcProto.StripeInformation> stripes =
    +    new ArrayList<OrcProto.StripeInformation>();
    +  private final Map<String, ByteString> userMetadata =
    +    new TreeMap<String, ByteString>();
    +  private final TreeWriter treeWriter;
    +  private final boolean buildIndex;
    +  private final MemoryManager memoryManager;
    +  private final OrcFile.Version version;
    +  private final Configuration conf;
    +  private final OrcFile.WriterCallback callback;
    +  private final OrcFile.WriterContext callbackContext;
    +  private final OrcFile.EncodingStrategy encodingStrategy;
    +  private final OrcFile.CompressionStrategy compressionStrategy;
    +  private final boolean[] bloomFilterColumns;
    +  private final double bloomFilterFpp;
    +  private boolean writeTimeZone;
    +
    +  WriterImpl(FileSystem fs,
    +      Path path,
    +      Configuration conf,
    +      ObjectInspector inspector,
    +      long stripeSize,
    +      CompressionKind compress,
    +      int bufferSize,
    +      int rowIndexStride,
    +      MemoryManager memoryManager,
    +      boolean addBlockPadding,
    +      OrcFile.Version version,
    +      OrcFile.WriterCallback callback,
    +      OrcFile.EncodingStrategy encodingStrategy,
    +      OrcFile.CompressionStrategy compressionStrategy,
    +      float paddingTolerance,
    +      long blockSizeValue,
    +      String bloomFilterColumnNames,
    +      double bloomFilterFpp) throws IOException {
    +    this.fs = fs;
    +    this.path = path;
    +    this.conf = conf;
    +    this.callback = callback;
    +    if (callback != null) {
    +      callbackContext = new OrcFile.WriterContext(){
    +
    +        @Override
    +        public Writer getWriter() {
    +          return WriterImpl.this;
    +        }
    +      };
    +    } else {
    +      callbackContext = null;
    +    }
    +    this.adjustedStripeSize = stripeSize;
    +    this.defaultStripeSize = stripeSize;
    +    this.version = version;
    +    this.encodingStrategy = encodingStrategy;
    +    this.compressionStrategy = compressionStrategy;
    +    this.addBlockPadding = addBlockPadding;
    +    this.blockSize = blockSizeValue;
    +    this.paddingTolerance = paddingTolerance;
    +    this.compress = compress;
    +    this.rowIndexStride = rowIndexStride;
    +    this.memoryManager = memoryManager;
    +    buildIndex = rowIndexStride > 0;
    +    codec = createCodec(compress);
    +    String allColumns = conf.get(IOConstants.COLUMNS);
    +    if (allColumns == null) {
    +      allColumns = getColumnNamesFromInspector(inspector);
    +    }
    +    this.bufferSize = getEstimatedBufferSize(allColumns, bufferSize);
    +    if (version == OrcFile.Version.V_0_11) {
    +      /* do not write bloom filters for ORC v11 */
    +      this.bloomFilterColumns =
    +          OrcUtils.includeColumns(null, allColumns, inspector);
    +    } else {
    +      this.bloomFilterColumns =
    +          OrcUtils.includeColumns(bloomFilterColumnNames, allColumns, 
inspector);
    +    }
    +    this.bloomFilterFpp = bloomFilterFpp;
    +    treeWriter = createTreeWriter(inspector, new StreamFactory(), false);
    +    if (buildIndex && rowIndexStride < MIN_ROW_INDEX_STRIDE) {
    +      throw new IllegalArgumentException("Row stride must be at least " +
    +          MIN_ROW_INDEX_STRIDE);
    +    }
    +
    +    // ensure that we are able to handle callbacks before we register 
ourselves
    +    memoryManager.addWriter(path, stripeSize, this);
    +  }
    +
    +  private String getColumnNamesFromInspector(ObjectInspector inspector) {
    +    List<String> fieldNames = Lists.newArrayList();
    +    Joiner joiner = Joiner.on(",");
    +    if (inspector instanceof StructObjectInspector) {
    +      StructObjectInspector soi = (StructObjectInspector) inspector;
    +      List<? extends StructField> fields = soi.getAllStructFieldRefs();
    +      for(StructField sf : fields) {
    +        fieldNames.add(sf.getFieldName());
    +      }
    +    }
    +    return joiner.join(fieldNames);
    +  }
    +
    +  @VisibleForTesting
    +  int getEstimatedBufferSize(int bs) {
    +      return getEstimatedBufferSize(conf.get(IOConstants.COLUMNS), bs);
    +  }
    +
    +  int getEstimatedBufferSize(String colNames, int bs) {
    +    long availableMem = getMemoryAvailableForORC();
    +    if (colNames != null) {
    +      final int numCols = colNames.split(",").length;
    +      if (numCols > COLUMN_COUNT_THRESHOLD) {
    +        // In BufferedStream, there are 3 outstream buffers (compressed,
    +        // uncompressed and overflow) and list of previously compressed 
buffers.
    +        // Since overflow buffer is rarely used, lets consider only 2 
allocation.
    +        // Also, initially, the list of compression buffers will be empty.
    +        final int outStreamBuffers = codec == null ? 1 : 2;
    +
    +        // max possible streams per column is 5. For string columns, there 
is
    +        // ROW_INDEX, PRESENT, DATA, LENGTH, DICTIONARY_DATA streams.
    +        final int maxStreams = 5;
    +
    +        // Lets assume 10% memory for holding dictionary in memory and 
other
    +        // object allocations
    +        final long miscAllocation = (long) (0.1f * availableMem);
    +
    +        // compute the available memory
    +        final long remainingMem = availableMem - miscAllocation;
    +
    +        int estBufferSize = (int) (remainingMem /
    +            (maxStreams * outStreamBuffers * numCols));
    +        estBufferSize = getClosestBufferSize(estBufferSize, bs);
    +        if (estBufferSize > bs) {
    +          estBufferSize = bs;
    +        }
    +
    +        LOG.info("WIDE TABLE - Number of columns: " + numCols +
    +            " Chosen compression buffer size: " + estBufferSize);
    +        return estBufferSize;
    +      }
    +    }
    +    return bs;
    +  }
    +
    +  private int getClosestBufferSize(int estBufferSize, int bs) {
    +    final int kb4 = 4 * 1024;
    +    final int kb8 = 8 * 1024;
    +    final int kb16 = 16 * 1024;
    +    final int kb32 = 32 * 1024;
    +    final int kb64 = 64 * 1024;
    +    final int kb128 = 128 * 1024;
    +    final int kb256 = 256 * 1024;
    +    if (estBufferSize <= kb4) {
    +      return kb4;
    +    } else if (estBufferSize > kb4 && estBufferSize <= kb8) {
    +      return kb8;
    +    } else if (estBufferSize > kb8 && estBufferSize <= kb16) {
    +      return kb16;
    +    } else if (estBufferSize > kb16 && estBufferSize <= kb32) {
    +      return kb32;
    +    } else if (estBufferSize > kb32 && estBufferSize <= kb64) {
    +      return kb64;
    +    } else if (estBufferSize > kb64 && estBufferSize <= kb128) {
    +      return kb128;
    +    } else {
    +      return kb256;
    +    }
    +  }
    +
    +  // the assumption is only one ORC writer open at a time, which holds 
true for
    +  // most of the cases. HIVE-6455 forces single writer case.
    +  private long getMemoryAvailableForORC() {
    +    OrcConf.ConfVars poolVar = OrcConf.ConfVars.HIVE_ORC_FILE_MEMORY_POOL;
    +    double maxLoad = conf.getFloat(poolVar.varname, 
poolVar.defaultFloatVal);
    +    long totalMemoryPool = Math.round(ManagementFactory.getMemoryMXBean().
    +        getHeapMemoryUsage().getMax() * maxLoad);
    +    return totalMemoryPool;
    +  }
    +
    +  public static CompressionCodec createCodec(CompressionKind kind) {
    +    switch (kind) {
    +      case NONE:
    +        return null;
    +      case ZLIB:
    +        return new ZlibCodec();
    +      case SNAPPY:
    +        return new SnappyCodec();
    +      case LZO:
    +        try {
    +          Class<? extends CompressionCodec> lzo =
    +              (Class<? extends CompressionCodec>)
    +                  
Class.forName("org.apache.hadoop.hive.ql.io.orc.LzoCodec");
    +          return lzo.newInstance();
    +        } catch (ClassNotFoundException e) {
    +          throw new IllegalArgumentException("LZO is not available.", e);
    +        } catch (InstantiationException e) {
    +          throw new IllegalArgumentException("Problem initializing LZO", 
e);
    +        } catch (IllegalAccessException e) {
    +          throw new IllegalArgumentException("Insufficient access to LZO", 
e);
    +        }
    +      default:
    +        throw new IllegalArgumentException("Unknown compression codec: " +
    +            kind);
    +    }
    +  }
    +
    +  @Override
    +  public boolean checkMemory(double newScale) throws IOException {
    +    long limit = (long) Math.round(adjustedStripeSize * newScale);
    +    long size = estimateStripeSize();
    +    if (LOG.isDebugEnabled()) {
    +      LOG.debug("ORC writer " + path + " size = " + size + " limit = " +
    +                limit);
    +    }
    +    if (size > limit) {
    +      flushStripe();
    +      return true;
    +    }
    +    return false;
    +  }
    +
    +  /**
    +   * This class is used to hold the contents of streams as they are 
buffered.
    +   * The TreeWriters write to the outStream and the codec compresses the
    +   * data as buffers fill up and stores them in the output list. When the
    +   * stripe is being written, the whole stream is written to the file.
    +   */
    +  private class BufferedStream implements OutStream.OutputReceiver {
    +    private final OutStream outStream;
    +    private final List<ByteBuffer> output = new ArrayList<ByteBuffer>();
    +
    +    BufferedStream(String name, int bufferSize,
    +                   CompressionCodec codec) throws IOException {
    +      outStream = new OutStream(name, bufferSize, codec, this);
    +    }
    +
    +    /**
    +     * Receive a buffer from the compression codec.
    +     * @param buffer the buffer to save
    +     * @throws IOException
    +     */
    +    @Override
    +    public void output(ByteBuffer buffer) {
    +      output.add(buffer);
    +    }
    +
    +    /**
    +     * Get the number of bytes in buffers that are allocated to this 
stream.
    +     * @return number of bytes in buffers
    +     */
    +    public long getBufferSize() {
    +      long result = 0;
    +      for(ByteBuffer buf: output) {
    +        result += buf.capacity();
    +      }
    +      return outStream.getBufferSize() + result;
    +    }
    +
    +    /**
    +     * Flush the stream to the codec.
    +     * @throws IOException
    +     */
    +    public void flush() throws IOException {
    +      outStream.flush();
    +    }
    +
    +    /**
    +     * Clear all of the buffers.
    +     * @throws IOException
    +     */
    +    public void clear() throws IOException {
    +      outStream.clear();
    +      output.clear();
    +    }
    +
    +    /**
    +     * Check the state of suppress flag in output stream
    +     * @return value of suppress flag
    +     */
    +    public boolean isSuppressed() {
    +      return outStream.isSuppressed();
    +    }
    +
    +    /**
    +     * Get the number of bytes that will be written to the output. Assumes
    +     * the stream has already been flushed.
    +     * @return the number of bytes
    +     */
    +    public long getOutputSize() {
    +      long result = 0;
    +      for(ByteBuffer buffer: output) {
    +        result += buffer.remaining();
    +      }
    +      return result;
    +    }
    +
    +    /**
    +     * Write the saved compressed buffers to the OutputStream.
    +     * @param out the stream to write to
    +     * @throws IOException
    +     */
    +    void spillTo(OutputStream out) throws IOException {
    +      for(ByteBuffer buffer: output) {
    +        out.write(buffer.array(), buffer.arrayOffset() + buffer.position(),
    +          buffer.remaining());
    +      }
    +    }
    +
    +    @Override
    +    public String toString() {
    +      return outStream.toString();
    +    }
    +  }
    +
    +  /**
    +   * An output receiver that writes the ByteBuffers to the output stream
    +   * as they are received.
    +   */
    +  private class DirectStream implements OutStream.OutputReceiver {
    +    private final FSDataOutputStream output;
    +
    +    DirectStream(FSDataOutputStream output) {
    +      this.output = output;
    +    }
    +
    +    @Override
    +    public void output(ByteBuffer buffer) throws IOException {
    +      output.write(buffer.array(), buffer.arrayOffset() + 
buffer.position(),
    +        buffer.remaining());
    +    }
    +  }
    +
    +  private static class RowIndexPositionRecorder implements 
PositionRecorder {
    +    private final OrcProto.RowIndexEntry.Builder builder;
    +
    +    RowIndexPositionRecorder(OrcProto.RowIndexEntry.Builder builder) {
    +      this.builder = builder;
    +    }
    +
    +    @Override
    +    public void addPosition(long position) {
    +      builder.addPositions(position);
    +    }
    +  }
    +
    +  /**
    +   * Interface from the Writer to the TreeWriters. This limits the 
visibility
    +   * that the TreeWriters have into the Writer.
    +   */
    +  private class StreamFactory {
    +    /**
    +     * Create a stream to store part of a column.
    +     * @param column the column id for the stream
    +     * @param kind the kind of stream
    +     * @return The output outStream that the section needs to be written 
to.
    +     * @throws IOException
    +     */
    +    public OutStream createStream(int column,
    +                                  OrcProto.Stream.Kind kind
    +                                  ) throws IOException {
    +      final StreamName name = new StreamName(column, kind);
    +      final EnumSet<CompressionCodec.Modifier> modifiers;
    +
    +      switch (kind) {
    +        case BLOOM_FILTER:
    +        case DATA:
    +        case DICTIONARY_DATA:
    +          if (getCompressionStrategy() == 
OrcFile.CompressionStrategy.SPEED) {
    +            modifiers = EnumSet.of(Modifier.FAST, Modifier.TEXT);
    +          } else {
    +            modifiers = EnumSet.of(Modifier.DEFAULT, Modifier.TEXT);
    +          }
    +          break;
    +        case LENGTH:
    +        case DICTIONARY_COUNT:
    +        case PRESENT:
    +        case ROW_INDEX:
    +        case SECONDARY:
    +          // easily compressed using the fastest modes
    +          modifiers = EnumSet.of(Modifier.FASTEST, Modifier.BINARY);
    +          break;
    +        default:
    +          LOG.warn("Missing ORC compression modifiers for " + kind);
    +          modifiers = null;
    +          break;
    +      }
    +
    +      BufferedStream result = streams.get(name);
    +      if (result == null) {
    +        result = new BufferedStream(name.toString(), bufferSize,
    +            codec == null ? codec : codec.modify(modifiers));
    +        streams.put(name, result);
    +      }
    +      return result.outStream;
    +    }
    +
    +    /**
    +     * Get the next column id.
    +     * @return a number from 0 to the number of columns - 1
    +     */
    +    public int getNextColumnId() {
    +      return columnCount++;
    +    }
    +
    +    /**
    +     * Get the current column id. After creating all tree writers this 
count should tell how many
    +     * columns (including columns within nested complex objects) are 
created in total.
    +     * @return current column id
    +     */
    +    public int getCurrentColumnId() {
    +      return columnCount;
    +    }
    +
    +    /**
    +     * Get the stride rate of the row index.
    +     */
    +    public int getRowIndexStride() {
    +      return rowIndexStride;
    +    }
    +
    +    /**
    +     * Should be building the row index.
    +     * @return true if we are building the index
    +     */
    +    public boolean buildIndex() {
    +      return buildIndex;
    +    }
    +
    +    /**
    +     * Is the ORC file compressed?
    +     * @return are the streams compressed
    +     */
    +    public boolean isCompressed() {
    +      return codec != null;
    +    }
    +
    +    /**
    +     * Get the encoding strategy to use.
    +     * @return encoding strategy
    +     */
    +    public OrcFile.EncodingStrategy getEncodingStrategy() {
    +      return encodingStrategy;
    +    }
    +
    +    /**
    +     * Get the compression strategy to use.
    +     * @return compression strategy
    +     */
    +    public OrcFile.CompressionStrategy getCompressionStrategy() {
    +      return compressionStrategy;
    +    }
    +
    +    /**
    +     * Get the bloom filter columns
    +     * @return bloom filter columns
    +     */
    +    public boolean[] getBloomFilterColumns() {
    +      return bloomFilterColumns;
    +    }
    +
    +    /**
    +     * Get bloom filter false positive percentage.
    +     * @return fpp
    +     */
    +    public double getBloomFilterFPP() {
    +      return bloomFilterFpp;
    +    }
    +
    +    /**
    +     * Get the writer's configuration.
    +     * @return configuration
    +     */
    +    public Configuration getConfiguration() {
    +      return conf;
    +    }
    +
    +    /**
    +     * Get the version of the file to write.
    +     */
    +    public OrcFile.Version getVersion() {
    +      return version;
    +    }
    +
    +    public void useWriterTimeZone(boolean val) {
    +      writeTimeZone = val;
    +    }
    +
    +    public boolean hasWriterTimeZone() {
    +      return writeTimeZone;
    +    }
    +  }
    +
    +  /**
    +   * The parent class of all of the writers for each column. Each column
    +   * is written by an instance of this class. The compound types (struct,
    +   * list, map, and union) have children tree writers that write the 
children
    +   * types.
    +   */
    +  private abstract static class TreeWriter {
    +    protected final int id;
    +    protected final ObjectInspector inspector;
    +    private final BitFieldWriter isPresent;
    +    private final boolean isCompressed;
    +    protected final ColumnStatisticsImpl indexStatistics;
    +    protected final ColumnStatisticsImpl stripeColStatistics;
    +    private final ColumnStatisticsImpl fileStatistics;
    +    protected TreeWriter[] childrenWriters;
    +    protected final RowIndexPositionRecorder rowIndexPosition;
    +    private final OrcProto.RowIndex.Builder rowIndex;
    +    private final OrcProto.RowIndexEntry.Builder rowIndexEntry;
    +    private final PositionedOutputStream rowIndexStream;
    +    private final PositionedOutputStream bloomFilterStream;
    +    protected final BloomFilterIO bloomFilter;
    +    protected final boolean createBloomFilter;
    +    private final OrcProto.BloomFilterIndex.Builder bloomFilterIndex;
    +    private final OrcProto.BloomFilter.Builder bloomFilterEntry;
    +    private boolean foundNulls;
    +    private OutStream isPresentOutStream;
    +    private final List<StripeStatistics.Builder> stripeStatsBuilders;
    +    private final StreamFactory streamFactory;
    +
    +    /**
    +     * Create a tree writer.
    +     * @param columnId the column id of the column to write
    +     * @param inspector the object inspector to use
    +     * @param streamFactory limited access to the Writer's data.
    +     * @param nullable can the value be null?
    +     * @throws IOException
    +     */
    +    TreeWriter(int columnId, ObjectInspector inspector,
    +               StreamFactory streamFactory,
    +               boolean nullable) throws IOException {
    +      this.streamFactory = streamFactory;
    +      this.isCompressed = streamFactory.isCompressed();
    +      this.id = columnId;
    +      this.inspector = inspector;
    +      if (nullable) {
    +        isPresentOutStream = streamFactory.createStream(id,
    +            OrcProto.Stream.Kind.PRESENT);
    +        isPresent = new BitFieldWriter(isPresentOutStream, 1);
    +      } else {
    +        isPresent = null;
    +      }
    +      this.foundNulls = false;
    +      createBloomFilter = streamFactory.getBloomFilterColumns()[columnId];
    +      indexStatistics = ColumnStatisticsImpl.create(inspector);
    +      stripeColStatistics = ColumnStatisticsImpl.create(inspector);
    +      fileStatistics = ColumnStatisticsImpl.create(inspector);
    +      childrenWriters = new TreeWriter[0];
    +      rowIndex = OrcProto.RowIndex.newBuilder();
    +      rowIndexEntry = OrcProto.RowIndexEntry.newBuilder();
    +      rowIndexPosition = new RowIndexPositionRecorder(rowIndexEntry);
    +      stripeStatsBuilders = Lists.newArrayList();
    +      if (streamFactory.buildIndex()) {
    +        rowIndexStream = streamFactory.createStream(id, 
OrcProto.Stream.Kind.ROW_INDEX);
    +      } else {
    +        rowIndexStream = null;
    +      }
    +      if (createBloomFilter) {
    +        bloomFilterEntry = OrcProto.BloomFilter.newBuilder();
    +        bloomFilterIndex = OrcProto.BloomFilterIndex.newBuilder();
    +        bloomFilterStream = streamFactory.createStream(id, 
OrcProto.Stream.Kind.BLOOM_FILTER);
    +        bloomFilter = new BloomFilterIO(streamFactory.getRowIndexStride(),
    +            streamFactory.getBloomFilterFPP());
    +      } else {
    +        bloomFilterEntry = null;
    +        bloomFilterIndex = null;
    +        bloomFilterStream = null;
    +        bloomFilter = null;
    +      }
    +    }
    +
    +    protected OrcProto.RowIndex.Builder getRowIndex() {
    +      return rowIndex;
    +    }
    +
    +    protected ColumnStatisticsImpl getStripeStatistics() {
    +      return stripeColStatistics;
    +    }
    +
    +    protected ColumnStatisticsImpl getFileStatistics() {
    +      return fileStatistics;
    +    }
    +
    +    protected OrcProto.RowIndexEntry.Builder getRowIndexEntry() {
    +      return rowIndexEntry;
    +    }
    +
    +    IntegerWriter createIntegerWriter(PositionedOutputStream output,
    +                                      boolean signed, boolean isDirectV2,
    +                                      StreamFactory writer) {
    +      if (isDirectV2) {
    +        boolean alignedBitpacking = false;
    +        if 
(writer.getEncodingStrategy().equals(OrcFile.EncodingStrategy.SPEED)) {
    +          alignedBitpacking = true;
    +        }
    +        return new RunLengthIntegerWriterV2(output, signed, 
alignedBitpacking);
    +      } else {
    +        return new RunLengthIntegerWriter(output, signed);
    +      }
    +    }
    +
    +    boolean isNewWriteFormat(StreamFactory writer) {
    +      return writer.getVersion() != OrcFile.Version.V_0_11;
    +    }
    +
    +    /**
    +     * Add a new value to the column.
    +     * @param datum
    +     * @throws IOException
    +     */
    +    void write(Datum datum) throws IOException {
    +      if (datum != null && datum.isNotNull()) {
    +        indexStatistics.increment();
    +      } else {
    +        indexStatistics.setNull();
    +      }
    +      if (isPresent != null) {
    +        if(datum == null || datum.isNull()) {
    +          foundNulls = true;
    +          isPresent.write(0);
    +        }
    +        else {
    +          isPresent.write(1);
    +        }
    +      }
    +    }
    +
    +    void write(Tuple tuple) throws IOException {
    +      if (tuple != null) {
    +        indexStatistics.increment();
    +      } else {
    +        indexStatistics.setNull();
    +      }
    +      if (isPresent != null) {
    +        if (tuple == null) {
    +          foundNulls = true;
    +          isPresent.write(0);
    +        } else {
    +          isPresent.write(1);
    +        }
    +      }
    +    }
    +
    +    private void removeIsPresentPositions() {
    +      for(int i=0; i < rowIndex.getEntryCount(); ++i) {
    +        RowIndexEntry.Builder entry = rowIndex.getEntryBuilder(i);
    +        List<Long> positions = entry.getPositionsList();
    +        // bit streams use 3 positions if uncompressed, 4 if compressed
    +        positions = positions.subList(isCompressed ? 4 : 3, 
positions.size());
    +        entry.clearPositions();
    +        entry.addAllPositions(positions);
    +      }
    +    }
    +
    +    /**
    +     * Write the stripe out to the file.
    +     * @param builder the stripe footer that contains the information 
about the
    +     *                layout of the stripe. The TreeWriter is required to 
update
    +     *                the footer with its information.
    +     * @param requiredIndexEntries the number of index entries that are
    +     *                             required. this is to check to make sure 
the
    +     *                             row index is well formed.
    +     * @throws IOException
    +     */
    +    void writeStripe(OrcProto.StripeFooter.Builder builder,
    +                     int requiredIndexEntries) throws IOException {
    +      if (isPresent != null) {
    +        isPresent.flush();
    +
    +        // if no nulls are found in a stream, then suppress the stream
    +        if(!foundNulls) {
    +          isPresentOutStream.suppress();
    +          // since isPresent bitstream is suppressed, update the index to
    +          // remove the positions of the isPresent stream
    +          if (rowIndexStream != null) {
    +            removeIsPresentPositions();
    +          }
    +        }
    +      }
    +
    +      // merge stripe-level column statistics to file statistics and write 
it to
    +      // stripe statistics
    +      OrcProto.StripeStatistics.Builder stripeStatsBuilder = 
OrcProto.StripeStatistics.newBuilder();
    +      writeStripeStatistics(stripeStatsBuilder, this);
    +      stripeStatsBuilders.add(stripeStatsBuilder);
    +
    +      // reset the flag for next stripe
    +      foundNulls = false;
    +
    +      builder.addColumns(getEncoding());
    +      if (streamFactory.hasWriterTimeZone()) {
    --- End diff --
    
    This timezone policy may differ from Tajo's. Could you check how the 
timezone is handled here? The following code would be a helpful reference for this work.
    
    
https://github.com/apache/tajo/blob/master/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java#L56


> Add ORCFileAppender to write into ORCFile table
> -----------------------------------------------
>
>                 Key: TAJO-1465
>                 URL: https://issues.apache.org/jira/browse/TAJO-1465
>             Project: Tajo
>          Issue Type: Sub-task
>          Components: Storage
>    Affects Versions: 0.10.0
>            Reporter: Dongjoon Hyun
>            Assignee: Jongyoung Park
>              Labels: orc
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to