JingsongLi commented on code in PR #138:
URL: https://github.com/apache/flink-table-store/pull/138#discussion_r882251162


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java:
##########
@@ -155,7 +155,7 @@ static Map<String, String> filterLogStoreOptions(Map<String, String> options) {
                                 Map.Entry::getValue));
     }
 
-    static TableStore buildTableStore(DynamicTableFactory.Context context) {

Review Comment:
   Why remove `static`?



##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java:
##########
@@ -183,6 +200,70 @@ public void onDropTable(Context context, boolean ignoreIfNotExists) {
     @Override
     public Map<String, String> onCompactTable(
             Context context, CatalogPartitionSpec catalogPartitionSpec) {
-        throw new UnsupportedOperationException("Not implement yet");
+        Map<String, String> newOptions = new HashMap<>(context.getCatalogTable().getOptions());
+        FileStore fileStore = buildTableStore(context).buildFileStore();
+        FileStoreScan.Plan plan =
+                fileStore
+                        .newScan()
+                        .withPartitionFilter(
+                                PredicateConverter.CONVERTER.fromMap(
+                                        catalogPartitionSpec.getPartitionSpec(),
+                                        fileStore.partitionType()))
+                        .plan();
+
+        Preconditions.checkState(
+                plan.snapshotId() != null && !plan.files().isEmpty(),
+                "The specified %s to compact does not exist any snapshot",
+                catalogPartitionSpec.getPartitionSpec().isEmpty()
+                        ? "table"
+                        : String.format("partition %s", catalogPartitionSpec.getPartitionSpec()));
+        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy = plan.groupByPartFiles();
+        if (!Boolean.parseBoolean(newOptions.get(COMPACTION_RESCALE_BUCKET.key()))) {
+            groupBy =
+                    pickManifest(
+                            groupBy,
+                            new FileStoreOptions(Configuration.fromMap(newOptions))
+                                    .mergeTreeOptions(),
+                            new KeyComparatorSupplier(fileStore.partitionType()).get());
+        }
+        newOptions.put(
+                COMPACTION_SCANNED_MANIFEST.key(),
+                JsonSerdeUtil.toJson(new PartitionedManifestMeta(plan.snapshotId(), groupBy)));

Review Comment:
   Another option is simply `base64(InstantiationUtil.serializeObject(new PartitionedManifestMeta))`.
   That could be simpler. `ManifestEntry` is already unreadable anyway, so why not make the whole value unreadable?
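
To make the Base64 suggestion concrete, here is a minimal sketch of the round trip. It is not the PR's code: it uses the JDK's `ObjectOutputStream` as a stand-in for Flink's `InstantiationUtil.serializeObject`, and `ManifestMetaSketch` with its fields is a hypothetical placeholder for `PartitionedManifestMeta`, whose real layout is not shown in this thread.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;

/** Hypothetical stand-in for PartitionedManifestMeta; the fields are assumptions. */
final class ManifestMetaSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    final long snapshotId;
    final ArrayList<String> fileNames;

    ManifestMetaSketch(long snapshotId, List<String> fileNames) {
        this.snapshotId = snapshotId;
        this.fileNames = new ArrayList<>(fileNames);
    }
}

/** Encodes a serializable object as a Base64 string that fits in a table option value. */
final class Base64OptionCodec {

    /** Java-serializes the value, then Base64-encodes the resulting bytes. */
    static String encode(Serializable value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        // The stream is closed (and therefore flushed) before the bytes are read.
        return Base64.getEncoder().encodeToString(bytes.toByteArray());
    }

    /** Reverses encode(): Base64-decodes the string, then deserializes the object. */
    static Object decode(String encoded) throws IOException, ClassNotFoundException {
        byte[] raw = Base64.getDecoder().decode(encoded);
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(raw))) {
            return in.readObject();
        }
    }
}
```

The trade-off is the one raised in the comment: the option value is no longer human-readable, but no JSON serializer or deserializer has to be maintained for the nested types.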



##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java:
##########
@@ -183,6 +200,70 @@ public void onDropTable(Context context, boolean ignoreIfNotExists) {
     @Override
     public Map<String, String> onCompactTable(
             Context context, CatalogPartitionSpec catalogPartitionSpec) {
-        throw new UnsupportedOperationException("Not implement yet");
+        Map<String, String> newOptions = new HashMap<>(context.getCatalogTable().getOptions());
+        FileStore fileStore = buildTableStore(context).buildFileStore();
+        FileStoreScan.Plan plan =
+                fileStore
+                        .newScan()
+                        .withPartitionFilter(
+                                PredicateConverter.CONVERTER.fromMap(
+                                        catalogPartitionSpec.getPartitionSpec(),
+                                        fileStore.partitionType()))
+                        .plan();
+
+        Preconditions.checkState(
+                plan.snapshotId() != null && !plan.files().isEmpty(),
+                "The specified %s to compact does not exist any snapshot",
+                catalogPartitionSpec.getPartitionSpec().isEmpty()
+                        ? "table"
+                        : String.format("partition %s", catalogPartitionSpec.getPartitionSpec()));
+        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy = plan.groupByPartFiles();
+        if (!Boolean.parseBoolean(newOptions.get(COMPACTION_RESCALE_BUCKET.key()))) {
+            groupBy =
+                    pickManifest(
+                            groupBy,
+                            new FileStoreOptions(Configuration.fromMap(newOptions))
+                                    .mergeTreeOptions(),
+                            new KeyComparatorSupplier(fileStore.partitionType()).get());
+        }
+        newOptions.put(
+                COMPACTION_SCANNED_MANIFEST.key(),
+                JsonSerdeUtil.toJson(new PartitionedManifestMeta(plan.snapshotId(), groupBy)));
+        return newOptions;
+    }
+
+    @VisibleForTesting
+    Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> pickManifest(
+            Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy,
+            MergeTreeOptions options,
+            Comparator<RowData> keyComparator) {
+        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> filtered = new HashMap<>();
+        UniversalCompaction compaction =
+                new UniversalCompaction(
+                        options.maxSizeAmplificationPercent,
+                        options.sizeRatio,
+                        options.numSortedRunCompactionTrigger);
+
+        for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> partEntry :
+                groupBy.entrySet()) {
+            Map<Integer, List<DataFileMeta>> manifests = new HashMap<>();
+            for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry :
+                    partEntry.getValue().entrySet()) {
+                Levels levels =

Review Comment:
   I'm trying to figure out how to select the files that need to be compacted.
   A simple option would be to pick a file when:
   1. the file is too small, i.e. smaller than the target file size, or
   2. its key range overlaps with other files (we can use `IntervalPartition`).
   
   A bucket is then picked only if it has more than one picked file.
   
   It looks like we don't need to use `Levels` and `UniversalCompaction`.
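
The selection rule proposed above can be sketched roughly as follows. This is one reading of the comment, not the project's implementation: `FileSketch` is a hypothetical stand-in for `DataFileMeta`, keys are plain `long` values instead of `RowData` compared with a key comparator, and the overlap check is a simple sort-and-sweep rather than the `IntervalPartition` class mentioned in the comment.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for DataFileMeta: a key range plus a file size. */
final class FileSketch {
    final String name;
    final long minKey;
    final long maxKey;
    final long sizeBytes;

    FileSketch(String name, long minKey, long maxKey, long sizeBytes) {
        this.name = name;
        this.minKey = minKey;
        this.maxKey = maxKey;
        this.sizeBytes = sizeBytes;
    }
}

final class CompactionPickSketch {

    /** Picks the files of one bucket that are too small or overlap another file. */
    static List<FileSketch> pick(List<FileSketch> bucketFiles, long targetFileSize) {
        Set<FileSketch> picked = new LinkedHashSet<>();

        // Rule 1: files smaller than the target file size.
        for (FileSketch file : bucketFiles) {
            if (file.sizeBytes < targetFileSize) {
                picked.add(file);
            }
        }

        // Rule 2: files whose key ranges overlap. Sweep in minKey order while
        // tracking the already-seen file that reaches the largest maxKey.
        List<FileSketch> sorted = new ArrayList<>(bucketFiles);
        sorted.sort(Comparator.comparingLong((FileSketch f) -> f.minKey));
        FileSketch widest = null;
        for (FileSketch file : sorted) {
            if (widest != null && file.minKey <= widest.maxKey) {
                picked.add(widest);
                picked.add(file);
            }
            if (widest == null || file.maxKey > widest.maxKey) {
                widest = file;
            }
        }

        // Assumed reading of the comment: rewriting a single file alone gains
        // nothing, so the bucket is kept only if more than one file was picked.
        return picked.size() > 1 ? new ArrayList<>(picked) : new ArrayList<>();
    }
}
```

Calling `CompactionPickSketch.pick(files, targetFileSize)` once per bucket would replace the per-bucket `Levels` and `UniversalCompaction` logic in this sketch, which is the simplification the comment is pointing at.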



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java:
##########
@@ -55,14 +56,14 @@ public FileStoreReadImpl(
             WriteMode writeMode,
             RowType keyType,
             RowType valueType,
-            Comparator<RowData> keyComparator,
+            Supplier<Comparator<RowData>> keyComparatorSupplier,

Review Comment:
   Why do we need to use `Supplier<Comparator<RowData>>`?


