This is an automated email from the ASF dual-hosted git repository.

russellspitzer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b09c3d  Spark: Add limited support for vectorized reads for Parquet V2 (#2749)
4b09c3d is described below

commit 4b09c3d89958714a69116b72bf7b8896f4880cbb
Author: Samarth Jain <[email protected]>
AuthorDate: Thu Jul 1 10:00:48 2021 -0700

    Spark: Add limited support for vectorized reads for Parquet V2 (#2749)
    
    With this change, we have added support for Parquet data written in V2 format.
    The only data encodings we support are dictionary and plain.
    Vectorized reads against data written using Delta/RLE and other encodings
    are not supported. As of this commit, note that Spark's built-in Parquet
    vectorized reader also does not support such encodings.
---
 .../parquet/BaseVectorizedParquetValuesReader.java | 12 +++---
 .../vectorized/parquet/VectorizedPageIterator.java | 22 +++++++----
 .../VectorizedParquetDefinitionLevelReader.java    |  5 +++
 .../apache/iceberg/parquet/BasePageIterator.java   |  3 +-
 .../java/org/apache/iceberg/parquet/Parquet.java   |  5 +++
 .../vectorized/TestParquetVectorizedReads.java     | 46 ++++++++++++++++++++++
 6 files changed, 79 insertions(+), 14 deletions(-)

diff --git 
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/BaseVectorizedParquetValuesReader.java
 
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/BaseVectorizedParquetValuesReader.java
index 60799d0..5ef6efa 100644
--- 
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/BaseVectorizedParquetValuesReader.java
+++ 
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/BaseVectorizedParquetValuesReader.java
@@ -80,12 +80,14 @@ public class BaseVectorizedParquetValuesReader extends 
ValuesReader {
     this.setArrowValidityVector = setValidityVector;
   }
 
-  public BaseVectorizedParquetValuesReader(
-      int bitWidth,
-      int maxDefLevel,
-      boolean setValidityVector) {
+  public BaseVectorizedParquetValuesReader(int bitWidth, int maxDefLevel, 
boolean setValidityVector) {
+    this(bitWidth, maxDefLevel, bitWidth != 0, setValidityVector);
+  }
+
+  public BaseVectorizedParquetValuesReader(int bitWidth, int maxDefLevel, 
boolean readLength,
+                                           boolean setValidityVector) {
     this.fixedWidth = true;
-    this.readLength = bitWidth != 0;
+    this.readLength = readLength;
     this.maxDefLevel = maxDefLevel;
     this.setArrowValidityVector = setValidityVector;
     init(bitWidth);
diff --git 
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java
 
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java
index 381edda..01abdd1 100644
--- 
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java
+++ 
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java
@@ -512,6 +512,10 @@ public class VectorizedPageIterator extends 
BasePageIterator {
         throw new ParquetDecodingException("could not read page in col " + 
desc, e);
       }
     } else {
+      if (dataEncoding != Encoding.PLAIN) {
+        throw new UnsupportedOperationException("Cannot support vectorized 
reads for column " + desc + " with " +
+                "encoding " + dataEncoding + ". Disable vectorized reads to 
read this table/file");
+      }
       plainValuesReader = new ValuesAsBytesReader();
       plainValuesReader.initFromPage(valueCount, in);
       dictionaryDecodeMode = DictionaryDecodeMode.NONE;
@@ -526,18 +530,20 @@ public class VectorizedPageIterator extends 
BasePageIterator {
   @Override
   protected void initDefinitionLevelsReader(DataPageV1 dataPageV1, 
ColumnDescriptor desc, ByteBufferInputStream in,
                                             int triplesCount) throws 
IOException {
-    this.vectorizedDefinitionLevelReader = 
newVectorizedDefinitionLevelReader(desc);
+    int bitWidth = BytesUtils.getWidthFromMaxInt(desc.getMaxDefinitionLevel());
+    this.vectorizedDefinitionLevelReader = new 
VectorizedParquetDefinitionLevelReader(bitWidth,
+            desc.getMaxDefinitionLevel(), setArrowValidityVector);
     this.vectorizedDefinitionLevelReader.initFromPage(triplesCount, in);
   }
 
   @Override
-  protected void initDefinitionLevelsReader(DataPageV2 dataPageV2, 
ColumnDescriptor desc) {
-    this.vectorizedDefinitionLevelReader = 
newVectorizedDefinitionLevelReader(desc);
-  }
-
-  private VectorizedParquetDefinitionLevelReader 
newVectorizedDefinitionLevelReader(ColumnDescriptor desc) {
-    int bitwidth = BytesUtils.getWidthFromMaxInt(desc.getMaxDefinitionLevel());
-    return new VectorizedParquetDefinitionLevelReader(bitwidth, 
desc.getMaxDefinitionLevel(), setArrowValidityVector);
+  protected void initDefinitionLevelsReader(DataPageV2 dataPageV2, 
ColumnDescriptor desc) throws IOException {
+    int bitWidth = BytesUtils.getWidthFromMaxInt(desc.getMaxDefinitionLevel());
+    // do not read the length from the stream. v2 pages handle dividing the 
page bytes.
+    this.vectorizedDefinitionLevelReader = new 
VectorizedParquetDefinitionLevelReader(bitWidth,
+            desc.getMaxDefinitionLevel(), false, setArrowValidityVector);
+    this.vectorizedDefinitionLevelReader.initFromPage(
+            dataPageV2.getValueCount(), 
dataPageV2.getDefinitionLevels().toInputStream());
   }
 
 }
diff --git 
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
 
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
index 34a996e..e38b908 100644
--- 
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
+++ 
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
@@ -39,6 +39,11 @@ public final class VectorizedParquetDefinitionLevelReader 
extends BaseVectorized
     super(bitWidth, maxDefLevel, setArrowValidityVector);
   }
 
+  public VectorizedParquetDefinitionLevelReader(int bitWidth, int maxDefLevel, 
boolean readLength,
+                                                boolean 
setArrowValidityVector) {
+    super(bitWidth, maxDefLevel, readLength, setArrowValidityVector);
+  }
+
   public void readBatchOfDictionaryIds(
       final IntVector vector,
       final int startOffset,
diff --git 
a/parquet/src/main/java/org/apache/iceberg/parquet/BasePageIterator.java 
b/parquet/src/main/java/org/apache/iceberg/parquet/BasePageIterator.java
index cf79418..7e5abfb 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/BasePageIterator.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/BasePageIterator.java
@@ -77,7 +77,8 @@ public abstract class BasePageIterator {
   protected abstract void initDefinitionLevelsReader(DataPageV1 dataPageV1, 
ColumnDescriptor descriptor,
                                                      ByteBufferInputStream in, 
int count) throws IOException;
 
-  protected abstract void initDefinitionLevelsReader(DataPageV2 dataPageV2, 
ColumnDescriptor descriptor);
+  protected abstract void initDefinitionLevelsReader(DataPageV2 dataPageV2, 
ColumnDescriptor descriptor)
+          throws IOException;
 
   public int currentPageCount() {
     return triplesCount;
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java 
b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index 099dbcf..d57ef5a 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -170,6 +170,11 @@ public class Parquet {
       return this;
     }
 
+    public WriteBuilder writerVersion(WriterVersion version) {
+      this.writerVersion = version;
+      return this;
+    }
+
     @SuppressWarnings("unchecked")
     private <T> WriteSupport<T> getWriteSupport(MessageType type) {
       if (writeSupport != null) {
diff --git 
a/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java
 
b/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java
index d330673..48dcc94 100644
--- 
a/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java
+++ 
b/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java
@@ -39,6 +39,7 @@ import org.apache.iceberg.spark.data.TestHelpers;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
+import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Type;
@@ -110,6 +111,14 @@ public class TestParquetVectorizedReads extends 
AvroDataTest {
         .build();
   }
 
+  FileAppender<GenericData.Record> getParquetV2Writer(Schema schema, File 
testFile) throws IOException {
+    return Parquet.write(Files.localOutput(testFile))
+            .schema(schema)
+            .named("test")
+            .writerVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
+            .build();
+  }
+
   void assertRecordsMatch(
       Schema schema, int expectedSize, Iterable<GenericData.Record> expected, 
File testFile,
       boolean setAndCheckArrowValidityBuffer, boolean reuseContainers, int 
batchSize)
@@ -260,4 +269,41 @@ public class TestParquetVectorizedReads extends 
AvroDataTest {
     assertRecordsMatch(readSchema, 30000, data, dataFile, false,
         true, BATCH_SIZE);
   }
+
+  @Test
+  public void testSupportedReadsForParquetV2() throws Exception {
+    // Only float and double column types are written using plain encoding 
with Parquet V2
+    Schema schema = new Schema(
+            optional(102, "float_data", Types.FloatType.get()),
+            optional(103, "double_data", Types.DoubleType.get()));
+
+    File dataFile = temp.newFile();
+    Assert.assertTrue("Delete should succeed", dataFile.delete());
+    Iterable<GenericData.Record> data = generateData(schema, 30000, 0L,
+            RandomData.DEFAULT_NULL_PERCENTAGE, IDENTITY);
+    try (FileAppender<GenericData.Record> writer = getParquetV2Writer(schema, 
dataFile)) {
+      writer.addAll(data);
+    }
+    assertRecordsMatch(schema, 30000, data, dataFile, false,
+            true, BATCH_SIZE);
+  }
+
+  @Test
+  public void testUnsupportedReadsForParquetV2() throws Exception {
+    // Longs, ints, string types etc. use delta encoding, which is not supported for vectorized reads
+    Schema schema = new Schema(SUPPORTED_PRIMITIVES.fields());
+    File dataFile = temp.newFile();
+    Assert.assertTrue("Delete should succeed", dataFile.delete());
+    Iterable<GenericData.Record> data = generateData(schema, 30000, 0L,
+        RandomData.DEFAULT_NULL_PERCENTAGE, IDENTITY);
+    try (FileAppender<GenericData.Record> writer = getParquetV2Writer(schema, 
dataFile)) {
+      writer.addAll(data);
+    }
+    AssertHelpers.assertThrows("Vectorized reads not supported",
+        UnsupportedOperationException.class, "Cannot support vectorized reads 
for column", () -> {
+          assertRecordsMatch(schema, 30000, data, dataFile, false,
+              true, BATCH_SIZE);
+          return null;
+        });
+  }
 }

Reply via email to