This is an automated email from the ASF dual-hosted git repository.
dzamo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 03fd1f0b8c DRILL-8458: Use correct size of definition level bytes slice when reading Parquet v2 data page (#2838)
03fd1f0b8c is described below
commit 03fd1f0b8c5d01fa4befc5df122714b51f8d9ce8
Author: Peter Franzen <[email protected]>
AuthorDate: Tue Oct 31 15:53:34 2023 +0100
DRILL-8458: Use correct size of definition level bytes slice when reading Parquet v2 data page (#2838)
---
.../parquet/hadoop/ColumnChunkIncReadStore.java | 4 +--
.../parquet/ParquetSimpleTestFileGenerator.java | 39 ++++++++++++++++++++-
.../exec/store/parquet/TestParquetComplex.java | 12 +++++++
.../parquet/parquet_v2_repeated_int.parquet | Bin 0 -> 602 bytes
4 files changed, 52 insertions(+), 3 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
index 773a861213..7834eaa816 100644
--- a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
+++ b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
@@ -217,13 +217,13 @@ public class ColumnChunkIncReadStore implements PageReadStore {
int pageBufOffset = 0;
ByteBuffer bb = (ByteBuffer) pageBuf.position(pageBufOffset);
BytesInput repLevelBytes = BytesInput.from(
- (ByteBuffer) bb.slice().limit(pageBufOffset + repLevelSize)
+ (ByteBuffer) bb.slice().limit(repLevelSize)
);
pageBufOffset += repLevelSize;
bb = (ByteBuffer) pageBuf.position(pageBufOffset);
final BytesInput defLevelBytes = BytesInput.from(
- (ByteBuffer) bb.slice().limit(pageBufOffset + defLevelSize)
+ (ByteBuffer) bb.slice().limit(defLevelSize)
);
pageBufOffset += defLevelSize;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetSimpleTestFileGenerator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetSimpleTestFileGenerator.java
index 232aec9a6a..efd1b4fd17 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetSimpleTestFileGenerator.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetSimpleTestFileGenerator.java
@@ -206,11 +206,17 @@ public class ParquetSimpleTestFileGenerator {
" } \n" +
" } \n" +
"} \n";
+ public static String repeatedIntSchemaMsg = // Parquet schema text: a required int32 rowKey plus a repeated int32 column
+ "message ParquetRepeated { \n" +
+ " required int32 rowKey; \n" +
+ " repeated int32 repeatedInt ( INTEGER(32,true) ) ; \n" +
+ "} \n";
public static MessageType simpleSchema =
MessageTypeParser.parseMessageType(simpleSchemaMsg);
public static MessageType complexSchema =
MessageTypeParser.parseMessageType(complexSchemaMsg);
public static MessageType simpleNullableSchema =
MessageTypeParser.parseMessageType(simpleNullableSchemaMsg);
public static MessageType complexNullableSchema =
MessageTypeParser.parseMessageType(complexNullableSchemaMsg);
+ public static MessageType repeatedIntSchema = // parsed form of repeatedIntSchemaMsg
MessageTypeParser.parseMessageType(repeatedIntSchemaMsg);
public static Path initFile(String fileName) {
@@ -218,6 +224,14 @@ public class ParquetSimpleTestFileGenerator {
}
public static ParquetWriter<Group> initWriter(MessageType schema, String
fileName, boolean dictEncoding) throws IOException {
+ return initWriter(schema, fileName,
ParquetProperties.WriterVersion.PARQUET_1_0, dictEncoding);
+ }
+
+ public static ParquetWriter<Group> initWriter(
+ MessageType schema,
+ String fileName,
+ ParquetProperties.WriterVersion version,
+ boolean dictEncoding) throws IOException {
GroupWriteSupport.setSchema(schema, conf);
@@ -228,7 +242,7 @@ public class ParquetSimpleTestFileGenerator {
.withPageSize(1024)
.withDictionaryPageSize(512)
.withValidation(false)
- .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
+ .withWriterVersion(version)
.withConf(conf)
.build();
}
@@ -455,12 +469,32 @@ public class ParquetSimpleTestFileGenerator {
}
}
+ public static void writeRepeatedIntValues( // Writes numRows rows, each with a 1-based rowKey and the same three repeatedInt values
+ SimpleGroupFactory groupFactory, // expected to be built from repeatedIntSchema (as main does)
+ ParquetWriter<Group> writer, // not closed here; the caller closes it
+ int numRows) throws IOException {
+
+ int[] repeatedValues = {666, 1492, 4711}; // appended in this order to every row
+
+ for (int i = 0; i< numRows; i++) {
+
+ Group g = groupFactory.newGroup();
+ g.append("rowKey", i+1); // rowKey runs 1..numRows
+ for (int r :repeatedValues) { // several values per row, so the repeated column carries non-trivial repetition levels
+ g.append("repeatedInt", r);
+ }
+
+ writer.write(g);
+ }
+ }
+
public static void main(String[] args) throws IOException {
SimpleGroupFactory sgf = new SimpleGroupFactory(simpleSchema);
GroupFactory gf = new SimpleGroupFactory(complexSchema);
SimpleGroupFactory sngf = new SimpleGroupFactory(simpleNullableSchema);
GroupFactory ngf = new SimpleGroupFactory(complexNullableSchema);
+ SimpleGroupFactory repeatedIntGroupFactory = new
SimpleGroupFactory(repeatedIntSchema);
// Generate files with dictionary encoding enabled and disabled
ParquetWriter<Group> simpleWriter = initWriter(simpleSchema,
"drill/parquet_test_file_simple", true);
@@ -471,6 +505,7 @@ public class ParquetSimpleTestFileGenerator {
ParquetWriter<Group> complexNoDictWriter = initWriter(complexSchema,
"drill/parquet_test_file_complex_nodict", false);
ParquetWriter<Group> simpleNullableNoDictWriter =
initWriter(simpleNullableSchema,
"drill/parquet_test_file_simple_nullable_nodict", false);
ParquetWriter<Group> complexNullableNoDictWriter =
initWriter(complexNullableSchema,
"drill/parquet_test_file_complex_nullable_nodict", false);
+ ParquetWriter<Group> repeatedIntV2Writer = initWriter(repeatedIntSchema,
"drill/parquet_v2_repeated_int.parquet",
ParquetProperties.WriterVersion.PARQUET_2_0, true);
ParquetSimpleTestFileGenerator.writeSimpleValues(sgf, simpleWriter, false);
ParquetSimpleTestFileGenerator.writeSimpleValues(sngf,
simpleNullableWriter, true);
@@ -480,6 +515,7 @@ public class ParquetSimpleTestFileGenerator {
ParquetSimpleTestFileGenerator.writeSimpleValues(sngf,
simpleNullableNoDictWriter, true);
ParquetSimpleTestFileGenerator.writeComplexValues(gf, complexNoDictWriter,
false);
ParquetSimpleTestFileGenerator.writeComplexValues(ngf,
complexNullableNoDictWriter, true);
+
ParquetSimpleTestFileGenerator.writeRepeatedIntValues(repeatedIntGroupFactory,
repeatedIntV2Writer, 100);
simpleWriter.close();
complexWriter.close();
@@ -489,6 +525,7 @@ public class ParquetSimpleTestFileGenerator {
complexNoDictWriter.close();
simpleNullableNoDictWriter.close();
complexNullableNoDictWriter.close();
+ repeatedIntV2Writer.close();
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java
index e03af04e2f..579f3ff2ad 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java
@@ -901,4 +901,16 @@ public class TestParquetComplex extends BaseTestQuery {
.baselineValues(firstValue, null, secondValue)
.go();
}
+
+
+ @Test
+ public void testSelectRepeatedInt() throws Exception {
+ // DRILL-8458: parquet_v2_repeated_int.parquet is written with WriterVersion.PARQUET_2_0 by ParquetSimpleTestFileGenerator (100 rows with a repeatedInt column); the query must return all 100 rows
+ String query = "select repeatedInt as r from %s";
+ testBuilder()
+ .sqlQuery(query, "cp.`parquet/parquet_v2_repeated_int.parquet`")
+ .unOrdered()
+ .expectsNumRecords(100) // NOTE(review): only the row count is asserted, not the list contents [666, 1492, 4711]
+ .go();
+ }
}
diff --git a/exec/java-exec/src/test/resources/parquet/parquet_v2_repeated_int.parquet b/exec/java-exec/src/test/resources/parquet/parquet_v2_repeated_int.parquet
new file mode 100644
index 0000000000..91fed9b844
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/parquet_v2_repeated_int.parquet differ