[ 
https://issues.apache.org/jira/browse/FLINK-20618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17250917#comment-17250917
 ] 

zlzhang0122 edited comment on FLINK-20618 at 12/17/20, 9:07 AM:
----------------------------------------------------------------

Sure, [~pnowojski].

1. This is a SQL job and the logic is very simple. It consumes data from a 
source (a message queue similar to Kafka), applies some conversions, and writes 
the result to Elasticsearch. (I replaced the Elasticsearch sink with a console 
sink for testing, but the problem persists.)

The main SQL looks like this:
{code:java}
create table transfer_c5 (XXX);

create table esSink(XXX);

insert into esSink
select hash, step, isCustomize, hostname,
       endpoint, metric, dsType, orgId, idc,
       Counter(metric, labels), ToTimestamp(`timestamp` * 1000),
       ConvertMapToArray(labels), pdl
from (
   select *, ROW_NUMBER() OVER (
       partition by hash, step, dsType, idc,
                    (`timestamp` - mod((`timestamp` + 18000), 86400))
       order by proctime()) as `rank`
   from transfer_c5
) as ranked
where ranked.`rank` = 1;
{code}
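
To make the partition key easier to check: the expression 
`timestamp` - mod((`timestamp` + 18000), 86400) appears to map each timestamp 
to the start of an 86400-second window whose boundaries are shifted by 18000 
seconds. Below is a minimal Java sketch of that arithmetic. The class and 
method names are hypothetical and not part of the job, and the sketch assumes 
non-negative timestamps in seconds.

```java
public class DayBucketSketch {

    // Hypothetical helper mirroring the SQL expression
    // `timestamp` - mod((`timestamp` + 18000), 86400).
    // Assumes ts is a non-negative number of seconds, so Java's % matches SQL MOD.
    static long bucketStart(long ts) {
        return ts - ((ts + 18000L) % 86400L);
    }

    public static void main(String[] args) {
        long[] samples = {1608145199L, 1608145200L, 1608163200L};
        for (long ts : samples) {
            // Timestamps with the same bucket start land in the same partition key.
            System.out.println(ts + " -> " + bucketStart(ts));
        }
    }
}
```

All timestamps that share a bucket start fall into the same ROW_NUMBER() 
partition (together with hash, step, dsType and idc), so only the first row 
per window by processing time is kept.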
The StreamGraph looks like this:

!2020-12-17 16-45-00 的屏幕截图.png!

2. Yes, subtask 61 of the source task is the only one that is stuck. All of 
the other tasks/subtasks are able to make progress, except for checkpointing. 
The downstream tasks/subtasks run well too: only the one downstream subtask 
that is directly connected to the abnormal upstream subtask cannot consume 
that particular ResultSubpartition, although it consumes the other 
ResultSubpartitions normally. No other tasks/subtasks have been affected, 
including the downstream subtask in the same slot; they all run well.

3. I have checked all the logs, stdout, and stderr from all of the TMs and the 
JM, but found nothing useful.



> Some of the source operator subtasks will stuck when flink job in critical 
> backpressure
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-20618
>                 URL: https://issues.apache.org/jira/browse/FLINK-20618
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Network
>    Affects Versions: 1.11.0, 1.10.2
>            Reporter: zlzhang0122
>            Priority: Critical
>         Attachments: 2020-12-16 11-47-37 的屏幕截图.png, 2020-12-16 11-48-30 
> 的屏幕截图.png, 2020-12-16 11-49-01 的屏幕截图.png, 2020-12-16 11-53-42 的屏幕截图.png, 
> 2020-12-17 11-10-06 的屏幕截图.png, 2020-12-17 16-45-00 的屏幕截图.png
>
>
> In some critical backpressure situations, some of the source subtasks block 
> while requesting a buffer because the LocalBufferPool is full, so the whole 
> task gets stuck while the other tasks run well.
> Below is the jstack trace:
>  
> Legacy Source Thread - Source: TalosTableSource(hash, timestamp, step, 
> isCustomize, hostname, endpoint, metric, dsType, orgId, idc, labels, pdl) 
> -> SourceConversion(table=[default_catalog.default_database.transfer_c5, 
> source: [TalosTableSource(hash, timestamp, step, isCustomize, hostname, 
> endpoint, metric, dsType, orgId, idc, labels, pdl)]], fields=[hash, 
> timestamp, step, isCustomize, hostname, endpoint, metric, dsType, orgId, idc, 
> labels, pdl]) -> Calc(select=[hash, timestamp, step, isCustomize, 
> hostname, endpoint, metric, dsType, orgId, idc, labels, pdl, () AS $12, 
> (timestamp - ((timestamp + 18000) MOD 86400)) AS $13]) (62/128) #113 prio=5 
> os_prio=0 tid=0x00007f43d07e1800 nid=0x1b1c waiting on condition 
> [0x00007f43b8488000]
> java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for <0x00000000db234488> (a 
> java.util.concurrent.CompletableFuture$Signaller)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>     at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
>     at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>     at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>     at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:231)
>     at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:209)
>     at 
> org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:189)
>     at 
> org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.requestNewBufferBuilder(ChannelSelectorRecordWriter.java:103)
>     at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:145)
>     at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:116)
>     at 
> org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
>     at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
>     at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
>     at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
>     at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:732)
>     at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:710)
>     at StreamExecCalc$33.processElement(Unknown Source)
>     at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
>     at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
>     at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
>     at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:732)
>     at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:710)
>     at SourceConversion$4.processElement(Unknown Source)
>     at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
>     at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
>     at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
>     at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:732)
>     at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:710)
>     at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>     - locked <0x00000000d8d50fa8> (a java.lang.Object)
>     at 
> org.apache.flink.streaming.connectors.talos.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:379)
>     - locked <0x00000000d8d50fa8> (a java.lang.Object)
>     at 
> org.apache.flink.streaming.connectors.talos.internals.TalosFetcher2.runFetchLoop(TalosFetcher2.java:249)
>     at 
> org.apache.flink.streaming.connectors.talos.FlinkTalosConsumerBase.run(FlinkTalosConsumerBase.java:758)
>     at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>     at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>     at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
>  
>  
>  
> Source: TalosTableSource(hash, timestamp, step, isCustomize, hostname, 
> endpoint, metric, dsType, orgId, idc, labels, pdl) -> 
> SourceConversion(table=[default_catalog.default_database.transfer_c5, source: 
> [TalosTableSource(hash, timestamp, step, isCustomize, hostname, endpoint, 
> metric, dsType, orgId, idc, labels, pdl)]], fields=[hash, timestamp, step, 
> isCustomize, hostname, endpoint, metric, dsType, orgId, idc, labels, pdl]) 
> -> Calc(select=[hash, timestamp, step, isCustomize, hostname, endpoint, 
> metric, dsType, orgId, idc, labels, pdl, () AS $12, (timestamp - ((timestamp 
> + 18000) MOD 86400)) AS $13]) (62/128) #108 prio=5 os_prio=0 
> tid=0x00007f44dc178000 nid=0x1332 waiting for monitor entry 
> [0x00007f443dfd8000]
> java.lang.Thread.State: BLOCKED (on object monitor)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:86)
>     - waiting to lock <0x00000000d8d50fa8> (a java.lang.Object)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>     at java.lang.Thread.run(Thread.java:748)
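
Reading the two traces together: the legacy source thread has locked 
<0x00000000d8d50fa8> and is parked inside requestMemorySegmentBlocking, while 
the task thread is BLOCKED waiting to lock the same object before it can run 
a mail. This would be consistent with checkpoints not progressing on that 
subtask. Below is a minimal, self-contained Java sketch of that pattern. It 
does not use any Flink classes; the names checkpointLock and bufferAvailable 
are hypothetical stand-ins for the lock and the buffer-availability future 
seen in the trace.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class CheckpointLockSketch {

    // Returns the state of the "mailbox" thread observed while the "source"
    // thread waits for a buffer with the shared lock still held.
    static Thread.State observeMailboxState() throws Exception {
        final Object checkpointLock = new Object();                 // stand-in for the shared lock
        final CompletableFuture<Void> bufferAvailable = new CompletableFuture<>();
        final CountDownLatch lockHeld = new CountDownLatch(1);

        Thread source = new Thread(() -> {
            synchronized (checkpointLock) {
                lockHeld.countDown();
                // Parks here without releasing the lock, like the blocking buffer request.
                bufferAvailable.join();
            }
        }, "legacy-source");

        Thread mailbox = new Thread(() -> {
            synchronized (checkpointLock) {
                // A mail (for example a checkpoint trigger) would run here.
            }
        }, "mailbox");

        source.start();
        lockHeld.await();          // make sure the source thread owns the lock first
        mailbox.start();

        // Poll until the mailbox thread is blocked on the monitor (or give up after 5 s).
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (mailbox.getState() != Thread.State.BLOCKED && System.nanoTime() < deadline) {
            Thread.sleep(10);
        }
        Thread.State observed = mailbox.getState();

        // Simulate a buffer being recycled so that both threads can finish.
        bufferAvailable.complete(null);
        source.join();
        mailbox.join();
        return observed;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("mailbox thread while source waits: " + observeMailboxState());
    }
}
```

In the sketch the mailbox thread stays blocked until the future completes. In 
the reported job the equivalent future apparently never completes for this 
one subtask, which is the part that still needs an explanation.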



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
