yunfengzhou-hub commented on code in PR #97: URL: https://github.com/apache/flink-ml/pull/97#discussion_r876660941
########## flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/DataCacheWriter.java: ########## @@ -89,57 +104,31 @@ public List<Segment> getFinishSegments() { return finishSegments; } - private void finishCurrentSegment(boolean newSegment) throws IOException { - if (currentSegment != null) { - currentSegment.finish().ifPresent(finishSegments::add); - currentSegment = null; - } - - if (newSegment) { - currentSegment = new SegmentWriter(pathGenerator.get()); - } - } - - private class SegmentWriter { - - private final Path path; - - private final FSDataOutputStream outputStream; - - private final DataOutputView outputView; - - private int currentSegmentCount; - - public SegmentWriter(Path path) throws IOException { - this.path = path; - this.outputStream = fileSystem.create(path, FileSystem.WriteMode.NO_OVERWRITE); - this.outputView = new DataOutputViewStreamWrapper(outputStream); - } - - public void addRecord(T record) throws IOException { - serializer.serialize(record, outputView); - currentSegmentCount += 1; - } + private SegmentWriter<T> createSegmentWriter( + SupplierWithException<Path, IOException> pathGenerator, MemoryManager memoryManager) + throws IOException { + boolean shouldCacheInMemory = MemoryUtils.isMemoryEnoughForCache(memoryManager); - public Optional<Segment> finish() throws IOException { - this.outputStream.flush(); - long size = outputStream.getPos(); - this.outputStream.close(); - - if (currentSegmentCount > 0) { - return Optional.of(new Segment(path, currentSegmentCount, size)); - } else { - // If there are no records, we tend to directly delete this file - fileSystem.delete(path, false); - return Optional.empty(); + if (shouldCacheInMemory) { + try { + return new MemorySegmentWriter<>(pathGenerator.get(), memoryManager); + } catch (MemoryAllocationException e) { + return new FsSegmentWriter<>(serializer, pathGenerator.get()); } } + return new FsSegmentWriter<>(serializer, pathGenerator.get()); } public void cleanup() 
throws IOException { - finishCurrentSegment(); + finish(); for (Segment segment : finishSegments) { - fileSystem.delete(segment.getPath(), false); + if (segment.isOnDisk()) { + fileSystem.delete(segment.path, false); + } + if (segment.isInMemory()) { + memoryManager.releaseMemory(segment.path, segment.inMemorySize); Review Comment: Memory is reserved in `MemorySegmentWriter`, where I had set the reservation key to the writer object rather than `segment.path`, so the key passed to `releaseMemory` here does not match the one used to reserve the memory. I'll correct this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org