JingsongLi commented on a change in pull request #14: URL: https://github.com/apache/flink-table-store/pull/14#discussion_r791468220
########## File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java ########## @@ -18,62 +18,129 @@ package org.apache.flink.table.store.file.operation; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.binary.BinaryRowData; +import org.apache.flink.table.store.file.FileFormat; import org.apache.flink.table.store.file.Snapshot; import org.apache.flink.table.store.file.manifest.ManifestEntry; import org.apache.flink.table.store.file.manifest.ManifestFile; import org.apache.flink.table.store.file.manifest.ManifestFileMeta; import org.apache.flink.table.store.file.manifest.ManifestList; +import org.apache.flink.table.store.file.predicate.And; +import org.apache.flink.table.store.file.predicate.Equal; +import org.apache.flink.table.store.file.predicate.Literal; +import org.apache.flink.table.store.file.predicate.Or; import org.apache.flink.table.store.file.predicate.Predicate; import org.apache.flink.table.store.file.utils.FileStorePathFactory; +import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.Preconditions; import javax.annotation.Nullable; import java.util.ArrayList; -import java.util.LinkedHashMap; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ForkJoinPool; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; /** Default implementation of {@link FileStoreScan}. 
*/ public class FileStoreScanImpl implements FileStoreScan { + private final RowType partitionType; + private final RowType keyType; + private final RowType rowType; + private final FileFormat fileFormat; private final FileStorePathFactory pathFactory; - private final ManifestFile manifestFile; + + private final List<RowData.FieldGetter> partitionFieldGetters; private final ManifestList manifestList; private Long snapshotId; private List<ManifestFileMeta> manifests; + private Predicate partitionFilter; + private Predicate keyFilter; + private Predicate valueFilter; + private Integer bucket; public FileStoreScanImpl( - FileStorePathFactory pathFactory, - ManifestFile manifestFile, - ManifestList manifestList) { + RowType partitionType, + RowType keyType, + RowType rowType, + FileFormat fileFormat, + FileStorePathFactory pathFactory) { + this.partitionType = partitionType; + this.keyType = keyType; + this.rowType = rowType; + this.fileFormat = fileFormat; this.pathFactory = pathFactory; - this.manifestFile = manifestFile; - this.manifestList = manifestList; + + this.partitionFieldGetters = Review comment: Can we extract `RowToArrayConverter`? ########## File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java ########## @@ -31,6 +32,8 @@ FileStoreScan withPartitionFilter(Predicate predicate); + FileStoreScan withPartitionFilter(List<BinaryRowData> partitions); Review comment: `inPartitions`? ########## File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java ########## @@ -106,10 +115,12 @@ public SstPathFactory createSstPathFactory(BinaryRowData partition, int bucket) } public String getPartitionString(BinaryRowData partition) { - return PartitionPathUtils.generatePartitionPath( - partitionComputer.generatePartValues( - Preconditions.checkNotNull( - partition, "Partition row data is null. 
This is unexpected."))); + synchronized (partitionComputer) { Review comment: Why is `synchronized` needed here? ########## File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java ########## @@ -109,34 +176,72 @@ public Long snapshotId() { } private List<ManifestEntry> scan() { - Map<ManifestEntry.Identifier, ManifestEntry> map = new LinkedHashMap<>(); - for (ManifestFileMeta manifest : manifests) { - // TODO read each manifest file concurrently - for (ManifestEntry entry : manifestFile.read(manifest.fileName())) { - ManifestEntry.Identifier identifier = entry.identifier(); - switch (entry.kind()) { - case ADD: - Preconditions.checkState( - !map.containsKey(identifier), - "Trying to add file %s which is already added. " - + "Manifest might be corrupted.", - identifier); - map.put(identifier, entry); - break; - case DELETE: - Preconditions.checkState( - map.containsKey(identifier), - "Trying to delete file %s which is not previously added. " - + "Manifest might be corrupted.", - identifier); - map.remove(identifier); - break; - default: - throw new UnsupportedOperationException( - "Unknown value kind " + entry.kind().name()); - } + List<ManifestEntry> entries; + try { + entries = + new ForkJoinPool() Review comment: Why use `ForkJoinPool`? ########## File path: flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTestBase.java ########## @@ -71,7 +67,7 @@ public void beforeEach() throws IOException { root.getFileSystem().mkdirs(new Path(root + "/snapshot")); } - protected abstract String getSchema(); + protected abstract String getScheme(); Review comment: `getSchema`?
########## File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java ########## @@ -109,34 +176,72 @@ public Long snapshotId() { } private List<ManifestEntry> scan() { - Map<ManifestEntry.Identifier, ManifestEntry> map = new LinkedHashMap<>(); - for (ManifestFileMeta manifest : manifests) { - // TODO read each manifest file concurrently - for (ManifestEntry entry : manifestFile.read(manifest.fileName())) { - ManifestEntry.Identifier identifier = entry.identifier(); - switch (entry.kind()) { - case ADD: - Preconditions.checkState( - !map.containsKey(identifier), - "Trying to add file %s which is already added. " - + "Manifest might be corrupted.", - identifier); - map.put(identifier, entry); - break; - case DELETE: - Preconditions.checkState( - map.containsKey(identifier), - "Trying to delete file %s which is not previously added. " - + "Manifest might be corrupted.", - identifier); - map.remove(identifier); - break; - default: - throw new UnsupportedOperationException( - "Unknown value kind " + entry.kind().name()); - } + List<ManifestEntry> entries; + try { + entries = + new ForkJoinPool() + .submit( + () -> + manifests + .parallelStream() + .filter(this::filterManifestFileMeta) + .flatMap(m -> readManifestFileMeta(m).stream()) Review comment: filter `filterManifestEntry` in `readManifestFileMeta(m).stream()`? 
########## File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java ########## @@ -109,34 +176,72 @@ public Long snapshotId() { } private List<ManifestEntry> scan() { - Map<ManifestEntry.Identifier, ManifestEntry> map = new LinkedHashMap<>(); - for (ManifestFileMeta manifest : manifests) { - // TODO read each manifest file concurrently - for (ManifestEntry entry : manifestFile.read(manifest.fileName())) { - ManifestEntry.Identifier identifier = entry.identifier(); - switch (entry.kind()) { - case ADD: - Preconditions.checkState( - !map.containsKey(identifier), - "Trying to add file %s which is already added. " - + "Manifest might be corrupted.", - identifier); - map.put(identifier, entry); - break; - case DELETE: - Preconditions.checkState( - map.containsKey(identifier), - "Trying to delete file %s which is not previously added. " - + "Manifest might be corrupted.", - identifier); - map.remove(identifier); - break; - default: - throw new UnsupportedOperationException( - "Unknown value kind " + entry.kind().name()); - } + List<ManifestEntry> entries; + try { + entries = + new ForkJoinPool() + .submit( + () -> + manifests + .parallelStream() + .filter(this::filterManifestFileMeta) + .flatMap(m -> readManifestFileMeta(m).stream()) + .filter(this::filterManifestEntry) + .collect(Collectors.toList())) + .get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException("Failed to read ManifestEntry list concurrently", e); + } + + Map<ManifestEntry.Identifier, ManifestEntry> map = new HashMap<>(); + for (ManifestEntry entry : entries) { + ManifestEntry.Identifier identifier = entry.identifier(); + switch (entry.kind()) { + case ADD: + Preconditions.checkState( + !map.containsKey(identifier), + "Trying to add file %s which is already added. 
" + + "Manifest might be corrupted.", + identifier); + map.put(identifier, entry); + break; + case DELETE: + Preconditions.checkState( + map.containsKey(identifier), + "Trying to delete file %s which is not previously added. " + + "Manifest might be corrupted.", + identifier); + map.remove(identifier); + break; + default: + throw new UnsupportedOperationException( + "Unknown value kind " + entry.kind().name()); } } return new ArrayList<>(map.values()); } + + private boolean filterManifestFileMeta(ManifestFileMeta manifest) { + return partitionFilter == null + || partitionFilter.test( + manifest.numAddedFiles() + manifest.numDeletedFiles(), + manifest.partitionStats()); + } + + private boolean filterManifestEntry(ManifestEntry entry) { + // TODO apply key & value filter after field stats are collected in + // SstFile.RollingFile#finish + return (partitionFilter == null + || partitionFilter.test( + partitionFieldGetters.stream() + .map(g -> g.getFieldOrNull(entry.partition())) + .toArray())) + && (bucket == null || entry.bucket() == bucket); + } + + private List<ManifestEntry> readManifestFileMeta(ManifestFileMeta manifest) { + // ManifestFile is not thread safe + return new ManifestFile(partitionType, keyType, rowType, fileFormat, pathFactory) Review comment: Create a thread local to reuse `ManifestFile` object? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org