[jira] [Updated] (FLINK-20618) Some of the source operator subtasks will stuck when flink job in critical backpressure
[ https://issues.apache.org/jira/browse/FLINK-20618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Piotr Nowojski updated FLINK-20618:
-----------------------------------
    Affects Version/s:     (was: 1.10.2)
                           (was: 1.11.0)
                           1.10.0
                           1.11.1

> Some of the source operator subtasks will stuck when flink job in critical
> backpressure
> ---
>
>                 Key: FLINK-20618
>                 URL: https://issues.apache.org/jira/browse/FLINK-20618
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Network
>    Affects Versions: 1.10.0, 1.11.1
>            Reporter: zlzhang0122
>            Priority: Critical
>         Attachments: 2020-12-16 11-47-37 的屏幕截图.png, 2020-12-16 11-48-30 的屏幕截图.png, 2020-12-16 11-49-01 的屏幕截图.png, 2020-12-16 11-53-42 的屏幕截图.png, 2020-12-17 11-10-06 的屏幕截图.png, 2020-12-17 16-45-00 的屏幕截图.png, stuck_node.txt, stuck_node_downstream.txt
>
> Under critical backpressure, some source subtasks block while requesting a
> buffer because the LocalBufferPool is exhausted, so those subtasks get stuck
> entirely while the other subtasks keep running normally.
> Below is the jstack trace:
>
> Legacy Source Thread - Source: TalosTableSource(hash, timestamp, step, isCustomize, hostname, endpoint, metric, dsType, orgId, idc, labels, pdl) -> SourceConversion(table=[default_catalog.default_database.transfer_c5, source: [TalosTableSource(hash, timestamp, step, isCustomize, hostname, endpoint, metric, dsType, orgId, idc, labels, pdl)]], fields=[hash, timestamp, step, isCustomize, hostname, endpoint, metric, dsType, orgId, idc, labels, pdl]) -> Calc(select=[hash, timestamp, step, isCustomize, hostname, endpoint, metric, dsType, orgId, idc, labels, pdl, () AS $12, (timestamp - ((timestamp + 18000) MOD 86400)) AS $13]) (62/128) #113 prio=5 os_prio=0 tid=0x7f43d07e1800 nid=0x1b1c waiting on condition [0x7f43b8488000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for <0xdb234488> (a java.util.concurrent.CompletableFuture$Signaller)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>         at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
>         at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>         at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
>         at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>         at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:231)
>         at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:209)
>         at org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:189)
>         at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.requestNewBufferBuilder(ChannelSelectorRecordWriter.java:103)
>         at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:145)
>         at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:116)
>         at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
>         at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
>         at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
>         at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:732)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:710)
>         at StreamExecCalc$33.processElement(Unknown Source)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:732)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:710)
>         at SourceConversion$4.processElement(Unknown Source)
>         at org.apache.flink.streaming.runtime.
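The frames above show the source thread parked in CompletableFuture.get() inside LocalBufferPool.requestMemorySegmentBlocking, waiting for a free network buffer that only arrives when downstream recycles one. As a rough sketch of that availability-future pattern (illustrative only, not Flink's actual LocalBufferPool code; the class and all names below are invented):

```java
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified sketch of the blocking buffer-request pattern
// visible in the jstack trace. A requester that finds the pool empty parks
// on a CompletableFuture (the CompletableFuture$Signaller.block frame)
// until another thread recycles a buffer and completes the future.
public class SimpleBufferPool {
    private final ArrayDeque<byte[]> available = new ArrayDeque<>();
    private CompletableFuture<Void> availabilityFuture = new CompletableFuture<>();

    public SimpleBufferPool(int numBuffers, int bufferSize) {
        for (int i = 0; i < numBuffers; i++) {
            available.add(new byte[bufferSize]);
        }
    }

    // Analogous in spirit to requestMemorySegmentBlocking: loop until a
    // buffer can be polled, blocking on the availability future in between.
    public byte[] requestBufferBlocking() {
        while (true) {
            CompletableFuture<Void> toWaitFor;
            synchronized (this) {
                byte[] buffer = available.poll();
                if (buffer != null) {
                    return buffer;
                }
                // Pool exhausted: arm a fresh future to wait on.
                if (availabilityFuture.isDone()) {
                    availabilityFuture = new CompletableFuture<>();
                }
                toWaitFor = availabilityFuture;
            }
            toWaitFor.join(); // the thread parks here, as in the trace
        }
    }

    // Called when a buffer is released downstream; wakes up any requester
    // parked in requestBufferBlocking().
    public synchronized void recycle(byte[] buffer) {
        available.add(buffer);
        availabilityFuture.complete(null);
    }

    public static void main(String[] args) throws Exception {
        SimpleBufferPool pool = new SimpleBufferPool(1, 1024);
        byte[] buffer = pool.requestBufferBlocking(); // pool is now empty

        // A second request parks until someone recycles; if nobody ever
        // does (downstream never drains), it parks forever.
        Thread recycler = new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException ignored) {
            }
            pool.recycle(buffer);
        });
        recycler.start();

        byte[] second = pool.requestBufferBlocking(); // blocks, then returns
        System.out.println("got buffer of size " + second.length);
        recycler.join();
    }
}
```

In this sketch the hang reported here corresponds to recycle() never being called: the requester stays parked in join() indefinitely, matching the WAITING (parking) state in the jstack output.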
[jira] [Updated] (FLINK-20618) Some of the source operator subtasks will stuck when flink job in critical backpressure
[ https://issues.apache.org/jira/browse/FLINK-20618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zlzhang0122 updated FLINK-20618:
--------------------------------
    Attachment: stuck_node_downstream.txt
[jira] [Updated] (FLINK-20618) Some of the source operator subtasks will stuck when flink job in critical backpressure
[ https://issues.apache.org/jira/browse/FLINK-20618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zlzhang0122 updated FLINK-20618:
--------------------------------
    Attachment: stuck_node.txt
[jira] [Updated] (FLINK-20618) Some of the source operator subtasks will stuck when flink job in critical backpressure
[ https://issues.apache.org/jira/browse/FLINK-20618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zlzhang0122 updated FLINK-20618:
--------------------------------
    Attachment: 2020-12-17 16-45-00 的屏幕截图.png
[jira] [Updated] (FLINK-20618) Some of the source operator subtasks will stuck when flink job in critical backpressure
[ https://issues.apache.org/jira/browse/FLINK-20618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zlzhang0122 updated FLINK-20618:
--------------------------------
    Attachment: (was: 2020-12-17 10-51-42 的屏幕截图.png)
[jira] [Updated] (FLINK-20618) Some of the source operator subtasks will stuck when flink job in critical backpressure
[ https://issues.apache.org/jira/browse/FLINK-20618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zlzhang0122 updated FLINK-20618:
--------------------------------
    Attachment: 2020-12-17 11-10-06 的屏幕截图.png
[jira] [Updated] (FLINK-20618) Some of the source operator subtasks will stuck when flink job in critical backpressure
[ https://issues.apache.org/jira/browse/FLINK-20618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zlzhang0122 updated FLINK-20618: Attachment: 2020-12-17 10-51-42 的屏幕截图.png
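The stack trace in this report shows the legacy source thread parked in CompletableFuture.get() inside LocalBufferPool.requestMemorySegmentBlocking, waiting for a buffer that downstream never recycles. A minimal sketch of that blocking pattern (class and method names here are hypothetical, not Flink's actual LocalBufferPool implementation) could look like:

```java
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical sketch (not Flink's actual LocalBufferPool) of the blocking
// pattern in the stack trace: requesters park on a CompletableFuture via
// get() until another thread recycles a segment back into the pool.
class BoundedBufferPool {
    private final ArrayDeque<byte[]> segments = new ArrayDeque<>();
    private CompletableFuture<Void> availability =
            CompletableFuture.completedFuture(null);

    BoundedBufferPool(int numSegments, int segmentSize) {
        for (int i = 0; i < numSegments; i++) {
            segments.add(new byte[segmentSize]);
        }
    }

    // Mirrors the shape of requestMemorySegmentBlocking: the caller's
    // thread parks in CompletableFuture.get() while the pool is exhausted.
    byte[] requestBlocking() throws InterruptedException, ExecutionException {
        while (true) {
            CompletableFuture<Void> toWaitFor;
            synchronized (this) {
                byte[] segment = segments.poll();
                if (segment != null) {
                    return segment;
                }
                // Arm a fresh future so the next recycle() wakes us up.
                if (availability.isDone()) {
                    availability = new CompletableFuture<>();
                }
                toWaitFor = availability;
            }
            // Thread shows as WAITING (parking) here, as in the jstack output.
            toWaitFor.get();
        }
    }

    // A downstream consumer returning a buffer unblocks waiting requesters.
    synchronized void recycle(byte[] segment) {
        segments.add(segment);
        availability.complete(null);
    }
}
```

Under severe backpressure the downstream stops recycling buffers, so a call like requestBlocking() never completes and the whole source task stalls, which matches the behavior described in this issue.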
[jira] [Updated] (FLINK-20618) Some of the source operator subtasks will stuck when flink job in critical backpressure
[ https://issues.apache.org/jira/browse/FLINK-20618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-20618: - Component/s: (was: API / Core) Runtime / Network
[jira] [Updated] (FLINK-20618) Some of the source operator subtasks will stuck when flink job in critical backpressure
[ https://issues.apache.org/jira/browse/FLINK-20618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zlzhang0122 updated FLINK-20618: Attachment: 2020-12-16 11-53-42 的屏幕截图.png 2020-12-16 11-49-01 的屏幕截图.png
[jira] [Updated] (FLINK-20618) Some of the source operator subtasks will stuck when flink job in critical backpressure
[ https://issues.apache.org/jira/browse/FLINK-20618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zlzhang0122 updated FLINK-20618: Attachment: 2020-12-16 11-48-30 的屏幕截图.png 2020-12-16 11-47-37 的屏幕截图.png