[jira] [Comment Edited] (FLINK-13100) Fix the unexpected IOException during FileBufferReader#nextBuffer
[ https://issues.apache.org/jira/browse/FLINK-13100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16880465#comment-16880465 ]

zhijiang edited comment on FLINK-13100 at 7/8/19 3:28 PM:
----------------------------------------------------------

Yes, the previous SpilledSubpartitionView really had a deadlock issue with its two buffers. We now have a similar issue, but it throws an IOException instead of deadlocking.

Your understanding of the two parts above is right, but there is another case:
 * When the view is created, it is enqueued in the netty thread loop the first time because, as you mentioned, it has both data and credit at that point.
 * Then a buffer is fetched from the view and writeAndFlush() is called. If the view is still available in the queue, it must have credit, because the next buffer is always available thanks to reading ahead. This case is not a problem, because if the current queue is not empty we wait for the previous writeAndFlush() to complete before triggering the next one.
 * But if the view is not available (no credit) when writeAndFlush() is called, the queue in the netty thread loop is empty. Before the future of writeAndFlush() completes, the netty thread can still process an AddCredit message, which makes the view available again, so it is added to the queue and triggers the next writeAndFlush(). The next write is therefore triggered before the previous buffer has been recycled, which causes the problem. The sequence might look like this: first writeAndFlush() pending -> addCredit (triggers second writeAndFlush()) -> first writeAndFlush() finishes and recycles its buffer.
 * I think this might be caused by the improvement in release-1.5 that reuses the Flink buffer in the netty stack. We can split writeAndFlush() into two steps, write and flush. Previously, when the write step finished, the Flink buffer had already been copied into netty's internal ByteBuffer and could be recycled at that point, so a second writeAndFlush() triggered while the first was still pending caused no problem. Now the netty stack keeps referencing the Flink buffer after the write step until the flush is done.

I will try to mock this process in the relevant unit tests to verify it, and I might submit the test tomorrow to make it easier to understand.

was (Author: zjwang):
Yes, the previous SpilledSubpartitionView really had a deadlock issue with its two buffers. We now have a similar issue, but it throws an IOException instead of deadlocking.

Your understanding of the two parts above is right, but there is another case:
 * When the view is created, it is enqueued in the netty thread loop the first time because, as you mentioned, it has both data and credit at that point.
 * Then a buffer is fetched from the view and writeAndFlush() is called. If the view is still available in the queue, it must have credit, because the next buffer is always available thanks to reading ahead. This case is not a problem, because we wait for the previous writeAndFlush() to complete before triggering the next one.
 * But if the view is not available (no credit) when writeAndFlush() is called, the queue in the netty thread loop is empty. Before the future of writeAndFlush() completes, the netty thread can still process an AddCredit message, which makes the view available again, so it is added to the queue and triggers the next writeAndFlush(). The next write is therefore triggered before the previous buffer has been recycled, which causes the problem. The sequence might look like this: first writeAndFlush() pending -> addCredit (triggers second writeAndFlush()) -> first writeAndFlush() finishes and recycles its buffer.
 * I think this might be caused by the improvement in release-1.5 that reuses the Flink buffer in the netty stack. We can split writeAndFlush() into two steps, write and flush. Previously, when the write step finished, the Flink buffer had already been copied into netty's internal ByteBuffer and could be recycled at that point, so a second writeAndFlush() triggered while the first was still pending caused no problem. Now the netty stack keeps referencing the Flink buffer after the write step until the flush is done.

I will try to mock this process in the relevant unit tests to verify it, and I might submit the test tomorrow to make it easier to understand.

> Fix the unexpected IOException during FileBufferReader#nextBuffer
> -----------------------------------------------------------------
>
> Key: FLINK-13100
> URL: https://issues.apache.org/jira/browse/FLINK-13100
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Network
> Reporter: zhijiang
> Priority: Blocker
>
> In the implementation of FileBufferReader#nextBuffer, we expect the next memory segment to always be available, based on the assumption that nextBuffer is only called after the previous buffer has been recycled. Otherwise it would t
[jira] [Commented] (FLINK-13100) Fix the unexpected IOException during FileBufferReader#nextBuffer
[ https://issues.apache.org/jira/browse/FLINK-13100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16880465#comment-16880465 ]

zhijiang commented on FLINK-13100:
----------------------------------

Yes, the previous SpilledSubpartitionView really had a deadlock issue with its two buffers. We now have a similar issue, but it throws an IOException instead of deadlocking.

Your understanding of the two parts above is right, but there is another case:
 * When the view is created, it is enqueued in the netty thread loop the first time because, as you mentioned, it has both data and credit at that point.
 * Then a buffer is fetched from the view and writeAndFlush() is called. If the view is still available in the queue, it must have credit, because the next buffer is always available thanks to reading ahead. This case is not a problem, because we wait for the previous writeAndFlush() to complete before triggering the next one.
 * But if the view is not available (no credit) when writeAndFlush() is called, the queue in the netty thread loop is empty. Before the future of writeAndFlush() completes, the netty thread can still process an AddCredit message, which makes the view available again, so it is added to the queue and triggers the next writeAndFlush(). The next write is therefore triggered before the previous buffer has been recycled, which causes the problem. The sequence might look like this: first writeAndFlush() pending -> addCredit (triggers second writeAndFlush()) -> first writeAndFlush() finishes and recycles its buffer.
 * I think this might be caused by the improvement in release-1.5 that reuses the Flink buffer in the netty stack. We can split writeAndFlush() into two steps, write and flush. Previously, when the write step finished, the Flink buffer had already been copied into netty's internal ByteBuffer and could be recycled at that point, so a second writeAndFlush() triggered while the first was still pending caused no problem. Now the netty stack keeps referencing the Flink buffer after the write step until the flush is done.

I will try to mock this process in the relevant unit tests to verify it, and I might submit the test tomorrow to make it easier to understand.

> Fix the unexpected IOException during FileBufferReader#nextBuffer
> -----------------------------------------------------------------
>
> Key: FLINK-13100
> URL: https://issues.apache.org/jira/browse/FLINK-13100
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Network
> Reporter: zhijiang
> Priority: Blocker
>
> In the implementation of FileBufferReader#nextBuffer, we expect the next memory segment to always be available, based on the assumption that nextBuffer is only called after the previous buffer has been recycled. Otherwise the current implementation throws an IOException.
> In fact, this assumption does not hold given the credit-based flow control and zero-copy features of the network stack. The detailed process is as follows:
>  * The netty thread finishes calling channel.writeAndFlush() in PartitionRequestQueue and adds a listener to handle the ChannelFuture later. Until the future completes, the corresponding buffer is not recycled because of the zero-copy improvement.
>  * Before the previous future completes, the netty thread can trigger the next writeAndFlush() by processing an addCredit message, and FileBufferReader#nextBuffer then throws an exception because the previous buffer has not been recycled.
> We thought of several ways to solve this potential bug:
>  * Do not trigger the next writeAndFlush() before the previous future completes. This requires maintaining the future state and checking it in the relevant actions. I am concerned that it might cause a regression in network throughput and add extra state management.
>  * Adjust the implementation of the current FileBufferReader. So far we have regarded the blocking partition view as always available because the next buffer is read ahead, so it is always added to the available queue in PartitionRequestQueue. Actually, reading the next buffer ahead only simplifies BoundedBlockingSubpartitionReader#notifyDataAvailable. The view availability could instead be judged by the available buffers in FileBufferReader. When a buffer is recycled into FileBufferReader after writeAndFlush() completes, it could call notifyDataAvailable to add the view to the available queue in PartitionRequestQueue.
> I prefer the second way because it would not have any negative impact.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
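The ordering problem described in this issue can be reproduced with a small, self-contained model. The sketch below is not Flink code: `ReadAheadReaderSketch`, its integer segment ids, and `hasFreeSegment()` are hypothetical stand-ins for FileBufferReader, its memory segments, and the availability check proposed in the second option. It assumes a reader with two segments that always keeps one buffer read ahead.

```java
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical model of a read-ahead file reader; NOT Flink's FileBufferReader. */
final class ReadAheadReaderSketch {
    private final Deque<Integer> freeSegments = new ArrayDeque<>();
    private int readAhead; // id of the segment that already holds the next buffer

    ReadAheadReaderSketch(int numSegments) {
        if (numSegments < 1) {
            throw new IllegalArgumentException("need at least one segment");
        }
        for (int id = 0; id < numSegments; id++) {
            freeSegments.add(id);
        }
        readAhead = freeSegments.poll(); // read the first buffer ahead
    }

    /** Proposed availability check: the view is available only if a segment is free. */
    boolean hasFreeSegment() {
        return !freeSegments.isEmpty();
    }

    /** Returns the read-ahead buffer and needs a free segment to read the following one. */
    int nextBuffer() throws IOException {
        Integer segment = freeSegments.poll();
        if (segment == null) {
            throw new IOException("previous buffer not recycled");
        }
        int current = readAhead;
        readAhead = segment;
        return current;
    }

    /** Called when writeAndFlush() for this segment has completed. */
    void recycle(int segment) {
        freeSegments.add(segment);
    }

    /** Failing order: the second nextBuffer() runs while the first buffer is in flight. */
    static String unguarded() {
        ReadAheadReaderSketch reader = new ReadAheadReaderSketch(2);
        try {
            int first = reader.nextBuffer(); // first writeAndFlush() is now pending
            reader.nextBuffer();             // AddCredit triggers the second write too early
            return "first=" + first + " second=ok";
        } catch (IOException e) {
            return "second=" + e.getMessage();
        }
    }

    /** Proposed order: the view is re-enqueued only after a segment has been recycled. */
    static String guarded() {
        ReadAheadReaderSketch reader = new ReadAheadReaderSketch(2);
        try {
            int first = reader.nextBuffer();
            boolean deferred = !reader.hasFreeSegment(); // no free segment, so do not enqueue yet
            reader.recycle(first);                       // first writeAndFlush() completes
            int second = reader.nextBuffer();
            return "deferred=" + deferred + " second=" + second;
        } catch (IOException e) {
            return "unexpected: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(unguarded());
        System.out.println(guarded());
    }
}
```

In this model the exception appears only when the second read happens before `recycle`, which matches the "first writeAndFlush() pending -> addCredit -> recycle" order from the comment. Gating on `hasFreeSegment()` defers the second read instead of failing.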
[jira] [Commented] (FLINK-11082) Fix the calculation of backlog in PipelinedSubpartition
[ https://issues.apache.org/jira/browse/FLINK-11082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16880427#comment-16880427 ]

zhijiang commented on FLINK-11082:
----------------------------------

[~pnowojski], if the requested floating buffers are not used in idle channels, they are eventually returned to the local buffer pool once all the exclusive buffers have been recycled. E.g.
 * If the current backlog is 2 and only 1 exclusive buffer is available in the remote channel, it requests (initial credit + backlog - numAvailableBuffers) = 3 floating buffers.
 * If the backlog is 1 and 3 buffers in total are available in the remote channel (all floating), the channel already holds exactly (initial credit + backlog) buffers, so nothing is requested or returned.
 * If the backlog is 0, we return 1 floating buffer to the local buffer pool and keep 2 available buffers (both floating) in the remote channel.
 * If the backlog is 0 and one exclusive buffer is processed and recycled to the remote channel, this triggers returning one floating buffer to the local pool, leaving two buffers available in the remote channel (one floating and one exclusive).
 * If the backlog is 0 and another exclusive buffer is processed and recycled to the remote channel, this also triggers returning one floating buffer to the local pool, leaving two buffers available in the remote channel (both exclusive).

So a remote channel does not hold on to floating buffers for long once it already has (backlog + initial credit) available buffers.

> Fix the calculation of backlog in PipelinedSubpartition
> -------------------------------------------------------
>
> Key: FLINK-11082
> URL: https://issues.apache.org/jira/browse/FLINK-11082
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Network
> Affects Versions: 1.5.6, 1.7.1
> Reporter: zhijiang
> Assignee: zhijiang
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.9.0
>
> Time Spent: 20m
> Remaining Estimate: 0h
>
> The backlog of a subpartition should indicate how many buffers are consumable, so that the consumer can feed back the corresponding credits for transporting these buffers. But in the current PipelinedSubpartition implementation, the backlog is increased by 1 when a BufferConsumer is added to the PipelinedSubpartition and decreased by 1 when a BufferConsumer is removed from it. So the backlog only reflects how many buffers are retained in the PipelinedSubpartition, which is not always equal to the number of consumable buffers.
> The backlog inconsistency might result in a misdistribution of floating buffers on the consumer side, because the consumer requests floating buffers based on the backlog value, so a floating buffer might stay unused in a RemoteInputChannel for a long time after being requested.
> As for the solution, the last buffer in a PipelinedSubpartition is only consumable when a flush has been triggered or the partition has finished. So we can calculate the backlog precisely based on the flushed/finished conditions of the partition.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
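The floating-buffer walk-through in the comment of this message reduces to simple arithmetic, sketched below. This is an illustration, not the RemoteInputChannel implementation: the class and method names are hypothetical, and the initial credit of 2 is inferred from the comment's own example (initial credit + 2 - 1 = 3).

```java
/** Hypothetical helper that mirrors the arithmetic in the comment; not Flink code. */
final class FloatingBufferSketch {

    /** Floating buffers to request: initial credit + backlog - available, never negative. */
    static int toRequest(int initialCredit, int backlog, int numAvailableBuffers) {
        return Math.max(0, initialCredit + backlog - numAvailableBuffers);
    }

    /** Floating buffers to give back when the channel holds more than it needs. */
    static int toReturn(int initialCredit, int backlog, int numAvailableBuffers, int floatingHeld) {
        int surplus = numAvailableBuffers - (initialCredit + backlog);
        return Math.max(0, Math.min(surplus, floatingHeld));
    }

    public static void main(String[] args) {
        int initialCredit = 2; // assumption inferred from the example in the comment
        System.out.println(toRequest(initialCredit, 2, 1));   // backlog 2, 1 exclusive available
        System.out.println(toReturn(initialCredit, 1, 3, 3)); // backlog 1, 3 floating available
        System.out.println(toReturn(initialCredit, 0, 3, 3)); // backlog 0, 3 floating available
        System.out.println(toReturn(initialCredit, 0, 3, 2)); // one exclusive buffer recycled
        System.out.println(toReturn(initialCredit, 0, 3, 1)); // another exclusive buffer recycled
    }
}
```

Each call corresponds to one bullet of the walk-through. The channel settles at (initial credit + backlog) available buffers, and floating buffers are the first to be handed back.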
[jira] [Created] (FLINK-13141) Remove getBufferSize method from BufferPoolFactory
zhijiang created FLINK-13141:
--------------------------------

Summary: Remove getBufferSize method from BufferPoolFactory
Key: FLINK-13141
URL: https://issues.apache.org/jira/browse/FLINK-13141
Project: Flink
Issue Type: Sub-task
Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang

This is just a refactoring to make the BufferPoolFactory interface simpler and cleaner.

BufferPoolFactory#getBufferSize is only used for creating subpartitions in ResultPartitionFactory. We could pass the network buffer size from NettyShuffleEnvironmentConfiguration when constructing the ResultPartitionFactory, and then the interface method getBufferSize could be removed from BufferPoolFactory.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Issue Comment Deleted] (FLINK-12852) Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
[ https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhijiang updated FLINK-12852:
-----------------------------
Comment: was deleted

(was: I thought of another option that keeps the dynamic local pool as it is now and only fixes the internal logic in LocalBufferPool to solve the deadlock issue.

The basic idea is that the local pool initially requests buffers from the global pool only up to its min size, and later requests additional buffers up to its max size based on how many buffers it has recycled in the past. The details are as follows:
 * The local pool still has a min and a max size at construction, as now.
 * We need an internal counter that records how many buffers have been recycled in total so far. It is the critical factor for deciding how many extra buffers beyond the min size this pool may request from the global pool.
 * The currentPoolSize would not jump to the max value directly after LocalBufferPool#setNumBuffers; instead it should be the min size plus the recycled counter above.
 * The assumption is that if the pool has already recycled x buffers, the downstream is able to consume x buffers and the pool can always recycle x buffers again, so we can request x extra buffers within the max range from the global pool. These x buffers are then guaranteed to be returned to the global pool eventually after the pool is narrowed during redistribution.

[~StephanEwen] [~pnowojski] [~gaoyunhaii])

> Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
> ----------------------------------------------------------------------
>
> Key: FLINK-12852
> URL: https://issues.apache.org/jira/browse/FLINK-12852
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Network
> Affects Versions: 1.7.2, 1.8.1, 1.9.0
> Reporter: Yun Gao
> Assignee: Yun Gao
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.9.0
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> When running tests with an upstream vertex and a downstream vertex, a deadlock occurs when submitting the job:
> {code:java}
> "Sink: Unnamed (3/500)" #136 prio=5 os_prio=0 tid=0x7f2cca81b000 nid=0x38845 waiting on condition [0x7f2cbe9fe000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for <0x00073ed6b6f0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:233)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
>     at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
>     at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:180)
>     at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:54)
>     at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.assignExclusiveSegments(RemoteInputChannel.java:139)
>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:312)
>     - locked <0x00073fbc81f0> (a java.lang.Object)
>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:220)
>     at org.apache.flink.runtime.taskmanager.Task.setupPartionsAndGates(Task.java:836)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:598)
>     at java.lang.Thread.run(Thread.java:834)
> {code}
> This happens because the required and max sizes of the local buffer pool are not the same, so over-allocation is possible, and no memory is available when assignExclusiveSegments is called.
>
> The details of the scenario are as follows: the parallelism of the upstream vertex and the downstream vertex is 1500 and 500 respectively. There are 200 TMs, and each TM has 10696 buffers in total and 10 slots. For a TM that runs 9 upstream tasks and 1 downstream task, the 9 upstream tasks start first with local buffer pool {required = 500, max = 2 * 500 + 8 = 1008}; they produce data quickly and each occupies about 990 buffers. Then the downstream task starts and tries to assign exclusive buffers for 1500 - 9 = 1491 InputChannels. It requires 2981 buffers but only 1786 are left. Since not all downstream tasks can start, the job is eventually blocked, no buffer can be released, and the deadlock occurs.
>
> I think that although increasing the network memory solves the problem, the deadlock may not be acceptable. Fine-grained resource management (FLINK-12761) can solve this problem, but AFAIK in 1.9 it will not include the network memory in the ResourceProfile.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12852) Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
[ https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16879997#comment-16879997 ]

zhijiang commented on FLINK-12852:
----------------------------------

I thought of another option that keeps the dynamic local pool as it is now and only fixes the internal logic in LocalBufferPool to solve the deadlock issue.

The basic idea is that the local pool initially requests buffers from the global pool only up to its min size, and later requests additional buffers up to its max size based on how many buffers it has recycled in the past. The details are as follows:
 * The local pool still has a min and a max size at construction, as now.
 * We need an internal counter that records how many buffers have been recycled in total so far. It is the critical factor for deciding how many extra buffers beyond the min size this pool may request from the global pool.
 * The currentPoolSize would not jump to the max value directly after LocalBufferPool#setNumBuffers; instead it should be the min size plus the recycled counter above.
 * The assumption is that if the pool has already recycled x buffers, the downstream is able to consume x buffers and the pool can always recycle x buffers again, so we can request x extra buffers within the max range from the global pool. These x buffers are then guaranteed to be returned to the global pool eventually after the pool is narrowed during redistribution.

[~StephanEwen] [~pnowojski] [~gaoyunhaii]

> Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
> ----------------------------------------------------------------------
>
> Key: FLINK-12852
> URL: https://issues.apache.org/jira/browse/FLINK-12852
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Network
> Affects Versions: 1.7.2, 1.8.1, 1.9.0
> Reporter: Yun Gao
> Assignee: Yun Gao
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.9.0
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> When running tests with an upstream vertex and a downstream vertex, a deadlock occurs when submitting the job:
> {code:java}
> "Sink: Unnamed (3/500)" #136 prio=5 os_prio=0 tid=0x7f2cca81b000 nid=0x38845 waiting on condition [0x7f2cbe9fe000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for <0x00073ed6b6f0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:233)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
>     at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
>     at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:180)
>     at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:54)
>     at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.assignExclusiveSegments(RemoteInputChannel.java:139)
>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:312)
>     - locked <0x00073fbc81f0> (a java.lang.Object)
>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:220)
>     at org.apache.flink.runtime.taskmanager.Task.setupPartionsAndGates(Task.java:836)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:598)
>     at java.lang.Thread.run(Thread.java:834)
> {code}
> This happens because the required and max sizes of the local buffer pool are not the same, so over-allocation is possible, and no memory is available when assignExclusiveSegments is called.
>
> The details of the scenario are as follows: the parallelism of the upstream vertex and the downstream vertex is 1500 and 500 respectively. There are 200 TMs, and each TM has 10696 buffers in total and 10 slots. For a TM that runs 9 upstream tasks and 1 downstream task, the 9 upstream tasks start first with local buffer pool {required = 500, max = 2 * 500 + 8 = 1008}; they produce data quickly and each occupies about 990 buffers. Then the downstream task starts and tries to assign exclusive buffers for 1500 - 9 = 1491 InputChannels. It requires 2981 buffers but only 1786 are left. Since not all downstream tasks can start, the job is eventually blocked, no buffer can be released, and the deadlock occurs.
>
> I think that although increasing the network memory solves the problem, the deadlock may not be acceptable. Fine-grained resource management (FLINK-12761) can solve this problem, but AFAIK in 1.9 it will not include the network memory in the ResourceProfile.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
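The pool-sizing rule proposed in the comment of this message (current size = min size plus the number of buffers recycled so far, capped by the max size) could look roughly like the sketch below. It is only an illustration under stated assumptions: `RecycleAwarePoolSketch` and its methods are hypothetical and do not reflect LocalBufferPool's real fields or API, and `grantedByRedistribution` stands in for the value passed to LocalBufferPool#setNumBuffers.

```java
/** Hypothetical model of the proposed sizing rule; not Flink's LocalBufferPool. */
final class RecycleAwarePoolSketch {
    private final int minSize;
    private final int maxSize;
    private long recycledTotal; // how many buffers this pool has recycled so far

    RecycleAwarePoolSketch(int minSize, int maxSize) {
        if (minSize < 0 || maxSize < minSize) {
            throw new IllegalArgumentException("expected 0 <= minSize <= maxSize");
        }
        this.minSize = minSize;
        this.maxSize = maxSize;
    }

    /** Called whenever a buffer comes back to this pool. */
    void onRecycle() {
        recycledTotal++;
    }

    /**
     * Effective pool size: never more than the redistribution grant or the max size,
     * and never more than min size + recycled count.
     */
    int currentPoolSize(int grantedByRedistribution) {
        if (grantedByRedistribution < minSize) {
            throw new IllegalArgumentException("grant must cover the min size");
        }
        long earned = (long) minSize + recycledTotal;
        long cap = Math.min(grantedByRedistribution, maxSize);
        return (int) Math.min(earned, cap);
    }

    public static void main(String[] args) {
        // Sizes taken from the scenario in the issue: required = 500, max = 1008.
        RecycleAwarePoolSketch pool = new RecycleAwarePoolSketch(500, 1008);
        System.out.println(pool.currentPoolSize(1008)); // nothing recycled yet
        for (int i = 0; i < 100; i++) {
            pool.onRecycle();
        }
        System.out.println(pool.currentPoolSize(1008)); // grows with proven recycling
    }
}
```

With this rule a freshly started upstream task cannot occupy buffers beyond its min size until the downstream has shown, through recycled buffers, that it can drain them.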
[jira] [Comment Edited] (FLINK-12852) Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
[ https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16879396#comment-16879396 ]

zhijiang edited comment on FLINK-12852 at 7/5/19 3:58 PM:
----------------------------------------------------------

Thanks for the detailed conclusions above, [~StephanEwen]!

I agree that option 4 is the most feasible way ATM. The logic of a fixed-size pool is simple and direct: compared with the current dynamic size, it avoids sharing buffers among tasks, so tasks are independent of each other and do not rely on any assumptions. We once implemented a fixed-size local pool to avoid buffer redistribution in an early Blink version.

As I understand it, option 1 goes in the same direction as option 4, both with a fixed-size local pool. The difference is that option 4 takes (2*numChannels + floating) as the fixed pool size for better performance, whereas option 1 takes numChannels as the fixed pool size. Option 1 is compatible for users when upgrading, but it might cause a performance regression. Option 4 keeps the performance but might require users to adjust the config if the upgrade fails.

The other three options are indeed complex to implement and might break or rely on some assumptions, which brings potential risks.

was (Author: zjwang):
Thanks for the detailed conclusions above, [~StephanEwen]!

I agree that option 4 is the most feasible way ATM. The logic of a fixed-size pool is simple and direct: compared with the current dynamic size, it avoids sharing buffers among tasks, so tasks are independent of each other and do not rely on any assumptions. We once implemented a fixed-size local pool to avoid buffer redistribution in an early Blink version.

As I understand it, option 1 goes in the same direction as option 4, both with a fixed-size local buffer. The difference is that option 4 takes (2*numChannels + floating) as the fixed pool size for better performance, whereas option 1 takes numChannels as the fixed pool size. Option 1 is compatible for users when upgrading, but it might cause a performance regression. Option 4 keeps the performance but might require users to adjust the config if the upgrade fails.

The other three options are indeed complex to implement and might break or rely on some assumptions, which brings potential risks.

> Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
> ----------------------------------------------------------------------
>
> Key: FLINK-12852
> URL: https://issues.apache.org/jira/browse/FLINK-12852
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Network
> Affects Versions: 1.7.2, 1.8.1, 1.9.0
> Reporter: Yun Gao
> Assignee: Yun Gao
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.9.0
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> When running tests with an upstream vertex and a downstream vertex, a deadlock occurs when submitting the job:
> {code:java}
> "Sink: Unnamed (3/500)" #136 prio=5 os_prio=0 tid=0x7f2cca81b000 nid=0x38845 waiting on condition [0x7f2cbe9fe000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for <0x00073ed6b6f0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:233)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
>     at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
>     at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:180)
>     at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:54)
>     at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.assignExclusiveSegments(RemoteInputChannel.java:139)
>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:312)
>     - locked <0x00073fbc81f0> (a java.lang.Object)
>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:220)
>     at org.apache.flink.runtime.taskmanager.Task.setupPartionsAndGates(Task.java:836)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:598)
>     at java.lang.Thread.run(Thread.java:834)
> {code}
> This happens because the required and max sizes of the local buffer pool are not the same, so over-allocation is possible, and no memory is available when assignExclusiveSegments is called.
>
> The details of the scenario are as follows: the parallelism of the upstream vertex and the downstream vertex is 1500 and 500 respectively. There are 200 TMs, and each TM has 10696 buffers in total and has 1
[jira] [Commented] (FLINK-12852) Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
[ https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16879396#comment-16879396 ]

zhijiang commented on FLINK-12852:
----------------------------------

Thanks for the detailed conclusions above, [~StephanEwen]!

I agree that option 4 is the most feasible way ATM. The logic of a fixed-size pool is simple and direct: compared with the current dynamic size, it avoids sharing buffers among tasks, so tasks are independent of each other and do not rely on any assumptions. We once implemented a fixed-size local pool to avoid buffer redistribution in an early Blink version.

As I understand it, option 1 goes in the same direction as option 4, both with a fixed-size local buffer. The difference is that option 4 takes (2*numChannels + floating) as the fixed pool size for better performance, whereas option 1 takes numChannels as the fixed pool size. Option 1 is compatible for users when upgrading, but it might cause a performance regression. Option 4 keeps the performance but might require users to adjust the config if the upgrade fails.

The other three options are indeed complex to implement and might break or rely on some assumptions, which brings potential risks.

> Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
> ----------------------------------------------------------------------
>
> Key: FLINK-12852
> URL: https://issues.apache.org/jira/browse/FLINK-12852
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Network
> Affects Versions: 1.7.2, 1.8.1, 1.9.0
> Reporter: Yun Gao
> Assignee: Yun Gao
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.9.0
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> When running tests with an upstream vertex and a downstream vertex, a deadlock occurs when submitting the job:
> {code:java}
> "Sink: Unnamed (3/500)" #136 prio=5 os_prio=0 tid=0x7f2cca81b000 nid=0x38845 waiting on condition [0x7f2cbe9fe000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for <0x00073ed6b6f0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:233)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
>     at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
>     at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:180)
>     at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:54)
>     at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.assignExclusiveSegments(RemoteInputChannel.java:139)
>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:312)
>     - locked <0x00073fbc81f0> (a java.lang.Object)
>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:220)
>     at org.apache.flink.runtime.taskmanager.Task.setupPartionsAndGates(Task.java:836)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:598)
>     at java.lang.Thread.run(Thread.java:834)
> {code}
> This happens because the required and max sizes of the local buffer pool are not the same, so over-allocation is possible, and no memory is available when assignExclusiveSegments is called.
>
> The details of the scenario are as follows: the parallelism of the upstream vertex and the downstream vertex is 1500 and 500 respectively. There are 200 TMs, and each TM has 10696 buffers in total and 10 slots. For a TM that runs 9 upstream tasks and 1 downstream task, the 9 upstream tasks start first with local buffer pool {required = 500, max = 2 * 500 + 8 = 1008}; they produce data quickly and each occupies about 990 buffers. Then the downstream task starts and tries to assign exclusive buffers for 1500 - 9 = 1491 InputChannels. It requires 2981 buffers but only 1786 are left. Since not all downstream tasks can start, the job is eventually blocked, no buffer can be released, and the deadlock occurs.
>
> I think that although increasing the network memory solves the problem, the deadlock may not be acceptable. Fine-grained resource management (FLINK-12761) can solve this problem, but AFAIK in 1.9 it will not include the network memory in the ResourceProfile.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
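The buffer shortfall in the quoted scenario can be checked with a few lines of arithmetic. The sketch below uses only the numbers from the issue description. The class and method names are hypothetical, and the figure of 2 exclusive buffers per channel is an assumption: with it the product is 2982, one more than the 2981 stated in the report. Either value far exceeds the 1786 buffers that are left.

```java
/** Hypothetical arithmetic check of the scenario in the issue; not Flink code. */
final class ExclusiveBufferShortfallSketch {

    /** Buffers left in the TM after the upstream tasks have filled their pools. */
    static int buffersLeft(int totalBuffers, int upstreamTasks, int buffersPerUpstreamTask) {
        return totalBuffers - upstreamTasks * buffersPerUpstreamTask;
    }

    /** Exclusive buffers the downstream task must obtain before it can start. */
    static int exclusiveBuffersNeeded(int remoteChannels, int buffersPerChannel) {
        return remoteChannels * buffersPerChannel;
    }

    public static void main(String[] args) {
        int left = buffersLeft(10696, 9, 990);                // 9 upstream tasks, about 990 buffers each
        int needed = exclusiveBuffersNeeded(1500 - 9, 2);     // 2 per channel is an assumption
        System.out.println("left=" + left + " needed=" + needed);
        System.out.println("downstream can start: " + (needed <= left));
    }
}
```

Because the downstream task blocks while waiting for buffers that only it could free by consuming data, the shortfall turns into the deadlock shown in the stack trace.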
[jira] [Created] (FLINK-13126) Construct special test/benchmark to verify the backlog effect
zhijiang created FLINK-13126: Summary: Construct special test/benchmark to verify the backlog effect Key: FLINK-13126 URL: https://issues.apache.org/jira/browse/FLINK-13126 Project: Flink Issue Type: Sub-task Components: Benchmarks, Runtime / Network, Tests Reporter: zhijiang Assignee: zhijiang Based on Piotr's suggestion while reviewing the [PR|https://github.com/apache/flink/pull/7911], it would be better to construct a relevant test or benchmark to further verify the backlog effect, as follow-up work for [FLINK-11082|https://issues.apache.org/jira/browse/FLINK-11082].
[jira] [Updated] (FLINK-11082) Fix the calculation of backlog in PipelinedSubpartition
[ https://issues.apache.org/jira/browse/FLINK-11082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-11082: - Affects Version/s: (was: 1.6.3) (was: 1.8.0) > Fix the calculation of backlog in PipelinedSubpartition > --- > > Key: FLINK-11082 > URL: https://issues.apache.org/jira/browse/FLINK-11082 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network > Affects Versions: 1.5.6, 1.7.1 > Reporter: zhijiang > Assignee: zhijiang > Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > The backlog of a subpartition should indicate how many buffers are consumable, so that the consumer can feed back the corresponding credits for transporting these buffers. But in the current PipelinedSubpartition implementation, the backlog is increased by 1 when a BufferConsumer is added to PipelinedSubpartition, and decreased by 1 when a BufferConsumer is removed from it. So the backlog only reflects how many buffers are retained in PipelinedSubpartition, which is not always equal to the number of consumable buffers. > The backlog inconsistency might result in floating buffers being misdistributed on the consumer side: the consumer requests floating buffers based on the backlog value, so a floating buffer might sit unused in RemoteInputChannel for a long time after being requested. > As for the solution, the last buffer in PipelinedSubpartition is consumable only when a flush has been triggered or the partition has finished. So we could calculate the backlog precisely based on the partition's flushed/finished conditions.
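The difference between retained and consumable buffers in FLINK-11082 can be shown with a small model. This is a minimal sketch under stated assumptions, not the PipelinedSubpartition implementation: the class `BacklogModel` and its methods are hypothetical, buffers are plain strings, and adding a buffer is assumed to clear a pending flush because the new tail has not been flushed yet.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of a subpartition queue; hypothetical, not the Flink class. */
final class BacklogModel {
    private final Deque<String> buffers = new ArrayDeque<>();
    private boolean flushRequested;
    private boolean finished;

    /** Adds a buffer at the tail; the new tail is not flushed yet (modelling assumption). */
    void add(String buffer) {
        buffers.addLast(buffer);
        flushRequested = false;
    }

    /** Marks everything currently queued, including the tail, as ready to send. */
    void flush() {
        flushRequested = !buffers.isEmpty();
    }

    /** Marks the partition as finished; no more data will be appended to the tail. */
    void finish() {
        finished = true;
    }

    /** What a naive counter reports: every retained buffer. */
    int retained() {
        return buffers.size();
    }

    /** Consumable buffers: the tail only counts after a flush or once the partition is finished. */
    int backlog() {
        if (flushRequested || finished) {
            return buffers.size();
        }
        return Math.max(buffers.size() - 1, 0);
    }

    public static void main(String[] args) {
        BacklogModel partition = new BacklogModel();
        partition.add("a");
        partition.add("b");
        System.out.println("retained=" + partition.retained() + " backlog=" + partition.backlog());
        partition.flush();
        System.out.println("after flush: backlog=" + partition.backlog());
        partition.add("c");
        System.out.println("after add: retained=" + partition.retained() + " backlog=" + partition.backlog());
        partition.finish();
        System.out.println("after finish: backlog=" + partition.backlog());
    }
}
```

With the naive counter the consumer would request a floating buffer for the unfinished tail as well, which is the misdistribution described in the issue.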
[jira] [Updated] (FLINK-11082) Fix the calculation of backlog in PipelinedSubpartition
[ https://issues.apache.org/jira/browse/FLINK-11082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-11082: - Description: The backlog of a subpartition should indicate how many buffers are consumable, so that the consumer can feed back the corresponding credits for transporting these buffers. But in the current PipelinedSubpartition implementation, the backlog is increased by 1 when a BufferConsumer is added to PipelinedSubpartition, and decreased by 1 when a BufferConsumer is removed from it. So the backlog only reflects how many buffers are retained in PipelinedSubpartition, which is not always equal to the number of consumable buffers. The backlog inconsistency might result in floating buffers being misdistributed on the consumer side: the consumer requests floating buffers based on the backlog value, so a floating buffer might sit unused in RemoteInputChannel for a long time after being requested. As for the solution, the last buffer in PipelinedSubpartition is consumable only when a flush has been triggered or the partition has finished. So we could calculate the backlog precisely based on the partition's flushed/finished conditions. was: The backlog should indicate how many buffers are available in the subpartition for downstream consumption. Availability depends on two factors: one is the {{BufferConsumer}} being finished, and the other is a flush being triggered. In the current implementation, when a {{BufferConsumer}} is added to the subpartition, the backlog is increased as a result, but this {{BufferConsumer}} is not yet available for network transport. Furthermore, the backlog affects the requesting of floating buffers on the downstream side. That means some floating buffers are fetched in advance but not used for a long time, so the floating buffers are not used efficiently. We found this scenario to be especially pronounced with the rebalance selector on the upstream side, so we want to change when the backlog is increased: on finishing a {{BufferConsumer}} or on a triggered flush.
[jira] [Updated] (FLINK-11082) Fix the calculation of backlog in PipelinedSubpartition
[ https://issues.apache.org/jira/browse/FLINK-11082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-11082: - Summary: Fix the calculation of backlog in PipelinedSubpartition (was: Increase backlog only if it is available for consumption)
[jira] [Commented] (FLINK-12852) Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
[ https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16878692#comment-16878692 ] zhijiang commented on FLINK-12852: -- Thanks for looking into this issue, [~aitozi]. We already have the check `numTotalRequiredBuffers + numberOfSegmentsToRequest > totalNumberOfMemorySegments` in `NetworkBufferPool#requestMemorySegments`, and this check is mainly used to judge whether the global pool size can satisfy the total core size. If we replace `numberOfSegmentsToRequest` with `numRequiredBuffers` as you suggested above, what would the value of `numRequiredBuffers` be here? In `NetworkBufferPool#createBufferPool`, the `numRequiredBuffers` in the check is the core size of the local pool. I guess your suggestion also means throwing the IOException("Insufficient buffers...") after allocating the exclusive buffers has failed. I have two concerns about this solution: * How long would you wait before throwing the exception? Recycling the extra buffers from other local pools might take some time after redistribution, and we cannot estimate a suitable timeout for throwing this exception. * It seems a bit tricky/hacky to reuse the check here. If the exception reports insufficient buffers, users might be confused because the total number of buffers is actually sufficient. If the exception reports a deadlock, users still might not know what to do. The only benefit is that the deadlock can be identified easily, without jstack.
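To see why the existing check does not catch this deadlock, here is a simplified stand-alone model of it. It is only a sketch: `GlobalPoolCheckModel` and `reserve` are hypothetical names, only the two counters and the comparison are taken from the comment above, and the request size of 2982 assumes 2 exclusive buffers for each of the 1491 channels, which the issue does not state.

```java
import java.io.IOException;

/** Simplified model of the global core-size check; hypothetical, not NetworkBufferPool itself. */
final class GlobalPoolCheckModel {
    private final int totalNumberOfMemorySegments;
    private int numTotalRequiredBuffers;

    GlobalPoolCheckModel(int totalNumberOfMemorySegments) {
        this.totalNumberOfMemorySegments = totalNumberOfMemorySegments;
    }

    /** Accounts for a core ("required") size; fails only if the sum of core sizes exceeds the total. */
    void reserve(int numberOfSegmentsToRequest) throws IOException {
        if (numTotalRequiredBuffers + numberOfSegmentsToRequest > totalNumberOfMemorySegments) {
            throw new IOException("Insufficient buffers: required "
                    + (numTotalRequiredBuffers + numberOfSegmentsToRequest)
                    + ", total " + totalNumberOfMemorySegments);
        }
        numTotalRequiredBuffers += numberOfSegmentsToRequest;
    }

    int reserved() {
        return numTotalRequiredBuffers;
    }

    public static void main(String[] args) throws IOException {
        GlobalPoolCheckModel pool = new GlobalPoolCheckModel(10696); // buffers per TM, from the issue
        for (int task = 0; task < 9; task++) {
            pool.reserve(500); // core size of each upstream local pool, from the issue
        }
        pool.reserve(2982);    // exclusive buffers for the downstream task (assumed 2 per channel)
        System.out.println("check passed, accounted core size = " + pool.reserved());
    }
}
```

The check passes because it only sums core sizes, while the upstream pools may physically hold segments up to their max size. That is why an "insufficient buffers" message would be misleading here.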
[jira] [Commented] (FLINK-12852) Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
[ https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16878518#comment-16878518 ] zhijiang commented on FLINK-12852: -- In general we have two directions: sharing buffers between tasks, or not. * The sharing direction seems more flexible and opens more possibilities, but we need to settle on a buffer revocation mechanism that does not bring downsides. ATM we have the first and third ways above for it. * The non-sharing direction could avoid the potential issues completely, but it might waste resources in some cases.
[jira] [Commented] (FLINK-12852) Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
[ https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16878516#comment-16878516 ] zhijiang commented on FLINK-12852: -- [~pnowojski] "Network resource matching in slot" refers to the second one you mentioned, and "Lazy allocation buffers on producer side" refers to the first one.
[jira] [Commented] (FLINK-13100) Fix the unexpected IOException during FileBufferReader#nextBuffer
[ https://issues.apache.org/jira/browse/FLINK-13100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16878513#comment-16878513 ] zhijiang commented on FLINK-13100: -- What do you think, [~pnowojski] [~StephanEwen]?
[jira] [Created] (FLINK-13100) Fix the unexpected IOException during FileBufferReader#nextBuffer
zhijiang created FLINK-13100: Summary: Fix the unexpected IOException during FileBufferReader#nextBuffer Key: FLINK-13100 URL: https://issues.apache.org/jira/browse/FLINK-13100 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: zhijiang In the implementation of FileBufferReader#nextBuffer, we expect the next memory segment to always be available, based on the assumption that nextBuffer can only be called after the previous buffer has been recycled. Otherwise the current implementation throws an IOException. In fact, this assumption does not hold given the credit-based and zero-copy features of the network stack. The detailed process is as follows: * The netty thread finishes calling channel.writeAndFlush() in PartitionRequestQueue and adds a listener to handle the ChannelFuture later. Until the future is done, the corresponding buffer is not recycled because of the zero-copy improvement. * Before the previous future is done, the netty thread can trigger the next writeAndFlush by processing an addCredit message; FileBufferReader#nextBuffer then throws an exception because the previous buffer has not been recycled. We thought of several ways to solve this potential bug: * Do not trigger the next writeAndFlush before the previous future is done. This requires maintaining the future state and checking it in the relevant actions. I suspect it might cause a performance regression in network throughput and add extra state management. * Adjust the implementation of the current FileBufferReader. So far we have regarded the blocking partition view as always available based on the next buffer being read ahead, so it is always added to the available queue in PartitionRequestQueue. Actually, this read-ahead of the next buffer only simplifies the process of BoundedBlockingSubpartitionReader#notifyDataAvailable. The view availability could instead be judged by the available buffers in FileBufferReader. When a buffer is recycled into FileBufferReader after writeAndFlush is done, it could call notifyDataAvailable to add this view to the available queue in PartitionRequestQueue. I prefer the second way because it would not bring any negative impact.
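The second proposal in FLINK-13100 can be sketched as a reader whose availability follows its free segments. This is a rough illustration, not the FileBufferReader code: `ReaderAvailabilityModel`, its constructor arguments and the `Runnable` callback (a stand-in for notifyDataAvailable) are all hypothetical, segments are plain byte arrays, and the model assumes a single thread, as in the netty event loop.

```java
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy reader whose availability is derived from free segments; hypothetical, not Flink code. */
final class ReaderAvailabilityModel {
    private final Deque<byte[]> freeSegments = new ArrayDeque<>();
    private final Runnable onAvailable; // stand-in for notifyDataAvailable

    ReaderAvailabilityModel(int numSegments, int segmentSize, Runnable onAvailable) {
        for (int i = 0; i < numSegments; i++) {
            freeSegments.add(new byte[segmentSize]);
        }
        this.onAvailable = onAvailable;
    }

    /** The view should only be enqueued for writing while a segment is free. */
    boolean isAvailable() {
        return !freeSegments.isEmpty();
    }

    /** Hands out a segment; fails like the reported IOException when none has been recycled. */
    byte[] nextBuffer() throws IOException {
        byte[] segment = freeSegments.poll();
        if (segment == null) {
            throw new IOException("no free segment: previous buffer not recycled yet");
        }
        return segment;
    }

    /** Called once the pending write is done; wakes the view up if it had run dry. */
    void recycle(byte[] segment) {
        boolean wasExhausted = freeSegments.isEmpty();
        freeSegments.add(segment);
        if (wasExhausted) {
            onAvailable.run();
        }
    }

    public static void main(String[] args) throws IOException {
        ReaderAvailabilityModel reader =
                new ReaderAvailabilityModel(1, 32, () -> System.out.println("view re-enqueued"));
        byte[] inFlight = reader.nextBuffer(); // first writeAndFlush is now pending
        System.out.println("available while write pending: " + reader.isAvailable());
        reader.recycle(inFlight);              // write future completed
        System.out.println("available after recycle: " + reader.isAvailable());
    }
}
```

If the caller checks `isAvailable()` before reading, a credit that arrives while the first write is still pending no longer triggers a second `nextBuffer()` call; the view is re-enqueued from `recycle` instead.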
[jira] [Resolved] (FLINK-12738) Remove abstract getPageSize method from InputGate
[ https://issues.apache.org/jira/browse/FLINK-12738?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang resolved FLINK-12738. -- Resolution: Fixed Fixed in b6563e385263beb556ce19855ca6dfdbb7f6c853 > Remove abstract getPageSize method from InputGate > - > > Key: FLINK-12738 > URL: https://issues.apache.org/jira/browse/FLINK-12738 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network > Reporter: zhijiang > Assignee: zhijiang > Priority: Minor > Labels: pull-request-available > Time Spent: 20m > Remaining Estimate: 0h > > Currently {{InputGate#getPageSize}} is only used for constructing {{BarrierBuffer}}. To keep the abstract InputGate simple and clean, we should remove unnecessary abstract methods from it. > The page size can be parsed directly from the configuration, which is also visible while constructing {{BarrierBuffer}}, so it is reasonable to do so.
[jira] [Comment Edited] (FLINK-12852) Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
[ https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16878289#comment-16878289 ] zhijiang edited comment on FLINK-12852 at 7/4/19 3:27 AM: -- Network resource matching in slot has many unsettled issues which will be discussed further in the future, so we cannot put it into effect in the short term. Lazy allocation of buffers on the producer side seems a feasible way atm. It could still retain the current core and maximum mechanism in the local pool. But it brings two other effects: * A longer time to ramp up to full throughput, as Stephan mentioned, especially for very short jobs (finishing within several seconds); I remember there were such cases in Kurt's benchmark before. We would change the previous concurrent production and consumption into a sequential one. For a short job, by the time the consumer requests the partition, the whole data set might already have been emitted and cached in the partition pool on the producer side. * We rely on another assumption: that produced buffers can eventually be recycled once the subpartition view is established. This assumption might limit our new features/improvements in the future. ATM we need to adjust the action that triggers the partition request, which means RemoteInputChannel could only send the partition request if the corresponding task has no result partition or the partition's view has already been established. In the future, InputSelection might also break the above assumption: although the partition was requested, the operator could choose not to consume that partition for a long time.
[jira] [Commented] (FLINK-12852) Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
[ https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16878289#comment-16878289 ] zhijiang commented on FLINK-12852: -- Network resource matching in slot has many unsettled issues which should be discussed further in the future, so we cannot put it into effect in the short term. Lazy allocation of buffers on the producer side seems a feasible way atm. It could still retain the current core and maximum mechanism in the local pool. But it brings two other effects: * A longer time to ramp up to full throughput, as Stephan mentioned, especially for very short jobs (finishing within several seconds); I remember there were such cases in Kurt's benchmark before. We would change the previous concurrent production and consumption into a sequential one. For a short job, by the time the consumer requests the partition, the whole data set might already have been emitted and cached in the partition pool on the producer side. * We rely on another assumption: that produced buffers can eventually be recycled once the subpartition view is established. This assumption might limit our new features/improvements in the future. ATM we need to adjust the action that triggers the partition request, which means `RemoteInputChannel` could only send the partition request if this task has no result partition or the partition's view has already been established. In the future, InputSelection might also break the above assumption: although the partition was requested, the operator could choose not to consume that partition for a long time.
> Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
> ----------------------------------------------------------------------
>
> Key: FLINK-12852
> URL: https://issues.apache.org/jira/browse/FLINK-12852
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Network
> Affects Versions: 1.7.2, 1.8.1, 1.9.0
> Reporter: Yun Gao
> Assignee: Yun Gao
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.9.0
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> When running tests with an upstream vertex and downstream vertex, deadlock
> occurs when submitting the job:
> {code:java}
> "Sink: Unnamed (3/500)" #136 prio=5 os_prio=0 tid=0x7f2cca81b000 nid=0x38845 waiting on condition [0x7f2cbe9fe000]
> java.lang.Thread.State: TIMED_WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00073ed6b6f0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:233)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
> at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:180)
> at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:54)
> at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.assignExclusiveSegments(RemoteInputChannel.java:139)
> at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:312)
> - locked <0x00073fbc81f0> (a java.lang.Object)
> at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:220)
> at org.apache.flink.runtime.taskmanager.Task.setupPartionsAndGates(Task.java:836)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:598)
> at java.lang.Thread.run(Thread.java:834)
> {code}
> This is due to the required and max of local buffer pool is not the same and
> there may be over-allocation, when assignExclusiveSegments there are no
> available memory.
>
> The detail of the scenarios is as follows: The parallelism of both upstream
> vertex and downstream vertex are 1000 and 500 respectively. There are 200 TM
> and each TM has 10696 buffers( in total and has 10 slots. For a TM that runs
> 9 upstream tasks and 1 downstream task, the 9 upstream tasks start first with
> local buffer pool \{required = 500, max = 2 * 500 + 8 = 1008}, it produces
> data quickly and each occupy about 990 buffers. Then the DownStream task
> starts and try to assigning exclusive buffers for 1500 -9 = 1491
> InputChannels. It requires 2981 buffers but only 1786 left. Since not all
> downstream tasks can start, the job will be blocked finally and no buffer can
> be released, and the deadlock finally occurred.
>
> I think although increasing the network memory solves the problem, the
> deadlock may not be acceptable. Fined grained resource management
> Flink-12761 can solve this problem, but AFAIK in 1.9 it will not inclu
[jira] [Commented] (FLINK-12852) Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
[ https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16877737#comment-16877737 ]

zhijiang commented on FLINK-12852:
----------------------------------

It is hard to say whether we could get performance benefits by temporarily overusing extra buffers beyond the core size. In general the tasks are deployed into one TM sequentially within a very short time interval. If the first task occupies buffers that actually belong to the following tasks, recycling these extra buffers afterwards also brings some cost/overhead. In particular, this greedy mechanism might only bring limited benefits in special cases, such as backpressure. Overall it is hard to evaluate the final benefits across different scenarios.

> Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
> ----------------------------------------------------------------------
>
> Key: FLINK-12852
> URL: https://issues.apache.org/jira/browse/FLINK-12852
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Network
> Affects Versions: 1.7.2, 1.8.1, 1.9.0
> Reporter: Yun Gao
> Assignee: Yun Gao
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.9.0
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> When running tests with an upstream vertex and downstream vertex, deadlock
> occurs when submitting the job:
> {code:java}
> "Sink: Unnamed (3/500)" #136 prio=5 os_prio=0 tid=0x7f2cca81b000 nid=0x38845 waiting on condition [0x7f2cbe9fe000]
> java.lang.Thread.State: TIMED_WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00073ed6b6f0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:233)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
> at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:180)
> at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:54)
> at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.assignExclusiveSegments(RemoteInputChannel.java:139)
> at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:312)
> - locked <0x00073fbc81f0> (a java.lang.Object)
> at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:220)
> at org.apache.flink.runtime.taskmanager.Task.setupPartionsAndGates(Task.java:836)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:598)
> at java.lang.Thread.run(Thread.java:834)
> {code}
> This is due to the required and max of local buffer pool is not the same and
> there may be over-allocation, when assignExclusiveSegments there are no
> available memory.
>
> The detail of the scenarios is as follows: The parallelism of both upstream
> vertex and downstream vertex are 1000 and 500 respectively. There are 200 TM
> and each TM has 10696 buffers( in total and has 10 slots. For a TM that runs
> 9 upstream tasks and 1 downstream task, the 9 upstream tasks start first with
> local buffer pool \{required = 500, max = 2 * 500 + 8 = 1008}, it produces
> data quickly and each occupy about 990 buffers. Then the DownStream task
> starts and try to assigning exclusive buffers for 1500 -9 = 1491
> InputChannels. It requires 2981 buffers but only 1786 left. Since not all
> downstream tasks can start, the job will be blocked finally and no buffer can
> be released, and the deadlock finally occurred.
>
> I think although increasing the network memory solves the problem, the
> deadlock may not be acceptable. Fined grained resource management
> Flink-12761 can solve this problem, but AFAIK in 1.9 it will not include the
> network memory into the ResourceProfile.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
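The buffer arithmetic in the quoted description can be checked with a small sketch. The class and method names below are hypothetical and only restate the figures from the description. Two points are assumptions on my side: the "1500 - 9" term suggests 1500 upstream subtasks even though the text states 1000, and two exclusive buffers per channel (the credit-based default mentioned elsewhere in this thread) gives 2982 rather than the quoted 2981. Neither changes the conclusion that demand exceeds what is left.

```java
/** Worked arithmetic for the scenario in the issue description; names are illustrative only. */
final class BufferAccounting {

    /** Buffers left in the TaskManager after the upstream tasks have filled their local pools. */
    static int remaining(int totalBuffers, int upstreamTasks, int buffersPerUpstreamTask) {
        return totalBuffers - upstreamTasks * buffersPerUpstreamTask;
    }

    /** Exclusive buffers a downstream task requests up front: a fixed number per remote channel. */
    static int exclusiveDemand(int remoteChannels, int buffersPerChannel) {
        return remoteChannels * buffersPerChannel;
    }

    public static void main(String[] args) {
        int left = remaining(10696, 9, 990);        // 10696 - 9 * 990
        int demand = exclusiveDemand(1500 - 9, 2);  // 1491 channels, 2 buffers each (assumed)
        System.out.println("left=" + left + ", demand=" + demand
                + ", shortfall=" + (demand - left));
    }
}
```

With these inputs the downstream task is short by more than a thousand buffers, which is why `assignExclusiveSegments` keeps waiting in the stack trace above.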
[jira] [Commented] (FLINK-12852) Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
[ https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16877616#comment-16877616 ]

zhijiang commented on FLINK-12852:
----------------------------------

Hey [~gaoyunhaii], regarding the second point for the long run, I have not thought it through thoroughly. I am not sure whether it is still worth keeping the local pool dynamic between core and max size once we have slot resource matching. If this dynamic behavior brings more trouble in practice, another option is to change it to a fixed-size local pool. The system could calculate a reasonable default size for the local pool as it does now, and users could tune it to any size they want. E.g. a local pool with only 2 buffers in total could still work for 100 subpartitions if users do not care about performance.
[jira] [Commented] (FLINK-12852) Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
[ https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16877605#comment-16877605 ]

zhijiang commented on FLINK-12852:
----------------------------------

The spilling approach could functionally solve the deadlock issue, and it is very similar to the previous SpillableSubpartition behavior. If we only spill the buffers between core size and max size after redistribution, it still would not solve another existing exception, `insufficient number of network buffers`, which has always been encountered in Flink for large-scale jobs. If we also spill some buffers within the core size to avoid that `IOException`, the performance regression might be serious without users being aware of it. Users might prefer to increase the buffer options to avoid a performance regression if they knew. Especially for streaming jobs, it is better not to touch the disk unless necessary.

In contrast, if we agree that slot resource matching is the final approach in the future, it could resolve both the deadlock and the `insufficient number of network buffers` issues. Users could then adjust the relevant buffer configs to trade off performance against total resource usage. We might further improve the internal mechanism to reduce the requirement for core buffers (including exclusive buffers and the core size of the local pool) so that a job can still run with limited resources.
[jira] [Commented] (FLINK-12852) Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
[ https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16877073#comment-16877073 ]

zhijiang commented on FLINK-12852:
----------------------------------

[~StephanEwen] thanks for looking into this issue and sharing your suggestions.

The root cause of this issue is the mechanism for distributing global buffers among LocalBufferPools. As we know, the local pool of a partition/gate has a core size and a max size. The core size must be satisfied, otherwise an exception is thrown. Every local pool may use up to its max size if the global pool has enough buffers, to make better use of resources. The precondition is that the extra buffers beyond the core size can eventually be returned after redistribution. This precondition is not always satisfied, especially in credit-based mode, for the following reasons:

* In non-credit mode, the producer can always push data to the network until backpressure occurs, so the extra buffers used in the partition's local pool are recycled in time. Now the producer cannot send buffers to the network until the consumer has requested its exclusive buffers as initial credits.
* In non-credit mode, the gate's local pool only needs a core size equal to numChannels, but in credit-based mode we need 2*numChannels exclusive buffers as the core size by default, so the probability is higher than before.
* In non-credit mode, buffers are requested from the global pool lazily, driven by data: only after the consumer receives data from the network does the local pool request buffers from the global pool. Now the exclusive buffers are requested from the global pool eagerly during startup, so the probability is again higher than before. Making the exclusive request lazy as well might relieve the deadlock issue.

I think the previous non-credit mode could not completely avoid the deadlock in theory either, though I am not sure, especially for some corner cases. But in credit-based mode the factors above make a deadlock much more likely than before.

I agree with the implications mentioned by Stephan: the current blocking partition does not have this issue. For streaming jobs the possibility still exists, but I think it should not be a blocker for release-1.9.

The currently proposed PR turns the deadlock into a failure. That seems a bit better than now because it tells users what happened, but it does not solve the root cause, and users might still have a bad experience. At Alibaba we avoided this issue via network resource matching in ResourceProfile and a fixed size for the local pool.

I also agree with the general ideas Stephan proposed for solving this issue. Slot resource isolation (including shuffle resources) might be the right way to go in the future. At the moment we could make some improvements to reduce the probability, such as setting the exclusive size to 1 by default, for which a Jira ticket was already created, and making the exclusive request lazy like floating buffers.
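The interaction described in this comment, producers growing beyond their core size before a consumer eagerly requests its exclusive buffers, can be illustrated with a toy model. This is a minimal sketch and not Flink's `NetworkBufferPool`: the class `GlobalPoolModel` and its methods are hypothetical, and the numbers are taken from the quoted issue description (with an assumed two exclusive buffers for each of 1491 channels).

```java
/** Toy model of a global buffer pool; names and numbers are illustrative, not Flink's API. */
final class GlobalPoolModel {
    private int available;

    GlobalPoolModel(int totalBuffers) {
        this.available = totalBuffers;
    }

    /** Best-effort request, like a producer growing towards its limit; returns buffers granted. */
    int takeUpTo(int wanted) {
        int granted = Math.min(wanted, available);
        available -= granted;
        return granted;
    }

    /** All-or-nothing request, like eager exclusive buffers; false means the caller would wait. */
    boolean takeAll(int wanted) {
        if (wanted > available) {
            return false;
        }
        available -= wanted;
        return true;
    }

    /** Producers start first, then one consumer asks for all of its exclusive buffers at once. */
    static boolean consumerCanStart(int total, int producers, int perProducerLimit, int exclusiveDemand) {
        GlobalPoolModel pool = new GlobalPoolModel(total);
        for (int i = 0; i < producers; i++) {
            pool.takeUpTo(perProducerLimit);
        }
        return pool.takeAll(exclusiveDemand);
    }

    public static void main(String[] args) {
        // Producers allowed to grow to about 990 buffers each (between core 500 and max 1008).
        System.out.println("greedy pools: " + consumerCanStart(10696, 9, 990, 2982));
        // Producers capped at the core size of 500, i.e. a fixed-size local pool.
        System.out.println("fixed pools:  " + consumerCanStart(10696, 9, 500, 2982));
    }
}
```

In this model the consumer cannot start when the producers have grown greedily, but it can when each producer is capped at its core size. That is the effect the fixed-size local pool mentioned above relies on. The model ignores recycling entirely, so it only shows the startup ordering problem, not steady-state behavior.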
[jira] [Updated] (FLINK-12882) Remove ExecutionAttemptID field from ShuffleEnvironment#createResultPartitionWriters
[ https://issues.apache.org/jira/browse/FLINK-12882?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhijiang updated FLINK-12882:
-----------------------------
Description: The ResultPartitionID can be obtained directly from ResultPartitionDeploymentDescriptor, so there is no need to pass ExecutionAttemptID to construct a new ResultPartitionID when creating the ResultPartition in the factory. (was: The {{ExecutionAttemptID}} is only used for constructing {{ResultPartitionID}} during creating {{ResultPartitionWriters in ShuffleEnvironment}}. Actually the {{ResultPartitionID}} could be got directly from {{ResultPartitionDeploymentDescriptor}} via {{ShuffleDescriptor#getResultPartitionID}} then we could avoid passing this field in the interface to make it simple.)

> Remove ExecutionAttemptID field from ShuffleEnvironment#createResultPartitionWriters
> ------------------------------------------------------------------------------------
>
> Key: FLINK-12882
> URL: https://issues.apache.org/jira/browse/FLINK-12882
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Network
> Reporter: zhijiang
> Assignee: zhijiang
> Priority: Minor
> Labels: pull-request-available
> Time Spent: 10m
> Remaining Estimate: 0h
>
> The ResultPartitionID can be obtained directly from
> ResultPartitionDeploymentDescriptor, so there is no need to pass
> ExecutionAttemptID to construct a new ResultPartitionID when creating the
> ResultPartition in the factory.
[jira] [Updated] (FLINK-12882) Remove ExecutionAttemptID argument from ResultPartitionFactory#create
[ https://issues.apache.org/jira/browse/FLINK-12882?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhijiang updated FLINK-12882:
-----------------------------
Summary: Remove ExecutionAttemptID argument from ResultPartitionFactory#create (was: Remove ExecutionAttemptID field from ShuffleEnvironment#createResultPartitionWriters)

> Remove ExecutionAttemptID argument from ResultPartitionFactory#create
> ---------------------------------------------------------------------
>
> Key: FLINK-12882
> URL: https://issues.apache.org/jira/browse/FLINK-12882
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Network
> Reporter: zhijiang
> Assignee: zhijiang
> Priority: Minor
> Labels: pull-request-available
> Time Spent: 10m
> Remaining Estimate: 0h
>
> The ResultPartitionID can be obtained directly from
> ResultPartitionDeploymentDescriptor, so there is no need to pass
> ExecutionAttemptID to construct a new ResultPartitionID when creating the
> ResultPartition in the factory.
[jira] [Created] (FLINK-13010) Refactor the process of SchedulerNG#requestPartitionState
zhijiang created FLINK-13010:
--------------------------------

Summary: Refactor the process of SchedulerNG#requestPartitionState
Key: FLINK-13010
URL: https://issues.apache.org/jira/browse/FLINK-13010
Project: Flink
Issue Type: Sub-task
Components: Runtime / Coordination, Runtime / Network
Reporter: zhijiang
Assignee: zhijiang

Currently `requestPartitionState` is mainly used to query the partition state when the consumer receives a `PartitionNotFoundException` while requesting a partition. However, we do not have a concept of partition state at the moment, and `requestPartitionState` actually returns the state of the corresponding producer, which is a contradiction.

My suggestion is to rename the method to `requestPartitionProducerState`. We would no longer need to pass the `IntermediateDataSetID` and `ResultPartitionID` arguments to find the corresponding execution attempt; passing only the `ExecutionAttemptID` is enough, since the execution attempt can easily be found from the mapping in `ExecutionGraph`.

With that change, we could further remove `IntermediateDataSetID` from `SingleInputGate`, and might replace `IntermediateDataSetID` with `InputGateID` in `InputGateDeploymentDescriptor`.
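The proposed lookup by execution attempt can be sketched as follows. This is a hypothetical stand-in rather than the SchedulerNG or ExecutionGraph API: the attempt ID is modeled as a plain `String`, the `ExecutionState` enum is a reduced placeholder, and the class `ProducerStateLookup` exists only for illustration. Only the method name `requestPartitionProducerState` comes from the issue text.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch of a producer-state lookup keyed only by the execution attempt ID (assumed types). */
final class ProducerStateLookup {

    /** Reduced placeholder for the producer's execution state. */
    enum ExecutionState { CREATED, RUNNING, FINISHED, CANCELED, FAILED }

    /** Stand-in for the attempt mapping that the issue says the ExecutionGraph already holds. */
    private final Map<String, ExecutionState> attempts = new ConcurrentHashMap<>();

    void register(String executionAttemptId, ExecutionState state) {
        attempts.put(executionAttemptId, state);
    }

    /** Returns the producer's state, or empty if the attempt is unknown to this mapping. */
    Optional<ExecutionState> requestPartitionProducerState(String executionAttemptId) {
        return Optional.ofNullable(attempts.get(executionAttemptId));
    }
}
```

The point of the sketch is the signature: a single key is enough to reach the producer, so the data set ID and partition ID do not need to travel with the request.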
[jira] [Updated] (FLINK-12284) InputBufferPoolUsage is incorrect in credit-based network control flow
[ https://issues.apache.org/jira/browse/FLINK-12284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-12284: - Component/s: Runtime / Metrics > InputBufferPoolUsage is incorrect in credit-based network control flow > -- > > Key: FLINK-12284 > URL: https://issues.apache.org/jira/browse/FLINK-12284 > Project: Flink > Issue Type: Bug > Components: Runtime / Metrics, Runtime / Network >Affects Versions: 1.6.3, 1.6.4, 1.7.2, 1.8.0 >Reporter: Xiaogang Shi >Assignee: Aitozi >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 10m > Remaining Estimate: 0h > > When using credit-based network control flow, exclusive buffers are directly > assigned to {{RemoteInputChannel}} and are not counted in > {{LocalBufferPool}}, leading to incorrect InputBufferPoolUsage. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
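One way to read the problem: if the usage ratio only looks at the buffers of the local pool, a gate whose exclusive buffers are all occupied can still report zero usage. The sketch below shows a ratio that counts both kinds of buffers. It is an assumption about what a corrected metric could look like, not the fix that was merged, and the class and parameter names are hypothetical.

```java
/** Sketch of an input-side usage ratio that also counts exclusive buffers (assumed formula). */
final class InputBufferUsage {

    /**
     * @param usedFloating buffers currently taken from the local (floating) pool
     * @param floatingPoolSize current size of the local pool
     * @param usedExclusive exclusive buffers currently holding data
     * @param totalExclusive exclusive buffers assigned to all remote channels of the gate
     */
    static float usage(int usedFloating, int floatingPoolSize, int usedExclusive, int totalExclusive) {
        int total = floatingPoolSize + totalExclusive;
        if (total == 0) {
            return 0.0f; // nothing assigned yet, report no usage instead of dividing by zero
        }
        return (float) (usedFloating + usedExclusive) / total;
    }

    public static void main(String[] args) {
        // All 8 exclusive buffers in use, none of the 8 floating buffers.
        System.out.println(usage(0, 8, 8, 8));
    }
}
```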
[jira] [Comment Edited] (FLINK-12889) Job keeps in FAILING state
[ https://issues.apache.org/jira/browse/FLINK-12889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16872933#comment-16872933 ]

zhijiang edited comment on FLINK-12889 at 6/26/19 5:33 AM:
-----------------------------------------------------------

My previous analysis on the ML was as follows:

Actually all five "Source: ServiceLog" tasks are not in a terminal state from the JM's view. The relevant process is as follows:

* The checkpoint in the task causes an OOM, which results in a call to `Task#failExternally`; we can see the log "Attempting to fail task externally" in the TM.
* The source task transitions from RUNNING to FAILED and then starts a canceler thread to cancel the task; we can see the log "Triggering cancellation of task" in the TM.
* When the JM starts to cancel the source tasks, the RPC call `Task#cancelExecution` finds that the task is already in FAILED state from step 2 above; we can see the log "Attempting to cancel task" in the TM.

In the end, all five source tasks are not in terminal states according to the JM log. I guess step 2 might not have created the canceler thread successfully: the root failure was caused by an OOM while creating a native thread in step 1, so creating the canceler thread may well have failed too in this unstable OOM situation. If so, the source task would not be interrupted at all and would not report to the JM, although its state had already been changed to FAILED.

The tasks of the other vertices do not trigger `Task#failExternally` in step 1 and only receive the cancel RPC from the JM in step 3. I guess that by this time, later than the source tasks' failure, the canceler thread could be created successfully after some GCs, so these tasks could be canceled and reported to the JM.

I think the key problem is that under OOM some behaviors are not as expected, which can cause problems. Maybe we should handle OOM errors in a more extreme way, such as making the TM exit, to avoid such potential issues.

[~till.rohrmann] do you think it is worth fixing, or do you have other concerns?
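The suggestion to make the TM exit on OOM could look roughly like the sketch below. It is a minimal illustration under assumptions, not Flink's actual fatal-error handling: `FatalErrorPolicy`, `isOutOfMemory` and `haltOnOom` are hypothetical names. `Runtime#halt` is used instead of `System.exit` because it skips shutdown hooks, which may themselves need memory or new threads that are no longer available.

```java
/** Sketch: treat OutOfMemoryError as fatal for the whole process (hypothetical helper). */
final class FatalErrorPolicy {

    /** Returns true if the throwable or any of its causes is an OutOfMemoryError. */
    static boolean isOutOfMemory(Throwable t) {
        int depth = 0;
        // The depth bound guards against cyclic cause chains.
        while (t != null && depth++ < 64) {
            if (t instanceof OutOfMemoryError) {
                return true;
            }
            t = t.getCause();
        }
        return false;
    }

    /** Handler that halts the JVM on OOM; other errors are left to normal task failure handling. */
    static Thread.UncaughtExceptionHandler haltOnOom(int exitCode) {
        return (thread, error) -> {
            if (isOutOfMemory(error)) {
                Runtime.getRuntime().halt(exitCode);
            }
        };
    }
}
```

A process could install it with `Thread.setDefaultUncaughtExceptionHandler(FatalErrorPolicy.haltOnOom(-1))`; the exit code is an arbitrary choice here. Exiting lets the cluster framework restart the TM instead of leaving tasks stuck in a state the JM never hears about.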
> Job keeps in FAILING state > -- > > Key: FLINK-12889 > URL: https://issues.apache.org/jira/browse/FLINK-12889 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Reporter: Fan Xinpu >Priority: Minor > Attachments: 20190618104945417.jpg, jobmanager.log.2019-06-16.0, > taskmanager.log > > > There is a topology of 3 operator, such as, source, parser, and persist. > Occasionally, 5 subtasks of the source encounters exception and turns to > failed, at the same time, one subtask of the parser runs into exception and > turns to failed too. The jobmaster gets a message of the parser's failed. The > jobmaster then try to cancel all the subtask, most of the subtasks of the > three operator turns to canceled except the 5 subtasks of the source, because > the state of the 5 ones is already FAILED before jobmaster try to cancel it. > Then the jobmaster can not
[jira] [Comment Edited] (FLINK-12889) Job keeps in FAILING state
[ https://issues.apache.org/jira/browse/FLINK-12889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16872933#comment-16872933 ] zhijiang edited comment on FLINK-12889 at 6/26/19 5:32 AM: --- My previous analysis in ML was as follows: Actually all the five "Source: ServiceLog" tasks are not in terminal state on JM view, the relevant processes shown in below: * The checkpoint in task causes OOM issue which would call `Task#failExternally` as a result, we could see the log "Attempting to fail task externally" in tm. * The source task would transform state from RUNNING to FAILED and then starts a canceler thread for canceling task, we could see log "Triggering cancellation of task" in tm. * When JM starts to cancel the source tasks, the rpc call `Task#cancelExecution` would find the task was already in FAILED state as above step 2, we could see log "Attempting to cancel task" in tm. At last all the five source tasks are not in terminal states from jm log, I guess the step 2 might not create canceler thread successfully, because the root failover was caused by OOM during creating native thread in step1, so it might exist possibilities that createing canceler thread is not successful as well in OOM case which is unstable. If so, the source task would not been interrupted at all, then it would not report to JM as well, but the state is already changed to FAILED before. For the other vertex tasks, it does not trigger `Task#failExternally` in step 1, and only receives the cancel rpc from JM in step 3. And I guess at this time later than the source period, the canceler thread could be created succesfully after some GCs, then these tasks could be canceled as reported to JM side. I think the key problem is under OOM case some behaviors are not within expectations, so it might bring problems. Maybe we should handle OOM error in extreme way like making TM exit to solve the potential issue. 
[~till.rohrmann] do you think it is worth fixing or have other concerns?
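The idea of treating an OOM during canceler creation as fatal could be sketched roughly as follows. This is only an illustration under assumptions: `OomAwareCancelerSketch`, `ThreadStarter`, and `FatalErrorHandler` are hypothetical names, not Flink classes, and the thread start is injected so that the failure path can be simulated.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch; none of these types are Flink's actual classes. */
class OomAwareCancelerSketch {

    /** Invoked for unrecoverable errors, e.g. to terminate the whole process. */
    interface FatalErrorHandler {
        void onFatalError(Throwable error);
    }

    /** Starts a thread for the given task; injectable so an OOM can be simulated. */
    interface ThreadStarter {
        void start(Runnable task);
    }

    private final ThreadStarter starter;
    private final FatalErrorHandler fatalErrorHandler;

    OomAwareCancelerSketch(ThreadStarter starter, FatalErrorHandler fatalErrorHandler) {
        this.starter = starter;
        this.fatalErrorHandler = fatalErrorHandler;
    }

    /** Returns true if the canceler was started, false if the error was escalated. */
    boolean startCanceler(Runnable cancelTask) {
        try {
            starter.start(cancelTask);
            return true;
        } catch (OutOfMemoryError oom) {
            // Without a canceler the task is never interrupted and never reports
            // its final state, so escalate instead of silently continuing.
            fatalErrorHandler.onFatalError(oom);
            return false;
        }
    }

    public static void main(String[] args) {
        AtomicReference<Throwable> fatal = new AtomicReference<>();
        OomAwareCancelerSketch canceler = new OomAwareCancelerSketch(
                task -> { throw new OutOfMemoryError("unable to create new native thread"); },
                fatal::set);
        boolean started = canceler.startCanceler(() -> { });
        System.out.println("started=" + started + ", escalated=" + (fatal.get() != null));
    }
}
```

In a real process the handler might log the error and exit the JVM; here it only records the error so the behavior can be observed.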
> Job keeps in FAILING state > -- > > Key: FLINK-12889 > URL: https://issues.apache.org/jira/browse/FLINK-12889 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Reporter: Fan Xinpu >Priority: Minor > Attachments: 20190618104945417.jpg, jobmanager.log.2019-06-16.0, > taskmanager.log > > > There is a topology of 3 operator, such as, source, parser, and persist. > Occasionally, 5 subtasks of the source encounters exception and turns to > failed, at the same time, one subtask of the parser runs into exception and > turns to failed too. The jobmaster gets a message of the parser's failed. The > jobmaster then try to cancel all the subtask, most of the subtasks of the > three operator turns to canceled except the 5 subtasks of the source, because > the state of the 5 ones is already FAILED before jobmaster try to cancel it. > Then the jobmaster can not reach a fin
[jira] [Updated] (FLINK-12889) Job keeps in FAILING state
[ https://issues.apache.org/jira/browse/FLINK-12889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-12889: - Priority: Minor (was: Blocker)
> Job keeps in FAILING state
> --
>
> Key: FLINK-12889
> URL: https://issues.apache.org/jira/browse/FLINK-12889
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Reporter: Fan Xinpu
> Priority: Minor
> Attachments: 20190618104945417.jpg, jobmanager.log.2019-06-16.0, taskmanager.log
>
> There is a topology of 3 operators: source, parser, and persist. Occasionally, 5 subtasks of the source encounter an exception and turn to FAILED; at the same time, one subtask of the parser runs into an exception and turns to FAILED too. The jobmaster gets a message about the parser's failure and then tries to cancel all the subtasks. Most subtasks of the three operators turn to CANCELED, except the 5 subtasks of the source, because their state was already FAILED before the jobmaster tried to cancel them. The jobmaster then cannot reach a final state and stays in FAILING state, while the subtasks of the source stay in canceling state.
>
> The job runs on a Flink 1.7 cluster on YARN, and there is only one TM with 10 slots.
>
> The attached files contain a JM log, a TM log, and the UI picture.
>
> The exception timestamp is about 2019-06-16 13:42:28.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12889) Job keeps in FAILING state
[ https://issues.apache.org/jira/browse/FLINK-12889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-12889: - Affects Version/s: (was: 1.7.2)
[jira] [Updated] (FLINK-12889) Job keeps in FAILING state
[ https://issues.apache.org/jira/browse/FLINK-12889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-12889: - Component/s: (was: API / DataStream) Runtime / Coordination
[jira] [Commented] (FLINK-12909) add try catch when find a unique file name for the spilling channel
[ https://issues.apache.org/jira/browse/FLINK-12909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16871942#comment-16871942 ] zhijiang commented on FLINK-12909: -- Also you should connect the PR #7191 to this Jira ticket [~xymaqingxiang]. > add try catch when find a unique file name for the spilling channel > --- > > Key: FLINK-12909 > URL: https://issues.apache.org/jira/browse/FLINK-12909 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Reporter: xymaqingxiang >Priority: Major > > h2. What is the purpose of the change > Catch exceptions thrown due to disk loss, try to find a unique file name for > the spilling channel again. > Modify the createSpillingChannel() method of the > org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer > class to solve this problem. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12909) add try catch when find a unique file name for the spilling channel
[ https://issues.apache.org/jira/browse/FLINK-12909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16871941#comment-16871941 ] zhijiang commented on FLINK-12909: --
The ticket does not describe the issue well. Any IOException thrown by `createNewFile` ends the attempt loop immediately, so the initial motivation was to ignore intermediate exceptions so that all attempts are actually made; otherwise the retry logic is useless. The newly added warn log also shows the specific failure reason, which helps with tracing. One option to improve this, as Nico mentioned, is to wait some time between attempts, because in most cases the exception would otherwise persist across the attempts. Alternatively, we could remove the attempt loop altogether to keep the code clear.
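A retry loop that keeps going after a failed `createNewFile` could look roughly like the sketch below. It is not the code from the PR: the class and method names are hypothetical, and rotating over the configured directories on each attempt is an assumption made for illustration.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.UUID;

/** Hypothetical sketch of a retry loop that survives per-attempt I/O failures. */
class SpillFileRetrySketch {

    /** Tries up to maxAttempts times to create a uniquely named file in one of tempDirs. */
    static File createSpillFile(File[] tempDirs, int maxAttempts) {
        IOException lastFailure = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            // Assumption: rotate over the directories so that a lost disk is skipped.
            File dir = tempDirs[attempt % tempDirs.length];
            File candidate = new File(dir, UUID.randomUUID() + ".inputchannel");
            try {
                if (candidate.createNewFile()) {
                    return candidate;
                }
                // createNewFile() returned false: the name is taken, try another one.
            } catch (IOException e) {
                // Remember and log the failure, then continue with the next attempt.
                lastFailure = e;
                System.err.println("Attempt " + (attempt + 1) + " failed for "
                        + candidate + ": " + e.getMessage());
            }
        }
        IOException cause = lastFailure != null
                ? lastFailure
                : new IOException("all candidate names were already taken");
        throw new UncheckedIOException(
                "Could not create a spill file after " + maxAttempts + " attempts", cause);
    }

    public static void main(String[] args) {
        File good = new File(System.getProperty("java.io.tmpdir"));
        File lost = new File(good, "missing-" + UUID.randomUUID());
        File created = createSpillFile(new File[] {lost, good}, 4);
        System.out.println("Created " + created);
        created.delete();
    }
}
```

Waiting between attempts, as suggested above, would be a one-line addition inside the `catch` block; it is left out here to keep the sketch short.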
[jira] [Created] (FLINK-12882) Remove ExecutionAttemptID field from ShuffleEnvironment#createResultPartitionWriters
zhijiang created FLINK-12882: Summary: Remove ExecutionAttemptID field from ShuffleEnvironment#createResultPartitionWriters Key: FLINK-12882 URL: https://issues.apache.org/jira/browse/FLINK-12882 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: zhijiang Assignee: zhijiang The {{ExecutionAttemptID}} is only used for constructing {{ResultPartitionID}} during creating {{ResultPartitionWriter}}s in {{ShuffleEnvironment}}. Actually the {{ResultPartitionID}} could be got directly from {{ResultPartitionDeploymentDescriptor}} via {{ShuffleDescriptor#getResultPartitionID}} then we could avoid passing this field in the interface to make it simple. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
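The proposed simplification can be illustrated with stand-in types. The type names below mirror those in the ticket, but their bodies, the `getShuffleDescriptor` accessor, and the `createWriter` helper are assumptions for this sketch and not Flink's real signatures.

```java
/** Hypothetical stand-ins for the types named in the ticket; not Flink's real classes. */
class PartitionIdSketch {

    static final class ResultPartitionID {
        final String value;

        ResultPartitionID(String value) {
            this.value = value;
        }
    }

    interface ShuffleDescriptor {
        ResultPartitionID getResultPartitionID();
    }

    static final class ResultPartitionDeploymentDescriptor {
        private final ShuffleDescriptor shuffleDescriptor;

        ResultPartitionDeploymentDescriptor(ShuffleDescriptor shuffleDescriptor) {
            this.shuffleDescriptor = shuffleDescriptor;
        }

        // Assumed accessor for this sketch.
        ShuffleDescriptor getShuffleDescriptor() {
            return shuffleDescriptor;
        }
    }

    /** The partition id comes from the descriptor, so no attempt id parameter is needed. */
    static String createWriter(ResultPartitionDeploymentDescriptor descriptor) {
        ResultPartitionID id = descriptor.getShuffleDescriptor().getResultPartitionID();
        return "writer-for-" + id.value;
    }

    public static void main(String[] args) {
        ResultPartitionDeploymentDescriptor descriptor =
                new ResultPartitionDeploymentDescriptor(() -> new ResultPartitionID("p-1"));
        System.out.println(createWriter(descriptor));
    }
}
```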
[jira] [Updated] (FLINK-12882) Remove ExecutionAttemptID field from ShuffleEnvironment#createResultPartitionWriters
[ https://issues.apache.org/jira/browse/FLINK-12882?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-12882: - Description: The {{ExecutionAttemptID}} is only used for constructing {{ResultPartitionID}} during creating {{ResultPartitionWriters in ShuffleEnvironment}}. Actually the {{ResultPartitionID}} could be got directly from {{ResultPartitionDeploymentDescriptor}} via {{ShuffleDescriptor#getResultPartitionID}} then we could avoid passing this field in the interface to make it simple. was: The {{ExecutionAttemptID}} is only used for constructing {{ResultPartitionID}} during creating {{ResultPartitionWriter}}s in {{ShuffleEnvironment}}. Actually the {{ResultPartitionID}} could be got directly from {{ResultPartitionDeploymentDescriptor}} via {{ShuffleDescriptor#getResultPartitionID}} then we could avoid passing this field in the interface to make it simple. > Remove ExecutionAttemptID field from > ShuffleEnvironment#createResultPartitionWriters > > > Key: FLINK-12882 > URL: https://issues.apache.org/jira/browse/FLINK-12882 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > > The {{ExecutionAttemptID}} is only used for constructing > {{ResultPartitionID}} during creating {{ResultPartitionWriters in > ShuffleEnvironment}}. > Actually the {{ResultPartitionID}} could be got directly from > {{ResultPartitionDeploymentDescriptor}} via > {{ShuffleDescriptor#getResultPartitionID}} then we could avoid passing this > field in the interface to make it simple. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12843) Refactor the pin logic in ReleaseOnConsumptionResultPartition
[ https://issues.apache.org/jira/browse/FLINK-12843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-12843: - Description:
The pin logic adds to the reference counter based on the number of subpartitions for {{ReleaseOnConsumptionResultPartition}}. It does not seem necessary to do this in a while loop as it is now, because the atomic counter is not accessed by other threads during pin. If the {{ReleaseOnConsumptionResultPartition}} has not been created yet, {{createSubpartitionView}} is not called at all and the request results in a {{PartitionNotFoundException}}. So we could simply increase the reference counter directly in the {{ReleaseOnConsumptionResultPartition}} constructor.
was:
The pin logic adds to the reference counter based on the number of subpartitions in {{ResultPartition}}. It does not seem necessary to do this in a while loop as it is now, because the atomic counter is not accessed by other threads during pin. If the {{ResultPartition}} has not been created yet, {{ResultPartition#createSubpartitionView}} is not called and the {{ResultPartitionManager}} responds with {{ResultPartitionNotFoundException}}. So we could simply increase the reference counter directly in the {{ResultPartition}} constructor.
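The difference between a loop-based pin and initialising the counter in the constructor can be sketched as follows. This is not the Flink implementation: `PinSketch`, `pinWithLoop`, and the nested `Partition` class are hypothetical names used only to contrast the two approaches.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch contrasting a CAS-loop pin with constructor initialisation. */
class PinSketch {

    /** Loop-based pin: retries the compare-and-set until it succeeds. */
    static void pinWithLoop(AtomicInteger pendingReferences, int numSubpartitions) {
        while (true) {
            int current = pendingReferences.get();
            if (current < 0) {
                throw new IllegalStateException("Released.");
            }
            if (pendingReferences.compareAndSet(current, current + numSubpartitions)) {
                return;
            }
        }
    }

    /** Simplified variant: no other thread can see the counter during construction. */
    static final class Partition {
        private final AtomicInteger pendingReferences;

        Partition(int numSubpartitions) {
            this.pendingReferences = new AtomicInteger(numSubpartitions);
        }

        /** Returns true once the last subpartition has been consumed. */
        boolean onConsumedSubpartition() {
            return pendingReferences.decrementAndGet() == 0;
        }

        int pendingReferences() {
            return pendingReferences.get();
        }
    }

    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger(0);
        pinWithLoop(counter, 3);
        Partition partition = new Partition(3);
        System.out.println(counter.get() + " == " + partition.pendingReferences());
    }
}
```

Both variants end with the same counter value; the second simply relies on the object not being published to other threads before its constructor returns.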
[jira] [Updated] (FLINK-12843) Refactor the pin logic in ReleaseOnConsumptionResultPartition
[ https://issues.apache.org/jira/browse/FLINK-12843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-12843: - Summary: Refactor the pin logic in ReleaseOnConsumptionResultPartition (was: Refactor the pin logic in ResultPartition)
[jira] [Comment Edited] (FLINK-12070) Make blocking result partitions consumable multiple times
[ https://issues.apache.org/jira/browse/FLINK-12070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16863947#comment-16863947 ] zhijiang edited comment on FLINK-12070 at 6/14/19 11:09 AM:
In Yingjie's testing, based on session mode, the containers are started with large memory, assuming many slots per TM and fewer containers rather than fewer slots per TM and more containers. In the common case for per-job mode, the required container resource would be 1 CPU core with 4 GB of memory, which might leave 1 GB for network buffers.
I agree with the assumption Stephan mentioned above. We do not want the previous SpillableSubpartition to occupy network memory, because it is difficult to consume multiple times and has many other issues. If the previous SpillableSubpartition switches to spilled mode because of limited network memory, its performance should be more or less the same as the new BlockingPartition. Even compared with the pure in-memory mode of SpillableSubpartition, performance should be at the same level as long as we can confirm that the new BlockingPartition implementation writes data into the OS cache and does not wait for it to be flushed to disk.
I remember that Berkeley DB provides three levels for writes as a tradeoff between performance and safety: the first treats a write into process memory as success, the second a write into the OS cache, and the third a write that has been flushed to disk. I am not sure whether a similar mechanism exists that would let us take the second approach in our case. I will also take a look at the alternatives Stephan provided.
> Make blocking result partitions consumable multiple times > - > > Key: FLINK-12070 > URL: https://issues.apache.org/jira/browse/FLINK-12070 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Affects Versions: 1.9.0 >Reporter: Till Rohrmann >Assignee: Stephan Ewen >Priority: Blocker > Labels: pull-request-available > Fix For: 1.9.0 > > Attachments: image-2019-04-18-17-38-24-949.png > > Time Spent: 20m > Remaining Estimate: 0h > > In order to avoid writing produced results multiple times for multiple > consumers and in order to speed up batch recoveries, we should make the > blocking result partitions to be consumable multiple times.
At the moment a > blocking result partition will be released once the consumers has processed > all data. Instead the result partition should be released once the next > blocking result has been produced and all consumers of a blocking result > partition have terminated. Moreover, blocking results should not hold on slot > resources like network buffers or memory as it is currently the case with > {{SpillableSubpartitions}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
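As a rough illustration of the three write levels mentioned in the comment, plain Java NIO already distinguishes between handing data to the operating system and forcing it to the storage device. The sketch below only shows that general mechanism; it makes no claim about what the BlockingPartition implementation does, and `WriteLevelsSketch` is a hypothetical name.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

/** Hypothetical sketch of the write levels using plain Java NIO. */
class WriteLevelsSketch {

    /** Writes data to file and returns the resulting file size in bytes. */
    static long write(Path file, byte[] data, boolean forceToDisk) {
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.CREATE,
                StandardOpenOption.WRITE,
                StandardOpenOption.TRUNCATE_EXISTING)) {
            // Level 1: the data only lives in process memory.
            ByteBuffer buffer = ByteBuffer.wrap(data);
            // Level 2: write() hands the bytes to the operating system, which
            // typically keeps them in its cache before they reach the device.
            while (buffer.hasRemaining()) {
                channel.write(buffer);
            }
            // Level 3: force() asks for the data to be written to the storage device.
            if (forceToDisk) {
                channel.force(true);
            }
            return channel.size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path file = Paths.get(System.getProperty("java.io.tmpdir"),
                "write-levels-" + System.nanoTime() + ".bin");
        System.out.println("cached write: " + write(file, new byte[] {1, 2, 3}, false) + " bytes");
        System.out.println("forced write: " + write(file, new byte[] {1, 2, 3}, true) + " bytes");
        file.toFile().delete();
    }
}
```

Skipping `force` corresponds to the second level: the write returns once the operating system has accepted the data, without waiting for the disk.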
[jira] [Created] (FLINK-12843) Refactor the pin logic in ResultPartition
zhijiang created FLINK-12843: Summary: Refactor the pin logic in ResultPartition Key: FLINK-12843 URL: https://issues.apache.org/jira/browse/FLINK-12843 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: zhijiang Assignee: zhijiang
The pin logic adds to the reference counter based on the number of subpartitions in {{ResultPartition}}. It does not seem necessary to do this in a while loop as it is now, because the atomic counter is not accessed by other threads during pin. If the {{ResultPartition}} has not been created yet, {{ResultPartition#createSubpartitionView}} is not called and the {{ResultPartitionManager}} responds with {{ResultPartitionNotFoundException}}. So we could simply increase the reference counter directly in the {{ResultPartition}} constructor. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12842) Fix invalid check released state during ResultPartition#createSubpartitionView
zhijiang created FLINK-12842: Summary: Fix invalid check released state during ResultPartition#createSubpartitionView Key: FLINK-12842 URL: https://issues.apache.org/jira/browse/FLINK-12842 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: zhijiang Assignee: zhijiang
Currently {{ResultPartition#createSubpartitionView}} checks whether the partition has been released before creating a view. But this check is based on {{refCnt != -1}}, which seems invalid because the reference counter does not always reflect the released state. In the case of {{ResultPartition#release/fail}}, the reference counter is not set to -1. Even in the case of {{ResultPartition#onConsumedSubpartition}}, the reference counter seems to have no chance of becoming -1. So we could check the real {{isReleased}} state instead of the reference counter when creating a view. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
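Why a counter-based check can miss a release can be shown with a small model. The class below is a hypothetical simplification, not Flink's `ResultPartition`: releasing sets a flag and leaves the counter untouched, as the ticket describes, so only the flag-based check rejects the view request.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical simplified model of the released-state check; not Flink's real class. */
class ReleaseCheckSketch {

    private final AtomicInteger pendingReferences;
    private final AtomicBoolean isReleased = new AtomicBoolean(false);

    ReleaseCheckSketch(int numSubpartitions) {
        this.pendingReferences = new AtomicInteger(numSubpartitions);
    }

    /** Releasing flips the flag but does not touch the reference counter. */
    void release() {
        isReleased.set(true);
    }

    /** The check described as invalid: it relies on the counter being -1. */
    boolean counterSaysReleased() {
        return pendingReferences.get() == -1;
    }

    /** The proposed check: consult the real released state before creating a view. */
    String createSubpartitionView(int index) {
        if (isReleased.get()) {
            throw new IllegalStateException("Partition released.");
        }
        return "view-" + index;
    }

    public static void main(String[] args) {
        ReleaseCheckSketch partition = new ReleaseCheckSketch(2);
        partition.release();
        System.out.println("counter-based check sees release: " + partition.counterSaysReleased());
        try {
            partition.createSubpartitionView(0);
        } catch (IllegalStateException e) {
            System.out.println("flag-based check rejected the view: " + e.getMessage());
        }
    }
}
```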
[jira] [Updated] (FLINK-12761) Fine grained resource management
[ https://issues.apache.org/jira/browse/FLINK-12761?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-12761: - Summary: Fine grained resource management (was: Fine grained resource managedment) > Fine grained resource management > > > Key: FLINK-12761 > URL: https://issues.apache.org/jira/browse/FLINK-12761 > Project: Flink > Issue Type: Improvement > Components: Runtime / Configuration >Affects Versions: 1.8.0, 1.9.0 >Reporter: Tony Xintong Song >Assignee: Tony Xintong Song >Priority: Major > Labels: Umbrella > > This is an umbrella issue for enabling fine grained resource management in > Flink. > Fine grained resource management is a big topic that requires long term > efforts. There are many issues to be addressed and designing decisions to be > made, some of which may not be resolved in short time. Here we propose our > design and implementation plan for the upcoming release 1.9, as well as our > thoughts and ideas on the long term road map on this topic. > A practical short term target is to enable fine grained resource management > for batch sql jobs only in the upcoming Flink 1.9. This is necessary for > batch operators added from blink to achieve good performance. > Please find detailed design and implementation plan in attached docs. Any > comment and feedback are welcomed and appreciated. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12738) Remove abstract getPageSize method from InputGate
[ https://issues.apache.org/jira/browse/FLINK-12738?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-12738: - Summary: Remove abstract getPageSize method from InputGate (was: Remove abstract getPageSize from InputGate) > Remove abstract getPageSize method from InputGate > - > > Key: FLINK-12738 > URL: https://issues.apache.org/jira/browse/FLINK-12738 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > > Currently {{InputGate#getPageSize}} is only used for constructing > {{BarrierBuffer}}. In order to make abstract InputGate simple and clean, we > should remove unnecessary abstract methods from it. > Considering the page size could be parsed directly from configuration which > could also visible while constructing {{BarrierBuffer}}, so it is reasonable > to do so. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12740) SpillableSubpartitionTest deadlocks on Travis
[ https://issues.apache.org/jira/browse/FLINK-12740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16856643#comment-16856643 ] zhijiang commented on FLINK-12740: -- Yes, the newly added unit test for FLINK-12544 might not be stable. It always succeeds on the first run on my side, but if it is run repeatedly in a loop, it gets stuck on the third run. I will double check this test and might submit a fix later today. > SpillableSubpartitionTest deadlocks on Travis > - > > Key: FLINK-12740 > URL: https://issues.apache.org/jira/browse/FLINK-12740 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.8.1 >Reporter: Chesnay Schepler >Assignee: zhijiang >Priority: Major > Fix For: 1.8.1 > > > https://travis-ci.org/apache/flink/jobs/541225542 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-12740) SpillableSubpartitionTest deadlocks on Travis
[ https://issues.apache.org/jira/browse/FLINK-12740?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang reassigned FLINK-12740: Assignee: zhijiang > SpillableSubpartitionTest deadlocks on Travis > - > > Key: FLINK-12740 > URL: https://issues.apache.org/jira/browse/FLINK-12740 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.8.1 >Reporter: Chesnay Schepler >Assignee: zhijiang >Priority: Major > Fix For: 1.8.1 > > > https://travis-ci.org/apache/flink/jobs/541225542 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12738) Remove abstract getPageSize from InputGate
zhijiang created FLINK-12738: Summary: Remove abstract getPageSize from InputGate Key: FLINK-12738 URL: https://issues.apache.org/jira/browse/FLINK-12738 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: zhijiang Assignee: zhijiang Currently {{InputGate#getPageSize}} is only used for constructing {{BarrierBuffer}}. In order to keep the abstract InputGate simple and clean, we should remove unnecessary abstract methods from it. The page size can be parsed directly from the configuration, which is also visible while constructing {{BarrierBuffer}}, so removing the method is reasonable.
[jira] [Created] (FLINK-12735) Make shuffle environment implementation independent with IOManager
zhijiang created FLINK-12735: Summary: Make shuffle environment implementation independent with IOManager Key: FLINK-12735 URL: https://issues.apache.org/jira/browse/FLINK-12735 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: zhijiang Assignee: zhijiang The current creation of {{NetworkEnvironment}} relies on the {{IOManager}} from {{TaskManagerServices}}. The shuffle environment implementation should not rely on specific external components; instead, the specific implementation should create its internal components if required. The current abstract {{IOManager}} has two roles: one is file channel management based on the temp directories configuration, and the other is providing abstract methods for reading/writing files. We could extract the file channel management into a separate internal class which could be reused by all the components that need it, like the current {{BoundedBlockingSubpartition}}. To do so, the shuffle environment should also create its own internal channel manager to break the dependency on {{IOManager}}.
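The extraction proposed above could look roughly like the following minimal Java sketch. All class and method names here ({{SimpleFileChannelManager}}, {{SketchShuffleEnvironment}}, {{createChannel}}, {{newSpillFile}}) are hypothetical and chosen for illustration only; they are not claimed to match the classes that were eventually added to Flink. The sketch only builds file paths and does not create anything on disk.

```java
import java.io.File;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper (not Flink's actual class): owns the temp directories
// and hands out channel file paths in round-robin order.
final class SimpleFileChannelManager {
    private final File[] tempDirs;
    private final AtomicLong nextDir = new AtomicLong();

    SimpleFileChannelManager(String[] tempDirPaths, String prefix) {
        if (tempDirPaths.length == 0) {
            throw new IllegalArgumentException("at least one temp directory is required");
        }
        tempDirs = new File[tempDirPaths.length];
        for (int i = 0; i < tempDirPaths.length; i++) {
            // One uniquely named sub-directory per configured temp directory.
            tempDirs[i] = new File(tempDirPaths[i], prefix + "-" + UUID.randomUUID());
        }
    }

    // Returns the path of a new channel file, rotating over the directories.
    File createChannel() {
        int index = (int) (nextDir.getAndIncrement() % tempDirs.length);
        return new File(tempDirs[index], UUID.randomUUID() + ".channel");
    }
}

// Hypothetical shuffle environment that creates its own channel manager
// instead of receiving an IOManager from the outside.
final class SketchShuffleEnvironment {
    private final SimpleFileChannelManager channelManager;

    SketchShuffleEnvironment(String[] tempDirs) {
        this.channelManager = new SimpleFileChannelManager(tempDirs, "shuffle");
    }

    File newSpillFile() {
        return channelManager.createChannel();
    }
}
```

With this shape the environment needs only the temp directory configuration, and any other component that needs spill files can reuse the same small manager class.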
[jira] [Closed] (FLINK-11604) Extend the necessary methods in ResultPartitionWriter interface
[ https://issues.apache.org/jira/browse/FLINK-11604?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang closed FLINK-11604. Resolution: Duplicate Already done in FLINK-12570 > Extend the necessary methods in ResultPartitionWriter interface > --- > > Key: FLINK-11604 > URL: https://issues.apache.org/jira/browse/FLINK-11604 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > Labels: pull-request-available > Time Spent: 20m > Remaining Estimate: 0h > > This is preparation work for creating {{ResultPartitionWriter}} via the > proposed {{ShuffleService}} in the future. > Currently there exists only one {{ResultPartition}} implementation of the > {{ResultPartitionWriter}} interface, so the specific {{ResultPartition}} > instance is referenced directly in many other classes such as {{Task}}, > {{NetworkEnvironment}}, etc. Even some private methods in {{ResultPartition}} > would be called directly in these referencing classes. > Since {{ShuffleService}} might create multiple different > {{ResultPartitionWriter}} implementations in the future, all the other classes > should reference only the interface and call its common methods. > Therefore we extend the related methods in the {{ResultPartitionWriter}} > interface in order to cover the existing logic in {{ResultPartition}}.
[jira] [Commented] (FLINK-12544) Deadlock while releasing memory and requesting segment concurrent in SpillableSubpartition
[ https://issues.apache.org/jira/browse/FLINK-12544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16855696#comment-16855696 ] zhijiang commented on FLINK-12544: -- Merged into release-1.8 : 61fa1005adc61b6fc3439fdd127e5f25adea > Deadlock while releasing memory and requesting segment concurrent in > SpillableSubpartition > -- > > Key: FLINK-12544 > URL: https://issues.apache.org/jira/browse/FLINK-12544 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > Labels: pull-request-available > Fix For: 1.8.1 > > Time Spent: 20m > Remaining Estimate: 0h > > It is reported by flink user, and the original jstack is as following: > > {code:java} > // "CoGroup (2/2)": > at > org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:213) > - waiting to lock <0x00062bf859b8> (a java.lang.Object) > at > org.apache.flink.runtime.taskmanager.Task.run(Task.java:614) > at java.lang.Thread.run(Thread.java:745) > "CoGroup (1/2)": > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:277) > - waiting to lock <0x00063fdf4888> (a > java.util.ArrayDeque) > at > org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172) > at > org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95) > at > org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84) > at > org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147) > at > org.apache.flink.runtime.io.network.buffer.BufferConsumer.close(BufferConsumer.java:121) > at > org.apache.flink.runtime.io.network.partition.SpillableSubpartition.spillFinishedBufferConsumers(SpillableSubpartition.java:274) > at > 
org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:239) > - locked <0x00063fdf4ac8> (a java.util.ArrayDeque) > at > org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371) > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:375) > at > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:408) > at > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:297) > - locked <0x00063c785350> (a java.lang.Object) > at > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:259) > at > org.apache.flink.runtime.io.network.NetworkEnvironment.setupInputGate(NetworkEnvironment.java:272) > at > org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:224) > - locked <0x00062bf859b8> (a java.lang.Object) > at > org.apache.flink.runtime.taskmanager.Task.run(Task.java:614) > at java.lang.Thread.run(Thread.java:745) > "DataSource (1/1)": > at > org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:227) > - waiting to lock <0x00063fdf4ac8> (a > java.util.ArrayDeque) > at > org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371) > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:257) > - locked <0x00063fdf4888> (a java.util.ArrayDeque) > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:218) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:213) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:144) > at > 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107) > at > org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65) >
[jira] [Closed] (FLINK-12544) Deadlock while releasing memory and requesting segment concurrent in SpillableSubpartition
[ https://issues.apache.org/jira/browse/FLINK-12544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang closed FLINK-12544. Resolution: Fixed Fix Version/s: 1.8.1 > Deadlock while releasing memory and requesting segment concurrent in > SpillableSubpartition > -- > > Key: FLINK-12544 > URL: https://issues.apache.org/jira/browse/FLINK-12544 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > Labels: pull-request-available > Fix For: 1.8.1 > > Time Spent: 20m > Remaining Estimate: 0h > > It is reported by flink user, and the original jstack is as following: > > {code:java} > // "CoGroup (2/2)": > at > org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:213) > - waiting to lock <0x00062bf859b8> (a java.lang.Object) > at > org.apache.flink.runtime.taskmanager.Task.run(Task.java:614) > at java.lang.Thread.run(Thread.java:745) > "CoGroup (1/2)": > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:277) > - waiting to lock <0x00063fdf4888> (a > java.util.ArrayDeque) > at > org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172) > at > org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95) > at > org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84) > at > org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147) > at > org.apache.flink.runtime.io.network.buffer.BufferConsumer.close(BufferConsumer.java:121) > at > org.apache.flink.runtime.io.network.partition.SpillableSubpartition.spillFinishedBufferConsumers(SpillableSubpartition.java:274) > at > org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:239) > - locked 
<0x00063fdf4ac8> (a java.util.ArrayDeque) > at > org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371) > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:375) > at > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:408) > at > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:297) > - locked <0x00063c785350> (a java.lang.Object) > at > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:259) > at > org.apache.flink.runtime.io.network.NetworkEnvironment.setupInputGate(NetworkEnvironment.java:272) > at > org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:224) > - locked <0x00062bf859b8> (a java.lang.Object) > at > org.apache.flink.runtime.taskmanager.Task.run(Task.java:614) > at java.lang.Thread.run(Thread.java:745) > "DataSource (1/1)": > at > org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:227) > - waiting to lock <0x00063fdf4ac8> (a > java.util.ArrayDeque) > at > org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371) > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:257) > - locked <0x00063fdf4888> (a java.util.ArrayDeque) > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:218) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:213) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:144) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107) > at > 
org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65) > at > org.apache.flink.runtime.operators.util.metrics.CountingColl
[jira] [Commented] (FLINK-12201) Introduce InputGateWithMetrics in Task to increment numBytesIn metric
[ https://issues.apache.org/jira/browse/FLINK-12201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1684#comment-1684 ] zhijiang commented on FLINK-12201: -- master : 3351ca11a366713328a6d0cb0e87100cb8c24200 > Introduce InputGateWithMetrics in Task to increment numBytesIn metric > - > > Key: FLINK-12201 > URL: https://issues.apache.org/jira/browse/FLINK-12201 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Reporter: Andrey Zagrebin >Assignee: zhijiang >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 20m > Remaining Estimate: 0h > > Incrementing of numBytesIn metric in SingleInputGate does not depend on > shuffle service and can be moved out of network internals into Task. Task > could wrap InputGate provided by ShuffleService with InputGateWithMetrics > which would increment numBytesIn metric. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-12201) Introduce InputGateWithMetrics in Task to increment numBytesIn metric
[ https://issues.apache.org/jira/browse/FLINK-12201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang closed FLINK-12201. Resolution: Fixed > Introduce InputGateWithMetrics in Task to increment numBytesIn metric > - > > Key: FLINK-12201 > URL: https://issues.apache.org/jira/browse/FLINK-12201 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Reporter: Andrey Zagrebin >Assignee: zhijiang >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 20m > Remaining Estimate: 0h > > Incrementing of numBytesIn metric in SingleInputGate does not depend on > shuffle service and can be moved out of network internals into Task. Task > could wrap InputGate provided by ShuffleService with InputGateWithMetrics > which would increment numBytesIn metric. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
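The wrapping idea in the description above is a plain decorator. The following is a minimal Java sketch under stated assumptions: {{SketchInputGate}}, {{QueueInputGate}} and {{MeteredInputGate}} are hypothetical stand-ins, the gate is reduced to a single {{pollNext}} method returning raw bytes, and an {{AtomicLong}} stands in for the numBytesIn counter. None of this is claimed to be the signature of Flink's InputGate or InputGateWithMetrics.

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, heavily simplified stand-in for an input gate.
interface SketchInputGate {
    Optional<byte[]> pollNext();
}

// Trivial gate backed by an in-memory queue, used only to drive the sketch.
final class QueueInputGate implements SketchInputGate {
    private final Queue<byte[]> buffers = new ArrayDeque<>();

    void add(byte[] buffer) {
        buffers.add(buffer);
    }

    @Override
    public Optional<byte[]> pollNext() {
        return Optional.ofNullable(buffers.poll());
    }
}

// Decorator: forwards to the wrapped gate and counts the bytes that pass
// through, so the wrapped gate itself needs no knowledge of metrics.
final class MeteredInputGate implements SketchInputGate {
    private final SketchInputGate delegate;
    private final AtomicLong numBytesIn;

    MeteredInputGate(SketchInputGate delegate, AtomicLong numBytesIn) {
        this.delegate = delegate;
        this.numBytesIn = numBytesIn;
    }

    @Override
    public Optional<byte[]> pollNext() {
        Optional<byte[]> next = delegate.pollNext();
        next.ifPresent(buffer -> numBytesIn.addAndGet(buffer.length));
        return next;
    }
}
```

The design point is that the counting lives entirely in the wrapper created by the task, so any gate implementation supplied by a shuffle service is measured the same way.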
[jira] [Comment Edited] (FLINK-12070) Make blocking result partitions consumable multiple times
[ https://issues.apache.org/jira/browse/FLINK-12070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16855252#comment-16855252 ] zhijiang edited comment on FLINK-12070 at 6/4/19 2:59 AM: -- Agree with Stephan and Stefan's opinions. We could add the batch blocking case for micro benchmark, then it could verify the performance changes for every merged commit for flink-1.9. I also think it is no need to write to mapped region as now. We actually do not confirm the mmap regions would be consumed immediately by downstream side after producer finishes, because it is up to scheduler decision and whether it has enough resource to schedule consumers. It is necessary to maintain different ways for reading files. Based on my previous lucene index experience, it also provides three ways for reading index files. * Files.newByteChannel for simple way. * Java nio FileChannel way. * Mmap way for large files in 64bit system, and with more free physical memory for mmap as Stefan mentioned. Then we could compare the behaviors for different ways and also provide more choices for users. was (Author: zjwang): Agree with Stephan and Stefan's opinions. We could add the batch blocking case for micro benchmark, then it could verify the performance changes for every merged commit for flink-1.9. I also think it is no need to write to mapped region as now. We actually do not confirm the mmap regions would be consumed immediately by downstream side after producer finishes, becuase it is up to scheduler decision and whether it has enough resource to schedule consumers. It is necessary to maintain different ways for reading files. Based on my previous lucene index experience, it also provides three ways for reading index files. * Files.newByteChannel for simple way. * Java nio FileChannel way. * Mmap way for large files in 64bit system, and with more free physical memory for mmap as Stefan mentioned. 
Then we could compare the behaviors for different ways and also provide more choices for users. > Make blocking result partitions consumable multiple times > - > > Key: FLINK-12070 > URL: https://issues.apache.org/jira/browse/FLINK-12070 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Affects Versions: 1.9.0 >Reporter: Till Rohrmann >Assignee: Stephan Ewen >Priority: Blocker > Labels: pull-request-available > Fix For: 1.9.0 > > Attachments: image-2019-04-18-17-38-24-949.png > > Time Spent: 20m > Remaining Estimate: 0h > > In order to avoid writing produced results multiple times for multiple > consumers and in order to speed up batch recoveries, we should make the > blocking result partitions to be consumable multiple times. At the moment a > blocking result partition will be released once the consumers has processed > all data. Instead the result partition should be released once the next > blocking result has been produced and all consumers of a blocking result > partition have terminated. Moreover, blocking results should not hold on slot > resources like network buffers or memory as it is currently the case with > {{SpillableSubpartitions}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-12070) Make blocking result partitions consumable multiple times
[ https://issues.apache.org/jira/browse/FLINK-12070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16855252#comment-16855252 ] zhijiang edited comment on FLINK-12070 at 6/4/19 2:58 AM: -- Agree with Stephan and Stefan's opinions. We could add the batch blocking case for micro benchmark, then it could verify the performance changes for every merged commit for flink-1.9. I also think it is no need to write to mapped region as now. We actually do not confirm the mmap regions would be consumed immediately by downstream side after producer finishes, becuase it is up to scheduler decision and whether it has enough resource to schedule consumers. It is necessary to maintain different ways for reading files. Based on my previous lucene index experience, it also provides three ways for reading index files. * Files.newByteChannel for simple way. * Java nio FileChannel way. * Mmap way for large files in 64bit system, and with more free physical memory for mmap as Stefan mentioned. Then we could compare the behaviors for different ways and also provide more choices for users. was (Author: zjwang): Agree with Stephan and Stefan's opinions. We could add the batch blocking case for micro benchmark, then it could verify the performance changes for every merged commit for flink-1.9. I also think it is no need to write to mapped region as now, because we actually do not confirm the mmap regions would be consumed immediately by downstream side after producer finishes, and it is up to scheduler decision and whether it has enough resource to schedule consumers. It is necessary to maintain different ways for reading files. Based on my previous lucene index experience, it also provides three ways for reading index files. * Files.newByteChannel for simple way. * Java nio FileChannel way. * Mmap way for large files in 64bit system, and with more free physical memory for mmap as Stefan mentioned. 
Then we could compare the behaviors for different ways and also provide more choices for users. > Make blocking result partitions consumable multiple times > - > > Key: FLINK-12070 > URL: https://issues.apache.org/jira/browse/FLINK-12070 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Affects Versions: 1.9.0 >Reporter: Till Rohrmann >Assignee: Stephan Ewen >Priority: Blocker > Labels: pull-request-available > Fix For: 1.9.0 > > Attachments: image-2019-04-18-17-38-24-949.png > > Time Spent: 20m > Remaining Estimate: 0h > > In order to avoid writing produced results multiple times for multiple > consumers and in order to speed up batch recoveries, we should make the > blocking result partitions to be consumable multiple times. At the moment a > blocking result partition will be released once the consumers has processed > all data. Instead the result partition should be released once the next > blocking result has been produced and all consumers of a blocking result > partition have terminated. Moreover, blocking results should not hold on slot > resources like network buffers or memory as it is currently the case with > {{SpillableSubpartitions}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12070) Make blocking result partitions consumable multiple times
[ https://issues.apache.org/jira/browse/FLINK-12070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16855252#comment-16855252 ] zhijiang commented on FLINK-12070: -- Agree with Stephan and Stefan's opinions. We could add the batch blocking case for micro benchmark, then it could verify the performance changes for every merged commit for flink-1.9. I also think it is no need to write to mapped region as now, because we actually do not confirm the mmap regions would be consumed immediately by downstream side after producer finishes, and it is up to scheduler decision and whether it has enough resource to schedule consumers. It is necessary to maintain different ways for reading files. Based on my previous lucene index experience, it also provides three ways for reading index files. * Files.newByteChannel for simple way. * Java nio FileChannel way. * Mmap way for large files in 64bit system, and with more free physical memory for mmap as Stefan mentioned. Then we could compare the behaviors for different ways and also provide more choices for users. > Make blocking result partitions consumable multiple times > - > > Key: FLINK-12070 > URL: https://issues.apache.org/jira/browse/FLINK-12070 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Affects Versions: 1.9.0 >Reporter: Till Rohrmann >Assignee: Stephan Ewen >Priority: Blocker > Labels: pull-request-available > Fix For: 1.9.0 > > Attachments: image-2019-04-18-17-38-24-949.png > > Time Spent: 20m > Remaining Estimate: 0h > > In order to avoid writing produced results multiple times for multiple > consumers and in order to speed up batch recoveries, we should make the > blocking result partitions to be consumable multiple times. At the moment a > blocking result partition will be released once the consumers has processed > all data. 
Instead the result partition should be released once the next > blocking result has been produced and all consumers of a blocking result > partition have terminated. Moreover, blocking results should not hold on slot > resources like network buffers or memory as it is currently the case with > {{SpillableSubpartitions}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
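The three file-reading approaches listed in the comment above map onto standard JDK calls. The following Java sketch shows them side by side; the class and method names ({{FileReadStrategies}}, {{readWithByteChannel}}, and so on) are made up for illustration, and the sketch assumes files smaller than 2 GB so that the content fits in one byte array. It says nothing about which variant Flink ended up using.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Illustrative helper: three ways to read a whole file (< 2 GB) into memory.
final class FileReadStrategies {

    // 1) Files.newByteChannel: the simple, sequential way.
    static byte[] readWithByteChannel(Path file) throws IOException {
        try (SeekableByteChannel channel = Files.newByteChannel(file, StandardOpenOption.READ)) {
            ByteBuffer dst = ByteBuffer.allocate((int) channel.size());
            while (dst.hasRemaining() && channel.read(dst) >= 0) {
                // keep reading until the buffer is full or end of file
            }
            return dst.array();
        }
    }

    // 2) java.nio FileChannel with positional reads.
    static byte[] readWithFileChannel(Path file) throws IOException {
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
            ByteBuffer dst = ByteBuffer.allocate((int) channel.size());
            long position = 0;
            while (dst.hasRemaining()) {
                int read = channel.read(dst, position);
                if (read < 0) {
                    break; // end of file reached earlier than expected
                }
                position += read;
            }
            return dst.array();
        }
    }

    // 3) Memory-mapped read; the OS pages the data in on access.
    static byte[] readWithMmap(Path file) throws IOException {
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
            MappedByteBuffer mapped =
                    channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size());
            byte[] out = new byte[mapped.remaining()];
            mapped.get(out);
            return out;
        }
    }
}
```

Keeping all three behind the same small method shape makes it straightforward to benchmark them against each other, which is the comparison the comment asks for.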
[jira] [Updated] (FLINK-12603) Remove getOwningTaskName method from InputGate
[ https://issues.apache.org/jira/browse/FLINK-12603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-12603: - Summary: Remove getOwningTaskName method from InputGate (was: Refactor InputGate interface to remove unnecessary methods) > Remove getOwningTaskName method from InputGate > -- > > Key: FLINK-12603 > URL: https://issues.apache.org/jira/browse/FLINK-12603 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > In order to keep the abstract InputGate simple to extend with new implementations > in the shuffle service architecture, we could remove unnecessary methods from it. > Currently InputGate#getOwningTaskName is only used for debug logging in > BarrierBuffer and StreamInputProcessor. The task name can be obtained from > Environment#getTaskInfo and then passed into the constructor of > BarrierBuffer/StreamInputProcessor.
[jira] [Updated] (FLINK-12603) Refactor InputGate interface to remove unnecessary methods
[ https://issues.apache.org/jira/browse/FLINK-12603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-12603: - Description: In order to keep the abstract InputGate simple to extend with new implementations in the shuffle service architecture, we could remove unnecessary methods from it. Currently InputGate#getOwningTaskName is only used for debug logging in BarrierBuffer and StreamInputProcessor. The task name can be obtained from Environment#getTaskInfo and then passed into the constructor of BarrierBuffer/StreamInputProcessor. was: In order to keep the abstract InputGate simple to extend with new implementations in the shuffle service architecture, we could remove unnecessary methods from it. Currently InputGate#getOwningTaskName is only used for debug logging in BarrierBuffer and StreamInputProcessor. The task name can be obtained from Environment#getTaskInfo and then passed into the constructor of BarrierBuffer/StreamInputProcessor. InputGate#getPageSize is only used for the BarrierBuffer constructor. We could remove this method, and the page size could also be obtained from the task manager configuration instead. > Refactor InputGate interface to remove unnecessary methods > -- > > Key: FLINK-12603 > URL: https://issues.apache.org/jira/browse/FLINK-12603 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > In order to keep the abstract InputGate simple to extend with new implementations > in the shuffle service architecture, we could remove unnecessary methods from it. > Currently InputGate#getOwningTaskName is only used for debug logging in > BarrierBuffer and StreamInputProcessor. The task name can be obtained from > Environment#getTaskInfo and then passed into the constructor of > BarrierBuffer/StreamInputProcessor.
[jira] [Commented] (FLINK-12642) OutputBufferPoolUsageGauge can fail with NPE
[ https://issues.apache.org/jira/browse/FLINK-12642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16851462#comment-16851462 ] zhijiang commented on FLINK-12642: -- Yes, that is the ideal approach, which we agreed on before: do creation and setup together. If the task thread model is already in place, we could try this way. > OutputBufferPoolUsageGauge can fail with NPE > > > Key: FLINK-12642 > URL: https://issues.apache.org/jira/browse/FLINK-12642 > Project: Flink > Issue Type: Bug > Components: Runtime / Metrics, Runtime / Network >Affects Versions: 1.9.0 >Reporter: Chesnay Schepler >Priority: Critical > Fix For: 1.9.0 > > > The result partition metrics are initialized before > {{ResultPartition#setup}} was called. If a reporter tries to access a > {{OutputBufferPoolUsageGauge}} in between it will fail with an NPE since the > bufferpool of the partition is still null. > {code} > 2019-05-27 14:49:47,031 WARN > org.apache.flink.runtime.metrics.MetricRegistryImpl - Error while > reporting metrics > java.lang.NullPointerException > at > org.apache.flink.runtime.io.network.metrics.OutputBufferPoolUsageGauge.getValue(OutputBufferPoolUsageGauge.java:41) > at > org.apache.flink.runtime.io.network.metrics.OutputBufferPoolUsageGauge.getValue(OutputBufferPoolUsageGauge.java:27) > at > org.apache.flink.metrics.slf4j.Slf4jReporter.tryReport(Slf4jReporter.java:114) > at > org.apache.flink.metrics.slf4j.Slf4jReporter.report(Slf4jReporter.java:80) > at > org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run(MetricRegistryImpl.java:436) > at > java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:514) > at > java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) > at > java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:300) > at > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167) > at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641) > at java.base/java.lang.Thread.run(Thread.java:844) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
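The race described in this issue (a reporter reading the gauge before the partition's buffer pool exists) can be illustrated with a defensive gauge. This Java sketch is only one possible mitigation and is not the approach discussed in the comment, which prefers doing creation and setup together. All types ({{SketchGauge}}, {{SketchBufferPool}}, {{SketchPartition}}, {{NullSafePoolUsageGauge}}) are hypothetical simplifications, not Flink classes.

```java
// Hypothetical minimal gauge interface.
interface SketchGauge<T> {
    T getValue();
}

// Hypothetical buffer pool exposing only what the gauge needs.
final class SketchBufferPool {
    private final int usedBuffers;
    private final int totalBuffers;

    SketchBufferPool(int usedBuffers, int totalBuffers) {
        this.usedBuffers = usedBuffers;
        this.totalBuffers = totalBuffers;
    }

    int getUsedBuffers() {
        return usedBuffers;
    }

    int getTotalBuffers() {
        return totalBuffers;
    }
}

// Hypothetical partition whose pool is assigned later, in setup().
final class SketchPartition {
    // volatile so that the reporter thread sees the pool once setup() ran
    private volatile SketchBufferPool bufferPool;

    void setup(SketchBufferPool pool) {
        this.bufferPool = pool;
    }

    SketchBufferPool getBufferPool() {
        return bufferPool;
    }
}

// Gauge that tolerates being queried before setup() was called.
final class NullSafePoolUsageGauge implements SketchGauge<Float> {
    private final SketchPartition partition;

    NullSafePoolUsageGauge(SketchPartition partition) {
        this.partition = partition;
    }

    @Override
    public Float getValue() {
        // Read the field once; it may still be null before setup().
        SketchBufferPool pool = partition.getBufferPool();
        if (pool == null || pool.getTotalBuffers() == 0) {
            return 0.0f;
        }
        return (float) pool.getUsedBuffers() / pool.getTotalBuffers();
    }
}
```

A null check like this hides the ordering problem rather than removing it, which is presumably why the thread favours registering metrics only after setup has completed.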
[jira] [Updated] (FLINK-12544) Deadlock while releasing memory and requesting segment concurrent in SpillableSubpartition
[ https://issues.apache.org/jira/browse/FLINK-12544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-12544: - Description: It is reported by flink user, and the original jstack is as following: {code:java} // "CoGroup (2/2)": at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:213) - waiting to lock <0x00062bf859b8> (a java.lang.Object) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614) at java.lang.Thread.run(Thread.java:745) "CoGroup (1/2)": at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:277) - waiting to lock <0x00063fdf4888> (a java.util.ArrayDeque) at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172) at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95) at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84) at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147) at org.apache.flink.runtime.io.network.buffer.BufferConsumer.close(BufferConsumer.java:121) at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.spillFinishedBufferConsumers(SpillableSubpartition.java:274) at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:239) - locked <0x00063fdf4ac8> (a java.util.ArrayDeque) at org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371) at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:375) at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:408) at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:297) - locked 
<0x00063c785350> (a java.lang.Object) at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:259) at org.apache.flink.runtime.io.network.NetworkEnvironment.setupInputGate(NetworkEnvironment.java:272) at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:224) - locked <0x00062bf859b8> (a java.lang.Object) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614) at java.lang.Thread.run(Thread.java:745) "DataSource (1/1)": at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:227) - waiting to lock <0x00063fdf4ac8> (a java.util.ArrayDeque) at org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371) at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:257) - locked <0x00063fdf4888> (a java.util.ArrayDeque) at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:218) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:213) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:144) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107) at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65) at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:193) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) at java.lang.Thread.run(Thread.java:745) {code} Based on the above stack, it happens in the following scenario: * taskA: emit -> requestBufferBuilder -> synchronized in LocalBufferPool -> SpillableSubpartition#releaseMemory -> wait for synchronized in 
SpillableSubpartition * submit TaskB: trigger taskA releaseMemory -> SpillableSubpartition#releaseMemory -> synchronized in SpillableSubpartition -> SpillableSubpartition#spillFinishedBufferConsumers -> bufferConsumer#close -> LocalBufferPool#recycle -> wait for synchronized in LocalBufferPo
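The two stacks above take the `LocalBufferPool` lock and the `SpillableSubpartition` lock in opposite orders. One common remedy for this kind of lock-order inversion is to collect the buffers while holding the subpartition lock and recycle them only after that lock is released. The following is a minimal sketch of that pattern, not necessarily the fix adopted for this issue; `SketchPool` and `SketchSubpartition` are hypothetical stand-ins for the Flink classes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical stand-in for LocalBufferPool: recycling needs the pool lock. */
final class SketchPool {
    private final Deque<byte[]> available = new ArrayDeque<>();

    void recycle(byte[] buffer) {
        synchronized (available) {
            available.add(buffer);
        }
    }

    int availableCount() {
        synchronized (available) {
            return available.size();
        }
    }
}

/** Hypothetical stand-in for SpillableSubpartition. */
final class SketchSubpartition {
    private final Deque<byte[]> buffers = new ArrayDeque<>();
    private final SketchPool pool;

    SketchSubpartition(SketchPool pool) {
        this.pool = pool;
    }

    void add(byte[] buffer) {
        synchronized (buffers) {
            buffers.add(buffer);
        }
    }

    /** Releases all queued buffers and returns how many were released. */
    int releaseMemory() {
        List<byte[]> toRecycle;
        synchronized (buffers) {
            // Only detach the buffers while holding the subpartition lock.
            toRecycle = new ArrayList<>(buffers);
            buffers.clear();
        }
        // The pool lock is taken here, after the subpartition lock is released,
        // so this thread never holds both locks at once.
        for (byte[] buffer : toRecycle) {
            pool.recycle(buffer);
        }
        return toRecycle.size();
    }
}
```

With this shape, a thread inside `releaseMemory` never waits for the pool lock while still owning the subpartition lock, which removes one edge of the cycle shown in the jstack.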
[jira] [Updated] (FLINK-12544) Deadlock while releasing memory and requesting segment concurrent in SpillableSubpartition
[ https://issues.apache.org/jira/browse/FLINK-12544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-12544: - Summary: Deadlock while releasing memory and requesting segment concurrent in SpillableSubpartition (was: Deadlock during releasing memory in SpillableSubpartition) > Deadlock while releasing memory and requesting segment concurrent in > SpillableSubpartition > -- > > Key: FLINK-12544 > URL: https://issues.apache.org/jira/browse/FLINK-12544 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > It is reported by flink user, and the original jstack is as following: > > {code:java} > // "CoGroup (2/2)": > at > org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:213) > - waiting to lock <0x00062bf859b8> (a java.lang.Object) > at > org.apache.flink.runtime.taskmanager.Task.run(Task.java:614) > at java.lang.Thread.run(Thread.java:745) > "CoGroup (1/2)": > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:277) > - waiting to lock <0x00063fdf4888> (a > java.util.ArrayDeque) > at > org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172) > at > org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95) > at > org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84) > at > org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147) > at > org.apache.flink.runtime.io.network.buffer.BufferConsumer.close(BufferConsumer.java:121) > at > org.apache.flink.runtime.io.network.partition.SpillableSubpartition.spillFinishedBufferConsumers(SpillableSubpartition.java:274) > at > 
org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:239) > - locked <0x00063fdf4ac8> (a java.util.ArrayDeque) > at > org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371) > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:375) > at > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:408) > at > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:297) > - locked <0x00063c785350> (a java.lang.Object) > at > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:259) > at > org.apache.flink.runtime.io.network.NetworkEnvironment.setupInputGate(NetworkEnvironment.java:272) > at > org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:224) > - locked <0x00062bf859b8> (a java.lang.Object) > at > org.apache.flink.runtime.taskmanager.Task.run(Task.java:614) > at java.lang.Thread.run(Thread.java:745) > "DataSource (1/1)": > at > org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:227) > - waiting to lock <0x00063fdf4ac8> (a > java.util.ArrayDeque) > at > org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371) > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:257) > - locked <0x00063fdf4888> (a java.util.ArrayDeque) > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:218) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:213) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:144) > at > 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107) > at > org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.
[jira] [Commented] (FLINK-12642) OutputBufferPoolUsageGauge can fail with NPE
[ https://issues.apache.org/jira/browse/FLINK-12642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16849508#comment-16849508 ] zhijiang commented on FLINK-12642: -- Thanks for reporting this [~Zentol]! I think we could check whether the `BufferPool` is null before calling its getter methods in `OutputBufferPoolUsageGauge`. If you agree, I could take this issue and submit the PR. > OutputBufferPoolUsageGauge can fail with NPE > > > Key: FLINK-12642 > URL: https://issues.apache.org/jira/browse/FLINK-12642 > Project: Flink > Issue Type: Bug > Components: Runtime / Metrics, Runtime / Network >Affects Versions: 1.9.0 >Reporter: Chesnay Schepler >Priority: Critical > Fix For: 1.9.0 > > > The result partition metrics are initialized before > {{ResultPartition#setup}} is called. If a reporter tries to access an > {{OutputBufferPoolUsageGauge}} in between, it will fail with an NPE since the > buffer pool of the partition is still null. > {code} > 2019-05-27 14:49:47,031 WARN > org.apache.flink.runtime.metrics.MetricRegistryImpl - Error while > reporting metrics > java.lang.NullPointerException > at > org.apache.flink.runtime.io.network.metrics.OutputBufferPoolUsageGauge.getValue(OutputBufferPoolUsageGauge.java:41) > at > org.apache.flink.runtime.io.network.metrics.OutputBufferPoolUsageGauge.getValue(OutputBufferPoolUsageGauge.java:27) > at > org.apache.flink.metrics.slf4j.Slf4jReporter.tryReport(Slf4jReporter.java:114) > at > org.apache.flink.metrics.slf4j.Slf4jReporter.report(Slf4jReporter.java:80) > at > org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run(MetricRegistryImpl.java:436) > at > java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:514) > at > java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) > at > java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:300) > at >
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167) > at > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641) > at java.base/java.lang.Thread.run(Thread.java:844) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
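The null check proposed in the comment could look roughly like the sketch below. It is not the actual Flink patch: `PoolStats`, `usedBuffers()`, `totalBuffers()` and `NullSafePoolUsageGauge` are hypothetical names, and reporting 0 before the partition is set up is an assumption.

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for the buffer pool; only what the gauge reads. */
interface PoolStats {
    int usedBuffers();

    int totalBuffers();
}

/** Sketch of a usage gauge that tolerates a pool that is not set up yet. */
final class NullSafePoolUsageGauge {
    private final Supplier<PoolStats> poolSupplier;

    NullSafePoolUsageGauge(Supplier<PoolStats> poolSupplier) {
        this.poolSupplier = poolSupplier;
    }

    float getValue() {
        PoolStats pool = poolSupplier.get();
        // Assumption: report 0 while the pool is still null or empty,
        // instead of throwing a NullPointerException in the reporter thread.
        if (pool == null || pool.totalBuffers() == 0) {
            return 0.0f;
        }
        return (float) pool.usedBuffers() / pool.totalBuffers();
    }
}
```

Reading the pool through a supplier means the gauge can be registered before setup and starts reporting real values once the pool exists.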
[jira] [Created] (FLINK-12630) Refactor abstract InputGate to general interface
zhijiang created FLINK-12630: Summary: Refactor abstract InputGate to general interface Key: FLINK-12630 URL: https://issues.apache.org/jira/browse/FLINK-12630 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: zhijiang Assignee: zhijiang `InputGate` is currently defined as an abstract class that holds the common code for checking data availability on behalf of its subclasses `SingleInputGate` and `UnionInputGate`, but this could limit further `InputGate` implementations in the shuffle service architecture. `SingleInputGate` is created by the shuffle service, so it belongs to the scope of the shuffle service, while `UnionInputGate` is a wrapper around several `SingleInputGate`s, so it should live in the task/processor stack. So that an `InputGate` implementation from a new shuffle service can be plugged in directly, we should define a cleaner `InputGate` interface that is decoupled from the implementation of the data availability check. In detail, we could define the `isAvailable` method in the `InputGate` interface and extract the current implementation into a separate class `FutureBasedAvailability`, which could still be extended and reused by both `SingleInputGate` and `UnionInputGate`. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
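A minimal sketch of the proposed split might look like this. Only the names `isAvailable` and `FutureBasedAvailability` come from the issue; the `CompletableFuture<?>` return type, the `AvailabilitySketch` interface name, and the `markAvailable`/`resetUnavailable` helpers are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;

/** Assumed shape of the availability part of the InputGate interface. */
interface AvailabilitySketch {
    /** Returns a future that is completed once data can be read. */
    CompletableFuture<?> isAvailable();
}

/** Reusable helper that gate implementations could delegate to or extend. */
class FutureBasedAvailability implements AvailabilitySketch {
    private static final CompletableFuture<?> AVAILABLE =
            CompletableFuture.completedFuture(null);

    private CompletableFuture<?> current = new CompletableFuture<>();

    @Override
    public synchronized CompletableFuture<?> isAvailable() {
        return current;
    }

    /** Called when data arrives; completes the future handed out earlier. */
    void markAvailable() {
        CompletableFuture<?> toComplete;
        synchronized (this) {
            toComplete = current;
            current = AVAILABLE;
        }
        // Complete outside the lock so callbacks do not run while it is held.
        toComplete.complete(null);
    }

    /** Called when the gate has been drained; hands out a fresh future. */
    synchronized void resetUnavailable() {
        if (current.isDone()) {
            current = new CompletableFuture<>();
        }
    }
}
```

A gate from another shuffle service would then only need to implement `isAvailable`, and could choose whether to reuse the helper at all.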
[jira] [Updated] (FLINK-12201) Introduce InputGateWithMetrics in Task to increment numBytesIn metric
[ https://issues.apache.org/jira/browse/FLINK-12201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-12201: - Description: Incrementing of numBytesIn metric in SingleInputGate does not depend on shuffle service and can be moved out of network internals into Task. Task could wrap InputGate provided by ShuffleService with InputGateWithMetrics which would increment numBytesIn metric. (was: Incrementing of in/out byte metrics in local/remote input channel does not depend on shuffle service and can be moved out of network internals into Task. Task could wrap InputGate provided by ShuffleService with InputGateWithMetrics which would increment in/out byte metrics.) > Introduce InputGateWithMetrics in Task to increment numBytesIn metric > - > > Key: FLINK-12201 > URL: https://issues.apache.org/jira/browse/FLINK-12201 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Reporter: Andrey Zagrebin >Assignee: zhijiang >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 10m > Remaining Estimate: 0h > > Incrementing of numBytesIn metric in SingleInputGate does not depend on > shuffle service and can be moved out of network internals into Task. Task > could wrap InputGate provided by ShuffleService with InputGateWithMetrics > which would increment numBytesIn metric. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
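The wrapping described here is a plain decorator. The sketch below assumes a heavily simplified gate interface (`SketchInputGate` with a single `pollNext()` returning raw bytes) and a counter passed in by the task; neither is the real Flink API.

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical, heavily simplified gate interface. */
interface SketchInputGate {
    Optional<byte[]> pollNext();
}

/** Stand-in for a gate provided by the shuffle service. */
final class QueueBackedGate implements SketchInputGate {
    private final Queue<byte[]> buffers = new ArrayDeque<>();

    void add(byte[] buffer) {
        buffers.add(buffer);
    }

    @Override
    public Optional<byte[]> pollNext() {
        return Optional.ofNullable(buffers.poll());
    }
}

/** Decorator created by the task; the wrapped gate knows nothing about metrics. */
final class SketchInputGateWithMetrics implements SketchInputGate {
    private final SketchInputGate delegate;
    private final AtomicLong numBytesIn;

    SketchInputGateWithMetrics(SketchInputGate delegate, AtomicLong numBytesIn) {
        this.delegate = delegate;
        this.numBytesIn = numBytesIn;
    }

    @Override
    public Optional<byte[]> pollNext() {
        Optional<byte[]> next = delegate.pollNext();
        // Count bytes only when a buffer was actually returned.
        next.ifPresent(buffer -> numBytesIn.addAndGet(buffer.length));
        return next;
    }
}
```

Because the counting lives in the wrapper, any gate implementation supplied by a shuffle service gets the metric without having to implement it.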
[jira] [Updated] (FLINK-12201) Introduce InputGateWithMetrics in Task to increment numBytesIn metric
[ https://issues.apache.org/jira/browse/FLINK-12201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-12201: - Summary: Introduce InputGateWithMetrics in Task to increment numBytesIn metric (was: Introduce InputGateWithMetrics in Task to increment in/out byte metrics) > Introduce InputGateWithMetrics in Task to increment numBytesIn metric > - > > Key: FLINK-12201 > URL: https://issues.apache.org/jira/browse/FLINK-12201 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Reporter: Andrey Zagrebin >Assignee: zhijiang >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 10m > Remaining Estimate: 0h > > Incrementing of in/out byte metrics in local/remote input channel does not > depend on shuffle service and can be moved out of network internals into > Task. Task could wrap InputGate provided by ShuffleService with > InputGateWithMetrics which would increment in/out byte metrics. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12603) Refactor InputGate interface to remove unnecessary methods
[ https://issues.apache.org/jira/browse/FLINK-12603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-12603: - Description: To keep the abstract InputGate simple to extend with new implementations in the shuffle service architecture, we could remove unnecessary methods from it. Currently InputGate#getOwningTaskName is only used for debug logging in BarrierBuffer and StreamInputProcessor. The task name could be obtained from Environment#getTaskInfo and then passed into the constructors of BarrierBuffer/StreamInputProcessor. InputGate#getPageSize is only used in the BarrierBuffer constructor. We could remove this method, and the page size could be obtained from the task manager configuration instead. was: Current `InputGate#getOwningTaskName` is only used for logging in related components such as `BarrierBuffer`, `StreamInputProcessor`, etc. We could put this name in the structure of `TaskInfo`, then the related components could get task name directly from `RuntimeEnvironment#getTaskInfo`. To do so, we could simplify the interface of `InputGate`. > Refactor InputGate interface to remove unnecessary methods > -- > > Key: FLINK-12603 > URL: https://issues.apache.org/jira/browse/FLINK-12603 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > To keep the abstract InputGate simple to extend with new implementations > in the shuffle service architecture, we could remove unnecessary methods from it. > Currently InputGate#getOwningTaskName is only used for debug logging in > BarrierBuffer and StreamInputProcessor. The task name could be obtained from > Environment#getTaskInfo and then passed into the constructors of > BarrierBuffer/StreamInputProcessor. > InputGate#getPageSize is only used in the BarrierBuffer constructor.
We could > remove this method, and the page size could be obtained from the task manager > configuration instead. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12603) Refactor InputGate interface to remove unnecessary methods
[ https://issues.apache.org/jira/browse/FLINK-12603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-12603: - Summary: Refactor InputGate interface to remove unnecessary methods (was: Remove getOwningTaskName method from InputGate) > Refactor InputGate interface to remove unnecessary methods > -- > > Key: FLINK-12603 > URL: https://issues.apache.org/jira/browse/FLINK-12603 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Current `InputGate#getOwningTaskName` is only used for logging in related > components such as `BarrierBuffer`, `StreamInputProcessor`, etc. We could put > this name in the structure of `TaskInfo`, then the related components could > get task name directly from `RuntimeEnvironment#getTaskInfo`. > To do so, we could simplify the interface of `InputGate`. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12603) Remove getOwningTaskName method from InputGate
zhijiang created FLINK-12603: Summary: Remove getOwningTaskName method from InputGate Key: FLINK-12603 URL: https://issues.apache.org/jira/browse/FLINK-12603 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: zhijiang Assignee: zhijiang Current `InputGate#getOwningTaskName` is only used for logging in related components such as `BarrierBuffer`, `StreamInputProcessor`, etc. We could put this name in the structure of `TaskInfo`, then the related components could get task name directly from `RuntimeEnvironment#getTaskInfo`. To do so, we could simplify the interface of `InputGate`. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-6227) Introduce the abstract PartitionException for downstream task failure
[ https://issues.apache.org/jira/browse/FLINK-6227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-6227: Summary: Introduce the abstract PartitionException for downstream task failure (was: Introduce the DataConsumptionException for downstream task failure) > Introduce the abstract PartitionException for downstream task failure > - > > Key: FLINK-6227 > URL: https://issues.apache.org/jira/browse/FLINK-6227 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination, Runtime / Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > It is part of FLIP-1. > > We define a new abstract special exception called PartitionException to > indicate the downstream task failure in consuming upstream partition. This > special exception is reported to JobMaster as a hint for unavailable > partition data, and then JobMaster could track the upstream executions to > restart for reproducing the partition data. > We make current PartitionNotFoundException extend PartitionException, because > it is reasonable to restart producer task when the requested partition is > already released. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
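The exception hierarchy described here can be sketched as follows. The class names are prefixed with `Sketch` to mark them as illustrations; extending `IOException`, carrying the partition id as a plain `String`, and the `FailureClassifier` helper are all assumptions rather than the actual Flink code.

```java
import java.io.IOException;
import java.util.Optional;

/** Sketch of the abstract exception; it carries the affected partition's id. */
abstract class SketchPartitionException extends IOException {
    private final String partitionId;

    SketchPartitionException(String message, String partitionId) {
        super(message);
        this.partitionId = partitionId;
    }

    String getPartitionId() {
        return partitionId;
    }
}

/** The existing "not found" case becomes one concrete subclass. */
final class SketchPartitionNotFoundException extends SketchPartitionException {
    SketchPartitionNotFoundException(String partitionId) {
        super("Partition " + partitionId + " not found.", partitionId);
    }
}

/** Hypothetical helper on the job master side. */
final class FailureClassifier {
    private FailureClassifier() {}

    /** Returns the partition that must be re-produced, if the failure names one. */
    static Optional<String> partitionToReproduce(Throwable failure) {
        for (Throwable t = failure; t != null; t = t.getCause()) {
            if (t instanceof SketchPartitionException) {
                return Optional.of(((SketchPartitionException) t).getPartitionId());
            }
        }
        return Optional.empty();
    }
}
```

Walking the cause chain lets the job master react to the hint even when the task wraps the exception before reporting it.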
[jira] [Updated] (FLINK-6227) Introduce the DataConsumptionException for downstream task failure
[ https://issues.apache.org/jira/browse/FLINK-6227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-6227: Description: It is part of FLIP-1. We define a new abstract special exception called PartitionException to indicate the downstream task failure in consuming upstream partition. This special exception is reported to JobMaster as a hint for unavailable partition data, and then JobMaster could track the upstream executions to restart for reproducing the partition data. We make current PartitionNotFoundException extend PartitionException, because it is reasonable to restart producer task when the requested partition is already released. was: It is part of FLIP-1. We define a new special exception to indicate the downstream task failure in consuming upstream data. The {{JobManager}} will receive and consider this special exception to calculate the minimum connected sub-graph which is called {{FailoverRegion}}. So the {{DataConsumptionException}} should contain {{ResultPartitionID}} information for {{JobMaster}} tracking upstream executions. > Introduce the DataConsumptionException for downstream task failure > -- > > Key: FLINK-6227 > URL: https://issues.apache.org/jira/browse/FLINK-6227 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination, Runtime / Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > It is part of FLIP-1. > > We define a new abstract special exception called PartitionException to > indicate the downstream task failure in consuming upstream partition. This > special exception is reported to JobMaster as a hint for unavailable > partition data, and then JobMaster could track the upstream executions to > restart for reproducing the partition data. 
> We make current PartitionNotFoundException extend PartitionException, because > it is reasonable to restart producer task when the requested partition is > already released. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12458) Introduce PartitionConnectionException for unreachable producer
[ https://issues.apache.org/jira/browse/FLINK-12458?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-12458: - Description: If the consumer cannot establish a connection to the remote task executor, this might indicate that the remote task executor is not reachable. We could wrap this connection exception into the newly proposed `PartitionConnectionException`, which also extends `PartitionException`; the job master would then decide whether to restart the upstream region to re-produce the partition data. (was: If the consumer cannot establish a connection to the remote task executor, this might indicate that the remote task executor is not reachable. We could wrap this connection exception into the existing `PartitionNotFoundException`; the job master would then decide whether to restart the upstream region to re-produce the partition data.) > Introduce PartitionConnectionException for unreachable producer > --- > > Key: FLINK-12458 > URL: https://issues.apache.org/jira/browse/FLINK-12458 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > > If the consumer cannot establish a connection to the remote task executor, this > might indicate that the remote task executor is not reachable. We could wrap this > connection exception into the newly proposed `PartitionConnectionException`, which > also extends `PartitionException`; the job master would then decide whether > to restart the upstream region to re-produce the partition data. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12458) Introduce PartitionConnectionException for unreachable producer
[ https://issues.apache.org/jira/browse/FLINK-12458?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-12458: - Summary: Introduce PartitionConnectionException for unreachable producer (was: Throw PartitionNotFoundException if consumer can not establish a connection to remote TM) > Introduce PartitionConnectionException for unreachable producer > --- > > Key: FLINK-12458 > URL: https://issues.apache.org/jira/browse/FLINK-12458 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > > If the consumer cannot establish a connection to the remote task executor, this > might indicate that the remote task executor is not reachable. We could wrap this > connection exception into the existing `PartitionNotFoundException`; the job > master would then decide whether to restart the upstream region to re-produce the > partition data. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12571) Make NetworkEnvironment#start() return the binded data port
zhijiang created FLINK-12571: Summary: Make NetworkEnvironment#start() return the binded data port Key: FLINK-12571 URL: https://issues.apache.org/jira/browse/FLINK-12571 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: zhijiang Assignee: zhijiang Currently `NetworkEnvironment#getConnectionManager()` is mainly used by `TaskManagerServices` to get the bound data port from `NettyConnectionManager`. Since the `ConnectionManager` is an internal component of `NetworkEnvironment`, it should not be exposed to the outside. Other ShuffleService implementations might have no `ConnectionManager` at all. We could make `ShuffleService#start()` return the bound data port to replace `getConnectionManager`. The `LocalConnectionManager`, or other shuffle service implementations that have no bound data port, could return a simple default value, which would do no harm. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
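As an illustration of the proposal, the sketch below has `start()` return the data port directly, so callers never need access to a connection manager. `SketchShuffleService`, both implementations, and the default value `-1` are hypothetical; the issue only says that implementations without a data port return "a simple default value".

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;

/** Hypothetical service interface: start() reports the bound data port. */
interface SketchShuffleService extends AutoCloseable {
    int start();

    @Override
    void close();
}

/** Stand-in for a network-backed implementation that binds a server socket. */
final class SocketShuffleService implements SketchShuffleService {
    private ServerSocket server;

    @Override
    public int start() {
        try {
            // Port 0 asks the OS for any free port; the real port is read back.
            server = new ServerSocket(0, 50, InetAddress.getLoopbackAddress());
            return server.getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public void close() {
        try {
            if (server != null) {
                server.close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

/** Stand-in for a local-only implementation that binds no port at all. */
final class LocalShuffleService implements SketchShuffleService {
    /** Assumed default for "no data port". */
    static final int NO_DATA_PORT = -1;

    @Override
    public int start() {
        return NO_DATA_PORT;
    }

    @Override
    public void close() {}
}
```

The caller only sees an `int`, so whether the implementation uses Netty, a plain socket, or nothing at all stays an internal detail.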
[jira] [Commented] (FLINK-10653) Introduce Pluggable Shuffle Manager Architecture
[ https://issues.apache.org/jira/browse/FLINK-10653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16844642#comment-16844642 ] zhijiang commented on FLINK-10653: -- Thanks for your interest, [~prudenko]. We have actually also looked into the RDMA shuffle manager in Spark, and it seems to improve performance considerably. I think once the basic shuffle manager architecture in Flink is ready, it would be feasible to add other shuffle implementations such as RDMA. We could keep in touch and revisit this then. :) > Introduce Pluggable Shuffle Manager Architecture > > > Key: FLINK-10653 > URL: https://issues.apache.org/jira/browse/FLINK-10653 > Project: Flink > Issue Type: New Feature > Components: Runtime / Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > > This is the umbrella issue for improving shuffle architecture. > Shuffle is the process of data transfer between stages, which involves > writing outputs on the sender side and reading data on the receiver side. In the Flink > implementation, it covers three parts, the writer, the transport layer and the reader, > which are unified for both streaming and batch jobs. > In detail, the current ResultPartitionWriter interface on the upstream side only > supports in-memory outputs for streaming jobs and local persistent file > outputs for batch jobs. If we implement another writer such as > DfsWriter, RdmaWriter, SortMergeWriter, etc. based on the ResultPartitionWriter > interface, there is no unified mechanism to extend the reader side > accordingly. > In order to make the shuffle architecture more flexible and support more > scenarios, especially for batch jobs, a high-level shuffle architecture is > necessary to manage and extend both the writer and reader sides together. > Refer to the design doc for more details. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12564) Remove getBufferProvider method from ResultPartitionWriter interface
zhijiang created FLINK-12564: Summary: Remove getBufferProvider method from ResultPartitionWriter interface Key: FLINK-12564 URL: https://issues.apache.org/jira/browse/FLINK-12564 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: zhijiang Assignee: zhijiang Currently `ResultPartitionWriter#getBufferProvider` is used for requesting a `BufferBuilder` in `RecordWriter`; the `BufferConsumer` created from that `BufferBuilder` is then added into `ResultPartitionWriter` via the `addBufferConsumer` method. We could merge these two methods in `ResultPartitionWriter` so that `getBufferProvider` is no longer exposed. `ResultPartitionWriter` could internally request the `BufferBuilder`, add the created `BufferConsumer` into one sub partition, and then return the `BufferBuilder` to `RecordWriter` for writing serialized data. Since this changes `ResultPartitionWriter#addBufferConsumer` to `ResultPartitionWriter#requestBufferBuilder`, another new method `ResultPartitionWriter#broadcastEvents` should be introduced to handle events. In the future it might be worth further abstracting `ResultPartitionWriter` so that it is not tied to `BufferBuilder`. We could provide `writeRecord(int targetIndex)` to replace `requestBufferBuilder`; the serialization could then be done inside the specific `ResultPartitionWriter` instance. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
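The merged method could be sketched as below. Only the method names `requestBufferBuilder` and `broadcastEvents` come from the issue; `SketchBufferBuilder`, `SketchPartitionWriter`, the use of `StringBuilder` as buffer storage, and the two inspection helpers are hypothetical simplifications.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical stand-in for BufferBuilder: the writing side of one buffer. */
final class SketchBufferBuilder {
    final StringBuilder data = new StringBuilder();

    void append(String serializedRecord) {
        data.append(serializedRecord);
    }
}

/** Sketch of a writer that no longer exposes its buffer provider. */
final class SketchPartitionWriter {
    private final List<Deque<StringBuilder>> subpartitions = new ArrayList<>();

    SketchPartitionWriter(int numberOfSubpartitions) {
        for (int i = 0; i < numberOfSubpartitions; i++) {
            subpartitions.add(new ArrayDeque<>());
        }
    }

    /** Requests a buffer, queues its reading side, and returns the writing side. */
    SketchBufferBuilder requestBufferBuilder(int targetChannel) {
        SketchBufferBuilder builder = new SketchBufferBuilder();
        // The queued entry shares storage with the builder, like a consumer view.
        subpartitions.get(targetChannel).add(builder.data);
        return builder;
    }

    /** Events bypass the builder path and go to every sub partition. */
    void broadcastEvents(String event) {
        for (Deque<StringBuilder> queue : subpartitions) {
            queue.add(new StringBuilder(event));
        }
    }

    int queuedBuffers(int channel) {
        return subpartitions.get(channel).size();
    }

    String lastQueued(int channel) {
        return subpartitions.get(channel).peekLast().toString();
    }
}
```

The record writer then calls `requestBufferBuilder(channel)` and appends serialized data, without ever seeing where the buffer came from or how it was queued.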
[jira] [Created] (FLINK-12544) Deadlock during releasing memory in SpillableSubpartition
zhijiang created FLINK-12544: Summary: Deadlock during releasing memory in SpillableSubpartition Key: FLINK-12544 URL: https://issues.apache.org/jira/browse/FLINK-12544 Project: Flink Issue Type: Bug Components: Runtime / Network Reporter: zhijiang Assignee: zhijiang It is reported by flink user, and the original jstack is as following: {code:java} // "CoGroup (2/2)": at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:213) - waiting to lock <0x00062bf859b8> (a java.lang.Object) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614) at java.lang.Thread.run(Thread.java:745) "CoGroup (1/2)": at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:277) - waiting to lock <0x00063fdf4888> (a java.util.ArrayDeque) at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172) at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95) at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84) at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147) at org.apache.flink.runtime.io.network.buffer.BufferConsumer.close(BufferConsumer.java:121) at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.spillFinishedBufferConsumers(SpillableSubpartition.java:274) at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:239) - locked <0x00063fdf4ac8> (a java.util.ArrayDeque) at org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371) at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:375) at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:408) at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:297) - locked <0x00063c785350> (a java.lang.Object) at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:259) at org.apache.flink.runtime.io.network.NetworkEnvironment.setupInputGate(NetworkEnvironment.java:272) at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:224) - locked <0x00062bf859b8> (a java.lang.Object) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614) at java.lang.Thread.run(Thread.java:745) "DataSource (1/1)": at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:227) - waiting to lock <0x00063fdf4ac8> (a java.util.ArrayDeque) at org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371) at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:257) - locked <0x00063fdf4888> (a java.util.ArrayDeque) at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:218) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:213) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:144) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107) at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65) at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:193) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) at java.lang.Thread.run(Thread.java:745) {code} Based on the analysis, it happens when one task A is trying to release sub partition memory, then it 
occupies the lock in `LocalBufferPool`, and trying to get the lock in `SpillSubpartition`. Meanwhile, another task B is submitted to TM to trigger previous task to release memory, then it would already occu
[jira] [Commented] (FLINK-12202) Consider introducing batch metric register in NetworkEnviroment
[ https://issues.apache.org/jira/browse/FLINK-12202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16839963#comment-16839963 ] zhijiang commented on FLINK-12202: -- Hey [~azagrebin], as we confirmed before, this ticket is not needed because we would register metrics while creating batch of partitions/gates. So should we close this jira? > Consider introducing batch metric register in NetworkEnviroment > --- > > Key: FLINK-12202 > URL: https://issues.apache.org/jira/browse/FLINK-12202 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Metrics, Runtime / Network >Reporter: Andrey Zagrebin >Assignee: zhijiang >Priority: Major > > As we have some network specific metrics registered in TaskIOMetricGroup > (In/OutputBuffersGauge, In/OutputBufferPoolUsageGauge), we can introduce > batch metric registering in > NetworkEnviroment.registerMetrics(ProxyMetricGroup, partitions, gates), where > task passes its TaskIOMetricGroup into ProxyMetricGroup. This way we could > break a tie between task and NetworkEnviroment. > TaskIOMetricGroup.initializeBufferMetrics, In/OutputBuffersGauge, > In/OutputBufferPoolUsageGauge could be moved into > NetworkEnviroment.registerMetrics and network code. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-12474) UnionInputGate should be notified when closing SingleInputGate by canceler thread
[ https://issues.apache.org/jira/browse/FLINK-12474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang closed FLINK-12474. Resolution: Invalid > UnionInputGate should be notified when closing SingleInputGate by canceler > thread > - > > Key: FLINK-12474 > URL: https://issues.apache.org/jira/browse/FLINK-12474 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Reporter: zhijiang >Priority: Minor > > If a task is being canceled, the `SingleInputGate` is closed by the canceler > thread. If the `SingleInputGate` is waiting for a buffer, > `inputChannelsWithData` is notified to wake the task thread so it can exit early. > But if the `UnionInputGate` is waiting for a buffer, the task thread stays > stuck in wait after the `SingleInputGate` is closed, until the cancel timeout. > To make the task exit early in this case, we could make `SingleInputGate` also > notify `UnionInputGate` after it is closed, so that it can wake the task > thread to exit during canceling. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12474) UnionInputGate should be notified when closing SingleInputGate by canceler thread
[ https://issues.apache.org/jira/browse/FLINK-12474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16839298#comment-16839298 ] zhijiang commented on FLINK-12474: -- Yes, you are right; I double-checked the code. `SingleInputGate#close` calls `inputChannelsWithData.notifyAll()`, which can wake the task thread, but I did not find similar logic in `UnionInputGate`, which is what raised my doubts. Since `executor.interrupt()` handles this case, calling `inputChannelsWithData.notifyAll()` in `SingleInputGate#close` seems unnecessary. I should review the whole process again. Sorry for the disturbance; I will close this jira. > UnionInputGate should be notified when closing SingleInputGate by canceler > thread > - > > Key: FLINK-12474 > URL: https://issues.apache.org/jira/browse/FLINK-12474 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Reporter: zhijiang >Priority: Minor > > If task is being canceled, the `SingleInputGate` would be closed by canceler > thread. If the `SingleInputGate` is waiting for buffer, > `inputChannelsWithData` would be notified to wake task thread to exit early. > But if the `UnionInputGate` is waiting for buffer, task thread is still > stuck in wait when `SingleInputGate` is closed until cancel timeout. > To make task exit early in this case, we could make `SingleInputGate` further > notify `UnionInputGate` after it is closed, then it could also wake task > thread to exit during canceling.
[jira] [Commented] (FLINK-12474) UnionInputGate should be notified when closing SingleInputGate by canceler thread
[ https://issues.apache.org/jira/browse/FLINK-12474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16839230#comment-16839230 ] zhijiang commented on FLINK-12474: -- [~pnowojski] thanks for the confirmation! Yes, the `SingleInputGate` would be notified by the canceler thread after it calls `SingleInputGate#close()`. But in the case of `UnionInputGate`, if the task is blocked in `wait()` because no data is available, I think it has to wait for the cancel timeout, unless I missed some other path that unblocks the task thread. Anyway, if you can solve this issue in `CheckpointBarrierHandler`, then I could close this jira. :) > UnionInputGate should be notified when closing SingleInputGate by canceler > thread > - > > Key: FLINK-12474 > URL: https://issues.apache.org/jira/browse/FLINK-12474 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Reporter: zhijiang >Priority: Minor > > If task is being canceled, the `SingleInputGate` would be closed by canceler > thread. If the `SingleInputGate` is waiting for buffer, > `inputChannelsWithData` would be notified to wake task thread to exit early. > But if the `UnionInputGate` is waiting for buffer, task thread is still > stuck in wait when `SingleInputGate` is closed until cancel timeout. > To make task exit early in this case, we could make `SingleInputGate` further > notify `UnionInputGate` after it is closed, then it could also wake task > thread to exit during canceling.
[jira] [Updated] (FLINK-12497) Refactor the start method of ConnectionManager
[ https://issues.apache.org/jira/browse/FLINK-12497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-12497: - Description: In current {{ConnectionManager#start(ResultPartitionProvider, TaskEventDispatcher)}}, the parameters in start are only reasonable for {{NettyConnectionManager}} implementation, redundant for {{LocalConnectionManager}}. We could put these parameters in the constructor of {{NettyConnectionManager}}, then {{ConnectionManager#start()}} would be cleaner for both implementations. And it could also bring benefits for calling start in {{NetworkEnvironment}} which does not need to maintain private {{TaskEventDispatcher}} in future. was: In current `ConnectionManager#start(ResultPartitionProvider, TaskEventDispatcher)`, the parameters in start are only reasonable for `NettyConnectionManager` implementation, redundant for `LocalConnectionManager`. We could put these parameters in the constructor of `NettyConnectionManager`, then `ConnectionManager#start()` would be cleaner for both implementations. And it also brings benefits for calling start in `NetworkEnvironment` which does not need to maintain private `TaskEventDispatcher`. > Refactor the start method of ConnectionManager > -- > > Key: FLINK-12497 > URL: https://issues.apache.org/jira/browse/FLINK-12497 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > > In current {{ConnectionManager#start(ResultPartitionProvider, > TaskEventDispatcher)}}, the parameters in start are only reasonable for > {{NettyConnectionManager}} implementation, redundant for > {{LocalConnectionManager}}. > We could put these parameters in the constructor of > {{NettyConnectionManager}}, then {{ConnectionManager#start()}} would be > cleaner for both implementations. And it could also bring benefits for > calling start in {{NetworkEnvironment}} which does not need to maintain > private {{TaskEventDispatcher}} in future.
[jira] [Created] (FLINK-12497) Refactor the start method of ConnectionManager
zhijiang created FLINK-12497: Summary: Refactor the start method of ConnectionManager Key: FLINK-12497 URL: https://issues.apache.org/jira/browse/FLINK-12497 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: zhijiang Assignee: zhijiang In the current `ConnectionManager#start(ResultPartitionProvider, TaskEventDispatcher)`, the parameters of start are only meaningful for the `NettyConnectionManager` implementation and redundant for `LocalConnectionManager`. We could put these parameters in the constructor of `NettyConnectionManager`, then `ConnectionManager#start()` would be cleaner for both implementations. It also benefits the call to start in `NetworkEnvironment`, which would no longer need to maintain a private `TaskEventDispatcher`.
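The proposed refactoring can be illustrated with a minimal sketch. The types below are simplified stand-ins that only borrow the names from the ticket; they are not the real Flink classes, and the `isStarted()` helper is a hypothetical addition for illustration.

```java
import java.util.Objects;

// Stand-in marker types; the real Flink interfaces carry methods not shown here.
interface ResultPartitionProvider {}
interface TaskEventDispatcher {}

// After the refactoring, start() takes no arguments for any implementation.
interface ConnectionManager {
    void start();
}

// The Netty-based variant receives its collaborators at construction time.
class NettyConnectionManager implements ConnectionManager {
    private final ResultPartitionProvider partitionProvider;
    private final TaskEventDispatcher taskEventDispatcher;
    private boolean started;

    NettyConnectionManager(ResultPartitionProvider partitionProvider,
                           TaskEventDispatcher taskEventDispatcher) {
        this.partitionProvider = Objects.requireNonNull(partitionProvider);
        this.taskEventDispatcher = Objects.requireNonNull(taskEventDispatcher);
    }

    @Override
    public void start() {
        // A real implementation would bring up the network server/client here,
        // using the provider and dispatcher stored in the fields.
        started = true;
    }

    // Hypothetical helper, only used to observe the sketch's state.
    boolean isStarted() {
        return started;
    }
}

// The local variant never needed the two parameters in the first place.
class LocalConnectionManager implements ConnectionManager {
    @Override
    public void start() {
        // Nothing to do for local data exchange.
    }
}
```

With this shape, a caller such as `NetworkEnvironment` would only invoke `start()` and would not have to hold a `TaskEventDispatcher` of its own.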
[jira] [Updated] (FLINK-11403) Introduce ResultPartitionWithConsumableNotifier in task for notifying consumable result partition
[ https://issues.apache.org/jira/browse/FLINK-11403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-11403: - Summary: Introduce ResultPartitionWithConsumableNotifier in task for notifying consumable result partition (was: Remove ResultPartitionConsumableNotifier from ResultPartition) > Introduce ResultPartitionWithConsumableNotifier in task for notifying > consumable result partition > -- > > Key: FLINK-11403 > URL: https://issues.apache.org/jira/browse/FLINK-11403 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > The creation of {{ResultPartitionWriter}} from {{NetworkEnvironment}} relies > on {{TaskAction}}, {{ResultPartitionConsumableNotifier}}. For breaking this > tie, {{ResultPartitionWithConsumableNotifier}} is introduced for wrapping the > logic of notification. > In this way the later interface method > {{ShuffleService#createResultPartitionWriter}} would be simple. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11403) Remove ResultPartitionConsumableNotifier from ResultPartition
[ https://issues.apache.org/jira/browse/FLINK-11403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-11403: - Description: The creation of {{ResultPartitionWriter}} from {{NetworkEnvironment}} relies on {{TaskAction}}, {{ResultPartitionConsumableNotifier}}. For breaking this tie, {{ResultPartitionWithConsumableNotifier}} is introduced for wrapping the logic of notification. In this way the later interface method {{ShuffleService#createResultPartitionWriter}} would be simple. was: This is the precondition for introducing pluggable {{ShuffleService}} on TM side. In current process of creating {{ResultPartition}}, the {{ResultPartitionConsumableNotifier}} regarded as TM level component has to be passed into the constructor. In order to create {{ResultPartition}} easily from {{ShuffleService}}, the required information should be covered by {{ResultPartitionDeploymentDescriptor}} as much as possible, then we could remove this notifier from the constructor. And it is also reasonable for notifying consumable partition via {{TaskActions}} which is already covered in {{ResultPartition}}. > Remove ResultPartitionConsumableNotifier from ResultPartition > - > > Key: FLINK-11403 > URL: https://issues.apache.org/jira/browse/FLINK-11403 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > The creation of {{ResultPartitionWriter}} from {{NetworkEnvironment}} relies > on {{TaskAction}}, {{ResultPartitionConsumableNotifier}}. For breaking this > tie, {{ResultPartitionWithConsumableNotifier}} is introduced for wrapping the > logic of notification. > In this way the later interface method > {{ShuffleService#createResultPartitionWriter}} would be simple. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12474) UnionInputGate should be notified when closing SingleInputGate by canceler thread
[ https://issues.apache.org/jira/browse/FLINK-12474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836964#comment-16836964 ] zhijiang commented on FLINK-12474: -- [~pnowojski], do you think this jira is worth fixing, and is my understanding right? > UnionInputGate should be notified when closing SingleInputGate by canceler > thread > - > > Key: FLINK-12474 > URL: https://issues.apache.org/jira/browse/FLINK-12474 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Reporter: zhijiang >Priority: Minor > > If task is being canceled, the `SingleInputGate` would be closed by canceler > thread. If the `SingleInputGate` is waiting for buffer, > `inputChannelsWithData` would be notified to wake task thread to exit early. > But if the `UnionInputGate` is waiting for buffer, task thread is still > stuck in wait when `SingleInputGate` is closed until cancel timeout. > To make task exit early in this case, we could make `SingleInputGate` further > notify `UnionInputGate` after it is closed, then it could also wake task > thread to exit during canceling.
[jira] [Created] (FLINK-12474) UnionInputGate should be notified when closing SingleInputGate by canceler thread
zhijiang created FLINK-12474: Summary: UnionInputGate should be notified when closing SingleInputGate by canceler thread Key: FLINK-12474 URL: https://issues.apache.org/jira/browse/FLINK-12474 Project: Flink Issue Type: Improvement Components: Runtime / Network Reporter: zhijiang If task is being canceled, the `SingleInputGate` would be closed by canceler thread. If the `SingleInputGate` is waiting for buffer, `inputChannelsWithData` would be notified to wake task thread to exit early. But if the `UnionInputGate` is waiting for buffer, task thread is still stuck in wait when `SingleInputGate` is closed until cancel timeout. To make task exit early in this case, we could make `SingleInputGate` further notify `UnionInputGate` after it is closed, then it could also wake task thread to exit during canceling.
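The wake-up mechanism proposed in this ticket can be sketched with plain `wait()`/`notifyAll()`. This is only an illustration under stated assumptions: `SketchUnionGate`, `SketchSingleGate` and all of their methods are hypothetical and do not mirror the real gate classes.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical union gate: the task thread blocks here until data arrives
// or one of the underlying gates reports that it was closed.
class SketchUnionGate {
    private final Queue<SketchSingleGate> gatesWithData = new ArrayDeque<>();
    private boolean anyGateClosed;

    // Called by a single gate when it has data to offer.
    void notifyDataAvailable(SketchSingleGate gate) {
        synchronized (gatesWithData) {
            gatesWithData.add(gate);
            gatesWithData.notifyAll();
        }
    }

    // Called by a single gate from its close(); this is the proposed extra hook.
    void notifyGateClosed() {
        synchronized (gatesWithData) {
            anyGateClosed = true;
            gatesWithData.notifyAll();
        }
    }

    // Returns true if data is available, false if woken up because a gate closed.
    boolean waitForData() throws InterruptedException {
        synchronized (gatesWithData) {
            while (gatesWithData.isEmpty() && !anyGateClosed) {
                gatesWithData.wait();
            }
            return !gatesWithData.isEmpty();
        }
    }
}

// Hypothetical single gate that forwards its close event to the union gate.
class SketchSingleGate {
    private SketchUnionGate listener;

    void registerListener(SketchUnionGate unionGate) {
        this.listener = unionGate;
    }

    // In the scenario of the ticket this would be called by the canceler thread.
    void close() {
        if (listener != null) {
            listener.notifyGateClosed();
        }
    }
}
```

The `anyGateClosed` flag is checked inside the loop so that a close arriving before the task thread starts waiting is not lost.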
[jira] [Created] (FLINK-12458) Throw PartitionNotFoundException if consumer can not establish a connection to remote TM
zhijiang created FLINK-12458: Summary: Throw PartitionNotFoundException if consumer can not establish a connection to remote TM Key: FLINK-12458 URL: https://issues.apache.org/jira/browse/FLINK-12458 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: zhijiang Assignee: zhijiang If the consumer cannot establish a connection to the remote task executor, this might indicate that the remote task executor is not reachable. We could wrap this connection exception into the existing `PartitionNotFoundException`, then the job master would decide whether to restart the upstream region to re-produce the partition data.
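A minimal sketch of the wrapping described above, assuming a consumer-side channel that connects before requesting a partition. `SketchPartitionNotFoundException`, `SketchConnector` and `SketchRemoteChannel` are hypothetical names for illustration; the partition is identified by a plain string here, which is also an assumption.

```java
import java.io.IOException;

// Hypothetical stand-in for the existing exception type named in the ticket.
class SketchPartitionNotFoundException extends IOException {
    private final String partitionId;

    SketchPartitionNotFoundException(String partitionId, Throwable cause) {
        super("Partition " + partitionId + " not found or its producer is unreachable.", cause);
        this.partitionId = partitionId;
    }

    String getPartitionId() {
        return partitionId;
    }
}

// Hypothetical abstraction over "establish a connection to the remote task executor".
interface SketchConnector {
    void connect(String address) throws IOException;
}

// Hypothetical consumer-side channel.
class SketchRemoteChannel {
    private final String partitionId;
    private final String producerAddress;
    private final SketchConnector connector;

    SketchRemoteChannel(String partitionId, String producerAddress, SketchConnector connector) {
        this.partitionId = partitionId;
        this.producerAddress = producerAddress;
        this.connector = connector;
    }

    void requestPartition() throws SketchPartitionNotFoundException {
        try {
            connector.connect(producerAddress);
        } catch (IOException e) {
            // Keep the original failure as the cause so that nothing is lost,
            // but surface it as "partition not found" to the caller.
            throw new SketchPartitionNotFoundException(partitionId, e);
        }
    }
}
```

Carrying the partition id in the exception is what would let a receiver such as the job master map the failure back to the producing side.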
[jira] [Created] (FLINK-12331) Introduce partition/gate setup to decouple task registration with NetworkEnvironment
zhijiang created FLINK-12331: Summary: Introduce partition/gate setup to decouple task registration with NetworkEnvironment Key: FLINK-12331 URL: https://issues.apache.org/jira/browse/FLINK-12331 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: zhijiang Assignee: zhijiang Fix For: 1.9.0 In order to decouple the task from {{NetworkEnvironment}} completely, we introduce the interface methods {{InputGate#setup}} and {{ResultPartitionWriter#setup}}. Then the task could call the {{setup}} method on the created partitions/gates directly instead of calling the current {{registerTask}} via {{NetworkEnvironment}}.
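The idea can be sketched as follows, assuming `setup()` takes no arguments and may throw `IOException`. The `Sketch*` types are hypothetical stand-ins, and the order in which partitions and gates are set up is an assumption of this example.

```java
import java.io.IOException;
import java.util.List;

// Hypothetical stand-ins for the two interfaces that gain a setup() method.
interface SketchResultPartitionWriter {
    void setup() throws IOException;
}

interface SketchInputGate {
    void setup() throws IOException;
}

// Hypothetical task that already holds the partitions and gates created for it.
class SketchTask {
    private final List<SketchResultPartitionWriter> writers;
    private final List<SketchInputGate> gates;

    SketchTask(List<SketchResultPartitionWriter> writers, List<SketchInputGate> gates) {
        this.writers = writers;
        this.gates = gates;
    }

    // Replaces a call such as networkEnvironment.registerTask(task):
    // the task drives the setup itself and needs no reference to the environment.
    void setupPartitionsAndGates() throws IOException {
        for (SketchResultPartitionWriter writer : writers) {
            writer.setup();
        }
        for (SketchInputGate gate : gates) {
            gate.setup();
        }
    }
}
```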
[jira] [Updated] (FLINK-6227) Introduce the DataConsumptionException for downstream task failure
[ https://issues.apache.org/jira/browse/FLINK-6227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-6227: Component/s: Runtime / Coordination > Introduce the DataConsumptionException for downstream task failure > -- > > Key: FLINK-6227 > URL: https://issues.apache.org/jira/browse/FLINK-6227 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination, Runtime / Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > > It is part of FLIP-1. > We define a new special exception to indicate the downstream task failure in > consuming upstream data. > The {{JobManager}} will receive and consider this special exception to > calculate the minimum connected sub-graph which is called {{FailoverRegion}}. > So the {{DataConsumptionException}} should contain {{ResultPartitionID}} > information for {{JobMaster}} tracking upstream executions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12213) Pass TaskManagerMetricGroup into constructor of NetworkEnvironment
zhijiang created FLINK-12213: Summary: Pass TaskManagerMetricGroup into constructor of NetworkEnvironment Key: FLINK-12213 URL: https://issues.apache.org/jira/browse/FLINK-12213 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: zhijiang Assignee: zhijiang Fix For: 1.9.0 At the moment {{NetworkEnvironment#getNetworkBufferPool}} is called to add network related {{MetricGroup}}. In order to simplify the public API in {{NetworkEnvironment}} which is regarded as default {{ShuffleService}} implementation, we could pass the {{TaskManagerMetricGroup}} into constructor of {{NetworkEnvironment}}, then the related network {{MetricGroup}} could be added internally. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-12070) Make blocking result partitions consumable multiple times
[ https://issues.apache.org/jira/browse/FLINK-12070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16818619#comment-16818619 ] zhijiang edited comment on FLINK-12070 at 4/16/19 4:25 AM: --- Thanks for providing the POC for this feature [~StephanEwen] I really approve the ways of making {{spillableSubpartition}} simple write to persistent file directly and make use of OS cache efficiently via mmap. Although the previous behavior of intermediate memory state for {{SpillableSubpartition}} could bring performance benefits in small-scale data, it makes the logic more complex to maintain, even other tasks might be impacted when it evicts to disk file to release memory segment. And the partition data is not determined considering the FLIP-1 requirements. These points are also my previous concerns and planning to be focused. Considering the issue of caching intermediate results, we ever implemented the similar function in previous Blink version, but finally it is abandoned for some reasons. Actually this issue is valuable in production, because it can both satisfy the speed of memory shuffle like streaming pipelined way and also consider the failover requirements via asynchronous eviction to disk. The mmappartition might bring more possibilities for this issue. BTW, what is your plan for making this POC into practice? Do you need hands for supplementing the unit tests and verifying benchmarks? The partition release issue after consumption could be considered together in the proposal of partition lifecycle management. was (Author: zjwang): Thanks for providing the POC for this feature [~StephanEwen] I really approve the ways of making {{spillableSubpartition}} simple write to persistent file directly and make use of OS cache efficiently via mmap. 
Although the previous behavior of intermediate memory state for {{SpillableSubpartition}} could bring performance benefits in small-scale data, it makes the logic more complex to maintain, even other tasks might be impacted when it evicts to disk file to release memory segment. And the partition data is not determined considering the FLIP-1 requirements. These points are also my previous concerns and planning to be focused. Considering the issue of caching intermediate results, we ever implemented the similar function in previous Blink version, but finally it is abandoned for some reasons. Actually this issue is valuable in production, because it can both satisfy the speed of memory shuffle like streaming pipelined way and also consider the failover requirements via asynchronous eviction to disk. The mmappartition might bring more possibilities for this issue. BTW, what is your plan for making this POC into practice? Do you need hands for supplementing the unit tests and verifying benchmarks? The partition release issue after consumption could be considered together in the proposal of partition lifecycle management before. > Make blocking result partitions consumable multiple times > - > > Key: FLINK-12070 > URL: https://issues.apache.org/jira/browse/FLINK-12070 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Reporter: Till Rohrmann >Assignee: BoWang >Priority: Major > > In order to avoid writing produced results multiple times for multiple > consumers and in order to speed up batch recoveries, we should make the > blocking result partitions to be consumable multiple times. At the moment a > blocking result partition will be released once the consumers has processed > all data. Instead the result partition should be released once the next > blocking result has been produced and all consumers of a blocking result > partition have terminated. 
Moreover, blocking results should not hold on slot > resources like network buffers or memory as it is currently the case with > {{SpillableSubpartitions}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-12070) Make blocking result partitions consumable multiple times
[ https://issues.apache.org/jira/browse/FLINK-12070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16818619#comment-16818619 ] zhijiang edited comment on FLINK-12070 at 4/16/19 4:24 AM: --- Thanks for providing the POC for this feature [~StephanEwen] I really approve the ways of making {{spillableSubpartition}} simple write to persistent file directly and make use of OS cache efficiently via mmap. Although the previous behavior of intermediate memory state for {{SpillableSubpartition}} could bring performance benefits in small-scale data, it makes the logic more complex to maintain, even other tasks might be impacted when it evicts to disk file to release memory segment. And the partition data is not determined considering the FLIP-1 requirements. These points are also my previous concerns and planning to be focused. Considering the issue of caching intermediate results, we ever implemented the similar function in previous Blink version, but finally it is abandoned for some reasons. Actually this issue is valuable in production, because it can both satisfy the speed of memory shuffle like streaming pipelined way and also consider the failover requirements via asynchronous eviction to disk. The mmappartition might bring more possibilities for this issue. BTW, what is your plan for making this POC into practice? Do you need hands for supplementing the unit tests and verifying benchmarks? The partition release issue after consumption could be considered together in the proposal of partition lifecycle management before. was (Author: zjwang): Thanks for providing the POC for this feature [~StephanEwen] I really approve the ways of making {{spillableSubpartition}} simple write to persistent file directly and make use of OS cache efficiently via mmap. 
Although the previous behavior of intermediate memory state for {{SpillableSubpartition}} could bring performance benefits in small-scale data, it makes the logic more complex to maintain, even other tasks might be impacted when it evicts to disk file to release memory segment. And the partition data is not determined considering the FLIP-1 requirements. These points are also my previous concerns and planning to be focused. Considering the issue of caching intermediate results, we also implement the similar function in previous Blink version before, but finally it is abandoned for some reasons. Actually this issue is valuable in production, because it can both satisfy the speed of memory shuffle like streaming pipelined way and also consider the failover requirements via asynchronous eviction to disk. The mmappartition might bring more possibilities for this issue. BTW, what is your plan for making this POC into practice? Do you need hands for supplementing the unit tests and verifying benchmarks? The partition release issue after consumption could be considered together in the proposal of partition lifecycle management before. > Make blocking result partitions consumable multiple times > - > > Key: FLINK-12070 > URL: https://issues.apache.org/jira/browse/FLINK-12070 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Reporter: Till Rohrmann >Assignee: BoWang >Priority: Major > > In order to avoid writing produced results multiple times for multiple > consumers and in order to speed up batch recoveries, we should make the > blocking result partitions to be consumable multiple times. At the moment a > blocking result partition will be released once the consumers has processed > all data. Instead the result partition should be released once the next > blocking result has been produced and all consumers of a blocking result > partition have terminated. 
Moreover, blocking results should not hold on slot > resources like network buffers or memory as it is currently the case with > {{SpillableSubpartitions}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12070) Make blocking result partitions consumable multiple times
[ https://issues.apache.org/jira/browse/FLINK-12070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16818619#comment-16818619 ] zhijiang commented on FLINK-12070: -- Thanks for providing the POC for this feature [~StephanEwen] I really approve of making {{SpillableSubpartition}} simply write to a persistent file directly and make efficient use of the OS cache via mmap. Although the previous behavior of an intermediate in-memory state for {{SpillableSubpartition}} could bring performance benefits for small-scale data, it makes the logic more complex to maintain, and other tasks might even be impacted when it evicts to a disk file to release memory segments. And the partition data is not determined considering the FLIP-1 requirements. These points were also my earlier concerns, which I planned to focus on. Regarding the issue of caching intermediate results, we once implemented a similar function in a previous Blink version, but it was finally abandoned for some reasons. Actually this issue is valuable in production, because it can both provide the speed of a memory shuffle, like the streaming pipelined way, and satisfy the failover requirements via asynchronous eviction to disk. The mmappartition might bring more possibilities for this issue. BTW, what is your plan for putting this POC into practice? Do you need hands for supplementing the unit tests and verifying benchmarks? The partition release issue after consumption could be considered together in the proposal of partition lifecycle management. 
> Make blocking result partitions consumable multiple times > - > > Key: FLINK-12070 > URL: https://issues.apache.org/jira/browse/FLINK-12070 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Reporter: Till Rohrmann >Assignee: BoWang >Priority: Major > > In order to avoid writing produced results multiple times for multiple > consumers and in order to speed up batch recoveries, we should make the > blocking result partitions to be consumable multiple times. At the moment a > blocking result partition will be released once the consumers has processed > all data. Instead the result partition should be released once the next > blocking result has been produced and all consumers of a blocking result > partition have terminated. Moreover, blocking results should not hold on slot > resources like network buffers or memory as it is currently the case with > {{SpillableSubpartitions}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6227) Introduce the DataConsumptionException for downstream task failure
[ https://issues.apache.org/jira/browse/FLINK-6227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16818018#comment-16818018 ] zhijiang commented on FLINK-6227: - Ok, we would focus on the implementation soon. Also we could fully consider the related parts besides FLINK-10289. :) > Introduce the DataConsumptionException for downstream task failure > -- > > Key: FLINK-6227 > URL: https://issues.apache.org/jira/browse/FLINK-6227 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > > It is part of FLIP-1. > We define a new special exception to indicate the downstream task failure in > consuming upstream data. > The {{JobManager}} will receive and consider this special exception to > calculate the minimum connected sub-graph which is called {{FailoverRegion}}. > So the {{DataConsumptionException}} should contain {{ResultPartitionID}} > information for {{JobMaster}} tracking upstream executions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12154) Remove legacy fields for SingleInputGate
zhijiang created FLINK-12154: Summary: Remove legacy fields for SingleInputGate Key: FLINK-12154 URL: https://issues.apache.org/jira/browse/FLINK-12154 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: zhijiang Assignee: zhijiang In {{SingleInputGate#create}}, we could remove unused parameter {{ExecutionAttemptID}}. And for the constructor of {{SingleInputGate}}, we could remove unused parameter {{TaskIOMetricGroup}}. Then we introduce {{createSingleInputGate}} for reusing the process of creating {{SingleInputGate}} in related tests. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12146) Remove unregister task from NetworkEnvironment to simplify the interface of ShuffleService
zhijiang created FLINK-12146: Summary: Remove unregister task from NetworkEnvironment to simplify the interface of ShuffleService Key: FLINK-12146 URL: https://issues.apache.org/jira/browse/FLINK-12146 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: zhijiang Assignee: zhijiang The current {{NetworkEnvironment}} would be the default {{ShuffleService}} implementation in the task manager. In order to keep the interface simple, we try to avoid further interaction with {{NetworkEnvironment}}. {{NetworkEnvironment#unregisterTask}} is used for closing partitions/gates and releasing partitions from {{ResultPartitionManager}}. Closing partitions/gates could be done in the task, which already maintains arrays of them. Further, we could release the partition from {{ResultPartitionManager}} inside {{ResultPartition}} by introducing {{ResultPartition#close(Throwable)}}. With that, {{NetworkEnvironment#unregisterTask}} would be fully replaced and could be removed.
[jira] [Commented] (FLINK-6227) Introduce the DataConsumptionException for downstream task failure
[ https://issues.apache.org/jira/browse/FLINK-6227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16813215#comment-16813215 ] zhijiang commented on FLINK-6227: - Exactly, this exception is received on the consumer side via the network stack and is then set on the specific input channel via {{setError}}. When the task reads this input channel, {{checkError}} is called, which interrupts the running task. So the exception never passes through operator/user code, which therefore has no chance to change it. > Introduce the DataConsumptionException for downstream task failure > -- > > Key: FLINK-6227 > URL: https://issues.apache.org/jira/browse/FLINK-6227 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > > It is part of FLIP-1. > We define a new special exception to indicate the downstream task failure in > consuming upstream data. > The {{JobManager}} will receive and consider this special exception to > calculate the minimum connected sub-graph which is called {{FailoverRegion}}. > So the {{DataConsumptionException}} should contain {{ResultPartitionID}} > information for {{JobMaster}} tracking upstream executions.
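The {{setError}}/{{checkError}} hand-over described in this comment can be sketched as below. `SketchInputChannel` is a hypothetical class; keeping only the first error and returning a string placeholder instead of a buffer are assumptions of this example, not statements about the real input channel.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical input channel: the network thread records an error,
// the task thread observes it on its next read.
class SketchInputChannel {
    private final AtomicReference<Throwable> cause = new AtomicReference<>();

    // Called from the network stack; only the first error is kept.
    void setError(Throwable error) {
        cause.compareAndSet(null, error);
    }

    // Called from the task thread before it touches any data.
    void checkError() throws IOException {
        Throwable error = cause.get();
        if (error != null) {
            if (error instanceof IOException) {
                // Rethrow unchanged so that the special exception type survives.
                throw (IOException) error;
            }
            throw new IOException(error);
        }
    }

    // Placeholder for reading a buffer; fails first if an error was recorded.
    String getNextBuffer() throws IOException {
        checkError();
        return "buffer";
    }
}
```

Because the exception travels from the network thread to the task thread through the channel itself, no operator or user code sits in between that could catch or rewrap it.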
[jira] [Created] (FLINK-12127) Move network related options to NetworkEnvironmentOptions
zhijiang created FLINK-12127: Summary: Move network related options to NetworkEnvironmentOptions Key: FLINK-12127 URL: https://issues.apache.org/jira/browse/FLINK-12127 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: zhijiang Assignee: zhijiang Some network related options in TaskManagerOptions could be moved into the newly introduced `NetworkEnvironmentOptions`, which would be used for different shuffle services.