[ 
https://issues.apache.org/jira/browse/FLINK-23263?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17388777#comment-17388777
 ] 

Piotr Nowojski commented on FLINK-23263:
----------------------------------------

{quote}
inner join task backpressure status is high, and inner join task outputbuffer 
is filled.
{quote}
[~ana4] This is perfectly normal on it's own. You are not giving us any more 
information. Source task is backpressured by the downstream task. This 
downstream task (inner join?) is also backpressured by someone further down the 
stream. Perfectly normal so far. Please read about the backpressure, and how 
does it work. You need to find the reason of your backpressure.
{quote}
a) This is most likely a normal symptom of a backpressure:
https://www.ververica.com/blog/how-flink-handles-backpressure
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/monitoring/back_pressure/
https://flink.apache.org/2021/07/07/backpressure.html

and it's not a bug. Please read the above materials, identify the source of the 
backpressure, analyse it and fix it accordingly.
{quote}
Further more, I'm asking again:
{quote}
Please post your job graph (...) and please check the backpressure status of 
all tasks.
{quote}
And lastly, by an off chance that there is something wrong, I've asked you to 
reproduce the problem on a more recent release. 
{quote}
There might have been a bug that was fixed in a later version. If you still can 
reproduce the problem after upgrading:
{quote}

> LocalBufferPool can not request memory.
> ---------------------------------------
>
>                 Key: FLINK-23263
>                 URL: https://issues.apache.org/jira/browse/FLINK-23263
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Network
>    Affects Versions: 1.10.1
>            Reporter: Ada Wong
>            Priority: Major
>
> Flink job is running, bug it can not consume kafka data.
> This following is exception.
> "Source: test_records (2/3)" #78 prio=5 os_prio=0 tid=0x00007fd4c4a24800 
> nid=0x21bf in Object.wait() [0x00007fd4d581a000]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
>       at java.lang.Object.wait(Native Method)
>       at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:251)
>       - locked <0x000000074d8b0df0> (a java.util.ArrayDeque)
>       at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:218)
>       at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:264)
>       at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:192)
>       at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:162)
>       at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:128)
>       at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
>       at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
>       at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>       at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>       - locked <0x000000074cbd3be0> (a java.lang.Object)
>       at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>       at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
>       - locked <0x000000074cbd3be0> (a java.lang.Object)
>       at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:91)
>       at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:156)
>       at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:711)
>       at 
> com.dtstack.flink.sql.source.kafka.KafkaConsumer011.run(KafkaConsumer011.java:66)
>       at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:93)
>       at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
>       at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>       at java.lang.Thread.run(Thread.java:748)
> "Map -> to: Tuple2 -> Map -> (from: (id, sid, item, val, unit, dt,
> after_index, tablename, PROCTIME) -> where: (AND(=(tablename,
> CONCAT(_UTF-16LE't_real', currtime2(dt, _UTF-16LE'yyyyMMdd'))),
> OR(=(after_index, _UTF-16LE'2.6'), AND(=(sid, _UTF-16LE'7296'),
> =(after_index, _UTF-16LE'2.10'))), LIKE(item, _UTF-16LE'??5%'),
> =(MOD(getminutes(dt), 5), 0))), select: (id AS ID, sid AS STID, item
> AS ITEM, val AS VAL, unit AS UNIT, dt AS DATATIME), from: (id, sid,
> item, val, unit, dt, after_index, tablename, PROCTIME) -> where:
> (AND(=(tablename, CONCAT(_UTF-16LE't_real', currtime2(dt,
> _UTF-16LE'yyyyMMdd'))), OR(=(after_index, _UTF-16LE'2.6'), AND(=(sid,
> _UTF-16LE'7296'), =(after_index, _UTF-16LE'2.10'))), LIKE(item,
> _UTF-16LE'??5%'), =(MOD(getminutes(dt), 5), 0))), select: (sid AS
> STID, val AS VAL, dt AS DATATIME)) (1/1)" #79 prio=5 os_prio=0
> tid=0x00007fd4c4a94000 nid=0x21c0 in Object.wait()
> [0x00007fd4d5719000]
> java.lang.Thread.State: TIMED_WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:251)
> - locked <0x000000074e6c8b98> (a java.util.ArrayDeque)
> at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:218)
> at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:264)
> at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:192)
> at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:162)
> at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:128)
> at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
> at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
> at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> at 
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> at 
> org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
> at 
> org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
> at DataStreamCalcRule$4160.processElement(Unknown Source)
> at 
> org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:66)
> at 
> org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35)
> at 
> org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:488)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:465)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:425)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> at 
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> at 
> org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
> at 
> org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
> at DataStreamSourceConversion$4104.processElement(Unknown Source)
> at 
> org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:70)
> at 
> org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:488)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:465)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:425)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:686)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:672)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:488)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:465)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:425)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:488)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:465)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:425)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> - locked <0x000000074cb90938> (a java.lang.Object)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:748)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to