This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-ml.git


The following commit(s) were added to refs/heads/master by this push:
     new 454f7d1  [FLINK-27096] Flush buffer at epoch watermark
454f7d1 is described below

commit 454f7d13a1a490fb7f0a559bb2a13ae4b0788c29
Author: yunfengzhou-hub <yuri.zhouyunf...@outlook.com>
AuthorDate: Mon Jun 20 12:31:20 2022 +0800

    [FLINK-27096] Flush buffer at epoch watermark
    
    This closes #112.
---
 .../iteration/broadcast/RecordWriterBroadcastOutput.java     | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git 
a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/broadcast/RecordWriterBroadcastOutput.java
 
b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/broadcast/RecordWriterBroadcastOutput.java
index fc99865..c11e6c8 100644
--- 
a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/broadcast/RecordWriterBroadcastOutput.java
+++ 
b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/broadcast/RecordWriterBroadcastOutput.java
@@ -18,6 +18,7 @@
 package org.apache.flink.iteration.broadcast;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.iteration.IterationRecord;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
@@ -42,5 +43,16 @@ public class RecordWriterBroadcastOutput<OUT> implements 
BroadcastOutput<OUT> {
     public void broadcastEmit(StreamRecord<OUT> record) throws IOException {
         serializationDelegate.setInstance(record);
         recordWriter.broadcastEmit(serializationDelegate);
+        if (isIterationEpochWatermark(record)) {
+            recordWriter.flushAll();
+        }
+    }
+
+    private static <T> boolean isIterationEpochWatermark(StreamRecord<T> 
record) {
+        if (!(record.getValue() instanceof IterationRecord)) {
+            return false;
+        }
+        IterationRecord<?> iterationRecord = (IterationRecord<?>) 
record.getValue();
+        return 
iterationRecord.getType().equals(IterationRecord.Type.EPOCH_WATERMARK);
     }
 }

Reply via email to