JingsongLi commented on a change in pull request #14:
URL: https://github.com/apache/flink-table-store/pull/14#discussion_r791468220



##########
File path: 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
##########
@@ -18,62 +18,129 @@
 
 package org.apache.flink.table.store.file.operation;
 
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileFormat;
 import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.manifest.ManifestEntry;
 import org.apache.flink.table.store.file.manifest.ManifestFile;
 import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
 import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.predicate.And;
+import org.apache.flink.table.store.file.predicate.Equal;
+import org.apache.flink.table.store.file.predicate.Literal;
+import org.apache.flink.table.store.file.predicate.Or;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
-import java.util.LinkedHashMap;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 /** Default implementation of {@link FileStoreScan}. */
 public class FileStoreScanImpl implements FileStoreScan {
 
+    private final RowType partitionType;
+    private final RowType keyType;
+    private final RowType rowType;
+    private final FileFormat fileFormat;
     private final FileStorePathFactory pathFactory;
-    private final ManifestFile manifestFile;
+
+    private final List<RowData.FieldGetter> partitionFieldGetters;
     private final ManifestList manifestList;
 
     private Long snapshotId;
     private List<ManifestFileMeta> manifests;
+    private Predicate partitionFilter;
+    private Predicate keyFilter;
+    private Predicate valueFilter;
+    private Integer bucket;
 
     public FileStoreScanImpl(
-            FileStorePathFactory pathFactory,
-            ManifestFile manifestFile,
-            ManifestList manifestList) {
+            RowType partitionType,
+            RowType keyType,
+            RowType rowType,
+            FileFormat fileFormat,
+            FileStorePathFactory pathFactory) {
+        this.partitionType = partitionType;
+        this.keyType = keyType;
+        this.rowType = rowType;
+        this.fileFormat = fileFormat;
         this.pathFactory = pathFactory;
-        this.manifestFile = manifestFile;
-        this.manifestList = manifestList;
+
+        this.partitionFieldGetters =

Review comment:
      Can we extract this conversion into a `RowToArrayConverter` class?

##########
File path: 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
##########
@@ -31,6 +32,8 @@
 
     FileStoreScan withPartitionFilter(Predicate predicate);
 
+    FileStoreScan withPartitionFilter(List<BinaryRowData> partitions);

Review comment:
      Should this overload be named `inPartitions` instead?

##########
File path: 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
##########
@@ -106,10 +115,12 @@ public SstPathFactory createSstPathFactory(BinaryRowData 
partition, int bucket)
     }
 
     public String getPartitionString(BinaryRowData partition) {
-        return PartitionPathUtils.generatePartitionPath(
-                partitionComputer.generatePartValues(
-                        Preconditions.checkNotNull(
-                                partition, "Partition row data is null. This 
is unexpected.")));
+        synchronized (partitionComputer) {

Review comment:
      Why does this need `synchronized`?

##########
File path: 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
##########
@@ -109,34 +176,72 @@ public Long snapshotId() {
     }
 
     private List<ManifestEntry> scan() {
-        Map<ManifestEntry.Identifier, ManifestEntry> map = new 
LinkedHashMap<>();
-        for (ManifestFileMeta manifest : manifests) {
-            // TODO read each manifest file concurrently
-            for (ManifestEntry entry : manifestFile.read(manifest.fileName())) 
{
-                ManifestEntry.Identifier identifier = entry.identifier();
-                switch (entry.kind()) {
-                    case ADD:
-                        Preconditions.checkState(
-                                !map.containsKey(identifier),
-                                "Trying to add file %s which is already added. 
"
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.put(identifier, entry);
-                        break;
-                    case DELETE:
-                        Preconditions.checkState(
-                                map.containsKey(identifier),
-                                "Trying to delete file %s which is not 
previously added. "
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.remove(identifier);
-                        break;
-                    default:
-                        throw new UnsupportedOperationException(
-                                "Unknown value kind " + entry.kind().name());
-                }
+        List<ManifestEntry> entries;
+        try {
+            entries =
+                    new ForkJoinPool()

Review comment:
       Why use `ForkJoinPool`?

##########
File path: 
flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTestBase.java
##########
@@ -71,7 +67,7 @@ public void beforeEach() throws IOException {
         root.getFileSystem().mkdirs(new Path(root + "/snapshot"));
     }
 
-    protected abstract String getSchema();
+    protected abstract String getScheme();

Review comment:
      Should this remain `getSchema`?

##########
File path: 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
##########
@@ -109,34 +176,72 @@ public Long snapshotId() {
     }
 
     private List<ManifestEntry> scan() {
-        Map<ManifestEntry.Identifier, ManifestEntry> map = new 
LinkedHashMap<>();
-        for (ManifestFileMeta manifest : manifests) {
-            // TODO read each manifest file concurrently
-            for (ManifestEntry entry : manifestFile.read(manifest.fileName())) 
{
-                ManifestEntry.Identifier identifier = entry.identifier();
-                switch (entry.kind()) {
-                    case ADD:
-                        Preconditions.checkState(
-                                !map.containsKey(identifier),
-                                "Trying to add file %s which is already added. 
"
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.put(identifier, entry);
-                        break;
-                    case DELETE:
-                        Preconditions.checkState(
-                                map.containsKey(identifier),
-                                "Trying to delete file %s which is not 
previously added. "
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.remove(identifier);
-                        break;
-                    default:
-                        throw new UnsupportedOperationException(
-                                "Unknown value kind " + entry.kind().name());
-                }
+        List<ManifestEntry> entries;
+        try {
+            entries =
+                    new ForkJoinPool()
+                            .submit(
+                                    () ->
+                                            manifests
+                                                    .parallelStream()
+                                                    
.filter(this::filterManifestFileMeta)
+                                                    .flatMap(m -> 
readManifestFileMeta(m).stream())

Review comment:
      Could we apply `filterManifestEntry` directly on `readManifestFileMeta(m).stream()`, inside the `flatMap`?

##########
File path: 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
##########
@@ -109,34 +176,72 @@ public Long snapshotId() {
     }
 
     private List<ManifestEntry> scan() {
-        Map<ManifestEntry.Identifier, ManifestEntry> map = new 
LinkedHashMap<>();
-        for (ManifestFileMeta manifest : manifests) {
-            // TODO read each manifest file concurrently
-            for (ManifestEntry entry : manifestFile.read(manifest.fileName())) 
{
-                ManifestEntry.Identifier identifier = entry.identifier();
-                switch (entry.kind()) {
-                    case ADD:
-                        Preconditions.checkState(
-                                !map.containsKey(identifier),
-                                "Trying to add file %s which is already added. 
"
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.put(identifier, entry);
-                        break;
-                    case DELETE:
-                        Preconditions.checkState(
-                                map.containsKey(identifier),
-                                "Trying to delete file %s which is not 
previously added. "
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.remove(identifier);
-                        break;
-                    default:
-                        throw new UnsupportedOperationException(
-                                "Unknown value kind " + entry.kind().name());
-                }
+        List<ManifestEntry> entries;
+        try {
+            entries =
+                    new ForkJoinPool()
+                            .submit(
+                                    () ->
+                                            manifests
+                                                    .parallelStream()
+                                                    
.filter(this::filterManifestFileMeta)
+                                                    .flatMap(m -> 
readManifestFileMeta(m).stream())
+                                                    
.filter(this::filterManifestEntry)
+                                                    
.collect(Collectors.toList()))
+                            .get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException("Failed to read ManifestEntry list 
concurrently", e);
+        }
+
+        Map<ManifestEntry.Identifier, ManifestEntry> map = new HashMap<>();
+        for (ManifestEntry entry : entries) {
+            ManifestEntry.Identifier identifier = entry.identifier();
+            switch (entry.kind()) {
+                case ADD:
+                    Preconditions.checkState(
+                            !map.containsKey(identifier),
+                            "Trying to add file %s which is already added. "
+                                    + "Manifest might be corrupted.",
+                            identifier);
+                    map.put(identifier, entry);
+                    break;
+                case DELETE:
+                    Preconditions.checkState(
+                            map.containsKey(identifier),
+                            "Trying to delete file %s which is not previously 
added. "
+                                    + "Manifest might be corrupted.",
+                            identifier);
+                    map.remove(identifier);
+                    break;
+                default:
+                    throw new UnsupportedOperationException(
+                            "Unknown value kind " + entry.kind().name());
             }
         }
         return new ArrayList<>(map.values());
     }
+
+    private boolean filterManifestFileMeta(ManifestFileMeta manifest) {
+        return partitionFilter == null
+                || partitionFilter.test(
+                        manifest.numAddedFiles() + manifest.numDeletedFiles(),
+                        manifest.partitionStats());
+    }
+
+    private boolean filterManifestEntry(ManifestEntry entry) {
+        // TODO apply key & value filter after field stats are collected in
+        //  SstFile.RollingFile#finish
+        return (partitionFilter == null
+                        || partitionFilter.test(
+                                partitionFieldGetters.stream()
+                                        .map(g -> 
g.getFieldOrNull(entry.partition()))
+                                        .toArray()))
+                && (bucket == null || entry.bucket() == bucket);
+    }
+
+    private List<ManifestEntry> readManifestFileMeta(ManifestFileMeta 
manifest) {
+        // ManifestFile is not thread safe
+        return new ManifestFile(partitionType, keyType, rowType, fileFormat, 
pathFactory)

Review comment:
      Could we use a `ThreadLocal` to reuse the `ManifestFile` object, instead of creating a new one for every manifest?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to