This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-ml.git


The following commit(s) were added to refs/heads/master by this push:
     new cb376d7e [hotfix] Discard watermarks when feed a datastream into iteration body
cb376d7e is described below

commit cb376d7e45074a6052c5331e511cfc5aca224ddf
Author: Zhipeng Zhang <zhangzhipe...@gmail.com>
AuthorDate: Fri Apr 14 15:09:03 2023 +0800

    [hotfix] Discard watermarks when feed a datastream into iteration body
    
    This closes #223.
---
 .../src/main/java/org/apache/flink/iteration/Iterations.java        | 3 +++
 .../java/org/apache/flink/iteration/operator/InputOperator.java     | 6 ++++++
 2 files changed, 9 insertions(+)

diff --git a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/Iterations.java b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/Iterations.java
index 6e6ff9d3..8f716770 100644
--- a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/Iterations.java
+++ b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/Iterations.java
@@ -85,6 +85,9 @@ import static org.apache.flink.util.Preconditions.checkState;
  * <p>The limitation of constructing the subgraph inside the iteration body could be refer in {@link
  * IterationBody}.
  *
+ * <p>Note that the iteration framework cannot deal with watermarks correctly for now. It should be
+ * resolved by FLINK-31373.
+ *
  * <p>An example of the iteration is like:
  *
  * <pre>{@code
diff --git a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/InputOperator.java b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/InputOperator.java
index b6908ff8..604715f1 100644
--- a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/InputOperator.java
+++ b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/InputOperator.java
@@ -22,6 +22,7 @@ import org.apache.flink.iteration.IterationRecord;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 /** Input operator that wraps the user record into {@link IterationRecord}. */
@@ -46,4 +47,9 @@ public class InputOperator<T> extends AbstractStreamOperator<IterationRecord<T>>
         reusable.getValue().setValue(streamRecord.getValue());
         output.collect(reusable);
     }
+
+    @Override
+    public void processWatermark(Watermark mark) {
+        // TODO: FLINK-31373 Support processing watermarks in iterations.
+    }
 }

Reply via email to