tsreaper commented on a change in pull request #14: URL: https://github.com/apache/flink-table-store/pull/14#discussion_r791535456
########## File path: flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTestBase.java ########## @@ -71,7 +67,7 @@ public void beforeEach() throws IOException { root.getFileSystem().mkdirs(new Path(root + "/snapshot")); } - protected abstract String getSchema(); + protected abstract String getScheme(); Review comment: Scheme. This is the file system scheme, not table schema. ########## File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java ########## @@ -106,10 +115,12 @@ public SstPathFactory createSstPathFactory(BinaryRowData partition, int bucket) } public String getPartitionString(BinaryRowData partition) { - return PartitionPathUtils.generatePartitionPath( - partitionComputer.generatePartValues( - Preconditions.checkNotNull( - partition, "Partition row data is null. This is unexpected."))); + synchronized (partitionComputer) { Review comment: Because we have to make sure that this class is thread safe. Partition computer is not thread safe. Another option is to create a read-only path factory which only supports a limited number of methods. ########## File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java ########## @@ -109,34 +176,72 @@ public Long snapshotId() { } private List<ManifestEntry> scan() { - Map<ManifestEntry.Identifier, ManifestEntry> map = new LinkedHashMap<>(); - for (ManifestFileMeta manifest : manifests) { - // TODO read each manifest file concurrently - for (ManifestEntry entry : manifestFile.read(manifest.fileName())) { - ManifestEntry.Identifier identifier = entry.identifier(); - switch (entry.kind()) { - case ADD: - Preconditions.checkState( - !map.containsKey(identifier), - "Trying to add file %s which is already added. 
" - + "Manifest might be corrupted.", - identifier); - map.put(identifier, entry); - break; - case DELETE: - Preconditions.checkState( - map.containsKey(identifier), - "Trying to delete file %s which is not previously added. " - + "Manifest might be corrupted.", - identifier); - map.remove(identifier); - break; - default: - throw new UnsupportedOperationException( - "Unknown value kind " + entry.kind().name()); - } + List<ManifestEntry> entries; + try { + entries = + new ForkJoinPool() Review comment: If we want to use `parallelStream` we need `ForkJoinPool`. ########## File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java ########## @@ -109,34 +176,72 @@ public Long snapshotId() { } private List<ManifestEntry> scan() { - Map<ManifestEntry.Identifier, ManifestEntry> map = new LinkedHashMap<>(); - for (ManifestFileMeta manifest : manifests) { - // TODO read each manifest file concurrently - for (ManifestEntry entry : manifestFile.read(manifest.fileName())) { - ManifestEntry.Identifier identifier = entry.identifier(); - switch (entry.kind()) { - case ADD: - Preconditions.checkState( - !map.containsKey(identifier), - "Trying to add file %s which is already added. " - + "Manifest might be corrupted.", - identifier); - map.put(identifier, entry); - break; - case DELETE: - Preconditions.checkState( - map.containsKey(identifier), - "Trying to delete file %s which is not previously added. " - + "Manifest might be corrupted.", - identifier); - map.remove(identifier); - break; - default: - throw new UnsupportedOperationException( - "Unknown value kind " + entry.kind().name()); - } + List<ManifestEntry> entries; + try { + entries = + new ForkJoinPool() Review comment: If we want to use `parallelStream` we need `ForkJoinPool`. If we don't specify one it will use a public pool shared across the whole JVM. As we're doing IO here it is very likely to block other tasks which use the public pool. 
########## File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java ########## @@ -31,6 +32,8 @@ FileStoreScan withPartitionFilter(Predicate predicate); + FileStoreScan withPartitionFilter(List<BinaryRowData> partitions); Review comment: No need. If I want to filter out partitions I would change the method name. ########## File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java ########## @@ -106,10 +115,12 @@ public SstPathFactory createSstPathFactory(BinaryRowData partition, int bucket) } public String getPartitionString(BinaryRowData partition) { - return PartitionPathUtils.generatePartitionPath( - partitionComputer.generatePartValues( - Preconditions.checkNotNull( - partition, "Partition row data is null. This is unexpected."))); + synchronized (partitionComputer) { Review comment: After discussion, we decided to mark some methods as thread-safe and the others as not thread-safe. ########## File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java ########## @@ -109,34 +176,72 @@ public Long snapshotId() { } private List<ManifestEntry> scan() { - Map<ManifestEntry.Identifier, ManifestEntry> map = new LinkedHashMap<>(); - for (ManifestFileMeta manifest : manifests) { - // TODO read each manifest file concurrently - for (ManifestEntry entry : manifestFile.read(manifest.fileName())) { - ManifestEntry.Identifier identifier = entry.identifier(); - switch (entry.kind()) { - case ADD: - Preconditions.checkState( - !map.containsKey(identifier), - "Trying to add file %s which is already added. " - + "Manifest might be corrupted.", - identifier); - map.put(identifier, entry); - break; - case DELETE: - Preconditions.checkState( - map.containsKey(identifier), - "Trying to delete file %s which is not previously added. 
" - + "Manifest might be corrupted.", - identifier); - map.remove(identifier); - break; - default: - throw new UnsupportedOperationException( - "Unknown value kind " + entry.kind().name()); - } + List<ManifestEntry> entries; + try { + entries = + new ForkJoinPool() + .submit( + () -> + manifests + .parallelStream() + .filter(this::filterManifestFileMeta) + .flatMap(m -> readManifestFileMeta(m).stream()) Review comment: They are the same. Java's stream API is like Flink's job graph: it does not execute immediately, but builds a stream graph that is executed only when the result is collected. ########## File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java ########## @@ -106,10 +115,12 @@ public SstPathFactory createSstPathFactory(BinaryRowData partition, int bucket) } public String getPartitionString(BinaryRowData partition) { - return PartitionPathUtils.generatePartitionPath( - partitionComputer.generatePartValues( - Preconditions.checkNotNull( - partition, "Partition row data is null. This is unexpected."))); + synchronized (partitionComputer) { Review comment: After discussion, we decided to mark some methods as thread-safe and the others as not thread-safe. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org