[
https://issues.apache.org/jira/browse/FLINK-9994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561226#comment-16561226
]
ASF GitHub Bot commented on FLINK-9994:
---
asfgit closed pull request #6449: [FLINK-9994][DataStream API] IntervalJoinOp
Context#getTimestamp() returns max timestamp.
URL: https://github.com/apache/flink/pull/6449
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
index 0c449e64f41..43085cb42c4 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
@@ -152,6 +152,7 @@ public IntervalJoinOperator(
@Override
public void open() throws Exception {
super.open();
+
collector = new TimestampedCollector<>(output);
context = new ContextImpl(userFunction);
internalTimerService =
@@ -204,15 +205,15 @@ public void processElement2(StreamRecord record)
throws Exception {
}
@SuppressWarnings("unchecked")
- private void processElement(
- StreamRecord record,
- MapState>> ourBuffer,
- MapState>> otherBuffer,
- long relativeLowerBound,
- long relativeUpperBound,
- boolean isLeft) throws Exception {
-
- final OUR ourValue = record.getValue();
+ private void processElement(
+ final StreamRecord record,
+ final MapState>> ourBuffer,
+ final MapState>> otherBuffer,
+ final long relativeLowerBound,
+ final long relativeUpperBound,
+ final boolean isLeft) throws Exception {
+
+ final THIS ourValue = record.getValue();
final long ourTimestamp = record.getTimestamp();
if (ourTimestamp == Long.MIN_VALUE) {
@@ -257,14 +258,18 @@ private boolean isLate(long timestamp) {
}
private void collect(T1 left, T2 right, long leftTimestamp, long
rightTimestamp) throws Exception {
- long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);
+ final long resultTimestamp = Math.max(leftTimestamp,
rightTimestamp);
+
collector.setAbsoluteTimestamp(resultTimestamp);
- context.leftTimestamp = leftTimestamp;
- context.rightTimestamp = rightTimestamp;
+ context.updateTimestamps(leftTimestamp, rightTimestamp,
resultTimestamp);
+
userFunction.processElement(left, right, context, collector);
}
- private void addToBuffer(MapState>>
buffer, T value, long timestamp) throws Exception {
+ private static void addToBuffer(
+ final MapState>> buffer,
+ final T value,
+ final long timestamp) throws Exception {
List> elemsInBucket = buffer.get(timestamp);
if (elemsInBucket == null) {
elemsInBucket = new ArrayList<>();
@@ -313,6 +318,8 @@ public void onProcessingTime(InternalTimer
timer) throws Exception {
*/
private final class ContextImpl extends ProcessJoinFunction.Context {
+ private long resultTimestamp = Long.MIN_VALUE;
+
private long leftTimestamp = Long.MIN_VALUE;
private long rightTimestamp = Long.MIN_VALUE;
@@ -321,6 +328,12 @@ private ContextImpl(ProcessJoinFunction func)
{
func.super();
}
+ private void updateTimestamps(long left, long right, long
result) {
+ this.leftTimestamp = left;
+ this.rightTimestamp = right;
+ this.resultTimestamp = result;
+ }
+
@Override
public long getLeftTimestamp() {
return leftTimestamp;
@@ -333,7 +346,7 @@ public long getRightTimestamp() {
@Override
public long getTimestamp() {
- return leftTimestamp;
+ return resultTimestamp;
}
@Override
diff --git