[jira] [Commented] (FLINK-8282) Transformation with TwoInputStreamOperator fails

2021-04-29 Thread Flink Jira Bot (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-8282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17336680#comment-17336680
 ] 

Flink Jira Bot commented on FLINK-8282:
---

This issue was labeled "stale-major" 7 days ago and has not received any 
updates, so it is being deprioritized. If this ticket is actually Major, please 
raise the priority and ask a committer to assign you the issue or revive the 
public discussion.


> Transformation with TwoInputStreamOperator fails
>
> Key: FLINK-8282
> URL: https://issues.apache.org/jira/browse/FLINK-8282
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.4.0
> Reporter: Timo Walther
> Priority: Major
> Labels: stale-major
>
> The following program fails for multiple reasons (see exceptions below). The 
> {{TwoInputStreamOperator}} passed to the transformation does not extend 
> {{AbstractStreamOperator}}, which I think is the main reason it fails. 
> Either we fix the exceptions or we check for {{AbstractStreamOperator}} first.
> {code}
>   final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>   DataStream<Integer> ds1 = env.addSource(new SourceFunction<Integer>() {
>       @Override
>       public void run(SourceContext<Integer> ctx) throws Exception {
>           ctx.emitWatermark(new Watermark(100L));
>           ctx.collect(12);
>           while (true) Thread.yield();
>       }
>       @Override
>       public void cancel() {
>       }
>   });
>   DataStream<Integer> ds2 = env.addSource(new SourceFunction<Integer>() {
>       @Override
>       public void run(SourceContext<Integer> ctx) throws Exception {
>           ctx.emitWatermark(new Watermark(200L));
>           ctx.collect(12);
>           while (true) Thread.yield();
>       }
>       @Override
>       public void cancel() {
>       }
>   });
>   ds1.connect(ds2.broadcast()).transform("test", Types.INT, new TwoInputStreamOperator<Integer, Integer, Integer>() {
>       @Override
>       public void processElement1(StreamRecord<Integer> element) throws Exception {
>           System.out.println();
>       }
>       @Override
>       public void processElement2(StreamRecord<Integer> element) throws Exception {
>           System.out.println();
>       }
>       @Override
>       public void processWatermark1(Watermark mark) throws Exception {
>           System.out.println();
>       }
>       @Override
>       public void processWatermark2(Watermark mark) throws Exception {
>           System.out.println();
>       }
>       @Override
>       public void processLatencyMarker1(LatencyMarker latencyMarker) throws Exception {
>       }
>       @Override
>       public void processLatencyMarker2(LatencyMarker latencyMarker) throws Exception {
>       }
>       @Override
>       public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<Integer>> output) {
>       }
>       @Override
>       public void open() throws Exception {
>       }
>       @Override
>       public void close() throws Exception {
>       }
>       @Override
>       public void dispose() throws Exception {
>       }
>       @Override
>       public OperatorSnapshotResult snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws Exception {
>           return null;
>       }
>       @Override
>       public void initializeState(OperatorSubtaskState stateHandles) throws Exception {
>       }
>       @Override
>       public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
>       }
>       @Override
>       public void setKeyContextElement1(StreamRecord<?> record) throws Exception {
>       }
>       @Override
>       public void setKeyContextElement2(StreamRecord

[jira] [Commented] (FLINK-8282) Transformation with TwoInputStreamOperator fails

2021-04-22 Thread Flink Jira Bot (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-8282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17328688#comment-17328688
 ] 

Flink Jira Bot commented on FLINK-8282:
---

This major issue is unassigned, and neither it nor any of its Sub-Tasks has 
been updated for 30 days, so it has been labeled "stale-major". If this ticket 
is indeed "major", please either assign yourself or give an update, and remove 
the label afterwards. In 7 days the issue will be deprioritized.


[jira] [Commented] (FLINK-8282) Transformation with TwoInputStreamOperator fails

2018-01-02 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16308293#comment-16308293
 ] 

Aljoscha Krettek commented on FLINK-8282:
-

In my opinion, there is no good separation of concerns between the 
{{StreamTask}} and the {{StreamOperator}}. {{AbstractStreamOperator}} does a 
lot of things that a stream operator shouldn't do, and it does them in a very 
specific way that {{StreamTask}} and other components rely on. Things go 
haywire if an operator doesn't behave that way.

Off the top of my head, this includes everything that happens in {{setup()}} 
(i.e. metrics setup and configuration), everything that happens in the various 
state initialisation and snapshot/restore methods, and the latency marker 
handling.

There is actually this (somewhat old) issue: FLINK-4859.
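
As a rough illustration of the implicit contract described in this comment, 
here is a minimal, self-contained Java sketch. None of these types are Flink 
classes: MiniOperator, MiniBaseOperator, WellBehavedOperator, BareOperator and 
ContractSketch are hypothetical stand-ins. The NullPointerException only models 
"things go haywire"; the actual exceptions from the report are not shown in 
this thread. The runChecked() method sketches the "check for the base class 
first" option from the issue description.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the runtime side (the role StreamTask plays).
class ContractSketch {

    // The runtime calls setup() and then assumes the metrics map exists.
    // Nothing in the MiniOperator interface states this expectation.
    static String runUnchecked(MiniOperator op) {
        op.setup();
        op.metrics().put("numRecordsIn", 0L); // NPE if setup() did nothing
        return "ok";
    }

    // Fail-fast variant: reject operators that do not extend the base class
    // before any runtime code touches them.
    static String runChecked(MiniOperator op) {
        if (!(op instanceof MiniBaseOperator)) {
            throw new IllegalArgumentException(
                "operator must extend MiniBaseOperator: " + op.getClass().getName());
        }
        return runUnchecked(op);
    }

    public static void main(String[] args) {
        System.out.println(runChecked(new WellBehavedOperator()));
        try {
            runChecked(new BareOperator());
        } catch (IllegalArgumentException e) {
            System.out.println("rejected early: " + e.getMessage());
        }
    }
}

// Hypothetical operator interface (not Flink's StreamOperator).
interface MiniOperator {
    void setup();
    Map<String, Long> metrics();
}

// Hypothetical base class that fulfils the hidden contract in setup().
abstract class MiniBaseOperator implements MiniOperator {
    private Map<String, Long> metrics;

    @Override
    public void setup() {
        metrics = new HashMap<>();
    }

    @Override
    public Map<String, Long> metrics() {
        return metrics;
    }
}

// Inherits the contract-fulfilling behaviour from the base class.
class WellBehavedOperator extends MiniBaseOperator {
}

// Implements the interface directly with empty stubs, like the operator in the report.
class BareOperator implements MiniOperator {
    @Override
    public void setup() {
    }

    @Override
    public Map<String, Long> metrics() {
        return null;
    }
}
```

With the check in place, the bare operator is rejected with a message that names 
the missing base class instead of failing later inside the runtime.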


[jira] [Commented] (FLINK-8282) Transformation with TwoInputStreamOperator fails

2017-12-18 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16295207#comment-16295207
 ] 

Timo Walther commented on FLINK-8282:
-

I don't know if this is needed. Actually, the {{transform()}} method is rather 
internal. [~aljoscha] what is your opinion here?


[jira] [Commented] (FLINK-8282) Transformation with TwoInputStreamOperator fails

2017-12-18 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16295193#comment-16295193
 ] 

Xingcan Cui commented on FLINK-8282:


Hi [~twalthr], thanks for this ticket. I just wonder whether we should be able 
to create an anonymous {{TwoInputStreamOperator}} directly at all, since the 
Javadoc states that a custom operator should use {{AbstractStreamOperator}} as 
its base class.
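
The base-class pattern mentioned here can be sketched as follows. In Flink, a 
custom two-input operator typically extends {{AbstractStreamOperator}} and 
implements {{TwoInputStreamOperator}}, so it only has to write the processing 
methods. Because a real example needs Flink on the classpath, the sketch below 
uses hypothetical stand-in types (SketchTwoInputOperator, 
SketchAbstractOperator, TaggingOperator, BaseClassSketch) that only mimic the 
shape of that arrangement; they are assumptions for illustration, not Flink 
APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Drives the operator through a simplified lifecycle, as a runtime would.
class BaseClassSketch {
    static List<String> run() {
        TaggingOperator op = new TaggingOperator();
        op.setup("test");
        op.open();
        op.processElement1(12);
        op.processElement2(12);
        op.close();
        return op.getOutput();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}

// Hypothetical full operator interface: lifecycle hooks plus per-input processing.
interface SketchTwoInputOperator<A, B> {
    void setup(String taskName);
    void open();
    void close();
    void processElement1(A element);
    void processElement2(B element);
}

// Hypothetical base class owning the lifecycle plumbing, so subclasses
// do not have to stub it out by hand.
abstract class SketchAbstractOperator {
    private String taskName;
    protected final List<String> output = new ArrayList<>();

    public void setup(String taskName) {
        this.taskName = taskName;
    }

    public void open() {
    }

    public void close() {
    }

    public String getTaskName() {
        return taskName;
    }

    public List<String> getOutput() {
        return output;
    }
}

// The custom operator: the lifecycle methods required by the interface are
// inherited from the base class; only the processing methods are written here.
class TaggingOperator extends SketchAbstractOperator
        implements SketchTwoInputOperator<Integer, Integer> {

    @Override
    public void processElement1(Integer element) {
        output.add("left:" + element);
    }

    @Override
    public void processElement2(Integer element) {
        output.add("right:" + element);
    }
}
```

Compared with the anonymous class in the report, which stubs out every 
lifecycle method itself, this arrangement leaves the lifecycle behaviour in one 
place that the runtime can rely on.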
