Repository: spark
Updated Branches:
  refs/heads/master 7e7350285 -> cac9b1dea


[SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.

## What changes were proposed in this pull request?

This updates Parquet to 1.10.0 and adapts the vectorized read path to the
accompanying buffer-management changes. Parquet 1.10.0 hands page bytes to
value readers as a ByteBufferInputStream instead of contiguous byte arrays,
which allows Parquet to break allocations into smaller chunks that are
friendlier to garbage collection. A minimal illustrative sketch of the new
reader interface follows.
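
For illustration only (not part of this change): the class below is a made-up
sketch of the pattern the Spark readers now follow. Only the
org.apache.parquet.bytes.ByteBufferInputStream and ParquetDecodingException
APIs are real; the class and its use here are hypothetical.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.io.ParquetDecodingException;

// Hypothetical reader sketch: 1.10.0 passes the page as a stream and readers
// slice views out of it, instead of tracking offsets into a single byte[].
class PlainIntReaderSketch {
  private ByteBufferInputStream in;

  public void initFromPage(int valueCount, ByteBufferInputStream in) {
    this.in = in;  // no copy; the stream wraps the page's backing ByteBuffers
  }

  public int readInteger() {
    try {
      // slice(4) returns a view over the next 4 bytes and advances the stream.
      ByteBuffer buffer = in.slice(4).order(ByteOrder.LITTLE_ENDIAN);
      return buffer.getInt();
    } catch (IOException e) {
      throw new ParquetDecodingException("Failed to read 4 bytes", e);
    }
  }
}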

## How was this patch tested?

Existing Parquet tests. This change has been running in production at Netflix 
for about 3 months.

Author: Ryan Blue <b...@apache.org>

Closes #21070 from rdblue/SPARK-23972-update-parquet-to-1.10.0.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cac9b1de
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cac9b1de
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cac9b1de

Branch: refs/heads/master
Commit: cac9b1dea1bb44fa42abf77829c05bf93f70cf20
Parents: 7e73502
Author: Ryan Blue <b...@apache.org>
Authored: Wed May 9 12:27:32 2018 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Wed May 9 12:27:32 2018 +0800

----------------------------------------------------------------------
 dev/deps/spark-deps-hadoop-2.6                  |  12 +-
 dev/deps/spark-deps-hadoop-2.7                  |  12 +-
 dev/deps/spark-deps-hadoop-3.1                  |  12 +-
 docs/sql-programming-guide.md                   |   2 +-
 pom.xml                                         |   8 +-
 .../org/apache/spark/sql/internal/SQLConf.scala |   2 +-
 .../SpecificParquetRecordReaderBase.java        |   2 +-
 .../parquet/VectorizedColumnReader.java         |  39 +++--
 .../parquet/VectorizedPlainValuesReader.java    | 166 ++++++++++++-------
 .../parquet/VectorizedRleValuesReader.java      | 163 +++++++++---------
 .../datasources/parquet/ParquetOptions.scala    |   5 +-
 .../results/describe-part-after-analyze.sql.out |  12 +-
 .../columnar/InMemoryColumnarQuerySuite.scala   |   4 +-
 13 files changed, 241 insertions(+), 198 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cac9b1de/dev/deps/spark-deps-hadoop-2.6
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index c3d1dd4..f479c13 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -162,13 +162,13 @@ orc-mapreduce-1.4.3-nohive.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
 paranamer-2.8.jar
-parquet-column-1.8.2.jar
-parquet-common-1.8.2.jar
-parquet-encoding-1.8.2.jar
-parquet-format-2.3.1.jar
-parquet-hadoop-1.8.2.jar
+parquet-column-1.10.0.jar
+parquet-common-1.10.0.jar
+parquet-encoding-1.10.0.jar
+parquet-format-2.4.0.jar
+parquet-hadoop-1.10.0.jar
 parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.8.2.jar
+parquet-jackson-1.10.0.jar
 protobuf-java-2.5.0.jar
 py4j-0.10.6.jar
 pyrolite-4.13.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/cac9b1de/dev/deps/spark-deps-hadoop-2.7
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index 2908670..e7c4599 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -163,13 +163,13 @@ orc-mapreduce-1.4.3-nohive.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
 paranamer-2.8.jar
-parquet-column-1.8.2.jar
-parquet-common-1.8.2.jar
-parquet-encoding-1.8.2.jar
-parquet-format-2.3.1.jar
-parquet-hadoop-1.8.2.jar
+parquet-column-1.10.0.jar
+parquet-common-1.10.0.jar
+parquet-encoding-1.10.0.jar
+parquet-format-2.4.0.jar
+parquet-hadoop-1.10.0.jar
 parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.8.2.jar
+parquet-jackson-1.10.0.jar
 protobuf-java-2.5.0.jar
 py4j-0.10.6.jar
 pyrolite-4.13.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/cac9b1de/dev/deps/spark-deps-hadoop-3.1
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-3.1 b/dev/deps/spark-deps-hadoop-3.1
index 97ad65a..3447cd7 100644
--- a/dev/deps/spark-deps-hadoop-3.1
+++ b/dev/deps/spark-deps-hadoop-3.1
@@ -181,13 +181,13 @@ orc-mapreduce-1.4.3-nohive.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
 paranamer-2.8.jar
-parquet-column-1.8.2.jar
-parquet-common-1.8.2.jar
-parquet-encoding-1.8.2.jar
-parquet-format-2.3.1.jar
-parquet-hadoop-1.8.2.jar
+parquet-column-1.10.0.jar
+parquet-common-1.10.0.jar
+parquet-encoding-1.10.0.jar
+parquet-format-2.4.0.jar
+parquet-hadoop-1.10.0.jar
 parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.8.2.jar
+parquet-jackson-1.10.0.jar
 protobuf-java-2.5.0.jar
 py4j-0.10.6.jar
 pyrolite-4.13.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/cac9b1de/docs/sql-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index c521f3c..3e8946e 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -964,7 +964,7 @@ Configuration of Parquet can be done using the `setConf` method on `SparkSession
     Sets the compression codec used when writing Parquet files. If either `compression` or
     `parquet.compression` is specified in the table-specific options/properties, the precedence would be
     `compression`, `parquet.compression`, `spark.sql.parquet.compression.codec`. Acceptable values include:
-    none, uncompressed, snappy, gzip, lzo.
+    none, uncompressed, snappy, gzip, lzo, brotli, lz4, zstd.
   </td>
 </tr>
 <tr>
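
(Usage aside, not part of this commit: a hedged sketch of how a user might
select one of the newly accepted codec values. It assumes an existing
SparkSession named spark and a Dataset<Row> named df; the output path is
illustrative, and zstd/brotli/lz4 may additionally require the matching codec
libraries on the runtime classpath.)

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ParquetCodecExample {
  public static void write(SparkSession spark, Dataset<Row> df) {
    // Session-wide default for Parquet writes.
    spark.conf().set("spark.sql.parquet.compression.codec", "zstd");

    // Or per write; the `compression` option takes precedence over the session default.
    df.write().option("compression", "zstd").parquet("/tmp/illustrative-output");
  }
}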

http://git-wip-us.apache.org/repos/asf/spark/blob/cac9b1de/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 88e77ff..6e37e51 100644
--- a/pom.xml
+++ b/pom.xml
@@ -129,7 +129,7 @@
     <!-- Version used for internal directory structure -->
     <hive.version.short>1.2.1</hive.version.short>
     <derby.version>10.12.1.1</derby.version>
-    <parquet.version>1.8.2</parquet.version>
+    <parquet.version>1.10.0</parquet.version>
     <orc.version>1.4.3</orc.version>
     <orc.classifier>nohive</orc.classifier>
     <hive.parquet.version>1.6.0</hive.parquet.version>
@@ -1778,6 +1778,12 @@
         <artifactId>parquet-hadoop</artifactId>
         <version>${parquet.version}</version>
         <scope>${parquet.deps.scope}</scope>
+        <exclusions>
+          <exclusion>
+            <groupId>commons-pool</groupId>
+            <artifactId>commons-pool</artifactId>
+          </exclusion>
+        </exclusions>
       </dependency>
       <dependency>
         <groupId>org.apache.parquet</groupId>

http://git-wip-us.apache.org/repos/asf/spark/blob/cac9b1de/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 895e150..b00edca 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -345,7 +345,7 @@ object SQLConf {
       "snappy, gzip, lzo.")
     .stringConf
     .transform(_.toLowerCase(Locale.ROOT))
-    .checkValues(Set("none", "uncompressed", "snappy", "gzip", "lzo"))
+    .checkValues(Set("none", "uncompressed", "snappy", "gzip", "lzo", "lz4", "brotli", "zstd"))
     .createWithDefault("snappy")
 
   val PARQUET_FILTER_PUSHDOWN_ENABLED = 
buildConf("spark.sql.parquet.filterPushdown")

http://git-wip-us.apache.org/repos/asf/spark/blob/cac9b1de/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index e65cd25..10d6ed8 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -293,7 +293,7 @@ public abstract class SpecificParquetRecordReaderBase<T> 
extends RecordReader<Vo
       return new RLEIntIterator(
           new RunLengthBitPackingHybridDecoder(
               BytesUtils.getWidthFromMaxInt(maxLevel),
-              new ByteArrayInputStream(bytes.toByteArray())));
+              bytes.toInputStream()));
     } catch (IOException e) {
       throw new IOException("could not read levels in page for col " + 
descriptor, e);
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/cac9b1de/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index 72f1d02..d5969b5 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -21,6 +21,8 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.TimeZone;
 
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.Dictionary;
@@ -388,7 +390,8 @@ public class VectorizedColumnReader {
    * is guaranteed that num is smaller than the number of values left in the 
current page.
    */
 
-  private void readBooleanBatch(int rowId, int num, WritableColumnVector column) {
+  private void readBooleanBatch(int rowId, int num, WritableColumnVector column)
+      throws IOException {
     if (column.dataType() != DataTypes.BooleanType) {
       throw constructConvertNotSupportedException(descriptor, column);
     }
@@ -396,7 +399,7 @@ public class VectorizedColumnReader {
         num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
   }
 
-  private void readIntBatch(int rowId, int num, WritableColumnVector column) {
+  private void readIntBatch(int rowId, int num, WritableColumnVector column) throws IOException {
     // This is where we implement support for the valid type conversions.
     // TODO: implement remaining type conversions
     if (column.dataType() == DataTypes.IntegerType || column.dataType() == 
DataTypes.DateType ||
@@ -414,7 +417,7 @@ public class VectorizedColumnReader {
     }
   }
 
-  private void readLongBatch(int rowId, int num, WritableColumnVector column) {
+  private void readLongBatch(int rowId, int num, WritableColumnVector column) throws IOException {
     // This is where we implement support for the valid type conversions.
     if (column.dataType() == DataTypes.LongType ||
         DecimalType.is64BitDecimalType(column.dataType()) ||
@@ -434,7 +437,7 @@ public class VectorizedColumnReader {
     }
   }
 
-  private void readFloatBatch(int rowId, int num, WritableColumnVector column) {
+  private void readFloatBatch(int rowId, int num, WritableColumnVector column) throws IOException {
     // This is where we implement support for the valid type conversions.
     // TODO: support implicit cast to double?
     if (column.dataType() == DataTypes.FloatType) {
@@ -445,7 +448,7 @@ public class VectorizedColumnReader {
     }
   }
 
-  private void readDoubleBatch(int rowId, int num, WritableColumnVector column) {
+  private void readDoubleBatch(int rowId, int num, WritableColumnVector column) throws IOException {
     // This is where we implement support for the valid type conversions.
     // TODO: implement remaining type conversions
     if (column.dataType() == DataTypes.DoubleType) {
@@ -456,7 +459,7 @@ public class VectorizedColumnReader {
     }
   }
 
-  private void readBinaryBatch(int rowId, int num, WritableColumnVector column) {
+  private void readBinaryBatch(int rowId, int num, WritableColumnVector column) throws IOException {
     // This is where we implement support for the valid type conversions.
     // TODO: implement remaining type conversions
     VectorizedValuesReader data = (VectorizedValuesReader) dataColumn;
@@ -556,7 +559,7 @@ public class VectorizedColumnReader {
     });
   }
 
-  private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset) throws IOException {
+  private void initDataReader(Encoding dataEncoding, ByteBufferInputStream in) throws IOException {
     this.endOfPageValueCount = valuesRead + pageValueCount;
     if (dataEncoding.usesDictionary()) {
       this.dataColumn = null;
@@ -581,7 +584,7 @@ public class VectorizedColumnReader {
     }
 
     try {
-      dataColumn.initFromPage(pageValueCount, bytes, offset);
+      dataColumn.initFromPage(pageValueCount, in);
     } catch (IOException e) {
       throw new IOException("could not read page in col " + descriptor, e);
     }
@@ -602,12 +605,11 @@ public class VectorizedColumnReader {
     this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
     this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
     try {
-      byte[] bytes = page.getBytes().toByteArray();
-      rlReader.initFromPage(pageValueCount, bytes, 0);
-      int next = rlReader.getNextOffset();
-      dlReader.initFromPage(pageValueCount, bytes, next);
-      next = dlReader.getNextOffset();
-      initDataReader(page.getValueEncoding(), bytes, next);
+      BytesInput bytes = page.getBytes();
+      ByteBufferInputStream in = bytes.toInputStream();
+      rlReader.initFromPage(pageValueCount, in);
+      dlReader.initFromPage(pageValueCount, in);
+      initDataReader(page.getValueEncoding(), in);
     } catch (IOException e) {
       throw new IOException("could not read page " + page + " in col " + 
descriptor, e);
     }
@@ -619,12 +621,13 @@ public class VectorizedColumnReader {
         page.getRepetitionLevels(), descriptor);
 
     int bitWidth = 
BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
-    this.defColumn = new VectorizedRleValuesReader(bitWidth);
+    // do not read the length from the stream. v2 pages handle dividing the page bytes.
+    this.defColumn = new VectorizedRleValuesReader(bitWidth, false);
     this.definitionLevelColumn = new ValuesReaderIntIterator(this.defColumn);
-    this.defColumn.initFromBuffer(
-        this.pageValueCount, page.getDefinitionLevels().toByteArray());
+    this.defColumn.initFromPage(
+        this.pageValueCount, page.getDefinitionLevels().toInputStream());
     try {
-      initDataReader(page.getDataEncoding(), page.getData().toByteArray(), 0);
+      initDataReader(page.getDataEncoding(), page.getData().toInputStream());
     } catch (IOException e) {
       throw new IOException("could not read page " + page + " in col " + 
descriptor, e);
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/cac9b1de/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
index 5b75f71..aacefac 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
@@ -20,34 +20,30 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.io.ParquetDecodingException;
 import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
-import org.apache.spark.unsafe.Platform;
 
 import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.io.api.Binary;
+import org.apache.spark.unsafe.Platform;
 
 /**
  * An implementation of the Parquet PLAIN decoder that supports the vectorized 
interface.
  */
 public class VectorizedPlainValuesReader extends ValuesReader implements 
VectorizedValuesReader {
-  private byte[] buffer;
-  private int offset;
-  private int bitOffset; // Only used for booleans.
-  private ByteBuffer byteBuffer; // used to wrap the byte array buffer
+  private ByteBufferInputStream in = null;
 
-  private static final boolean bigEndianPlatform =
-    ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN);
+  // Only used for booleans.
+  private int bitOffset;
+  private byte currentByte = 0;
 
   public VectorizedPlainValuesReader() {
   }
 
   @Override
-  public void initFromPage(int valueCount, byte[] bytes, int offset) throws IOException {
-    this.buffer = bytes;
-    this.offset = offset + Platform.BYTE_ARRAY_OFFSET;
-    if (bigEndianPlatform) {
-      byteBuffer = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
-    }
+  public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException {
+    this.in = in;
   }
 
   @Override
@@ -63,115 +59,157 @@ public class VectorizedPlainValuesReader extends 
ValuesReader implements Vectori
     }
   }
 
+  private ByteBuffer getBuffer(int length) {
+    try {
+      return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+    } catch (IOException e) {
+      throw new ParquetDecodingException("Failed to read " + length + " bytes", e);
+    }
+  }
+
   @Override
   public final void readIntegers(int total, WritableColumnVector c, int rowId) 
{
-    c.putIntsLittleEndian(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
-    offset += 4 * total;
+    int requiredBytes = total * 4;
+    ByteBuffer buffer = getBuffer(requiredBytes);
+
+    if (buffer.hasArray()) {
+      int offset = buffer.arrayOffset() + buffer.position();
+      c.putIntsLittleEndian(rowId, total, buffer.array(), offset);
+    } else {
+      for (int i = 0; i < total; i += 1) {
+        c.putInt(rowId + i, buffer.getInt());
+      }
+    }
   }
 
   @Override
   public final void readLongs(int total, WritableColumnVector c, int rowId) {
-    c.putLongsLittleEndian(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
-    offset += 8 * total;
+    int requiredBytes = total * 8;
+    ByteBuffer buffer = getBuffer(requiredBytes);
+
+    if (buffer.hasArray()) {
+      int offset = buffer.arrayOffset() + buffer.position();
+      c.putLongsLittleEndian(rowId, total, buffer.array(), offset);
+    } else {
+      for (int i = 0; i < total; i += 1) {
+        c.putLong(rowId + i, buffer.getLong());
+      }
+    }
   }
 
   @Override
   public final void readFloats(int total, WritableColumnVector c, int rowId) {
-    c.putFloats(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
-    offset += 4 * total;
+    int requiredBytes = total * 4;
+    ByteBuffer buffer = getBuffer(requiredBytes);
+
+    if (buffer.hasArray()) {
+      int offset = buffer.arrayOffset() + buffer.position();
+      c.putFloats(rowId, total, buffer.array(), offset);
+    } else {
+      for (int i = 0; i < total; i += 1) {
+        c.putFloat(rowId + i, buffer.getFloat());
+      }
+    }
   }
 
   @Override
   public final void readDoubles(int total, WritableColumnVector c, int rowId) {
-    c.putDoubles(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
-    offset += 8 * total;
+    int requiredBytes = total * 8;
+    ByteBuffer buffer = getBuffer(requiredBytes);
+
+    if (buffer.hasArray()) {
+      int offset = buffer.arrayOffset() + buffer.position();
+      c.putDoubles(rowId, total, buffer.array(), offset);
+    } else {
+      for (int i = 0; i < total; i += 1) {
+        c.putDouble(rowId + i, buffer.getDouble());
+      }
+    }
   }
 
   @Override
   public final void readBytes(int total, WritableColumnVector c, int rowId) {
-    for (int i = 0; i < total; i++) {
-      // Bytes are stored as a 4-byte little endian int. Just read the first 
byte.
-      // TODO: consider pushing this in ColumnVector by adding a readBytes 
with a stride.
-      c.putByte(rowId + i, Platform.getByte(buffer, offset));
-      offset += 4;
+    // Bytes are stored as a 4-byte little endian int. Just read the first 
byte.
+    // TODO: consider pushing this in ColumnVector by adding a readBytes with 
a stride.
+    int requiredBytes = total * 4;
+    ByteBuffer buffer = getBuffer(requiredBytes);
+
+    for (int i = 0; i < total; i += 1) {
+      c.putByte(rowId + i, buffer.get());
+      // skip the next 3 bytes
+      buffer.position(buffer.position() + 3);
     }
   }
 
   @Override
   public final boolean readBoolean() {
-    byte b = Platform.getByte(buffer, offset);
-    boolean v = (b & (1 << bitOffset)) != 0;
+    // TODO: vectorize decoding and keep boolean[] instead of currentByte
+    if (bitOffset == 0) {
+      try {
+        currentByte = (byte) in.read();
+      } catch (IOException e) {
+        throw new ParquetDecodingException("Failed to read a byte", e);
+      }
+    }
+
+    boolean v = (currentByte & (1 << bitOffset)) != 0;
     bitOffset += 1;
     if (bitOffset == 8) {
       bitOffset = 0;
-      offset++;
     }
     return v;
   }
 
   @Override
   public final int readInteger() {
-    int v = Platform.getInt(buffer, offset);
-    if (bigEndianPlatform) {
-      v = java.lang.Integer.reverseBytes(v);
-    }
-    offset += 4;
-    return v;
+    return getBuffer(4).getInt();
   }
 
   @Override
   public final long readLong() {
-    long v = Platform.getLong(buffer, offset);
-    if (bigEndianPlatform) {
-      v = java.lang.Long.reverseBytes(v);
-    }
-    offset += 8;
-    return v;
+    return getBuffer(8).getLong();
   }
 
   @Override
   public final byte readByte() {
-    return (byte)readInteger();
+    return (byte) readInteger();
   }
 
   @Override
   public final float readFloat() {
-    float v;
-    if (!bigEndianPlatform) {
-      v = Platform.getFloat(buffer, offset);
-    } else {
-      v = byteBuffer.getFloat(offset - Platform.BYTE_ARRAY_OFFSET);
-    }
-    offset += 4;
-    return v;
+    return getBuffer(4).getFloat();
   }
 
   @Override
   public final double readDouble() {
-    double v;
-    if (!bigEndianPlatform) {
-      v = Platform.getDouble(buffer, offset);
-    } else {
-      v = byteBuffer.getDouble(offset - Platform.BYTE_ARRAY_OFFSET);
-    }
-    offset += 8;
-    return v;
+    return getBuffer(8).getDouble();
   }
 
   @Override
   public final void readBinary(int total, WritableColumnVector v, int rowId) {
     for (int i = 0; i < total; i++) {
       int len = readInteger();
-      int start = offset;
-      offset += len;
-      v.putByteArray(rowId + i, buffer, start - Platform.BYTE_ARRAY_OFFSET, 
len);
+      ByteBuffer buffer = getBuffer(len);
+      if (buffer.hasArray()) {
+        v.putByteArray(rowId + i, buffer.array(), buffer.arrayOffset() + 
buffer.position(), len);
+      } else {
+        byte[] bytes = new byte[len];
+        buffer.get(bytes);
+        v.putByteArray(rowId + i, bytes);
+      }
     }
   }
 
   @Override
   public final Binary readBinary(int len) {
-    Binary result = Binary.fromConstantByteArray(buffer, offset - 
Platform.BYTE_ARRAY_OFFSET, len);
-    offset += len;
-    return result;
+    ByteBuffer buffer = getBuffer(len);
+    if (buffer.hasArray()) {
+      return Binary.fromConstantByteArray(
+          buffer.array(), buffer.arrayOffset() + buffer.position(), len);
+    } else {
+      byte[] bytes = new byte[len];
+      buffer.get(bytes);
+      return Binary.fromConstantByteArray(bytes);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/cac9b1de/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
index fc7fa70..fe3d31a 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.datasources.parquet;
 
 import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.column.values.bitpacking.BytePacker;
@@ -27,6 +28,9 @@ import org.apache.parquet.io.api.Binary;
 
 import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
 /**
  * A values reader for Parquet's run-length encoded data. This is based off of 
the version in
  * parquet-mr with these changes:
@@ -49,9 +53,7 @@ public final class VectorizedRleValuesReader extends 
ValuesReader
   }
 
   // Encoded data.
-  private byte[] in;
-  private int end;
-  private int offset;
+  private ByteBufferInputStream in;
 
   // bit/byte width of decoded data and utility to batch unpack them.
   private int bitWidth;
@@ -70,45 +72,40 @@ public final class VectorizedRleValuesReader extends 
ValuesReader
   // If true, the bit width is fixed. This decoder is used in different places 
and this also
   // controls if we need to read the bitwidth from the beginning of the data 
stream.
   private final boolean fixedWidth;
+  private final boolean readLength;
 
   public VectorizedRleValuesReader() {
-    fixedWidth = false;
+    this.fixedWidth = false;
+    this.readLength = false;
   }
 
   public VectorizedRleValuesReader(int bitWidth) {
-    fixedWidth = true;
+    this.fixedWidth = true;
+    this.readLength = bitWidth != 0;
+    init(bitWidth);
+  }
+
+  public VectorizedRleValuesReader(int bitWidth, boolean readLength) {
+    this.fixedWidth = true;
+    this.readLength = readLength;
     init(bitWidth);
   }
 
   @Override
-  public void initFromPage(int valueCount, byte[] page, int start) {
-    this.offset = start;
-    this.in = page;
+  public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException {
+    this.in = in;
     if (fixedWidth) {
-      if (bitWidth != 0) {
+      // initialize for repetition and definition levels
+      if (readLength) {
         int length = readIntLittleEndian();
-        this.end = this.offset + length;
+        this.in = in.sliceStream(length);
       }
     } else {
-      this.end = page.length;
-      if (this.end != this.offset) init(page[this.offset++] & 255);
-    }
-    if (bitWidth == 0) {
-      // 0 bit width, treat this as an RLE run of valueCount number of 0's.
-      this.mode = MODE.RLE;
-      this.currentCount = valueCount;
-      this.currentValue = 0;
-    } else {
-      this.currentCount = 0;
+      // initialize for values
+      if (in.available() > 0) {
+        init(in.read());
+      }
     }
-  }
-
-  // Initialize the reader from a buffer. This is used for the V2 page 
encoding where the
-  // definition are in its own buffer.
-  public void initFromBuffer(int valueCount, byte[] data) {
-    this.offset = 0;
-    this.in = data;
-    this.end = data.length;
     if (bitWidth == 0) {
       // 0 bit width, treat this as an RLE run of valueCount number of 0's.
       this.mode = MODE.RLE;
@@ -130,11 +127,6 @@ public final class VectorizedRleValuesReader extends 
ValuesReader
   }
 
   @Override
-  public int getNextOffset() {
-    return this.end;
-  }
-
-  @Override
   public boolean readBoolean() {
     return this.readInteger() != 0;
   }
@@ -182,7 +174,7 @@ public final class VectorizedRleValuesReader extends 
ValuesReader
       WritableColumnVector c,
       int rowId,
       int level,
-      VectorizedValuesReader data) {
+      VectorizedValuesReader data) throws IOException {
     int left = total;
     while (left > 0) {
       if (this.currentCount == 0) this.readNextGroup();
@@ -217,7 +209,7 @@ public final class VectorizedRleValuesReader extends 
ValuesReader
       WritableColumnVector c,
       int rowId,
       int level,
-      VectorizedValuesReader data) {
+      VectorizedValuesReader data) throws IOException {
     int left = total;
     while (left > 0) {
       if (this.currentCount == 0) this.readNextGroup();
@@ -251,7 +243,7 @@ public final class VectorizedRleValuesReader extends 
ValuesReader
       WritableColumnVector c,
       int rowId,
       int level,
-      VectorizedValuesReader data) {
+      VectorizedValuesReader data) throws IOException {
     int left = total;
     while (left > 0) {
       if (this.currentCount == 0) this.readNextGroup();
@@ -285,7 +277,7 @@ public final class VectorizedRleValuesReader extends 
ValuesReader
       WritableColumnVector c,
       int rowId,
       int level,
-      VectorizedValuesReader data) {
+      VectorizedValuesReader data) throws IOException {
     int left = total;
     while (left > 0) {
       if (this.currentCount == 0) this.readNextGroup();
@@ -321,7 +313,7 @@ public final class VectorizedRleValuesReader extends 
ValuesReader
       WritableColumnVector c,
       int rowId,
       int level,
-      VectorizedValuesReader data) {
+      VectorizedValuesReader data) throws IOException {
     int left = total;
     while (left > 0) {
       if (this.currentCount == 0) this.readNextGroup();
@@ -355,7 +347,7 @@ public final class VectorizedRleValuesReader extends 
ValuesReader
       WritableColumnVector c,
       int rowId,
       int level,
-      VectorizedValuesReader data) {
+      VectorizedValuesReader data) throws IOException {
     int left = total;
     while (left > 0) {
       if (this.currentCount == 0) this.readNextGroup();
@@ -389,7 +381,7 @@ public final class VectorizedRleValuesReader extends 
ValuesReader
       WritableColumnVector c,
       int rowId,
       int level,
-      VectorizedValuesReader data) {
+      VectorizedValuesReader data) throws IOException {
     int left = total;
     while (left > 0) {
       if (this.currentCount == 0) this.readNextGroup();
@@ -423,7 +415,7 @@ public final class VectorizedRleValuesReader extends 
ValuesReader
       WritableColumnVector c,
       int rowId,
       int level,
-      VectorizedValuesReader data) {
+      VectorizedValuesReader data) throws IOException {
     int left = total;
     while (left > 0) {
       if (this.currentCount == 0) this.readNextGroup();
@@ -462,7 +454,7 @@ public final class VectorizedRleValuesReader extends 
ValuesReader
       WritableColumnVector nulls,
       int rowId,
       int level,
-      VectorizedValuesReader data) {
+      VectorizedValuesReader data) throws IOException {
     int left = total;
     while (left > 0) {
       if (this.currentCount == 0) this.readNextGroup();
@@ -559,12 +551,12 @@ public final class VectorizedRleValuesReader extends 
ValuesReader
   /**
    * Reads the next varint encoded int.
    */
-  private int readUnsignedVarInt() {
+  private int readUnsignedVarInt() throws IOException {
     int value = 0;
     int shift = 0;
     int b;
     do {
-      b = in[offset++] & 255;
+      b = in.read();
       value |= (b & 0x7F) << shift;
       shift += 7;
     } while ((b & 0x80) != 0);
@@ -574,35 +566,32 @@ public final class VectorizedRleValuesReader extends 
ValuesReader
   /**
    * Reads the next 4 byte little endian int.
    */
-  private int readIntLittleEndian() {
-    int ch4 = in[offset] & 255;
-    int ch3 = in[offset + 1] & 255;
-    int ch2 = in[offset + 2] & 255;
-    int ch1 = in[offset + 3] & 255;
-    offset += 4;
+  private int readIntLittleEndian() throws IOException {
+    int ch4 = in.read();
+    int ch3 = in.read();
+    int ch2 = in.read();
+    int ch1 = in.read();
     return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
   }
 
   /**
    * Reads the next byteWidth little endian int.
    */
-  private int readIntLittleEndianPaddedOnBitWidth() {
+  private int readIntLittleEndianPaddedOnBitWidth() throws IOException {
     switch (bytesWidth) {
       case 0:
         return 0;
       case 1:
-        return in[offset++] & 255;
+        return in.read();
       case 2: {
-        int ch2 = in[offset] & 255;
-        int ch1 = in[offset + 1] & 255;
-        offset += 2;
+        int ch2 = in.read();
+        int ch1 = in.read();
         return (ch1 << 8) + ch2;
       }
       case 3: {
-        int ch3 = in[offset] & 255;
-        int ch2 = in[offset + 1] & 255;
-        int ch1 = in[offset + 2] & 255;
-        offset += 3;
+        int ch3 = in.read();
+        int ch2 = in.read();
+        int ch1 = in.read();
         return (ch1 << 16) + (ch2 << 8) + (ch3 << 0);
       }
       case 4: {
@@ -619,32 +608,36 @@ public final class VectorizedRleValuesReader extends 
ValuesReader
   /**
    * Reads the next group.
    */
-  private void readNextGroup()  {
-    int header = readUnsignedVarInt();
-    this.mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED;
-    switch (mode) {
-      case RLE:
-        this.currentCount = header >>> 1;
-        this.currentValue = readIntLittleEndianPaddedOnBitWidth();
-        return;
-      case PACKED:
-        int numGroups = header >>> 1;
-        this.currentCount = numGroups * 8;
-        int bytesToRead = ceil8(this.currentCount * this.bitWidth);
-
-        if (this.currentBuffer.length < this.currentCount) {
-          this.currentBuffer = new int[this.currentCount];
-        }
-        currentBufferIdx = 0;
-        int valueIndex = 0;
-        for (int byteIndex = offset; valueIndex < this.currentCount; byteIndex += this.bitWidth) {
-          this.packer.unpack8Values(in, byteIndex, this.currentBuffer, valueIndex);
-          valueIndex += 8;
-        }
-        offset += bytesToRead;
-        return;
-      default:
-        throw new ParquetDecodingException("not a valid mode " + this.mode);
+  private void readNextGroup() {
+    try {
+      int header = readUnsignedVarInt();
+      this.mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED;
+      switch (mode) {
+        case RLE:
+          this.currentCount = header >>> 1;
+          this.currentValue = readIntLittleEndianPaddedOnBitWidth();
+          return;
+        case PACKED:
+          int numGroups = header >>> 1;
+          this.currentCount = numGroups * 8;
+
+          if (this.currentBuffer.length < this.currentCount) {
+            this.currentBuffer = new int[this.currentCount];
+          }
+          currentBufferIdx = 0;
+          int valueIndex = 0;
+          while (valueIndex < this.currentCount) {
+            // values are bit packed 8 at a time, so reading bitWidth will always work
+            ByteBuffer buffer = in.slice(bitWidth);
+            this.packer.unpack8Values(buffer, buffer.position(), this.currentBuffer, valueIndex);
+            valueIndex += 8;
+          }
+          return;
+        default:
+          throw new ParquetDecodingException("not a valid mode " + this.mode);
+      }
+    } catch (IOException e) {
+      throw new ParquetDecodingException("Failed to read from input stream", e);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/cac9b1de/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
index f36a89a..9cfc307 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
@@ -81,7 +81,10 @@ object ParquetOptions {
     "uncompressed" -> CompressionCodecName.UNCOMPRESSED,
     "snappy" -> CompressionCodecName.SNAPPY,
     "gzip" -> CompressionCodecName.GZIP,
-    "lzo" -> CompressionCodecName.LZO)
+    "lzo" -> CompressionCodecName.LZO,
+    "lz4" -> CompressionCodecName.LZ4,
+    "brotli" -> CompressionCodecName.BROTLI,
+    "zstd" -> CompressionCodecName.ZSTD)
 
   def getParquetCompressionCodecName(name: String): String = {
     shortParquetCompressionCodecNames(name).name()

http://git-wip-us.apache.org/repos/asf/spark/blob/cac9b1de/sql/core/src/test/resources/sql-tests/results/describe-part-after-analyze.sql.out
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/resources/sql-tests/results/describe-part-after-analyze.sql.out
 
b/sql/core/src/test/resources/sql-tests/results/describe-part-after-analyze.sql.out
index 51dac11..58ed201 100644
--- 
a/sql/core/src/test/resources/sql-tests/results/describe-part-after-analyze.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/results/describe-part-after-analyze.sql.out
@@ -89,7 +89,7 @@ Database              default
 Table                  t                                           
 Partition Values       [ds=2017-08-01, hr=10]                      
 Location [not included in 
comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=10                       
 
-Partition Statistics   1067 bytes, 3 rows                          
+Partition Statistics   1121 bytes, 3 rows                          
                                                                    
 # Storage Information                                              
 Location [not included in comparison]sql/core/spark-warehouse/t
@@ -122,7 +122,7 @@ Database                    default
 Table                  t                                           
 Partition Values       [ds=2017-08-01, hr=10]                      
 Location [not included in 
comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=10                       
 
-Partition Statistics   1067 bytes, 3 rows                          
+Partition Statistics   1121 bytes, 3 rows                          
                                                                    
 # Storage Information                                              
 Location [not included in comparison]sql/core/spark-warehouse/t
@@ -147,7 +147,7 @@ Database                    default
 Table                  t                                           
 Partition Values       [ds=2017-08-01, hr=11]                      
 Location [not included in 
comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=11                       
 
-Partition Statistics   1080 bytes, 4 rows                          
+Partition Statistics   1098 bytes, 4 rows                          
                                                                    
 # Storage Information                                              
 Location [not included in comparison]sql/core/spark-warehouse/t
@@ -180,7 +180,7 @@ Database                    default
 Table                  t                                           
 Partition Values       [ds=2017-08-01, hr=10]                      
 Location [not included in 
comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=10                       
 
-Partition Statistics   1067 bytes, 3 rows                          
+Partition Statistics   1121 bytes, 3 rows                          
                                                                    
 # Storage Information                                              
 Location [not included in comparison]sql/core/spark-warehouse/t
@@ -205,7 +205,7 @@ Database                    default
 Table                  t                                           
 Partition Values       [ds=2017-08-01, hr=11]                      
 Location [not included in 
comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=11                       
 
-Partition Statistics   1080 bytes, 4 rows                          
+Partition Statistics   1098 bytes, 4 rows                          
                                                                    
 # Storage Information                                              
 Location [not included in comparison]sql/core/spark-warehouse/t
@@ -230,7 +230,7 @@ Database                    default
 Table                  t                                           
 Partition Values       [ds=2017-09-01, hr=5]                       
 Location [not included in 
comparison]sql/core/spark-warehouse/t/ds=2017-09-01/hr=5                        
 
-Partition Statistics   1054 bytes, 2 rows                          
+Partition Statistics   1144 bytes, 2 rows                          
                                                                    
 # Storage Information                                              
 Location [not included in comparison]sql/core/spark-warehouse/t

http://git-wip-us.apache.org/repos/asf/spark/blob/cac9b1de/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index 863703b..efc2f20 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -503,7 +503,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with 
SharedSQLContext {
             case plan: InMemoryRelation => plan
           }.head
           // InMemoryRelation's stats is file size before the underlying RDD 
is materialized
-          assert(inMemoryRelation.computeStats().sizeInBytes === 740)
+          assert(inMemoryRelation.computeStats().sizeInBytes === 800)
 
           // InMemoryRelation's stats is updated after materializing RDD
           dfFromFile.collect()
@@ -516,7 +516,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with 
SharedSQLContext {
 
           // Even CBO enabled, InMemoryRelation's stats keeps as the file size 
before table's stats
           // is calculated
-          assert(inMemoryRelation2.computeStats().sizeInBytes === 740)
+          assert(inMemoryRelation2.computeStats().sizeInBytes === 800)
 
           // InMemoryRelation's stats should be updated after calculating 
stats of the table
           // clear cache to simulate a fresh environment

