This is an automated email from the ASF dual-hosted git repository.

dzamo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c401c61f1 DRILL-8416: Memory leak when the async Parquet reader skips empty pages (#2784)
9c401c61f1 is described below

commit 9c401c61f11d84514dab0f2e1397aa3959d82249
Author: James Turton <[email protected]>
AuthorDate: Tue Apr 4 14:15:21 2023 +0200

    DRILL-8416: Memory leak when the async Parquet reader skips empty pages (#2784)
---
 .../parquet/columnreaders/AsyncPageReader.java     |   5 ++++-
 .../drill/exec/store/parquet/TestEmptyParquet.java |   5 ++---
 .../store/parquet2/TestDrillParquetReader.java     |  23 +++++++++++++++++++++
 .../resources/parquet/empty_dict_pages.parquet     | Bin 0 -> 2896 bytes
 4 files changed, 29 insertions(+), 4 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
index d882504989..a38c34dc5e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
@@ -21,6 +21,7 @@ import static org.apache.parquet.column.Encoding.valueOf;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Optional;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
@@ -70,7 +71,7 @@ import org.slf4j.LoggerFactory;
  * invariant here is that there is space for at least one more page in the queue before the Future read task
  * is submitted to the pool). This sequence is important. Not doing so can lead to deadlocks - producer
  * threads may block on putting data into the queue which is full while the consumer threads might be
- * blocked trying to read from a queue that has no data.
+ * blocked trying to read from a queue that has no /data.
  * The first request to the page reader can be either to load a dictionary page or a data page; this leads
  * to the rather odd looking code in the constructor since the parent PageReader calls
  * loadDictionaryIfExists in the constructor.
@@ -305,6 +306,7 @@ class AsyncPageReader extends PageReader {
           pageHeader.compressed_page_size
         );
         skip(pageHeader.compressed_page_size);
+        Optional.ofNullable(readStatus.getPageData()).map(DrillBuf::release);
         return;
       }
 
@@ -325,6 +327,7 @@ class AsyncPageReader extends PageReader {
         default:
           logger.warn("skipping page of type {} of size {}", pageHeader.getType(), pageHeader.compressed_page_size);
           skip(pageHeader.compressed_page_size);
+          Optional.ofNullable(readStatus.getPageData()).map(DrillBuf::release);
       }
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestEmptyParquet.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestEmptyParquet.java
index 780aa6e646..80602ab251 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestEmptyParquet.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestEmptyParquet.java
@@ -416,10 +416,9 @@ public class TestEmptyParquet extends ClusterTest {
   }
 
   /**
-   * Test a Parquet file containing a zero-byte dictionary page, c.f.
-   * DRILL-8023.
+   * Test a Parquet file containing a zero-byte dictionary page.
    */
-  @Test
+  @Test // DRILL-8023
   public void testEmptyDictPage() throws Exception {
     try {
       client.alterSession(ExecConstants.PARQUET_NEW_RECORD_READER, false);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet2/TestDrillParquetReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet2/TestDrillParquetReader.java
index b5c192483b..a4c518a147 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet2/TestDrillParquetReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet2/TestDrillParquetReader.java
@@ -1358,4 +1358,27 @@ public class TestDrillParquetReader extends BaseTestQuery {
     testRunAndPrint(UserBitShared.QueryType.SQL, "select * from cp.`parquet2/allTypes.parquet`");
   }
 
+  @Test // DRILL-8416
+  public void testEmptyDictPages() throws Exception {
+    String query = "select " +
+        "`name`, `type`, `begin`, `end` " +
+        "from cp.`parquet/empty_dict_pages.parquet` t";
+    String[] columns = {"`name`", "`type`", "`begin`", "`end`"};
+    testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns(columns)
+        .baselineValues("TP_001", "TP", null, null)
+        .baselineValues("TP_002", "TP", null, null)
+        .baselineValues("TP_003", "TP", null, null)
+        .baselineValues("TP_004", "TP", null, null)
+        .baselineValues("TP_005", "TP", null, null)
+        .baselineValues("TP_006", "TP", null, null)
+        .baselineValues("TP_007", "TP", null, null)
+        .baselineValues("TP_008", "TP", null, null)
+        .baselineValues("TP_009", "TP", null, null)
+        .baselineValues("TP_010", "TP", null, null)
+        .build()
+        .run();
+  }
 }
diff --git a/exec/java-exec/src/test/resources/parquet/empty_dict_pages.parquet b/exec/java-exec/src/test/resources/parquet/empty_dict_pages.parquet
new file mode 100644
index 0000000000..9a94f9aa08
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/empty_dict_pages.parquet differ

Reply via email to