yunfengzhou-hub commented on code in PR #97:
URL: https://github.com/apache/flink-ml/pull/97#discussion_r879052071


##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/DataCacheReader.java:
##########
@@ -20,120 +20,122 @@
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-
-import javax.annotation.Nullable;
+import org.apache.flink.runtime.memory.MemoryManager;
 
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
 
-/** Reads the cached data from a list of paths. */
+/** Reads the cached data from a list of segments. */
 public class DataCacheReader<T> implements Iterator<T> {
 
-    private final TypeSerializer<T> serializer;
+    private final MemoryManager memoryManager;
 
-    private final FileSystem fileSystem;
+    private final TypeSerializer<T> serializer;
 
     private final List<Segment> segments;
 
-    @Nullable private SegmentReader currentSegmentReader;
+    private SegmentReader<T> currentReader;
+
+    private SegmentWriter<T> cacheWriter;
+
+    private int segmentIndex;
 
     public DataCacheReader(
-            TypeSerializer<T> serializer, FileSystem fileSystem, List<Segment> segments)
-            throws IOException {
-        this(serializer, fileSystem, segments, new Tuple2<>(0, 0));
+            TypeSerializer<T> serializer, MemoryManager memoryManager, List<Segment> segments) {
+        this(serializer, memoryManager, segments, new Tuple2<>(0, 0));
     }
 
     public DataCacheReader(
             TypeSerializer<T> serializer,
-            FileSystem fileSystem,
+            MemoryManager memoryManager,
             List<Segment> segments,
-            Tuple2<Integer, Integer> readerPosition)
-            throws IOException {
-
+            Tuple2<Integer, Integer> readerPosition) {
+        this.memoryManager = memoryManager;
         this.serializer = serializer;
-        this.fileSystem = fileSystem;
         this.segments = segments;
+        this.segmentIndex = readerPosition.f0;
+
+        createSegmentReaderAndCache(readerPosition.f0, readerPosition.f1);
+    }
+
+    private void createSegmentReaderAndCache(int index, int startOffset) {
+        try {
+            cacheWriter = null;
 
-        if (readerPosition.f0 < segments.size()) {
-            this.currentSegmentReader = new SegmentReader(readerPosition.f0, readerPosition.f1);
+            if (index >= segments.size()) {
+                currentReader = null;
+                return;
+            }
+
+            currentReader = SegmentReader.create(serializer, segments.get(index), startOffset);
+
+            boolean shouldCacheInMemory =
+                    startOffset == 0
+                            && currentReader instanceof FsSegmentReader
+                            && MemoryUtils.isMemoryEnoughForCache(memoryManager);
+
+            if (shouldCacheInMemory) {
+                cacheWriter =
+                        SegmentWriter.create(
+                                segments.get(index).getPath(),
+                                memoryManager,
+                                serializer,
+                                segments.get(index).getFsSize(),
+                                true,
+                                false);
+            }
+
+        } catch (IOException e) {
+            throw new RuntimeException(e);
         }
     }
 
     @Override
     public boolean hasNext() {
-        return currentSegmentReader != null && currentSegmentReader.hasNext();
+        return currentReader != null && currentReader.hasNext();
     }
 
     @Override
     public T next() {
         try {
-            T next = currentSegmentReader.next();
-
-            if (!currentSegmentReader.hasNext()) {
-                currentSegmentReader.close();
-                if (currentSegmentReader.index < segments.size() - 1) {
-                    currentSegmentReader = new SegmentReader(currentSegmentReader.index + 1, 0);
-                } else {
-                    currentSegmentReader = null;
+            T record = currentReader.next();
+
+            if (cacheWriter != null) {
+                if (!cacheWriter.addRecord(record)) {

Review Comment:
   In cases where the Flink job is restored from a snapshot, the caches previously created by the segment writer in `DataCacheWriter` no longer exist; this cache writer helps re-create those caches during the reading process.


