n3nash commented on a change in pull request #600: Timeline Service with Incremental View Syncing support URL: https://github.com/apache/incubator-hudi/pull/600#discussion_r271948421
########## File path: hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java ########## @@ -16,99 +16,88 @@ package com.uber.hoodie.common.table.view; -import com.google.common.collect.ImmutableMap; +import com.google.common.base.Preconditions; import com.uber.hoodie.common.model.CompactionOperation; -import com.uber.hoodie.common.model.FileSlice; -import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.model.HoodieFileGroup; import com.uber.hoodie.common.model.HoodieFileGroupId; -import com.uber.hoodie.common.model.HoodieLogFile; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.TableFileSystemView; -import com.uber.hoodie.common.table.timeline.HoodieInstant; -import com.uber.hoodie.common.util.CompactionUtils; -import com.uber.hoodie.common.util.FSUtils; +import com.uber.hoodie.common.util.Option; import com.uber.hoodie.common.util.collection.Pair; -import com.uber.hoodie.exception.HoodieIOException; import java.io.IOException; -import java.io.Serializable; import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.function.Predicate; +import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; /** - * Common abstract implementation for multiple TableFileSystemView Implementations. 
2 possible - * implementations are ReadOptimizedView and RealtimeView <p> Concrete implementations extending - * this abstract class, should only implement getDataFilesInPartition which includes files to be - * included in the view - * + * TableFileSystemView Implementations based on in-memory storage. * @see TableFileSystemView * @since 0.3.0 */ -public class HoodieTableFileSystemView implements TableFileSystemView, - TableFileSystemView.ReadOptimizedView, - TableFileSystemView.RealtimeView, Serializable { +public class HoodieTableFileSystemView extends IncrementalTimelineSyncFileSystemView { private static Logger log = LogManager.getLogger(HoodieTableFileSystemView.class); - protected HoodieTableMetaClient metaClient; - // This is the commits that will be visible for all views extending this view - protected HoodieTimeline visibleActiveTimeline; - // mapping from partition paths to file groups contained within them - protected HashMap<String, List<HoodieFileGroup>> partitionToFileGroupsMap; - // mapping from file id to the file group. 
- protected HashMap<HoodieFileGroupId, HoodieFileGroup> fileGroupMap; + protected Map<String, List<HoodieFileGroup>> partitionToFileGroupsMap; /** * PartitionPath + File-Id to pending compaction instant time */ - private final Map<HoodieFileGroupId, Pair<String, CompactionOperation>> fgIdToPendingCompaction; + private Map<HoodieFileGroupId, Pair<String, CompactionOperation>> fgIdToPendingCompaction; + + /** + * Flag to determine if closed + */ + private boolean closed = false; + + public HoodieTableFileSystemView() { + } /** * Create a file system view, as of the given timeline */ - public HoodieTableFileSystemView(HoodieTableMetaClient metaClient, - HoodieTimeline visibleActiveTimeline) { - this.metaClient = metaClient; - this.visibleActiveTimeline = visibleActiveTimeline; - this.fileGroupMap = new HashMap<>(); - this.partitionToFileGroupsMap = new HashMap<>(); + public HoodieTableFileSystemView(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline) { + init(metaClient, visibleActiveTimeline); + } - // Build fileId to Pending Compaction Instants - List<HoodieInstant> pendingCompactionInstants = - metaClient.getActiveTimeline().filterPendingCompactionTimeline().getInstants().collect(Collectors.toList()); - this.fgIdToPendingCompaction = ImmutableMap.copyOf( - CompactionUtils.getAllPendingCompactionOperations(metaClient).entrySet().stream() - .map(entry -> Pair.of(entry.getKey(), Pair.of(entry.getValue().getKey(), - CompactionOperation.convertFromAvroRecordInstance(entry.getValue().getValue())))) - .collect(Collectors.toMap(Pair::getKey, Pair::getValue))); + @Override + public void init(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline) { + this.partitionToFileGroupsMap = createPartitionToFileGroups(); + super.init(metaClient, visibleActiveTimeline); + } + + @Override + protected void resetViewState() { + this.fgIdToPendingCompaction = null; + this.partitionToFileGroupsMap = null; + } + + protected Map<String, 
List<HoodieFileGroup>> createPartitionToFileGroups() { + return new ConcurrentHashMap<>(); Review comment: Throw UnsupportedOperationException here? What does returning an empty map signify? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services