This is an automated email from the ASF dual-hosted git repository.
russellspitzer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 4b09c3d Spark: Add limited support for vectorized reads for Parquet
V2 (#2749)
4b09c3d is described below
commit 4b09c3d89958714a69116b72bf7b8896f4880cbb
Author: Samarth Jain <[email protected]>
AuthorDate: Thu Jul 1 10:00:48 2021 -0700
Spark: Add limited support for vectorized reads for Parquet V2 (#2749)
With this change, we have added support for Parquet data written in V2
format.
The only data encodings we support are dictionary and plain.
Vectorized reads against data written using Delta/RLE and other encodings
are
not supported. Note that, as of this commit, the Spark Parquet reader also
does not support vectorized reads for such encodings.
---
.../parquet/BaseVectorizedParquetValuesReader.java | 12 +++---
.../vectorized/parquet/VectorizedPageIterator.java | 22 +++++++----
.../VectorizedParquetDefinitionLevelReader.java | 5 +++
.../apache/iceberg/parquet/BasePageIterator.java | 3 +-
.../java/org/apache/iceberg/parquet/Parquet.java | 5 +++
.../vectorized/TestParquetVectorizedReads.java | 46 ++++++++++++++++++++++
6 files changed, 79 insertions(+), 14 deletions(-)
diff --git
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/BaseVectorizedParquetValuesReader.java
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/BaseVectorizedParquetValuesReader.java
index 60799d0..5ef6efa 100644
---
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/BaseVectorizedParquetValuesReader.java
+++
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/BaseVectorizedParquetValuesReader.java
@@ -80,12 +80,14 @@ public class BaseVectorizedParquetValuesReader extends
ValuesReader {
this.setArrowValidityVector = setValidityVector;
}
- public BaseVectorizedParquetValuesReader(
- int bitWidth,
- int maxDefLevel,
- boolean setValidityVector) {
+ public BaseVectorizedParquetValuesReader(int bitWidth, int maxDefLevel,
boolean setValidityVector) {
+ this(bitWidth, maxDefLevel, bitWidth != 0, setValidityVector);
+ }
+
+ public BaseVectorizedParquetValuesReader(int bitWidth, int maxDefLevel,
boolean readLength,
+ boolean setValidityVector) {
this.fixedWidth = true;
- this.readLength = bitWidth != 0;
+ this.readLength = readLength;
this.maxDefLevel = maxDefLevel;
this.setArrowValidityVector = setValidityVector;
init(bitWidth);
diff --git
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java
index 381edda..01abdd1 100644
---
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java
+++
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java
@@ -512,6 +512,10 @@ public class VectorizedPageIterator extends
BasePageIterator {
throw new ParquetDecodingException("could not read page in col " +
desc, e);
}
} else {
+ if (dataEncoding != Encoding.PLAIN) {
+ throw new UnsupportedOperationException("Cannot support vectorized
reads for column " + desc + " with " +
+ "encoding " + dataEncoding + ". Disable vectorized reads to
read this table/file");
+ }
plainValuesReader = new ValuesAsBytesReader();
plainValuesReader.initFromPage(valueCount, in);
dictionaryDecodeMode = DictionaryDecodeMode.NONE;
@@ -526,18 +530,20 @@ public class VectorizedPageIterator extends
BasePageIterator {
@Override
protected void initDefinitionLevelsReader(DataPageV1 dataPageV1,
ColumnDescriptor desc, ByteBufferInputStream in,
int triplesCount) throws
IOException {
- this.vectorizedDefinitionLevelReader =
newVectorizedDefinitionLevelReader(desc);
+ int bitWidth = BytesUtils.getWidthFromMaxInt(desc.getMaxDefinitionLevel());
+ this.vectorizedDefinitionLevelReader = new
VectorizedParquetDefinitionLevelReader(bitWidth,
+ desc.getMaxDefinitionLevel(), setArrowValidityVector);
this.vectorizedDefinitionLevelReader.initFromPage(triplesCount, in);
}
@Override
- protected void initDefinitionLevelsReader(DataPageV2 dataPageV2,
ColumnDescriptor desc) {
- this.vectorizedDefinitionLevelReader =
newVectorizedDefinitionLevelReader(desc);
- }
-
- private VectorizedParquetDefinitionLevelReader
newVectorizedDefinitionLevelReader(ColumnDescriptor desc) {
- int bitwidth = BytesUtils.getWidthFromMaxInt(desc.getMaxDefinitionLevel());
- return new VectorizedParquetDefinitionLevelReader(bitwidth,
desc.getMaxDefinitionLevel(), setArrowValidityVector);
+ protected void initDefinitionLevelsReader(DataPageV2 dataPageV2,
ColumnDescriptor desc) throws IOException {
+ int bitWidth = BytesUtils.getWidthFromMaxInt(desc.getMaxDefinitionLevel());
+ // do not read the length from the stream. v2 pages handle dividing the
page bytes.
+ this.vectorizedDefinitionLevelReader = new
VectorizedParquetDefinitionLevelReader(bitWidth,
+ desc.getMaxDefinitionLevel(), false, setArrowValidityVector);
+ this.vectorizedDefinitionLevelReader.initFromPage(
+ dataPageV2.getValueCount(),
dataPageV2.getDefinitionLevels().toInputStream());
}
}
diff --git
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
index 34a996e..e38b908 100644
---
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
+++
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
@@ -39,6 +39,11 @@ public final class VectorizedParquetDefinitionLevelReader
extends BaseVectorized
super(bitWidth, maxDefLevel, setArrowValidityVector);
}
+ public VectorizedParquetDefinitionLevelReader(int bitWidth, int maxDefLevel,
boolean readLength,
+ boolean
setArrowValidityVector) {
+ super(bitWidth, maxDefLevel, readLength, setArrowValidityVector);
+ }
+
public void readBatchOfDictionaryIds(
final IntVector vector,
final int startOffset,
diff --git
a/parquet/src/main/java/org/apache/iceberg/parquet/BasePageIterator.java
b/parquet/src/main/java/org/apache/iceberg/parquet/BasePageIterator.java
index cf79418..7e5abfb 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/BasePageIterator.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/BasePageIterator.java
@@ -77,7 +77,8 @@ public abstract class BasePageIterator {
protected abstract void initDefinitionLevelsReader(DataPageV1 dataPageV1,
ColumnDescriptor descriptor,
ByteBufferInputStream in,
int count) throws IOException;
- protected abstract void initDefinitionLevelsReader(DataPageV2 dataPageV2,
ColumnDescriptor descriptor);
+ protected abstract void initDefinitionLevelsReader(DataPageV2 dataPageV2,
ColumnDescriptor descriptor)
+ throws IOException;
public int currentPageCount() {
return triplesCount;
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index 099dbcf..d57ef5a 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -170,6 +170,11 @@ public class Parquet {
return this;
}
+ public WriteBuilder writerVersion(WriterVersion version) {
+ this.writerVersion = version;
+ return this;
+ }
+
@SuppressWarnings("unchecked")
private <T> WriteSupport<T> getWriteSupport(MessageType type) {
if (writeSupport != null) {
diff --git
a/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java
b/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java
index d330673..48dcc94 100644
---
a/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java
+++
b/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java
@@ -39,6 +39,7 @@ import org.apache.iceberg.spark.data.TestHelpers;
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
+import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
@@ -110,6 +111,14 @@ public class TestParquetVectorizedReads extends
AvroDataTest {
.build();
}
+ FileAppender<GenericData.Record> getParquetV2Writer(Schema schema, File
testFile) throws IOException {
+ return Parquet.write(Files.localOutput(testFile))
+ .schema(schema)
+ .named("test")
+ .writerVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
+ .build();
+ }
+
void assertRecordsMatch(
Schema schema, int expectedSize, Iterable<GenericData.Record> expected,
File testFile,
boolean setAndCheckArrowValidityBuffer, boolean reuseContainers, int
batchSize)
@@ -260,4 +269,41 @@ public class TestParquetVectorizedReads extends
AvroDataTest {
assertRecordsMatch(readSchema, 30000, data, dataFile, false,
true, BATCH_SIZE);
}
+
+ @Test
+ public void testSupportedReadsForParquetV2() throws Exception {
+ // Only float and double column types are written using plain encoding
with Parquet V2
+ Schema schema = new Schema(
+ optional(102, "float_data", Types.FloatType.get()),
+ optional(103, "double_data", Types.DoubleType.get()));
+
+ File dataFile = temp.newFile();
+ Assert.assertTrue("Delete should succeed", dataFile.delete());
+ Iterable<GenericData.Record> data = generateData(schema, 30000, 0L,
+ RandomData.DEFAULT_NULL_PERCENTAGE, IDENTITY);
+ try (FileAppender<GenericData.Record> writer = getParquetV2Writer(schema,
dataFile)) {
+ writer.addAll(data);
+ }
+ assertRecordsMatch(schema, 30000, data, dataFile, false,
+ true, BATCH_SIZE);
+ }
+
+ @Test
+ public void testUnsupportedReadsForParquetV2() throws Exception {
+ // Longs, ints, string types etc use delta encoding and which are not
supported for vectorized reads
+ Schema schema = new Schema(SUPPORTED_PRIMITIVES.fields());
+ File dataFile = temp.newFile();
+ Assert.assertTrue("Delete should succeed", dataFile.delete());
+ Iterable<GenericData.Record> data = generateData(schema, 30000, 0L,
+ RandomData.DEFAULT_NULL_PERCENTAGE, IDENTITY);
+ try (FileAppender<GenericData.Record> writer = getParquetV2Writer(schema,
dataFile)) {
+ writer.addAll(data);
+ }
+ AssertHelpers.assertThrows("Vectorized reads not supported",
+ UnsupportedOperationException.class, "Cannot support vectorized reads
for column", () -> {
+ assertRecordsMatch(schema, 30000, data, dataFile, false,
+ true, BATCH_SIZE);
+ return null;
+ });
+ }
}