[ 
https://issues.apache.org/jira/browse/FLINK-20618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17250770#comment-17250770
 ] 

zlzhang0122 edited comment on FLINK-20618 at 12/17/20, 3:21 AM:
----------------------------------------------------------------

> the other task run well

> > [~roman_khachatryan]  Could you explain what is this other task? Is it a 
> > downstream? Is it on the same TM?

Sorry, my mistake. "The other task" there means the other subtasks of the same 
operator: the operator runs with a high parallelism, and one of its subtasks is 
stuck. It's not on the same TM.

[~pnowojski] I understand that a backpressured task will be blocked for a while, 
but in my case one of the subtasks has been blocked for several days and would 
stay blocked indefinitely unless I restart it.

As the picture shows, subtask 61 has been blocked for about 2 days, and its 
Bytes sent and Records sent have never increased, while those of the other 
subtasks keep increasing normally.

 

!2020-12-17 11-10-06 的屏幕截图.png!

 

 



> Some of the source operator subtasks will stuck when flink job in critical 
> backpressure
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-20618
>                 URL: https://issues.apache.org/jira/browse/FLINK-20618
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Network
>    Affects Versions: 1.11.0, 1.10.2
>            Reporter: zlzhang0122
>            Priority: Critical
>         Attachments: 2020-12-16 11-47-37 的屏幕截图.png, 2020-12-16 11-48-30 
> 的屏幕截图.png, 2020-12-16 11-49-01 的屏幕截图.png, 2020-12-16 11-53-42 的屏幕截图.png, 
> 2020-12-17 11-10-06 的屏幕截图.png
>
>
> In some critical backpressure situations, some of the source subtasks block 
> while requesting a buffer because the LocalBufferPool is full, so the whole 
> task gets stuck while the other tasks run well.
> Below is the jstack trace:
>  
> Legacy Source Thread - Source: TalosTableSource(hash, timestamp, step, 
> isCustomize, hostname, endpoint, metric, dsType, orgId, idc, labels, pdl) 
> -> SourceConversion(table=[default_catalog.default_database.transfer_c5, 
> source: [TalosTableSource(hash, timestamp, step, isCustomize, hostname, 
> endpoint, metric, dsType, orgId, idc, labels, pdl)]], fields=[hash, 
> timestamp, step, isCustomize, hostname, endpoint, metric, dsType, orgId, idc, 
> labels, pdl]) -> Calc(select=[hash, timestamp, step, isCustomize, 
> hostname, endpoint, metric, dsType, orgId, idc, labels, pdl, () AS $12, 
> (timestamp - ((timestamp + 18000) MOD 86400)) AS $13]) (62/128) #113 prio=5 
> os_prio=0 tid=0x00007f43d07e1800 nid=0x1b1c waiting on condition 
> [0x00007f43b8488000]
> java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for <0x00000000db234488> (a 
> java.util.concurrent.CompletableFuture$Signaller)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>     at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
>     at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>     at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>     at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:231)
>     at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:209)
>     at 
> org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:189)
>     at 
> org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.requestNewBufferBuilder(ChannelSelectorRecordWriter.java:103)
>     at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:145)
>     at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:116)
>     at 
> org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
>     at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
>     at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
>     at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
>     at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:732)
>     at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:710)
>     at StreamExecCalc$33.processElement(Unknown Source)
>     at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
>     at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
>     at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
>     at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:732)
>     at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:710)
>     at SourceConversion$4.processElement(Unknown Source)
>     at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
>     at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
>     at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
>     at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:732)
>     at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:710)
>     at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>     - locked <0x00000000d8d50fa8> (a java.lang.Object)
>     at 
> org.apache.flink.streaming.connectors.talos.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:379)
>     - locked <0x00000000d8d50fa8> (a java.lang.Object)
>     at 
> org.apache.flink.streaming.connectors.talos.internals.TalosFetcher2.runFetchLoop(TalosFetcher2.java:249)
>     at 
> org.apache.flink.streaming.connectors.talos.FlinkTalosConsumerBase.run(FlinkTalosConsumerBase.java:758)
>     at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>     at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>     at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
>  
>  
>  
> Source: TalosTableSource(hash, timestamp, step, isCustomize, hostname, 
> endpoint, metric, dsType, orgId, idc, labels, pdl) -> 
> SourceConversion(table=[default_catalog.default_database.transfer_c5, source: 
> [TalosTableSource(hash, timestamp, step, isCustomize, hostname, endpoint, 
> metric, dsType, orgId, idc, labels, pdl)]], fields=[hash, timestamp, step, 
> isCustomize, hostname, endpoint, metric, dsType, orgId, idc, labels, pdl]) 
> -> Calc(select=[hash, timestamp, step, isCustomize, hostname, endpoint, 
> metric, dsType, orgId, idc, labels, pdl, () AS $12, (timestamp - ((timestamp 
> + 18000) MOD 86400)) AS $13]) (62/128) #108 prio=5 os_prio=0 
> tid=0x00007f44dc178000 nid=0x1332 waiting for monitor entry 
> [0x00007f443dfd8000]
> java.lang.Thread.State: BLOCKED (on object monitor)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:86)
>     - waiting to lock <0x00000000d8d50fa8> (a java.lang.Object)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>     at java.lang.Thread.run(Thread.java:748)
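
The two traces above show a recognizable pattern: the legacy source thread 
holds the lock <0x00000000d8d50fa8> and parks on a CompletableFuture inside 
LocalBufferPool.requestMemorySegmentBlocking, while the task's mailbox thread 
is BLOCKED waiting for that same lock. Below is a minimal sketch of that 
pattern using only the JDK. It is not Flink code: the class name, the thread 
names, and the `checkpointLock` and `bufferAvailable` variables are 
hypothetical stand-ins chosen for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class CheckpointLockDemo {

    /**
     * Starts a "source" thread that waits for a buffer while holding a lock,
     * then a "mailbox" thread that needs the same lock, and returns the
     * state observed for the mailbox thread before the buffer is released.
     */
    static Thread.State observeMailboxState() throws InterruptedException {
        // Stand-in for the lock object seen as <0x00000000d8d50fa8> in the traces.
        final Object checkpointLock = new Object();
        // Stand-in for the future the source thread parks on (assumption:
        // it completes only when a buffer is recycled).
        final CompletableFuture<Void> bufferAvailable = new CompletableFuture<>();
        final CountDownLatch lockHeld = new CountDownLatch(1);

        Thread source = new Thread(() -> {
            synchronized (checkpointLock) {
                lockHeld.countDown();
                // Parks here while still holding the lock.
                bufferAvailable.join();
            }
        }, "legacy-source");

        Thread mailbox = new Thread(() -> {
            // Cannot enter until the source thread leaves its synchronized block.
            synchronized (checkpointLock) {
                // A mailbox action would run here.
            }
        }, "mailbox");

        source.start();
        lockHeld.await();
        mailbox.start();

        // Poll (with a timeout) until the mailbox thread is blocked on the monitor.
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (mailbox.getState() != Thread.State.BLOCKED
                && System.nanoTime() < deadline) {
            Thread.sleep(10);
        }
        Thread.State observed = mailbox.getState();

        // Simulate a buffer being recycled so that both threads can finish.
        bufferAvailable.complete(null);
        source.join();
        mailbox.join();
        return observed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("mailbox thread state: " + observeMailboxState());
    }
}
```

In this sketch the mailbox thread can only make progress once the future is 
completed. If the completion never arrives, both threads stay where the 
traces show them. The sketch only illustrates the lock relationship; it does 
not explain why no buffer is returned to the pool in the reported case.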



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
