[ 
https://issues.apache.org/jira/browse/FLINK-6423?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-6423:
----------------------------------
    Labels: stale-minor  (was: )

> JoinFunction can not be replaced with lambda
> --------------------------------------------
>
>                 Key: FLINK-6423
>                 URL: https://issues.apache.org/jira/browse/FLINK-6423
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.2.0
>            Reporter: Moshe Sayag
>            Priority: Minor
>              Labels: stale-minor
>
> JoinedStreams.WithWindow.apply(...) accepts a JoinFunction as a parameter,
> but not an equivalent lambda.
> In the following code, runWithFunction completes successfully, while
> runWithLambda throws "java.lang.ArrayIndexOutOfBoundsException: -1".
> Although this might look like a very minor issue, the exception message is
> unclear and might cost a developer precious time looking for the cause.
> {code}
>     public void runWithFunction() throws Exception {
>         StreamExecutionEnvironment env = LocalStreamEnvironment.getExecutionEnvironment();
>         env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
>         DataStream<String> stream1 = env.fromElements("a", "b", "c");
>         DataStream<String> stream2 = env.fromElements("A", "B", "C");
>         DataStream<String> joined = stream1.join(stream2)
>                 .where(String::toLowerCase).equalTo(String::toLowerCase)
>                 .window(ProcessingTimeSessionWindows.withGap(Time.minutes(1)))
>                 .apply(new JoinFunction<String, String, String>() {
>                     @Override
>                     public String join(String s1, String s2) {
>                         return s1 + "_" + s2;
>                     }
>                 });
>         joined.print();
>         env.execute();
>     }
>     public void runWithLambda() throws Exception {
>         StreamExecutionEnvironment env = LocalStreamEnvironment.getExecutionEnvironment();
>         env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
>         DataStream<String> stream1 = env.fromElements("a", "b", "c");
>         DataStream<String> stream2 = env.fromElements("A", "B", "C");
>         DataStream<String> joined = stream1.join(stream2)
>                 .where(String::toLowerCase).equalTo(String::toLowerCase)
>                 .window(ProcessingTimeSessionWindows.withGap(Time.minutes(1)))
>                 .apply((JoinFunction<String, String, String>) (s1, s2) -> s1 + "_" + s2);
>         joined.print();
>         env.execute();
>     }
> {code}
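
The report does not identify the root cause. One plausible contributor, offered here only as an assumption, is that a Java lambda carries less reflective generic type information than an anonymous class, which can affect any framework that inspects a function object to infer its result type. The sketch below uses plain JDK reflection only; Joiner is a hypothetical stand-in for JoinFunction, and nothing here calls Flink or reproduces its type extraction logic.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class LambdaTypeInfo {

    // Hypothetical stand-in for Flink's JoinFunction (not the real interface).
    public interface Joiner<A, B, R> {
        R join(A first, B second);
    }

    // Anonymous class: the compiler records Joiner<String, String, String>
    // in the generic signature of the generated class.
    public static Joiner<String, String, String> anonymousJoiner() {
        return new Joiner<String, String, String>() {
            @Override
            public String join(String s1, String s2) {
                return s1 + "_" + s2;
            }
        };
    }

    // Lambda: the runtime-generated class implements the raw Joiner interface.
    public static Joiner<String, String, String> lambdaJoiner() {
        return (s1, s2) -> s1 + "_" + s2;
    }

    // Returns true if any directly implemented interface of the object's class
    // is visible to reflection with its type arguments.
    public static boolean keepsTypeArguments(Object function) {
        for (Type implemented : function.getClass().getGenericInterfaces()) {
            if (implemented instanceof ParameterizedType) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("anonymous class keeps type arguments: "
                + keepsTypeArguments(anonymousJoiner()));
        System.out.println("lambda keeps type arguments: "
                + keepsTypeArguments(lambdaJoiner()));
    }
}
```

Both objects compute the same result for the same inputs; they differ only in what reflection can recover about their type arguments. On a standard JDK the check reports true for the anonymous class and false for the lambda. Whether this is what triggers the exception in runWithLambda is not established by this report.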



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
