yunfengzhou-hub commented on code in PR #97: URL: https://github.com/apache/flink-ml/pull/97#discussion_r888749310
##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/DataCacheWriter.java:
##########
@@ -19,127 +19,104 @@
 package org.apache.flink.iteration.datacache.nonkeyed;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.SupplierWithException;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Optional;
 
-/** Records the data received and replayed them on required. */
+/** Records the data received and replays them on required. */
 public class DataCacheWriter<T> {
 
-    private final TypeSerializer<T> serializer;
-
     private final FileSystem fileSystem;
 
     private final SupplierWithException<Path, IOException> pathGenerator;
 
-    private final List<Segment> finishSegments;
+    private final LimitedSizeMemoryManager memoryManager;
+
+    private final TypeSerializer<T> serializer;
+
+    private final List<Segment> finishedSegments;
 
-    private SegmentWriter currentSegment;
+    private SegmentWriter<T> currentWriter;
 
     public DataCacheWriter(
             TypeSerializer<T> serializer,
             FileSystem fileSystem,
-            SupplierWithException<Path, IOException> pathGenerator)
+            SupplierWithException<Path, IOException> pathGenerator,
+            LimitedSizeMemoryManager memoryManager)
             throws IOException {
-        this(serializer, fileSystem, pathGenerator, Collections.emptyList());
+        this(serializer, fileSystem, pathGenerator, memoryManager, Collections.emptyList());
     }
 
     public DataCacheWriter(
             TypeSerializer<T> serializer,
             FileSystem fileSystem,
             SupplierWithException<Path, IOException> pathGenerator,
+            LimitedSizeMemoryManager memoryManager,
             List<Segment> priorFinishedSegments)
             throws IOException {
         this.serializer = serializer;
         this.fileSystem = fileSystem;
         this.pathGenerator = pathGenerator;
-
-        this.finishSegments = new ArrayList<>(priorFinishedSegments);
-
-        this.currentSegment = new SegmentWriter(pathGenerator.get());
+        this.memoryManager = memoryManager;
+        this.finishedSegments = new ArrayList<>(priorFinishedSegments);
+        this.currentWriter =
+                SegmentWriter.create(
+                        pathGenerator.get(), this.memoryManager, serializer, 0L, true, true);
     }
 
     public void addRecord(T record) throws IOException {
-        currentSegment.addRecord(record);
+        boolean success = currentWriter.addRecord(record);

Review Comment:
   I believe such a case is possible. For example, suppose we have written the size of a vector to the last bytes of a memory segment and then fail to write the vector's values, because the memory manager has no more segments available for this operator. In this case the code would re-create an `FsSegmentWriter` and re-write the size of the vector to the file.

   Such a case exists, but it does no harm to the program: the orphaned `size` value left in the memory segment is not tracked, so it is never accessed. It does not waste memory either, since space is allocated at segment granularity. That space is released later, along with the valid values in the segment.
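The scenario in the comment above can be illustrated with a minimal, self-contained sketch. It is not the flink-ml implementation: `FallbackSketch`, `MemoryWriter`, `FileBackedWriter`, and `addWithFallback` are hypothetical names, a `ByteBuffer` stands in for the memory segments, and a byte array stands in for the file. It assumes, as the comment describes, that a record is a size prefix followed by values and that only fully written records are tracked.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

/** Hypothetical sketch of the memory-to-file fallback; not the flink-ml classes. */
public class FallbackSketch {

    /** Stand-in for a writer backed by a fixed budget of memory. */
    public static class MemoryWriter {
        private final ByteBuffer buffer;
        // Only bytes belonging to fully written records are tracked.
        private int committedBytes = 0;

        public MemoryWriter(int capacity) {
            this.buffer = ByteBuffer.allocate(capacity);
        }

        /** Returns false if the record does not fit; partial bytes stay untracked. */
        public boolean addRecord(double[] vector) {
            buffer.position(committedBytes);
            if (buffer.remaining() < Integer.BYTES) {
                return false;
            }
            // The size prefix may land in the last bytes of the buffer.
            buffer.putInt(vector.length);
            if (buffer.remaining() < (long) vector.length * Double.BYTES) {
                // The prefix stays in the buffer, but committedBytes is unchanged,
                // so no reader bounded by committedBytes will ever see it.
                return false;
            }
            for (double v : vector) {
                buffer.putDouble(v);
            }
            committedBytes = buffer.position();
            return true;
        }

        public int committedBytes() {
            return committedBytes;
        }
    }

    /** Stand-in for a file-backed writer; a byte array plays the role of the file. */
    public static class FileBackedWriter {
        private final ByteArrayOutputStream bytes = new ByteArrayOutputStream();

        public void addRecord(double[] vector) {
            ByteBuffer tmp = ByteBuffer.allocate(Integer.BYTES + vector.length * Double.BYTES);
            tmp.putInt(vector.length);
            for (double v : vector) {
                tmp.putDouble(v);
            }
            bytes.write(tmp.array(), 0, tmp.capacity());
        }

        public int size() {
            return bytes.size();
        }
    }

    /** Tries memory first; on failure, re-writes the whole record to the file writer. */
    public static boolean addWithFallback(
            MemoryWriter memory, FileBackedWriter file, double[] vector) {
        if (memory.addRecord(vector)) {
            return true;
        }
        file.addRecord(vector); // the size prefix is written again, from the start
        return false;
    }
}
```

In this sketch, a 16-byte `MemoryWriter` accepts one single-element vector (12 bytes) and rejects the next one after its 4-byte size prefix has already been placed in the buffer. The tracked length stays at 12, so the stray prefix is harmless, and the complete record is written to the `FileBackedWriter` instead.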