This is an automated email from the ASF dual-hosted git repository.

dzamo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 051d0595d1 DRILL-8478. HashPartition memory leak when OutOfMemoryException is encountered (#2874) (#2875)
051d0595d1 is described below

commit 051d0595d1bccacca815587c66de491c843148d0
Author: shfshihuafeng <[email protected]>
AuthorDate: Tue Jan 23 23:06:49 2024 +0800

    DRILL-8478. HashPartition memory leak when OutOfMemoryException is encountered (#2874) (#2875)
---
 .../drill/exec/physical/impl/common/HashPartition.java    | 15 ++++++++++-----
 .../physical/impl/join/AbstractHashBinaryRecordBatch.java |  4 +++-
 2 files changed, 13 insertions(+), 6 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
index 5c845573ad..0be1ea4faf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
@@ -147,6 +147,11 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat {
 
     try {
       this.hashTable = baseHashTable.createAndSetupHashTable(null);
+      this.hjHelper = semiJoin ? null : new HashJoinHelper(context, allocator);
+      tmpBatchesList = new ArrayList<>();
+      if (numPartitions > 1) {
+        allocateNewCurrentBatchAndHV();
+      }
     } catch (ClassTransformationException e) {
       throw UserException.unsupportedError(e)
         .message("Code generation error - likely an error in the code.")
@@ -157,11 +162,11 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat {
         .build(logger);
     } catch (SchemaChangeException sce) {
       throw new IllegalStateException("Unexpected Schema Change while creating a hash table",sce);
-    }
-    this.hjHelper = semiJoin ? null : new HashJoinHelper(context, allocator);
-    tmpBatchesList = new ArrayList<>();
-    if (numPartitions > 1) {
-      allocateNewCurrentBatchAndHV();
+    } catch (OutOfMemoryException oom) {
+      close();
+      throw UserException.memoryError(oom)
+          .message("Failed to allocate hash partition.")
+          .build(logger);
     }
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/AbstractHashBinaryRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/AbstractHashBinaryRecordBatch.java
index 41217ca208..42dbc1cbfc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/AbstractHashBinaryRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/AbstractHashBinaryRecordBatch.java
@@ -1312,7 +1312,9 @@ public abstract class AbstractHashBinaryRecordBatch<T extends PhysicalOperator>
     }
     // clean (and deallocate) each partition, and delete its spill file
     for (HashPartition partn : partitions) {
-      partn.close();
+      if (partn != null) {
+        partn.close();
+      }
     }
 
     // delete any spill file left in unread spilled partitions

Reply via email to