[ 
https://issues.apache.org/jira/browse/FLINK-9994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16561226#comment-16561226
 ] 

ASF GitHub Bot commented on FLINK-9994:
---------------------------------------

asfgit closed pull request #6449: [FLINK-9994][DataStream API] IntervalJoinOp 
Context#getTimestamp() returns max timestamp.
URL: https://github.com/apache/flink/pull/6449
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
index 0c449e64f41..43085cb42c4 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
@@ -152,6 +152,7 @@ public IntervalJoinOperator(
        @Override
        public void open() throws Exception {
                super.open();
+
                collector = new TimestampedCollector<>(output);
                context = new ContextImpl(userFunction);
                internalTimerService =
@@ -204,15 +205,15 @@ public void processElement2(StreamRecord<T2> record) 
throws Exception {
        }
 
        @SuppressWarnings("unchecked")
-       private <OUR, OTHER> void processElement(
-                       StreamRecord<OUR> record,
-                       MapState<Long, List<BufferEntry<OUR>>> ourBuffer,
-                       MapState<Long, List<BufferEntry<OTHER>>> otherBuffer,
-                       long relativeLowerBound,
-                       long relativeUpperBound,
-                       boolean isLeft) throws Exception {
-
-               final OUR ourValue = record.getValue();
+       private <THIS, OTHER> void processElement(
+                       final StreamRecord<THIS> record,
+                       final MapState<Long, 
List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer,
+                       final MapState<Long, 
List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer,
+                       final long relativeLowerBound,
+                       final long relativeUpperBound,
+                       final boolean isLeft) throws Exception {
+
+               final THIS ourValue = record.getValue();
                final long ourTimestamp = record.getTimestamp();
 
                if (ourTimestamp == Long.MIN_VALUE) {
@@ -257,14 +258,18 @@ private boolean isLate(long timestamp) {
        }
 
        private void collect(T1 left, T2 right, long leftTimestamp, long 
rightTimestamp) throws Exception {
-               long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);
+               final long resultTimestamp = Math.max(leftTimestamp, 
rightTimestamp);
+
                collector.setAbsoluteTimestamp(resultTimestamp);
-               context.leftTimestamp = leftTimestamp;
-               context.rightTimestamp = rightTimestamp;
+               context.updateTimestamps(leftTimestamp, rightTimestamp, 
resultTimestamp);
+
                userFunction.processElement(left, right, context, collector);
        }
 
-       private <T> void addToBuffer(MapState<Long, List<BufferEntry<T>>> 
buffer, T value, long timestamp) throws Exception {
+       private static <T> void addToBuffer(
+                       final MapState<Long, 
List<IntervalJoinOperator.BufferEntry<T>>> buffer,
+                       final T value,
+                       final long timestamp) throws Exception {
                List<BufferEntry<T>> elemsInBucket = buffer.get(timestamp);
                if (elemsInBucket == null) {
                        elemsInBucket = new ArrayList<>();
@@ -313,6 +318,8 @@ public void onProcessingTime(InternalTimer<K, String> 
timer) throws Exception {
         */
        private final class ContextImpl extends ProcessJoinFunction<T1, T2, 
OUT>.Context {
 
+               private long resultTimestamp = Long.MIN_VALUE;
+
                private long leftTimestamp = Long.MIN_VALUE;
 
                private long rightTimestamp = Long.MIN_VALUE;
@@ -321,6 +328,12 @@ private ContextImpl(ProcessJoinFunction<T1, T2, OUT> func) 
{
                        func.super();
                }
 
+               private void updateTimestamps(long left, long right, long 
result) {
+                       this.leftTimestamp = left;
+                       this.rightTimestamp = right;
+                       this.resultTimestamp = result;
+               }
+
                @Override
                public long getLeftTimestamp() {
                        return leftTimestamp;
@@ -333,7 +346,7 @@ public long getRightTimestamp() {
 
                @Override
                public long getTimestamp() {
-                       return leftTimestamp;
+                       return resultTimestamp;
                }
 
                @Override
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest.java
index ee3f4d8fc12..53f514b98da 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest.java
@@ -481,13 +481,16 @@ public void testReturnsCorrectTimestamp() throws 
Exception {
                                TestElem.serializer(),
                                TestElem.serializer(),
                                new ProcessJoinFunction<TestElem, TestElem, 
Tuple2<TestElem, TestElem>>() {
+
+                                       private static final long 
serialVersionUID = 1L;
+
                                        @Override
                                        public void processElement(
                                                TestElem left,
                                                TestElem right,
                                                Context ctx,
                                                Collector<Tuple2<TestElem, 
TestElem>> out) throws Exception {
-                                               Assert.assertEquals(left.ts, 
ctx.getTimestamp());
+                                               
Assert.assertEquals(Math.max(left.ts, right.ts), ctx.getTimestamp());
                                        }
                                }
                        );


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> IntervalJoinOperator#Context#getTimestamp does not return the Max timestamp.
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-9994
>                 URL: https://issues.apache.org/jira/browse/FLINK-9994
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.6.0
>            Reporter: Kostas Kloudas
>            Assignee: Kostas Kloudas
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.6.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to