tsreaper commented on a change in pull request #14:
URL: https://github.com/apache/flink-table-store/pull/14#discussion_r791535456



##########
File path: 
flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTestBase.java
##########
@@ -71,7 +67,7 @@ public void beforeEach() throws IOException {
         root.getFileSystem().mkdirs(new Path(root + "/snapshot"));
     }
 
-    protected abstract String getSchema();
+    protected abstract String getScheme();

Review comment:
       Scheme. This is the file system scheme, not table schema.

##########
File path: 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
##########
@@ -106,10 +115,12 @@ public SstPathFactory createSstPathFactory(BinaryRowData 
partition, int bucket)
     }
 
     public String getPartitionString(BinaryRowData partition) {
-        return PartitionPathUtils.generatePartitionPath(
-                partitionComputer.generatePartValues(
-                        Preconditions.checkNotNull(
-                                partition, "Partition row data is null. This 
is unexpected.")));
+        synchronized (partitionComputer) {

Review comment:
       Because we have to make sure that this class is thread safe. Partition 
computer is not thread safe.
   
   Another option is to create a read-only path factory which only supports a 
limited number of methods.

##########
File path: 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
##########
@@ -109,34 +176,72 @@ public Long snapshotId() {
     }
 
     private List<ManifestEntry> scan() {
-        Map<ManifestEntry.Identifier, ManifestEntry> map = new 
LinkedHashMap<>();
-        for (ManifestFileMeta manifest : manifests) {
-            // TODO read each manifest file concurrently
-            for (ManifestEntry entry : manifestFile.read(manifest.fileName())) 
{
-                ManifestEntry.Identifier identifier = entry.identifier();
-                switch (entry.kind()) {
-                    case ADD:
-                        Preconditions.checkState(
-                                !map.containsKey(identifier),
-                                "Trying to add file %s which is already added. 
"
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.put(identifier, entry);
-                        break;
-                    case DELETE:
-                        Preconditions.checkState(
-                                map.containsKey(identifier),
-                                "Trying to delete file %s which is not 
previously added. "
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.remove(identifier);
-                        break;
-                    default:
-                        throw new UnsupportedOperationException(
-                                "Unknown value kind " + entry.kind().name());
-                }
+        List<ManifestEntry> entries;
+        try {
+            entries =
+                    new ForkJoinPool()

Review comment:
       If we want to use `parallelStream` we need `ForkJoinPool`.

##########
File path: 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
##########
@@ -109,34 +176,72 @@ public Long snapshotId() {
     }
 
     private List<ManifestEntry> scan() {
-        Map<ManifestEntry.Identifier, ManifestEntry> map = new 
LinkedHashMap<>();
-        for (ManifestFileMeta manifest : manifests) {
-            // TODO read each manifest file concurrently
-            for (ManifestEntry entry : manifestFile.read(manifest.fileName())) 
{
-                ManifestEntry.Identifier identifier = entry.identifier();
-                switch (entry.kind()) {
-                    case ADD:
-                        Preconditions.checkState(
-                                !map.containsKey(identifier),
-                                "Trying to add file %s which is already added. 
"
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.put(identifier, entry);
-                        break;
-                    case DELETE:
-                        Preconditions.checkState(
-                                map.containsKey(identifier),
-                                "Trying to delete file %s which is not 
previously added. "
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.remove(identifier);
-                        break;
-                    default:
-                        throw new UnsupportedOperationException(
-                                "Unknown value kind " + entry.kind().name());
-                }
+        List<ManifestEntry> entries;
+        try {
+            entries =
+                    new ForkJoinPool()

Review comment:
       If we want to use `parallelStream` we need `ForkJoinPool`. If we don't 
specify one it will use a public pool shared across the whole JVM. As we're 
doing IO here it is very likely to block other tasks which use the public pool.

##########
File path: 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
##########
@@ -31,6 +32,8 @@
 
     FileStoreScan withPartitionFilter(Predicate predicate);
 
+    FileStoreScan withPartitionFilter(List<BinaryRowData> partitions);

Review comment:
       No need. If I want to filter out partitions I would change the method 
name.

##########
File path: 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
##########
@@ -106,10 +115,12 @@ public SstPathFactory createSstPathFactory(BinaryRowData 
partition, int bucket)
     }
 
     public String getPartitionString(BinaryRowData partition) {
-        return PartitionPathUtils.generatePartitionPath(
-                partitionComputer.generatePartValues(
-                        Preconditions.checkNotNull(
-                                partition, "Partition row data is null. This 
is unexpected.")));
+        synchronized (partitionComputer) {

Review comment:
       After discussion we decided to mark some methods as thread-safe while 
others as not-thread-safe.

##########
File path: 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
##########
@@ -109,34 +176,72 @@ public Long snapshotId() {
     }
 
     private List<ManifestEntry> scan() {
-        Map<ManifestEntry.Identifier, ManifestEntry> map = new 
LinkedHashMap<>();
-        for (ManifestFileMeta manifest : manifests) {
-            // TODO read each manifest file concurrently
-            for (ManifestEntry entry : manifestFile.read(manifest.fileName())) 
{
-                ManifestEntry.Identifier identifier = entry.identifier();
-                switch (entry.kind()) {
-                    case ADD:
-                        Preconditions.checkState(
-                                !map.containsKey(identifier),
-                                "Trying to add file %s which is already added. 
"
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.put(identifier, entry);
-                        break;
-                    case DELETE:
-                        Preconditions.checkState(
-                                map.containsKey(identifier),
-                                "Trying to delete file %s which is not 
previously added. "
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.remove(identifier);
-                        break;
-                    default:
-                        throw new UnsupportedOperationException(
-                                "Unknown value kind " + entry.kind().name());
-                }
+        List<ManifestEntry> entries;
+        try {
+            entries =
+                    new ForkJoinPool()
+                            .submit(
+                                    () ->
+                                            manifests
+                                                    .parallelStream()
+                                                    
.filter(this::filterManifestFileMeta)
+                                                    .flatMap(m -> 
readManifestFileMeta(m).stream())

Review comment:
       They are the same. Java's stream API is like Flink's job graph. It will 
not execute immediately but will build a stream graph and will only execute 
when collected.

##########
File path: 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
##########
@@ -106,10 +115,12 @@ public SstPathFactory createSstPathFactory(BinaryRowData 
partition, int bucket)
     }
 
     public String getPartitionString(BinaryRowData partition) {
-        return PartitionPathUtils.generatePartitionPath(
-                partitionComputer.generatePartValues(
-                        Preconditions.checkNotNull(
-                                partition, "Partition row data is null. This 
is unexpected.")));
+        synchronized (partitionComputer) {

Review comment:
       After discussion we decided to mark some methods as thread-safe while 
others as not-thread-safe.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to