[jira] [Commented] (FLINK-9994) IntervalJoinOperator#Context#getTimestamp does not return the Max timestamp.

2018-07-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561226#comment-16561226
 ] 

ASF GitHub Bot commented on FLINK-9994:
---

asfgit closed pull request #6449: [FLINK-9994][DataStream API] IntervalJoinOp 
Context#getTimestamp() returns max timestamp.
URL: https://github.com/apache/flink/pull/6449
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
index 0c449e64f41..43085cb42c4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
@@ -152,6 +152,7 @@ public IntervalJoinOperator(
@Override
public void open() throws Exception {
super.open();
+
collector = new TimestampedCollector<>(output);
context = new ContextImpl(userFunction);
internalTimerService =
@@ -204,15 +205,15 @@ public void processElement2(StreamRecord<T2> record) throws Exception {
}
 
@SuppressWarnings("unchecked")
-   private <OUR, OTHER> void processElement(
-   StreamRecord<OUR> record,
-   MapState<Long, List<BufferEntry<OUR>>> ourBuffer,
-   MapState<Long, List<BufferEntry<OTHER>>> otherBuffer,
-   long relativeLowerBound,
-   long relativeUpperBound,
-   boolean isLeft) throws Exception {
-
-   final OUR ourValue = record.getValue();
+   private <THIS, OTHER> void processElement(
+   final StreamRecord<THIS> record,
+   final MapState<Long, List<BufferEntry<THIS>>> ourBuffer,
+   final MapState<Long, List<BufferEntry<OTHER>>> otherBuffer,
+   final long relativeLowerBound,
+   final long relativeUpperBound,
+   final boolean isLeft) throws Exception {
+
+   final THIS ourValue = record.getValue();
final long ourTimestamp = record.getTimestamp();
 
if (ourTimestamp == Long.MIN_VALUE) {
@@ -257,14 +258,18 @@ private boolean isLate(long timestamp) {
}
 
private void collect(T1 left, T2 right, long leftTimestamp, long rightTimestamp) throws Exception {
-   long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);
+   final long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);
+
collector.setAbsoluteTimestamp(resultTimestamp);
-   context.leftTimestamp = leftTimestamp;
-   context.rightTimestamp = rightTimestamp;
+   context.updateTimestamps(leftTimestamp, rightTimestamp, resultTimestamp);
+
userFunction.processElement(left, right, context, collector);
}
 
-   private <T> void addToBuffer(MapState<Long, List<BufferEntry<T>>> buffer, T value, long timestamp) throws Exception {
+   private static <T> void addToBuffer(
+   final MapState<Long, List<BufferEntry<T>>> buffer,
+   final T value,
+   final long timestamp) throws Exception {
List<BufferEntry<T>> elemsInBucket = buffer.get(timestamp);
if (elemsInBucket == null) {
elemsInBucket = new ArrayList<>();
@@ -313,6 +318,8 @@ public void onProcessingTime(InternalTimer<K, String> timer) throws Exception {
 */
private final class ContextImpl extends ProcessJoinFunction<T1, T2, OUT>.Context {
 
+   private long resultTimestamp = Long.MIN_VALUE;
+
private long leftTimestamp = Long.MIN_VALUE;
 
private long rightTimestamp = Long.MIN_VALUE;
@@ -321,6 +328,12 @@ private ContextImpl(ProcessJoinFunction<T1, T2, OUT> func) {
func.super();
}
 
+   private void updateTimestamps(long left, long right, long result) {
+   this.leftTimestamp = left;
+   this.rightTimestamp = right;
+   this.resultTimestamp = result;
+   }
+
@Override
public long getLeftTimestamp() {
return leftTimestamp;
@@ -333,7 +346,7 @@ public long getRightTimestamp() {
 
@Override
public long getTimestamp() {
-   return leftTimestamp;
+   return resultTimestamp;
}
 
@Override
diff --git 

[jira] [Commented] (FLINK-9994) IntervalJoinOperator#Context#getTimestamp does not return the Max timestamp.

2018-07-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561091#comment-16561091
 ] 

ASF GitHub Bot commented on FLINK-9994:
---

kl0u opened a new pull request #6449: [FLINK-9994][DataStream API] 
IntervalJoinOp Context#getTimestamp() returns max timestamp.
URL: https://github.com/apache/flink/pull/6449
 
 
   ## What is the purpose of the change
   
   Although `Context.getTimestamp()` in the `IntervalJoinOperator` should 
return the maximum of the timestamps of the two elements in the joined pair, it 
currently returns the timestamp of the "left" element. 
   
   This is a remnant of past versions of the code. The timestamp of the 
collector is already correctly set to the maximum of the two timestamps in the 
matched pair.
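
The intended behavior can be illustrated with a small, self-contained sketch. `JoinContextSketch` is a hypothetical stand-in written for this illustration; it is not Flink's `ContextImpl` and does not depend on Flink. As a simplification, it derives the result timestamp inside `updateTimestamps` rather than receiving it as a third argument.

```java
// Hypothetical stand-in for the operator's join context (assumption: not Flink's actual class).
final class JoinContextSketch {

    private long leftTimestamp = Long.MIN_VALUE;
    private long rightTimestamp = Long.MIN_VALUE;
    private long resultTimestamp = Long.MIN_VALUE;

    // Records the timestamps of both joined elements and derives
    // the timestamp of the joined pair as the larger of the two.
    void updateTimestamps(long left, long right) {
        this.leftTimestamp = left;
        this.rightTimestamp = right;
        this.resultTimestamp = Math.max(left, right);
    }

    long getLeftTimestamp() {
        return leftTimestamp;
    }

    long getRightTimestamp() {
        return rightTimestamp;
    }

    // Returns the timestamp of the joined pair, not that of the left element.
    long getTimestamp() {
        return resultTimestamp;
    }

    public static void main(String[] args) {
        JoinContextSketch ctx = new JoinContextSketch();

        // The left element is older than the right one.
        ctx.updateTimestamps(3L, 7L);
        System.out.println(ctx.getLeftTimestamp()); // prints 3
        System.out.println(ctx.getTimestamp());     // prints 7
    }
}
```

In this sketch, a context that returned the left timestamp from `getTimestamp()` would yield 3 for the pair above, while the collector would stamp the emitted record with 7.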
   
   ## Brief change log
   
   The main change is in the `collect(T1 left, T2 right, long leftTimestamp, 
long rightTimestamp)` method of the `IntervalJoinOperator`.
   
   ## Verifying this change
   
   The tests covering this behavior were wrong and are now fixed.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)
   




> IntervalJoinOperator#Context#getTimestamp does not return the Max timestamp.
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-9994
>                 URL: https://issues.apache.org/jira/browse/FLINK-9994
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.6.0
>            Reporter: Kostas Kloudas
>            Assignee: Kostas Kloudas
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.6.0
>
>



