[jira] [Commented] (FLINK-10339) SpillReadBufferPool cannot use off-heap memory
[ https://issues.apache.org/jira/browse/FLINK-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633823#comment-16633823 ]

ASF GitHub Bot commented on FLINK-10339:

NicoK closed pull request #6762: [FLINK-10339][network] Use off-heap memory for SpillReadBufferPool
URL: https://github.com/apache/flink/pull/6762

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
index c235999db91..48b9a20e9da 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
@@ -74,6 +74,19 @@ public static MemorySegment allocateUnpooledSegment(int size, Object owner) {
 		return new HybridMemorySegment(new byte[size], owner);
 	}
 
+	/**
+	 * Allocates some unpooled off-heap memory and creates a new memory segment that
+	 * represents that memory.
+	 *
+	 * @param size The size of the off-heap memory segment to allocate.
+	 * @param owner The owner to associate with the off-heap memory segment.
+	 * @return A new memory segment, backed by unpooled off-heap memory.
+	 */
+	public static MemorySegment allocateUnpooledOffHeapMemory(int size, Object owner) {
+		ByteBuffer memory = ByteBuffer.allocateDirect(size);
+		return wrapPooledOffHeapMemory(memory, owner);
+	}
+
 	/**
 	 * Creates a memory segment that wraps the given byte array.
 	 *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index a369ce5a5fb..1fddb612781 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -30,7 +30,6 @@
 import javax.annotation.Nullable;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -89,8 +88,7 @@ public NetworkBufferPool(int numberOfSegmentsToAllocate, int segmentSize) {
 
 		try {
 			for (int i = 0; i < numberOfSegmentsToAllocate; i++) {
-				ByteBuffer memory = ByteBuffer.allocateDirect(segmentSize);
-				availableMemorySegments.add(MemorySegmentFactory.wrapPooledOffHeapMemory(memory, null));
+				availableMemorySegments.add(MemorySegmentFactory.allocateUnpooledOffHeapMemory(segmentSize, null));
 			}
 		}
 		catch (OutOfMemoryError err) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
index 2a6a71f05d6..f941e20846e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
@@ -257,7 +257,8 @@ public String toString() {
 
 			synchronized (buffers) {
 				for (int i = 0; i < numberOfBuffers; i++) {
-					buffers.add(new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(memorySegmentSize), this));
+					buffers.add(new NetworkBuffer(MemorySegmentFactory.allocateUnpooledOffHeapMemory(
+						memorySegmentSize, null), this));
 				}
 			}
 		}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
index c450880f98b..d1a304a4909 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
@@ -675,8 +675,7 @@ void clear() {
 
 		@Override
 		MemorySegment allocateNewSegment(Object owner) {
-			ByteBuffer memory = ByteBuffer.allocateDirect(segmentSize);
-			return MemorySegmentFactory.wrapP
[jira] [Commented] (FLINK-10339) SpillReadBufferPool cannot use off-heap memory
[ https://issues.apache.org/jira/browse/FLINK-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16629674#comment-16629674 ]

ASF GitHub Bot commented on FLINK-10339:

zhijiangW commented on issue #6762: [FLINK-10339][network] Use off-heap memory for SpillReadBufferPool
URL: https://github.com/apache/flink/pull/6762#issuecomment-424935982

Thanks for your reviews!

@StephanEwen Adding `allocateUnpooledOffHeapMemory` to the factory is a good idea, since it is then also easy to use in `MemoryManager` and `NetworkBufferPool`. I have already updated the code.

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> SpillReadBufferPool cannot use off-heap memory
> ----------------------------------------------
>
>                 Key: FLINK-10339
>                 URL: https://issues.apache.org/jira/browse/FLINK-10339
>             Project: Flink
>          Issue Type: Improvement
>          Components: Network
>    Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0, 1.7.0
>            Reporter: zhijiang
>            Assignee: zhijiang
>            Priority: Minor
>              Labels: pull-request-available
>
> Currently, the {{NetworkBufferPool}} always uses off-heap memory to reduce
> memory copies from the Flink {{Buffer}} to the netty internal {{ByteBuf}}
> during transport on the sender side.
>
> But the {{SpillReadBufferPool}} in {{SpilledSubpartitionView}} still uses
> heap memory for caching. We can make it off-heap by default, similar to
> {{NetworkBufferPool}}, or decide the type by the current parameter
> {{taskmanager.memory.off-heap}}.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
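The difference between heap and off-heap read buffers described in the issue can be illustrated with plain JDK classes. The following is a minimal sketch, not Flink code: the class `SpillReadSketch` and the method `readIntoDirectBuffer` are hypothetical names, and the temporary file only stands in for a spilled partition file. It reads file contents straight into a buffer allocated with `ByteBuffer.allocateDirect`, which is the kind of buffer a network stack can usually pass on without first copying it out of the Java heap.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical illustration only; none of these names exist in Flink.
final class SpillReadSketch {

    /** Reads up to {@code size} bytes from the start of {@code file} into an off-heap buffer. */
    static ByteBuffer readIntoDirectBuffer(Path file, int size) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocateDirect(size); // off-heap (direct) memory
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
            // Keep reading until the buffer is full or the end of the file is reached.
            while (buffer.hasRemaining() && channel.read(buffer) != -1) {
                // nothing to do; read() advances the buffer position
            }
        }
        buffer.flip(); // switch from writing into the buffer to reading from it
        return buffer;
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("spill-sketch", ".bin");
        try {
            Files.write(file, new byte[] {1, 2, 3, 4});
            ByteBuffer direct = readIntoDirectBuffer(file, 16);
            ByteBuffer heap = ByteBuffer.allocate(16); // heap-backed, for comparison
            System.out.println("direct.isDirect() = " + direct.isDirect());
            System.out.println("heap.isDirect()   = " + heap.isDirect());
            System.out.println("bytes read        = " + direct.remaining());
        } finally {
            Files.deleteIfExists(file);
        }
    }
}
```

Whether the direct buffer actually saves a copy depends on the consumer; the sketch only shows how the allocation type differs, not the netty integration discussed in the issue.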
[jira] [Commented] (FLINK-10339) SpillReadBufferPool cannot use off-heap memory
[ https://issues.apache.org/jira/browse/FLINK-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16628986#comment-16628986 ]

ASF GitHub Bot commented on FLINK-10339:

StephanEwen commented on issue #6762: [FLINK-10339][network] Use off-heap memory for SpillReadBufferPool
URL: https://github.com/apache/flink/pull/6762#issuecomment-424772751

- [X] Consensus that the contribution should go into Flink
    - +1 from my side, change makes sense
- [X] Does not need specific attention
- [X] Contribution description
- [X] Architectural approach
- [X] Overall code quality

Optional minor change, but no blockers
[jira] [Commented] (FLINK-10339) SpillReadBufferPool cannot use off-heap memory
[ https://issues.apache.org/jira/browse/FLINK-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16628982#comment-16628982 ]

ASF GitHub Bot commented on FLINK-10339:

StephanEwen commented on issue #6762: [FLINK-10339][network] Use off-heap memory for SpillReadBufferPool
URL: https://github.com/apache/flink/pull/6762#issuecomment-424771119

How about adding a method `allocateUnpooledOffHeapMemory` to the factory?
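The suggestion to put the off-heap allocation behind one factory method can be sketched roughly as follows. This is a simplified stand-in that uses only JDK types: `OffHeapFactorySketch`, `allocateUnpooledOffHeap` and `allocateSegments` are hypothetical names, and a raw `ByteBuffer` takes the place of Flink's `MemorySegment`. The point is only that callers which need many segments go through a single allocation method instead of each calling `ByteBuffer.allocateDirect` themselves.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a segment factory; these names are not Flink's API.
final class OffHeapFactorySketch {

    /** The single place that knows how unpooled off-heap memory is allocated. */
    static ByteBuffer allocateUnpooledOffHeap(int size) {
        return ByteBuffer.allocateDirect(size);
    }

    /** A caller that needs several segments, analogous to a buffer pool constructor. */
    static List<ByteBuffer> allocateSegments(int count, int segmentSize) {
        List<ByteBuffer> segments = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            segments.add(allocateUnpooledOffHeap(segmentSize));
        }
        return segments;
    }

    public static void main(String[] args) {
        List<ByteBuffer> segments = allocateSegments(4, 1024);
        System.out.println(segments.size() + " segments, direct = " + segments.get(0).isDirect());
    }
}
```

Centralizing the call this way means a later change to how off-heap memory is obtained would touch one method rather than every caller.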
[jira] [Commented] (FLINK-10339) SpillReadBufferPool cannot use off-heap memory
[ https://issues.apache.org/jira/browse/FLINK-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16628295#comment-16628295 ]

ASF GitHub Bot commented on FLINK-10339:

zhijiangW opened a new pull request #6762: [FLINK-10339][network] Use off-heap memory for SpillReadBufferPool
URL: https://github.com/apache/flink/pull/6762

## What is the purpose of the change

Currently the `NetworkBufferPool` already uses off-heap memory directly. But the `BufferPool` for `SpilledSubpartitionView` still uses heap memory by default. It should behave the same as `NetworkBufferPool`, and it may benefit from reusing this off-heap memory in the netty stack during transport.

## Brief change log

  - *Use off-heap memory directly for `SpillReadBufferPool` in `SpilledSubpartitionView`.*

## Verifying this change

This change is already covered by existing tests, such as *SpillableSubpartitionTest*.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (yes)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)
[jira] [Commented] (FLINK-10339) SpillReadBufferPool cannot use off-heap memory
[ https://issues.apache.org/jira/browse/FLINK-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16623464#comment-16623464 ]

Piotr Nowojski commented on FLINK-10339:

I'm not aware of any concerns, but I'm also not very familiar with these aspects of the network stack. [~NicoK] should be more familiar with this.
[jira] [Commented] (FLINK-10339) SpillReadBufferPool cannot use off-heap memory
[ https://issues.apache.org/jira/browse/FLINK-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16623153#comment-16623153 ]

zhijiang commented on FLINK-10339:

Hey [~NicoK] [~pnowojski], do you think we should make {{SpillReadBufferPool}} use direct memory like the current {{NetworkBufferPool}}, or are there any concerns about changing this?