[ https://issues.apache.org/jira/browse/FLINK-8282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16295193#comment-16295193 ]
Xingcan Cui commented on FLINK-8282: ------------------------------------ Hi [~twalthr], thanks for this ticket. I just wonder whether we could create an anonymous {{TwoInputStreamOperator}} directly since the Javadoc declares that to create a custom operator, we should use {{AbstractStreamOperator}} as a base class. > Transformation with TwoInputStreamOperator fails > ------------------------------------------------ > > Key: FLINK-8282 > URL: https://issues.apache.org/jira/browse/FLINK-8282 > Project: Flink > Issue Type: Bug > Components: DataStream API > Affects Versions: 1.4.0 > Reporter: Timo Walther > > The following program fails because of multiple reasons (see exceptions > below). The transformation with a {{TwoInputStreamOperator}} does not extend > {{AbstractStreamOperator}}. I think this is the main cause why it fails. > Either we fix the exceptions or we check for {{AbstractStreamOperator}} first. > {code} > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > DataStream<Integer> ds1 = env.addSource(new > SourceFunction<Integer>() { > @Override > public void run(SourceContext<Integer> ctx) throws > Exception { > ctx.emitWatermark(new Watermark(100L)); > ctx.collect(12); > while (true) Thread.yield(); > } > @Override > public void cancel() { > } > }); > DataStream<Integer> ds2 = env.addSource(new > SourceFunction<Integer>() { > @Override > public void run(SourceContext<Integer> ctx) throws > Exception { > ctx.emitWatermark(new Watermark(200L)); > ctx.collect(12); > while (true) Thread.yield(); > } > @Override > public void cancel() { > } > }); > ds1.connect(ds2.broadcast()).transform("test", Types.INT, new > TwoInputStreamOperator<Integer, Integer, Integer>() { > @Override > public void processElement1(StreamRecord<Integer> > element) throws Exception { > System.out.println(); > } > @Override > public void processElement2(StreamRecord<Integer> > element) throws Exception { > System.out.println(); > } > @Override > public void processWatermark1(Watermark mark) throws > Exception { > System.out.println(); > } > @Override > public void processWatermark2(Watermark mark) throws > Exception { > System.out.println(); > } > @Override > public void processLatencyMarker1(LatencyMarker > latencyMarker) throws Exception { > } > @Override > public void processLatencyMarker2(LatencyMarker > latencyMarker) throws Exception { > } > @Override > public void setup(StreamTask<?, ?> containingTask, > StreamConfig config, Output<StreamRecord<Integer>> output) { > } > @Override > public void open() throws Exception { > } > @Override > public void close() throws Exception { > } > @Override > public void dispose() throws Exception { > } > @Override > public OperatorSnapshotResult snapshotState(long > checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws > Exception { > return null; > } > @Override > public void initializeState(OperatorSubtaskState > stateHandles) throws Exception { > } > @Override > public void notifyOfCompletedCheckpoint(long > checkpointId) throws Exception { > } > @Override > public void setKeyContextElement1(StreamRecord<?> > record) throws Exception { > } > @Override > public void setKeyContextElement2(StreamRecord<?> > record) throws Exception { > } > @Override > public ChainingStrategy getChainingStrategy() { > return null; > } > @Override > public void setChainingStrategy(ChainingStrategy > strategy) { > } > @Override > public MetricGroup getMetricGroup() { > return null; > } > @Override > public OperatorID getOperatorID() { > return null; > } > }).print(); > // execute program > env.execute("Streaming WordCount"); > {code} > Exceptions are either: > {code} > 16:27:51,177 WARN > org.apache.flink.streaming.api.operators.AbstractStreamOperator - Error > while emitting latency marker. > java.lang.RuntimeException: Buffer pool is destroyed. > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:141) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:825) > at > org.apache.flink.streaming.api.operators.StreamSource$LatencyMarksEmitter$1.onProcessingTime(StreamSource.java:150) > at > org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:294) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.IllegalStateException: Buffer pool is destroyed. > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:203) > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:191) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:132) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.randomEmit(RecordWriter.java:107) > at > org.apache.flink.streaming.runtime.io.StreamRecordWriter.randomEmit(StreamRecordWriter.java:102) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:138) > ... 10 more > {code} > or > {code} > 16:33:53,826 WARN > org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor - An exception > occurred during the metrics setup. > java.lang.NullPointerException > at > org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:211) > at > org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:91) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:278) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:724) > at java.lang.Thread.run(Thread.java:745) > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)