yunfengzhou-hub commented on code in PR #97: URL: https://github.com/apache/flink-ml/pull/97#discussion_r879052071
########## flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/DataCacheReader.java: ########## @@ -20,120 +20,122 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.core.fs.FSDataInputStream; -import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataInputViewStreamWrapper; - -import javax.annotation.Nullable; +import org.apache.flink.runtime.memory.MemoryManager; import java.io.IOException; import java.util.Iterator; import java.util.List; -/** Reads the cached data from a list of paths. */ +/** Reads the cached data from a list of segments. */ public class DataCacheReader<T> implements Iterator<T> { - private final TypeSerializer<T> serializer; + private final MemoryManager memoryManager; - private final FileSystem fileSystem; + private final TypeSerializer<T> serializer; private final List<Segment> segments; - @Nullable private SegmentReader currentSegmentReader; + private SegmentReader<T> currentReader; + + private SegmentWriter<T> cacheWriter; + + private int segmentIndex; public DataCacheReader( - TypeSerializer<T> serializer, FileSystem fileSystem, List<Segment> segments) - throws IOException { - this(serializer, fileSystem, segments, new Tuple2<>(0, 0)); + TypeSerializer<T> serializer, MemoryManager memoryManager, List<Segment> segments) { + this(serializer, memoryManager, segments, new Tuple2<>(0, 0)); } public DataCacheReader( TypeSerializer<T> serializer, - FileSystem fileSystem, + MemoryManager memoryManager, List<Segment> segments, - Tuple2<Integer, Integer> readerPosition) - throws IOException { - + Tuple2<Integer, Integer> readerPosition) { + this.memoryManager = memoryManager; this.serializer = serializer; - this.fileSystem = fileSystem; this.segments = segments; + this.segmentIndex = readerPosition.f0; + + createSegmentReaderAndCache(readerPosition.f0, 
readerPosition.f1); + } + + private void createSegmentReaderAndCache(int index, int startOffset) { + try { + cacheWriter = null; - if (readerPosition.f0 < segments.size()) { - this.currentSegmentReader = new SegmentReader(readerPosition.f0, readerPosition.f1); + if (index >= segments.size()) { + currentReader = null; + return; + } + + currentReader = SegmentReader.create(serializer, segments.get(index), startOffset); + + boolean shouldCacheInMemory = + startOffset == 0 + && currentReader instanceof FsSegmentReader + && MemoryUtils.isMemoryEnoughForCache(memoryManager); + + if (shouldCacheInMemory) { + cacheWriter = + SegmentWriter.create( + segments.get(index).getPath(), + memoryManager, + serializer, + segments.get(index).getFsSize(), + true, + false); + } + + } catch (IOException e) { + throw new RuntimeException(e); } } @Override public boolean hasNext() { - return currentSegmentReader != null && currentSegmentReader.hasNext(); + return currentReader != null && currentReader.hasNext(); } @Override public T next() { try { - T next = currentSegmentReader.next(); - - if (!currentSegmentReader.hasNext()) { - currentSegmentReader.close(); - if (currentSegmentReader.index < segments.size() - 1) { - currentSegmentReader = new SegmentReader(currentSegmentReader.index + 1, 0); - } else { - currentSegmentReader = null; + T record = currentReader.next(); + + if (cacheWriter != null) { + if (!cacheWriter.addRecord(record)) { Review Comment: When the Flink job is restored from a snapshot, the caches previously created by the segment writer in `DataCacheWriter` no longer exist, so this cache writer re-creates them during the reading process. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org