[jira] [Commented] (FLINK-10339) SpillReadBufferPool cannot use off-heap memory

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633823#comment-16633823
 ] 

ASF GitHub Bot commented on FLINK-10339:


NicoK closed pull request #6762: [FLINK-10339][network] Use off-heap memory for SpillReadBufferPool
URL: https://github.com/apache/flink/pull/6762

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
index c235999db91..48b9a20e9da 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
@@ -74,6 +74,19 @@ public static MemorySegment allocateUnpooledSegment(int size, Object owner) {
 		return new HybridMemorySegment(new byte[size], owner);
 	}
 
+	/**
+	 * Allocates some unpooled off-heap memory and creates a new memory segment that
+	 * represents that memory.
+	 *
+	 * @param size The size of the off-heap memory segment to allocate.
+	 * @param owner The owner to associate with the off-heap memory segment.
+	 * @return A new memory segment, backed by unpooled off-heap memory.
+	 */
+	public static MemorySegment allocateUnpooledOffHeapMemory(int size, Object owner) {
+		ByteBuffer memory = ByteBuffer.allocateDirect(size);
+		return wrapPooledOffHeapMemory(memory, owner);
+	}
+
 	/**
 	 * Creates a memory segment that wraps the given byte array.
 	 *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index a369ce5a5fb..1fddb612781 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -30,7 +30,6 @@
 import javax.annotation.Nullable;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -89,8 +88,7 @@ public NetworkBufferPool(int numberOfSegmentsToAllocate, int segmentSize) {
 
 		try {
 			for (int i = 0; i < numberOfSegmentsToAllocate; i++) {
-				ByteBuffer memory = ByteBuffer.allocateDirect(segmentSize);
-				availableMemorySegments.add(MemorySegmentFactory.wrapPooledOffHeapMemory(memory, null));
+				availableMemorySegments.add(MemorySegmentFactory.allocateUnpooledOffHeapMemory(segmentSize, null));
 			}
 		}
 		catch (OutOfMemoryError err) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
index 2a6a71f05d6..f941e20846e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
@@ -257,7 +257,8 @@ public String toString() {
 
 			synchronized (buffers) {
 				for (int i = 0; i < numberOfBuffers; i++) {
-					buffers.add(new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(memorySegmentSize), this));
+					buffers.add(new NetworkBuffer(MemorySegmentFactory.allocateUnpooledOffHeapMemory(
+						memorySegmentSize, null), this));
 				}
 			}
 		}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
index c450880f98b..d1a304a4909 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
@@ -675,8 +675,7 @@ void clear() {
 
 		@Override
 		MemorySegment allocateNewSegment(Object owner) {
-			ByteBuffer memory = ByteBuffer.allocateDirect(segmentSize);
-			return MemorySegmentFactory.wrapP

[jira] [Commented] (FLINK-10339) SpillReadBufferPool cannot use off-heap memory

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16629674#comment-16629674
 ] 

ASF GitHub Bot commented on FLINK-10339:


zhijiangW commented on issue #6762: [FLINK-10339][network] Use off-heap memory for SpillReadBufferPool
URL: https://github.com/apache/flink/pull/6762#issuecomment-424935982

   Thanks for your reviews, @StephanEwen!

   Adding `allocateUnpooledOffHeapMemory` to the factory is a good idea; it also makes it easy to use in `MemoryManager` and `NetworkBufferPool`. I have already updated the code.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> SpillReadBufferPool cannot use off-heap memory
> --
>
> Key: FLINK-10339
> URL: https://issues.apache.org/jira/browse/FLINK-10339
> Project: Flink
> Issue Type: Improvement
> Components: Network
> Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0, 1.7.0
> Reporter: zhijiang
> Assignee: zhijiang
> Priority: Minor
> Labels: pull-request-available
>
> Currently, the {{NetworkBufferPool}} always uses off-heap memory to reduce
> memory copies from the Flink {{Buffer}} to Netty's internal {{ByteBuf}} during
> transport on the sender side.
>
> However, {{SpillReadBufferPool}} in {{SpilledSubpartitionView}} still uses
> heap memory for caching. We could make it off-heap by default, similar to
> {{NetworkBufferPool}}, or choose the memory type based on the existing parameter
> {{taskmanager.memory.off-heap}}.
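The issue leaves open whether the pool should always be off-heap or follow the `taskmanager.memory.off-heap` setting. A minimal sketch of the second option, using plain `java.nio.ByteBuffer` rather than Flink's `MemorySegment`. The class `MemoryTypeSketch` and the method `allocateSegment` are hypothetical, and the boolean flag merely stands in for however the setting would actually be read:

```java
import java.nio.ByteBuffer;

/**
 * Hypothetical sketch (not Flink code): pick heap or off-heap (direct)
 * allocation from a boolean flag that stands in for a setting such as
 * taskmanager.memory.off-heap.
 */
final class MemoryTypeSketch {

    /** Allocates a buffer of the given size, off-heap if offHeap is true. */
    static ByteBuffer allocateSegment(int size, boolean offHeap) {
        // allocateDirect reserves memory outside the Java heap;
        // allocate is backed by an ordinary byte[] on the heap.
        return offHeap ? ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size);
    }

    public static void main(String[] args) {
        int size = 32 * 1024; // example size only
        ByteBuffer heap = allocateSegment(size, false);
        ByteBuffer direct = allocateSegment(size, true);
        System.out.println("heap isDirect: " + heap.isDirect());     // heap isDirect: false
        System.out.println("direct isDirect: " + direct.isDirect()); // direct isDirect: true
        System.out.println("heap hasArray: " + heap.hasArray());     // heap hasArray: true
    }
}
```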



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10339) SpillReadBufferPool cannot use off-heap memory

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16628986#comment-16628986
 ] 

ASF GitHub Bot commented on FLINK-10339:


StephanEwen commented on issue #6762: [FLINK-10339][network] Use off-heap memory for SpillReadBufferPool
URL: https://github.com/apache/flink/pull/6762#issuecomment-424772751

 - [X] Consensus that the contribution should go into Flink
   - +1 from my side, change makes sense
 - [X] Does not need specific attention
 - [X] Contribution description
 - [X] Architectural approach
 - [X] Overall code quality

   Optional minor change, but no blockers




[jira] [Commented] (FLINK-10339) SpillReadBufferPool cannot use off-heap memory

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16628982#comment-16628982
 ] 

ASF GitHub Bot commented on FLINK-10339:


StephanEwen commented on issue #6762: [FLINK-10339][network] Use off-heap memory for SpillReadBufferPool
URL: https://github.com/apache/flink/pull/6762#issuecomment-424771119

   How about adding a method `allocateUnpooledOffHeapMemory` to the factory?




[jira] [Commented] (FLINK-10339) SpillReadBufferPool cannot use off-heap memory

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16628295#comment-16628295
 ] 

ASF GitHub Bot commented on FLINK-10339:


zhijiangW opened a new pull request #6762: [FLINK-10339][network] Use off-heap memory for SpillReadBufferPool
URL: https://github.com/apache/flink/pull/6762

   ## What is the purpose of the change

   Currently, the `NetworkBufferPool` already uses off-heap memory directly, but the `BufferPool` for `SpilledSubpartitionView` still uses heap memory by default. It should behave the same way as `NetworkBufferPool`, which may also pay off during transport, since the Netty stack can then reuse this off-heap memory.
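The benefit described above rests on avoiding a sender-side copy. A minimal sketch, assuming the transport needs direct memory for native I/O (the JDK's NIO channels, for example, copy heap buffers into a temporary direct buffer before writing): heap-backed data must be copied once, while a direct buffer passes through unchanged. `CopyAvoidanceSketch`, `toDirect`, and the `copies` counter are hypothetical illustrations, not Netty or Flink API:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical sketch: a transport step that needs direct memory copies
 * heap-backed data first, but hands direct buffers over as-is.
 */
final class CopyAvoidanceSketch {

    /** Number of extra copies made so far by toDirect(). */
    static int copies = 0;

    /** Returns a direct buffer with the same readable bytes, copying only if needed. */
    static ByteBuffer toDirect(ByteBuffer buf) {
        if (buf.isDirect()) {
            return buf; // already off-heap: no copy required
        }
        ByteBuffer tmp = ByteBuffer.allocateDirect(buf.remaining());
        tmp.put(buf.duplicate()); // copy the heap bytes without moving buf's position
        tmp.flip();
        copies++;
        return tmp;
    }

    public static void main(String[] args) {
        byte[] payload = "record-data".getBytes(StandardCharsets.UTF_8);

        ByteBuffer heap = ByteBuffer.wrap(payload);
        ByteBuffer direct = ByteBuffer.allocateDirect(payload.length);
        direct.put(payload);
        direct.flip();

        toDirect(heap);   // heap-backed: one copy
        toDirect(direct); // off-heap: passed through unchanged
        System.out.println("copies = " + copies); // copies = 1
    }
}
```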
   
   ## Brief change log
   
 - *Use off-heap memory directly for `SpillReadBufferPool` in `SpilledSubpartitionView`.*
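In stand-alone form, the change amounts to a fixed-size pool whose buffers all come from one off-heap factory method. A minimal sketch, assuming plain `java.nio.ByteBuffer` in place of Flink's `MemorySegment`/`NetworkBuffer`. `OffHeapReadPoolSketch`, `allocateUnpooledOffHeap`, `requestBuffer`, `recycle`, and `available` are hypothetical names, not the real `SpillReadBufferPool` API:

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue;

/**
 * Hypothetical stand-in for a spill-read buffer pool: a fixed number of
 * off-heap buffers are allocated up front and then handed out and recycled.
 */
final class OffHeapReadPoolSketch {

    /** Single place that decides how unpooled off-heap memory is allocated. */
    static ByteBuffer allocateUnpooledOffHeap(int size) {
        return ByteBuffer.allocateDirect(size);
    }

    private final Queue<ByteBuffer> buffers;

    OffHeapReadPoolSketch(int numberOfBuffers, int bufferSize) {
        buffers = new ArrayDeque<>(numberOfBuffers);
        for (int i = 0; i < numberOfBuffers; i++) {
            buffers.add(allocateUnpooledOffHeap(bufferSize));
        }
    }

    /** Returns a free buffer, or null if all buffers are in use. */
    ByteBuffer requestBuffer() {
        synchronized (buffers) {
            return buffers.poll();
        }
    }

    /** Resets the buffer's position and limit and returns it to the pool. */
    void recycle(ByteBuffer buffer) {
        buffer.clear();
        synchronized (buffers) {
            buffers.add(buffer);
        }
    }

    /** Number of buffers currently available in the pool. */
    int available() {
        synchronized (buffers) {
            return buffers.size();
        }
    }

    public static void main(String[] args) {
        OffHeapReadPoolSketch pool = new OffHeapReadPoolSketch(2, 1024);
        ByteBuffer b = pool.requestBuffer();
        System.out.println(b.isDirect() + " " + pool.available()); // true 1
        pool.recycle(b);
        System.out.println(pool.available()); // 2
    }
}
```

Routing every allocation through a single factory method is what lets other call sites, such as `MemoryManager` and `NetworkBufferPool` in the final diff, switch to the same helper with a one-line change each.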
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as *SpillableSubpartitionTest*.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (yes)
 - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   




[jira] [Commented] (FLINK-10339) SpillReadBufferPool cannot use off-heap memory

2018-09-21 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16623464#comment-16623464
 ] 

Piotr Nowojski commented on FLINK-10339:


I'm not aware of any concerns, but I'm also not very familiar with these
aspects of the network stack. [~NicoK] should be more familiar with this.



[jira] [Commented] (FLINK-10339) SpillReadBufferPool cannot use off-heap memory

2018-09-20 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16623153#comment-16623153
 ] 

zhijiang commented on FLINK-10339:
--

Hey [~NicoK], [~pnowojski], do you think we should make {{SpillReadBufferPool}}
use direct memory like the current {{NetworkBufferPool}}, or are there any concerns
about changing this?

 
