This is an automated email from the ASF dual-hosted git repository. lindong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-ml.git
The following commit(s) were added to refs/heads/master by this push:
     new cb376d7e [hotfix] Discard watermarks when feed a datastream into iteration body
cb376d7e is described below

commit cb376d7e45074a6052c5331e511cfc5aca224ddf
Author: Zhipeng Zhang <zhangzhipe...@gmail.com>
AuthorDate: Fri Apr 14 15:09:03 2023 +0800

    [hotfix] Discard watermarks when feed a datastream into iteration body

    This closes #223.
---
 .../src/main/java/org/apache/flink/iteration/Iterations.java    | 3 +++
 .../java/org/apache/flink/iteration/operator/InputOperator.java | 6 ++++++
 2 files changed, 9 insertions(+)

diff --git a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/Iterations.java b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/Iterations.java
index 6e6ff9d3..8f716770 100644
--- a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/Iterations.java
+++ b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/Iterations.java
@@ -85,6 +85,9 @@ import static org.apache.flink.util.Preconditions.checkState;
  * <p>The limitation of constructing the subgraph inside the iteration body could be refer in {@link
  * IterationBody}.
  *
+ * <p>Note that the iteration framework cannot deal with watermarks correctly for now. It should be
+ * resolved by FLINK-31373.
+ *
  * <p>An example of the iteration is like:
  *
  * <pre>{@code
diff --git a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/InputOperator.java b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/InputOperator.java
index b6908ff8..604715f1 100644
--- a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/InputOperator.java
+++ b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/InputOperator.java
@@ -22,6 +22,7 @@ import org.apache.flink.iteration.IterationRecord;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 /** Input operator that wraps the user record into {@link IterationRecord}. */
@@ -46,4 +47,9 @@ public class InputOperator<T> extends AbstractStreamOperator<IterationRecord<T>>
         reusable.getValue().setValue(streamRecord.getValue());
         output.collect(reusable);
     }
+
+    @Override
+    public void processWatermark(Watermark mark) {
+        // TODO: FLINK-31373 Support processing watermarks in iterations.
+    }
 }