[jira] [Comment Edited] (FLINK-13100) Fix the unexpected IOException during FileBufferReader#nextBuffer

2019-07-08 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16880465#comment-16880465
 ] 

zhijiang edited comment on FLINK-13100 at 7/8/19 3:28 PM:
--

Yes, the previous SpilledSubpartitionView really had this deadlock issue with 
two buffers. Now we have a similar issue, but it throws an IOException instead.

Your understanding of the above two parts is right, but there is another case:
 * When the view is created, it is enqueued into the netty thread loop for the 
first time, because it has both data and credit, as you mentioned.
 * Then a buffer is fetched from the view and writeAndFlush() is called. If the 
view is still available in the queue, it must have credit, because the next 
buffer is always available through read-ahead. This case causes no problem, 
because we only wait for the previous writeAndFlush() to complete before 
triggering the next one if the current queue is not empty.
 * But if the view is not available (no credit) when writeAndFlush() is called, 
the queue in the netty thread loop is empty. Before the future of that 
writeAndFlush() is done, the netty thread can still process an AddCredit 
message, which makes the view available again and adds it back into the queue 
to trigger the next writeAndFlush(). So the previous buffer has not been 
recycled, yet the next write is already triggered, which causes the problem. 
The sequence is: first writeAndFlush() pending -> addCredit (triggers second 
writeAndFlush) -> first writeAndFlush() finishes and recycles the buffer.
 * I think this was introduced by the improvement of reusing the flink buffer 
in the netty stack since release-1.5. writeAndFlush() can be split into two 
steps, write and flush. Previously, when the write step finished, the flink 
buffer was copied into netty's internal ByteBuffer and recycled right away, so 
it caused no problem even if the second writeAndFlush was triggered before the 
first one completed. Now the write step keeps referencing the flink buffer in 
the netty stack until the flush is done.

I will try to mock this process in the relevant unit tests to verify it, and I 
might submit the test tomorrow to make it easier to understand. A 
self-contained sketch of the problematic ordering is shown below.
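
For illustration only, here is a minimal, self-contained Java model of the 
ordering described above. It is not the real FileBufferReader or netty API; the 
class and method names are assumptions, and the netty event loop is modeled as 
plain sequential calls.

{code:java}
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Queue;

/**
 * Minimal model of the race described above (not Flink code): the reader has
 * two buffers and expects nextBuffer() to be called only after a previously
 * handed-out buffer was recycled.
 */
public class NextBufferRaceSketch {

    static class TwoBufferReader {
        private final Queue<byte[]> freeBuffers = new ArrayDeque<>();

        TwoBufferReader() {
            freeBuffers.add(new byte[1024]);
            freeBuffers.add(new byte[1024]);
        }

        byte[] nextBuffer() throws IOException {
            byte[] buffer = freeBuffers.poll();
            if (buffer == null) {
                // This is the unexpected IOException from the ticket.
                throw new IOException("Bug: previous buffer was not recycled yet.");
            }
            return buffer;
        }

        void recycle(byte[] buffer) {
            freeBuffers.add(buffer);
        }
    }

    public static void main(String[] args) throws IOException {
        TwoBufferReader reader = new TwoBufferReader();

        // 1) First writeAndFlush(): a buffer is handed to the netty stack and,
        //    because of zero-copy, it is NOT recycled until the future completes.
        byte[] first = reader.nextBuffer();
        // 2) Read-ahead for the next write: the second buffer is also taken.
        byte[] second = reader.nextBuffer();

        // 3) AddCredit arrives before the first future is done and triggers
        //    another write: both buffers are still referenced by pending
        //    writes, so nextBuffer() fails exactly as described.
        try {
            reader.nextBuffer();
        } catch (IOException expected) {
            System.out.println("Reproduced: " + expected.getMessage());
        }

        // Once the pending futures complete, the buffers are recycled and
        // reading can continue.
        reader.recycle(first);
        reader.recycle(second);
    }
}
{code}

The point of the model is simply that a buffer handed to a pending write stays 
unavailable until its future completes, so a credit-triggered write can hit an 
empty free-buffer queue.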


was (Author: zjwang):
Yes, the previous SpilledSubpartitionView really had this deadlock issue with 
two buffers. Now we have a similar issue, but it throws an IOException instead.

Your understanding of the above two parts is right, but there is another case:
 * When the view is created, it is enqueued into the netty thread loop for the 
first time, because it has both data and credit, as you mentioned.
 * Then a buffer is fetched from the view and writeAndFlush() is called. If the 
view is still available in the queue, it must have credit, because the next 
buffer is always available through read-ahead. This case causes no problem, 
because we only wait for the previous writeAndFlush() to complete before 
triggering the next one.
 * But if the view is not available (no credit) when writeAndFlush() is called, 
the queue in the netty thread loop is empty. Before the future of that 
writeAndFlush() is done, the netty thread can still process an AddCredit 
message, which makes the view available again and adds it back into the queue 
to trigger the next writeAndFlush(). So the previous buffer has not been 
recycled, yet the next write is already triggered, which causes the problem. 
The sequence is: first writeAndFlush() pending -> addCredit (triggers second 
writeAndFlush) -> first writeAndFlush() finishes and recycles the buffer.
 * I think this was introduced by the improvement of reusing the flink buffer 
in the netty stack since release-1.5. writeAndFlush() can be split into two 
steps, write and flush. Previously, when the write step finished, the flink 
buffer was copied into netty's internal ByteBuffer and recycled right away, so 
it caused no problem even if the second writeAndFlush was triggered before the 
first one completed. Now the write step keeps referencing the flink buffer in 
the netty stack until the flush is done.

I will try to mock this process in the relevant unit tests to verify it, and I 
might submit the test tomorrow to make it easier to understand.

> Fix the unexpected IOException during FileBufferReader#nextBuffer
> -
>
> Key: FLINK-13100
> URL: https://issues.apache.org/jira/browse/FLINK-13100
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: zhijiang
>Priority: Blocker
>
> In the implementation of FileBufferReader#nextBuffer, we expect the next 
> memory segment to always be available, based on the assumption that the 
> nextBuffer call can only happen after the previous buffer was recycled. 
> Otherwise it would throw an IOException in the current implementation.

[jira] [Commented] (FLINK-13100) Fix the unexpected IOException during FileBufferReader#nextBuffer

2019-07-08 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16880465#comment-16880465
 ] 

zhijiang commented on FLINK-13100:
--

Yes, the previous SpilledSubpartitionView really had this deadlock issue with 
two buffers. Now we have a similar issue, but it throws an IOException instead.

Your understanding of the above two parts is right, but there is another case:
 * When the view is created, it is enqueued into the netty thread loop for the 
first time, because it has both data and credit, as you mentioned.
 * Then a buffer is fetched from the view and writeAndFlush() is called. If the 
view is still available in the queue, it must have credit, because the next 
buffer is always available through read-ahead. This case causes no problem, 
because we only wait for the previous writeAndFlush() to complete before 
triggering the next one.
 * But if the view is not available (no credit) when writeAndFlush() is called, 
the queue in the netty thread loop is empty. Before the future of that 
writeAndFlush() is done, the netty thread can still process an AddCredit 
message, which makes the view available again and adds it back into the queue 
to trigger the next writeAndFlush(). So the previous buffer has not been 
recycled, yet the next write is already triggered, which causes the problem. 
The sequence is: first writeAndFlush() pending -> addCredit (triggers second 
writeAndFlush) -> first writeAndFlush() finishes and recycles the buffer.
 * I think this was introduced by the improvement of reusing the flink buffer 
in the netty stack since release-1.5. writeAndFlush() can be split into two 
steps, write and flush. Previously, when the write step finished, the flink 
buffer was copied into netty's internal ByteBuffer and recycled right away, so 
it caused no problem even if the second writeAndFlush was triggered before the 
first one completed. Now the write step keeps referencing the flink buffer in 
the netty stack until the flush is done.

I will try to mock this process in the relevant unit tests to verify it, and I 
might submit the test tomorrow to make it easier to understand.

> Fix the unexpected IOException during FileBufferReader#nextBuffer
> -
>
> Key: FLINK-13100
> URL: https://issues.apache.org/jira/browse/FLINK-13100
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: zhijiang
>Priority: Blocker
>
> In the implementation of FileBufferReader#nextBuffer, we expect the next 
> memory segment to always be available, based on the assumption that the 
> nextBuffer call can only happen after the previous buffer was recycled. 
> Otherwise the current implementation throws an IOException.
> In fact, this assumption does not hold given the credit-based flow control 
> and zero-copy features in the network stack. The detailed process is as 
> follows:
>  * The netty thread finishes calling channel.writeAndFlush() in 
> PartitionRequestQueue and adds a listener to handle the ChannelFuture later. 
> Before the future is done, the corresponding buffer is not recycled because 
> of the zero-copy improvement.
>  * Before the previous future is done, the netty thread can trigger the next 
> writeAndFlush via processing an addCredit message, and then 
> FileBufferReader#nextBuffer throws an exception because the previous buffer 
> was not recycled.
> We thought of several ways to solve this potential bug:
>  * Do not trigger the next writeAndFlush before the previous future is done. 
> To do so we would have to maintain the future state and check it in the 
> relevant actions. I worry this might bring a performance regression in 
> network throughput and add extra state management.
>  * Adjust the implementation of the current FileBufferReader. We previously 
> regarded the blocking partition view as always available based on the 
> next-buffer read-ahead, so it was always added into the available queue in 
> PartitionRequestQueue. Actually this read-ahead only simplifies the process 
> of BoundedBlockingSubpartitionReader#notifyDataAvailable. The view 
> availability could instead be judged based on the available buffers in 
> FileBufferReader. When a buffer is recycled into FileBufferReader after 
> writeAndFlush is done, it could call notifyDataAvailable to add this view 
> into the available queue in PartitionRequestQueue.
> I prefer the second way because it would not bring any bad impacts.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11082) Fix the calculation of backlog in PipelinedSubpartition

2019-07-08 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16880427#comment-16880427
 ] 

zhijiang commented on FLINK-11082:
--

[~pnowojski], if the requested floating buffers are not used in idle channels, 
they are eventually returned to the local buffer pool once all the exclusive 
buffers are recycled. E.g.
 * If the current backlog is 2 and only 1 exclusive buffer is available in the 
remote channel, it requests (initial credit + backlog - numAvailableBuffers) = 
3 floating buffers.
 * Suppose the backlog then becomes 1, with 3 buffers available in total (all 
floating) in the remote channel.
 * If the backlog drops to 0, we return 1 floating buffer to the local buffer 
pool and keep 2 available buffers (both floating) in the remote channel.
 * If the backlog is 0 and one exclusive buffer is recycled to the remote 
channel, this triggers returning one floating buffer to the local pool; two 
buffers are still available in the remote channel (one floating, one 
exclusive).
 * If the backlog is 0 and another exclusive buffer is recycled to the remote 
channel, this again triggers returning one floating buffer to the local pool; 
two buffers are still available in the remote channel (both exclusive).

So the floating buffers are not occupied by a remote channel for a long time 
if there are already (backlog + initial credit) available buffers in that 
channel. The small sketch below restates this accounting.
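
A tiny sketch restating the accounting above. The method and variable names 
are illustrative, and the initial credit is assumed to equal the number of 
exclusive buffers (2 in this walkthrough), which the comment does not state 
explicitly.

{code:java}
/** Illustrative-only helper, not the actual RemoteInputChannel API. */
public class FloatingBufferMath {

    /** Floating buffers to request: (initial credit + backlog) - available buffers, never negative. */
    static int floatingBuffersToRequest(int initialCredit, int backlog, int numAvailableBuffers) {
        return Math.max(0, initialCredit + backlog - numAvailableBuffers);
    }

    public static void main(String[] args) {
        // Backlog 2, only 1 exclusive buffer available, initial credit 2
        // -> request 3 floating buffers, as in the first bullet.
        System.out.println(floatingBuffersToRequest(2, 2, 1)); // 3

        // Backlog 0 with 3 available buffers -> one buffer beyond
        // (initial credit + backlog) can be returned to the local pool.
        int surplus = Math.max(0, 3 - (2 + 0));
        System.out.println(surplus); // 1
    }
}
{code}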

> Fix the calculation of backlog in PipelinedSubpartition
> ---
>
> Key: FLINK-11082
> URL: https://issues.apache.org/jira/browse/FLINK-11082
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Affects Versions: 1.5.6, 1.7.1
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> The backlog of a subpartition should indicate how many buffers are 
> consumable, so that the consumer can feed back the corresponding credits for 
> transporting these buffers. But in the current PipelinedSubpartition 
> implementation, the backlog is increased by 1 when a BufferConsumer is added 
> into PipelinedSubpartition, and decreased by 1 when a BufferConsumer is 
> removed from PipelinedSubpartition. So the backlog only reflects how many 
> buffers are retained in PipelinedSubpartition, which is not always 
> equivalent to the number of consumable buffers.
> The backlog inconsistency might result in floating buffers being 
> misdistributed on the consumer side, because the consumer requests floating 
> buffers based on the backlog value, so a floating buffer might go unused in 
> RemoteInputChannel for a long time after being requested.
> As for the solution: the last buffer in PipelinedSubpartition can only 
> become consumable when a flush is triggered or the partition is finished. So 
> we could calculate the backlog precisely based on the partition 
> flushed/finished conditions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-13141) Remove getBufferSize method from BufferPoolFactory

2019-07-08 Thread zhijiang (JIRA)
zhijiang created FLINK-13141:


 Summary: Remove getBufferSize method from BufferPoolFactory
 Key: FLINK-13141
 URL: https://issues.apache.org/jira/browse/FLINK-13141
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


This is just a refactoring to make the interface of BufferPoolFactory simpler 
and cleaner.

BufferPoolFactory#getBufferSize is only used for creating subpartitions in 
ResultPartitionFactory. We could pass the network buffer size from 
NettyShuffleEnvironmentConfiguration while constructing the 
ResultPartitionFactory; then the interface method getBufferSize could be 
removed from BufferPoolFactory. A sketch of the intended shape follows.
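
For illustration, a sketch of the intended shape under heavily simplified, 
assumed signatures (the real ResultPartitionFactory and BufferPoolFactory have 
many more parameters and members):

{code:java}
/** Sketch only: getBufferSize() no longer appears on the factory interface. */
interface BufferPoolFactory {
}

class ResultPartitionFactory {
    private final BufferPoolFactory bufferPoolFactory;
    private final int networkBufferSize;

    // The buffer size is handed over once, from the shuffle environment
    // configuration, instead of being queried through the factory.
    ResultPartitionFactory(BufferPoolFactory bufferPoolFactory, int networkBufferSize) {
        this.bufferPoolFactory = bufferPoolFactory;
        this.networkBufferSize = networkBufferSize;
    }

    int subpartitionBufferSize() {
        return networkBufferSize;
    }
}
{code}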



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Issue Comment Deleted] (FLINK-12852) Deadlock occurs when requiring exclusive buffer for RemoteInputChannel

2019-07-07 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-12852:
-
Comment: was deleted

(was: I thought of another option that keeps the dynamic local pool as it is 
now and only fixes the internal logic in LocalBufferPool to solve the deadlock 
issue. The basic idea is that the local pool initially only requests buffers 
within the min size from the global pool, and eventually requests additional 
buffers within the max size based on its history of recycling. The details are 
as follows:
 * The local pool still has min and max sizes at construction, as now.
 * We need an internal counter tracking how many buffers were recycled in 
total before, which is the critical factor indicating how many extra buffers 
beyond the min size this pool could request from the global pool.
 * The currentPoolSize would not reach the max value directly after 
LocalBufferPool#setNumBuffers; it should be the min size plus the above 
recycled counter.
 * The assumption is that if the pool already recycled x buffers before, the 
downstream has the ability to consume x buffers and could always recycle x 
buffers again, so we could request x extra buffers within the max region from 
the global pool. These x buffers must finally be returned to the global pool 
after being narrowed in redistribution.

[~StephanEwen] [~pnowojski] [~gaoyunhaii])

> Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
> --
>
> Key: FLINK-12852
> URL: https://issues.apache.org/jira/browse/FLINK-12852
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.7.2, 1.8.1, 1.9.0
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> When running tests with an upstream vertex and downstream vertex, deadlock 
> occurs when submitting the job:
> {code:java}
> "Sink: Unnamed (3/500)" #136 prio=5 os_prio=0 tid=0x7f2cca81b000 
> nid=0x38845 waiting on condition [0x7f2cbe9fe000]
> java.lang.Thread.State: TIMED_WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00073ed6b6f0> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:233)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:180)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:54)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.assignExclusiveSegments(RemoteInputChannel.java:139)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:312)
> - locked <0x00073fbc81f0> (a java.lang.Object)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:220)
> at 
> org.apache.flink.runtime.taskmanager.Task.setupPartionsAndGates(Task.java:836)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:598)
> at java.lang.Thread.run(Thread.java:834)
> {code}
> This is due to the required and max sizes of the local buffer pool not being 
> the same, so there may be over-allocation, and when assignExclusiveSegments 
> is called there is no available memory.
>  
> The detail of the scenario is as follows: the parallelisms of the upstream 
> vertex and the downstream vertex are 1500 and 500 respectively. There are 200 
> TMs and each TM has 10696 buffers in total and 10 slots. For a TM that runs 
> 9 upstream tasks and 1 downstream task, the 9 upstream tasks start first with 
> a local buffer pool of \{required = 500, max = 2 * 500 + 8 = 1008}; they 
> produce data quickly and each occupies about 990 buffers. Then the downstream 
> task starts and tries to assign exclusive buffers for 1500 - 9 = 1491 
> InputChannels. It requires 2981 buffers but only 1786 are left. Since not all 
> downstream tasks can start, the job eventually blocks and no buffer can be 
> released, so the deadlock finally occurs.
>  
> I think although increasing the network memory solves the problem, the 
> deadlock may not be acceptable. Fine-grained resource management 
> (FLINK-12761) can solve this problem, but AFAIK in 1.9 it will not include 
> the network memory in the ResourceProfile.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12852) Deadlock occurs when requiring exclusive buffer for RemoteInputChannel

2019-07-07 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16879997#comment-16879997
 ] 

zhijiang commented on FLINK-12852:
--

I thought of another option that keeps the dynamic local pool as it is now and 
only fixes the internal logic in LocalBufferPool to solve the deadlock issue. 
The basic idea is that the local pool initially only requests buffers within 
the min size from the global pool, and eventually requests additional buffers 
within the max size based on its history of recycling. The details are as 
follows:
 * The local pool still has min and max sizes at construction, as now.
 * We need an internal counter tracking how many buffers were recycled in 
total before, which is the critical factor indicating how many extra buffers 
beyond the min size this pool could request from the global pool.
 * The currentPoolSize would not reach the max value directly after 
LocalBufferPool#setNumBuffers; it should be the min size plus the above 
recycled counter.
 * The assumption is that if the pool already recycled x buffers before, the 
downstream has the ability to consume x buffers and could always recycle x 
buffers again, so we could request x extra buffers within the max region from 
the global pool. These x buffers must finally be returned to the global pool 
after being narrowed in redistribution (see the sketch below).

[~StephanEwen] [~pnowojski] [~gaoyunhaii]
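
A simplified sketch of this sizing rule with illustrative names (not the real 
LocalBufferPool): the pool grows beyond its min size only by as many buffers 
as it has already recycled, capped at the max size.

{code:java}
class RecycleAwarePoolSizing {
    private final int minSize;
    private final int maxSize;
    private long totalRecycled;

    RecycleAwarePoolSizing(int minSize, int maxSize) {
        this.minSize = minSize;
        this.maxSize = maxSize;
    }

    /** Called whenever a buffer is handed back to this pool. */
    void onRecycle() {
        totalRecycled++;
    }

    /**
     * The effective pool size after setNumBuffers(): min size plus the number
     * of buffers recycled so far, never exceeding the max size.
     */
    int currentPoolSize() {
        return (int) Math.min(maxSize, minSize + totalRecycled);
    }
}
{code}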

> Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
> --
>
> Key: FLINK-12852
> URL: https://issues.apache.org/jira/browse/FLINK-12852
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.7.2, 1.8.1, 1.9.0
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> When running tests with an upstream vertex and downstream vertex, deadlock 
> occurs when submitting the job:
> {code:java}
> "Sink: Unnamed (3/500)" #136 prio=5 os_prio=0 tid=0x7f2cca81b000 
> nid=0x38845 waiting on condition [0x7f2cbe9fe000]
> java.lang.Thread.State: TIMED_WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00073ed6b6f0> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:233)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:180)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:54)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.assignExclusiveSegments(RemoteInputChannel.java:139)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:312)
> - locked <0x00073fbc81f0> (a java.lang.Object)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:220)
> at 
> org.apache.flink.runtime.taskmanager.Task.setupPartionsAndGates(Task.java:836)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:598)
> at java.lang.Thread.run(Thread.java:834)
> {code}
> This is due to the required and max sizes of the local buffer pool not being 
> the same, so there may be over-allocation, and when assignExclusiveSegments 
> is called there is no available memory.
>  
> The detail of the scenario is as follows: the parallelisms of the upstream 
> vertex and the downstream vertex are 1500 and 500 respectively. There are 200 
> TMs and each TM has 10696 buffers in total and 10 slots. For a TM that runs 
> 9 upstream tasks and 1 downstream task, the 9 upstream tasks start first with 
> a local buffer pool of \{required = 500, max = 2 * 500 + 8 = 1008}; they 
> produce data quickly and each occupies about 990 buffers. Then the downstream 
> task starts and tries to assign exclusive buffers for 1500 - 9 = 1491 
> InputChannels. It requires 2981 buffers but only 1786 are left. Since not all 
> downstream tasks can start, the job eventually blocks and no buffer can be 
> released, so the deadlock finally occurs.
>  
> I think although increasing the network memory solves the problem, the 
> deadlock may not be acceptable. Fine-grained resource management 
> (FLINK-12761) can solve this problem, but AFAIK in 1.9 it will not include 
> the network memory in the ResourceProfile.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-12852) Deadlock occurs when requiring exclusive buffer for RemoteInputChannel

2019-07-05 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16879396#comment-16879396
 ] 

zhijiang edited comment on FLINK-12852 at 7/5/19 3:58 PM:
--

Thanks for the detailed conclusions above [~StephanEwen]!

I agree that option 4 is the most feasible way ATM. The logic of a fixed-size 
pool is simple and directly avoids sharing buffers among tasks, compared with 
the dynamic sizing we have now, so tasks are independent of each other and do 
not rely on any assumptions. We already implemented a fixed-size local pool to 
avoid buffer redistribution in an early Blink version.

I understand option 1 goes in the same direction as option 4: both use a 
fixed-size local pool. The difference is that option 4 takes (2 * numChannels 
+ floating) as the fixed pool size for better performance, while option 1 
takes numChannels as the fixed pool size. Option 1 is compatible for users 
when upgrading, but it might bring a performance regression. Option 4 keeps 
the performance but might require users to adjust the configuration if an 
upgrade fails. The two sizes are spelled out in the sketch below.

The other three options are indeed more complex to implement and might break 
or rely on some assumptions, bringing potential risks.
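
The two fixed pool sizes being compared, written out explicitly. The value 8 
for the floating buffers is only an example; it matches the "+ 8" in the pool 
sizes quoted in this issue.

{code:java}
class FixedPoolSizes {

    // Option 1: one buffer per channel; compatible on upgrade but may regress performance.
    static int option1(int numChannels) {
        return numChannels;
    }

    // Option 4: two buffers per channel plus the floating buffers; keeps the
    // current performance but may need a config adjustment if an upgrade fails.
    static int option4(int numChannels, int floatingBuffers) {
        return 2 * numChannels + floatingBuffers;
    }

    public static void main(String[] args) {
        System.out.println(option1(500));    // 500
        System.out.println(option4(500, 8)); // 1008
    }
}
{code}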


was (Author: zjwang):
Thanks for the detailed conclusions above [~StephanEwen]!

I agree that option 4 is the most feasible way ATM. The logic of a fixed-size 
pool is simple and directly avoids sharing buffers among tasks, compared with 
the dynamic sizing we have now, so tasks are independent of each other and do 
not rely on any assumptions. We already implemented a fixed-size local pool to 
avoid buffer redistribution in an early Blink version.

I understand option 1 goes in the same direction as option 4: both use a 
fixed-size local pool. The difference is that option 4 takes (2 * numChannels 
+ floating) as the fixed pool size for better performance, while option 1 
takes numChannels as the fixed pool size. Option 1 is compatible for users 
when upgrading, but it might bring a performance regression. Option 4 keeps 
the performance but might require users to adjust the configuration if an 
upgrade fails.

The other three options are indeed more complex to implement and might break 
or rely on some assumptions, bringing potential risks.

> Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
> --
>
> Key: FLINK-12852
> URL: https://issues.apache.org/jira/browse/FLINK-12852
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.7.2, 1.8.1, 1.9.0
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> When running tests with an upstream vertex and downstream vertex, deadlock 
> occurs when submitting the job:
> {code:java}
> "Sink: Unnamed (3/500)" #136 prio=5 os_prio=0 tid=0x7f2cca81b000 
> nid=0x38845 waiting on condition [0x7f2cbe9fe000]
> java.lang.Thread.State: TIMED_WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00073ed6b6f0> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:233)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:180)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:54)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.assignExclusiveSegments(RemoteInputChannel.java:139)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:312)
> - locked <0x00073fbc81f0> (a java.lang.Object)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:220)
> at 
> org.apache.flink.runtime.taskmanager.Task.setupPartionsAndGates(Task.java:836)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:598)
> at java.lang.Thread.run(Thread.java:834)
> {code}
> This is due to the required and max sizes of the local buffer pool not being 
> the same, so there may be over-allocation, and when assignExclusiveSegments 
> is called there is no available memory.
>  
> The detail of the scenario is as follows: the parallelisms of the upstream 
> vertex and the downstream vertex are 1500 and 500 respectively. There are 200 
> TMs and each TM has 10696 buffers in total and 10 slots.

[jira] [Commented] (FLINK-12852) Deadlock occurs when requiring exclusive buffer for RemoteInputChannel

2019-07-05 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16879396#comment-16879396
 ] 

zhijiang commented on FLINK-12852:
--

Thanks for the detailed conclusions above [~StephanEwen]!

I agree that option 4 is the most feasible way ATM. The logic of a fixed-size 
pool is simple and directly avoids sharing buffers among tasks, compared with 
the dynamic sizing we have now, so tasks are independent of each other and do 
not rely on any assumptions. We already implemented a fixed-size local pool to 
avoid buffer redistribution in an early Blink version.

I understand option 1 goes in the same direction as option 4: both use a 
fixed-size local pool. The difference is that option 4 takes (2 * numChannels 
+ floating) as the fixed pool size for better performance, while option 1 
takes numChannels as the fixed pool size. Option 1 is compatible for users 
when upgrading, but it might bring a performance regression. Option 4 keeps 
the performance but might require users to adjust the configuration if an 
upgrade fails.

The other three options are indeed more complex to implement and might break 
or rely on some assumptions, bringing potential risks.

> Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
> --
>
> Key: FLINK-12852
> URL: https://issues.apache.org/jira/browse/FLINK-12852
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.7.2, 1.8.1, 1.9.0
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> When running tests with an upstream vertex and downstream vertex, deadlock 
> occurs when submitting the job:
> {code:java}
> "Sink: Unnamed (3/500)" #136 prio=5 os_prio=0 tid=0x7f2cca81b000 
> nid=0x38845 waiting on condition [0x7f2cbe9fe000]
> java.lang.Thread.State: TIMED_WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00073ed6b6f0> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:233)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:180)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:54)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.assignExclusiveSegments(RemoteInputChannel.java:139)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:312)
> - locked <0x00073fbc81f0> (a java.lang.Object)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:220)
> at 
> org.apache.flink.runtime.taskmanager.Task.setupPartionsAndGates(Task.java:836)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:598)
> at java.lang.Thread.run(Thread.java:834)
> {code}
> This is due to the required and max sizes of the local buffer pool not being 
> the same, so there may be over-allocation, and when assignExclusiveSegments 
> is called there is no available memory.
>  
> The detail of the scenario is as follows: the parallelisms of the upstream 
> vertex and the downstream vertex are 1500 and 500 respectively. There are 200 
> TMs and each TM has 10696 buffers in total and 10 slots. For a TM that runs 
> 9 upstream tasks and 1 downstream task, the 9 upstream tasks start first with 
> a local buffer pool of \{required = 500, max = 2 * 500 + 8 = 1008}; they 
> produce data quickly and each occupies about 990 buffers. Then the downstream 
> task starts and tries to assign exclusive buffers for 1500 - 9 = 1491 
> InputChannels. It requires 2981 buffers but only 1786 are left. Since not all 
> downstream tasks can start, the job eventually blocks and no buffer can be 
> released, so the deadlock finally occurs.
>  
> I think although increasing the network memory solves the problem, the 
> deadlock may not be acceptable. Fine-grained resource management 
> (FLINK-12761) can solve this problem, but AFAIK in 1.9 it will not include 
> the network memory in the ResourceProfile.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-13126) Construct special test/benchmark to verify the backlog effect

2019-07-05 Thread zhijiang (JIRA)
zhijiang created FLINK-13126:


 Summary: Construct special test/benchmark to verify the backlog 
effect
 Key: FLINK-13126
 URL: https://issues.apache.org/jira/browse/FLINK-13126
 Project: Flink
  Issue Type: Sub-task
  Components: Benchmarks, Runtime / Network, Tests
Reporter: zhijiang
Assignee: zhijiang


Based on Piotr's suggestion while reviewing the 
[PR|https://github.com/apache/flink/pull/7911], it is better to construct a 
relevant test or benchmark to further verify the backlog effect, as follow-up 
work for [FLINK-11082|https://issues.apache.org/jira/browse/FLINK-11082].



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11082) Fix the calculation of backlog in PipelinedSubpartition

2019-07-05 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-11082:
-
Affects Version/s: (was: 1.6.3)
   (was: 1.8.0)

> Fix the calculation of backlog in PipelinedSubpartition
> ---
>
> Key: FLINK-11082
> URL: https://issues.apache.org/jira/browse/FLINK-11082
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Affects Versions: 1.5.6, 1.7.1
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The backlog of a subpartition should indicate how many buffers are 
> consumable, so that the consumer can feed back the corresponding credits for 
> transporting these buffers. But in the current PipelinedSubpartition 
> implementation, the backlog is increased by 1 when a BufferConsumer is added 
> into PipelinedSubpartition, and decreased by 1 when a BufferConsumer is 
> removed from PipelinedSubpartition. So the backlog only reflects how many 
> buffers are retained in PipelinedSubpartition, which is not always 
> equivalent to the number of consumable buffers.
> The backlog inconsistency might result in floating buffers being 
> misdistributed on the consumer side, because the consumer requests floating 
> buffers based on the backlog value, so a floating buffer might go unused in 
> RemoteInputChannel for a long time after being requested.
> As for the solution: the last buffer in PipelinedSubpartition can only 
> become consumable when a flush is triggered or the partition is finished. So 
> we could calculate the backlog precisely based on the partition 
> flushed/finished conditions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11082) Fix the calculation of backlog in PipelinedSubpartition

2019-07-05 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-11082:
-
Description: 
The backlog of a subpartition should indicate how many buffers are consumable, 
so that the consumer can feed back the corresponding credits for transporting 
these buffers. But in the current PipelinedSubpartition implementation, the 
backlog is increased by 1 when a BufferConsumer is added into 
PipelinedSubpartition, and decreased by 1 when a BufferConsumer is removed from 
PipelinedSubpartition. So the backlog only reflects how many buffers are 
retained in PipelinedSubpartition, which is not always equivalent to the number 
of consumable buffers.

The backlog inconsistency might result in floating buffers being misdistributed 
on the consumer side, because the consumer requests floating buffers based on 
the backlog value, so a floating buffer might go unused in RemoteInputChannel 
for a long time after being requested.

As for the solution: the last buffer in PipelinedSubpartition can only become 
consumable when a flush is triggered or the partition is finished. So we could 
calculate the backlog precisely based on the partition flushed/finished 
conditions.

  was:
The backlog should indicate how many buffers are available in the subpartition 
for downstream consumption. The availability depends on two factors: either the 
{{BufferConsumer}} is finished, or a flush was triggered.

In the current implementation, when a {{BufferConsumer}} is added into the 
subpartition, the backlog is increased as a result, but this {{BufferConsumer}} 
is not yet available for network transport.

Furthermore, the backlog affects requesting floating buffers on the downstream 
side. That means some floating buffers are fetched in advance but not used for 
a long time, so the floating buffers are not used efficiently.

We found this scenario to be especially pronounced with the rebalance selector 
on the upstream side, so we want to increase the backlog only when a 
{{BufferConsumer}} is finished or a flush is triggered.


> Fix the calculation of backlog in PipelinedSubpartition
> ---
>
> Key: FLINK-11082
> URL: https://issues.apache.org/jira/browse/FLINK-11082
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Affects Versions: 1.5.6, 1.6.3, 1.7.1, 1.8.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The backlog of a subpartition should indicate how many buffers are 
> consumable, so that the consumer can feed back the corresponding credits for 
> transporting these buffers. But in the current PipelinedSubpartition 
> implementation, the backlog is increased by 1 when a BufferConsumer is added 
> into PipelinedSubpartition, and decreased by 1 when a BufferConsumer is 
> removed from PipelinedSubpartition. So the backlog only reflects how many 
> buffers are retained in PipelinedSubpartition, which is not always 
> equivalent to the number of consumable buffers.
> The backlog inconsistency might result in floating buffers being 
> misdistributed on the consumer side, because the consumer requests floating 
> buffers based on the backlog value, so a floating buffer might go unused in 
> RemoteInputChannel for a long time after being requested.
> As for the solution: the last buffer in PipelinedSubpartition can only 
> become consumable when a flush is triggered or the partition is finished. So 
> we could calculate the backlog precisely based on the partition 
> flushed/finished conditions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11082) Fix the calculation of backlog in PipelinedSubpartition

2019-07-05 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-11082:
-
Summary: Fix the calculation of backlog in PipelinedSubpartition  (was: 
Increase backlog only if it is available for consumption)

> Fix the calculation of backlog in PipelinedSubpartition
> ---
>
> Key: FLINK-11082
> URL: https://issues.apache.org/jira/browse/FLINK-11082
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Affects Versions: 1.5.6, 1.6.3, 1.7.1, 1.8.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The backlog should indicate how many buffers are available in the 
> subpartition for downstream consumption. The availability depends on two 
> factors: either the {{BufferConsumer}} is finished, or a flush was triggered.
> In the current implementation, when a {{BufferConsumer}} is added into the 
> subpartition, the backlog is increased as a result, but this 
> {{BufferConsumer}} is not yet available for network transport.
> Furthermore, the backlog affects requesting floating buffers on the 
> downstream side. That means some floating buffers are fetched in advance but 
> not used for a long time, so the floating buffers are not used efficiently.
> We found this scenario to be especially pronounced with the rebalance 
> selector on the upstream side, so we want to increase the backlog only when 
> a {{BufferConsumer}} is finished or a flush is triggered.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12852) Deadlock occurs when requiring exclusive buffer for RemoteInputChannel

2019-07-04 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16878692#comment-16878692
 ] 

zhijiang commented on FLINK-12852:
--

Thanks for looking into this issue [~aitozi]

We already have the check 
`numTotalRequiredBuffers + numberOfSegmentsToRequest > totalNumberOfMemorySegments` 
in `NetworkBufferPool#requestMemorySegments`, and this check is mainly used 
for judging whether the global pool size can satisfy the total core size (a 
minimal sketch of this check follows the list below). If we replace 
`numberOfSegmentsToRequest` with `numRequiredBuffers` as you suggested above, 
what would the value of `numRequiredBuffers` be here?

In `NetworkBufferPool#createBufferPool`, the `numRequiredBuffers` in that check 
is the core size of the local pool. I guess your suggestion is to also throw 
the IOException("Insufficient buffers...") after allocating the exclusive 
buffers fails. There are two concerns with this solution:
 * How long do you want to wait before throwing the exception? Recycling extra 
buffers from other local pools might take some time after redistribution, and 
we cannot estimate a proper time after which to throw this exception.
 * It seems a bit tricky/hacky to reuse the check here. If the exception tells 
users that buffers are insufficient, users might be confused because the total 
buffers are actually enough. If the exception reports a deadlock, users might 
still not know what to do. The only benefit is identifying the deadlock 
easily, without jstack.
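
A minimal sketch of the check quoted above. The field and parameter names 
follow the comment, but the surrounding class is illustrative only, not the 
actual NetworkBufferPool code.

{code:java}
class GlobalPoolCheckSketch {
    private final int totalNumberOfMemorySegments;
    private int numTotalRequiredBuffers;

    GlobalPoolCheckSketch(int totalNumberOfMemorySegments) {
        this.totalNumberOfMemorySegments = totalNumberOfMemorySegments;
    }

    /** Rejects a request that would push the reserved core size over the global pool size. */
    void checkAndReserve(int numberOfSegmentsToRequest) throws java.io.IOException {
        if (numTotalRequiredBuffers + numberOfSegmentsToRequest > totalNumberOfMemorySegments) {
            throw new java.io.IOException("Insufficient number of network buffers...");
        }
        numTotalRequiredBuffers += numberOfSegmentsToRequest;
    }
}
{code}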

> Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
> --
>
> Key: FLINK-12852
> URL: https://issues.apache.org/jira/browse/FLINK-12852
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.7.2, 1.8.1, 1.9.0
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> When running tests with an upstream vertex and downstream vertex, deadlock 
> occurs when submitting the job:
> {code:java}
> "Sink: Unnamed (3/500)" #136 prio=5 os_prio=0 tid=0x7f2cca81b000 
> nid=0x38845 waiting on condition [0x7f2cbe9fe000]
> java.lang.Thread.State: TIMED_WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00073ed6b6f0> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:233)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:180)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:54)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.assignExclusiveSegments(RemoteInputChannel.java:139)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:312)
> - locked <0x00073fbc81f0> (a java.lang.Object)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:220)
> at 
> org.apache.flink.runtime.taskmanager.Task.setupPartionsAndGates(Task.java:836)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:598)
> at java.lang.Thread.run(Thread.java:834)
> {code}
> This is due to the required and max sizes of the local buffer pool not being 
> the same, so there may be over-allocation, and when assignExclusiveSegments 
> is called there is no available memory.
>  
> The detail of the scenario is as follows: the parallelisms of the upstream 
> vertex and the downstream vertex are 1500 and 500 respectively. There are 200 
> TMs and each TM has 10696 buffers in total and 10 slots. For a TM that runs 
> 9 upstream tasks and 1 downstream task, the 9 upstream tasks start first with 
> a local buffer pool of \{required = 500, max = 2 * 500 + 8 = 1008}; they 
> produce data quickly and each occupies about 990 buffers. Then the downstream 
> task starts and tries to assign exclusive buffers for 1500 - 9 = 1491 
> InputChannels. It requires 2981 buffers but only 1786 are left. Since not all 
> downstream tasks can start, the job eventually blocks and no buffer can be 
> released, so the deadlock finally occurs.
>  
> I think although increasing the network memory solves the problem, the 
> deadlock may not be acceptable. Fine-grained resource management 
> (FLINK-12761) can solve this problem, but AFAIK in 1.9 it will not include 
> the network memory in the ResourceProfile.

[jira] [Commented] (FLINK-12852) Deadlock occurs when requiring exclusive buffer for RemoteInputChannel

2019-07-04 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16878518#comment-16878518
 ] 

zhijiang commented on FLINK-12852:
--

We have two general directions: sharing buffers between tasks or not.
 * The sharing direction seems more flexible and has more possibilities, but 
we need to make sure the buffer revocation mechanism does not bring downsides. 
ATM the first and third ways above belong to this direction.
 * The not-sharing direction avoids the potential issues completely, but it 
might waste resources in some cases.

> Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
> --
>
> Key: FLINK-12852
> URL: https://issues.apache.org/jira/browse/FLINK-12852
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.7.2, 1.8.1, 1.9.0
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> When running tests with an upstream vertex and downstream vertex, deadlock 
> occurs when submitting the job:
> {code:java}
> "Sink: Unnamed (3/500)" #136 prio=5 os_prio=0 tid=0x7f2cca81b000 
> nid=0x38845 waiting on condition [0x7f2cbe9fe000]
> java.lang.Thread.State: TIMED_WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00073ed6b6f0> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:233)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:180)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:54)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.assignExclusiveSegments(RemoteInputChannel.java:139)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:312)
> - locked <0x00073fbc81f0> (a java.lang.Object)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:220)
> at 
> org.apache.flink.runtime.taskmanager.Task.setupPartionsAndGates(Task.java:836)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:598)
> at java.lang.Thread.run(Thread.java:834)
> {code}
> This is due to the required and max sizes of the local buffer pool not being 
> the same, so there may be over-allocation, and when assignExclusiveSegments 
> is called there is no available memory.
>  
> The detail of the scenario is as follows: the parallelisms of the upstream 
> vertex and the downstream vertex are 1500 and 500 respectively. There are 200 
> TMs and each TM has 10696 buffers in total and 10 slots. For a TM that runs 
> 9 upstream tasks and 1 downstream task, the 9 upstream tasks start first with 
> a local buffer pool of \{required = 500, max = 2 * 500 + 8 = 1008}; they 
> produce data quickly and each occupies about 990 buffers. Then the downstream 
> task starts and tries to assign exclusive buffers for 1500 - 9 = 1491 
> InputChannels. It requires 2981 buffers but only 1786 are left. Since not all 
> downstream tasks can start, the job eventually blocks and no buffer can be 
> released, so the deadlock finally occurs.
>  
> I think although increasing the network memory solves the problem, the 
> deadlock may not be acceptable. Fine-grained resource management 
> (FLINK-12761) can solve this problem, but AFAIK in 1.9 it will not include 
> the network memory in the ResourceProfile.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12852) Deadlock occurs when requiring exclusive buffer for RemoteInputChannel

2019-07-04 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16878516#comment-16878516
 ] 

zhijiang commented on FLINK-12852:
--

[~pnowojski] "Network resource matching in slot" refers to the second one you 
mentioned, and  "Lazy allocation buffers on producer side" refers to the first 
one.

> Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
> --
>
> Key: FLINK-12852
> URL: https://issues.apache.org/jira/browse/FLINK-12852
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.7.2, 1.8.1, 1.9.0
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> When running tests with an upstream vertex and downstream vertex, deadlock 
> occurs when submitting the job:
> {code:java}
> "Sink: Unnamed (3/500)" #136 prio=5 os_prio=0 tid=0x7f2cca81b000 
> nid=0x38845 waiting on condition [0x7f2cbe9fe000]
> java.lang.Thread.State: TIMED_WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00073ed6b6f0> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:233)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:180)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:54)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.assignExclusiveSegments(RemoteInputChannel.java:139)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:312)
> - locked <0x00073fbc81f0> (a java.lang.Object)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:220)
> at 
> org.apache.flink.runtime.taskmanager.Task.setupPartionsAndGates(Task.java:836)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:598)
> at java.lang.Thread.run(Thread.java:834)
> {code}
> This is due to the required and max sizes of the local buffer pool not being 
> the same, so there may be over-allocation, and when assignExclusiveSegments 
> is called there is no available memory.
>  
> The detail of the scenario is as follows: the parallelisms of the upstream 
> vertex and the downstream vertex are 1500 and 500 respectively. There are 200 
> TMs and each TM has 10696 buffers in total and 10 slots. For a TM that runs 
> 9 upstream tasks and 1 downstream task, the 9 upstream tasks start first with 
> a local buffer pool of \{required = 500, max = 2 * 500 + 8 = 1008}; they 
> produce data quickly and each occupies about 990 buffers. Then the downstream 
> task starts and tries to assign exclusive buffers for 1500 - 9 = 1491 
> InputChannels. It requires 2981 buffers but only 1786 are left. Since not all 
> downstream tasks can start, the job eventually blocks and no buffer can be 
> released, so the deadlock finally occurs.
>  
> I think although increasing the network memory solves the problem, the 
> deadlock may not be acceptable. Fine-grained resource management 
> (FLINK-12761) can solve this problem, but AFAIK in 1.9 it will not include 
> the network memory in the ResourceProfile.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-13100) Fix the unexpected IOException during FileBufferReader#nextBuffer

2019-07-04 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16878513#comment-16878513
 ] 

zhijiang commented on FLINK-13100:
--

What do you think, [~pnowojski] [~StephanEwen]?

> Fix the unexpected IOException during FileBufferReader#nextBuffer
> -
>
> Key: FLINK-13100
> URL: https://issues.apache.org/jira/browse/FLINK-13100
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: zhijiang
>Priority: Blocker
>
> In the implementation of FileBufferReader#nextBuffer, we expect the next 
> memory segment to always be available, based on the assumption that the 
> nextBuffer call can only happen after the previous buffer was recycled. 
> Otherwise the current implementation throws an IOException.
> In fact, this assumption does not hold given the credit-based flow control 
> and zero-copy features in the network stack. The detailed process is as 
> follows:
>  * The netty thread finishes calling channel.writeAndFlush() in 
> PartitionRequestQueue and adds a listener to handle the ChannelFuture later. 
> Before the future is done, the corresponding buffer is not recycled because 
> of the zero-copy improvement.
>  * Before the previous future is done, the netty thread can trigger the next 
> writeAndFlush via processing an addCredit message, and then 
> FileBufferReader#nextBuffer throws an exception because the previous buffer 
> was not recycled.
> We thought of several ways to solve this potential bug:
>  * Do not trigger the next writeAndFlush before the previous future is done. 
> To do so we would have to maintain the future state and check it in the 
> relevant actions. I worry this might bring a performance regression in 
> network throughput and add extra state management.
>  * Adjust the implementation of the current FileBufferReader. We previously 
> regarded the blocking partition view as always available based on the 
> next-buffer read-ahead, so it was always added into the available queue in 
> PartitionRequestQueue. Actually this read-ahead only simplifies the process 
> of BoundedBlockingSubpartitionReader#notifyDataAvailable. The view 
> availability could instead be judged based on the available buffers in 
> FileBufferReader. When a buffer is recycled into FileBufferReader after 
> writeAndFlush is done, it could call notifyDataAvailable to add this view 
> into the available queue in PartitionRequestQueue.
> I prefer the second way because it would not bring any bad impacts.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-13100) Fix the unexpected IOException during FileBufferReader#nextBuffer

2019-07-04 Thread zhijiang (JIRA)
zhijiang created FLINK-13100:


 Summary: Fix the unexpected IOException during 
FileBufferReader#nextBuffer
 Key: FLINK-13100
 URL: https://issues.apache.org/jira/browse/FLINK-13100
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: zhijiang


In the implementation of FileBufferReader#nextBuffer, we expect the next memory 
segment to always be available, based on the assumption that nextBuffer can only 
be called after the previous buffer has been recycled. Otherwise the current 
implementation throws an IOException.

In fact, this assumption does not hold with the credit-based and zero-copy 
features in the network stack. The detailed process is as follows:
 * The netty thread finishes calling channel.writeAndFlush() in 
PartitionRequestQueue and adds a listener to handle the ChannelFuture later. 
Until the future completes, the corresponding buffer is not recycled because of 
the zero-copy improvement.

 * Before the previous future completes, the netty thread can trigger the next 
writeAndFlush by processing an addCredit message; FileBufferReader#nextBuffer 
then throws an exception because the previous buffer has not been recycled.

We thought of several ways to solve this potential bug:
 * Do not trigger the next writeAndFlush before the previous future completes. 
To do so we have to maintain the future state and check it in the relevant 
actions. I suspect this might cause a performance regression in network 
throughput and add extra state management.

 * Adjust the current FileBufferReader implementation. We used to regard the 
blocking partition view as always available based on the next-buffer read-ahead, 
so it would always be added into the available queue in PartitionRequestQueue. 
Actually this read-ahead only simplifies 
BoundedBlockingSubpartitionReader#notifyDataAvailable. The view's availability 
could instead be judged by the available buffers in FileBufferReader. When a 
buffer is recycled back into FileBufferReader after writeAndFlush completes, it 
can call notifyDataAvailable to add this view into the available queue in 
PartitionRequestQueue (see the sketch below).

I prefer the second way because it would not bring any bad impacts.
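Below is a minimal sketch of the second option, using simplified stand-in types rather than the real runtime classes: availability is judged by the reader's free buffers, nextBuffer never fails when no buffer is free, and recycling a buffer from the writeAndFlush listener is what re-announces availability. Only the names FileBufferReader, notifyDataAvailable and PartitionRequestQueue come from this ticket; everything else is illustrative.

{code:java}
import java.util.ArrayDeque;
import java.util.Queue;

// Simplified stand-in for the runtime class; the types and logic here are only a sketch.
class FileBufferReaderSketch {

    interface AvailabilityListener {
        void notifyDataAvailable();
    }

    private final Queue<byte[]> freeBuffers = new ArrayDeque<>();
    private final AvailabilityListener listener;

    FileBufferReaderSketch(int numBuffers, int bufferSize, AvailabilityListener listener) {
        this.listener = listener;
        for (int i = 0; i < numBuffers; i++) {
            freeBuffers.add(new byte[bufferSize]);
        }
    }

    /** Availability is judged by free buffers, not by a pre-read next buffer. */
    synchronized boolean isAvailable() {
        return !freeBuffers.isEmpty();
    }

    /** Returns a buffer to fill from the spilled file, or null instead of throwing an IOException. */
    synchronized byte[] nextBuffer() {
        byte[] buffer = freeBuffers.poll();
        if (buffer == null) {
            return null; // the caller waits for a recycle instead of failing
        }
        // ... read the next chunk of the spilled data into 'buffer' here ...
        return buffer;
    }

    /** Called from the writeAndFlush() listener; re-enqueues the buffer and re-announces availability. */
    synchronized void recycle(byte[] buffer) {
        freeBuffers.add(buffer);
        listener.notifyDataAvailable(); // re-adds this view to the available queue in PartitionRequestQueue
    }
}
{code}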



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-12738) Remove abstract getPageSize method from InputGate

2019-07-04 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12738?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang resolved FLINK-12738.
--
Resolution: Fixed

Fixed in b6563e385263beb556ce19855ca6dfdbb7f6c853

> Remove abstract getPageSize method from InputGate
> -
>
> Key: FLINK-12738
> URL: https://issues.apache.org/jira/browse/FLINK-12738
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Currently {{InputGate#getPageSize}} is only used for constructing 
> {{BarrierBuffer}}. In order to make abstract InputGate simple and clean, we 
> should remove unnecessary abstract methods from it.
> Considering the page size could be parsed directly from configuration which 
> could also visible while constructing {{BarrierBuffer}}, so it is reasonable 
> to do so.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-12852) Deadlock occurs when requiring exclusive buffer for RemoteInputChannel

2019-07-03 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16878289#comment-16878289
 ] 

zhijiang edited comment on FLINK-12852 at 7/4/19 3:27 AM:
--

Network resource matching in slots still has many unsettled issues that need 
further discussion, so we cannot make it effective in a short time.

Lazily allocating buffers on the producer side seems a feasible way atm. It 
could still retain the current core/maximum mechanism in the local pool. But it 
brings two other effects:
 * Higher time to ramp up to full throughput, as Stephan mentioned, especially 
for very short jobs (finishing in several seconds); I remember such cases 
existed in Kurt's benchmark before. We would change the previously concurrent 
production and consumption into a sequential process: for a short job, the whole 
data set might already be emitted and cached in the partition pool on the 
producer side before the consumer requests the partition.
 * We rely on another assumption: produced buffers can eventually be recycled 
once the subpartition view is established. This assumption might limit new 
features/improvements in the future. ATM we need to adjust the action that 
triggers the partition request, meaning RemoteInputChannel could only send the 
partition request if the corresponding task has no result partition or the 
partition's view has already been established (see the sketch below). In the 
future, InputSelection might also break this assumption: although the partition 
was requested, the operator could choose not to consume that partition for a 
long time.


was (Author: zjwang):
Network resource matching in slot has many unsetting issues which should be 
further discussed future, so we could not make it effect in short time.

Lazy allocation buffers on producer side seems a feasible way atm. It could 
still retain the current core and maximum mechanism in local pool. But it 
brings another two effects:
 * Higher time to ramp up to full throughput as Stephan mentioned, especially 
for some very short-time jobs (several seconds finish) and I remembered there 
exists such cases in Kurt's benchmark before. We change the previous concurrent 
production and consumption to sequential way. For short-time job, before the 
consumer requests partition, all the data set might already be emitted and 
cached in partition pool on producer side before.
 * We rely on another assumption that produced buffers could be recycled 
finally once subpartition view is established. This assumption might limit our 
new features/improvements future. ATM we need to adjust the action to trigger 
partition request, that means `RemoteInputChannel` could only send partition 
request if this task has no result partition or the partition's view has 
already been established. In future the InputSelection might also destroy the 
above assumption. Although the partition was requested, but the OP could select 
not to consumer that partition long time.

> Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
> --
>
> Key: FLINK-12852
> URL: https://issues.apache.org/jira/browse/FLINK-12852
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.7.2, 1.8.1, 1.9.0
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> When running tests with an upstream vertex and downstream vertex, deadlock 
> occurs when submitting the job:
> {code:java}
> "Sink: Unnamed (3/500)" #136 prio=5 os_prio=0 tid=0x7f2cca81b000 
> nid=0x38845 waiting on condition [0x7f2cbe9fe000]
> java.lang.Thread.State: TIMED_WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00073ed6b6f0> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:233)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:180)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:54)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.assignExclusiveSegments(RemoteInputChannel.java:139)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:312)
> - locked <0x00073fbc81f0> (a java.lang.Object)
> at 
> org.apache.flink.run

[jira] [Commented] (FLINK-12852) Deadlock occurs when requiring exclusive buffer for RemoteInputChannel

2019-07-03 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16878289#comment-16878289
 ] 

zhijiang commented on FLINK-12852:
--

Network resource matching in slots still has many unsettled issues that should 
be further discussed, so we cannot make it effective in a short time.

Lazily allocating buffers on the producer side seems a feasible way atm. It 
could still retain the current core/maximum mechanism in the local pool. But it 
brings two other effects:
 * Higher time to ramp up to full throughput, as Stephan mentioned, especially 
for very short jobs (finishing in several seconds); I remember such cases 
existed in Kurt's benchmark before. We would change the previously concurrent 
production and consumption into a sequential process: for a short job, the whole 
data set might already be emitted and cached in the partition pool on the 
producer side before the consumer requests the partition.
 * We rely on another assumption: produced buffers can eventually be recycled 
once the subpartition view is established. This assumption might limit new 
features/improvements in the future. ATM we need to adjust the action that 
triggers the partition request, meaning `RemoteInputChannel` could only send the 
partition request if this task has no result partition or the partition's view 
has already been established. In the future, InputSelection might also break 
this assumption: although the partition was requested, the operator could choose 
not to consume that partition for a long time.

> Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
> --
>
> Key: FLINK-12852
> URL: https://issues.apache.org/jira/browse/FLINK-12852
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.7.2, 1.8.1, 1.9.0
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> When running tests with an upstream vertex and downstream vertex, deadlock 
> occurs when submitting the job:
> {code:java}
> "Sink: Unnamed (3/500)" #136 prio=5 os_prio=0 tid=0x7f2cca81b000 
> nid=0x38845 waiting on condition [0x7f2cbe9fe000]
> java.lang.Thread.State: TIMED_WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00073ed6b6f0> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:233)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:180)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:54)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.assignExclusiveSegments(RemoteInputChannel.java:139)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:312)
> - locked <0x00073fbc81f0> (a java.lang.Object)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:220)
> at 
> org.apache.flink.runtime.taskmanager.Task.setupPartionsAndGates(Task.java:836)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:598)
> at java.lang.Thread.run(Thread.java:834)
> {code}
> This is because the required and max sizes of the local buffer pool are not the 
> same, so over-allocation may happen and there is no available memory left when 
> assignExclusiveSegments is called.
>  
> The detailed scenario is as follows: the parallelism of the upstream vertex and 
> the downstream vertex is 1000 and 500 respectively. There are 200 TMs and each 
> TM has 10696 buffers in total and 10 slots. For a TM that runs 9 upstream tasks 
> and 1 downstream task, the 9 upstream tasks start first with a local buffer 
> pool of \{required = 500, max = 2 * 500 + 8 = 1008}; they produce data quickly 
> and each occupies about 990 buffers. Then the downstream task starts and tries 
> to assign exclusive buffers for 1500 - 9 = 1491 InputChannels. It requires 2981 
> buffers but only 1786 are left. Since not all downstream tasks can start, the 
> job is eventually blocked, no buffer can be released, and the deadlock occurs.
>  
> I think that although increasing the network memory solves the problem, the 
> deadlock may not be acceptable. Fine-grained resource management (FLINK-12761) 
> can solve this problem, but AFAIK in 1.9 it will not include the network memory 
> in the ResourceProfile.

[jira] [Commented] (FLINK-12852) Deadlock occurs when requiring exclusive buffer for RemoteInputChannel

2019-07-03 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16877737#comment-16877737
 ] 

zhijiang commented on FLINK-12852:
--

It is hard to say whether we would get performance benefits from temporarily 
overusing the extra buffers beyond the core size.

In general the tasks are deployed into one TM sequentially within a very short 
interval. If the first task occupies buffers that actually belong to the 
following tasks, it also brings some cost/overhead to recycle these extra 
buffers afterwards. Moreover, this greedy mechanism might only have limited 
benefits in special cases, such as backpressure. Overall it is hard to evaluate 
the final benefit across different scenarios.

> Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
> --
>
> Key: FLINK-12852
> URL: https://issues.apache.org/jira/browse/FLINK-12852
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.7.2, 1.8.1, 1.9.0
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> When running tests with an upstream vertex and downstream vertex, deadlock 
> occurs when submitting the job:
> {code:java}
> "Sink: Unnamed (3/500)" #136 prio=5 os_prio=0 tid=0x7f2cca81b000 
> nid=0x38845 waiting on condition [0x7f2cbe9fe000]
> java.lang.Thread.State: TIMED_WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00073ed6b6f0> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:233)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:180)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:54)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.assignExclusiveSegments(RemoteInputChannel.java:139)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:312)
> - locked <0x00073fbc81f0> (a java.lang.Object)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:220)
> at 
> org.apache.flink.runtime.taskmanager.Task.setupPartionsAndGates(Task.java:836)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:598)
> at java.lang.Thread.run(Thread.java:834)
> {code}
> This is because the required and max sizes of the local buffer pool are not the 
> same, so over-allocation may happen and there is no available memory left when 
> assignExclusiveSegments is called.
>  
> The detailed scenario is as follows: the parallelism of the upstream vertex and 
> the downstream vertex is 1000 and 500 respectively. There are 200 TMs and each 
> TM has 10696 buffers in total and 10 slots. For a TM that runs 9 upstream tasks 
> and 1 downstream task, the 9 upstream tasks start first with a local buffer 
> pool of \{required = 500, max = 2 * 500 + 8 = 1008}; they produce data quickly 
> and each occupies about 990 buffers. Then the downstream task starts and tries 
> to assign exclusive buffers for 1500 - 9 = 1491 InputChannels. It requires 2981 
> buffers but only 1786 are left. Since not all downstream tasks can start, the 
> job is eventually blocked, no buffer can be released, and the deadlock occurs.
>  
> I think that although increasing the network memory solves the problem, the 
> deadlock may not be acceptable. Fine-grained resource management (FLINK-12761) 
> can solve this problem, but AFAIK in 1.9 it will not include the network memory 
> in the ResourceProfile.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12852) Deadlock occurs when requiring exclusive buffer for RemoteInputChannel

2019-07-03 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16877616#comment-16877616
 ] 

zhijiang commented on FLINK-12852:
--

Hey [~gaoyunhaii], considering the second point for the long run, I have not 
thought it through thoroughly.

I am not very sure whether it is still worth keeping the dynamic local pool 
between core and max size once we have slot resource matching. If this dynamic 
sizing brings more trouble in practice, another option is to switch to a 
fixed-size local pool instead. The system could calculate a reasonable default 
size for the local pool as it does now, and users could also tune it to any size 
they want; e.g. only 2 total buffers in the local pool could also work for 100 
subpartitions if users do not care about performance.
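Purely illustrative sketch of the fixed-size alternative: the pool size either comes from a user setting or falls back to a computed default. The option and the default formula here are made up, not existing configuration.

{code:java}
// Illustrative only; no such option exists today.
class FixedLocalPoolSizing {
    static int poolSize(Integer userConfiguredSize, int numSubpartitions) {
        if (userConfiguredSize != null) {
            // even a tiny pool (e.g. 2 buffers for 100 subpartitions) still works,
            // just with lower throughput
            return Math.max(userConfiguredSize, 1);
        }
        // assumed fallback default: one buffer per subpartition plus some slack
        return numSubpartitions + 8;
    }
}
{code}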

> Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
> --
>
> Key: FLINK-12852
> URL: https://issues.apache.org/jira/browse/FLINK-12852
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.7.2, 1.8.1, 1.9.0
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> When running tests with an upstream vertex and downstream vertex, deadlock 
> occurs when submitting the job:
> {code:java}
> "Sink: Unnamed (3/500)" #136 prio=5 os_prio=0 tid=0x7f2cca81b000 
> nid=0x38845 waiting on condition [0x7f2cbe9fe000]
> java.lang.Thread.State: TIMED_WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00073ed6b6f0> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:233)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:180)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:54)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.assignExclusiveSegments(RemoteInputChannel.java:139)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:312)
> - locked <0x00073fbc81f0> (a java.lang.Object)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:220)
> at 
> org.apache.flink.runtime.taskmanager.Task.setupPartionsAndGates(Task.java:836)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:598)
> at java.lang.Thread.run(Thread.java:834)
> {code}
> This is because the required and max sizes of the local buffer pool are not the 
> same, so over-allocation may happen and there is no available memory left when 
> assignExclusiveSegments is called.
>  
> The detailed scenario is as follows: the parallelism of the upstream vertex and 
> the downstream vertex is 1000 and 500 respectively. There are 200 TMs and each 
> TM has 10696 buffers in total and 10 slots. For a TM that runs 9 upstream tasks 
> and 1 downstream task, the 9 upstream tasks start first with a local buffer 
> pool of \{required = 500, max = 2 * 500 + 8 = 1008}; they produce data quickly 
> and each occupies about 990 buffers. Then the downstream task starts and tries 
> to assign exclusive buffers for 1500 - 9 = 1491 InputChannels. It requires 2981 
> buffers but only 1786 are left. Since not all downstream tasks can start, the 
> job is eventually blocked, no buffer can be released, and the deadlock occurs.
>  
> I think that although increasing the network memory solves the problem, the 
> deadlock may not be acceptable. Fine-grained resource management (FLINK-12761) 
> can solve this problem, but AFAIK in 1.9 it will not include the network memory 
> in the ResourceProfile.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12852) Deadlock occurs when requiring exclusive buffer for RemoteInputChannel

2019-07-03 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16877605#comment-16877605
 ] 

zhijiang commented on FLINK-12852:
--

The spilling approach could solve the deadlock issue functionally, which is very 
similar to the previous SpillableSubpartition's behavior. If we only spill the 
buffers between core size and max size after redistribution, it still cannot 
solve the other existing exception, "insufficient number of network buffers", 
which has always been seen in Flink for large-scale jobs. If we also spill some 
buffers within the core size to avoid that IOException, the performance 
regression might be serious while users are not aware of it; if they knew, users 
might prefer to increase the buffer options to avoid the regression. Especially 
for streaming jobs, it is better not to touch disk unless necessary.

In contrast, if we agree that slot resource matching is the final solution for 
the future, it could resolve both the deadlock and the "insufficient number of 
network buffers" issues. Users could then adjust the relevant buffer configs to 
trade off performance against total resource usage. We might further improve the 
internal mechanism to decrease the required core buffer sizes (both the 
exclusive buffers and the core size of the local pool) so that a job can still 
run with limited resources.

> Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
> --
>
> Key: FLINK-12852
> URL: https://issues.apache.org/jira/browse/FLINK-12852
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.7.2, 1.8.1, 1.9.0
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> When running tests with an upstream vertex and downstream vertex, deadlock 
> occurs when submitting the job:
> {code:java}
> "Sink: Unnamed (3/500)" #136 prio=5 os_prio=0 tid=0x7f2cca81b000 
> nid=0x38845 waiting on condition [0x7f2cbe9fe000]
> java.lang.Thread.State: TIMED_WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00073ed6b6f0> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:233)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:180)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:54)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.assignExclusiveSegments(RemoteInputChannel.java:139)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:312)
> - locked <0x00073fbc81f0> (a java.lang.Object)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:220)
> at 
> org.apache.flink.runtime.taskmanager.Task.setupPartionsAndGates(Task.java:836)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:598)
> at java.lang.Thread.run(Thread.java:834)
> {code}
> This is because the required and max sizes of the local buffer pool are not the 
> same, so over-allocation may happen and there is no available memory left when 
> assignExclusiveSegments is called.
>  
> The detailed scenario is as follows: the parallelism of the upstream vertex and 
> the downstream vertex is 1000 and 500 respectively. There are 200 TMs and each 
> TM has 10696 buffers in total and 10 slots. For a TM that runs 9 upstream tasks 
> and 1 downstream task, the 9 upstream tasks start first with a local buffer 
> pool of \{required = 500, max = 2 * 500 + 8 = 1008}; they produce data quickly 
> and each occupies about 990 buffers. Then the downstream task starts and tries 
> to assign exclusive buffers for 1500 - 9 = 1491 InputChannels. It requires 2981 
> buffers but only 1786 are left. Since not all downstream tasks can start, the 
> job is eventually blocked, no buffer can be released, and the deadlock occurs.
>  
> I think that although increasing the network memory solves the problem, the 
> deadlock may not be acceptable. Fine-grained resource management (FLINK-12761) 
> can solve this problem, but AFAIK in 1.9 it will not include the network memory 
> in the ResourceProfile.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12852) Deadlock occurs when requiring exclusive buffer for RemoteInputChannel

2019-07-02 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16877073#comment-16877073
 ] 

zhijiang commented on FLINK-12852:
--

[~StephanEwen] thanks for looking into this issue and sharing the suggestions.

The root cause of this issue is the mechanism for distributing global buffers 
among LocalBufferPools. As we know, the local pool has a core size and a max 
size per partition/gate. The core size must be satisfied, otherwise an exception 
is thrown. Every local pool may use up to the max size if there are enough 
buffers in the global pool, to make better use of resources. But the 
precondition is that the extra buffers beyond the core size can eventually be 
returned after redistribution. This precondition/assumption is not always 
satisfied, especially in credit-based mode, for the following reasons (see the 
sizing sketch below):
 * In non-credit mode, the producer could always push data to the network until 
backpressure, so the extra buffers used in the partition's local pool could be 
recycled in time. But now the producer cannot send a buffer to the network until 
the consumer has requested the exclusive buffers as initial credits.
 * In non-credit mode, the gate's local pool only needed a core size equal to 
numChannels, but in credit-based mode we need 2*numChannels exclusive buffers as 
the core size by default. So the probability is higher than before.
 * In non-credit mode, the buffer request from the global pool is lazy and 
data-driven: only after the consumer receives data from the network does the 
local pool request a buffer from the global pool. But now the exclusive buffer 
request from the global pool is eager during startup. So the probability is also 
higher than before. If we make the exclusive request lazy as well, it might 
relieve the deadlock issue.

I think the previous non-credit mode could not completely avoid deadlock in 
theory either (not very sure, especially for some corner cases). But in 
credit-based mode, the above factors make the probability of deadlock much 
higher than before.

I agree with the implications mentioned by Stephan; the current blocking 
partition does not have this issue. For streaming jobs the probability still 
exists, but I think it should not be a blocker for release-1.9.

The currently proposed PR turns the deadlock into a failure instead. That seems 
a bit better than now because it tells users what happens, but it does not solve 
the root cause, and users might still have a bad experience. We previously 
avoided this issue at Alibaba via network resource matching in the 
ResourceProfile and a fixed size in the local pool.

I also agree with the general ideas Stephan proposed for solving this issue. 
Slot resource isolation (including shuffle resources) might be the right way to 
go in the future. ATM we could make some improvements to decrease the 
probability, such as making the exclusive size 1 by default (a Jira was already 
created for this before) and making the exclusive request lazy like floating 
buffers.
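The sizing sketch referenced above: a small comparison of the gate's hard buffer requirement in non-credit mode versus credit-based mode, using the channel count from this ticket's scenario and the 2-buffers-per-channel default mentioned in the comment. It is only an illustration of the numbers, not the configuration code.

{code:java}
public class GateCoreSizeComparison {
    public static void main(String[] args) {
        int numChannels = 1491;      // remote input channels of the downstream task in this ticket
        int exclusivePerChannel = 2; // credit-based default mentioned above

        // non-credit mode: core size of the gate's local pool, requested lazily (data driven)
        int nonCreditCore = numChannels;
        // credit-based mode: exclusive buffers, requested eagerly during startup
        int creditBasedCore = exclusivePerChannel * numChannels;

        System.out.printf("non-credit core=%d, credit-based exclusive=%d (%.1fx)%n",
                nonCreditCore, creditBasedCore, (double) creditBasedCore / nonCreditCore);
    }
}
{code}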

 

> Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
> --
>
> Key: FLINK-12852
> URL: https://issues.apache.org/jira/browse/FLINK-12852
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.7.2, 1.8.1, 1.9.0
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> When running tests with an upstream vertex and downstream vertex, deadlock 
> occurs when submitting the job:
> {code:java}
> "Sink: Unnamed (3/500)" #136 prio=5 os_prio=0 tid=0x7f2cca81b000 
> nid=0x38845 waiting on condition [0x7f2cbe9fe000]
> java.lang.Thread.State: TIMED_WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00073ed6b6f0> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:233)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:180)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:54)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.assignExclusiveSegments(RemoteInputChannel.java:139)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:312)
> - locked <0x00073fbc81f0> (a java.lang.Object)
> at 
> org.apache.flink.runtime.io.netwo

[jira] [Updated] (FLINK-12882) Remove ExecutionAttemptID field from ShuffleEnvironment#createResultPartitionWriters

2019-06-26 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12882?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-12882:
-
Description: The ResultPartitionID can be obtained directly from the 
ResultPartitionDeploymentDescriptor, so there is no need to pass the 
ExecutionAttemptID to construct a new ResultPartitionID when creating the 
ResultPartition in the factory.  (was: The {{ExecutionAttemptID}} is only used for 
constructing {{ResultPartitionID}} during creating {{ResultPartitionWriters in 
ShuffleEnvironment}}.

Actually the {{ResultPartitionID}} could be got directly from 
{{ResultPartitionDeploymentDescriptor}} via 
{{ShuffleDescriptor#getResultPartitionID}} then we could avoid passing this 
field in the interface to make it simple.)

> Remove ExecutionAttemptID field from 
> ShuffleEnvironment#createResultPartitionWriters
> 
>
> Key: FLINK-12882
> URL: https://issues.apache.org/jira/browse/FLINK-12882
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The ResultPartitionID can be obtained directly from the 
> ResultPartitionDeploymentDescriptor, so there is no need to pass the 
> ExecutionAttemptID to construct a new ResultPartitionID when creating the 
> ResultPartition in the factory.
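A minimal sketch of the idea with stand-in interfaces instead of the real descriptor classes; only ShuffleDescriptor#getResultPartitionID is named in this ticket, the other accessors and types are illustrative.

{code:java}
// Stand-in types; the point is only that the factory no longer needs an ExecutionAttemptID.
interface ResultPartitionID {}

interface ShuffleDescriptor {
    ResultPartitionID getResultPartitionID();
}

interface ResultPartitionDeploymentDescriptor {
    ShuffleDescriptor getShuffleDescriptor(); // illustrative accessor
}

class ResultPartitionFactorySketch {
    ResultPartitionID partitionIdFor(ResultPartitionDeploymentDescriptor desc) {
        // previously the id was rebuilt from the partition id plus an ExecutionAttemptID
        return desc.getShuffleDescriptor().getResultPartitionID();
    }
}
{code}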



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12882) Remove ExecutionAttemptID argument from ResultPartitionFactory#create

2019-06-26 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12882?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-12882:
-
Summary: Remove ExecutionAttemptID argument from 
ResultPartitionFactory#create  (was: Remove ExecutionAttemptID field from 
ShuffleEnvironment#createResultPartitionWriters)

> Remove ExecutionAttemptID argument from ResultPartitionFactory#create
> -
>
> Key: FLINK-12882
> URL: https://issues.apache.org/jira/browse/FLINK-12882
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The ResultPartitionID could be got directly from 
> ResultPartitionDeploymentDescriptor, so it is no need to pass 
> ExecutionAttemptID to construct new ResultPartitionID during creating 
> ResultPartition in factory.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-13010) Refactor the process of SchedulerNG#requestPartitionState

2019-06-26 Thread zhijiang (JIRA)
zhijiang created FLINK-13010:


 Summary: Refactor the process of SchedulerNG#requestPartitionState
 Key: FLINK-13010
 URL: https://issues.apache.org/jira/browse/FLINK-13010
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination, Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


Currently `requestPartitionState` is mainly used for querying the partition 
state when the consumer receives a `PartitionNotFoundException` while requesting 
the partition. Actually we do not have the concept of a partition state atm, and 
`requestPartitionState` returns the corresponding producer's state as the 
result, so there is a contradiction here.

My suggestion is to rename the method to `requestPartitionProducerState`; we 
then do not need to pass the `IntermediateDataSetID` and `ResultPartitionID` 
arguments to find the corresponding execution attempt. We could pass only the 
`ExecutionAttemptID`, and the corresponding execution attempt can easily be 
found from the mapping in the `ExecutionGraph` (see the sketch below).

To do so, we could further remove `IntermediateDataSetID` from 
`SingleInputGate` and might replace `IntermediateDataSetID` with `InputGateID` 
in `InputGateDeploymentDescriptor`.
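A sketch of the suggested signature change; the surrounding types are simplified stand-ins so the snippet is self-contained, and the "current shape" is only an approximation of the existing method.

{code:java}
interface SchedulerNGSketch {
    // current shape, roughly: the producer is located via dataset id + partition id
    ExecutionState requestPartitionState(
            IntermediateDataSetID intermediateResultId, ResultPartitionID resultPartitionId);

    // proposed shape from this ticket: address the producer execution attempt directly
    ExecutionState requestPartitionProducerState(ExecutionAttemptID producerAttemptId);
}

// Minimal stand-ins so the sketch compiles on its own.
enum ExecutionState { CREATED, RUNNING, FINISHED, CANCELED, FAILED }
final class IntermediateDataSetID {}
final class ResultPartitionID {}
final class ExecutionAttemptID {}
{code}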



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12284) InputBufferPoolUsage is incorrect in credit-based network control flow

2019-06-26 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-12284:
-
Component/s: Runtime / Metrics

> InputBufferPoolUsage is incorrect in credit-based network control flow
> --
>
> Key: FLINK-12284
> URL: https://issues.apache.org/jira/browse/FLINK-12284
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics, Runtime / Network
>Affects Versions: 1.6.3, 1.6.4, 1.7.2, 1.8.0
>Reporter: Xiaogang Shi
>Assignee: Aitozi
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> When using credit-based network control flow, exclusive buffers are directly 
> assigned to {{RemoteInputChannel}} and are not counted in 
> {{LocalBufferPool}}, leading to incorrect InputBufferPoolUsage.
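A rough sketch of what a corrected gauge could look like: count the exclusive buffers held by the RemoteInputChannels together with the floating buffers of the LocalBufferPool. Field and method names are illustrative, not the actual metric code.

{code:java}
// Illustrative gauge: usage = (floating in use + exclusive in use) / (pool size + total exclusive).
class InputBufferPoolUsageSketch {

    private final int floatingPoolSize;
    private final int totalExclusiveBuffers;

    InputBufferPoolUsageSketch(int floatingPoolSize, int totalExclusiveBuffers) {
        this.floatingPoolSize = floatingPoolSize;
        this.totalExclusiveBuffers = totalExclusiveBuffers;
    }

    float usage(int floatingBuffersInUse, int exclusiveBuffersInUse) {
        int capacity = floatingPoolSize + totalExclusiveBuffers;
        if (capacity == 0) {
            return 0.0f;
        }
        return (floatingBuffersInUse + exclusiveBuffersInUse) / (float) capacity;
    }
}
{code}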



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-12889) Job keeps in FAILING state

2019-06-25 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16872933#comment-16872933
 ] 

zhijiang edited comment on FLINK-12889 at 6/26/19 5:33 AM:
---

My previous analysis in the ML was as follows:
  
 Actually none of the five "Source: ServiceLog" tasks are in a terminal state 
from the JM's view. The relevant process is shown below:
 * The checkpoint in the task causes an OOM issue, which calls 
`Task#failExternally` as a result; we can see the log "Attempting to fail task 
externally" in the TM.
 * The source task transitions from RUNNING to FAILED and then starts a canceler 
thread for canceling the task; we can see the log "Triggering cancellation of 
task" in the TM.
 * When the JM starts to cancel the source tasks, the rpc call 
`Task#cancelExecution` finds the task already in FAILED state from step 2; we 
can see the log "Attempting to cancel task" in the TM.

 
 Since, in the end, none of the five source tasks are in a terminal state in the 
JM log, I guess step 2 might not have created the canceler thread successfully: 
the root failover was caused by an OOM while creating a native thread in step 1, 
so creating the canceler thread may also have failed in this unstable OOM 
situation. If so, the source task would not have been interrupted at all and 
would not have reported to the JM either, even though its state had already been 
changed to FAILED.
  
 The other vertex tasks do not trigger `Task#failExternally` in step 1 and only 
receive the cancel rpc from the JM in step 3. I guess that at this later point 
the canceler threads could be created successfully after some GCs, so these 
tasks could be canceled and reported to the JM side.
  
 I think the key problem is that under OOM some behaviors are not within 
expectations, which might bring problems. Maybe we should handle an OOM error in 
an extreme way, like making the TM exit, to solve the potential issue (see the 
sketch below).
  
 [~till.rohrmann] do you think it is worth fixing, or do you have other concerns?
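The sketch referenced above for the "handle OOM in an extreme way" idea: treat VirtualMachineError (which includes OutOfMemoryError) as fatal and terminate the TM process rather than relying on the normal cancellation path, which may itself need new threads. Purely illustrative, not the actual Task code.

{code:java}
// Illustrative only: once thread creation can fail with OOM, graceful cancellation is
// unreliable, so the process exits and the cluster framework restarts the TaskManager.
final class FatalErrorHandlerSketch {
    static void handleTaskFailure(Throwable t) {
        if (t instanceof VirtualMachineError) { // OutOfMemoryError is a VirtualMachineError
            System.err.println("Fatal JVM error, terminating TaskManager process: " + t);
            Runtime.getRuntime().halt(-1); // halt() skips shutdown hooks, which may also need threads
        }
        // otherwise fall through to the normal failExternally()/cancel handling
    }
}
{code}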


was (Author: zjwang):
My previous analysis in ML was as follows:
  
 Actually all the five "Source: ServiceLog" tasks are not in terminal state on 
JM view, the relevant processes shown in below: 
 * The checkpoint in task causes OOM issue which would call 
`Task#failExternally` as a result, we could see the log "Attempting to fail 
task externally" in tm.
 * The source task would transform state from RUNNING to FAILED and then starts 
a canceler thread for canceling task, we could see log "Triggering cancellation 
of task" in tm.
 * When JM starts to cancel the source tasks, the rpc call 
`Task#cancelExecution` would find the task was already in FAILED state as above 
step 2, we could see log "Attempting to cancel task" in tm.

 
 At last all the five source tasks are not in terminal states from jm log, I 
guess the step 2 might not create canceler thread successfully, because the 
root failover was caused by OOM during creating native thread in step1, so it 
might exist possibilities that createing canceler thread is not successful as 
well in OOM case which is unstable. If so, the source task would not been 
interrupted at all, then it would not report to JM as well, but the state is 
already changed to FAILED before. 
  
 For the other vertex tasks, it does not trigger `Task#failExternally` in step 
1, and only receives the cancel rpc from JM in step 3. And I guess at this time 
later than the source period, the canceler thread could be created succesfully 
after some GCs, then these tasks could be canceled as reported to JM side.
  
 I think the key problem is under OOM case some behaviors are not within 
expectations, so it might bring problems. Maybe we should handle OOM error in 
extreme way like making TM exit to solve the potential issue.
  
 [~till.rohrmann] do you think it is worth fixing or have other concerns?

> Job keeps in FAILING state
> --
>
> Key: FLINK-12889
> URL: https://issues.apache.org/jira/browse/FLINK-12889
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Reporter: Fan Xinpu
>Priority: Minor
> Attachments: 20190618104945417.jpg, jobmanager.log.2019-06-16.0, 
> taskmanager.log
>
>
> There is a topology of 3 operator, such as, source, parser, and persist. 
> Occasionally, 5 subtasks of the source encounters exception and turns to 
> failed, at the same time, one subtask of the parser runs into exception and 
> turns to failed too. The jobmaster gets a message of the parser's failed. The 
> jobmaster then try to cancel all the subtask, most of the subtasks of the 
> three operator turns to canceled except the 5 subtasks of the source, because 
> the state of the 5 ones is already FAILED before jobmaster try to cancel it. 
> Then the jobmaster can not 

[jira] [Comment Edited] (FLINK-12889) Job keeps in FAILING state

2019-06-25 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16872933#comment-16872933
 ] 

zhijiang edited comment on FLINK-12889 at 6/26/19 5:32 AM:
---

My previous analysis in ML was as follows:
  
 Actually all the five "Source: ServiceLog" tasks are not in terminal state on 
JM view, the relevant processes shown in below: 
 * The checkpoint in task causes OOM issue which would call 
`Task#failExternally` as a result, we could see the log "Attempting to fail 
task externally" in tm.
 * The source task would transform state from RUNNING to FAILED and then starts 
a canceler thread for canceling task, we could see log "Triggering cancellation 
of task" in tm.
 * When JM starts to cancel the source tasks, the rpc call 
`Task#cancelExecution` would find the task was already in FAILED state as above 
step 2, we could see log "Attempting to cancel task" in tm.

 
 At last all the five source tasks are not in terminal states from jm log, I 
guess the step 2 might not create canceler thread successfully, because the 
root failover was caused by OOM during creating native thread in step1, so it 
might exist possibilities that createing canceler thread is not successful as 
well in OOM case which is unstable. If so, the source task would not been 
interrupted at all, then it would not report to JM as well, but the state is 
already changed to FAILED before. 
  
 For the other vertex tasks, it does not trigger `Task#failExternally` in step 
1, and only receives the cancel rpc from JM in step 3. And I guess at this time 
later than the source period, the canceler thread could be created succesfully 
after some GCs, then these tasks could be canceled as reported to JM side.
  
 I think the key problem is under OOM case some behaviors are not within 
expectations, so it might bring problems. Maybe we should handle OOM error in 
extreme way like making TM exit to solve the potential issue.
  
 [~till.rohrmann] do you think it is worth fixing or have other concerns?


was (Author: zjwang):
My previous analysis in ML was as follows:
 
Actually all the five "Source: ServiceLog" tasks are not in terminal state on 
JM view, the relevant processes shown in below: * The checkpoint in task causes 
OOM issue which would call `Task#failExternally` as a result, we could see the 
log "Attempting to fail task externally" in tm.
 * The source task would transform state from RUNNING to FAILED and then starts 
a canceler thread for canceling task, we could see log "Triggering cancellation 
of task" in tm.
 * When JM starts to cancel the source tasks, the rpc call 
`Task#cancelExecution` would find the task was already in FAILED state as above 
step 2, we could see log "Attempting to cancel task" in tm.

 
At last all the five source tasks are not in terminal states from jm log, I 
guess the step 2 might not create canceler thread successfully, because the 
root failover was caused by OOM during creating native thread in step1, so it 
might exist possibilities that createing canceler thread is not successful as 
well in OOM case which is unstable. If so, the source task would not been 
interrupted at all, then it would not report to JM as well, but the state is 
already changed to FAILED before. 
 
For the other vertex tasks, it does not trigger `Task#failExternally` in step 
1, and only receives the cancel rpc from JM in step 3. And I guess at this time 
later than the source period, the canceler thread could be created succesfully 
after some GCs, then these tasks could be canceled as reported to JM side.
 
I think the key problem is under OOM case some behaviors are not within 
expectations, so it might bring problems. Maybe we should handle OOM error in 
extreme way like making TM exit to solve the potential issue.
 
[~till.rohrmann] do you think it is worth fixing or have other concerns?

> Job keeps in FAILING state
> --
>
> Key: FLINK-12889
> URL: https://issues.apache.org/jira/browse/FLINK-12889
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Reporter: Fan Xinpu
>Priority: Minor
> Attachments: 20190618104945417.jpg, jobmanager.log.2019-06-16.0, 
> taskmanager.log
>
>
> There is a topology of 3 operator, such as, source, parser, and persist. 
> Occasionally, 5 subtasks of the source encounters exception and turns to 
> failed, at the same time, one subtask of the parser runs into exception and 
> turns to failed too. The jobmaster gets a message of the parser's failed. The 
> jobmaster then try to cancel all the subtask, most of the subtasks of the 
> three operator turns to canceled except the 5 subtasks of the source, because 
> the state of the 5 ones is already FAILED before jobmaster try to cancel it. 
> Then the jobmaster can not reach a fin

[jira] [Updated] (FLINK-12889) Job keeps in FAILING state

2019-06-25 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-12889:
-
Priority: Minor  (was: Blocker)

> Job keeps in FAILING state
> --
>
> Key: FLINK-12889
> URL: https://issues.apache.org/jira/browse/FLINK-12889
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Reporter: Fan Xinpu
>Priority: Minor
> Attachments: 20190618104945417.jpg, jobmanager.log.2019-06-16.0, 
> taskmanager.log
>
>
> There is a topology of 3 operator, such as, source, parser, and persist. 
> Occasionally, 5 subtasks of the source encounters exception and turns to 
> failed, at the same time, one subtask of the parser runs into exception and 
> turns to failed too. The jobmaster gets a message of the parser's failed. The 
> jobmaster then try to cancel all the subtask, most of the subtasks of the 
> three operator turns to canceled except the 5 subtasks of the source, because 
> the state of the 5 ones is already FAILED before jobmaster try to cancel it. 
> Then the jobmaster can not reach a final state but keeps in  Failing state 
> meanwhile the subtask of the source kees in canceling state. 
>  
> The job run on a flink 1.7 cluster on yarn, and there is only one tm with 10 
> slots.
>  
> The attached files contains a jm log , tm log and the ui picture.
>  
> The exception timestamp is about 2019-06-16 13:42:28.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12889) Job keeps in FAILING state

2019-06-25 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-12889:
-
Affects Version/s: (was: 1.7.2)

> Job keeps in FAILING state
> --
>
> Key: FLINK-12889
> URL: https://issues.apache.org/jira/browse/FLINK-12889
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Reporter: Fan Xinpu
>Priority: Blocker
> Attachments: 20190618104945417.jpg, jobmanager.log.2019-06-16.0, 
> taskmanager.log
>
>
> There is a topology of 3 operator, such as, source, parser, and persist. 
> Occasionally, 5 subtasks of the source encounters exception and turns to 
> failed, at the same time, one subtask of the parser runs into exception and 
> turns to failed too. The jobmaster gets a message of the parser's failed. The 
> jobmaster then try to cancel all the subtask, most of the subtasks of the 
> three operator turns to canceled except the 5 subtasks of the source, because 
> the state of the 5 ones is already FAILED before jobmaster try to cancel it. 
> Then the jobmaster can not reach a final state but keeps in  Failing state 
> meanwhile the subtask of the source kees in canceling state. 
>  
> The job run on a flink 1.7 cluster on yarn, and there is only one tm with 10 
> slots.
>  
> The attached files contains a jm log , tm log and the ui picture.
>  
> The exception timestamp is about 2019-06-16 13:42:28.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12889) Job keeps in FAILING state

2019-06-25 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-12889:
-
Component/s: (was: API / DataStream)
 Runtime / Coordination

> Job keeps in FAILING state
> --
>
> Key: FLINK-12889
> URL: https://issues.apache.org/jira/browse/FLINK-12889
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.7.2
>Reporter: Fan Xinpu
>Priority: Blocker
> Attachments: 20190618104945417.jpg, jobmanager.log.2019-06-16.0, 
> taskmanager.log
>
>
> There is a topology of 3 operator, such as, source, parser, and persist. 
> Occasionally, 5 subtasks of the source encounters exception and turns to 
> failed, at the same time, one subtask of the parser runs into exception and 
> turns to failed too. The jobmaster gets a message of the parser's failed. The 
> jobmaster then try to cancel all the subtask, most of the subtasks of the 
> three operator turns to canceled except the 5 subtasks of the source, because 
> the state of the 5 ones is already FAILED before jobmaster try to cancel it. 
> Then the jobmaster can not reach a final state but keeps in  Failing state 
> meanwhile the subtask of the source kees in canceling state. 
>  
> The job run on a flink 1.7 cluster on yarn, and there is only one tm with 10 
> slots.
>  
> The attached files contains a jm log , tm log and the ui picture.
>  
> The exception timestamp is about 2019-06-16 13:42:28.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12889) Job keeps in FAILING state

2019-06-25 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16872933#comment-16872933
 ] 

zhijiang commented on FLINK-12889:
--

My previous analysis in the ML was as follows:
 
Actually none of the five "Source: ServiceLog" tasks are in a terminal state 
from the JM's view. The relevant process is shown below:
 * The checkpoint in the task causes an OOM issue, which calls 
`Task#failExternally` as a result; we can see the log "Attempting to fail task 
externally" in the TM.
 * The source task transitions from RUNNING to FAILED and then starts a canceler 
thread for canceling the task; we can see the log "Triggering cancellation of 
task" in the TM.
 * When the JM starts to cancel the source tasks, the rpc call 
`Task#cancelExecution` finds the task already in FAILED state from step 2; we 
can see the log "Attempting to cancel task" in the TM.

 
Since, in the end, none of the five source tasks are in a terminal state in the 
JM log, I guess step 2 might not have created the canceler thread successfully: 
the root failover was caused by an OOM while creating a native thread in step 1, 
so creating the canceler thread may also have failed in this unstable OOM 
situation. If so, the source task would not have been interrupted at all and 
would not have reported to the JM either, even though its state had already been 
changed to FAILED.
 
The other vertex tasks do not trigger `Task#failExternally` in step 1 and only 
receive the cancel rpc from the JM in step 3. I guess that at this later point 
the canceler threads could be created successfully after some GCs, so these 
tasks could be canceled and reported to the JM side.
 
I think the key problem is that under OOM some behaviors are not within 
expectations, which might bring problems. Maybe we should handle an OOM error in 
an extreme way, like making the TM exit, to solve the potential issue.
 
[~till.rohrmann] do you think it is worth fixing, or do you have other concerns?

> Job keeps in FAILING state
> --
>
> Key: FLINK-12889
> URL: https://issues.apache.org/jira/browse/FLINK-12889
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.7.2
>Reporter: Fan Xinpu
>Priority: Blocker
> Attachments: 20190618104945417.jpg, jobmanager.log.2019-06-16.0, 
> taskmanager.log
>
>
> There is a topology of 3 operator, such as, source, parser, and persist. 
> Occasionally, 5 subtasks of the source encounters exception and turns to 
> failed, at the same time, one subtask of the parser runs into exception and 
> turns to failed too. The jobmaster gets a message of the parser's failed. The 
> jobmaster then try to cancel all the subtask, most of the subtasks of the 
> three operator turns to canceled except the 5 subtasks of the source, because 
> the state of the 5 ones is already FAILED before jobmaster try to cancel it. 
> Then the jobmaster can not reach a final state but keeps in  Failing state 
> meanwhile the subtask of the source kees in canceling state. 
>  
> The job run on a flink 1.7 cluster on yarn, and there is only one tm with 10 
> slots.
>  
> The attached files contains a jm log , tm log and the ui picture.
>  
> The exception timestamp is about 2019-06-16 13:42:28.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12909) add try catch when find a unique file name for the spilling channel

2019-06-24 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16871942#comment-16871942
 ] 

zhijiang commented on FLINK-12909:
--

Also you should connect the PR #7191 to this Jira ticket [~xymaqingxiang].

> add try catch when find a unique file name for the spilling channel
> ---
>
> Key: FLINK-12909
> URL: https://issues.apache.org/jira/browse/FLINK-12909
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Reporter: xymaqingxiang
>Priority: Major
>
> h2. What is the purpose of the change
> Catch exceptions thrown due to disk loss, try to find a unique file name for 
> the spilling channel again.
> Modify the createSpillingChannel() method of the 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer
>  class to solve this problem.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12909) add try catch when find a unique file name for the spilling channel

2019-06-24 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16871941#comment-16871941
 ] 

zhijiang commented on FLINK-12909:
--

The ticket does not describe the issue well.

Any IOException thrown by `createNewFile` makes the attempt loop jump out early, 
so the initial motivation was to ignore the intermediate exceptions so that all 
attempts actually finish; otherwise the attempt logic seems useless. Also, the 
newly added warn log can show the specific failure reason to help tracing.

To enhance it, it might be an option to wait some time between attempts, as Nico 
mentioned, because in most cases the exception will still be there on the 
immediately following attempts; or we could remove the attempt loop entirely to 
make it clear (see the sketch below).
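A sketch of the enhanced attempt loop discussed here: keep retrying for a unique spill file, log each intermediate IOException instead of letting it abort the loop, and wait a little between attempts. The file-name scheme and retry parameters are illustrative; the real method is SpillingAdaptiveSpanningRecordDeserializer#createSpillingChannel.

{code:java}
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.util.UUID;

// Illustrative retry loop, not the actual Flink implementation.
final class SpillingChannelUtil {

    private static final int MAX_ATTEMPTS = 10;
    private static final long WAIT_BETWEEN_ATTEMPTS_MS = 50;

    static FileChannel createSpillingChannel(File tempDir) throws IOException {
        IOException lastFailure = null;
        for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
            File spillFile = new File(tempDir, UUID.randomUUID() + ".spill");
            try {
                if (spillFile.createNewFile()) {
                    return new RandomAccessFile(spillFile, "rw").getChannel();
                }
                // name collision: simply try the next random name
            } catch (IOException e) {
                // e.g. a lost/broken disk: log and keep trying instead of aborting the loop
                System.err.printf("Attempt %d to create a spill file failed: %s%n", attempt, e);
                lastFailure = e;
                try {
                    Thread.sleep(WAIT_BETWEEN_ATTEMPTS_MS);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IOException("Interrupted while creating a spilling channel", ie);
                }
            }
        }
        throw new IOException(
                "Could not create a spilling channel after " + MAX_ATTEMPTS + " attempts", lastFailure);
    }
}
{code}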

 

> add try catch when find a unique file name for the spilling channel
> ---
>
> Key: FLINK-12909
> URL: https://issues.apache.org/jira/browse/FLINK-12909
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Reporter: xymaqingxiang
>Priority: Major
>
> h2. What is the purpose of the change
> Catch exceptions thrown due to disk loss, try to find a unique file name for 
> the spilling channel again.
> Modify the createSpillingChannel() method of the 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer
>  class to solve this problem.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12882) Remove ExecutionAttemptID field from ShuffleEnvironment#createResultPartitionWriters

2019-06-18 Thread zhijiang (JIRA)
zhijiang created FLINK-12882:


 Summary: Remove ExecutionAttemptID field from 
ShuffleEnvironment#createResultPartitionWriters
 Key: FLINK-12882
 URL: https://issues.apache.org/jira/browse/FLINK-12882
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


The {{ExecutionAttemptID}} is only used for constructing {{ResultPartitionID}} 
during creating {{ResultPartitionWriter}}s in {{ShuffleEnvironment}}.

Actually the {{ResultPartitionID}} could be obtained directly from 
{{ResultPartitionDeploymentDescriptor}} via 
{{ShuffleDescriptor#getResultPartitionID}}, so we could avoid passing this 
field through the interface and keep it simple.
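Purely for illustration (the interfaces below are simplified stand-ins, not the 
real Flink types, and the accessor names are assumptions), the point is that 
the writer creation path can derive the partition id from the deployment 
descriptor itself, so the {{ExecutionAttemptID}} parameter becomes unnecessary:

{code:java}
/** Simplified stand-ins for illustration only; not the actual Flink interfaces. */
public class PartitionIdFromDescriptorSketch {

    interface ShuffleDescriptor {
        String getResultPartitionID();
    }

    interface ResultPartitionDeploymentDescriptor {
        ShuffleDescriptor getShuffleDescriptor();
    }

    /** The shuffle environment can resolve the id on its own from the descriptor. */
    static String resolvePartitionId(ResultPartitionDeploymentDescriptor descriptor) {
        return descriptor.getShuffleDescriptor().getResultPartitionID();
    }
}
{code}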



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12882) Remove ExecutionAttemptID field from ShuffleEnvironment#createResultPartitionWriters

2019-06-18 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12882?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-12882:
-
Description: 
The {{ExecutionAttemptID}} is only used for constructing {{ResultPartitionID}} 
during creating {{ResultPartitionWriters in ShuffleEnvironment}}.

Actually the {{ResultPartitionID}} could be got directly from 
{{ResultPartitionDeploymentDescriptor}} via 
{{ShuffleDescriptor#getResultPartitionID}} then we could avoid passing this 
field in the interface to make it simple.

  was:
The {{ExecutionAttemptID}} is only used for constructing {{ResultPartitionID}} 
during creating {{ResultPartitionWriter}}s in {{ShuffleEnvironment}}.

Actually the {{ResultPartitionID}} could be got directly from 
{{ResultPartitionDeploymentDescriptor}} via 
{{ShuffleDescriptor#getResultPartitionID}} then we could avoid passing this 
field in the interface to make it simple.


> Remove ExecutionAttemptID field from 
> ShuffleEnvironment#createResultPartitionWriters
> 
>
> Key: FLINK-12882
> URL: https://issues.apache.org/jira/browse/FLINK-12882
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> The {{ExecutionAttemptID}} is only used for constructing 
> {{ResultPartitionID}} during creating {{ResultPartitionWriters in 
> ShuffleEnvironment}}.
> Actually the {{ResultPartitionID}} could be got directly from 
> {{ResultPartitionDeploymentDescriptor}} via 
> {{ShuffleDescriptor#getResultPartitionID}} then we could avoid passing this 
> field in the interface to make it simple.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12843) Refactor the pin logic in ReleaseOnConsumptionResultPartition

2019-06-17 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-12843:
-
Description: 
The pin logic adds to the reference counter based on the number of 
subpartitions of {{ReleaseOnConsumptionResultPartition}}.

It seems unnecessary to do it in a while loop as now, because the atomic 
counter would not be accessed by other threads during pin. If the 
{{ReleaseOnConsumptionResultPartition}} was not created yet, 
{{createSubpartitionView}} would not actually be called, resulting in a 
{{PartitionNotFoundException}} instead. 

So we could simply increase the reference counter in the 
ReleaseOnConsumptionResultPartition constructor directly.

  was:
The pin logic is for adding the reference counter based on number of 
subpartitions in {{ResultPartition}}. It seems not necessary to do it in while 
loop as now, because the atomic counter would not be accessed by other threads 
during pin. If the `ResultPartition` is not created yet, the 
{{ResultPartition#createSubpartitionView}} would not be called and it would 
response {{ResultPartitionNotFoundException}} in {{ResultPartitionManager}}. 

So we could simple increase the reference counter in {{ResultPartition}} 
constructor directly.


> Refactor the pin logic in ReleaseOnConsumptionResultPartition
> -
>
> Key: FLINK-12843
> URL: https://issues.apache.org/jira/browse/FLINK-12843
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> The pin logic adds to the reference counter based on the number of 
> subpartitions of {{ReleaseOnConsumptionResultPartition}}.
> It seems unnecessary to do it in a while loop as now, because the atomic 
> counter would not be accessed by other threads during pin. If the 
> {{ReleaseOnConsumptionResultPartition}} was not created yet, 
> {{createSubpartitionView}} would not actually be called, resulting in a 
> {{PartitionNotFoundException}} instead. 
> So we could simply increase the reference counter in the 
> ReleaseOnConsumptionResultPartition constructor directly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12843) Refactor the pin logic in ReleaseOnConsumptionResultPartition

2019-06-17 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-12843:
-
Summary: Refactor the pin logic in ReleaseOnConsumptionResultPartition  
(was: Refactor the pin logic in ResultPartition)

> Refactor the pin logic in ReleaseOnConsumptionResultPartition
> -
>
> Key: FLINK-12843
> URL: https://issues.apache.org/jira/browse/FLINK-12843
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> The pin logic adds to the reference counter based on the number of 
> subpartitions in {{ResultPartition}}. It seems unnecessary to do it in a 
> while loop as now, because the atomic counter would not be accessed by other 
> threads during pin. If the `ResultPartition` is not created yet, 
> {{ResultPartition#createSubpartitionView}} would not be called and it would 
> respond with {{ResultPartitionNotFoundException}} in 
> {{ResultPartitionManager}}. 
> So we could simply increase the reference counter in the {{ResultPartition}} 
> constructor directly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-12070) Make blocking result partitions consumable multiple times

2019-06-14 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16863947#comment-16863947
 ] 

zhijiang edited comment on FLINK-12070 at 6/14/19 11:09 AM:


In Yingjie's testing based on session mode, the containers are started with 
large memory, i.e. many slots per TM with fewer containers instead of fewer 
slots per TM with more containers.

In common per-job mode cases, the required container resource would be 1 CPU 
core with 4GB memory, which might then have 1GB of network buffers.

I agree with the assumption Stephan mentioned above. We do not expect the 
previous SpillableSubpartition to occupy network memory, because it is 
difficult to consume multiple times and has many other issues. If the previous 
SpillableSubpartition is switched to spilled mode because of limited network 
memory, then its performance compared with the new BlockingPartition should be 
more or less the same. Even compared with the pure in-memory mode of 
SpillableSubpartition, as long as we can confirm that the new BlockingPartition 
implementation writes the data into the OS cache and does not wait for it to be 
flushed onto disk, the performance should also be on the same level.

I remember Berkeley DB provides three durability levels for writes as a 
tradeoff between performance and safety: the first treats a write into process 
memory as success, the second a write into the OS cache, and the third a write 
flushed onto disk. I am not sure whether a similar mechanism exists to take the 
second option in our case (see the small java.nio sketch below).

I would also take a look at the alternative ways Stephan provided.
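For reference only, with plain java.nio a regular channel write already behaves 
like the second level: write() returns once the bytes are handed to the OS page 
cache, and only an explicit force() waits for the physical disk. A minimal 
sketch (the file name is made up):

{code:java}
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class OsCacheWriteSketch {

    public static void main(String[] args) throws IOException {
        Path file = Paths.get("blocking-partition.data");
        try (FileChannel channel = FileChannel.open(
                file, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {

            ByteBuffer data = ByteBuffer.wrap(
                    "some spilled records".getBytes(StandardCharsets.UTF_8));

            // write() returns once the bytes reach the OS page cache; it does
            // not wait for them to hit the physical disk ("second level").
            while (data.hasRemaining()) {
                channel.write(data);
            }

            // Only an explicit force() blocks until the data is flushed to the
            // disk ("third level": safest, but slowest).
            channel.force(false);
        }
    }
}
{code}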


was (Author: zjwang):
In Yingjie's testing based on session mode, the container is started with large 
memory by considering many slots per TM with less number of containers, instead 
of less num slots with more num containers.

In common cases for per-job mode, the required container resource should be 1 
cpu core with 4GB memory which might have 1GB network buffers then.

I agree with the above assumption Stephan mentioned.  We do not expect the 
previous SpillableSubpartition occupying network memory because it is difficult 
to be consumed multiple times and has many other issues. If the previous 
SpillableSubpartition is transformed to spilled mode because of limited network 
memory, then the performance comparing with new BlockingPartition might be same 
more or less. Even though comparing with pure memory mode in  
SpillableSubpartition, as long as we could confirm the new BlockingPartition 
implementation writes the data into OS cache and not waits for flushing onto 
disk, then the performance should also in the same level.

I remembered the Berkeley DB provides three levels for writing as the 
tradeoff in performance and safety. The first is writing into process memory as 
success. The second is writing into OS cache as success. And the third is 
writing and flushing into disk as success. I am not sure if it exists the 
similar mechanism to take the second way in our case.

I would also take a look at the alternative ways Stephan provides.

> Make blocking result partitions consumable multiple times
> -
>
> Key: FLINK-12070
> URL: https://issues.apache.org/jira/browse/FLINK-12070
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Till Rohrmann
>Assignee: Stephan Ewen
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.9.0
>
> Attachments: image-2019-04-18-17-38-24-949.png
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> In order to avoid writing produced results multiple times for multiple 
> consumers and in order to speed up batch recoveries, we should make the 
> blocking result partitions to be consumable multiple times. At the moment a 
> blocking result partition will be released once the consumers has processed 
> all data. Instead the result partition should be released once the next 
> blocking result has been produced and all consumers of a blocking result 
> partition have terminated. Moreover, blocking results should not hold on slot 
> resources like network buffers or memory as it is currently the case with 
> {{SpillableSubpartitions}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12070) Make blocking result partitions consumable multiple times

2019-06-14 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16863947#comment-16863947
 ] 

zhijiang commented on FLINK-12070:
--

In Yingjie's testing based on session mode, the containers are started with 
large memory, i.e. many slots per TM with fewer containers instead of fewer 
slots per TM with more containers.

In common per-job mode cases, the required container resource would be 1 CPU 
core with 4GB memory, which might then have 1GB of network buffers.

I agree with the assumption Stephan mentioned above. We do not expect the 
previous SpillableSubpartition to occupy network memory, because it is 
difficult to consume multiple times and has many other issues. If the previous 
SpillableSubpartition is switched to spilled mode because of limited network 
memory, then its performance compared with the new BlockingPartition should be 
more or less the same. Even compared with the pure in-memory mode of 
SpillableSubpartition, as long as we can confirm that the new BlockingPartition 
implementation writes the data into the OS cache and does not wait for it to be 
flushed onto disk, the performance should also be on the same level.

I remember Berkeley DB provides three durability levels for writes as a 
tradeoff between performance and safety: the first treats a write into process 
memory as success, the second a write into the OS cache, and the third a write 
flushed onto disk. I am not sure whether a similar mechanism exists to take the 
second option in our case.

I would also take a look at the alternative ways Stephan provided.

> Make blocking result partitions consumable multiple times
> -
>
> Key: FLINK-12070
> URL: https://issues.apache.org/jira/browse/FLINK-12070
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Till Rohrmann
>Assignee: Stephan Ewen
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.9.0
>
> Attachments: image-2019-04-18-17-38-24-949.png
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> In order to avoid writing produced results multiple times for multiple 
> consumers and in order to speed up batch recoveries, we should make the 
> blocking result partitions to be consumable multiple times. At the moment a 
> blocking result partition will be released once the consumers has processed 
> all data. Instead the result partition should be released once the next 
> blocking result has been produced and all consumers of a blocking result 
> partition have terminated. Moreover, blocking results should not hold on slot 
> resources like network buffers or memory as it is currently the case with 
> {{SpillableSubpartitions}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12843) Refactor the pin logic in ResultPartition

2019-06-13 Thread zhijiang (JIRA)
zhijiang created FLINK-12843:


 Summary: Refactor the pin logic in ResultPartition
 Key: FLINK-12843
 URL: https://issues.apache.org/jira/browse/FLINK-12843
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


The pin logic adds to the reference counter based on the number of 
subpartitions in {{ResultPartition}}. It seems unnecessary to do it in a while 
loop as now, because the atomic counter would not be accessed by other threads 
during pin. If the `ResultPartition` is not created yet, 
{{ResultPartition#createSubpartitionView}} would not be called and it would 
respond with {{ResultPartitionNotFoundException}} in {{ResultPartitionManager}}. 

So we could simply increase the reference counter in the {{ResultPartition}} 
constructor directly.
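For illustration only (the class below is a simplified stand-in with made-up 
names, not the actual Flink code), setting the counter in the constructor 
instead of running a pin() retry loop could look like this:

{code:java}
import java.util.concurrent.atomic.AtomicInteger;

/** Simplified sketch; class and method names are made up for illustration. */
class PinInConstructorSketch {

    private final AtomicInteger pendingReferences = new AtomicInteger();

    PinInConstructorSketch(int numberOfSubpartitions) {
        // No other thread can see this instance before the constructor returns,
        // so the counter can simply be set here instead of using a retry loop.
        pendingReferences.set(numberOfSubpartitions);
    }

    void onConsumedSubpartition() {
        if (pendingReferences.decrementAndGet() == 0) {
            release();
        }
    }

    private void release() {
        // release the partition resources once all subpartitions are consumed
    }
}
{code}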



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12842) Fix invalid check released state during ResultPartition#createSubpartitionView

2019-06-13 Thread zhijiang (JIRA)
zhijiang created FLINK-12842:


 Summary: Fix invalid check released state during 
ResultPartition#createSubpartitionView
 Key: FLINK-12842
 URL: https://issues.apache.org/jira/browse/FLINK-12842
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


Currently {{ResultPartition#createSubpartitionView}} checks whether the 
partition is released before creating the view. But this check is based on 
{{refCnt != -1}}, which seems invalid, because the reference counter does not 
always reflect the released state.

In the case of {{ResultPartition#release/fail}}, the reference counter is not 
set to -1. Even in the case of {{ResultPartition#onConsumedSubpartition}}, the 
reference counter also has no chance of becoming -1.

So we could check the real {{isReleased}} state while creating the view instead 
of relying on the reference counter.
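As a rough sketch of the proposed check (simplified, made-up names; not the 
actual ResultPartition code):

{code:java}
import java.util.concurrent.atomic.AtomicBoolean;

/** Simplified sketch with made-up names; not the actual Flink class. */
class ReleasedCheckSketch {

    private final AtomicBoolean isReleased = new AtomicBoolean();

    void release() {
        // the release/fail path only flips the flag; it never sets a counter to -1
        isReleased.set(true);
    }

    Object createSubpartitionView(int subpartitionIndex) {
        // Check the real released flag instead of inferring it from refCnt != -1.
        if (isReleased.get()) {
            throw new IllegalStateException("Partition already released.");
        }
        return new Object(); // stand-in for the actual subpartition view
    }
}
{code}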



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12761) Fine grained resource management

2019-06-06 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12761?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-12761:
-
Summary: Fine grained resource management  (was: Fine grained resource 
managedment)

> Fine grained resource management
> 
>
> Key: FLINK-12761
> URL: https://issues.apache.org/jira/browse/FLINK-12761
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Affects Versions: 1.8.0, 1.9.0
>Reporter: Tony Xintong Song
>Assignee: Tony Xintong Song
>Priority: Major
>  Labels: Umbrella
>
> This is an umbrella issue for enabling fine grained resource management in 
> Flink.
> Fine grained resource management is a big topic that requires long term 
> efforts. There are many issues to be addressed and design decisions to be 
> made, some of which may not be resolved in a short time. Here we propose our 
> design and implementation plan for the upcoming release 1.9, as well as our 
> thoughts and ideas on the long term road map for this topic.
> A practical short term target is to enable fine grained resource management 
> for batch SQL jobs only in the upcoming Flink 1.9. This is necessary for the 
> batch operators added from Blink to achieve good performance.
> Please find the detailed design and implementation plan in the attached docs. 
> Any comments and feedback are welcome and appreciated.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12738) Remove abstract getPageSize method from InputGate

2019-06-05 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12738?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-12738:
-
Summary: Remove abstract getPageSize method from InputGate  (was: Remove 
abstract getPageSize from InputGate)

> Remove abstract getPageSize method from InputGate
> -
>
> Key: FLINK-12738
> URL: https://issues.apache.org/jira/browse/FLINK-12738
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> Currently {{InputGate#getPageSize}} is only used for constructing 
> {{BarrierBuffer}}. In order to make abstract InputGate simple and clean, we 
> should remove unnecessary abstract methods from it.
> Considering the page size could be parsed directly from the configuration, 
> which is also visible while constructing {{BarrierBuffer}}, it is reasonable 
> to do so.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12740) SpillableSubpartitionTest deadlocks on Travis

2019-06-05 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16856643#comment-16856643
 ] 

zhijiang commented on FLINK-12740:
--

Yes, the newly added unit test for FLINK-12544 might not be stable. It always 
runs successfully the first time on my side, but if it runs repeatedly in a 
loop, it gets stuck on the third run. I would double check this test and might 
submit a fix later today.

> SpillableSubpartitionTest deadlocks on Travis
> -
>
> Key: FLINK-12740
> URL: https://issues.apache.org/jira/browse/FLINK-12740
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.8.1
>Reporter: Chesnay Schepler
>Assignee: zhijiang
>Priority: Major
> Fix For: 1.8.1
>
>
> https://travis-ci.org/apache/flink/jobs/541225542



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-12740) SpillableSubpartitionTest deadlocks on Travis

2019-06-05 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12740?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang reassigned FLINK-12740:


Assignee: zhijiang

> SpillableSubpartitionTest deadlocks on Travis
> -
>
> Key: FLINK-12740
> URL: https://issues.apache.org/jira/browse/FLINK-12740
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.8.1
>Reporter: Chesnay Schepler
>Assignee: zhijiang
>Priority: Major
> Fix For: 1.8.1
>
>
> https://travis-ci.org/apache/flink/jobs/541225542



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12738) Remove abstract getPageSize from InputGate

2019-06-05 Thread zhijiang (JIRA)
zhijiang created FLINK-12738:


 Summary: Remove abstract getPageSize from InputGate
 Key: FLINK-12738
 URL: https://issues.apache.org/jira/browse/FLINK-12738
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


Currently {{InputGate#getPageSize}} is only used for constructing 
{{BarrierBuffer}}. In order to make abstract InputGate simple and clean, we 
should remove unnecessary abstract methods from it.

Considering the page size could be parsed directly from the configuration, 
which is also visible while constructing {{BarrierBuffer}}, it is reasonable to 
do so.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12735) Make shuffle environment implementation independent with IOManager

2019-06-04 Thread zhijiang (JIRA)
zhijiang created FLINK-12735:


 Summary: Make shuffle environment implementation independent with 
IOManager
 Key: FLINK-12735
 URL: https://issues.apache.org/jira/browse/FLINK-12735
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


The current creation of {{NetworkEnvironment}} relies on {{IOManager}} from 
{{TaskManagerServices}}. The shuffle environment implementation should not rely 
on such external, specific components; instead, the specific implementation 
should create its internal components itself if required.

The current abstract {{IOManager}} has two roles: one is file channel 
management based on the temp directories configuration, and the other is 
providing abstract methods for reading/writing files. We could further extract 
the file channel management into a separate internal class which could be 
reused by all the components that need it, like the current 
{{BoundedBlockingSubpartition}}. To do so, the shuffle environment should also 
create its own internal channel manager to break the dependency on 
{{IOManager}}.
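A very rough sketch of what such an extracted file channel management class 
could look like (all names are made up; this is not the actual implementation):

{code:java}
import java.io.File;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

/** Made-up sketch of the extracted file channel management responsibility. */
class FileChannelManagerSketch {

    private final File[] tempDirs;
    private final AtomicInteger nextDir = new AtomicInteger();

    FileChannelManagerSketch(String[] tempDirPaths) {
        this.tempDirs = new File[tempDirPaths.length];
        for (int i = 0; i < tempDirPaths.length; i++) {
            this.tempDirs[i] = new File(tempDirPaths[i]);
            this.tempDirs[i].mkdirs();
        }
    }

    /** Hands out spill file locations, spread round-robin over the temp directories. */
    File createChannelFile(String prefix) {
        File dir = tempDirs[Math.floorMod(nextDir.getAndIncrement(), tempDirs.length)];
        return new File(dir, prefix + "." + UUID.randomUUID());
    }
}
{code}

The shuffle environment could create such a manager internally from the 
configured temp directories instead of receiving the whole {{IOManager}}.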



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-11604) Extend the necessary methods in ResultPartitionWriter interface

2019-06-04 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11604?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang closed FLINK-11604.

Resolution: Duplicate

Already done in FLINK-12570

> Extend the necessary methods in ResultPartitionWriter interface
> ---
>
> Key: FLINK-11604
> URL: https://issues.apache.org/jira/browse/FLINK-11604
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> This is a preparation work for future creating {{ResultPartitionWriter}} via 
> proposed {{ShuffleService}}.
> Currently there exists only one {{ResultPartition}} implementation for 
> {{ResultPartitionWriter}} interface, so the specific {{ResultPartition}} 
> instance is easily referenced in many other classes such as {{Task}}, 
> {{NetworkEnvironment}}, etc. Even some private methods in {{ResultPartition}} 
> would be called directly in these reference classes.
> Considering {{ShuffleService}} might create multiple different 
> {{ResultPartitionWriter}} implementations in the future, all the other 
> classes should only reference the interface and call the common methods. 
> Therefore we extend the related methods in the {{ResultPartitionWriter}} 
> interface in order to cover the existing logic in {{ResultPartition}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12544) Deadlock while releasing memory and requesting segment concurrent in SpillableSubpartition

2019-06-04 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16855696#comment-16855696
 ] 

zhijiang commented on FLINK-12544:
--

Merged into release-1.8 : 61fa1005adc61b6fc3439fdd127e5f25adea

> Deadlock while releasing memory and requesting segment concurrent in 
> SpillableSubpartition
> --
>
> Key: FLINK-12544
> URL: https://issues.apache.org/jira/browse/FLINK-12544
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.1
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> It was reported by a Flink user, and the original jstack is as follows:
>  
> {code:java}
> // "CoGroup (2/2)":
>     at 
> org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:213)
>     - waiting to lock <0x00062bf859b8> (a java.lang.Object)
>     at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
>     at java.lang.Thread.run(Thread.java:745)
> "CoGroup (1/2)":
>     at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:277)
>     - waiting to lock <0x00063fdf4888> (a 
> java.util.ArrayDeque)
>     at 
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
>     at 
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
>     at 
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
>     at 
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
>     at 
> org.apache.flink.runtime.io.network.buffer.BufferConsumer.close(BufferConsumer.java:121)
>     at 
> org.apache.flink.runtime.io.network.partition.SpillableSubpartition.spillFinishedBufferConsumers(SpillableSubpartition.java:274)
>     at 
> org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:239)
>     - locked <0x00063fdf4ac8> (a java.util.ArrayDeque)
>     at 
> org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371)
>     at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:375)
>     at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:408)
>     at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:297)
>     - locked <0x00063c785350> (a java.lang.Object)
>     at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:259)
>     at 
> org.apache.flink.runtime.io.network.NetworkEnvironment.setupInputGate(NetworkEnvironment.java:272)
>     at 
> org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:224)
>     - locked <0x00062bf859b8> (a java.lang.Object)
>     at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
>     at java.lang.Thread.run(Thread.java:745)
> "DataSource  (1/1)":
>     at 
> org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:227)
>     - waiting to lock <0x00063fdf4ac8> (a 
> java.util.ArrayDeque)
>     at 
> org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371)
>     at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:257)
>     - locked <0x00063fdf4888> (a java.util.ArrayDeque)
>     at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:218)
>     at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:213)
>     at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:144)
>     at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
>     at 
> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
>  

[jira] [Closed] (FLINK-12544) Deadlock while releasing memory and requesting segment concurrent in SpillableSubpartition

2019-06-04 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang closed FLINK-12544.

   Resolution: Fixed
Fix Version/s: 1.8.1

> Deadlock while releasing memory and requesting segment concurrent in 
> SpillableSubpartition
> --
>
> Key: FLINK-12544
> URL: https://issues.apache.org/jira/browse/FLINK-12544
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.1
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> It was reported by a Flink user, and the original jstack is as follows:
>  
> {code:java}
> // "CoGroup (2/2)":
>     at 
> org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:213)
>     - waiting to lock <0x00062bf859b8> (a java.lang.Object)
>     at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
>     at java.lang.Thread.run(Thread.java:745)
> "CoGroup (1/2)":
>     at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:277)
>     - waiting to lock <0x00063fdf4888> (a 
> java.util.ArrayDeque)
>     at 
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
>     at 
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
>     at 
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
>     at 
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
>     at 
> org.apache.flink.runtime.io.network.buffer.BufferConsumer.close(BufferConsumer.java:121)
>     at 
> org.apache.flink.runtime.io.network.partition.SpillableSubpartition.spillFinishedBufferConsumers(SpillableSubpartition.java:274)
>     at 
> org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:239)
>     - locked <0x00063fdf4ac8> (a java.util.ArrayDeque)
>     at 
> org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371)
>     at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:375)
>     at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:408)
>     at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:297)
>     - locked <0x00063c785350> (a java.lang.Object)
>     at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:259)
>     at 
> org.apache.flink.runtime.io.network.NetworkEnvironment.setupInputGate(NetworkEnvironment.java:272)
>     at 
> org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:224)
>     - locked <0x00062bf859b8> (a java.lang.Object)
>     at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
>     at java.lang.Thread.run(Thread.java:745)
> "DataSource  (1/1)":
>     at 
> org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:227)
>     - waiting to lock <0x00063fdf4ac8> (a 
> java.util.ArrayDeque)
>     at 
> org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371)
>     at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:257)
>     - locked <0x00063fdf4888> (a java.util.ArrayDeque)
>     at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:218)
>     at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:213)
>     at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:144)
>     at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
>     at 
> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
>     at 
> org.apache.flink.runtime.operators.util.metrics.CountingColl

[jira] [Commented] (FLINK-12201) Introduce InputGateWithMetrics in Task to increment numBytesIn metric

2019-06-04 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1684#comment-1684
 ] 

zhijiang commented on FLINK-12201:
--

master : 3351ca11a366713328a6d0cb0e87100cb8c24200

> Introduce InputGateWithMetrics in Task to increment numBytesIn metric
> -
>
> Key: FLINK-12201
> URL: https://issues.apache.org/jira/browse/FLINK-12201
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: Andrey Zagrebin
>Assignee: zhijiang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Incrementing of numBytesIn metric in SingleInputGate does not depend on 
> shuffle service and can be moved out of network internals into Task. Task 
> could wrap InputGate provided by ShuffleService with InputGateWithMetrics 
> which would increment numBytesIn metric.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-12201) Introduce InputGateWithMetrics in Task to increment numBytesIn metric

2019-06-04 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang closed FLINK-12201.

Resolution: Fixed

> Introduce InputGateWithMetrics in Task to increment numBytesIn metric
> -
>
> Key: FLINK-12201
> URL: https://issues.apache.org/jira/browse/FLINK-12201
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: Andrey Zagrebin
>Assignee: zhijiang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Incrementing of numBytesIn metric in SingleInputGate does not depend on 
> shuffle service and can be moved out of network internals into Task. Task 
> could wrap InputGate provided by ShuffleService with InputGateWithMetrics 
> which would increment numBytesIn metric.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-12070) Make blocking result partitions consumable multiple times

2019-06-03 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16855252#comment-16855252
 ] 

zhijiang edited comment on FLINK-12070 at 6/4/19 2:59 AM:
--

Agree with Stephan and Stefan's opinions.

We could add the batch blocking case for micro benchmark, then it could verify 
the performance changes for every merged commit for flink-1.9.

I also think there is no need to write to the mapped region as now. We actually 
cannot be sure the mmap regions would be consumed immediately by the downstream 
side after the producer finishes, because that depends on the scheduler 
decision and whether there are enough resources to schedule the consumers.

It is necessary to maintain different ways of reading files. Based on my 
previous Lucene index experience, it also provides three ways of reading index 
files (a small sketch of the three approaches follows below):
 * Files.newByteChannel as the simple way.
 * The Java NIO FileChannel way.
 * The mmap way for large files on 64-bit systems, with more free physical 
memory for mmap as Stefan mentioned.

Then we could compare the behaviors of the different ways and also provide more 
choices for users.
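A compact illustration of the three approaches in plain Java (the file name is 
made up; this only shows the APIs, not how Flink would wire them in):

{code:java}
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class ReadModesSketch {

    public static void main(String[] args) throws IOException {
        Path file = Paths.get("partition.data"); // assumed to exist already

        // 1) Simple way: a plain seekable byte channel.
        try (SeekableByteChannel ch = Files.newByteChannel(file, StandardOpenOption.READ)) {
            System.out.println("size via byte channel: " + ch.size());
        }

        // 2) Java NIO FileChannel, e.g. for positioned bulk reads.
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ)) {
            System.out.println("size via FileChannel: " + ch.size());
        }

        // 3) Memory-mapped reads for large files on 64-bit systems with spare
        //    physical memory; the OS pages the data in on demand.
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ)) {
            MappedByteBuffer mapped = ch.map(FileChannel.MapMode.READ_ONLY, 0, ch.size());
            System.out.println("first byte via mmap: "
                    + (mapped.hasRemaining() ? mapped.get() : -1));
        }
    }
}
{code}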


was (Author: zjwang):
Agree with Stephan and Stefan's opinions.

We could add the batch blocking case for micro benchmark, then it could verify 
the performance changes for every merged commit for flink-1.9.

I also think it is no need to write to mapped region as now. We actually do not 
confirm the mmap regions would be consumed immediately by downstream side after 
producer finishes, because it is up to scheduler decision and whether it has 
enough resource to schedule consumers.

It is necessary to maintain different ways for reading files. Based on my 
previous lucene index experience, it also provides three ways for reading index 
files.
 * Files.newByteChannel for simple way.
 * Java nio FileChannel way.
 * Mmap way for large files in 64bit system, and with more free physical memory 
for mmap as Stefan mentioned.

Then we could compare the behaviors for different ways and also provide more 
choices for users.

> Make blocking result partitions consumable multiple times
> -
>
> Key: FLINK-12070
> URL: https://issues.apache.org/jira/browse/FLINK-12070
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Till Rohrmann
>Assignee: Stephan Ewen
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.9.0
>
> Attachments: image-2019-04-18-17-38-24-949.png
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> In order to avoid writing produced results multiple times for multiple 
> consumers and in order to speed up batch recoveries, we should make the 
> blocking result partitions to be consumable multiple times. At the moment a 
> blocking result partition will be released once the consumers has processed 
> all data. Instead the result partition should be released once the next 
> blocking result has been produced and all consumers of a blocking result 
> partition have terminated. Moreover, blocking results should not hold on slot 
> resources like network buffers or memory as it is currently the case with 
> {{SpillableSubpartitions}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-12070) Make blocking result partitions consumable multiple times

2019-06-03 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16855252#comment-16855252
 ] 

zhijiang edited comment on FLINK-12070 at 6/4/19 2:58 AM:
--

Agree with Stephan and Stefan's opinions.

We could add the batch blocking case for micro benchmark, then it could verify 
the performance changes for every merged commit for flink-1.9.

I also think there is no need to write to the mapped region as now. We actually 
cannot be sure the mmap regions would be consumed immediately by the downstream 
side after the producer finishes, because that is up to the scheduler decision 
and whether there are enough resources to schedule the consumers.

It is necessary to maintain different ways for reading files. Based on my 
previous lucene index experience, it also provides three ways for reading index 
files.
 * Files.newByteChannel for simple way.
 * Java nio FileChannel way.
 * Mmap way for large files in 64bit system, and with more free physical memory 
for mmap as Stefan mentioned.

Then we could compare the behaviors for different ways and also provide more 
choices for users.


was (Author: zjwang):
Agree with Stephan and Stefan's opinions.

We could add the batch blocking case for micro benchmark, then it could verify 
the performance changes for every merged commit for flink-1.9.

I also think it is no need to write to mapped region as now, because we 
actually do not confirm the mmap regions would be consumed immediately by 
downstream side after producer finishes, and it is up to scheduler decision and 
whether it has enough resource to schedule consumers.

It is necessary to maintain different ways for reading files. Based on my 
previous lucene index experience, it also provides three ways for reading index 
files.
 * Files.newByteChannel for simple way.
 * Java nio FileChannel way.
 * Mmap way for large files in 64bit system, and with more free physical memory 
for mmap as Stefan mentioned.

Then we could compare the behaviors for different ways and also provide more 
choices for users.

> Make blocking result partitions consumable multiple times
> -
>
> Key: FLINK-12070
> URL: https://issues.apache.org/jira/browse/FLINK-12070
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Till Rohrmann
>Assignee: Stephan Ewen
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.9.0
>
> Attachments: image-2019-04-18-17-38-24-949.png
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> In order to avoid writing produced results multiple times for multiple 
> consumers and in order to speed up batch recoveries, we should make the 
> blocking result partitions to be consumable multiple times. At the moment a 
> blocking result partition will be released once the consumers has processed 
> all data. Instead the result partition should be released once the next 
> blocking result has been produced and all consumers of a blocking result 
> partition have terminated. Moreover, blocking results should not hold on slot 
> resources like network buffers or memory as it is currently the case with 
> {{SpillableSubpartitions}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12070) Make blocking result partitions consumable multiple times

2019-06-03 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16855252#comment-16855252
 ] 

zhijiang commented on FLINK-12070:
--

Agree with Stephan and Stefan's opinions.

We could add the batch blocking case for micro benchmark, then it could verify 
the performance changes for every merged commit for flink-1.9.

I also think there is no need to write to the mapped region as now, because we 
actually cannot be sure the mmap regions would be consumed immediately by the 
downstream side after the producer finishes; that is up to the scheduler 
decision and whether there are enough resources to schedule the consumers.

It is necessary to maintain different ways for reading files. Based on my 
previous lucene index experience, it also provides three ways for reading index 
files.
 * Files.newByteChannel for simple way.
 * Java nio FileChannel way.
 * Mmap way for large files in 64bit system, and with more free physical memory 
for mmap as Stefan mentioned.

Then we could compare the behaviors for different ways and also provide more 
choices for users.

> Make blocking result partitions consumable multiple times
> -
>
> Key: FLINK-12070
> URL: https://issues.apache.org/jira/browse/FLINK-12070
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Till Rohrmann
>Assignee: Stephan Ewen
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.9.0
>
> Attachments: image-2019-04-18-17-38-24-949.png
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> In order to avoid writing produced results multiple times for multiple 
> consumers and in order to speed up batch recoveries, we should make the 
> blocking result partitions to be consumable multiple times. At the moment a 
> blocking result partition will be released once the consumers has processed 
> all data. Instead the result partition should be released once the next 
> blocking result has been produced and all consumers of a blocking result 
> partition have terminated. Moreover, blocking results should not hold on slot 
> resources like network buffers or memory as it is currently the case with 
> {{SpillableSubpartitions}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12603) Remove getOwningTaskName method from InputGate

2019-06-03 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-12603:
-
Summary: Remove getOwningTaskName method from InputGate  (was: Refactor 
InputGate interface to remove unnecessary methods)

> Remove getOwningTaskName method from InputGate
> --
>
> Key: FLINK-12603
> URL: https://issues.apache.org/jira/browse/FLINK-12603
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> In order to make abstract InputGate simple for extending new implementations 
> in shuffle service architecture, we could remove unnecessary methods from it.
> Currently InputGate#getOwningTaskName is only used for debugging log in 
> BarrierBuffer and StreamInputProcessor. This task name could be got from 
> Environment#getTaskInfo and then be passed into the constructor of 
> BarrierBuffer/StreamInputProcessor for use.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12603) Refactor InputGate interface to remove unnecessary methods

2019-06-03 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-12603:
-
Description: 
In order to make abstract InputGate simple for extending new implementations in 
shuffle service architecture, we could remove unnecessary methods from it.

Currently InputGate#getOwningTaskName is only used for debugging log in 
BarrierBuffer and StreamInputProcessor. This task name could be got from 
Environment#getTaskInfo and then be passed into the constructor of 
BarrierBuffer/StreamInputProcessor for use.

  was:
In order to make abstract InputGate simple for extending new implementations in 
shuffle service architecture, we could remove unnecessary methods from it.

Currently InputGate#getOwningTaskName is only used for debugging log in 
BarrierBuffer and StreamInputProcessor. This task name could be got from 
Environment#getTaskInfo and then be passed into the constructor of 
BarrierBuffer/StreamInputProcessor for use.

InputGate#getPageSize is only used for BarrierBuffer constructor. We could 
remove this method and the page size could also be got form task manager 
configuration instead.


> Refactor InputGate interface to remove unnecessary methods
> --
>
> Key: FLINK-12603
> URL: https://issues.apache.org/jira/browse/FLINK-12603
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> In order to make abstract InputGate simple for extending new implementations 
> in shuffle service architecture, we could remove unnecessary methods from it.
> Currently InputGate#getOwningTaskName is only used for debugging log in 
> BarrierBuffer and StreamInputProcessor. This task name could be got from 
> Environment#getTaskInfo and then be passed into the constructor of 
> BarrierBuffer/StreamInputProcessor for use.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12642) OutputBufferPoolUsageGauge can fail with NPE

2019-05-29 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16851462#comment-16851462
 ] 

zhijiang commented on FLINK-12642:
--

Yes, that is the ideal way we confirmed before, i.e. doing creation and setup 
together. If the task thread model is already in place, we could try this way.

> OutputBufferPoolUsageGauge can fail with NPE
> 
>
> Key: FLINK-12642
> URL: https://issues.apache.org/jira/browse/FLINK-12642
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics, Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Chesnay Schepler
>Priority: Critical
> Fix For: 1.9.0
>
>
> The result partition metrics are initialized before 
> {{ResultPartition#setup}} is called. If a reporter tries to access an 
> {{OutputBufferPoolUsageGauge}} in between, it will fail with an NPE since the 
> buffer pool of the partition is still null.
> {code}
> 2019-05-27 14:49:47,031 WARN  
> org.apache.flink.runtime.metrics.MetricRegistryImpl   - Error while 
> reporting metrics
> java.lang.NullPointerException
>   at 
> org.apache.flink.runtime.io.network.metrics.OutputBufferPoolUsageGauge.getValue(OutputBufferPoolUsageGauge.java:41)
>   at 
> org.apache.flink.runtime.io.network.metrics.OutputBufferPoolUsageGauge.getValue(OutputBufferPoolUsageGauge.java:27)
>   at 
> org.apache.flink.metrics.slf4j.Slf4jReporter.tryReport(Slf4jReporter.java:114)
>   at 
> org.apache.flink.metrics.slf4j.Slf4jReporter.report(Slf4jReporter.java:80)
>   at 
> org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run(MetricRegistryImpl.java:436)
>   at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:514)
>   at 
> java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
>   at 
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:300)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
>   at java.base/java.lang.Thread.run(Thread.java:844)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12544) Deadlock while releasing memory and requesting segment concurrent in SpillableSubpartition

2019-05-28 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-12544:
-
Description: 
It was reported by a Flink user, and the original jstack is as follows:

 
{code:java}
// "CoGroup (2/2)":
    at 
org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:213)
    - waiting to lock <0x00062bf859b8> (a java.lang.Object)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
    at java.lang.Thread.run(Thread.java:745)
"CoGroup (1/2)":
    at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:277)
    - waiting to lock <0x00063fdf4888> (a java.util.ArrayDeque)
    at 
org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
    at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
    at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
    at 
org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
    at 
org.apache.flink.runtime.io.network.buffer.BufferConsumer.close(BufferConsumer.java:121)
    at 
org.apache.flink.runtime.io.network.partition.SpillableSubpartition.spillFinishedBufferConsumers(SpillableSubpartition.java:274)
    at 
org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:239)
    - locked <0x00063fdf4ac8> (a java.util.ArrayDeque)
    at 
org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371)
    at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:375)
    at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:408)
    at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:297)
    - locked <0x00063c785350> (a java.lang.Object)
    at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:259)
    at 
org.apache.flink.runtime.io.network.NetworkEnvironment.setupInputGate(NetworkEnvironment.java:272)
    at 
org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:224)
    - locked <0x00062bf859b8> (a java.lang.Object)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
    at java.lang.Thread.run(Thread.java:745)
"DataSource  (1/1)":
    at 
org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:227)
    - waiting to lock <0x00063fdf4ac8> (a java.util.ArrayDeque)
    at 
org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371)
    at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:257)
    - locked <0x00063fdf4888> (a java.util.ArrayDeque)
    at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:218)
    at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:213)
    at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:144)
    at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
    at 
org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
    at 
org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
    at 
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:193)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:745)
{code}
Based on the above stack, it happens in the following scenario:
 * taskA: emit -> requestBufferBuilder -> synchronized in LocalBufferPool -> 
SpillableSubpartition#releaseMemory -> wait for synchronized in 
SpillableSubpartition
 * submit TaskB: trigger taskA releaseMemory -> 
SpillableSubpartition#releaseMemory -> synchronized in SpillableSubpartition 
-> SpillableSubpartition#spillFinishedBufferConsumers -> bufferConsumer#close 
-> LocalBufferPool#recycle -> wait for synchronized in LocalBufferPool

[jira] [Updated] (FLINK-12544) Deadlock while releasing memory and requesting segment concurrent in SpillableSubpartition

2019-05-28 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-12544:
-
Summary: Deadlock while releasing memory and requesting segment concurrent 
in SpillableSubpartition  (was: Deadlock during releasing memory in 
SpillableSubpartition)

> Deadlock while releasing memory and requesting segment concurrent in 
> SpillableSubpartition
> --
>
> Key: FLINK-12544
> URL: https://issues.apache.org/jira/browse/FLINK-12544
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> It was reported by a Flink user, and the original jstack is as follows:
>  
> {code:java}
> // "CoGroup (2/2)":
>     at 
> org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:213)
>     - waiting to lock <0x00062bf859b8> (a java.lang.Object)
>     at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
>     at java.lang.Thread.run(Thread.java:745)
> "CoGroup (1/2)":
>     at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:277)
>     - waiting to lock <0x00063fdf4888> (a 
> java.util.ArrayDeque)
>     at 
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
>     at 
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
>     at 
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
>     at 
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
>     at 
> org.apache.flink.runtime.io.network.buffer.BufferConsumer.close(BufferConsumer.java:121)
>     at 
> org.apache.flink.runtime.io.network.partition.SpillableSubpartition.spillFinishedBufferConsumers(SpillableSubpartition.java:274)
>     at 
> org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:239)
>     - locked <0x00063fdf4ac8> (a java.util.ArrayDeque)
>     at 
> org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371)
>     at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:375)
>     at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:408)
>     at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:297)
>     - locked <0x00063c785350> (a java.lang.Object)
>     at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:259)
>     at 
> org.apache.flink.runtime.io.network.NetworkEnvironment.setupInputGate(NetworkEnvironment.java:272)
>     at 
> org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:224)
>     - locked <0x00062bf859b8> (a java.lang.Object)
>     at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
>     at java.lang.Thread.run(Thread.java:745)
> "DataSource  (1/1)":
>     at 
> org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:227)
>     - waiting to lock <0x00063fdf4ac8> (a 
> java.util.ArrayDeque)
>     at 
> org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371)
>     at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:257)
>     - locked <0x00063fdf4888> (a java.util.ArrayDeque)
>     at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:218)
>     at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:213)
>     at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:144)
>     at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
>     at 
> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.

[jira] [Commented] (FLINK-12642) OutputBufferPoolUsageGauge can fail with NPE

2019-05-28 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16849508#comment-16849508
 ] 

zhijiang commented on FLINK-12642:
--

Thanks for reporting this [~Zentol]!

I think we could check whether the `BufferPool` is null before calling the 
getter methods on it in `OutputBufferPoolUsageGauge`, e.g. as sketched below.

If you agree, I could take this ticket and submit the PR.
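Just to make the proposal concrete, a simplified sketch of the guard (the 
`BufferPool` interface here is a trimmed stand-in, not the real Flink type, 
and the field layout is an assumption):
{code:java}
// Simplified sketch; BufferPool is a stand-in for Flink's buffer pool interface.
interface BufferPool {
    int bestEffortGetNumOfUsedBuffers();
    int getNumBuffers();
}

public class OutputBufferPoolUsageGaugeSketch {

    private final BufferPool bufferPool; // may still be null before the partition's setup()

    OutputBufferPoolUsageGaugeSketch(BufferPool bufferPool) {
        this.bufferPool = bufferPool;
    }

    public float getValue() {
        // Guard against the result partition not being set up yet.
        if (bufferPool == null || bufferPool.getNumBuffers() == 0) {
            return 0.0f;
        }
        return ((float) bufferPool.bestEffortGetNumOfUsedBuffers()) / bufferPool.getNumBuffers();
    }
}
{code}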

> OutputBufferPoolUsageGauge can fail with NPE
> 
>
> Key: FLINK-12642
> URL: https://issues.apache.org/jira/browse/FLINK-12642
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics, Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Chesnay Schepler
>Priority: Critical
> Fix For: 1.9.0
>
>
> The result partition metrics are initialized before 
> {{ResultPartition#setup}} is called. If a reporter tries to access an 
> {{OutputBufferPoolUsageGauge}} in between, it will fail with an NPE since the 
> buffer pool of the partition is still null.
> {code}
> 2019-05-27 14:49:47,031 WARN  
> org.apache.flink.runtime.metrics.MetricRegistryImpl   - Error while 
> reporting metrics
> java.lang.NullPointerException
>   at 
> org.apache.flink.runtime.io.network.metrics.OutputBufferPoolUsageGauge.getValue(OutputBufferPoolUsageGauge.java:41)
>   at 
> org.apache.flink.runtime.io.network.metrics.OutputBufferPoolUsageGauge.getValue(OutputBufferPoolUsageGauge.java:27)
>   at 
> org.apache.flink.metrics.slf4j.Slf4jReporter.tryReport(Slf4jReporter.java:114)
>   at 
> org.apache.flink.metrics.slf4j.Slf4jReporter.report(Slf4jReporter.java:80)
>   at 
> org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run(MetricRegistryImpl.java:436)
>   at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:514)
>   at 
> java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
>   at 
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:300)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
>   at java.base/java.lang.Thread.run(Thread.java:844)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12630) Refactor abstract InputGate to general interface

2019-05-27 Thread zhijiang (JIRA)
zhijiang created FLINK-12630:


 Summary: Refactor abstract InputGate to general interface
 Key: FLINK-12630
 URL: https://issues.apache.org/jira/browse/FLINK-12630
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


`InputGate` is currently defined as an abstract class which extracts the common 
code for checking data availability for the subclasses `SingleInputGate` and 
`UnionInputGate`, but this might limit further `InputGate` implementations in 
the shuffle service architecture.

`SingleInputGate` is created by the shuffle service, so it belongs to the scope 
of the shuffle service, while `UnionInputGate` is a wrapper around several 
`SingleInputGate`s, so it should live in the task/processor stack.

In order to allow a new `InputGate` implementation from another shuffle service 
to be plugged in directly, we should define a cleaner `InputGate` interface 
that decouples the logic for checking data availability. In detail, we could 
define the `isAvailable` method in the `InputGate` interface and extract the 
current implementation into a separate class `FutureBasedAvailability`, which 
could still be extended and reused for both `SingleInputGate` and 
`UnionInputGate`. A rough sketch of such a split is given below.
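To illustrate the direction (the method set and the `FutureBasedAvailability` 
name come from the description above; the concrete signatures are assumptions):
{code:java}
import java.util.concurrent.CompletableFuture;

// Sketch only: the gate interface with the availability check decoupled.
interface InputGate {
    boolean isAvailable();
    // ... data consumption methods such as getNextBufferOrEvent() would stay here ...
}

// Reusable availability logic that both SingleInputGate and UnionInputGate could extend.
abstract class FutureBasedAvailability implements InputGate {

    private volatile CompletableFuture<Void> availabilityFuture = new CompletableFuture<>();

    @Override
    public boolean isAvailable() {
        return availabilityFuture.isDone();
    }

    protected void notifyAvailable() {
        availabilityFuture.complete(null);
    }

    protected void resetToUnavailable() {
        availabilityFuture = new CompletableFuture<>();
    }
}
{code}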



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12201) Introduce InputGateWithMetrics in Task to increment numBytesIn metric

2019-05-24 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-12201:
-
Description: Incrementing of numBytesIn metric in SingleInputGate does not 
depend on shuffle service and can be moved out of network internals into Task. 
Task could wrap InputGate provided by ShuffleService with InputGateWithMetrics 
which would increment numBytesIn metric.  (was: Incrementing of in/out byte 
metrics in local/remote input channel does not depend on shuffle service and 
can be moved out of network internals into Task. Task could wrap InputGate 
provided by ShuffleService with InputGateWithMetrics which would increment 
in/out byte metrics.)

> Introduce InputGateWithMetrics in Task to increment numBytesIn metric
> -
>
> Key: FLINK-12201
> URL: https://issues.apache.org/jira/browse/FLINK-12201
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: Andrey Zagrebin
>Assignee: zhijiang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Incrementing of numBytesIn metric in SingleInputGate does not depend on 
> shuffle service and can be moved out of network internals into Task. Task 
> could wrap InputGate provided by ShuffleService with InputGateWithMetrics 
> which would increment numBytesIn metric.
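For reference, a reduced sketch of such a wrapper (the `InputGate` and 
`Counter` types here are trimmed stand-ins, and `pollNext` is a simplified 
placeholder for the real polling method):
{code:java}
import java.util.Optional;

// Trimmed stand-ins for the sketch, not the real Flink interfaces.
interface Counter {
    void inc(long n);
}

interface InputGate {
    Optional<byte[]> pollNext() throws Exception; // placeholder for BufferOrEvent polling
}

class InputGateWithMetrics implements InputGate {

    private final InputGate inputGate;  // the gate provided by the shuffle service
    private final Counter numBytesIn;   // the task-level numBytesIn counter

    InputGateWithMetrics(InputGate inputGate, Counter numBytesIn) {
        this.inputGate = inputGate;
        this.numBytesIn = numBytesIn;
    }

    @Override
    public Optional<byte[]> pollNext() throws Exception {
        Optional<byte[]> next = inputGate.pollNext();
        // The metric is incremented here, outside of the network internals.
        next.ifPresent(data -> numBytesIn.inc(data.length));
        return next;
    }
}
{code}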



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12201) Introduce InputGateWithMetrics in Task to increment numBytesIn metric

2019-05-24 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-12201:
-
Summary: Introduce InputGateWithMetrics in Task to increment numBytesIn 
metric  (was: Introduce InputGateWithMetrics in Task to increment in/out byte 
metrics)

> Introduce InputGateWithMetrics in Task to increment numBytesIn metric
> -
>
> Key: FLINK-12201
> URL: https://issues.apache.org/jira/browse/FLINK-12201
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: Andrey Zagrebin
>Assignee: zhijiang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Incrementing of in/out byte metrics in local/remote input channel does not 
> depend on shuffle service and can be moved out of network internals into 
> Task. Task could wrap InputGate provided by ShuffleService with 
> InputGateWithMetrics which would increment in/out byte metrics.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12603) Refactor InputGate interface to remove unnecessary methods

2019-05-23 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-12603:
-
Description: 
In order to keep the abstract InputGate simple for extending new 
implementations in the shuffle service architecture, we could remove 
unnecessary methods from it.

Currently InputGate#getOwningTaskName is only used for debug logging in 
BarrierBuffer and StreamInputProcessor. This task name could be obtained from 
Environment#getTaskInfo and then be passed into the constructor of 
BarrierBuffer/StreamInputProcessor.

InputGate#getPageSize is only used in the BarrierBuffer constructor. We could 
remove this method and the page size could be obtained from the task manager 
configuration instead.

  was:
Current `InputGate#getOwningTaskName` is only used for logging in related 
components such as `BarrierBuffer`, `StreamInputProcessor`, etc. We could put 
this name in the structure of `TaskInfo`,  then the related components could 
get task name directly from `RuntimeEnvironment#getTaskInfo`.

To do so, we could simplify the interface of `InputGate`.


> Refactor InputGate interface to remove unnecessary methods
> --
>
> Key: FLINK-12603
> URL: https://issues.apache.org/jira/browse/FLINK-12603
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> In order to keep the abstract InputGate simple for extending new 
> implementations in the shuffle service architecture, we could remove 
> unnecessary methods from it.
> Currently InputGate#getOwningTaskName is only used for debug logging in 
> BarrierBuffer and StreamInputProcessor. This task name could be obtained from 
> Environment#getTaskInfo and then be passed into the constructor of 
> BarrierBuffer/StreamInputProcessor.
> InputGate#getPageSize is only used in the BarrierBuffer constructor. We could 
> remove this method and the page size could be obtained from the task manager 
> configuration instead.
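To illustrate the direction (the class and constructor shapes below are 
assumptions for the sketch, not the actual Flink signatures):
{code:java}
// Sketch: the values formerly fetched through the InputGate are passed in directly.
class BarrierBufferSketch {

    private final String taskName; // previously InputGate#getOwningTaskName
    private final int pageSize;    // previously InputGate#getPageSize

    BarrierBufferSketch(String taskName, int pageSize) {
        this.taskName = taskName;
        this.pageSize = pageSize;
    }
}

class StreamInputProcessorSetupSketch {

    BarrierBufferSketch create(String taskNameFromTaskInfo, int pageSizeFromConfig) {
        // taskNameFromTaskInfo would come from Environment#getTaskInfo,
        // pageSizeFromConfig from the task manager configuration.
        return new BarrierBufferSketch(taskNameFromTaskInfo, pageSizeFromConfig);
    }
}
{code}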



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12603) Refactor InputGate interface to remove unnecessary methods

2019-05-23 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-12603:
-
Summary: Refactor InputGate interface to remove unnecessary methods  (was: 
Remove getOwningTaskName method from InputGate)

> Refactor InputGate interface to remove unnecessary methods
> --
>
> Key: FLINK-12603
> URL: https://issues.apache.org/jira/browse/FLINK-12603
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Current `InputGate#getOwningTaskName` is only used for logging in related 
> components such as `BarrierBuffer`, `StreamInputProcessor`, etc. We could put 
> this name in the structure of `TaskInfo`,  then the related components could 
> get task name directly from `RuntimeEnvironment#getTaskInfo`.
> To do so, we could simplify the interface of `InputGate`.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12603) Remove getOwningTaskName method from InputGate

2019-05-23 Thread zhijiang (JIRA)
zhijiang created FLINK-12603:


 Summary: Remove getOwningTaskName method from InputGate
 Key: FLINK-12603
 URL: https://issues.apache.org/jira/browse/FLINK-12603
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


Currently `InputGate#getOwningTaskName` is only used for logging in related 
components such as `BarrierBuffer`, `StreamInputProcessor`, etc. We could put 
this name into the `TaskInfo` structure, then the related components could get 
the task name directly from `RuntimeEnvironment#getTaskInfo`.

By doing so, we could simplify the interface of `InputGate`.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-6227) Introduce the abstract PartitionException for downstream task failure

2019-05-22 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-6227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-6227:

Summary: Introduce the abstract PartitionException for downstream task 
failure  (was: Introduce the DataConsumptionException for downstream task 
failure)

> Introduce the abstract PartitionException for downstream task failure
> -
>
> Key: FLINK-6227
> URL: https://issues.apache.org/jira/browse/FLINK-6227
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination, Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> It is part of FLIP-1.
>  
> We define a new abstract special exception called PartitionException to 
> indicate a downstream task failure in consuming an upstream partition. This 
> special exception is reported to the JobMaster as a hint for unavailable 
> partition data, and then the JobMaster could track the upstream executions to 
> restart them for reproducing the partition data.
> We make the current PartitionNotFoundException extend PartitionException, 
> because it is reasonable to restart the producer task when the requested 
> partition has already been released.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-6227) Introduce the DataConsumptionException for downstream task failure

2019-05-22 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-6227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-6227:

Description: 
It is part of FLIP-1.

 

We define a new abstract special exception called PartitionException to 
indicate a downstream task failure in consuming an upstream partition. This 
special exception is reported to the JobMaster as a hint for unavailable 
partition data, and then the JobMaster could track the upstream executions to 
restart them for reproducing the partition data.

We make the current PartitionNotFoundException extend PartitionException, 
because it is reasonable to restart the producer task when the requested 
partition has already been released.

  was:
It is part of FLIP-1.

We define a new special exception to indicate the downstream task failure in 
consuming upstream data. 

The {{JobManager}} will receive and consider this special exception to 
calculate the minimum connected sub-graph which is called {{FailoverRegion}}. 
So the {{DataConsumptionException}} should contain {{ResultPartitionID}} 
information for {{JobMaster}} tracking upstream executions.




> Introduce the DataConsumptionException for downstream task failure
> --
>
> Key: FLINK-6227
> URL: https://issues.apache.org/jira/browse/FLINK-6227
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination, Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> It is part of FLIP-1.
>  
> We define a new abstract special exception called PartitionException to 
> indicate a downstream task failure in consuming an upstream partition. This 
> special exception is reported to the JobMaster as a hint for unavailable 
> partition data, and then the JobMaster could track the upstream executions to 
> restart them for reproducing the partition data.
> We make the current PartitionNotFoundException extend PartitionException, 
> because it is reasonable to restart the producer task when the requested 
> partition has already been released.
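A rough sketch of the proposed hierarchy (constructor shapes and the 
ResultPartitionID handling are assumptions; ResultPartitionID is reduced to a 
stand-in type):
{code:java}
import java.io.IOException;

// Stand-in for the real ResultPartitionID.
class ResultPartitionID {
}

// Abstract marker meaning "consuming this partition failed"; reported to the
// JobMaster as a hint to restart the producing execution.
abstract class PartitionException extends IOException {

    private final ResultPartitionID partitionId;

    PartitionException(String message, ResultPartitionID partitionId) {
        super(message);
        this.partitionId = partitionId;
    }

    ResultPartitionID getPartitionId() {
        return partitionId;
    }
}

// The existing exception becomes a concrete subclass, since a released or
// missing partition is also a case where re-producing the data is reasonable.
class PartitionNotFoundException extends PartitionException {

    PartitionNotFoundException(ResultPartitionID partitionId) {
        super("Partition " + partitionId + " not found.", partitionId);
    }
}
{code}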



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12458) Introduce PartitionConnectionException for unreachable producer

2019-05-21 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12458?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-12458:
-
Description: If the consumer can not establish a connection to remote task 
executor, which might indicate the remote task executor is not reachable. We 
could wrap this connection exception into new proposed 
`PartitionConnectionException` which also extends `PartitionException`, then 
the job master would decide whether to restart the upstream region to 
re-producer partition data.  (was: If the consumer can not establish a 
connection to remote task executor, which might indicate the remote task 
executor is not reachable. We could wrap this connection exception into 
existing `PartitionNotFoundException`, then the job master would decide whether 
to restart the upstream region to re-producer partition data.)

> Introduce PartitionConnectionException for unreachable producer
> ---
>
> Key: FLINK-12458
> URL: https://issues.apache.org/jira/browse/FLINK-12458
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> If the consumer cannot establish a connection to the remote task executor, it 
> might indicate that the remote task executor is not reachable. We could wrap 
> this connection exception into the newly proposed 
> `PartitionConnectionException`, which also extends `PartitionException`, then 
> the job master would decide whether to restart the upstream region to 
> re-produce the partition data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12458) Introduce PartitionConnectionException for unreachable producer

2019-05-21 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12458?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-12458:
-
Summary: Introduce PartitionConnectionException for unreachable producer  
(was: Throw PartitionNotFoundException if consumer can not establish a 
connection to remote TM)

> Introduce PartitionConnectionException for unreachable producer
> ---
>
> Key: FLINK-12458
> URL: https://issues.apache.org/jira/browse/FLINK-12458
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> If the consumer cannot establish a connection to the remote task executor, it 
> might indicate that the remote task executor is not reachable. We could wrap 
> this connection exception into the existing `PartitionNotFoundException`, then 
> the job master would decide whether to restart the upstream region to 
> re-produce the partition data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12571) Make NetworkEnvironment#start() return the binded data port

2019-05-21 Thread zhijiang (JIRA)
zhijiang created FLINK-12571:


 Summary: Make NetworkEnvironment#start() return the binded data 
port
 Key: FLINK-12571
 URL: https://issues.apache.org/jira/browse/FLINK-12571
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


Currently `NetworkEnvironment#getConnectionManager()` is mainly used by 
`TaskManagerServices` for getting the bound data port from 
`NettyConnectionManager`. Considering that the `ConnectionManager` is an 
internal component of `NetworkEnvironment`, it should not be exposed to the 
outside. Other ShuffleService implementations might have no `ConnectionManager` 
at all.

We could make `ShuffleService#start()` return the bound data port to replace 
`getConnectionManager()`. For the `LocalConnectionManager` or other shuffle 
service implementations which have no bound data port, a simple default value 
could be returned, which would do no harm.
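As an illustration of the proposed contract (all names besides start() are 
assumptions for the sketch):
{code:java}
import java.io.IOException;

// Sketch: start() returns the bound data port instead of exposing the ConnectionManager.
interface ShuffleServiceSketch {
    /** Starts the service and returns the bound data port, or a default (e.g. -1) if none. */
    int start() throws IOException;
}

class NettyShuffleServiceSketch implements ShuffleServiceSketch {
    @Override
    public int start() throws IOException {
        // ... start the Netty server internally ...
        int boundDataPort = 0; // would be taken from the server's actual bound address
        return boundDataPort;
    }
}

class LocalShuffleServiceSketch implements ShuffleServiceSketch {
    @Override
    public int start() {
        return -1; // no data port for purely local connections; a harmless default
    }
}
{code}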



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10653) Introduce Pluggable Shuffle Manager Architecture

2019-05-21 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16844642#comment-16844642
 ] 

zhijiang commented on FLINK-10653:
--

Thanks for the concerns. [~prudenko]

Actually we also researched the RDMA shuffle manager in Spark before, and the 
performance seemed to increase a lot. I think once the basic shuffle manager 
architecture in Flink is ready, it would be feasible to extend it with other 
shuffle implementations like RDMA. We could keep in touch and see then. :)

> Introduce Pluggable Shuffle Manager Architecture
> 
>
> Key: FLINK-10653
> URL: https://issues.apache.org/jira/browse/FLINK-10653
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>
> This is the umbrella issue for improving shuffle architecture.
> Shuffle is the process of data transfer between stages, which involves 
> writing outputs on the sender side and reading data on the receiver side. In 
> the Flink implementation, it covers three parts, the writer, the transport 
> layer and the reader, which are unified for both streaming and batch jobs.
> In detail, the current ResultPartitionWriter interface on the upstream side 
> only supports in-memory outputs for streaming jobs and local persistent file 
> outputs for batch jobs. If we extend it with another writer such as 
> DfsWriter, RdmaWriter, SortMergeWriter, etc. based on the 
> ResultPartitionWriter interface, there is no unified mechanism to extend the 
> reader side accordingly. 
> In order to make the shuffle architecture more flexible and support more 
> scenarios, especially for batch jobs, a high level shuffle architecture is 
> necessary to manage and extend both the writer and reader sides together.
> Refer to the design doc for more details.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12564) Remove getBufferProvider method from ResultPartitionWriter interface

2019-05-20 Thread zhijiang (JIRA)
zhijiang created FLINK-12564:


 Summary: Remove getBufferProvider method from 
ResultPartitionWriter interface
 Key: FLINK-12564
 URL: https://issues.apache.org/jira/browse/FLINK-12564
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


Currently `ResultPartitionWriter#getBufferProvider` is used for requesting a 
`BufferBuilder` in `RecordWriter`, and then the `BufferConsumer` created from 
the `BufferBuilder` is added into `ResultPartitionWriter` via the 
`addBufferConsumer` method.

We could merge these two methods in `ResultPartitionWriter` in order not to 
expose `getBufferProvider`. `ResultPartitionWriter` could internally request 
the `BufferBuilder` and add the created `BufferConsumer` into one subpartition, 
then return the `BufferBuilder` for the `RecordWriter` to write serialized data 
into.

Since we also change `ResultPartitionWriter#addBufferConsumer` to 
`ResultPartitionWriter#requestBufferBuilder`, another new method 
`ResultPartitionWriter#broadcastEvents` should be introduced for handling 
events.

In the future it might be worth further abstracting the `ResultPartitionWriter` 
so that it is not only tied to `BufferBuilder`. We could provide 
`writeRecord(int targetIndex)` to replace `requestBufferBuilder`, then the 
serialization process could be done inside the specific `ResultPartitionWriter` 
instance.
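To make the proposed change concrete, a reduced sketch of the resulting 
interface (the `BufferBuilder` and event types are stand-ins and the exact 
signatures are assumptions):
{code:java}
import java.io.IOException;

// Reduced stand-in types for the sketch.
class BufferBuilder {
}

class AbstractEvent {
}

// Before: getBufferProvider() + addBufferConsumer(...).
// After: the writer requests the BufferBuilder itself and adds the created
// BufferConsumer into the target subpartition internally.
interface ResultPartitionWriterSketch {

    /**
     * Requests a BufferBuilder for the given subpartition and internally adds
     * the BufferConsumer created from it to that subpartition.
     */
    BufferBuilder requestBufferBuilder(int targetSubpartition) throws IOException;

    /** Events no longer go through addBufferConsumer, so they get a dedicated method. */
    void broadcastEvents(AbstractEvent event) throws IOException;
}
{code}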



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12544) Deadlock during releasing memory in SpillableSubpartition

2019-05-17 Thread zhijiang (JIRA)
zhijiang created FLINK-12544:


 Summary: Deadlock during releasing memory in SpillableSubpartition
 Key: FLINK-12544
 URL: https://issues.apache.org/jira/browse/FLINK-12544
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


It was reported by a Flink user, and the original jstack is as follows:

 
{code:java}
// "CoGroup (2/2)":
    at 
org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:213)
    - waiting to lock <0x00062bf859b8> (a java.lang.Object)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
    at java.lang.Thread.run(Thread.java:745)
"CoGroup (1/2)":
    at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:277)
    - waiting to lock <0x00063fdf4888> (a java.util.ArrayDeque)
    at 
org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
    at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
    at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
    at 
org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
    at 
org.apache.flink.runtime.io.network.buffer.BufferConsumer.close(BufferConsumer.java:121)
    at 
org.apache.flink.runtime.io.network.partition.SpillableSubpartition.spillFinishedBufferConsumers(SpillableSubpartition.java:274)
    at 
org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:239)
    - locked <0x00063fdf4ac8> (a java.util.ArrayDeque)
    at 
org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371)
    at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:375)
    at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:408)
    at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:297)
    - locked <0x00063c785350> (a java.lang.Object)
    at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:259)
    at 
org.apache.flink.runtime.io.network.NetworkEnvironment.setupInputGate(NetworkEnvironment.java:272)
    at 
org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:224)
    - locked <0x00062bf859b8> (a java.lang.Object)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
    at java.lang.Thread.run(Thread.java:745)
"DataSource  (1/1)":
    at 
org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:227)
    - waiting to lock <0x00063fdf4ac8> (a java.util.ArrayDeque)
    at 
org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371)
    at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:257)
    - locked <0x00063fdf4888> (a java.util.ArrayDeque)
    at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:218)
    at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:213)
    at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:144)
    at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
    at 
org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
    at 
org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
    at 
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:193)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:745)
{code}
Based on the analysis, it happens when one task A is trying to release 
subpartition memory: it occupies the lock in `LocalBufferPool` while trying to 
get the lock in `SpillableSubpartition`. Meanwhile, another task B is submitted 
to the TM to trigger the previous task to release memory, then it would already occu

[jira] [Commented] (FLINK-12202) Consider introducing batch metric register in NetworkEnviroment

2019-05-14 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16839963#comment-16839963
 ] 

zhijiang commented on FLINK-12202:
--

Hey [~azagrebin], as we confirmed before, this ticket is not needed because we 
would register metrics while creating batch of partitions/gates. So should we 
close this jira?

> Consider introducing batch metric register in NetworkEnviroment
> ---
>
> Key: FLINK-12202
> URL: https://issues.apache.org/jira/browse/FLINK-12202
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Metrics, Runtime / Network
>Reporter: Andrey Zagrebin
>Assignee: zhijiang
>Priority: Major
>
> As we have some network specific metrics registered in TaskIOMetricGroup 
> (In/OutputBuffersGauge, In/OutputBufferPoolUsageGauge), we can introduce 
> batch metric registering in 
> NetworkEnviroment.registerMetrics(ProxyMetricGroup, partitions, gates), where 
> task passes its TaskIOMetricGroup into ProxyMetricGroup. This way we could 
> break a tie between task and NetworkEnviroment. 
> TaskIOMetricGroup.initializeBufferMetrics, In/OutputBuffersGauge, 
> In/OutputBufferPoolUsageGauge could be moved into 
> NetworkEnviroment.registerMetrics and network code.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-12474) UnionInputGate should be notified when closing SingleInputGate by canceler thread

2019-05-14 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang closed FLINK-12474.

Resolution: Invalid

> UnionInputGate should be notified when closing SingleInputGate by canceler 
> thread
> -
>
> Key: FLINK-12474
> URL: https://issues.apache.org/jira/browse/FLINK-12474
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: zhijiang
>Priority: Minor
>
> If a task is being canceled, the `SingleInputGate` would be closed by the 
> canceler thread. If the `SingleInputGate` is waiting for a buffer, 
> `inputChannelWithData` would be notified to wake the task thread to exit 
> early. But if the `UnionInputGate` is waiting for a buffer, the task thread 
> is still stuck in wait() when the `SingleInputGate` is closed, until the 
> cancel timeout.
> To make the task exit early in this case, we could make `SingleInputGate` 
> further notify `UnionInputGate` after it is closed, then it could also wake 
> the task thread to exit during canceling.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12474) UnionInputGate should be notified when closing SingleInputGate by canceler thread

2019-05-14 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16839298#comment-16839298
 ] 

zhijiang commented on FLINK-12474:
--

Yes, you are right after double checking the code.

`SingleInputGate#close` would call `inputChannelsWithData.notifyAll()`, which 
could wake the task thread, but I did not find similar logic in 
`UnionInputGate`, so that brought my doubts. The `executor.interrupt()` could 
handle this case, so it seems unnecessary to call 
`inputChannelsWithData.notifyAll()` in `SingleInputGate#close`.

I should have reviewed the whole process more carefully. Sorry for the 
disturbance, and I will close this jira.

> UnionInputGate should be notified when closing SingleInputGate by canceler 
> thread
> -
>
> Key: FLINK-12474
> URL: https://issues.apache.org/jira/browse/FLINK-12474
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: zhijiang
>Priority: Minor
>
> If a task is being canceled, the `SingleInputGate` would be closed by the 
> canceler thread. If the `SingleInputGate` is waiting for a buffer, 
> `inputChannelWithData` would be notified to wake the task thread to exit 
> early. But if the `UnionInputGate` is waiting for a buffer, the task thread 
> is still stuck in wait() when the `SingleInputGate` is closed, until the 
> cancel timeout.
> To make the task exit early in this case, we could make `SingleInputGate` 
> further notify `UnionInputGate` after it is closed, then it could also wake 
> the task thread to exit during canceling.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12474) UnionInputGate should be notified when closing SingleInputGate by canceler thread

2019-05-14 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16839230#comment-16839230
 ] 

zhijiang commented on FLINK-12474:
--

[~pnowojski] thanks for the confirmation!

Yes, the `SingleInputGate` would be notified by the canceler thread after 
calling `SingleInputGate#close()`. But for the case of `UnionInputGate`, if the 
task is blocked in `wait()` because there is no available data, I think it has 
to wait for the canceling timeout, unless I missed other parts that unblock the 
task thread. 

Anyway, you could solve this issue in `CheckpointBarrierHandler`, and then I 
could close this jira. :)

> UnionInputGate should be notified when closing SingleInputGate by canceler 
> thread
> -
>
> Key: FLINK-12474
> URL: https://issues.apache.org/jira/browse/FLINK-12474
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: zhijiang
>Priority: Minor
>
> If a task is being canceled, the `SingleInputGate` would be closed by the 
> canceler thread. If the `SingleInputGate` is waiting for a buffer, 
> `inputChannelWithData` would be notified to wake the task thread to exit 
> early. But if the `UnionInputGate` is waiting for a buffer, the task thread 
> is still stuck in wait() when the `SingleInputGate` is closed, until the 
> cancel timeout.
> To make the task exit early in this case, we could make `SingleInputGate` 
> further notify `UnionInputGate` after it is closed, then it could also wake 
> the task thread to exit during canceling.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12497) Refactor the start method of ConnectionManager

2019-05-13 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-12497:
-
Description: 
In the current {{ConnectionManager#start(ResultPartitionProvider, 
TaskEventDispatcher)}}, the parameters of start are only reasonable for the 
{{NettyConnectionManager}} implementation and redundant for 
{{LocalConnectionManager}}. 

We could put these parameters in the constructor of {{NettyConnectionManager}}, 
then {{ConnectionManager#start()}} would be cleaner for both implementations. 
It could also bring benefits for calling start in {{NetworkEnvironment}}, which 
then does not need to maintain a private {{TaskEventDispatcher}} in the future.

  was:
In current `ConnectionManager#start(ResultPartitionProvider, 
TaskEventDispatcher)`, the parameters in start are only reasonable for 
`NettyConnectionManager` implementation, reductant for 
`LocalConnectionManager`. 

We could put these parameters in the constructor of `NettyConnectionManager`, 
then `ConnectionManager#start()` would be more cleaner for both 
implementations. And it also bring benefits for calling start in 
`NetworkEnvironment` which does not need to maintain private 
`TaskEventDispatcher`.


> Refactor the start method of ConnectionManager
> --
>
> Key: FLINK-12497
> URL: https://issues.apache.org/jira/browse/FLINK-12497
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> In the current {{ConnectionManager#start(ResultPartitionProvider, 
> TaskEventDispatcher)}}, the parameters of start are only reasonable for the 
> {{NettyConnectionManager}} implementation and redundant for 
> {{LocalConnectionManager}}. 
> We could put these parameters in the constructor of 
> {{NettyConnectionManager}}, then {{ConnectionManager#start()}} would be 
> cleaner for both implementations. It could also bring benefits for calling 
> start in {{NetworkEnvironment}}, which then does not need to maintain a 
> private {{TaskEventDispatcher}} in the future.
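A sketch of the cleaner contract (the stand-in types below are trimmed for 
illustration):
{code:java}
import java.io.IOException;

// Trimmed stand-in types for the sketch.
interface ResultPartitionProvider {
}

interface TaskEventDispatcher {
}

interface ConnectionManagerSketch {
    // Before: start(ResultPartitionProvider, TaskEventDispatcher).
    // After: a parameterless start(); the Netty implementation receives its
    // dependencies through the constructor instead.
    void start() throws IOException;
}

class NettyConnectionManagerSketch implements ConnectionManagerSketch {

    private final ResultPartitionProvider partitionProvider;
    private final TaskEventDispatcher taskEventDispatcher;

    NettyConnectionManagerSketch(
            ResultPartitionProvider partitionProvider,
            TaskEventDispatcher taskEventDispatcher) {
        this.partitionProvider = partitionProvider;
        this.taskEventDispatcher = taskEventDispatcher;
    }

    @Override
    public void start() throws IOException {
        // ... initialize the Netty server/client using the injected dependencies ...
    }
}

class LocalConnectionManagerSketch implements ConnectionManagerSketch {
    @Override
    public void start() {
        // nothing to start for local connections
    }
}
{code}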



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12497) Refactor the start method of ConnectionManager

2019-05-13 Thread zhijiang (JIRA)
zhijiang created FLINK-12497:


 Summary: Refactor the start method of ConnectionManager
 Key: FLINK-12497
 URL: https://issues.apache.org/jira/browse/FLINK-12497
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


In the current `ConnectionManager#start(ResultPartitionProvider, 
TaskEventDispatcher)`, the parameters of start are only reasonable for the 
`NettyConnectionManager` implementation and redundant for 
`LocalConnectionManager`. 

We could put these parameters in the constructor of `NettyConnectionManager`, 
then `ConnectionManager#start()` would be cleaner for both implementations. It 
also brings benefits for calling start in `NetworkEnvironment`, which then does 
not need to maintain a private `TaskEventDispatcher`.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11403) Introduce ResultPartitionWithConsumableNotifier in task for notifying consumable result partition

2019-05-10 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-11403:
-
Summary:  Introduce ResultPartitionWithConsumableNotifier in task for 
notifying consumable result partition  (was: Remove 
ResultPartitionConsumableNotifier from ResultPartition)

>  Introduce ResultPartitionWithConsumableNotifier in task for notifying 
> consumable result partition
> --
>
> Key: FLINK-11403
> URL: https://issues.apache.org/jira/browse/FLINK-11403
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The creation of {{ResultPartitionWriter}} from {{NetworkEnvironment}} relies 
> on {{TaskAction}} and {{ResultPartitionConsumableNotifier}}. To break this 
> tie, {{ResultPartitionWithConsumableNotifier}} is introduced to wrap the 
> logic of notification.
> In this way the later interface method 
> {{ShuffleService#createResultPartitionWriter}} would be simpler.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11403) Remove ResultPartitionConsumableNotifier from ResultPartition

2019-05-10 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-11403:
-
Description: 
The creation of {{ResultPartitionWriter}} from {{NetworkEnvironment}} relies on 
{{TaskAction}}, {{ResultPartitionConsumableNotifier}}. For breaking this tie, 
{{ResultPartitionWithConsumableNotifier}} is introduced for wrapping the logic 
of notification.

In this way the later interface method 
{{ShuffleService#createResultPartitionWriter}} would be simple.

  was:
This is the precondition for introducing pluggable {{ShuffleService}} on TM 
side.

In current process of creating {{ResultPartition}}, the 
{{ResultPartitionConsumableNotifier}} regarded as TM level component has to be 
passed into the constructor. In order to create {{ResultPartition}} easily from 
{{ShuffleService}}, the required information should be covered by 
{{ResultPartitionDeploymentDescriptor}} as much as possible, then we could 
remove this notifier from the constructor. And it is also reasonable for 
notifying consumable partition via {{TaskActions}} which is already covered in 
{{ResultPartition}}.


> Remove ResultPartitionConsumableNotifier from ResultPartition
> -
>
> Key: FLINK-11403
> URL: https://issues.apache.org/jira/browse/FLINK-11403
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The creation of {{ResultPartitionWriter}} from {{NetworkEnvironment}} relies 
> on {{TaskAction}} and {{ResultPartitionConsumableNotifier}}. To break this 
> tie, {{ResultPartitionWithConsumableNotifier}} is introduced to wrap the 
> logic of notification.
> In this way the later interface method 
> {{ShuffleService#createResultPartitionWriter}} would be simpler.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12474) UnionInputGate should be notified when closing SingleInputGate by canceler thread

2019-05-10 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836964#comment-16836964
 ] 

zhijiang commented on FLINK-12474:
--

[~pnowojski], do you think this jira is worth fixing or my understanding is 
right?

> UnionInputGate should be notified when closing SingleInputGate by canceler 
> thread
> -
>
> Key: FLINK-12474
> URL: https://issues.apache.org/jira/browse/FLINK-12474
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: zhijiang
>Priority: Minor
>
> If a task is being canceled, the `SingleInputGate` would be closed by the 
> canceler thread. If the `SingleInputGate` is waiting for a buffer, 
> `inputChannelWithData` would be notified to wake the task thread to exit 
> early. But if the `UnionInputGate` is waiting for a buffer, the task thread 
> is still stuck in wait() when the `SingleInputGate` is closed, until the 
> cancel timeout.
> To make the task exit early in this case, we could make `SingleInputGate` 
> further notify `UnionInputGate` after it is closed, then it could also wake 
> the task thread to exit during canceling.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12474) UnionInputGate should be notified when closing SingleInputGate by canceler thread

2019-05-10 Thread zhijiang (JIRA)
zhijiang created FLINK-12474:


 Summary: UnionInputGate should be notified when closing 
SingleInputGate by canceler thread
 Key: FLINK-12474
 URL: https://issues.apache.org/jira/browse/FLINK-12474
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Reporter: zhijiang


If a task is being canceled, the `SingleInputGate` would be closed by the 
canceler thread. If the `SingleInputGate` is waiting for a buffer, 
`inputChannelWithData` would be notified to wake the task thread to exit early. 
But if the `UnionInputGate` is waiting for a buffer, the task thread is still 
stuck in wait() when the `SingleInputGate` is closed, until the cancel timeout.

To make the task exit early in this case, we could make `SingleInputGate` 
further notify `UnionInputGate` after it is closed, then it could also wake the 
task thread to exit during canceling.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12458) Throw PartitionNotFoundException if consumer can not establish a connection to remote TM

2019-05-09 Thread zhijiang (JIRA)
zhijiang created FLINK-12458:


 Summary: Throw PartitionNotFoundException if consumer can not 
establish a connection to remote TM
 Key: FLINK-12458
 URL: https://issues.apache.org/jira/browse/FLINK-12458
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


If the consumer cannot establish a connection to the remote task executor, it 
might indicate that the remote task executor is not reachable. We could wrap 
this connection exception into the existing `PartitionNotFoundException`, then 
the job master would decide whether to restart the upstream region to 
re-produce the partition data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12331) Introduce partition/gate setup to decouple task registration with NetworkEnvironment

2019-04-25 Thread zhijiang (JIRA)
zhijiang created FLINK-12331:


 Summary: Introduce partition/gate setup to decouple task 
registration with NetworkEnvironment
 Key: FLINK-12331
 URL: https://issues.apache.org/jira/browse/FLINK-12331
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang
 Fix For: 1.9.0


In order to decouple the task from {{NetworkEnvironment}} completely, we 
introduce the interface methods {{InputGate#setup}} and 
{{ResultPartitionWriter#setup}}. Then the task could call the {{setup}} method 
on the created partitions/gates directly instead of calling the current 
{{registerTask}} via {{NetworkEnvironment}}.
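A minimal sketch of that setup-based wiring (the method names are taken from 
the description above; everything else is an assumption):
{code:java}
import java.io.IOException;

// Sketch: each partition/gate sets itself up, so the task no longer needs
// NetworkEnvironment#registerTask.
interface ResultPartitionWriterSetup {
    void setup() throws IOException; // e.g. assign its LocalBufferPool internally
}

interface InputGateSetup {
    void setup() throws IOException; // e.g. assign a buffer pool and request partitions
}

class TaskSetupSketch {

    static void setupPartitionsAndGates(
            ResultPartitionWriterSetup[] partitions,
            InputGateSetup[] gates) throws IOException {
        for (ResultPartitionWriterSetup partition : partitions) {
            partition.setup();
        }
        for (InputGateSetup gate : gates) {
            gate.setup();
        }
    }
}
{code}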



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-6227) Introduce the DataConsumptionException for downstream task failure

2019-04-22 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-6227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-6227:

Component/s: Runtime / Coordination

> Introduce the DataConsumptionException for downstream task failure
> --
>
> Key: FLINK-6227
> URL: https://issues.apache.org/jira/browse/FLINK-6227
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination, Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> It is part of FLIP-1.
> We define a new special exception to indicate the downstream task failure in 
> consuming upstream data. 
> The {{JobManager}} will receive and consider this special exception to 
> calculate the minimum connected sub-graph which is called {{FailoverRegion}}. 
> So the {{DataConsumptionException}} should contain {{ResultPartitionID}} 
> information for {{JobMaster}} tracking upstream executions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12213) Pass TaskManagerMetricGroup into constructor of NetworkEnvironment

2019-04-16 Thread zhijiang (JIRA)
zhijiang created FLINK-12213:


 Summary: Pass TaskManagerMetricGroup into constructor of 
NetworkEnvironment
 Key: FLINK-12213
 URL: https://issues.apache.org/jira/browse/FLINK-12213
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang
 Fix For: 1.9.0


At the moment {{NetworkEnvironment#getNetworkBufferPool}} is called to add the 
network related {{MetricGroup}}. In order to simplify the public API of 
{{NetworkEnvironment}}, which is regarded as the default {{ShuffleService}} 
implementation, we could pass the {{TaskManagerMetricGroup}} into the 
constructor of {{NetworkEnvironment}}, then the related network {{MetricGroup}} 
could be added internally. 
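For illustration, a sketch of that constructor-based wiring (the MetricGroup 
interface below is a trimmed stand-in and the group name is an assumption):
{code:java}
// Trimmed stand-in for a metric group, not the real Flink interface.
interface MetricGroup {
    MetricGroup addGroup(String name);
}

class NetworkEnvironmentSketch {

    NetworkEnvironmentSketch(MetricGroup taskManagerMetricGroup) {
        // Register the network-related metrics internally instead of exposing
        // getNetworkBufferPool() to the outside.
        MetricGroup networkGroup = taskManagerMetricGroup.addGroup("Network");
        // networkGroup would then receive the buffer pool gauges, etc.
    }
}
{code}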



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-12070) Make blocking result partitions consumable multiple times

2019-04-15 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16818619#comment-16818619
 ] 

zhijiang edited comment on FLINK-12070 at 4/16/19 4:25 AM:
---

Thanks for providing the POC for this feature [~StephanEwen]

I really approve of the approach of making {{SpillableSubpartition}} simply 
write to a persistent file directly and make use of the OS cache efficiently 
via mmap. Although the previous behavior of keeping intermediate in-memory 
state in {{SpillableSubpartition}} could bring performance benefits for 
small-scale data, it makes the logic more complex to maintain, and other tasks 
might even be impacted when it evicts to a disk file to release memory 
segments. The partition data is also not deterministic considering the FLIP-1 
requirements. These points were also my previous concerns and planned focus. 

Considering the issue of caching intermediate results, we implemented a similar 
function in a previous Blink version before, but it was finally abandoned for 
some reasons. Actually this issue is valuable in production, because it can 
both match the speed of memory shuffle like the streaming pipelined way and 
also satisfy the failover requirements via asynchronous eviction to disk. The 
mmap partition might bring more possibilities for this issue.

BTW, what is your plan for making this POC into practice? Do you need hands for 
supplementing the unit tests and verifying benchmarks? The partition release 
issue after consumption could be considered together in the proposal of 
partition lifecycle management.


was (Author: zjwang):
Thanks for providing the POC for this feature [~StephanEwen]

I really approve the ways of making {{spillableSubpartition}} simple write to 
persistent file directly and make use of OS cache efficiently via mmap. 
Although the previous behavior of intermediate memory state for 
{{SpillableSubpartition}} could bring performance benefits in small-scale data, 
it makes the logic more complex to maintain, even other tasks might be impacted 
when it evicts to disk file to release memory segment. And the partition data 
is not determined considering the FLIP-1 requirements. These points are also my 
previous concerns and planning to be focused. 

Considering the issue of caching intermediate results, we ever implemented the 
similar function in previous Blink version, but finally it is abandoned for 
some reasons. Actually this issue is valuable in production, because it can 
both satisfy the speed of memory shuffle like streaming pipelined way and also 
consider the failover requirements via asynchronous eviction to disk. The 
mmappartition might bring more possibilities for this issue.

BTW, what is your plan for making this POC into practice? Do you need hands for 
supplementing the unit tests and verifying benchmarks? The partition release 
issue after consumption could be considered together in the proposal of 
partition lifecycle management before.

> Make blocking result partitions consumable multiple times
> -
>
> Key: FLINK-12070
> URL: https://issues.apache.org/jira/browse/FLINK-12070
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Till Rohrmann
>Assignee: BoWang
>Priority: Major
>
> In order to avoid writing produced results multiple times for multiple 
> consumers and in order to speed up batch recoveries, we should make the 
> blocking result partitions to be consumable multiple times. At the moment a 
> blocking result partition will be released once the consumers has processed 
> all data. Instead the result partition should be released once the next 
> blocking result has been produced and all consumers of a blocking result 
> partition have terminated. Moreover, blocking results should not hold on slot 
> resources like network buffers or memory as it is currently the case with 
> {{SpillableSubpartitions}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-12070) Make blocking result partitions consumable multiple times

2019-04-15 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16818619#comment-16818619
 ] 

zhijiang edited comment on FLINK-12070 at 4/16/19 4:24 AM:
---

Thanks for providing the POC for this feature [~StephanEwen]

I really approve the ways of making {{spillableSubpartition}} simple write to 
persistent file directly and make use of OS cache efficiently via mmap. 
Although the previous behavior of intermediate memory state for 
{{SpillableSubpartition}} could bring performance benefits in small-scale data, 
it makes the logic more complex to maintain, even other tasks might be impacted 
when it evicts to disk file to release memory segment. And the partition data 
is not determined considering the FLIP-1 requirements. These points are also my 
previous concerns and planning to be focused. 

Considering the issue of caching intermediate results, we ever implemented the 
similar function in previous Blink version, but finally it is abandoned for 
some reasons. Actually this issue is valuable in production, because it can 
both satisfy the speed of memory shuffle like streaming pipelined way and also 
consider the failover requirements via asynchronous eviction to disk. The 
mmappartition might bring more possibilities for this issue.

BTW, what is your plan for making this POC into practice? Do you need hands for 
supplementing the unit tests and verifying benchmarks? The partition release 
issue after consumption could be considered together in the proposal of 
partition lifecycle management before.


was (Author: zjwang):
Thanks for providing the POC for this feature [~StephanEwen]

I really approve the ways of making {{spillableSubpartition}} simple write to 
persistent file directly and make use of OS cache efficiently via mmap. 
Although the previous behavior of intermediate memory state for 
{{SpillableSubpartition}} could bring performance benefits in small-scale data, 
it makes the logic more complex to maintain, even other tasks might be impacted 
when it evicts to disk file to release memory segment. And the partition data 
is not determined considering the FLIP-1 requirements. These points are also my 
previous concerns and planning to be focused. 

Considering the issue of caching intermediate results, we also implement the 
similar function in previous Blink version before, but finally it is abandoned 
for some reasons. Actually this issue is valuable in production, because it can 
both satisfy the speed of memory shuffle like streaming pipelined way and also 
consider the failover requirements via asynchronous eviction to disk. The 
mmappartition might bring more possibilities for this issue.

BTW, what is your plan for making this POC into practice? Do you need hands for 
supplementing the unit tests and verifying benchmarks? The partition release 
issue after consumption could be considered together in the proposal of 
partition lifecycle management before.

> Make blocking result partitions consumable multiple times
> -
>
> Key: FLINK-12070
> URL: https://issues.apache.org/jira/browse/FLINK-12070
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Till Rohrmann
>Assignee: BoWang
>Priority: Major
>
> In order to avoid writing produced results multiple times for multiple 
> consumers and in order to speed up batch recoveries, we should make the 
> blocking result partitions to be consumable multiple times. At the moment a 
> blocking result partition will be released once the consumers has processed 
> all data. Instead the result partition should be released once the next 
> blocking result has been produced and all consumers of a blocking result 
> partition have terminated. Moreover, blocking results should not hold on slot 
> resources like network buffers or memory as it is currently the case with 
> {{SpillableSubpartitions}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12070) Make blocking result partitions consumable multiple times

2019-04-15 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16818619#comment-16818619
 ] 

zhijiang commented on FLINK-12070:
--

Thanks for providing the POC for this feature [~StephanEwen]

I really like the approach of making {{SpillableSubpartition}} simply write to 
a persistent file directly and make use of the OS cache efficiently via mmap. 
Although the previous behavior of keeping intermediate state in memory for 
{{SpillableSubpartition}} could bring performance benefits for small-scale data, 
it makes the logic more complex to maintain, and other tasks might even be 
impacted when it evicts data to a disk file in order to release memory segments. 
In addition, the partition data is not deterministic with regard to the FLIP-1 
requirements. These points were also my previous concerns and what I planned to 
focus on.

Regarding the issue of caching intermediate results, we also implemented a 
similar function in a previous Blink version, but it was eventually abandoned 
for several reasons. Actually this feature is valuable in production, because it 
can both match the speed of an in-memory shuffle, like the streaming pipelined 
mode, and also consider the failover requirements via asynchronous eviction to 
disk. The mmap-based partition might open up more possibilities here.

BTW, what is your plan for putting this POC into practice? Do you need a hand 
with supplementing the unit tests and verifying the benchmarks? The partition 
release issue after consumption could be considered together with the earlier 
proposal on partition lifecycle management.
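To make the mmap idea above more concrete, here is a minimal, self-contained 
Java sketch (not the POC's actual code): a subpartition is written straight to 
a persistent file once, and consumers read it back through a read-only memory 
mapping, so repeated reads are served from the OS page cache instead of 
Flink-managed buffers. All class and file names below are illustrative 
assumptions.

{code:java}
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Illustrative only: a blocking "subpartition" that writes records to a file
// once and lets any number of consumers read them back via mmap.
public class MmapSpilledSubpartitionSketch {

    public static void main(String[] args) throws IOException {
        Path spillFile = Path.of("subpartition-0.data");

        // Producer side: append records directly to the persistent file.
        try (FileChannel out = FileChannel.open(spillFile,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE,
                StandardOpenOption.TRUNCATE_EXISTING)) {
            out.write(ByteBuffer.wrap(
                    "record-1|record-2|record-3".getBytes(StandardCharsets.UTF_8)));
        }

        // Consumer side: map the file read-only; the OS page cache backs the
        // mapping, so multiple consumers can read it repeatedly without
        // occupying network buffers.
        try (FileChannel in = FileChannel.open(spillFile, StandardOpenOption.READ)) {
            MappedByteBuffer mapped = in.map(FileChannel.MapMode.READ_ONLY, 0, in.size());
            byte[] bytes = new byte[mapped.remaining()];
            mapped.get(bytes);
            System.out.println("consumer read: " + new String(bytes, StandardCharsets.UTF_8));
        }
    }
}
{code}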

> Make blocking result partitions consumable multiple times
> -
>
> Key: FLINK-12070
> URL: https://issues.apache.org/jira/browse/FLINK-12070
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Till Rohrmann
>Assignee: BoWang
>Priority: Major
>
> In order to avoid writing produced results multiple times for multiple 
> consumers and in order to speed up batch recoveries, we should make the 
> blocking result partitions consumable multiple times. At the moment a 
> blocking result partition will be released once the consumers have processed 
> all data. Instead, the result partition should be released once the next 
> blocking result has been produced and all consumers of a blocking result 
> partition have terminated. Moreover, blocking results should not hold on to 
> slot resources like network buffers or memory, as is currently the case with 
> {{SpillableSubpartitions}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6227) Introduce the DataConsumptionException for downstream task failure

2019-04-15 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16818018#comment-16818018
 ] 

zhijiang commented on FLINK-6227:
-

OK, we will focus on the implementation soon.

We will also fully consider the related parts besides FLINK-10289. :)

> Introduce the DataConsumptionException for downstream task failure
> --
>
> Key: FLINK-6227
> URL: https://issues.apache.org/jira/browse/FLINK-6227
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> It is part of FLIP-1.
> We define a new special exception to indicate a downstream task failure in 
> consuming upstream data. 
> The {{JobManager}} will receive and consider this special exception to 
> calculate the minimum connected sub-graph, which is called a {{FailoverRegion}}. 
> So the {{DataConsumptionException}} should contain the {{ResultPartitionID}} 
> information for the {{JobMaster}} to track upstream executions.
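As a rough illustration of the shape such an exception could take (the 
constructor, field layout, and the use of a plain string in place of the real 
{{ResultPartitionID}} class are assumptions, not the final design), the key 
point is simply that the partition identifier travels with the exception so 
the master can map the failure back to the upstream producer:

{code:java}
import java.io.IOException;

// Sketch only: a consumer-side exception carrying the identifier of the
// result partition whose data could not be consumed.
public class DataConsumptionException extends IOException {

    private static final long serialVersionUID = 1L;

    // Stand-in for Flink's ResultPartitionID; the real runtime class would
    // be used here instead of a String.
    private final String resultPartitionId;

    public DataConsumptionException(String resultPartitionId, Throwable cause) {
        super("Failed to consume data from result partition " + resultPartitionId, cause);
        this.resultPartitionId = resultPartitionId;
    }

    public String getResultPartitionId() {
        return resultPartitionId;
    }
}
{code}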



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12154) Remove legacy fields for SingleInputGate

2019-04-10 Thread zhijiang (JIRA)
zhijiang created FLINK-12154:


 Summary: Remove legacy fields for SingleInputGate
 Key: FLINK-12154
 URL: https://issues.apache.org/jira/browse/FLINK-12154
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


In {{SingleInputGate#create}}, we could remove the unused {{ExecutionAttemptID}} 
parameter.

For the constructor of {{SingleInputGate}}, we could remove the unused 
{{TaskIOMetricGroup}} parameter.

Then we introduce {{createSingleInputGate}} to reuse the process of creating a 
{{SingleInputGate}} in the related tests.
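As a sketch of the kind of test utility this refactoring enables (the 
parameters, defaults, and the stand-in class below are assumptions for 
illustration, not the actual {{SingleInputGate}} constructor), the idea is to 
centralize gate creation so individual tests only specify what they care about:

{code:java}
// Sketch only: a shared test factory so individual tests do not repeat the
// full construction of an input gate. "TestInputGate" stands in for the real
// SingleInputGate, whose constructor arguments are not reproduced here.
public class InputGateTestFactory {

    static class TestInputGate {
        final String owningTaskName;
        final int numberOfChannels;

        TestInputGate(String owningTaskName, int numberOfChannels) {
            this.owningTaskName = owningTaskName;
            this.numberOfChannels = numberOfChannels;
        }
    }

    // Tests call this instead of the constructor, so adding or removing
    // constructor parameters only touches one place.
    public static TestInputGate createSingleInputGate(int numberOfChannels) {
        return new TestInputGate("test-task", numberOfChannels);
    }

    public static void main(String[] args) {
        TestInputGate gate = createSingleInputGate(2);
        System.out.println(gate.owningTaskName + " has " + gate.numberOfChannels + " channels");
    }
}
{code}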



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12146) Remove unregister task from NetworkEnvironment to simplify the interface of ShuffleService

2019-04-09 Thread zhijiang (JIRA)
zhijiang created FLINK-12146:


 Summary: Remove unregister task from NetworkEnvironment to 
simplify the interface of ShuffleService
 Key: FLINK-12146
 URL: https://issues.apache.org/jira/browse/FLINK-12146
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


The current {{NetworkEnvironment}} would be the default {{ShuffleService}} 
implementation in the task manager. In order to keep the interface simple, we 
try to avoid further interactions with {{NetworkEnvironment}}.

{{NetworkEnvironment#unregisterTask}} is used for closing partitions/gates and 
releasing partitions from {{ResultPartitionManager}}. Closing partitions/gates 
could be done in the task, which already maintains arrays of them. Furthermore, 
we could release the partition from {{ResultPartitionManager}} inside 
{{ResultPartition}} by introducing {{ResultPartition#close(Throwable)}}. With 
that, {{NetworkEnvironment#unregisterTask}} could be removed entirely.
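A minimal sketch of the idea, with simplified stand-in classes rather than 
Flink's real {{ResultPartition}}/{{ResultPartitionManager}}: the partition 
keeps a reference to its manager and deregisters itself when closed, so the 
shuffle service interface no longer needs a separate unregister call.

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch only: a partition releasing itself from its manager on close(Throwable).
public class SelfReleasingPartitionSketch {

    static class PartitionManager {
        final Map<String, Partition> registered = new ConcurrentHashMap<>();

        void register(Partition p) {
            registered.put(p.id, p);
        }

        void onPartitionClosed(Partition p) {
            registered.remove(p.id);
        }
    }

    static class Partition {
        final String id;
        final PartitionManager manager;

        Partition(String id, PartitionManager manager) {
            this.id = id;
            this.manager = manager;
            manager.register(this);
        }

        // The task (or an error path) calls this; no separate
        // "unregisterTask" on the environment is needed anymore.
        void close(Throwable cause) {
            if (cause != null) {
                System.out.println(id + " closed due to: " + cause.getMessage());
            }
            manager.onPartitionClosed(this);
        }
    }

    public static void main(String[] args) {
        PartitionManager manager = new PartitionManager();
        Partition partition = new Partition("partition-1", manager);
        partition.close(null);
        System.out.println("still registered: " + manager.registered.size());
    }
}
{code}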



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6227) Introduce the DataConsumptionException for downstream task failure

2019-04-09 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16813215#comment-16813215
 ] 

zhijiang commented on FLINK-6227:
-

Exactly, this exception is received on the consumer side via the network stack 
and then set via {{setError}} on the specific input channel. When this input 
channel is read by the task, it calls {{checkError}} to interrupt the running 
task. So the whole process does not go through operator/user code, which could 
otherwise change this special exception.
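A minimal sketch of that error-propagation pattern (simplified, not the actual 
{{InputChannel}} code; class and method names are illustrative): the network 
thread records the error atomically, and the task thread surfaces it on its 
next read, so the exception never passes through user code.

{code:java}
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

// Sketch only: an input channel where the network thread sets an error and
// the task thread rethrows it on the next read, bypassing operator/user code.
public class ErrorPropagatingChannelSketch {

    static class Channel {
        private final AtomicReference<Throwable> error = new AtomicReference<>();

        // Called from the network thread, e.g. when a consumption error arrives.
        void setError(Throwable t) {
            error.compareAndSet(null, t);
        }

        // Called from the task thread before handing out data.
        private void checkError() throws IOException {
            Throwable t = error.get();
            if (t != null) {
                throw new IOException("Channel failed", t);
            }
        }

        Optional<String> getNextRecord() throws IOException {
            checkError();
            return Optional.empty(); // no data in this sketch
        }
    }

    public static void main(String[] args) {
        Channel channel = new Channel();
        channel.setError(new RuntimeException("upstream data unavailable"));
        try {
            channel.getNextRecord();
        } catch (IOException e) {
            System.out.println("task interrupted by: " + e.getCause().getMessage());
        }
    }
}
{code}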

> Introduce the DataConsumptionException for downstream task failure
> --
>
> Key: FLINK-6227
> URL: https://issues.apache.org/jira/browse/FLINK-6227
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> It is part of FLIP-1.
> We define a new special exception to indicate a downstream task failure in 
> consuming upstream data. 
> The {{JobManager}} will receive and consider this special exception to 
> calculate the minimum connected sub-graph, which is called a {{FailoverRegion}}. 
> So the {{DataConsumptionException}} should contain the {{ResultPartitionID}} 
> information for the {{JobMaster}} to track upstream executions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12127) Move network related options to NetworkEnvironmentOptions

2019-04-08 Thread zhijiang (JIRA)
zhijiang created FLINK-12127:


 Summary: Move network related options to NetworkEnvironmentOptions
 Key: FLINK-12127
 URL: https://issues.apache.org/jira/browse/FLINK-12127
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


Some network-related options in {{TaskManagerOptions}} could be moved into the 
newly introduced {{NetworkEnvironmentOptions}}, which would be used by the 
different shuffle services.
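As a sketch of what such a dedicated options class could look like (the option 
key, default value, and description below are illustrative assumptions, not 
the actual options being moved):

{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

// Sketch only: a dedicated holder class for network/shuffle-related options.
public class NetworkEnvironmentOptions {

    // Illustrative option: number of network buffers used by the task manager.
    public static final ConfigOption<Integer> NUM_NETWORK_BUFFERS =
            ConfigOptions.key("taskmanager.network.numberOfBuffers")
                    .defaultValue(2048)
                    .withDescription("Illustrative: number of network buffers.");

    private NetworkEnvironmentOptions() {
        // no instantiation
    }
}
{code}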



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

