This is an automated email from the ASF dual-hosted git repository. dzamo pushed a commit to branch 1.21 in repository https://gitbox.apache.org/repos/asf/drill.git
commit 9a2e1098315980efa6a67337a3f545080c5c7a5e Author: James Turton <[email protected]> AuthorDate: Tue Apr 4 14:15:21 2023 +0200 DRILL-8416: Memory leak when the async Parquet reader skips empty pages (#2784) --- .../parquet/columnreaders/AsyncPageReader.java | 5 ++++- .../drill/exec/store/parquet/TestEmptyParquet.java | 5 ++--- .../store/parquet2/TestDrillParquetReader.java | 23 +++++++++++++++++++++ .../resources/parquet/empty_dict_pages.parquet | Bin 0 -> 2896 bytes 4 files changed, 29 insertions(+), 4 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java index d882504989..a38c34dc5e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java @@ -21,6 +21,7 @@ import static org.apache.parquet.column.Encoding.valueOf; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; @@ -70,7 +71,7 @@ import org.slf4j.LoggerFactory; * invariant here is that there is space for at least one more page in the queue before the Future read task * is submitted to the pool). This sequence is important. Not doing so can lead to deadlocks - producer * threads may block on putting data into the queue which is full while the consumer threads might be - * blocked trying to read from a queue that has no data. + * blocked trying to read from a queue that has no /data. * The first request to the page reader can be either to load a dictionary page or a data page; this leads * to the rather odd looking code in the constructor since the parent PageReader calls * loadDictionaryIfExists in the constructor. @@ -305,6 +306,7 @@ class AsyncPageReader extends PageReader { pageHeader.compressed_page_size ); skip(pageHeader.compressed_page_size); + Optional.ofNullable(readStatus.getPageData()).map(DrillBuf::release); return; } @@ -325,6 +327,7 @@ class AsyncPageReader extends PageReader { default: logger.warn("skipping page of type {} of size {}", pageHeader.getType(), pageHeader.compressed_page_size); skip(pageHeader.compressed_page_size); + Optional.ofNullable(readStatus.getPageData()).map(DrillBuf::release); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestEmptyParquet.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestEmptyParquet.java index 780aa6e646..80602ab251 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestEmptyParquet.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestEmptyParquet.java @@ -416,10 +416,9 @@ public class TestEmptyParquet extends ClusterTest { } /** - * Test a Parquet file containing a zero-byte dictionary page, c.f. - * DRILL-8023. + * Test a Parquet file containing a zero-byte dictionary page. */ - @Test + @Test // DRILL-8023 public void testEmptyDictPage() throws Exception { try { client.alterSession(ExecConstants.PARQUET_NEW_RECORD_READER, false); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet2/TestDrillParquetReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet2/TestDrillParquetReader.java index b5c192483b..a4c518a147 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet2/TestDrillParquetReader.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet2/TestDrillParquetReader.java @@ -1358,4 +1358,27 @@ public class TestDrillParquetReader extends BaseTestQuery { testRunAndPrint(UserBitShared.QueryType.SQL, "select * from cp.`parquet2/allTypes.parquet`"); } + @Test // DRILL-8416 + public void testEmptyDictPages() throws Exception { + String query = "select " + + "`name`, `type`, `begin`, `end` " + + "from cp.`parquet/empty_dict_pages.parquet` t"; + String[] columns = {"`name`", "`type`", "`begin`", "`end`"}; + testBuilder() + .sqlQuery(query) + .unOrdered() + .baselineColumns(columns) + .baselineValues("TP_001", "TP", null, null) + .baselineValues("TP_002", "TP", null, null) + .baselineValues("TP_003", "TP", null, null) + .baselineValues("TP_004", "TP", null, null) + .baselineValues("TP_005", "TP", null, null) + .baselineValues("TP_006", "TP", null, null) + .baselineValues("TP_007", "TP", null, null) + .baselineValues("TP_008", "TP", null, null) + .baselineValues("TP_009", "TP", null, null) + .baselineValues("TP_010", "TP", null, null) + .build() + .run(); + } } diff --git a/exec/java-exec/src/test/resources/parquet/empty_dict_pages.parquet b/exec/java-exec/src/test/resources/parquet/empty_dict_pages.parquet new file mode 100644 index 0000000000..9a94f9aa08 Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/empty_dict_pages.parquet differ
