????????????rocketMq??????????flink1.11.2

??????rocketmq source????????????????????????????????????????????????


org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:734)
 ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
 ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
 ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
 ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
 ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
 ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
 ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at 
cn.shltkj.utils.RocketMqSourceFunction.lambda$run$0(RocketMqSourceFunction.java:49)
 ~[classes/:?]
        at 
com.aliyun.openservices.ons.api.impl.rocketmq.ConsumerImpl$MessageListenerImpl.consumeMessage(ConsumerImpl.java:110)
 [ons-client-1.8.8.Final.jar:1.8.8.Final]
        at 
com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:710)
 [ons-client-1.8.8.Final.jar:1.8.8.Final]
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
[?:1.8.0_271]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
[?:1.8.0_271]
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_271]
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_271]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_271]
Caused by: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:734)
 ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
 ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
 ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
 ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
 ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
 ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
 ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        ... 14 more
Caused by: java.lang.RuntimeException: Buffer pool is destroyed.
        at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
 ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
 ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
 ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
 ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
 ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at 
org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.processElement(TimestampsAndWatermarksOperator.java:97)
 ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
 ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
 ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
 ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
 ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
 ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
 ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
 ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        ... 14 more
Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
        at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentFromGlobal(LocalBufferPool.java:339)
 ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
        at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:309)
 ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
        at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilder(LocalBufferPool.java:256)
 ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
        at 
org.apache.flink.runtime.io.network.partition.ResultPartition.tryGetBufferBuilder(ResultPartition.java:218)
 ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
        at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:291)
 ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
        at 
org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.requestNewBufferBuilder(ChannelSelectorRecordWriter.java:103)
 ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
        at 
org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.getBufferBuilder(ChannelSelectorRecordWriter.java:95)
 ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
        at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:135)
 ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
        at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:120)
 ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
        at 
org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
 ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
        at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
 ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
 ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
 ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
 ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
 ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at 
org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.processElement(TimestampsAndWatermarksOperator.java:97)
 ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
 ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
 ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
 ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
 ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
 ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
 ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
 ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        ... 14 more


????????????????????????????null??????????????????????????


??????????????source??????socket??????????????????????????????watermark????????


????????????????????????

回复