This is an automated email from the ASF dual-hosted git repository. lindong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-ml.git
The following commit(s) were added to refs/heads/master by this push: new 454f7d1 [FLINK-27096] Flush buffer at epoch watermark 454f7d1 is described below commit 454f7d13a1a490fb7f0a559bb2a13ae4b0788c29 Author: yunfengzhou-hub <yuri.zhouyunf...@outlook.com> AuthorDate: Mon Jun 20 12:31:20 2022 +0800 [FLINK-27096] Flush buffer at epoch watermark This closes #112. --- .../iteration/broadcast/RecordWriterBroadcastOutput.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/broadcast/RecordWriterBroadcastOutput.java b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/broadcast/RecordWriterBroadcastOutput.java index fc99865..c11e6c8 100644 --- a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/broadcast/RecordWriterBroadcastOutput.java +++ b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/broadcast/RecordWriterBroadcastOutput.java @@ -18,6 +18,7 @@ package org.apache.flink.iteration.broadcast; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.iteration.IterationRecord; import org.apache.flink.runtime.io.network.api.writer.RecordWriter; import org.apache.flink.runtime.plugable.SerializationDelegate; import org.apache.flink.streaming.runtime.streamrecord.StreamElement; @@ -42,5 +43,16 @@ public class RecordWriterBroadcastOutput<OUT> implements BroadcastOutput<OUT> { public void broadcastEmit(StreamRecord<OUT> record) throws IOException { serializationDelegate.setInstance(record); recordWriter.broadcastEmit(serializationDelegate); + if (isIterationEpochWatermark(record)) { + recordWriter.flushAll(); + } + } + + private static <T> boolean isIterationEpochWatermark(StreamRecord<T> record) { + if (!(record.getValue() instanceof IterationRecord)) { + return false; + } + IterationRecord<?> iterationRecord = (IterationRecord<?>) record.getValue(); + return iterationRecord.getType().equals(IterationRecord.Type.EPOCH_WATERMARK); } }