This is an automated email from the ASF dual-hosted git repository.

dzamo pushed a commit to branch 1.21
in repository https://gitbox.apache.org/repos/asf/drill.git

commit a0c255ff2b064f86aac788034c0aae71aa00327f
Author: Maksym Rymar <[email protected]>
AuthorDate: Wed Apr 10 16:44:14 2024 +0300

    DRILL-8486: fix handling of long variable length entries during bulk parquet reading (#2898)
---
 .../columnreaders/VarLenEntryDictionaryReader.java |  7 +++---
 .../parquet/columnreaders/VarLenEntryReader.java   |  6 ++---
 .../VarLenNullableDictionaryReader.java            | 26 +++++++++++++---------
 .../columnreaders/VarLenNullableEntryReader.java   |  6 ++---
 4 files changed, 26 insertions(+), 19 deletions(-)
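
For context, the functional change in this patch is the new ValuesReaderWrapper.pushBack(currEntry) call in the single-entry paths: when a decoded variable-length value exceeds the current batch's memory budget, the reader now hands the value back instead of consuming it, so a later attempt can re-read the same value rather than skip it. Below is a minimal, self-contained sketch of that push-back idea; the names used here (PushbackValueReader, fits, readBatch) are illustrative stand-ins, not Drill's actual ValuesReaderWrapper or batchMemoryConstraintsReached API.

// Sketch only: illustrates the push-back pattern under assumed, simplified types.
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

final class PushbackValueReader {
  private final Iterator<byte[]> source;              // decoded variable-length values
  private final Deque<byte[]> pushedBack = new ArrayDeque<>();

  PushbackValueReader(Iterator<byte[]> source) {
    this.source = source;
  }

  boolean hasNext() {
    return !pushedBack.isEmpty() || source.hasNext();
  }

  // Returns the next value, preferring any value that was pushed back earlier.
  byte[] getEntry() {
    return pushedBack.isEmpty() ? source.next() : pushedBack.pop();
  }

  // Re-queues a value so the next getEntry() call sees it again.
  void pushBack(byte[] entry) {
    pushedBack.push(entry);
  }
}

final class PushbackDemo {
  // Stand-in for the batch memory check: 4 bytes of length prefix per value.
  static boolean fits(int usedBytes, int entryLen, int batchLimit) {
    return usedBytes + 4 + entryLen <= batchLimit;
  }

  // Reads values until the batch is full; an oversized value is pushed back
  // instead of being consumed and dropped, so the next batch can retry it.
  static int readBatch(PushbackValueReader reader, int batchLimit) {
    int used = 0;
    int count = 0;
    while (reader.hasNext()) {
      byte[] value = reader.getEntry();
      if (!fits(used, value.length, batchLimit)) {
        reader.pushBack(value);
        break;
      }
      used += 4 + value.length;
      count++;
    }
    return count;
  }

  public static void main(String[] args) {
    PushbackValueReader reader = new PushbackValueReader(
        List.of(new byte[10], new byte[500], new byte[10]).iterator());
    System.out.println(readBatch(reader, 64));    // 1: the 500-byte value is pushed back
    System.out.println(readBatch(reader, 1024));  // 2: the pushed-back value is read first
  }
}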

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryDictionaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryDictionaryReader.java
index d731450eb5..f380fb0c2f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryDictionaryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryDictionaryReader.java
@@ -44,7 +44,7 @@ final class VarLenEntryDictionaryReader extends VarLenAbstractPageEntryReader {
     if (bulkProcess()) {
       return getEntryBulk(valuesToRead);
     }
-    return getEntrySingle(valuesToRead);
+    return getEntrySingle();
   }
 
   private final VarLenColumnBulkEntry getEntryBulk(int valuesToRead) {
@@ -82,7 +82,7 @@ final class VarLenEntryDictionaryReader extends VarLenAbstractPageEntryReader {
     // We're here either because a) the Parquet metadata is wrong (advertises more values than the real count)
     // or the first value being processed ended up to be too long for the buffer.
     if (numValues == 0) {
-      return getEntrySingle(valuesToRead);
+      return getEntrySingle();
     }
 
     // Now set the bulk entry
@@ -91,7 +91,7 @@ final class VarLenEntryDictionaryReader extends VarLenAbstractPageEntryReader {
     return entry;
   }
 
-  private final VarLenColumnBulkEntry getEntrySingle(int valsToReadWithinPage) {
+  private VarLenColumnBulkEntry getEntrySingle() {
     final ValuesReaderWrapper valueReader = pageInfo.encodedValueReader;
     final int[] valueLengths = entry.getValuesLength();
     final Binary currEntry = valueReader.getEntry();
@@ -99,6 +99,7 @@ final class VarLenEntryDictionaryReader extends VarLenAbstractPageEntryReader {
 
     // Is there enough memory to handle this large value?
     if (batchMemoryConstraintsReached(0, 4, dataLen)) {
+      valueReader.pushBack(currEntry);
       entry.set(0, 0, 0, 0); // no data to be consumed
       return entry;
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryReader.java
index 88f5676979..0c6df3f481 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryReader.java
@@ -43,7 +43,7 @@ final class VarLenEntryReader extends VarLenAbstractPageEntryReader {
     if (bulkProcess()) {
       return getEntryBulk(valuesToRead);
     }
-    return getEntrySingle(valuesToRead);
+    return getEntrySingle();
   }
 
   private final VarLenColumnBulkEntry getEntryBulk(int valuesToRead) {
@@ -92,7 +92,7 @@ final class VarLenEntryReader extends VarLenAbstractPageEntryReader {
     // We're here either because a) the Parquet metadata is wrong (advertises more values than the real count)
     // or the first value being processed ended up to be too long for the buffer.
     if (numValues == 0) {
-      return getEntrySingle(valuesToRead);
+      return getEntrySingle();
     }
 
     // Update the page data buffer offset
@@ -109,7 +109,7 @@ final class VarLenEntryReader extends VarLenAbstractPageEntryReader {
     return entry;
   }
 
-  private final VarLenColumnBulkEntry getEntrySingle(int valuesToRead) {
+  private VarLenColumnBulkEntry getEntrySingle() {
 
     if (remainingPageData() < 4) {
       final String message = String.format("Invalid Parquet page metadata; cannot process advertised page count..");
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableDictionaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableDictionaryReader.java
index 9a0a249bbf..c1a58c8b61 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableDictionaryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableDictionaryReader.java
@@ -19,25 +19,30 @@ package org.apache.drill.exec.store.parquet.columnreaders;
 
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import java.nio.ByteBuffer;
+
 import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.ColumnPrecisionInfo;
 import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.ValuesReaderWrapper;
 import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.PageDataInfo;
 import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.VarLenColumnBulkInputCallback;
 import org.apache.parquet.io.api.Binary;
 
-/** Handles nullable variable data types using a dictionary */
+/**
+ * Handles nullable variable data types using a dictionary
+ */
 final class VarLenNullableDictionaryReader extends VarLenAbstractPageEntryReader {
 
   VarLenNullableDictionaryReader(ByteBuffer buffer,
-    PageDataInfo pageInfo,
-    ColumnPrecisionInfo columnPrecInfo,
-    VarLenColumnBulkEntry entry,
-    VarLenColumnBulkInputCallback containerCallback) {
+                                 PageDataInfo pageInfo,
+                                 ColumnPrecisionInfo columnPrecInfo,
+                                 VarLenColumnBulkEntry entry,
+                                 VarLenColumnBulkInputCallback containerCallback) {
 
     super(buffer, pageInfo, columnPrecInfo, entry, containerCallback);
   }
 
-  /** {@inheritDoc} */
+  /**
+   * {@inheritDoc}
+   */
   @Override
   final VarLenColumnBulkEntry getEntry(int valuesToRead) {
     assert valuesToRead > 0;
@@ -46,7 +51,7 @@ final class VarLenNullableDictionaryReader extends VarLenAbstractPageEntryReader
     if (bulkProcess()) {
       return getEntryBulk(valuesToRead);
     }
-    return getEntrySingle(valuesToRead);
+    return getEntrySingle();
   }
 
   private final VarLenColumnBulkEntry getEntryBulk(int valuesToRead) {
@@ -66,7 +71,7 @@ final class VarLenNullableDictionaryReader extends VarLenAbstractPageEntryReader
     // Initialize the reader if needed
     pageInfo.definitionLevels.readFirstIntegerIfNeeded();
 
-    for (int idx = 0; idx < readBatch; ++idx ) {
+    for (int idx = 0; idx < readBatch; ++idx) {
       if (pageInfo.definitionLevels.readCurrInteger() == 1) {
         final Binary currEntry = valueReader.getEntry();
         final int dataLen = currEntry.length();
@@ -97,7 +102,7 @@ final class VarLenNullableDictionaryReader extends VarLenAbstractPageEntryReader
     // We're here either because a) the Parquet metadata is wrong (advertises more values than the real count)
     // or the first value being processed ended up to be too long for the buffer.
     if (numValues == 0) {
-      return getEntrySingle(valuesToRead);
+      return getEntrySingle();
     }
 
     entry.set(0, tgtPos, numValues, numValues - numNulls);
@@ -105,7 +110,7 @@ final class VarLenNullableDictionaryReader extends VarLenAbstractPageEntryReader
     return entry;
   }
 
-  private final VarLenColumnBulkEntry getEntrySingle(int valsToReadWithinPage) {
+  private VarLenColumnBulkEntry getEntrySingle() {
     final int[] valueLengths = entry.getValuesLength();
     final ValuesReaderWrapper valueReader = pageInfo.encodedValueReader;
 
@@ -118,6 +123,7 @@ final class VarLenNullableDictionaryReader extends VarLenAbstractPageEntryReader
 
       // Is there enough memory to handle this large value?
       if (batchMemoryConstraintsReached(1, 4, dataLen)) {
+        valueReader.pushBack(currEntry);
         entry.set(0, 0, 0, 0); // no data to be consumed
         return entry;
       }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableEntryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableEntryReader.java
index e29b14c8b9..eacb858440 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableEntryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableEntryReader.java
@@ -45,7 +45,7 @@ final class VarLenNullableEntryReader extends VarLenAbstractPageEntryReader {
     if (bulkProcess()) {
       return getEntryBulk(valuesToRead);
     }
-    return getEntrySingle(valuesToRead);
+    return getEntrySingle();
   }
 
   VarLenColumnBulkEntry getEntryBulk(int valuesToRead) {
@@ -108,7 +108,7 @@ final class VarLenNullableEntryReader extends VarLenAbstractPageEntryReader {
     // We're here either because a) the Parquet metadata is wrong (advertises more values than the real count)
     // or the first value being processed ended up to be too long for the buffer.
     if (numValues == 0) {
-      return getEntrySingle(valuesToRead);
+      return getEntrySingle();
     }
 
     // Update the page data buffer offset
@@ -126,7 +126,7 @@ final class VarLenNullableEntryReader extends VarLenAbstractPageEntryReader {
     return entry;
   }
 
-  VarLenColumnBulkEntry getEntrySingle(int valuesToRead) {
+  VarLenColumnBulkEntry getEntrySingle() {
 
     // Initialize the reader if needed
     pageInfo.definitionLevels.readFirstIntegerIfNeeded();
