This is an automated email from the ASF dual-hosted git repository.

dzamo pushed a commit to branch 1.21
in repository https://gitbox.apache.org/repos/asf/drill.git
commit a0c255ff2b064f86aac788034c0aae71aa00327f Author: Maksym Rymar <[email protected]> AuthorDate: Wed Apr 10 16:44:14 2024 +0300 DRILL-8486: fix handling of long variable length entries during bulk parquet reading (#2898) --- .../columnreaders/VarLenEntryDictionaryReader.java | 7 +++--- .../parquet/columnreaders/VarLenEntryReader.java | 6 ++--- .../VarLenNullableDictionaryReader.java | 26 +++++++++++++--------- .../columnreaders/VarLenNullableEntryReader.java | 6 ++--- 4 files changed, 26 insertions(+), 19 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryDictionaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryDictionaryReader.java index d731450eb5..f380fb0c2f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryDictionaryReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryDictionaryReader.java @@ -44,7 +44,7 @@ final class VarLenEntryDictionaryReader extends VarLenAbstractPageEntryReader { if (bulkProcess()) { return getEntryBulk(valuesToRead); } - return getEntrySingle(valuesToRead); + return getEntrySingle(); } private final VarLenColumnBulkEntry getEntryBulk(int valuesToRead) { @@ -82,7 +82,7 @@ final class VarLenEntryDictionaryReader extends VarLenAbstractPageEntryReader { // We're here either because a) the Parquet metadata is wrong (advertises more values than the real count) // or the first value being processed ended up to be too long for the buffer. 
if (numValues == 0) { - return getEntrySingle(valuesToRead); + return getEntrySingle(); } // Now set the bulk entry @@ -91,7 +91,7 @@ final class VarLenEntryDictionaryReader extends VarLenAbstractPageEntryReader { return entry; } - private final VarLenColumnBulkEntry getEntrySingle(int valsToReadWithinPage) { + private VarLenColumnBulkEntry getEntrySingle() { final ValuesReaderWrapper valueReader = pageInfo.encodedValueReader; final int[] valueLengths = entry.getValuesLength(); final Binary currEntry = valueReader.getEntry(); @@ -99,6 +99,7 @@ final class VarLenEntryDictionaryReader extends VarLenAbstractPageEntryReader { // Is there enough memory to handle this large value? if (batchMemoryConstraintsReached(0, 4, dataLen)) { + valueReader.pushBack(currEntry); entry.set(0, 0, 0, 0); // no data to be consumed return entry; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryReader.java index 88f5676979..0c6df3f481 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryReader.java @@ -43,7 +43,7 @@ final class VarLenEntryReader extends VarLenAbstractPageEntryReader { if (bulkProcess()) { return getEntryBulk(valuesToRead); } - return getEntrySingle(valuesToRead); + return getEntrySingle(); } private final VarLenColumnBulkEntry getEntryBulk(int valuesToRead) { @@ -92,7 +92,7 @@ final class VarLenEntryReader extends VarLenAbstractPageEntryReader { // We're here either because a) the Parquet metadata is wrong (advertises more values than the real count) // or the first value being processed ended up to be too long for the buffer. 
if (numValues == 0) { - return getEntrySingle(valuesToRead); + return getEntrySingle(); } // Update the page data buffer offset @@ -109,7 +109,7 @@ final class VarLenEntryReader extends VarLenAbstractPageEntryReader { return entry; } - private final VarLenColumnBulkEntry getEntrySingle(int valuesToRead) { + private VarLenColumnBulkEntry getEntrySingle() { if (remainingPageData() < 4) { final String message = String.format("Invalid Parquet page metadata; cannot process advertised page count.."); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableDictionaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableDictionaryReader.java index 9a0a249bbf..c1a58c8b61 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableDictionaryReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableDictionaryReader.java @@ -19,25 +19,30 @@ package org.apache.drill.exec.store.parquet.columnreaders; import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; import java.nio.ByteBuffer; + import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.ColumnPrecisionInfo; import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.ValuesReaderWrapper; import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.PageDataInfo; import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.VarLenColumnBulkInputCallback; import org.apache.parquet.io.api.Binary; -/** Handles nullable variable data types using a dictionary */ +/** + * Handles nullable variable data types using a dictionary + */ final class VarLenNullableDictionaryReader extends VarLenAbstractPageEntryReader { VarLenNullableDictionaryReader(ByteBuffer buffer, - PageDataInfo pageInfo, - ColumnPrecisionInfo columnPrecInfo, - 
VarLenColumnBulkEntry entry, - VarLenColumnBulkInputCallback containerCallback) { + PageDataInfo pageInfo, + ColumnPrecisionInfo columnPrecInfo, + VarLenColumnBulkEntry entry, + VarLenColumnBulkInputCallback containerCallback) { super(buffer, pageInfo, columnPrecInfo, entry, containerCallback); } - /** {@inheritDoc} */ + /** + * {@inheritDoc} + */ @Override final VarLenColumnBulkEntry getEntry(int valuesToRead) { assert valuesToRead > 0; @@ -46,7 +51,7 @@ final class VarLenNullableDictionaryReader extends VarLenAbstractPageEntryReader if (bulkProcess()) { return getEntryBulk(valuesToRead); } - return getEntrySingle(valuesToRead); + return getEntrySingle(); } private final VarLenColumnBulkEntry getEntryBulk(int valuesToRead) { @@ -66,7 +71,7 @@ final class VarLenNullableDictionaryReader extends VarLenAbstractPageEntryReader // Initialize the reader if needed pageInfo.definitionLevels.readFirstIntegerIfNeeded(); - for (int idx = 0; idx < readBatch; ++idx ) { + for (int idx = 0; idx < readBatch; ++idx) { if (pageInfo.definitionLevels.readCurrInteger() == 1) { final Binary currEntry = valueReader.getEntry(); final int dataLen = currEntry.length(); @@ -97,7 +102,7 @@ final class VarLenNullableDictionaryReader extends VarLenAbstractPageEntryReader // We're here either because a) the Parquet metadata is wrong (advertises more values than the real count) // or the first value being processed ended up to be too long for the buffer. 
if (numValues == 0) { - return getEntrySingle(valuesToRead); + return getEntrySingle(); } entry.set(0, tgtPos, numValues, numValues - numNulls); @@ -105,7 +110,7 @@ final class VarLenNullableDictionaryReader extends VarLenAbstractPageEntryReader return entry; } - private final VarLenColumnBulkEntry getEntrySingle(int valsToReadWithinPage) { + private VarLenColumnBulkEntry getEntrySingle() { final int[] valueLengths = entry.getValuesLength(); final ValuesReaderWrapper valueReader = pageInfo.encodedValueReader; @@ -118,6 +123,7 @@ final class VarLenNullableDictionaryReader extends VarLenAbstractPageEntryReader // Is there enough memory to handle this large value? if (batchMemoryConstraintsReached(1, 4, dataLen)) { + valueReader.pushBack(currEntry); entry.set(0, 0, 0, 0); // no data to be consumed return entry; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableEntryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableEntryReader.java index e29b14c8b9..eacb858440 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableEntryReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableEntryReader.java @@ -45,7 +45,7 @@ final class VarLenNullableEntryReader extends VarLenAbstractPageEntryReader { if (bulkProcess()) { return getEntryBulk(valuesToRead); } - return getEntrySingle(valuesToRead); + return getEntrySingle(); } VarLenColumnBulkEntry getEntryBulk(int valuesToRead) { @@ -108,7 +108,7 @@ final class VarLenNullableEntryReader extends VarLenAbstractPageEntryReader { // We're here either because a) the Parquet metadata is wrong (advertises more values than the real count) // or the first value being processed ended up to be too long for the buffer. 
if (numValues == 0) { - return getEntrySingle(valuesToRead); + return getEntrySingle(); } // Update the page data buffer offset @@ -126,7 +126,7 @@ final class VarLenNullableEntryReader extends VarLenAbstractPageEntryReader { return entry; } - VarLenColumnBulkEntry getEntrySingle(int valuesToRead) { + VarLenColumnBulkEntry getEntrySingle() { // Initialize the reader if needed pageInfo.definitionLevels.readFirstIntegerIfNeeded();
