[GitHub] flink issue #4485: [FLINK-7378][core]Create a fix size (non rebalancing) buf...

2017-10-10 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4485
  
merging.


---


[GitHub] flink issue #4485: [FLINK-7378][core]Create a fix size (non rebalancing) buf...

2017-09-21 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4485
  
@NicoK , I missed your earlier message about the `verifyAllBuffersReturned()` issue.
I have submitted the modifications for it. :)


---


[GitHub] flink issue #4485: [FLINK-7378][core]Create a fix size (non rebalancing) buf...

2017-09-21 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4485
  
@NicoK , sorry for the typos. I have submitted the updates.


---


[GitHub] flink issue #4485: [FLINK-7378][core]Create a fix size (non rebalancing) buf...

2017-09-15 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4485
  
@NicoK , I have submitted the updates.


---


[GitHub] flink issue #4485: [FLINK-7378][core]Create a fix size (non rebalancing) buf...

2017-09-15 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4485
  
@NicoK , thank you for your consistently helpful reviews!

I have been very busy improving the runtime for our Singles' Day these days. I will
submit the updates for this PR later today. I also plan to update the next PR,
which is based on this one, over the weekend.


---


[GitHub] flink issue #4485: [FLINK-7378][core]Create a fix size (non rebalancing) buf...

2017-09-05 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4485
  
@NicoK , I have submitted the updates based on the last comments.

Your concern about the interaction between requesting memory segments and
creating the buffer pool is well founded. I also noticed it after reviewing
the process.



---


[GitHub] flink issue #4485: [FLINK-7378][core]Create a fix size (non rebalancing) buf...

2017-08-30 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4485
  
@NicoK , thank you for such detailed and helpful comments!

I will be on a team outing tomorrow and will be back on Sunday. I will consider
your concerns carefully and may submit the updates next week.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #4485: [FLINK-7378][core]Create a fix size (non rebalancing) buf...

2017-08-29 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4485
  
@NicoK , I have submitted the updates based on the above comments. :(


---


[GitHub] flink issue #4485: [FLINK-7378][core]Create a fix size (non rebalancing) buf...

2017-08-28 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4485
  
Yes, this approach also has some advantages, and recycling these exclusive
buffers will be covered in the next PR, along with some additional tests.

I will follow your suggestions to add some tests in this PR and submit the
modifications based on all the above comments.


---


[GitHub] flink issue #4485: [FLINK-7378][core]Create a fix size (non rebalancing) buf...

2017-08-28 Thread NicoK
Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4485
  
Hi @zhijiangW,
regarding the buffer pool implementation, I was just curious about why it
was done that way. But it is fine to keep the logic in `RemoteInputChannel` if
you make sure that a recycler puts these buffers right (back) into the buffer
queue (I guess that's in one of the follow-up PRs). This way, we avoid an
additional intermediate component (and the need to interact with it). To
conclude, on second thought, it is fine as it is.
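To make the recycling requirement above concrete, here is a minimal sketch, assuming a channel that owns a fixed number of exclusive buffers and gives each buffer a recycler callback that puts it straight back into the channel's queue. `ExclusiveBufferChannel`, `Buffer`, `requestBuffer` and `returnExclusiveBuffer` are hypothetical names for illustration; this is not Flink's `RemoteInputChannel` API.

```Java
import java.util.ArrayDeque;
import java.util.function.Consumer;

/** Hypothetical channel that owns a fixed set of exclusive buffers. */
public class ExclusiveBufferChannel {

    /** Hypothetical buffer that knows how to hand itself back to its owner. */
    public static final class Buffer {
        private final Consumer<Buffer> recycler;

        Buffer(Consumer<Buffer> recycler) {
            this.recycler = recycler;
        }

        /** May be called deep inside other code that has no reference to the channel. */
        public void recycle() {
            recycler.accept(this);
        }
    }

    private final int numExclusiveBuffers;
    private final ArrayDeque<Buffer> availableBuffers = new ArrayDeque<>();

    public ExclusiveBufferChannel(int numExclusiveBuffers) {
        this.numExclusiveBuffers = numExclusiveBuffers;
        for (int i = 0; i < numExclusiveBuffers; i++) {
            // Each buffer's recycler puts it right back into this channel's queue.
            availableBuffers.add(new Buffer(this::returnExclusiveBuffer));
        }
    }

    private synchronized void returnExclusiveBuffer(Buffer buffer) {
        if (availableBuffers.size() >= numExclusiveBuffers) {
            throw new IllegalStateException("More buffers recycled than the channel owns");
        }
        availableBuffers.add(buffer);
    }

    /** Takes an exclusive buffer, or returns null if all of them are in use. */
    public synchronized Buffer requestBuffer() {
        return availableBuffers.poll();
    }

    public synchronized int getNumberOfAvailableBuffers() {
        return availableBuffers.size();
    }
}
```

Because the recycler is bound to the owning channel, a buffer released far away from the channel still finds its way back without an intermediate pool, which is the trade-off discussed above.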

The thing with `ResultPartitionType` is that without an (intermediate) way 
to set `isCreditBased` to `true`, we are not really able to test this code path 
on higher levels such as the `NetworkEnvironment` (or maybe I'll see that in 
the follow-up PRs as well).

Speaking of tests... I understand that with the switch to credit-based flow
control, some parts will be covered by existing tests, but we also change the
behaviour at some points and the current tests are already a bit sparse. Can
you also add tests for
- the `NetworkEnvironment` changes (into `NetworkEnvironmentTest`),
- `NetworkBufferPool#requestMemorySegments` and
`NetworkBufferPool#recycleMemorySegments` (into `NetworkBufferPoolTest`, which
is currently a bit sparse as well),
- the changes in `SingleInputGate` (into `SingleInputGateTest`)?


---


[GitHub] flink issue #4485: [FLINK-7378][core]Create a fix size (non rebalancing) buf...

2017-08-28 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4485
  
Regarding the `ResultPartitionType` comment, I added the `isCreditBased`
field temporarily so as not to affect the current process. My initial idea
is to remove this field once the whole feature has been verified and enabled.
If you approve of this as the formal approach, I will add the new mode
`PIPELINE_CREDIT_BASED(true, true, true, true)` and a javadoc for it.
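A sketch of what the proposed enum constant could look like, assuming the constructor arguments are `(isPipelined, hasBackPressure, isBounded, isCreditBased)`. Only the position of the fourth flag is implied by the `PIPELINED_BOUNDED(true, true, true, false)` snippet quoted in this review; the names of the first three flags and the values for `BLOCKING` and `PIPELINED` are assumptions. The enum is called `ResultPartitionTypeSketch` to make clear it is illustrative, not Flink's actual `ResultPartitionType`. It also shows the kind of short javadoc requested for `isCreditBased()`.

```Java
/**
 * Illustrative sketch only. Constructor arguments are assumed to be
 * (isPipelined, hasBackPressure, isBounded, isCreditBased).
 */
public enum ResultPartitionTypeSketch {

    BLOCKING(false, false, false, false),

    PIPELINED(true, true, false, false),

    PIPELINED_BOUNDED(true, true, true, false),

    /** Pipelined and bounded, with credit-based flow control on the consumer side. */
    PIPELINE_CREDIT_BASED(true, true, true, true);

    private final boolean isPipelined;
    private final boolean hasBackPressure;
    private final boolean isBounded;
    private final boolean isCreditBased;

    ResultPartitionTypeSketch(boolean isPipelined, boolean hasBackPressure,
                              boolean isBounded, boolean isCreditBased) {
        this.isPipelined = isPipelined;
        this.hasBackPressure = hasBackPressure;
        this.isBounded = isBounded;
        this.isCreditBased = isCreditBased;
    }

    public boolean isPipelined() {
        return isPipelined;
    }

    public boolean hasBackPressure() {
        return hasBackPressure;
    }

    /** Whether this partition type uses a limited number of network buffers. */
    public boolean isBounded() {
        return isBounded;
    }

    /**
     * Whether consumers of this partition type use credit-based flow control, i.e.
     * exclusive buffers per remote input channel plus floating buffers per gate.
     * Intended as a temporary switch until the feature is fully verified.
     */
    public boolean isCreditBased() {
        return isCreditBased;
    }
}
```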

As for `gate.setBufferPool(bufferPool)`, I carelessly forgot to add it.

As for releasing segments when an exception occurs, it is indeed better to
handle this inside the `NetworkBufferPool` method.

I will submit the modifications later today.


---


[GitHub] flink issue #4485: [FLINK-7378][core]Create a fix size (non rebalancing) buf...

2017-08-28 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4485
  
@NicoK , thank you for starting this discussion and for your comments!

Actually, before implementing this, I proposed to Stephan the same approach of
using a fixed-size `LocalBufferPool` to manage the exclusive buffers of each
`RemoteInputChannel`. I attached our dialogue below:

![dingtalk20170828140949](https://user-images.githubusercontent.com/12387855/29761347-767c7c84-8bfb-11e7-975b-706265766803.png)

Maybe I misunderstood Stephan's meaning in the above dialogue and implemented
it the current way. I also agree with the approach you mentioned, and the
fixed-size buffer pool for `RemoteInputChannel` can be submitted in a separate
PR.



---


[GitHub] flink issue #4485: [FLINK-7378][core]Create a fix size (non rebalancing) buf...

2017-08-25 Thread NicoK
Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4485
  




Reviewed 9 of 13 files at r2.
Review status: all files reviewed at latest revision, 7 unresolved 
discussions, some commit checks failed.

---

*[a discussion](https://reviewable.io:443/reviews/apache/flink/4485#-KsOs0jqeqTsAUTwWuFa:-KsOs0jqeqTsAUTwWuFb:b-kg45p7) (no related file):*
Depending on how you build on this in the other PRs, what do you think
about using a fixed-size `LocalBufferPool` (or a customized sub-class) per
`RemoteInputChannel` instead? This would solve potential issues with recycling
and would also be a lot less code. Additionally, you would gain the buffer
availability listener feature, so that you are notified when a buffer is
released (which may happen deep inside other code with no access to the
`RemoteInputChannel` anymore).

FYI: the changes in these commits would actually qualify for a separate
PR.
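To illustrate the suggestion, here is a minimal sketch of a fixed-size (non-rebalancing) pool that notifies a listener whenever a buffer is recycled, assuming plain `byte[]` buffers and a hypothetical `BufferAvailabilityListener` interface. None of these names are Flink's `LocalBufferPool` API; the point is only that the owner learns about a released buffer even when the releasing code holds no reference to the channel.

```Java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical fixed-size (non-rebalancing) pool that reports recycled buffers. */
public class FixedSizeBufferPoolSketch {

    /** Hypothetical callback invoked whenever a buffer becomes available again. */
    public interface BufferAvailabilityListener {
        void onBufferAvailable(FixedSizeBufferPoolSketch pool);
    }

    private final int size;
    private final ArrayDeque<byte[]> availableBuffers = new ArrayDeque<>();
    private final List<BufferAvailabilityListener> listeners = new ArrayList<>();

    public FixedSizeBufferPoolSketch(int size, int bufferSizeBytes) {
        this.size = size;
        // All buffers are allocated up front; the pool never grows or shrinks.
        for (int i = 0; i < size; i++) {
            availableBuffers.add(new byte[bufferSizeBytes]);
        }
    }

    /** Returns a buffer, or null if all buffers are currently in use. */
    public synchronized byte[] requestBuffer() {
        return availableBuffers.poll();
    }

    public synchronized void addListener(BufferAvailabilityListener listener) {
        listeners.add(listener);
    }

    /** Called by whoever finished with the buffer, possibly far from the owning channel. */
    public void recycle(byte[] buffer) {
        List<BufferAvailabilityListener> toNotify;
        synchronized (this) {
            if (availableBuffers.size() >= size) {
                throw new IllegalStateException("More buffers recycled than the pool owns");
            }
            availableBuffers.add(buffer);
            toNotify = new ArrayList<>(listeners);
        }
        // Notify outside the lock so that a listener may immediately request the buffer.
        for (BufferAvailabilityListener listener : toNotify) {
            listener.onBufferAvailable(this);
        }
    }

    public synchronized int getNumberOfAvailableBuffers() {
        return availableBuffers.size();
    }
}
```

Invoking listeners outside the pool's lock is a common choice to avoid deadlocks when a listener reacts by requesting the freed buffer right away.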

---


*[flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java, line 216 at r2](https://reviewable.io:443/reviews/apache/flink/4485#-KsOlN5wNvmcco4z2tGj:-KsOlN5xcf-lW0z1FpJo:b-ppkkjd) ([raw file](https://github.com/apache/flink/blob/b6b8e433d7564c202049a6b21184831dd273510c/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java#L216)):*
> ```Java
>   if (gate.getConsumedPartitionType().isCreditBased()) {
>       // Create a fix size buffer pool for floating buffers and assign exclusive buffers to input channels directly
>       bufferPool = networkBufferPool.createBufferPool(extraNetworkBuffersPerGate, extraNetworkBuffersPerGate);
> ```

We still need to call `gate.setBufferPool(bufferPool)` so that the gate
knows about its buffer pool (this call is common to both branches of the `if`).
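A simplified model of the control flow being asked for, assuming stand-in `InputGate` and `BufferPool` types (hypothetical, not Flink classes): the credit-based branch creates a fixed-size floating pool with `min == max` and assigns exclusive buffers to the channels, and `setBufferPool` is called once after the `if`/`else`, so neither branch can forget it. The pool sizing in the non-credit-based branch is illustrative only.

```Java
/** Hypothetical, simplified model of setting up an input gate's buffers. */
public class GateSetupSketch {

    /** Stand-in for a buffer pool with a [min, max] size. */
    static final class BufferPool {
        final int minSize;
        final int maxSize;

        BufferPool(int minSize, int maxSize) {
            this.minSize = minSize;
            this.maxSize = maxSize;
        }
    }

    /** Stand-in for an input gate with some number of remote channels. */
    static final class InputGate {
        final boolean creditBased;
        final int numChannels;
        BufferPool bufferPool;        // must be set on both code paths
        int exclusiveBuffersAssigned; // total exclusive buffers handed to channels

        InputGate(boolean creditBased, int numChannels) {
            this.creditBased = creditBased;
            this.numChannels = numChannels;
        }

        void setBufferPool(BufferPool pool) {
            if (bufferPool != null) {
                throw new IllegalStateException("Buffer pool has already been set");
            }
            bufferPool = pool;
        }

        void assignExclusiveBuffers(int buffersPerChannel) {
            exclusiveBuffersAssigned += buffersPerChannel * numChannels;
        }
    }

    static void setupInputGate(InputGate gate, int buffersPerChannel, int extraBuffersPerGate) {
        final BufferPool bufferPool;
        if (gate.creditBased) {
            // Fixed-size pool (min == max) for floating buffers; exclusive
            // buffers go to the channels directly.
            bufferPool = new BufferPool(extraBuffersPerGate, extraBuffersPerGate);
            gate.assignExclusiveBuffers(buffersPerChannel);
        } else {
            // Illustrative sizing only for the non-credit-based path.
            bufferPool = new BufferPool(gate.numChannels,
                    gate.numChannels * buffersPerChannel + extraBuffersPerGate);
        }
        // Common to both branches: without this, the gate never learns about its pool.
        gate.setBufferPool(bufferPool);
    }
}
```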

---


*[flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java, line 164 at r2](https://reviewable.io:443/reviews/apache/flink/4485#-KsOrPb1_aKuUAa_uyC6:-KsOrPb1_aKuUAa_uyC7:b3045fp) ([raw file](https://github.com/apache/flink/blob/b6b8e433d7564c202049a6b21184831dd273510c/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java#L164)):*
> ```Java
>   }
> 
>   redistributeBuffers();
> ```

Here, you may need to add a try-catch that releases any already-added
segments back (see my comments in `SingleInputGate`).

---


*[flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java, line 38 at r2](https://reviewable.io:443/reviews/apache/flink/4485#-KsOhXwT2FF306uRbf5l:-KsOhXwU2y_hAFTh2tTM:b-mb3jxr) ([raw file](https://github.com/apache/flink/blob/b6b8e433d7564c202049a6b21184831dd273510c/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java#L38)):*
> ```Java
>    * no checkpoint barriers.
>    */
>   PIPELINED_BOUNDED(true, true, true, false);
> ```

Does it make sense to already add a `PIPELINE_CREDIT_BASED(true, true,
true, true)`? I guess credit-based can be considered bounded as well.

---


*[flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java, line 82 at r2](https://reviewable.io:443/reviews/apache/flink/4485#-KsOi2-hM5IFzcjrGzin:-KsOi2-iXW_-MtjTxono:b-3woyzq) ([raw file](https://github.com/apache/flink/blob/b6b8e433d7564c202049a6b21184831dd273510c/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java#L82)):*
> ```Java
>   return isBounded;
>   }
> ```

Please add a (simple) javadoc similar to the one on the `isBounded()` method.

---


*[flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java, line 315 at r2](https://reviewable.io:443/reviews/apache/flink/4485#-KsOoBbKr_kVVDxNIZhm:-KsOoBbKr_kVVDxNIZhn:b85dio4) ([raw file](https://github.com/apache/flink/blob/b6b8e433d7564c202049a6b21184831dd273510c/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java#L315)):*
> ```Java
>   return segments;
>   } catch (Throwable t) {
>   if (segments != null && segments.size() > 0) {
> ```

Unfortunately, the cleanup will not work as documented: if
`networkBufferPool.requestMemorySegments(networkBuffersPerChannel);` throws an
exception, `segments` will be `null`. To handle all cases, e.g. when some
segments were requested successfully before an exception was thrown, you need
to handle this inside `NetworkBufferPool#requestMemorySegments()`.
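A minimal sketch of moving the cleanup into the pool, assuming a simplified, hypothetical `SegmentPoolSketch` whose `requestMemorySegments`/`recycleMemorySegments` only mimic the method names under review (the real signatures may differ). Segments taken before a failure are returned inside the method, so a caller whose local variable is still `null` has nothing left to clean up. In the real `NetworkBufferPool`, the subsequent `redistributeBuffers()` call would also belong inside the protected region, per the earlier comment.

```Java
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified global pool that hands out segments in batches. */
public class SegmentPoolSketch {

    /** Stand-in for a memory segment. */
    static final class Segment { }

    private final ArrayDeque<Segment> availableSegments = new ArrayDeque<>();

    public SegmentPoolSketch(int numSegments) {
        for (int i = 0; i < numSegments; i++) {
            availableSegments.add(new Segment());
        }
    }

    /**
     * Requests exactly {@code numRequired} segments. On any failure, the segments
     * obtained so far are returned to the pool before the exception propagates.
     */
    public synchronized List<Segment> requestMemorySegments(int numRequired) throws IOException {
        List<Segment> segments = new ArrayList<>(numRequired);
        try {
            for (int i = 0; i < numRequired; i++) {
                Segment segment = availableSegments.poll();
                if (segment == null) {
                    throw new IOException("Insufficient segments: required " + numRequired
                            + ", obtained only " + segments.size());
                }
                segments.add(segment);
            }
            // Any follow-up work that can fail (in the real pool, e.g. redistributing
            // buffers) would go here, inside the same try block.
            return segments;
        } catch (Throwable t) {
            // The caller never sees the partial list, so the pool must clean it up.
            recycleMemorySegments(segments);
            throw t;
        }
    }

    public synchronized void recycleMemorySegments(List<Segment> segments) {
        availableSegments.addAll(segments);
    }

    public synchronized int getNumberOfAvailableMemorySegments() {
        return availableSegments.size();
    }
}
```

For example, with 3 free segments, a request for 5 takes all 3, hits the shortage, returns them, and throws, leaving the pool with 3 free segments again.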

I 

[GitHub] flink issue #4485: [FLINK-7378][core]Create a fix size (non rebalancing) buf...

2017-08-13 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4485
  
@NicoK , I have submitted the updates:

- Created a fixed-size `LocalBufferPool` for floating buffers
- Assigned the exclusive buffers to each `InputChannel` directly
- The proposed `BufferPoolListener` will be included in the next PR


---