This is an automated email from the ASF dual-hosted git repository.
dzamo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 051d0595d1 DRILL-8478. HashPartition memory leak when OutOfMemoryException is encountered (#2874) (#2875)
051d0595d1 is described below
commit 051d0595d1bccacca815587c66de491c843148d0
Author: shfshihuafeng <[email protected]>
AuthorDate: Tue Jan 23 23:06:49 2024 +0800
DRILL-8478. HashPartition memory leak when OutOfMemoryException is encountered (#2874) (#2875)
---
.../drill/exec/physical/impl/common/HashPartition.java | 15 ++++++++++-----
.../physical/impl/join/AbstractHashBinaryRecordBatch.java | 4 +++-
2 files changed, 13 insertions(+), 6 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
index 5c845573ad..0be1ea4faf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
@@ -147,6 +147,11 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat {
try {
this.hashTable = baseHashTable.createAndSetupHashTable(null);
+ this.hjHelper = semiJoin ? null : new HashJoinHelper(context, allocator);
+ tmpBatchesList = new ArrayList<>();
+ if (numPartitions > 1) {
+ allocateNewCurrentBatchAndHV();
+ }
} catch (ClassTransformationException e) {
throw UserException.unsupportedError(e)
.message("Code generation error - likely an error in the code.")
@@ -157,11 +162,11 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat {
.build(logger);
} catch (SchemaChangeException sce) {
throw new IllegalStateException("Unexpected Schema Change while creating a hash table",sce);
- }
- this.hjHelper = semiJoin ? null : new HashJoinHelper(context, allocator);
- tmpBatchesList = new ArrayList<>();
- if (numPartitions > 1) {
- allocateNewCurrentBatchAndHV();
+ } catch (OutOfMemoryException oom) {
+ close();
+ throw UserException.memoryError(oom)
+ .message("Failed to allocate hash partition.")
+ .build(logger);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/AbstractHashBinaryRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/AbstractHashBinaryRecordBatch.java
index 41217ca208..42dbc1cbfc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/AbstractHashBinaryRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/AbstractHashBinaryRecordBatch.java
@@ -1312,7 +1312,9 @@ public abstract class AbstractHashBinaryRecordBatch<T extends PhysicalOperator>
}
// clean (and deallocate) each partition, and delete its spill file
for (HashPartition partn : partitions) {
- partn.close();
+ if (partn != null) {
+ partn.close();
+ }
}
// delete any spill file left in unread spilled partitions