[ https://issues.apache.org/jira/browse/FLINK-23263?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17388777#comment-17388777 ]
Piotr Nowojski commented on FLINK-23263:
----------------------------------------

{quote}
inner join task backpressure status is high, and inner join task outputbuffer is filled.
{quote}

[~ana4] This is perfectly normal on its own, and you are not giving us any new information. The source task is backpressured by the downstream task. This downstream task (inner join?) is in turn backpressured by something further down the stream. That is perfectly normal so far. Please read about backpressure and how it works. You need to find the root cause of your backpressure.

{quote}
a) This is most likely a normal symptom of backpressure:
https://www.ververica.com/blog/how-flink-handles-backpressure
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/monitoring/back_pressure/
https://flink.apache.org/2021/07/07/backpressure.html
and it's not a bug. Please read the above materials, identify the source of the backpressure, analyse it and fix it accordingly.
{quote}

Furthermore, I'm asking again:

{quote}
Please post your job graph (...) and please check the backpressure status of all tasks.
{quote}

And lastly, on the off chance that there is something wrong, I've asked you to reproduce the problem on a more recent release:

{quote}
There might have been a bug that was fixed in a later version. If you still can reproduce the problem after upgrading:
{quote}


> LocalBufferPool can not request memory.
> ---------------------------------------
>
>                 Key: FLINK-23263
>                 URL: https://issues.apache.org/jira/browse/FLINK-23263
>             Project: Flink
>          Issue Type: Bug
>      Components: Runtime / Network
>    Affects Versions: 1.10.1
>            Reporter: Ada Wong
>            Priority: Major
>
> The Flink job is running, but it cannot consume Kafka data.
> The following is the thread dump:
> "Source: test_records (2/3)" #78 prio=5 os_prio=0 tid=0x00007fd4c4a24800 > nid=0x21bf in Object.wait() [0x00007fd4d581a000] > java.lang.Thread.State: TIMED_WAITING (on object monitor) > at java.lang.Object.wait(Native Method) > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:251) > - locked <0x000000074d8b0df0> (a java.util.ArrayDeque) > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:218) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:264) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:192) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:162) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:128) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) > - locked <0x000000074cbd3be0> (a java.lang.Object) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111) > at > org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398) > - locked <0x000000074cbd3be0> (a 
java.lang.Object) > at > org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:91) > at > org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:156) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:711) > at > com.dtstack.flink.sql.source.kafka.KafkaConsumer011.run(KafkaConsumer011.java:66) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:93) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > at java.lang.Thread.run(Thread.java:748) > "Map -> to: Tuple2 -> Map -> (from: (id, sid, item, val, unit, dt, > after_index, tablename, PROCTIME) -> where: (AND(=(tablename, > CONCAT(_UTF-16LE't_real', currtime2(dt, _UTF-16LE'yyyyMMdd'))), > OR(=(after_index, _UTF-16LE'2.6'), AND(=(sid, _UTF-16LE'7296'), > =(after_index, _UTF-16LE'2.10'))), LIKE(item, _UTF-16LE'??5%'), > =(MOD(getminutes(dt), 5), 0))), select: (id AS ID, sid AS STID, item > AS ITEM, val AS VAL, unit AS UNIT, dt AS DATATIME), from: (id, sid, > item, val, unit, dt, after_index, tablename, PROCTIME) -> where: > (AND(=(tablename, CONCAT(_UTF-16LE't_real', currtime2(dt, > _UTF-16LE'yyyyMMdd'))), OR(=(after_index, _UTF-16LE'2.6'), AND(=(sid, > _UTF-16LE'7296'), =(after_index, _UTF-16LE'2.10'))), LIKE(item, > _UTF-16LE'??5%'), =(MOD(getminutes(dt), 5), 0))), select: (sid AS > STID, val AS VAL, dt AS DATATIME)) (1/1)" #79 prio=5 os_prio=0 > tid=0x00007fd4c4a94000 nid=0x21c0 in Object.wait() > [0x00007fd4d5719000] > java.lang.Thread.State: TIMED_WAITING (on object monitor) > at java.lang.Object.wait(Native Method) > at > 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:251) > - locked <0x000000074e6c8b98> (a java.util.ArrayDeque) > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:218) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:264) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:192) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:162) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:128) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696) > at > org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) > at > org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37) > at > org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28) > at DataStreamCalcRule$4160.processElement(Unknown Source) > at > org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:66) > at > org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35) > at > org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) > at > 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:488) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:465) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:425) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696) > at > org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) > at > org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37) > at > org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28) > at DataStreamSourceConversion$4104.processElement(Unknown Source) > at > org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:70) > at > org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:488) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:465) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:425) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:686) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:672) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718) > at > 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696) > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:488) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:465) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:425) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696) > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:488) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:465) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:425) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696) > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) > - locked <0x000000074cb90938> (a java.lang.Object) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > at 
java.lang.Thread.run(Thread.java:748) -- This message was sent by Atlassian Jira (v8.3.4#803005)
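Editor's note: the TIMED_WAITING state above is what a backpressured producer looks like. The following is a minimal sketch, NOT Flink's actual LocalBufferPool; the class and method names are hypothetical. It only illustrates the mechanism: a bounded pool whose `request()` parks in `Object.wait()` once every segment is handed out and nothing downstream recycles them.

```java
import java.util.ArrayDeque;

// Hedged sketch of a bounded buffer pool (not Flink's implementation).
// When the pool is drained, request() blocks in a timed Object.wait(),
// which is exactly the thread state shown in the dump above.
class BoundedBufferPool {
    private final ArrayDeque<byte[]> available = new ArrayDeque<>();

    BoundedBufferPool(int segments, int segmentSize) {
        for (int i = 0; i < segments; i++) {
            available.add(new byte[segmentSize]);
        }
    }

    // Blocks (TIMED_WAITING on this object's monitor) while no segment is free.
    synchronized byte[] request() throws InterruptedException {
        while (available.isEmpty()) {
            wait(2000); // wake up periodically and re-check the pool
        }
        return available.poll();
    }

    // The downstream consumer returns a segment, waking any blocked producer.
    synchronized void recycle(byte[] segment) {
        available.add(segment);
        notifyAll();
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedBufferPool pool = new BoundedBufferPool(1, 32);
        byte[] seg = pool.request();      // drains the only segment
        Thread producer = new Thread(() -> {
            try {
                pool.request();           // parks here: "backpressured"
                System.out.println("producer unblocked");
            } catch (InterruptedException ignored) {
            }
        });
        producer.start();
        Thread.sleep(200);                // producer is now parked in wait()
        pool.recycle(seg);                // downstream frees a buffer
        producer.join();
    }
}
```

The practical point of the sketch: a thread stuck here is a symptom, not the fault. The fix is to find and speed up whichever downstream operator is failing to recycle buffers.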