Repository: orc
Updated Branches:
  refs/heads/branch-1.5 55db9ada2 -> eaee12d09


ORC-397. Allow selective disabling of dictionary encoding.
Original patch was by Mithun Radhakrishnan.

Fixes #304

Signed-off-by: Owen O'Malley <omal...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/e8f01a5c
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/e8f01a5c
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/e8f01a5c

Branch: refs/heads/branch-1.5
Commit: e8f01a5c6ff4e170126220fc0fabd2d6ec1d602c
Parents: 55db9ad
Author: Owen O'Malley <omal...@apache.org>
Authored: Tue Aug 28 16:12:05 2018 -0700
Committer: Owen O'Malley <omal...@apache.org>
Committed: Mon Sep 17 13:19:39 2018 -0700

----------------------------------------------------------------------
 java/core/src/java/org/apache/orc/OrcConf.java  |   4 +-
 java/core/src/java/org/apache/orc/OrcFile.java  |  17 +
 .../java/org/apache/orc/impl/WriterImpl.java    |  11 +
 .../orc/impl/writer/StringBaseTreeWriter.java   |   3 +-
 .../apache/orc/impl/writer/WriterContext.java   |   2 +
 .../apache/orc/impl/writer/WriterImplV2.java    | 548 +------------------
 .../org/apache/orc/TestStringDictionary.java    |  83 ++-
 7 files changed, 117 insertions(+), 551 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/orc/blob/e8f01a5c/java/core/src/java/org/apache/orc/OrcConf.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/OrcConf.java b/java/core/src/java/org/apache/orc/OrcConf.java
index d92f776..bdf8c0d 100644
--- a/java/core/src/java/org/apache/orc/OrcConf.java
+++ b/java/core/src/java/org/apache/orc/OrcConf.java
@@ -155,7 +155,9 @@ public enum OrcConf {
           "A boolean flag to determine if the comparision of field names in 
schema evolution is case sensitive .\n"),
   WRITE_VARIABLE_LENGTH_BLOCKS("orc.write.variable.length.blocks", null, false,
       "A boolean flag as to whether the ORC writer should write variable 
length\n"
-      + "HDFS blocks.")
+      + "HDFS blocks."),
+  DIRECT_ENCODING_COLUMNS("orc.column.encoding.direct", 
"orc.column.encoding.direct", "",
+      "Comma-separated list of columns for which dictionary encoding is to be 
skipped."),
   ;
 
   private final String attribute;

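For readers following the change: the new key is consumed like any other OrcConf
entry, via table properties or a Hadoop Configuration. A minimal sketch, assuming
two string columns named "name" and "address" (the names are illustrative, not
part of the patch):

    import org.apache.hadoop.conf.Configuration;

    Configuration conf = new Configuration();
    // skip dictionary encoding for these two columns
    conf.set("orc.column.encoding.direct", "name,address");
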
http://git-wip-us.apache.org/repos/asf/orc/blob/e8f01a5c/java/core/src/java/org/apache/orc/OrcFile.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/OrcFile.java b/java/core/src/java/org/apache/orc/OrcFile.java
index b07355a..33aa431 100644
--- a/java/core/src/java/org/apache/orc/OrcFile.java
+++ b/java/core/src/java/org/apache/orc/OrcFile.java
@@ -407,6 +407,7 @@ public class OrcFile {
     private boolean overwrite;
     private boolean writeVariableLengthBlocks;
     private HadoopShims shims;
+    private String directEncodingColumns;
 
     protected WriterOptions(Properties tableProperties, Configuration conf) {
       configuration = conf;
@@ -449,6 +450,8 @@ public class OrcFile {
       shims = HadoopShimsFactory.get();
      writeVariableLengthBlocks =
          OrcConf.WRITE_VARIABLE_LENGTH_BLOCKS.getBoolean(tableProperties,conf);
+      directEncodingColumns = OrcConf.DIRECT_ENCODING_COLUMNS.getString(
+          tableProperties, conf);
     }
 
     /**
@@ -687,6 +690,16 @@ public class OrcFile {
       return this;
     }
 
+    /**
+     * Set the comma-separated list of columns that should be direct encoded.
+     * @param value the value to set
+     * @return this
+     */
+    public WriterOptions directEncodingColumns(String value) {
+      directEncodingColumns = value;
+      return this;
+    }
+
     public boolean getBlockPadding() {
       return blockPaddingValue;
     }
@@ -786,6 +799,10 @@ public class OrcFile {
     public boolean getUseUTCTimestamp() {
       return useUTCTimestamp;
     }
+
+    public String getDirectEncodingColumns() {
+      return directEncodingColumns;
+    }
   }
 
   /**

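Combined with the OrcConf entry above, the new builder method gives a
programmatic way to request direct encoding. A minimal sketch, where the output
path and column names are placeholders:

    TypeDescription schema =
        TypeDescription.fromString("struct<name:string,address:string>");
    Writer writer = OrcFile.createWriter(new Path("/tmp/example.orc"),
        OrcFile.writerOptions(conf)
            .setSchema(schema)
            .directEncodingColumns("address"));  // no dictionary for "address"
    // ... add row batches, then writer.close();
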
http://git-wip-us.apache.org/repos/asf/orc/blob/e8f01a5c/java/core/src/java/org/apache/orc/impl/WriterImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/WriterImpl.java b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
index d6239f2..827747e 100644
--- a/java/core/src/java/org/apache/orc/impl/WriterImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
@@ -35,6 +35,7 @@ import org.apache.orc.ColumnStatistics;
 import org.apache.orc.CompressionCodec;
 import org.apache.orc.CompressionKind;
 import org.apache.orc.MemoryManager;
+import org.apache.orc.OrcConf;
 import org.apache.orc.OrcFile;
 import org.apache.orc.OrcProto;
 import org.apache.orc.OrcUtils;
@@ -113,6 +114,8 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback {
   private final OrcFile.BloomFilterVersion bloomFilterVersion;
   private final boolean writeTimeZone;
   private final boolean useUTCTimeZone;
+  private final double dictionaryKeySizeThreshold;
+  private final boolean[] directEncodingColumns;
 
   public WriterImpl(FileSystem fs,
                     Path path,
@@ -123,6 +126,10 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback {
     this.schema = opts.getSchema();
     this.writerVersion = opts.getWriterVersion();
     bloomFilterVersion = opts.getBloomFilterVersion();
+    this.directEncodingColumns = OrcUtils.includeColumns(
+        opts.getDirectEncodingColumns(), opts.getSchema());
+    dictionaryKeySizeThreshold =
+        OrcConf.DICTIONARY_KEY_SIZE_THRESHOLD.getDouble(conf);
     if (callback != null) {
       callbackContext = new OrcFile.WriterContext(){
 
@@ -410,6 +417,10 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback {
     public boolean getUseUTCTimestamp() {
       return useUTCTimeZone;
     }
+
+    public double getDictionaryKeySizeThreshold(int columnId) {
+      return directEncodingColumns[columnId] ? 0.0 : dictionaryKeySizeThreshold;
+    }
   }
 
 

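The constructor change reuses OrcUtils.includeColumns, the same helper already
used to expand bloom filter column lists, to turn the comma-separated names into
one boolean per column id. A hedged illustration of the lookup the new getter
performs (local variable names are mine, not the patch's):

    // expand the configured list against the schema into per-column-id flags
    boolean[] direct = OrcUtils.includeColumns("longString", schema);
    // a flagged column gets threshold 0.0, i.e. no dictionary at all
    double threshold = direct[columnId] ? 0.0 : dictionaryKeySizeThreshold;
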
http://git-wip-us.apache.org/repos/asf/orc/blob/e8f01a5c/java/core/src/java/org/apache/orc/impl/writer/StringBaseTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/StringBaseTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/StringBaseTreeWriter.java
index 742c1ed..e7d3259 100644
--- a/java/core/src/java/org/apache/orc/impl/writer/StringBaseTreeWriter.java
+++ b/java/core/src/java/org/apache/orc/impl/writer/StringBaseTreeWriter.java
@@ -76,8 +76,7 @@ public abstract class StringBaseTreeWriter extends TreeWriterBase {
     rowIndexValueCount.add(0L);
     buildIndex = writer.buildIndex();
     Configuration conf = writer.getConfiguration();
-    dictionaryKeySizeThreshold =
-        OrcConf.DICTIONARY_KEY_SIZE_THRESHOLD.getDouble(conf);
+    dictionaryKeySizeThreshold = writer.getDictionaryKeySizeThreshold(columnId);
     strideDictionaryCheck =
         OrcConf.ROW_INDEX_STRIDE_DICTIONARY_CHECK.getBoolean(conf);
     if (dictionaryKeySizeThreshold <= 0.0) {

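Returning 0.0 for a direct-encoded column is enough because of the pre-existing
check visible at the end of this hunk: a non-positive threshold is treated as
disabling the dictionary from the first row. A rough sketch of the effective
decision, with simplified names:

    double threshold = writer.getDictionaryKeySizeThreshold(columnId);
    // 0.0 for columns listed in orc.column.encoding.direct
    boolean useDictionaryEncoding = threshold > 0.0;
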
http://git-wip-us.apache.org/repos/asf/orc/blob/e8f01a5c/java/core/src/java/org/apache/orc/impl/writer/WriterContext.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/WriterContext.java b/java/core/src/java/org/apache/orc/impl/writer/WriterContext.java
index 1c8ca1a..9ef3dda 100644
--- a/java/core/src/java/org/apache/orc/impl/writer/WriterContext.java
+++ b/java/core/src/java/org/apache/orc/impl/writer/WriterContext.java
@@ -103,4 +103,6 @@ public interface WriterContext {
                           ) throws IOException;
 
     boolean getUseUTCTimestamp();
+
+    double getDictionaryKeySizeThreshold(int column);
 }

http://git-wip-us.apache.org/repos/asf/orc/blob/e8f01a5c/java/core/src/java/org/apache/orc/impl/writer/WriterImplV2.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/WriterImplV2.java b/java/core/src/java/org/apache/orc/impl/writer/WriterImplV2.java
index e1f410c..6d93a34 100644
--- a/java/core/src/java/org/apache/orc/impl/writer/WriterImplV2.java
+++ b/java/core/src/java/org/apache/orc/impl/writer/WriterImplV2.java
@@ -71,557 +71,17 @@ import java.util.TreeMap;
  * to be confined to a single thread as well.
  *
  */
-public class WriterImplV2 implements WriterInternal, MemoryManager.Callback {
+public class WriterImplV2 extends WriterImpl {
 
  private static final Logger LOG = LoggerFactory.getLogger(WriterImplV2.class);
 
-  private static final int MIN_ROW_INDEX_STRIDE = 1000;
-
-  private final Path path;
-  private long adjustedStripeSize;
-  private final int rowIndexStride;
-  private final CompressionKind compress;
-  private int bufferSize;
-  private final TypeDescription schema;
-  private final PhysicalWriter physicalWriter;
-  private final OrcFile.WriterVersion writerVersion;
-
-  private long rowCount = 0;
-  private long rowsInStripe = 0;
-  private long rawDataSize = 0;
-  private int rowsInIndex = 0;
-  private long lastFlushOffset = 0;
-  private int stripesAtLastFlush = -1;
-  private final List<OrcProto.StripeInformation> stripes =
-    new ArrayList<>();
-  private final OrcProto.Metadata.Builder fileMetadata =
-      OrcProto.Metadata.newBuilder();
-  private final Map<String, ByteString> userMetadata =
-    new TreeMap<>();
-  private final TreeWriter treeWriter;
-  private final boolean buildIndex;
-  private final MemoryManager memoryManager;
-  private final OrcFile.Version version;
-  private final Configuration conf;
-  private final OrcFile.WriterCallback callback;
-  private final OrcFile.WriterContext callbackContext;
-  private final OrcFile.EncodingStrategy encodingStrategy;
-  private final OrcFile.CompressionStrategy compressionStrategy;
-  private final boolean[] bloomFilterColumns;
-  private final double bloomFilterFpp;
-  private final OrcFile.BloomFilterVersion bloomFilterVersion;
-  private final boolean writeTimeZone;
-  private final boolean useUTCTimeZone;
-
   public WriterImplV2(FileSystem fs,
                       Path path,
                       OrcFile.WriterOptions opts) throws IOException {
-    this.path = path;
-    this.conf = opts.getConfiguration();
-    this.callback = opts.getCallback();
-    this.schema = opts.getSchema();
-    this.writerVersion = opts.getWriterVersion();
-    bloomFilterVersion = opts.getBloomFilterVersion();
-    if (callback != null) {
-      callbackContext = new OrcFile.WriterContext(){
-
-        @Override
-        public Writer getWriter() {
-          return WriterImplV2.this;
-        }
-      };
-    } else {
-      callbackContext = null;
-    }
-    this.writeTimeZone = hasTimestamp(schema);
-    this.useUTCTimeZone = opts.getUseUTCTimestamp();
-    this.adjustedStripeSize = opts.getStripeSize();
-    this.version = opts.getVersion();
-    this.encodingStrategy = opts.getEncodingStrategy();
-    this.compressionStrategy = opts.getCompressionStrategy();
-    this.compress = opts.getCompress();
-    this.rowIndexStride = opts.getRowIndexStride();
-    this.memoryManager = opts.getMemoryManager();
-    buildIndex = rowIndexStride > 0;
-    int numColumns = schema.getMaximumId() + 1;
-    if (opts.isEnforceBufferSize()) {
-      OutStream.assertBufferSizeValid(opts.getBufferSize());
-      this.bufferSize = opts.getBufferSize();
-    } else {
-      this.bufferSize = WriterImpl.getEstimatedBufferSize(adjustedStripeSize,
-          numColumns, opts.getBufferSize());
-    }
-    if (version == OrcFile.Version.FUTURE) {
-      throw new IllegalArgumentException("Can not write in a unknown 
version.");
-    } else if (version == OrcFile.Version.UNSTABLE_PRE_2_0) {
-      LOG.warn("ORC files written in " + version.getName() + " will not be" +
+    super(fs, path, opts);
+    LOG.warn("ORC files written in " +
+        OrcFile.Version.UNSTABLE_PRE_2_0.getName() + " will not be" +
           " readable by other versions of the software. It is only for" +
           " developer testing.");
-    }
-    if (version == OrcFile.Version.V_0_11) {
-      /* do not write bloom filters for ORC v11 */
-      this.bloomFilterColumns = new boolean[schema.getMaximumId() + 1];
-    } else {
-      this.bloomFilterColumns =
-          OrcUtils.includeColumns(opts.getBloomFilterColumns(), schema);
-    }
-    this.bloomFilterFpp = opts.getBloomFilterFpp();
-    this.physicalWriter = opts.getPhysicalWriter() == null ?
-        new PhysicalFsWriter(fs, path, opts) : opts.getPhysicalWriter();
-    physicalWriter.writeHeader();
-    treeWriter = TreeWriter.Factory.create(schema, new StreamFactory(), false);
-    if (buildIndex && rowIndexStride < MIN_ROW_INDEX_STRIDE) {
-      throw new IllegalArgumentException("Row stride must be at least " +
-          MIN_ROW_INDEX_STRIDE);
-    }
-
-    // ensure that we are able to handle callbacks before we register ourselves
-    memoryManager.addWriter(path, opts.getStripeSize(), this);
-    LOG.info("ORC writer created for path: {} with stripeSize: {} blockSize: 
{}" +
-        " compression: {} bufferSize: {}", path, adjustedStripeSize, 
opts.getBlockSize(),
-        compress, bufferSize);
-  }
-
-  @Override
-  public boolean checkMemory(double newScale) throws IOException {
-    long limit = Math.round(adjustedStripeSize * newScale);
-    long size = treeWriter.estimateMemory();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("ORC writer " + physicalWriter + " size = " + size +
-          " limit = " + limit);
-    }
-    if (size > limit) {
-      flushStripe();
-      return true;
-    }
-    return false;
-  }
-
-
-  CompressionCodec getCustomizedCodec(OrcProto.Stream.Kind kind) {
-    // TODO: modify may create a new codec here. We want to end() it when the stream is closed,
-    //       but at this point there's no close() for the stream.
-    CompressionCodec result = physicalWriter.getCompressionCodec();
-    if (result != null) {
-      switch (kind) {
-        case BLOOM_FILTER:
-        case DATA:
-        case DICTIONARY_DATA:
-        case BLOOM_FILTER_UTF8:
-          if (compressionStrategy == OrcFile.CompressionStrategy.SPEED) {
-            result = result.modify(EnumSet.of(CompressionCodec.Modifier.FAST,
-                CompressionCodec.Modifier.TEXT));
-          } else {
-            result = result.modify(EnumSet.of(CompressionCodec.Modifier.DEFAULT,
-                CompressionCodec.Modifier.TEXT));
-          }
-          break;
-        case LENGTH:
-        case DICTIONARY_COUNT:
-        case PRESENT:
-        case ROW_INDEX:
-        case SECONDARY:
-          // easily compressed using the fastest modes
-          result = result.modify(EnumSet.of(CompressionCodec.Modifier.FASTEST,
-              CompressionCodec.Modifier.BINARY));
-          break;
-        default:
-          LOG.info("Missing ORC compression modifiers for " + kind);
-          break;
-      }
-    }
-    return result;
-  }
-
-  @Override
-  public void increaseCompressionSize(int newSize) {
-    if (newSize > bufferSize) {
-      bufferSize = newSize;
-    }
-  }
-
-  /**
-   * Interface from the Writer to the TreeWriters. This limits the visibility
-   * that the TreeWriters have into the Writer.
-   */
-  private class StreamFactory implements WriterContext {
-    /**
-     * Create a stream to store part of a column.
-     * @param column the column id for the stream
-     * @param kind the kind of stream
-     * @return The output outStream that the section needs to be written to.
-     */
-    public OutStream createStream(int column,
-                                  OrcProto.Stream.Kind kind
-                                  ) throws IOException {
-      final StreamName name = new StreamName(column, kind);
-      CompressionCodec codec = getCustomizedCodec(kind);
-
-      return new OutStream(physicalWriter.toString(), bufferSize, codec,
-          physicalWriter.createDataStream(name));
-    }
-
-    /**
-     * Get the stride rate of the row index.
-     */
-    public int getRowIndexStride() {
-      return rowIndexStride;
-    }
-
-    /**
-     * Should be building the row index.
-     * @return true if we are building the index
-     */
-    public boolean buildIndex() {
-      return buildIndex;
-    }
-
-    /**
-     * Is the ORC file compressed?
-     * @return are the streams compressed
-     */
-    public boolean isCompressed() {
-      return physicalWriter.getCompressionCodec() != null;
-    }
-
-    /**
-     * Get the encoding strategy to use.
-     * @return encoding strategy
-     */
-    public OrcFile.EncodingStrategy getEncodingStrategy() {
-      return encodingStrategy;
-    }
-
-    /**
-     * Get the bloom filter columns
-     * @return bloom filter columns
-     */
-    public boolean[] getBloomFilterColumns() {
-      return bloomFilterColumns;
-    }
-
-    /**
-     * Get bloom filter false positive percentage.
-     * @return fpp
-     */
-    public double getBloomFilterFPP() {
-      return bloomFilterFpp;
-    }
-
-    /**
-     * Get the writer's configuration.
-     * @return configuration
-     */
-    public Configuration getConfiguration() {
-      return conf;
-    }
-
-    /**
-     * Get the version of the file to write.
-     */
-    public OrcFile.Version getVersion() {
-      return version;
-    }
-
-    /**
-     * Get the PhysicalWriter.
-     *
-     * @return the file's physical writer.
-     */
-    @Override
-    public PhysicalWriter getPhysicalWriter() {
-      return physicalWriter;
-    }
-
-    public OrcFile.BloomFilterVersion getBloomFilterVersion() {
-      return bloomFilterVersion;
-    }
-
-    public void writeIndex(StreamName name,
-                           OrcProto.RowIndex.Builder index) throws IOException {
-      physicalWriter.writeIndex(name, index, getCustomizedCodec(name.getKind()));
-    }
-
-    public void writeBloomFilter(StreamName name,
-                                 OrcProto.BloomFilterIndex.Builder bloom
-                                 ) throws IOException {
-      physicalWriter.writeBloomFilter(name, bloom,
-          getCustomizedCodec(name.getKind()));
-    }
-
-    public boolean getUseUTCTimestamp() {
-      return useUTCTimeZone;
-    }
-  }
-
-
-  private static void writeTypes(OrcProto.Footer.Builder builder,
-                                 TypeDescription schema) {
-    builder.addAllTypes(OrcUtils.getOrcTypes(schema));
-  }
-
-  private void createRowIndexEntry() throws IOException {
-    treeWriter.createRowIndexEntry();
-    rowsInIndex = 0;
-  }
-
-  private void flushStripe() throws IOException {
-    if (buildIndex && rowsInIndex != 0) {
-      createRowIndexEntry();
-    }
-    if (rowsInStripe != 0) {
-      if (callback != null) {
-        callback.preStripeWrite(callbackContext);
-      }
-      // finalize the data for the stripe
-      int requiredIndexEntries = rowIndexStride == 0 ? 0 :
-          (int) ((rowsInStripe + rowIndexStride - 1) / rowIndexStride);
-      OrcProto.StripeFooter.Builder builder =
-          OrcProto.StripeFooter.newBuilder();
-      if (writeTimeZone) {
-        if (useUTCTimeZone) {
-          builder.setWriterTimezone("UTC");
-        } else {
-          builder.setWriterTimezone(TimeZone.getDefault().getID());
-        }
-      }
-      OrcProto.StripeStatistics.Builder stats =
-          OrcProto.StripeStatistics.newBuilder();
-
-      treeWriter.flushStreams();
-      treeWriter.writeStripe(builder, stats, requiredIndexEntries);
-
-      OrcProto.StripeInformation.Builder dirEntry =
-          OrcProto.StripeInformation.newBuilder()
-              .setNumberOfRows(rowsInStripe);
-      physicalWriter.finalizeStripe(builder, dirEntry);
-
-      fileMetadata.addStripeStats(stats.build());
-      stripes.add(dirEntry.build());
-      rowCount += rowsInStripe;
-      rowsInStripe = 0;
-    }
-  }
-
-  private long computeRawDataSize() {
-    return treeWriter.getRawDataSize();
-  }
-
-  private OrcProto.CompressionKind writeCompressionKind(CompressionKind kind) {
-    switch (kind) {
-      case NONE: return OrcProto.CompressionKind.NONE;
-      case ZLIB: return OrcProto.CompressionKind.ZLIB;
-      case SNAPPY: return OrcProto.CompressionKind.SNAPPY;
-      case LZO: return OrcProto.CompressionKind.LZO;
-      case LZ4: return OrcProto.CompressionKind.LZ4;
-      default:
-        throw new IllegalArgumentException("Unknown compression " + kind);
-    }
-  }
-
-  private void writeFileStatistics(OrcProto.Footer.Builder builder,
-                                   TreeWriter writer) throws IOException {
-    writer.writeFileStatistics(builder);
-  }
-
-  private void writeMetadata() throws IOException {
-    physicalWriter.writeFileMetadata(fileMetadata);
-  }
-
-  private long writePostScript() throws IOException {
-    OrcProto.PostScript.Builder builder =
-        OrcProto.PostScript.newBuilder()
-            .setCompression(writeCompressionKind(compress))
-            .setMagic(OrcFile.MAGIC)
-            .addVersion(version.getMajor())
-            .addVersion(version.getMinor())
-            .setWriterVersion(writerVersion.getId());
-    if (compress != CompressionKind.NONE) {
-      builder.setCompressionBlockSize(bufferSize);
-    }
-    return physicalWriter.writePostScript(builder);
-  }
-
-  private long writeFooter() throws IOException {
-    writeMetadata();
-    OrcProto.Footer.Builder builder = OrcProto.Footer.newBuilder();
-    builder.setNumberOfRows(rowCount);
-    builder.setRowIndexStride(rowIndexStride);
-    rawDataSize = computeRawDataSize();
-    // serialize the types
-    writeTypes(builder, schema);
-    // add the stripe information
-    for(OrcProto.StripeInformation stripe: stripes) {
-      builder.addStripes(stripe);
-    }
-    // add the column statistics
-    writeFileStatistics(builder, treeWriter);
-    // add all of the user metadata
-    for(Map.Entry<String, ByteString> entry: userMetadata.entrySet()) {
-      builder.addMetadata(OrcProto.UserMetadataItem.newBuilder()
-        .setName(entry.getKey()).setValue(entry.getValue()));
-    }
-    builder.setWriter(OrcFile.WriterImplementation.ORC_JAVA.getId());
-    physicalWriter.writeFileFooter(builder);
-    return writePostScript();
-  }
-
-  @Override
-  public TypeDescription getSchema() {
-    return schema;
-  }
-
-  @Override
-  public void addUserMetadata(String name, ByteBuffer value) {
-    userMetadata.put(name, ByteString.copyFrom(value));
-  }
-
-  @Override
-  public void addRowBatch(VectorizedRowBatch batch) throws IOException {
-    if (buildIndex) {
-      // Batch the writes up to the rowIndexStride so that we can get the
-      // right size indexes.
-      int posn = 0;
-      while (posn < batch.size) {
-        int chunkSize = Math.min(batch.size - posn,
-            rowIndexStride - rowsInIndex);
-        treeWriter.writeRootBatch(batch, posn, chunkSize);
-        posn += chunkSize;
-        rowsInIndex += chunkSize;
-        rowsInStripe += chunkSize;
-        if (rowsInIndex >= rowIndexStride) {
-          createRowIndexEntry();
-        }
-      }
-    } else {
-      rowsInStripe += batch.size;
-      treeWriter.writeRootBatch(batch, 0, batch.size);
-    }
-    memoryManager.addedRow(batch.size);
-  }
-
-  @Override
-  public void close() throws IOException {
-    if (callback != null) {
-      callback.preFooterWrite(callbackContext);
-    }
-    // remove us from the memory manager so that we don't get any callbacks
-    memoryManager.removeWriter(path);
-    // actually close the file
-    flushStripe();
-    lastFlushOffset = writeFooter();
-    physicalWriter.close();
-  }
-
-  /**
-   * Raw data size will be compute when writing the file footer. Hence raw data
-   * size value will be available only after closing the writer.
-   */
-  @Override
-  public long getRawDataSize() {
-    return rawDataSize;
-  }
-
-  /**
-   * Row count gets updated when flushing the stripes. To get accurate row
-   * count call this method after writer is closed.
-   */
-  @Override
-  public long getNumberOfRows() {
-    return rowCount;
-  }
-
-  @Override
-  public long writeIntermediateFooter() throws IOException {
-    // flush any buffered rows
-    flushStripe();
-    // write a footer
-    if (stripesAtLastFlush != stripes.size()) {
-      if (callback != null) {
-        callback.preFooterWrite(callbackContext);
-      }
-      lastFlushOffset = writeFooter();
-      stripesAtLastFlush = stripes.size();
-      physicalWriter.flush();
-    }
-    return lastFlushOffset;
-  }
-
-  static void checkArgument(boolean expression, String message) {
-    if (!expression) {
-      throw new IllegalArgumentException(message);
-    }
-  }
-
-  @Override
-  public void appendStripe(byte[] stripe, int offset, int length,
-      StripeInformation stripeInfo,
-      OrcProto.StripeStatistics stripeStatistics) throws IOException {
-    checkArgument(stripe != null, "Stripe must not be null");
-    checkArgument(length <= stripe.length,
-        "Specified length must not be greater specified array length");
-    checkArgument(stripeInfo != null, "Stripe information must not be null");
-    checkArgument(stripeStatistics != null,
-        "Stripe statistics must not be null");
-
-    rowsInStripe = stripeInfo.getNumberOfRows();
-    // update stripe information
-    OrcProto.StripeInformation.Builder dirEntry = OrcProto.StripeInformation
-        .newBuilder()
-        .setNumberOfRows(rowsInStripe)
-        .setIndexLength(stripeInfo.getIndexLength())
-        .setDataLength(stripeInfo.getDataLength())
-        .setFooterLength(stripeInfo.getFooterLength());
-    physicalWriter.appendRawStripe(ByteBuffer.wrap(stripe, offset, length),
-        dirEntry);
-
-    // since we have already written the stripe, just update stripe statistics
-    treeWriter.updateFileStatistics(stripeStatistics);
-    fileMetadata.addStripeStats(stripeStatistics);
-
-    stripes.add(dirEntry.build());
-
-    // reset it after writing the stripe
-    rowCount += rowsInStripe;
-    rowsInStripe = 0;
-  }
-
-  @Override
-  public void appendUserMetadata(List<OrcProto.UserMetadataItem> userMetadata) 
{
-    if (userMetadata != null) {
-      for (OrcProto.UserMetadataItem item : userMetadata) {
-        this.userMetadata.put(item.getName(), item.getValue());
-      }
-    }
-  }
-
-  @Override
-  public ColumnStatistics[] getStatistics()
-      throws IOException {
-    // Generate the stats
-    OrcProto.Footer.Builder builder = OrcProto.Footer.newBuilder();
-
-    // add the column statistics
-    writeFileStatistics(builder, treeWriter);
-    return ReaderImpl.deserializeStats(schema, builder.getStatisticsList());
-  }
-
-  public CompressionCodec getCompressionCodec() {
-    return physicalWriter.getCompressionCodec();
-  }
-
-  private static boolean hasTimestamp(TypeDescription schema) {
-    if (schema.getCategory() == TypeDescription.Category.TIMESTAMP) {
-      return true;
-    }
-    List<TypeDescription> children = schema.getChildren();
-    if (children != null) {
-      for (TypeDescription child : children) {
-        if (hasTimestamp(child)) {
-          return true;
-        }
-      }
-    }
-    return false;
   }
 }

http://git-wip-us.apache.org/repos/asf/orc/blob/e8f01a5c/java/core/src/test/org/apache/orc/TestStringDictionary.java
----------------------------------------------------------------------
diff --git a/java/core/src/test/org/apache/orc/TestStringDictionary.java b/java/core/src/test/org/apache/orc/TestStringDictionary.java
index dbd615a..cc4f8d8 100644
--- a/java/core/src/test/org/apache/orc/TestStringDictionary.java
+++ b/java/core/src/test/org/apache/orc/TestStringDictionary.java
@@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
@@ -35,14 +34,12 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 
 import org.apache.orc.impl.OutStream;
 import org.apache.orc.impl.RecordReaderImpl;
-import org.apache.orc.impl.RunLengthIntegerWriter;
 import org.apache.orc.impl.StreamName;
 import org.apache.orc.impl.TestInStream;
-import org.apache.orc.impl.WriterImpl;
 import org.apache.orc.impl.writer.StringTreeWriter;
 import org.apache.orc.impl.writer.TreeWriter;
 import org.apache.orc.impl.writer.WriterContext;
-import org.apache.orc.impl.writer.WriterImplV2;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -245,6 +242,11 @@ public class TestStringDictionary {
     public boolean getUseUTCTimestamp() {
       return true;
     }
+
+    @Override
+    public double getDictionaryKeySizeThreshold(int column) {
+      return OrcConf.DICTIONARY_KEY_SIZE_THRESHOLD.getDouble(conf);
+    }
   }
 
   @Test
@@ -409,4 +411,77 @@ public class TestStringDictionary {
 
   }
 
+  /**
+   * Test that dictionaries can be disabled per column. In this test, we want to disable DICTIONARY_V2 for the
+   * `longString` column (presumably for a low hit-ratio), while preserving DICTIONARY_V2 for `shortString`.
+   * @throws Exception on unexpected failure
+   */
+  @Test
+  public void testDisableDictionaryForSpecificColumn() throws Exception {
+    final String SHORT_STRING_VALUE = "foo";
+    final String  LONG_STRING_VALUE = "BAAAAAAAAR!!";
+
+    TypeDescription schema =
+        TypeDescription.fromString("struct<shortString:string,longString:string>");
+
+    Writer writer = OrcFile.createWriter(
+        testFilePath,
+        OrcFile.writerOptions(conf).setSchema(schema)
+            .compress(CompressionKind.NONE)
+            .bufferSize(10000)
+            .directEncodingColumns("longString"));
+
+    VectorizedRowBatch batch = schema.createRowBatch();
+    BytesColumnVector shortStringColumnVector = (BytesColumnVector) batch.cols[0];
+    BytesColumnVector longStringColumnVector  = (BytesColumnVector) batch.cols[1];
+
+    for (int i = 0; i < 20000; i++) {
+      if (batch.size == batch.getMaxSize()) {
+        writer.addRowBatch(batch);
+        batch.reset();
+      }
+      shortStringColumnVector.setVal(batch.size, SHORT_STRING_VALUE.getBytes());
+      longStringColumnVector.setVal( batch.size, LONG_STRING_VALUE.getBytes());
+      ++batch.size;
+    }
+    writer.addRowBatch(batch);
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf).filesystem(fs));
+    RecordReader recordReader = reader.rows();
+    batch = reader.getSchema().createRowBatch();
+    shortStringColumnVector = (BytesColumnVector) batch.cols[0];
+    longStringColumnVector  = (BytesColumnVector) batch.cols[1];
+    while (recordReader.nextBatch(batch)) {
+      for(int r=0; r < batch.size; ++r) {
+        assertEquals(SHORT_STRING_VALUE, shortStringColumnVector.toString(r));
+        assertEquals(LONG_STRING_VALUE,   longStringColumnVector.toString(r));
+      }
+    }
+
+    // make sure the encoding type is correct
+    for (StripeInformation stripe : reader.getStripes()) {
+      // hacky but does the job; this cast works as long as this test resides
+      // within the same package as the ORC reader
+      OrcProto.StripeFooter footer = ((RecordReaderImpl) recordReader).readStripeFooter(stripe);
+      for (int i = 0; i < footer.getColumnsCount(); ++i) {
+        Assert.assertEquals(
+            "Expected 3 columns in the footer: One for the Orc Struct, and two 
for its members.",
+            3, footer.getColumnsCount());
+        Assert.assertEquals(
+            "The ORC schema struct should be DIRECT encoded.",
+            OrcProto.ColumnEncoding.Kind.DIRECT, footer.getColumns(0).getKind()
+        );
+        Assert.assertEquals(
+            "The shortString column must be DICTIONARY_V2 encoded",
+            OrcProto.ColumnEncoding.Kind.DICTIONARY_V2, footer.getColumns(1).getKind()
+        );
+        Assert.assertEquals(
+            "The longString column must be DIRECT_V2 encoded",
+            OrcProto.ColumnEncoding.Kind.DIRECT_V2, footer.getColumns(2).getKind()
+        );
+      }
+    }
+  }
+
 }
