[
https://issues.apache.org/jira/browse/FLINK-24709?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
XiaShengSheng resolved FLINK-24709.
-----------------------------------
Resolution: Fixed
> Fix the issue of interval join java case content in the official document case
> ------------------------------------------------------------------------------
>
> Key: FLINK-24709
> URL: https://issues.apache.org/jira/browse/FLINK-24709
> Project: Flink
> Issue Type: Improvement
> Components: API / DataStream
> Affects Versions: 1.8.0, 1.9.0, 1.10.0, 1.11.0, 1.12.0, 1.13.0, 1.14.0
> Reporter: XiaShengSheng
> Priority: Minor
> Labels: pull-request-available
> Fix For: 1.14.0
>
> Attachments: case.png
>
> Original Estimate: 1h
> Remaining Estimate: 1h
>
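> Fix the Java interval join example in the official documentation. The body of processElement refers to first and second, but the method parameters are named left and right, so the example does not compile.
> Using the Flink 1.14.0 documentation as an example: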
> [https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/joining/#interval-join]
> 1. The current example is:
> import org.apache.flink.api.java.functions.KeySelector;
> import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
> import org.apache.flink.streaming.api.windowing.time.Time;
> ...
> DataStream<Integer> orangeStream = ...
> DataStream<Integer> greenStream = ...
> orangeStream
> .keyBy(<KeySelector>)
> .intervalJoin(greenStream.keyBy(<KeySelector>))
> .between(Time.milliseconds(-2), Time.milliseconds(1))
> .process (new ProcessJoinFunction<Integer, Integer, String(){
> @Override
> public void processElement(Integer left, Integer right, Context ctx,
> Collector<String> out) {
> out.collect(first + "," + second);
> }
> });
> 2. After the fix:
> import org.apache.flink.api.java.functions.KeySelector;
> import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.util.Collector;
> ...
> DataStream<Integer> orangeStream = ...
> DataStream<Integer> greenStream = ...
> orangeStream
>     .keyBy(<KeySelector>)
>     .intervalJoin(greenStream.keyBy(<KeySelector>))
>     // a green element matches when its timestamp lies in [orange - 2 ms, orange + 1 ms]
>     .between(Time.milliseconds(-2), Time.milliseconds(1))
>     .process(new ProcessJoinFunction<Integer, Integer, String>() {
>         @Override
>         public void processElement(Integer left, Integer right, Context ctx,
>                 Collector<String> out) {
>             // use the parameter names declared above (left, right)
>             out.collect(left + "," + right);
>         }
>     });
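
For readers unfamiliar with the bounds passed to between(...), the following plain-Java sketch mimics the matching rule of the example without depending on Flink. It is only an illustration under simplifying assumptions: each element's value doubles as its timestamp in milliseconds, all elements share one key, and the class name IntervalJoinSketch and its intervalJoin method are hypothetical names, not part of the Flink API. Both bounds are treated as inclusive, which is the default behaviour of the interval join.

```java
import java.util.ArrayList;
import java.util.List;

public class IntervalJoinSketch {

    /**
     * Pairs every left element with every right element whose timestamp lies in
     * [left + lowerBound, left + upperBound]. The element value is used as its
     * timestamp (an assumption made only for this sketch).
     */
    public static List<String> intervalJoin(List<Integer> left, List<Integer> right,
                                            long lowerBound, long upperBound) {
        List<String> out = new ArrayList<>();
        for (int l : left) {
            for (int r : right) {
                // Both bounds are inclusive.
                if (r >= l + lowerBound && r <= l + upperBound) {
                    // Same output format as the corrected processElement body.
                    out.add(l + "," + r);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical sample data, not taken from the documentation.
        List<Integer> orange = List.of(0, 2, 5);
        List<Integer> green = List.of(0, 1, 6);
        // Bounds correspond to between(Time.milliseconds(-2), Time.milliseconds(1)).
        System.out.println(intervalJoin(orange, green, -2, 1));
    }
}
```

With these sample values, orange element 5 is paired only with green element 6, because 6 is the only green timestamp inside the interval [3, 6].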