Caizhi Weng created FLINK-18668:
-----------------------------------
Summary: BytesHashMap#growAndRehash should release newly allocated
segments before throwing the exception
Key: FLINK-18668
URL: https://issues.apache.org/jira/browse/FLINK-18668
Project: Flink
Issue Type: Bug
Components: Table SQL / Runtime
Affects Versions: 1.11.0
Reporter: Caizhi Weng
In {{BytesHashMap#growAndRehash}} we have the following code.
{code:java}
List<MemorySegment> newBucketSegments = new ArrayList<>(required);

try {
    int numAllocatedSegments = required - memoryPool.freePages();
    if (numAllocatedSegments > 0) {
        throw new MemoryAllocationException();
    }
    int needNumFromFreeSegments = required - newBucketSegments.size();
    for (int end = needNumFromFreeSegments; end > 0; end--) {
        newBucketSegments.add(memoryPool.nextSegment());
    }

    setBucketVariables(newBucketSegments);
} catch (MemoryAllocationException e) {
    LOG.warn("BytesHashMap can't allocate {} pages, and now used {} pages",
            required, reservedNumBuffers);
    throw new EOFException();
}
{code}
Newly allocated memory segments are temporarily stored in {{newBucketSegments}}
before being handed over to the hash table. But if a
{{MemoryAllocationException}} is thrown, the segments collected so far are not
returned to the memory pool, which causes the following exception stack trace.
{code}
java.lang.RuntimeException: org.apache.flink.runtime.memory.MemoryAllocationException: Could not allocate 512 pages
    at org.apache.flink.table.runtime.util.LazyMemorySegmentPool.nextSegment(LazyMemorySegmentPool.java:84) ~[flink-table-blink_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.growAndRehash(BytesHashMap.java:393) ~[flink-table-blink_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.append(BytesHashMap.java:313) ~[flink-table-blink_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    at HashAggregateWithKeys$360.processElement(Unknown Source) ~[?:?]
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:560) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    at java.lang.Thread.run(Thread.java:834) [?:1.8.0_102]
    Suppressed: java.lang.RuntimeException: Should return all used memory before clean, page used: 2814
        at org.apache.flink.table.runtime.util.LazyMemorySegmentPool.close(LazyMemorySegmentPool.java:99) ~[flink-table-blink_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.free(BytesHashMap.java:486) ~[flink-table-blink_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.free(BytesHashMap.java:475) ~[flink-table-blink_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at HashAggregateWithKeys$360.close(Unknown Source) ~[?:?]
        at org.apache.flink.table.runtime.operators.TableStreamOperator.dispose(TableStreamOperator.java:44) ~[flink-table-blink_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:707) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:687) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:626) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:542) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at java.lang.Thread.run(Thread.java:834) [?:1.8.0_102]
Caused by: org.apache.flink.runtime.memory.MemoryAllocationException: Could not allocate 512 pages
    at org.apache.flink.runtime.memory.MemoryManager.allocatePages(MemoryManager.java:229) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    at org.apache.flink.table.runtime.util.LazyMemorySegmentPool.nextSegment(LazyMemorySegmentPool.java:82) ~[flink-table-blink_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    ... 15 more
Caused by: org.apache.flink.runtime.memory.MemoryReservationException: Could not allocate 16777216 bytes, only 0 bytes are remaining
    at org.apache.flink.runtime.memory.UnsafeMemoryBudget.reserveMemory(UnsafeMemoryBudget.java:159) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    at org.apache.flink.runtime.memory.UnsafeMemoryBudget.reserveMemory(UnsafeMemoryBudget.java:85) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    at org.apache.flink.runtime.memory.MemoryManager.allocatePages(MemoryManager.java:227) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    at org.apache.flink.table.runtime.util.LazyMemorySegmentPool.nextSegment(LazyMemorySegmentPool.java:82) ~[flink-table-blink_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    ... 15 more
{code}
We should return these segments to the memory pool before throwing the
exception.
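
A minimal, self-contained sketch of the release-before-throw pattern is given
here for illustration. It is not the actual patch: {{ToyPool}},
{{allocateBuckets}} and the use of {{IllegalStateException}} as the allocation
failure are hypothetical stand-ins for the real memory pool,
{{growAndRehash}} and the allocation exception, and the real fix may look
different.

```java
import java.io.EOFException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-ins for illustration only; these are not Flink classes. */
final class GrowAndRehashSketch {

    /** Toy pool with a fixed page budget that tracks how many pages are handed out. */
    static final class ToyPool {
        private final int capacity;
        private int pagesInUse;

        ToyPool(int capacity) {
            this.capacity = capacity;
        }

        /** Hands out one page, or fails once the budget is exhausted. */
        byte[] nextSegment() {
            if (pagesInUse >= capacity) {
                throw new IllegalStateException("Could not allocate page");
            }
            pagesInUse++;
            return new byte[32];
        }

        /** Takes back every page in the list and empties the list. */
        void returnAll(List<byte[]> segments) {
            pagesInUse -= segments.size();
            segments.clear();
        }

        int pagesInUse() {
            return pagesInUse;
        }
    }

    /** Allocates {@code required} pages; on failure, returns the partial allocation first. */
    static List<byte[]> allocateBuckets(ToyPool pool, int required) throws EOFException {
        List<byte[]> newBucketSegments = new ArrayList<>(required);
        try {
            for (int i = 0; i < required; i++) {
                newBucketSegments.add(pool.nextSegment());
            }
            return newBucketSegments;
        } catch (IllegalStateException e) {
            // Release the pages collected so far before signalling the failure,
            // so the pool's accounting is back to where it started.
            pool.returnAll(newBucketSegments);
            throw new EOFException();
        }
    }
}
```

With this shape, a failed allocation leaves the pool's used-page count
unchanged, so a later close of the pool would not find outstanding pages from
the aborted attempt.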