yunfengzhou-hub commented on code in PR #97:
URL: https://github.com/apache/flink-ml/pull/97#discussion_r876660941


##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/DataCacheWriter.java:
##########
@@ -89,57 +104,31 @@ public List<Segment> getFinishSegments() {
         return finishSegments;
     }
 
-    private void finishCurrentSegment(boolean newSegment) throws IOException {
-        if (currentSegment != null) {
-            currentSegment.finish().ifPresent(finishSegments::add);
-            currentSegment = null;
-        }
-
-        if (newSegment) {
-            currentSegment = new SegmentWriter(pathGenerator.get());
-        }
-    }
-
-    private class SegmentWriter {
-
-        private final Path path;
-
-        private final FSDataOutputStream outputStream;
-
-        private final DataOutputView outputView;
-
-        private int currentSegmentCount;
-
-        public SegmentWriter(Path path) throws IOException {
-            this.path = path;
-            this.outputStream = fileSystem.create(path, 
FileSystem.WriteMode.NO_OVERWRITE);
-            this.outputView = new DataOutputViewStreamWrapper(outputStream);
-        }
-
-        public void addRecord(T record) throws IOException {
-            serializer.serialize(record, outputView);
-            currentSegmentCount += 1;
-        }
+    private SegmentWriter<T> createSegmentWriter(
+            SupplierWithException<Path, IOException> pathGenerator, 
MemoryManager memoryManager)
+            throws IOException {
+        boolean shouldCacheInMemory = 
MemoryUtils.isMemoryEnoughForCache(memoryManager);
 
-        public Optional<Segment> finish() throws IOException {
-            this.outputStream.flush();
-            long size = outputStream.getPos();
-            this.outputStream.close();
-
-            if (currentSegmentCount > 0) {
-                return Optional.of(new Segment(path, currentSegmentCount, 
size));
-            } else {
-                // If there are no records, we tend to directly delete this 
file
-                fileSystem.delete(path, false);
-                return Optional.empty();
+        if (shouldCacheInMemory) {
+            try {
+                return new MemorySegmentWriter<>(pathGenerator.get(), 
memoryManager);
+            } catch (MemoryAllocationException e) {
+                return new FsSegmentWriter<>(serializer, pathGenerator.get());
             }
         }
+        return new FsSegmentWriter<>(serializer, pathGenerator.get());
     }
 
     public void cleanup() throws IOException {
-        finishCurrentSegment();
+        finish();
         for (Segment segment : finishSegments) {
-            fileSystem.delete(segment.getPath(), false);
+            if (segment.isOnDisk()) {
+                fileSystem.delete(segment.path, false);
+            }
+            if (segment.isInMemory()) {
+                memoryManager.releaseMemory(segment.path, 
segment.inMemorySize);

Review Comment:
   Memory is reserved in `MemorySegmentWriter`. I had set the key to the writer 
object, rather than segment.path. I'll correct this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to