[ 
https://issues.apache.org/jira/browse/ORC-435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701145#comment-16701145
 ] 

ASF GitHub Bot commented on ORC-435:
------------------------------------

omalley closed pull request #338: ORC-435: Ability to read stripes that are 
greater than 2GB
URL: https://github.com/apache/orc/pull/338
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/java/core/src/java/org/apache/orc/OrcConf.java 
b/java/core/src/java/org/apache/orc/OrcConf.java
index bdf8c0d2db..23e301e96e 100644
--- a/java/core/src/java/org/apache/orc/OrcConf.java
+++ b/java/core/src/java/org/apache/orc/OrcConf.java
@@ -158,6 +158,9 @@
       + "HDFS blocks."),
   DIRECT_ENCODING_COLUMNS("orc.column.encoding.direct", 
"orc.column.encoding.direct", "",
       "Comma-separated list of columns for which dictionary encoding is to be 
skipped."),
+  // some JVM doesn't allow array creation of size Integer.MAX_VALUE, so chunk 
size is slightly less than max int
+  ORC_MAX_DISK_RANGE_CHUNK_LIMIT("orc.max.disk.range.chunk.limit", 
"hive.exec.orc.max.disk.range.chunk.limit",
+    Integer.MAX_VALUE - 1024, "When reading stripes >2GB, specify max limit 
for the chunk size.")
   ;
 
   private final String attribute;
@@ -205,6 +208,22 @@ private String lookupValue(Properties tbl, Configuration 
conf) {
     return result;
   }
 
+  public int getInt(Properties tbl, Configuration conf) {
+    String value = lookupValue(tbl, conf);
+    if (value != null) {
+      return Integer.parseInt(value);
+    }
+    return ((Number) defaultValue).intValue();
+  }
+
+  public int getInt(Configuration conf) {
+    return getInt(null, conf);
+  }
+
+  public void getInt(Configuration conf, int value) {
+    conf.setInt(attribute, value);
+  }
+
   public long getLong(Properties tbl, Configuration conf) {
     String value = lookupValue(tbl, conf);
     if (value != null) {
diff --git a/java/core/src/java/org/apache/orc/impl/DataReaderProperties.java 
b/java/core/src/java/org/apache/orc/impl/DataReaderProperties.java
index fbdc145ce1..8420149a98 100644
--- a/java/core/src/java/org/apache/orc/impl/DataReaderProperties.java
+++ b/java/core/src/java/org/apache/orc/impl/DataReaderProperties.java
@@ -20,6 +20,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcConf;
 
 public final class DataReaderProperties {
 
@@ -29,6 +30,7 @@
   private final boolean zeroCopy;
   private final int typeCount;
   private final int bufferSize;
+  private final int maxDiskRangeChunkLimit;
 
   private DataReaderProperties(Builder builder) {
     this.fileSystem = builder.fileSystem;
@@ -37,6 +39,7 @@ private DataReaderProperties(Builder builder) {
     this.zeroCopy = builder.zeroCopy;
     this.typeCount = builder.typeCount;
     this.bufferSize = builder.bufferSize;
+    this.maxDiskRangeChunkLimit = builder.maxDiskRangeChunkLimit;
   }
 
   public FileSystem getFileSystem() {
@@ -63,6 +66,10 @@ public int getBufferSize() {
     return bufferSize;
   }
 
+  public int getMaxDiskRangeChunkLimit() {
+    return maxDiskRangeChunkLimit;
+  }
+
   public static Builder builder() {
     return new Builder();
   }
@@ -75,6 +82,7 @@ public static Builder builder() {
     private boolean zeroCopy;
     private int typeCount;
     private int bufferSize;
+    private int maxDiskRangeChunkLimit = (int) 
OrcConf.ORC_MAX_DISK_RANGE_CHUNK_LIMIT.getDefaultValue();
 
     private Builder() {
 
@@ -110,6 +118,11 @@ public Builder withBufferSize(int value) {
       return this;
     }
 
+    public Builder withMaxDiskRangeChunkLimit(int value) {
+      this.maxDiskRangeChunkLimit = value;
+      return this;
+    }
+
     public DataReaderProperties build() {
       if (fileSystem == null || path == null) {
         throw new NullPointerException("Filesystem = " + fileSystem +
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java 
b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
index 2b8a6bf46a..3c4342a423 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
@@ -99,6 +99,7 @@
   private final DataReader dataReader;
   private final boolean ignoreNonUtf8BloomFilter;
   private final OrcFile.WriterVersion writerVersion;
+  private final int maxDiskRangeChunkLimit;
 
   /**
    * Given a list of column names, find the given column and return the index.
@@ -231,7 +232,7 @@ protected RecordReaderImpl(ReaderImpl fileReader,
         rows += stripe.getNumberOfRows();
       }
     }
-
+    this.maxDiskRangeChunkLimit = 
OrcConf.ORC_MAX_DISK_RANGE_CHUNK_LIMIT.getInt(fileReader.conf);
     Boolean zeroCopy = options.getUseZeroCopy();
     if (zeroCopy == null) {
       zeroCopy = OrcConf.USE_ZEROCOPY.getBoolean(fileReader.conf);
@@ -247,6 +248,7 @@ protected RecordReaderImpl(ReaderImpl fileReader,
               .withPath(fileReader.path)
               .withTypeCount(types.size())
               .withZeroCopy(zeroCopy)
+              .withMaxDiskRangeChunkLimit(maxDiskRangeChunkLimit)
               .build());
     }
     firstRow = skippedRows;
@@ -1456,4 +1458,8 @@ public static String encodeTranslatedSargColumn(int 
rootColumn, Integer indexInS
   public CompressionCodec getCompressionCodec() {
     return dataReader.getCompressionCodec();
   }
+
+  public int getMaxDiskRangeChunkLimit() {
+    return maxDiskRangeChunkLimit;
+  }
 }
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java 
b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
index a70c9883c2..dcd51e89c5 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
@@ -153,6 +153,7 @@ public static DiskRangeList 
planIndexReading(TypeDescription fileSchema,
     private InStream.StreamOptions options = InStream.options();
     private final int typeCount;
     private CompressionKind compressionKind;
+    private final int maxDiskRangeChunkLimit;
 
     private DefaultDataReader(DataReaderProperties properties) {
       this.fs = properties.getFileSystem();
@@ -162,6 +163,7 @@ private DefaultDataReader(DataReaderProperties properties) {
       options.withCodec(OrcCodecPool.getCodec(compressionKind))
           .withBufferSize(properties.getBufferSize());
       this.typeCount = properties.getTypeCount();
+      this.maxDiskRangeChunkLimit = properties.getMaxDiskRangeChunkLimit();
     }
 
     @Override
@@ -206,7 +208,7 @@ public OrcIndex readRowIndex(StripeInformation stripe,
       DiskRangeList ranges = planIndexReading(fileSchema, footer,
           ignoreNonUtf8BloomFilter, included, sargColumns, version,
           bloomFilterKinds);
-      ranges = readDiskRanges(file, zcr, stripe.getOffset(), ranges, false);
+      ranges = readDiskRanges(file, zcr, stripe.getOffset(), ranges, false, 
maxDiskRangeChunkLimit);
       long offset = 0;
       DiskRangeList range = ranges;
       for(OrcProto.Stream stream: footer.getStreamsList()) {
@@ -272,7 +274,7 @@ public OrcIndex readRowIndex(StripeInformation stripe,
     @Override
     public DiskRangeList readFileData(
         DiskRangeList range, long baseOffset, boolean doForceDirect) throws 
IOException {
-      return RecordReaderUtils.readDiskRanges(file, zcr, baseOffset, range, 
doForceDirect);
+      return RecordReaderUtils.readDiskRanges(file, zcr, baseOffset, range, 
doForceDirect, maxDiskRangeChunkLimit);
     }
 
     @Override
@@ -520,8 +522,9 @@ static DiskRangeList readDiskRanges(FSDataInputStream file,
                                       HadoopShims.ZeroCopyReaderShim zcr,
                                  long base,
                                  DiskRangeList range,
-                                 boolean doForceDirect) throws IOException {
-    if (range == null) return null;
+                                 boolean doForceDirect, int maxChunkLimit) 
throws IOException {
+    if (range == null)
+      return null;
     DiskRangeList prev = range.prev;
     if (prev == null) {
       prev = new DiskRangeList.MutateHelper(range);
@@ -531,39 +534,51 @@ static DiskRangeList readDiskRanges(FSDataInputStream 
file,
         range = range.next;
         continue;
       }
-      int len = (int) (range.getEnd() - range.getOffset());
+      boolean hasReplaced = false;
+      long len = range.getEnd() - range.getOffset();
       long off = range.getOffset();
-      if (zcr != null) {
-        file.seek(base + off);
-        boolean hasReplaced = false;
-        while (len > 0) {
-          ByteBuffer partial = zcr.readBuffer(len, false);
-          BufferChunk bc = new BufferChunk(partial, off);
-          if (!hasReplaced) {
-            range.replaceSelfWith(bc);
-            hasReplaced = true;
+      while (len > 0) {
+        BufferChunk bc;
+
+        // Stripe could be too large to read fully into a single buffer and 
will need to be chunked
+        int memChunkSize = (len >= maxChunkLimit) ? maxChunkLimit : (int) len;
+        int read;
+
+        if (!hasReplaced) {
+          file.seek(base + off);
+        }
+
+        // create chunk
+        if (zcr != null) {
+          ByteBuffer partial = zcr.readBuffer(memChunkSize, false);
+          bc = new BufferChunk(partial, off);
+          read = partial.remaining();
+        } else {
+          // Don't use HDFS ByteBuffer API because it has no readFully, and is 
buggy and pointless.
+          byte[] buffer = new byte[memChunkSize];
+          file.readFully((base + off), buffer, 0, buffer.length);
+          ByteBuffer partial;
+          if (doForceDirect) {
+            partial = ByteBuffer.allocateDirect(memChunkSize);
+            partial.put(buffer);
+            partial.position(0);
+            partial.limit(memChunkSize);
           } else {
-            range.insertAfter(bc);
+            partial = ByteBuffer.wrap(buffer);
           }
-          range = bc;
-          int read = partial.remaining();
-          len -= read;
-          off += read;
+          bc = new BufferChunk(partial, off);
+          read = partial.remaining();
         }
-      } else {
-        // Don't use HDFS ByteBuffer API because it has no readFully, and is 
buggy and pointless.
-        byte[] buffer = new byte[len];
-        file.readFully((base + off), buffer, 0, buffer.length);
-        ByteBuffer bb = null;
-        if (doForceDirect) {
-          bb = ByteBuffer.allocateDirect(len);
-          bb.put(buffer);
-          bb.position(0);
-          bb.limit(len);
+
+        if (!hasReplaced) {
+          range.replaceSelfWith(bc);
+          hasReplaced = true;
         } else {
-          bb = ByteBuffer.wrap(buffer);
+          range.insertAfter(bc);
         }
-        range = range.replaceSelfWith(new BufferChunk(bb, range.getOffset()));
+        range = bc;
+        len -= read;
+        off += read;
       }
       range = range.next;
     }
diff --git a/java/core/src/test/org/apache/orc/impl/TestOrcLargeStripe.java 
b/java/core/src/test/org/apache/orc/impl/TestOrcLargeStripe.java
new file mode 100644
index 0000000000..756991ff9a
--- /dev/null
+++ b/java/core/src/test/org/apache/orc/impl/TestOrcLargeStripe.java
@@ -0,0 +1,188 @@
+/*
+ * Copyright 2015 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.anyBoolean;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.io.DiskRangeList;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestOrcLargeStripe {
+
+  private static final long MB = 1024 * 1024;
+  private Path workDir = new Path(System.getProperty("test.tmp.dir", "target" 
+ File.separator + "test"
+    + File.separator + "tmp"));
+
+  Configuration conf;
+  FileSystem fs;
+  private Path testFilePath;
+
+  @Rule
+  public TestName testCaseName = new TestName();
+
+  @Before
+  public void openFileSystem() throws Exception {
+    conf = new Configuration();
+    fs = FileSystem.getLocal(conf);
+    testFilePath = new Path(workDir, "TestOrcFile." + 
testCaseName.getMethodName() + ".orc");
+    fs.delete(testFilePath, false);
+  }
+
+  @Mock
+  private FSDataInputStream mockDataInput;
+
+  private DiskRangeList createRangeList(long... stripeSizes) {
+    DiskRangeList.CreateHelper list = new DiskRangeList.CreateHelper();
+    long prev = 0;
+    for (long stripe : stripeSizes) {
+      list.addOrMerge(prev, stripe, true, true);
+      prev = stripe;
+    }
+    return list.extract();
+  }
+
+  private void verifyDiskRanges(long stripeLength, int expectedChunks) throws 
Exception {
+
+    DiskRangeList rangeList = createRangeList(stripeLength);
+
+    DiskRangeList newList = RecordReaderUtils.readDiskRanges(mockDataInput, 
null, 0, rangeList, false, (int) (2 * MB));
+    assertEquals(expectedChunks, newList.listSize());
+
+    newList = RecordReaderUtils.readDiskRanges(mockDataInput, null, 0, 
rangeList, true, (int) (2 * MB));
+    assertEquals(expectedChunks, newList.listSize());
+
+    HadoopShims.ZeroCopyReaderShim mockZcr = 
mock(HadoopShims.ZeroCopyReaderShim.class);
+    when(mockZcr.readBuffer(anyInt(), 
anyBoolean())).thenReturn(ByteBuffer.allocate((int) (2 * MB)));
+    newList = RecordReaderUtils.readDiskRanges(mockDataInput, mockZcr, 0, 
rangeList, true, (int) (2 * MB));
+    assertEquals(expectedChunks, newList.listSize());
+  }
+
+  @Test
+  public void testStripeSizesBelowAndGreaterThanLimit() throws Exception {
+    verifyDiskRanges(MB, 1);
+    verifyDiskRanges(5 * MB, 3);
+  }
+
+
+  @Test
+  public void testConfigMaxChunkLimit() throws IOException {
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(conf);
+    TypeDescription schema = TypeDescription.createTimestamp();
+    testFilePath = new Path(workDir, "TestOrcLargeStripe." +
+      testCaseName.getMethodName() + ".orc");
+    Writer writer = OrcFile.createWriter(testFilePath,
+      
OrcFile.writerOptions(conf).setSchema(schema).stripeSize(100000).bufferSize(10000)
+        .version(OrcFile.Version.V_0_11).fileSystem(fs));
+    writer.close();
+
+    try {
+      OrcFile.ReaderOptions opts = OrcFile.readerOptions(conf);
+      Reader reader = OrcFile.createReader(testFilePath, opts);
+      RecordReader recordReader = reader.rows(new Reader.Options().range(0L, 
Long.MAX_VALUE));
+      assertTrue(recordReader instanceof RecordReaderImpl);
+      assertEquals(Integer.MAX_VALUE - 1024, ((RecordReaderImpl) 
recordReader).getMaxDiskRangeChunkLimit());
+
+      conf = new Configuration();
+      conf.setInt(OrcConf.ORC_MAX_DISK_RANGE_CHUNK_LIMIT.getHiveConfName(), 
1000);
+      opts = OrcFile.readerOptions(conf);
+      reader = OrcFile.createReader(testFilePath, opts);
+      recordReader = reader.rows(new Reader.Options().range(0L, 
Long.MAX_VALUE));
+      assertTrue(recordReader instanceof RecordReaderImpl);
+      assertEquals(1000, ((RecordReaderImpl) 
recordReader).getMaxDiskRangeChunkLimit());
+    } finally {
+      fs.delete(testFilePath, false);
+    }
+  }
+
+  // @Test travis cannot run this test as this requires >2GB heap, so 
commenting out
+  public void testStringDirectGreaterThan2GB() throws IOException {
+    TypeDescription schema = TypeDescription.createString();
+
+    conf.setDouble("hive.exec.orc.dictionary.key.size.threshold", 0.0);
+    Writer writer = OrcFile.createWriter(
+      testFilePath,
+      OrcFile.writerOptions(conf).setSchema(schema)
+        .compress(CompressionKind.NONE)
+        .bufferSize(10000));
+    int size = 5000;
+    int width = 500_000;
+    int[] input = new int[size];
+    for (int i = 0; i < size; i++) {
+      input[i] = width;
+    }
+    Random random = new Random(123);
+    byte[] randBytes = new byte[width];
+    VectorizedRowBatch batch = schema.createRowBatch();
+    BytesColumnVector string = (BytesColumnVector) batch.cols[0];
+    for (final int ignored : input) {
+      if (batch.size == batch.getMaxSize()) {
+        writer.addRowBatch(batch);
+        batch.reset();
+      }
+      random.nextBytes(randBytes);
+      string.setVal(batch.size++, randBytes);
+    }
+    writer.addRowBatch(batch);
+    writer.close();
+
+    try {
+      Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+      RecordReader rows = reader.rows();
+      batch = reader.getSchema().createRowBatch();
+      int rowsRead = 0;
+      while (rows.nextBatch(batch)) {
+        for (int r = 0; r < batch.size; ++r) {
+          rowsRead++;
+        }
+      }
+      assertEquals(size, rowsRead);
+    } finally {
+      fs.delete(testFilePath, false);
+    }
+  }
+}
\ No newline at end of file


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Ability to read stripes that are greater than 2GB
> -------------------------------------------------
>
>                 Key: ORC-435
>                 URL: https://issues.apache.org/jira/browse/ORC-435
>             Project: ORC
>          Issue Type: Bug
>          Components: Reader
>    Affects Versions: 1.3.4, 1.4.4, 1.6.0, 1.5.3
>            Reporter: Prasanth Jayachandran
>            Assignee: Prasanth Jayachandran
>            Priority: Major
>             Fix For: 1.5.4, 1.6.0
>
>
> The ORC reader fails with NegativeArraySizeException if the stripe size is
> larger than 2GB. Even though the default stripe size is 64MB, there are cases
> where the stripe size reaches >2GB even before the memory manager can kick in
> to check the memory size. For example, if we insert 500KB strings (mostly
> unique), the stripe size is already over 2GB by the time we reach 5000 rows.
> The reader will have to chunk the disk range reads for such cases instead of
> reading the stripe as a whole blob. The exception thrown when reading such
> files:
> {code:java}
> 2018-10-12 21:43:58,833 WARN [main] org.apache.hadoop.mapred.YarnChild: 
> Exception running child : java.lang.NegativeArraySizeException
>         at 
> org.apache.hadoop.hive.ql.io.orc.RecordReaderUtils.readDiskRanges(RecordReaderUtils.java:272)
>         at 
> org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.readPartialDataStreams(RecordReaderImpl.java:1007)
>         at 
> org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.readStripe(RecordReaderImpl.java:835)
>         at 
> org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.advanceStripe(RecordReaderImpl.java:1029)
>         at 
> org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.advanceToNextRow(RecordReaderImpl.java:1062)
>         at 
> org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.next(RecordReaderImpl.java:1085){code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to