Repository: flink Updated Branches: refs/heads/master e7a060947 -> 68f446c9f
[hotfix] Reorder ProcessOperator methods so they conform to the order in the interface Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/68f446c9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/68f446c9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/68f446c9 Branch: refs/heads/master Commit: 68f446c9f38f3faf70413fb19aab742871bb4a33 Parents: e7a0609 Author: Bowen Li <[email protected]> Authored: Wed Oct 25 23:01:22 2017 +0800 Committer: Aljoscha Krettek <[email protected]> Committed: Thu Oct 26 15:05:34 2017 +0200 ---------------------------------------------------------------------- .../api/operators/KeyedProcessOperator.java | 27 ++++++++++---------- .../operators/co/KeyedCoProcessOperator.java | 12 ++++----- 2 files changed, 20 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/68f446c9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java index 0f4b4f5..6501a9d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java @@ -118,16 +118,17 @@ public class KeyedProcessOperator<K, IN, OUT> } @Override + public TimerService timerService() { + return timerService; + } + + @Override public <X> void output(OutputTag<X> outputTag, X value) { if (outputTag == null) { throw new IllegalArgumentException("OutputTag must not be null."); } - output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp())); - } - @Override - public TimerService timerService() { - return timerService; + output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp())); } } @@ -145,18 +146,17 @@ public class KeyedProcessOperator<K, IN, OUT> } @Override - public TimeDomain timeDomain() { - checkState(timeDomain != null); - return timeDomain; - } - - @Override public Long timestamp() { checkState(timer != null); return timer.getTimestamp(); } @Override + public TimerService timerService() { + return timerService; + } + + @Override public <X> void output(OutputTag<X> outputTag, X value) { if (outputTag == null) { throw new IllegalArgumentException("OutputTag must not be null."); @@ -166,8 +166,9 @@ public class KeyedProcessOperator<K, IN, OUT> } @Override - public TimerService timerService() { - return timerService; + public TimeDomain timeDomain() { + checkState(timeDomain != null); + return timeDomain; } } } http://git-wip-us.apache.org/repos/asf/flink/blob/68f446c9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java index e9402cf..4e2bfc7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java @@ -162,12 +162,6 @@ public class KeyedCoProcessOperator<K, IN1, IN2, OUT> } @Override - public TimeDomain timeDomain() { - checkState(timeDomain != null); - return timeDomain; - } - - @Override public Long timestamp() { checkState(timer != null); return timer.getTimestamp(); @@ -186,5 +180,11 @@ public class KeyedCoProcessOperator<K, IN1, IN2, OUT> output.collect(outputTag, new StreamRecord<>(value, timer.getTimestamp())); } + + @Override + public TimeDomain timeDomain() { + checkState(timeDomain != null); + return timeDomain; + } } }
