[ https://issues.apache.org/jira/browse/FLINK-20618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17250145#comment-17250145 ]
zlzhang0122 commented on FLINK-20618:
-------------------------------------

We found that this may be related to the received sequence number not matching the expected sequence number. In a normal channel, the sequence number and the expected sequence number look as shown below: [^2020-12-16 11-53-42 的屏幕截图.png] [^2020-12-16 11-48-30 的屏幕截图.png] [^2020-12-16 11-49-01 的屏幕截图.png]

But in the abnormal channel it looks like this: [^2020-12-16 11-47-37 的屏幕截图.png] [^2020-12-16 11-48-30 的屏幕截图.png]
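(For illustration of the check being referred to: conceptually, the receiving side of a channel tracks an expected sequence number and only accepts a buffer whose sequence number matches it. The sketch below is a made-up minimal example of that invariant; the class and method names are hypothetical and this is not Flink's actual input-channel code.)

{code:java}
// Hypothetical sketch of a per-channel sequence-number check; NOT Flink's
// RemoteInputChannel, just the invariant the screenshots are about.
public class SequenceCheckedChannel {

    private int expectedSequenceNumber = 0;

    /** Accepts a buffer only if its sequence number is exactly the expected one. */
    public void onBuffer(byte[] buffer, int sequenceNumber) {
        if (sequenceNumber != expectedSequenceNumber) {
            // In the abnormal channel the received number no longer matches the
            // expected one, so the channel cannot make progress past this point.
            throw new IllegalStateException(
                    "sequence number " + sequenceNumber
                            + " does not match expected " + expectedSequenceNumber);
        }
        expectedSequenceNumber++;
        // ... hand the buffer on to the record deserializer ...
    }
}
{code}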
> Some of the source operator subtasks get stuck when the Flink job is under critical backpressure
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-20618
>                 URL: https://issues.apache.org/jira/browse/FLINK-20618
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core
>    Affects Versions: 1.11.0, 1.10.2
>            Reporter: zlzhang0122
>            Priority: Critical
>         Attachments: 2020-12-16 11-47-37 的屏幕截图.png, 2020-12-16 11-48-30 的屏幕截图.png, 2020-12-16 11-49-01 的屏幕截图.png, 2020-12-16 11-53-42 的屏幕截图.png
>
> In some critical backpressure situations, some of the source subtasks block while requesting a buffer because the LocalBufferPool is full, so the whole task gets stuck while the other tasks keep running normally.
> Below are the jstack traces:
>
> Legacy Source Thread - Source: TalosTableSource(hash, timestamp, step, isCustomize, hostname, endpoint, metric, dsType, orgId, idc, labels, pdl) -> SourceConversion(table=[default_catalog.default_database.transfer_c5, source: [TalosTableSource(hash, timestamp, step, isCustomize, hostname, endpoint, metric, dsType, orgId, idc, labels, pdl)]], fields=[hash, timestamp, step, isCustomize, hostname, endpoint, metric, dsType, orgId, idc, labels, pdl]) -> Calc(select=[hash, timestamp, step, isCustomize, hostname, endpoint, metric, dsType, orgId, idc, labels, pdl, () AS $12, (timestamp - ((timestamp + 18000) MOD 86400)) AS $13]) (62/128) #113 prio=5 os_prio=0 tid=0x00007f43d07e1800 nid=0x1b1c waiting on condition [0x00007f43b8488000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for <0x00000000db234488> (a java.util.concurrent.CompletableFuture$Signaller)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>         at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
>         at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>         at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
>         at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>         at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:231)
>         at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:209)
>         at org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:189)
>         at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.requestNewBufferBuilder(ChannelSelectorRecordWriter.java:103)
>         at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:145)
>         at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:116)
>         at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
>         at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
>         at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
>         at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:732)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:710)
>         at StreamExecCalc$33.processElement(Unknown Source)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:732)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:710)
>         at SourceConversion$4.processElement(Unknown Source)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:732)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:710)
>         at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>         - locked <0x00000000d8d50fa8> (a java.lang.Object)
>         at org.apache.flink.streaming.connectors.talos.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:379)
>         - locked <0x00000000d8d50fa8> (a java.lang.Object)
>         at org.apache.flink.streaming.connectors.talos.internals.TalosFetcher2.runFetchLoop(TalosFetcher2.java:249)
>         at org.apache.flink.streaming.connectors.talos.FlinkTalosConsumerBase.run(FlinkTalosConsumerBase.java:758)
>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>         at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
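The WAITING state in the trace above comes down to the source thread parking on a CompletableFuture inside LocalBufferPool.requestMemorySegmentBlocking until a buffer is recycled. A minimal sketch of that request/recycle pattern, using a toy pool with made-up names rather than Flink's real LocalBufferPool implementation:

{code:java}
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Toy stand-in for a bounded buffer pool; NOT Flink's LocalBufferPool.
// A requester parks on a CompletableFuture (the Signaller frames in the trace)
// until some other thread recycles a buffer.
public class ToyBufferPool {

    private final ArrayDeque<byte[]> available = new ArrayDeque<>();
    private CompletableFuture<Void> availabilityFuture = CompletableFuture.completedFuture(null);

    public ToyBufferPool(int numBuffers, int bufferSize) {
        for (int i = 0; i < numBuffers; i++) {
            available.add(new byte[bufferSize]);
        }
    }

    /** Called by the consumer side when a buffer has been processed. */
    public synchronized void recycle(byte[] buffer) {
        available.add(buffer);
        availabilityFuture.complete(null); // wake up any blocked requester
    }

    /** Blocks until a buffer is available. */
    public byte[] requestBufferBlocking() throws InterruptedException, ExecutionException {
        while (true) {
            CompletableFuture<Void> toWaitOn;
            synchronized (this) {
                byte[] buffer = available.poll();
                if (buffer != null) {
                    return buffer;
                }
                if (availabilityFuture.isDone()) {
                    availabilityFuture = new CompletableFuture<>();
                }
                toWaitOn = availabilityFuture;
            }
            // This get() is where the legacy source thread in the trace is parked:
            // CompletableFuture.get() -> ForkJoinPool.managedBlock -> Unsafe.park.
            toWaitOn.get();
        }
    }
}
{code}

Under severe backpressure the downstream never recycles a buffer, so the get() never returns and the requesting thread stays parked, which matches the first trace.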
> Source: TalosTableSource(hash, timestamp, step, isCustomize, hostname, endpoint, metric, dsType, orgId, idc, labels, pdl) -> SourceConversion(table=[default_catalog.default_database.transfer_c5, source: [TalosTableSource(hash, timestamp, step, isCustomize, hostname, endpoint, metric, dsType, orgId, idc, labels, pdl)]], fields=[hash, timestamp, step, isCustomize, hostname, endpoint, metric, dsType, orgId, idc, labels, pdl]) -> Calc(select=[hash, timestamp, step, isCustomize, hostname, endpoint, metric, dsType, orgId, idc, labels, pdl, () AS $12, (timestamp - ((timestamp + 18000) MOD 86400)) AS $13]) (62/128) #108 prio=5 os_prio=0 tid=0x00007f44dc178000 nid=0x1332 waiting for monitor entry [0x00007f443dfd8000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>         at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:86)
>         - waiting to lock <0x00000000d8d50fa8> (a java.lang.Object)
>         at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
>         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
>         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>         at java.lang.Thread.run(Thread.java:748)
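Taken together, the two traces show the shape of the hang: the legacy source thread parks inside the buffer request while holding the monitor <0x00000000d8d50fa8> (the lock taken in NonTimestampContext.collect, which appears to be the task's checkpoint lock), and the task's mailbox thread is blocked waiting for that same monitor, so no mailbox actions can run. A stripped-down, hypothetical two-thread demo of that pattern (not Flink code; all names are made up):

{code:java}
import java.util.concurrent.CompletableFuture;

// Hypothetical demo of the pattern in the traces; NOT Flink code.
// Thread A parks on a never-completed future while holding a shared lock;
// thread B then blocks forever trying to enter the same monitor.
public class StuckTaskDemo {

    public static void main(String[] args) throws InterruptedException {
        final Object checkpointLock = new Object();                  // stands in for <0x00000000d8d50fa8>
        final CompletableFuture<Void> bufferAvailable = new CompletableFuture<>();

        Thread legacySourceThread = new Thread(() -> {
            synchronized (checkpointLock) {
                // Emitting a record needs a network buffer; under hard
                // backpressure the future never completes, so this thread
                // parks here while still holding the lock (WAITING, first trace).
                bufferAvailable.join();
            }
        }, "Legacy Source Thread");

        Thread mailboxThread = new Thread(() -> {
            synchronized (checkpointLock) {                           // BLOCKED, second trace
                System.out.println("mail action executed");
            }
        }, "Mailbox Thread");

        legacySourceThread.start();
        Thread.sleep(200);          // let the source thread take the lock first
        mailboxThread.start();

        mailboxThread.join(1_000);
        System.out.println("mailbox thread still blocked: " + mailboxThread.isAlive());
        System.exit(0);             // both threads would otherwise hang forever
    }
}
{code}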