JingsongLi commented on code in PR #138:
URL: https://github.com/apache/flink-table-store/pull/138#discussion_r882251162
##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java:
##########
@@ -155,7 +155,7 @@ static Map<String, String>
filterLogStoreOptions(Map<String, String> options) {
Map.Entry::getValue));
}
- static TableStore buildTableStore(DynamicTableFactory.Context context) {
Review Comment:
Why remove static?
##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java:
##########
@@ -183,6 +200,70 @@ public void onDropTable(Context context, boolean
ignoreIfNotExists) {
@Override
public Map<String, String> onCompactTable(
Context context, CatalogPartitionSpec catalogPartitionSpec) {
- throw new UnsupportedOperationException("Not implement yet");
+ Map<String, String> newOptions = new
HashMap<>(context.getCatalogTable().getOptions());
+ FileStore fileStore = buildTableStore(context).buildFileStore();
+ FileStoreScan.Plan plan =
+ fileStore
+ .newScan()
+ .withPartitionFilter(
+ PredicateConverter.CONVERTER.fromMap(
+
catalogPartitionSpec.getPartitionSpec(),
+ fileStore.partitionType()))
+ .plan();
+
+ Preconditions.checkState(
+ plan.snapshotId() != null && !plan.files().isEmpty(),
+ "The specified %s to compact does not exist any snapshot",
+ catalogPartitionSpec.getPartitionSpec().isEmpty()
+ ? "table"
+ : String.format("partition %s",
catalogPartitionSpec.getPartitionSpec()));
+ Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy =
plan.groupByPartFiles();
+ if
(!Boolean.parseBoolean(newOptions.get(COMPACTION_RESCALE_BUCKET.key()))) {
+ groupBy =
+ pickManifest(
+ groupBy,
+ new
FileStoreOptions(Configuration.fromMap(newOptions))
+ .mergeTreeOptions(),
+ new
KeyComparatorSupplier(fileStore.partitionType()).get());
+ }
+ newOptions.put(
+ COMPACTION_SCANNED_MANIFEST.key(),
+ JsonSerdeUtil.toJson(new
PartitionedManifestMeta(plan.snapshotId(), groupBy)));
Review Comment:
Another choice is just `base64(InstantiationUtil.serializeObject(new
PartitionedManifestMeta))`.
This could be simpler. ManifestEntry is already unreadable anyway, so make
it all unreadable?
##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java:
##########
@@ -183,6 +200,70 @@ public void onDropTable(Context context, boolean
ignoreIfNotExists) {
@Override
public Map<String, String> onCompactTable(
Context context, CatalogPartitionSpec catalogPartitionSpec) {
- throw new UnsupportedOperationException("Not implement yet");
+ Map<String, String> newOptions = new
HashMap<>(context.getCatalogTable().getOptions());
+ FileStore fileStore = buildTableStore(context).buildFileStore();
+ FileStoreScan.Plan plan =
+ fileStore
+ .newScan()
+ .withPartitionFilter(
+ PredicateConverter.CONVERTER.fromMap(
+
catalogPartitionSpec.getPartitionSpec(),
+ fileStore.partitionType()))
+ .plan();
+
+ Preconditions.checkState(
+ plan.snapshotId() != null && !plan.files().isEmpty(),
+ "The specified %s to compact does not exist any snapshot",
+ catalogPartitionSpec.getPartitionSpec().isEmpty()
+ ? "table"
+ : String.format("partition %s",
catalogPartitionSpec.getPartitionSpec()));
+ Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy =
plan.groupByPartFiles();
+ if
(!Boolean.parseBoolean(newOptions.get(COMPACTION_RESCALE_BUCKET.key()))) {
+ groupBy =
+ pickManifest(
+ groupBy,
+ new
FileStoreOptions(Configuration.fromMap(newOptions))
+ .mergeTreeOptions(),
+ new
KeyComparatorSupplier(fileStore.partitionType()).get());
+ }
+ newOptions.put(
+ COMPACTION_SCANNED_MANIFEST.key(),
+ JsonSerdeUtil.toJson(new
PartitionedManifestMeta(plan.snapshotId(), groupBy)));
+ return newOptions;
+ }
+
+ @VisibleForTesting
+ Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> pickManifest(
+ Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy,
+ MergeTreeOptions options,
+ Comparator<RowData> keyComparator) {
+ Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> filtered = new
HashMap<>();
+ UniversalCompaction compaction =
+ new UniversalCompaction(
+ options.maxSizeAmplificationPercent,
+ options.sizeRatio,
+ options.numSortedRunCompactionTrigger);
+
+ for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>>
partEntry :
+ groupBy.entrySet()) {
+ Map<Integer, List<DataFileMeta>> manifests = new HashMap<>();
+ for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry :
+ partEntry.getValue().entrySet()) {
+ Levels levels =
Review Comment:
I'm trying to figure out how to select the files that need to be compacted.
A simple option would be:
1. the file is too small, smaller than the target file size
2. there is overlap between the files (we can use `IntervalPartition`)
bucket of pick files is larger than 1.
It looks like we dont need to use `Levels` and `UniversalCompaction`.
##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java:
##########
@@ -55,14 +56,14 @@ public FileStoreReadImpl(
WriteMode writeMode,
RowType keyType,
RowType valueType,
- Comparator<RowData> keyComparator,
+ Supplier<Comparator<RowData>> keyComparatorSupplier,
Review Comment:
Why we need to use `Supplier<Comparator<RowData>>`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]