[ https://issues.apache.org/jira/browse/FLINK-22882?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Piotr Nowojski updated FLINK-22882: ----------------------------------- Description: On a cluster I observed symptoms of tasks being blocked for long time, causing long delays with unaligned checkpointing. 1% of those cases were caused by emitting records: {noformat} 2021-06-04 14:40:24,115 ERROR org.apache.flink.runtime.io.network.buffer.LocalBufferPool [] - Blocking wait [16497 ms] for an available buffer. java.lang.Exception: Stracktracegenerator at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:323) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:290) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewBufferBuilderFromPool(BufferWritingResultPartition.java:338) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewUnicastBufferBuilder(BufferWritingResultPartition.java:314) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.appendUnicastDataForRecordContinuation(BufferWritingResultPartition.java:258) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.emitRecord(BufferWritingResultPartition.java:147) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:54) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:105) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:90) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:44) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:101) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:82) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:182) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.api.connector.source.lib.util.IteratorSourceReader.pollNext(IteratorSourceReader.java:98) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:294) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:69) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:422) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:680) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:635) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:646) [flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:619) [flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) [flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) [flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282] {noformat} *All of those cases that I have seen were always coming from SourceOperator* was: On a cluster I observed symptoms of tasks being blocked for long time, causing long delays with unaligned checkpointing. 1% of those cases were caused by emitting records: {noformat} 2021-06-04 14:40:24,115 ERROR org.apache.flink.runtime.io.network.buffer.LocalBufferPool [] - Blocking wait [16497 ms] for an available buffer. java.lang.Exception: Stracktracegenerator at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:323) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:290) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewBufferBuilderFromPool(BufferWritingResultPartition.java:338) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewUnicastBufferBuilder(BufferWritingResultPartition.java:314) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.appendUnicastDataForRecordContinuation(BufferWritingResultPartition.java:258) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.emitRecord(BufferWritingResultPartition.java:147) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:54) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:105) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:90) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:44) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:101) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:82) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:182) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.api.connector.source.lib.util.IteratorSourceReader.pollNext(IteratorSourceReader.java:98) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:294) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:69) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:422) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:680) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:635) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:646) [flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:619) [flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) [flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) [flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282] {noformat} > Tasks are blocked while emitting records > ---------------------------------------- > > Key: FLINK-22882 > URL: https://issues.apache.org/jira/browse/FLINK-22882 > Project: Flink > Issue Type: Bug > Components: Runtime / Network, Runtime / Task > Affects Versions: 1.14.0 > Reporter: Piotr Nowojski > Priority: Critical > > On a cluster I observed symptoms of tasks being blocked for long time, > causing long delays with unaligned checkpointing. 1% of those cases were > caused by emitting records: > {noformat} > 2021-06-04 14:40:24,115 ERROR > org.apache.flink.runtime.io.network.buffer.LocalBufferPool [] - Blocking > wait [16497 ms] for an available buffer. > java.lang.Exception: Stracktracegenerator > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:323) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:290) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewBufferBuilderFromPool(BufferWritingResultPartition.java:338) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewUnicastBufferBuilder(BufferWritingResultPartition.java:314) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.appendUnicastDataForRecordContinuation(BufferWritingResultPartition.java:258) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.emitRecord(BufferWritingResultPartition.java:147) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:54) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:105) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:90) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:44) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:101) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:82) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:182) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.api.connector.source.lib.util.IteratorSourceReader.pollNext(IteratorSourceReader.java:98) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:294) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:69) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:422) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:680) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:635) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:646) > [flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:619) > [flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) > [flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) > [flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282] > {noformat} > *All of those cases that I have seen were always coming from SourceOperator* -- This message was sent by Atlassian Jira (v8.3.4#803005)