[ https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16363732#comment-16363732 ]
ASF GitHub Bot commented on FLINK-8479:
---------------------------------------
GitHub user florianschmidt1994 opened a pull request:
https://github.com/apache/flink/pull/5482
[FLINK-8480][DataStream] Add Java API for timebounded stream join
## What is the purpose of the change
* Add a Java API to the DataStream API for joining two streams based on
user-defined time boundaries
* The design doc can be found here:
https://docs.google.com/document/d/16GMH5VM6JJiWj_N0W8y3PtQ1aoJFxsKoOTSYOfqlsRE/edit#heading=h.bgl260hr56g6
## Brief change log
* Add the option `.between(Time, Time)` to streams that are already joined and
have their key selectors `where` and `equalTo` defined
* Add a new inner class `TimeBounded` to `JoinedStreams`, which exposes
`process(TimeBoundedJoinFunction)` as well as the optional
`upperBoundExclusive(boolean)` and `lowerBoundExclusive(boolean)` to the user
* Add new integration test `TimeboundedJoinITCase`
* **Depends on [FLINK-8479] to be merged**
Full example usage:
```java
streamOne
    .join(streamTwo)
    .where(new MyKeySelector())
    .equalTo(new MyKeySelector())
    .between(Time.milliseconds(-1), Time.milliseconds(1))
    .process(new UdfTimeBoundedJoinFunction())
    .addSink(new ResultSink());
```
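To make the effect of `.between(...)` and the two exclusivity switches concrete, here is a minimal plain-Java sketch of the matching rule. It assumes that two elements with the same key are joined when the right element's timestamp falls within `[left + lowerBound, left + upperBound]`; this reading is an assumption based on the description above, not taken from the operator code. `TimeBoundedJoinSketch`, `Event`, `withinBounds`, and `join` are hypothetical names, and none of this uses the Flink API.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: not part of the Flink API or of this pull request. */
public class TimeBoundedJoinSketch {

    /** A keyed element with an event timestamp in milliseconds (hypothetical type). */
    static final class Event {
        final String key;
        final long timestamp;
        final String value;

        Event(String key, long timestamp, String value) {
            this.key = key;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    /**
     * Returns true if rightTs lies within [leftTs + lower, leftTs + upper],
     * where each end can be excluded. These semantics are an assumption.
     */
    static boolean withinBounds(long leftTs, long rightTs, long lower, long upper,
                                boolean lowerExclusive, boolean upperExclusive) {
        long diff = rightTs - leftTs;
        boolean aboveLower = lowerExclusive ? diff > lower : diff >= lower;
        boolean belowUpper = upperExclusive ? diff < upper : diff <= upper;
        return aboveLower && belowUpper;
    }

    /** Naive nested-loop inner join over finite lists; a streaming operator would buffer elements per key instead. */
    static List<String> join(List<Event> left, List<Event> right, long lower, long upper,
                             boolean lowerExclusive, boolean upperExclusive) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                if (l.key.equals(r.key)
                        && withinBounds(l.timestamp, r.timestamp, lower, upper,
                                        lowerExclusive, upperExclusive)) {
                    out.add(l.value + ":" + r.value);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> left = new ArrayList<>();
        left.add(new Event("a", 10, "L10"));

        List<Event> right = new ArrayList<>();
        right.add(new Event("a", 9, "R9"));
        right.add(new Event("a", 11, "R11"));
        right.add(new Event("a", 12, "R12"));
        right.add(new Event("b", 10, "R10b"));

        // Mirrors between(Time.milliseconds(-1), Time.milliseconds(1)) with both bounds inclusive.
        System.out.println(join(left, right, -1, 1, false, false));
        // Same bounds, but with the upper bound excluded.
        System.out.println(join(left, right, -1, 1, false, true));
    }
}
```

Under these assumptions, the element at timestamp 12 and the element with key `b` never match, and excluding the upper bound additionally drops the element at timestamp 11.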
## Verifying this change
This change added tests and can be verified as follows:
- Added integration tests in `TimeboundedJoinITCase` that validate
parameter translation and execution
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: yes
- The serializers: no
- The runtime per-record code paths (performance sensitive): yes
- Anything that affects deployment or recovery: no
- The S3 file system connector: no
## Documentation
- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? JavaDocs
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/florianschmidt1994/flink flink-8480-timebounded-join-java-api
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/5482.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #5482
----
commit 34451540116d8bdd284fd01016a4cc74d8564d37
Author: Florian Schmidt <florian.schmidt.1994@...>
Date: 2018-01-18T14:47:14Z
[FLINK-8479] Implement TimeBoundedStreamJoinOperator
This operator is the basis for performing an inner join on two
streams using a time criterion defined by a lower and an upper bound
commit fe65b1ead0511b0df5d640c728f5ce9e273d7ed5
Author: Florian Schmidt <florian.schmidt.1994@...>
Date: 2018-02-13T14:48:40Z
[FLINK-8480][DataStream] Add java api for timebounded stream joins
This commit adds a java implementation for timebounded stream joins.
The usage looks roughly like the following:
```java
streamOne
    .join(streamTwo)
    .where(new Tuple2KeyExtractor())
    .equalTo(new Tuple2KeyExtractor())
    .between(Time.milliseconds(0), Time.milliseconds(1))
    .process(new CombineToStringJoinFunction())
    .addSink(new ResultSink());
```
This change adds the functionality in JoinedStreams.java and adds
integration tests in TimeboundedJoinITCase.java
----
> Implement time-bounded inner join of streams as a TwoInputStreamOperator
> ------------------------------------------------------------------------
>
> Key: FLINK-8479
> URL: https://issues.apache.org/jira/browse/FLINK-8479
> Project: Flink
> Issue Type: Sub-task
> Reporter: Florian Schmidt
> Assignee: Florian Schmidt
> Priority: Major
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)