This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git
commit b1e7fd1287163c374fec46e2008a3cd911b199e5 Author: LsomeYeah <[email protected]> AuthorDate: Tue Oct 14 16:00:37 2025 +0800 [flink] support performing incremental clustering by flink (#6395) --- .../content/append-table/incremental-clustering.md | 49 ++- .../append/cluster/IncrementalClusterManager.java | 3 +- .../apache/paimon/flink/action/CompactAction.java | 153 +++++++- .../cluster/IncrementalClusterSplitSource.java | 118 ++++++ .../cluster/RemoveClusterBeforeFilesOperator.java | 58 +++ ...writeIncrementalClusterCommittableOperator.java | 112 ++++++ .../action/IncrementalClusterActionITCase.java | 413 +++++++++++++++++++++ .../paimon/spark/procedure/CompactProcedure.java | 2 +- 8 files changed, 896 insertions(+), 12 deletions(-) diff --git a/docs/content/append-table/incremental-clustering.md b/docs/content/append-table/incremental-clustering.md index 72f24ec17e..1cb08cbbdb 100644 --- a/docs/content/append-table/incremental-clustering.md +++ b/docs/content/append-table/incremental-clustering.md @@ -95,11 +95,14 @@ clustering and small-file merging must be performed exclusively via Incremental ## Run Incremental Clustering {{< hint info >}} -Currently, only support running Incremental Clustering in spark, support for flink will be added in the near future. +Running Incremental Clustering is only supported in batch mode. {{< /hint >}} -To run a Incremental Clustering job, follow these instructions. +To run an Incremental Clustering job, follow these instructions. + +You don’t need to specify any clustering-related parameters when running Incremental Clustering; +these options are already defined as table options. If you need to change clustering settings, please update the corresponding table options. {{< tabs "incremental-clustering" >}} @@ -117,8 +120,46 @@ CALL sys.compact(table => 'T') -- run incremental clustering with full mode, this will recluster all data CALL sys.compact(table => 'T', compact_strategy => 'full') ``` -You don’t need to specify any clustering-related parameters when running Incremental Clustering, -these are already defined as table options. If you need to change clustering settings, please update the corresponding table options. +{{< /tab >}} + +{{< tab "Flink Action" >}} + +Run the following command to submit an incremental clustering job for the table. + +```bash +<FLINK_HOME>/bin/flink run \ + /path/to/paimon-flink-action-{{< version >}}.jar \ + compact \ + --warehouse <warehouse-path> \ + --database <database-name> \ + --table <table-name> \ + [--compact_strategy <minor / full>] \ + [--table_conf <table_conf>] \ + [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]] +``` + +Example: run incremental clustering + +```bash +<FLINK_HOME>/bin/flink run \ + /path/to/paimon-flink-action-{{< version >}}.jar \ + compact \ + --warehouse s3:///path/to/warehouse \ + --database test_db \ + --table test_table \ + --table_conf sink.parallelism=2 \ + --compact_strategy minor \ + --catalog_conf s3.endpoint=https://****.com \ + --catalog_conf s3.access-key=***** \ + --catalog_conf s3.secret-key=***** +``` +* `--compact_strategy` Determines how to pick the files to be clustered; the default is `minor`. + * `full` : All files will be selected for clustering. + * `minor` : Pick the set of files that need to be clustered based on specified conditions. + +Note: write parallelism is set by `sink.parallelism`; if it is too large, a large number of small files may be generated.
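For reference, the clustering behavior itself is driven entirely by table options rather than action arguments. A minimal sketch of setting those options from a Flink SQL session is shown below; the table name `T` and the columns `a,b` are placeholders, and the option keys are the same ones exercised by the new `IncrementalClusterActionITCase` schema.

```sql
-- Sketch only: enable incremental clustering on an existing append table.
-- 'a' and 'b' are placeholder column names; choose your own cluster keys.
ALTER TABLE T SET (
    'clustering.columns' = 'a,b',
    'clustering.strategy' = 'zorder',
    'clustering.incremental' = 'true'
);
```

Once these table options are in place, the compact action (or the Spark `sys.compact` procedure) picks them up automatically; only `--compact_strategy` decides whether a minor or full clustering pass is performed.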
+ +You can use `-D execution.runtime-mode=batch` or `-yD execution.runtime-mode=batch` (for the ON-YARN scenario) to use batch mode. {{< /tab >}} {{< /tabs >}} diff --git a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java index 887150577c..407f6b17ef 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java @@ -226,7 +226,8 @@ public class IncrementalClusterManager { return splits; } - public List<DataFileMeta> upgrade(List<DataFileMeta> filesAfterCluster, int outputLevel) { + public static List<DataFileMeta> upgrade( + List<DataFileMeta> filesAfterCluster, int outputLevel) { return filesAfterCluster.stream() .map(file -> file.upgrade(outputLevel)) .collect(Collectors.toList()); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java index b9bfaac7f8..4d514379ac 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java @@ -19,9 +19,13 @@ package org.apache.paimon.flink.action; import org.apache.paimon.CoreOptions; +import org.apache.paimon.append.cluster.IncrementalClusterManager; +import org.apache.paimon.compact.CompactUnit; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.flink.FlinkConnectorOptions; +import org.apache.paimon.flink.cluster.IncrementalClusterSplitSource; +import org.apache.paimon.flink.cluster.RewriteIncrementalClusterCommittableOperator; import org.apache.paimon.flink.compact.AppendTableCompactBuilder; import org.apache.paimon.flink.postpone.PostponeBucketCompactSplitSource; import org.apache.paimon.flink.postpone.RewritePostponeBucketCommittableOperator; @@ -32,7 +36,10 @@ import org.apache.paimon.flink.sink.CompactorSinkBuilder; import org.apache.paimon.flink.sink.FixedBucketSink; import org.apache.paimon.flink.sink.FlinkSinkBuilder; import org.apache.paimon.flink.sink.FlinkStreamPartitioner; +import org.apache.paimon.flink.sink.RowAppendTableSink; import org.apache.paimon.flink.sink.RowDataChannelComputer; +import org.apache.paimon.flink.sorter.TableSortInfo; +import org.apache.paimon.flink.sorter.TableSorter; import org.apache.paimon.flink.source.CompactorSourceBuilder; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.options.Options; @@ -43,6 +50,7 @@ import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.predicate.PredicateProjectionConverter; import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.InternalRowPartitionComputer; import org.apache.paimon.utils.Pair; @@ -67,6 +75,7 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import static org.apache.paimon.partition.PartitionPredicate.createBinaryPartitions; import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate; @@ -100,10 +109,6 @@ public class CompactAction 
extends TableActionBase { checkArgument( !((FileStoreTable) table).coreOptions().dataEvolutionEnabled(), "Compact action does not support data evolution table yet. "); - checkArgument( - !(((FileStoreTable) table).bucketMode() == BucketMode.BUCKET_UNAWARE - && ((FileStoreTable) table).coreOptions().clusteringIncrementalEnabled()), - "The table has enabled incremental clustering, and do not support compact in flink yet."); HashMap<String, String> dynamicOptions = new HashMap<>(tableConf); dynamicOptions.put(CoreOptions.WRITE_ONLY.key(), "false"); table = table.copy(dynamicOptions); @@ -147,8 +152,12 @@ public class CompactAction extends TableActionBase { if (fileStoreTable.coreOptions().bucket() == BucketMode.POSTPONE_BUCKET) { return buildForPostponeBucketCompaction(env, fileStoreTable, isStreaming); } else if (fileStoreTable.bucketMode() == BucketMode.BUCKET_UNAWARE) { - buildForAppendTableCompact(env, fileStoreTable, isStreaming); - return true; + if (fileStoreTable.coreOptions().clusteringIncrementalEnabled()) { + return buildForIncrementalClustering(env, fileStoreTable, isStreaming); + } else { + buildForAppendTableCompact(env, fileStoreTable, isStreaming); + return true; + } } else { buildForBucketedTableCompact(env, fileStoreTable, isStreaming); return true; @@ -202,6 +211,138 @@ public class CompactAction extends TableActionBase { builder.build(); } + private boolean buildForIncrementalClustering( + StreamExecutionEnvironment env, FileStoreTable table, boolean isStreaming) { + checkArgument(!isStreaming, "Incremental clustering currently only supports batch mode"); + checkArgument( + partitions == null, + "Incremental clustering currently does not support specifying partitions"); + checkArgument( + whereSql == null, "Incremental clustering currently does not support predicates"); + + IncrementalClusterManager incrementalClusterManager = new IncrementalClusterManager(table); + + // non-full strategy as default for incremental clustering + if (fullCompaction == null) { + fullCompaction = false; + } + Options options = new Options(table.options()); + int localSampleMagnification = table.coreOptions().getLocalSampleMagnification(); + if (localSampleMagnification < 20) { + throw new IllegalArgumentException( + String.format( + "the config '%s=%d' should not be set too small, greater than or equal to 20 is needed.", + CoreOptions.SORT_COMPACTION_SAMPLE_MAGNIFICATION.key(), + localSampleMagnification)); + } + String commitUser = CoreOptions.createCommitUser(options); + InternalRowPartitionComputer partitionComputer = + new InternalRowPartitionComputer( + table.coreOptions().partitionDefaultName(), + table.store().partitionType(), + table.partitionKeys().toArray(new String[0]), + table.coreOptions().legacyPartitionName()); + + // 1. pick cluster files for each partition + Map<BinaryRow, CompactUnit> compactUnits = + incrementalClusterManager.prepareForCluster(fullCompaction); + if (compactUnits.isEmpty()) { + LOGGER.info( + "No partition needs to be incrementally clustered. 
" + + "Please set '--compact_strategy full' if you need to forcibly trigger the cluster."); + if (this.forceStartFlinkJob) { + env.fromSequence(0, 0) + .name("Nothing to Cluster Source") + .sinkTo(new DiscardingSink<>()); + return true; + } else { + return false; + } + } + Map<BinaryRow, DataSplit[]> partitionSplits = + compactUnits.entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + entry -> + incrementalClusterManager + .toSplits( + entry.getKey(), + entry.getValue().files()) + .toArray(new DataSplit[0]))); + + // 2. read,sort and write in partition + List<DataStream<Committable>> dataStreams = new ArrayList<>(); + + for (Map.Entry<BinaryRow, DataSplit[]> entry : partitionSplits.entrySet()) { + DataSplit[] splits = entry.getValue(); + LinkedHashMap<String, String> partitionSpec = + partitionComputer.generatePartValues(entry.getKey()); + // 2.1 generate source for current partition + Pair<DataStream<RowData>, DataStream<Committable>> sourcePair = + IncrementalClusterSplitSource.buildSource( + env, + table, + partitionSpec, + splits, + options.get(FlinkConnectorOptions.SCAN_PARALLELISM)); + + // 2.2 cluster in partition + Integer sinkParallelism = options.get(FlinkConnectorOptions.SINK_PARALLELISM); + if (sinkParallelism == null) { + sinkParallelism = sourcePair.getLeft().getParallelism(); + } + TableSortInfo sortInfo = + new TableSortInfo.Builder() + .setSortColumns(incrementalClusterManager.clusterKeys()) + .setSortStrategy(incrementalClusterManager.clusterCurve()) + .setSinkParallelism(sinkParallelism) + .setLocalSampleSize(sinkParallelism * localSampleMagnification) + .setGlobalSampleSize(sinkParallelism * 1000) + .setRangeNumber(sinkParallelism * 10) + .build(); + DataStream<RowData> sorted = + TableSorter.getSorter(env, sourcePair.getLeft(), table, sortInfo).sort(); + + // 2.3 write and then reorganize the committable + // set parallelism to null, and it'll forward parallelism when doWrite() + RowAppendTableSink sink = new RowAppendTableSink(table, null, null, null); + boolean blobAsDescriptor = table.coreOptions().blobAsDescriptor(); + DataStream<Committable> clusterCommittable = + sink.doWrite( + FlinkSinkBuilder.mapToInternalRow( + sorted, + table.rowType(), + blobAsDescriptor, + table.catalogEnvironment().catalogContext()), + commitUser, + null) + .transform( + "Rewrite cluster committable", + new CommittableTypeInfo(), + new RewriteIncrementalClusterCommittableOperator( + table, + compactUnits.entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + unit -> + unit.getValue() + .outputLevel())))); + dataStreams.add(clusterCommittable); + dataStreams.add(sourcePair.getRight()); + } + + // 3. 
commit + RowAppendTableSink sink = new RowAppendTableSink(table, null, null, null); + DataStream<Committable> dataStream = dataStreams.get(0); + for (int i = 1; i < dataStreams.size(); i++) { + dataStream = dataStream.union(dataStreams.get(i)); + } + sink.doCommit(dataStream, commitUser); + return true; + } + protected PartitionPredicate getPartitionPredicate() throws Exception { checkArgument( partitions == null || whereSql == null, diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java new file mode 100644 index 0000000000..eae1d3b8e9 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.cluster; + +import org.apache.paimon.flink.LogicalTypeConversion; +import org.apache.paimon.flink.sink.Committable; +import org.apache.paimon.flink.sink.CommittableTypeInfo; +import org.apache.paimon.flink.source.AbstractNonCoordinatedSource; +import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader; +import org.apache.paimon.flink.source.SimpleSourceSplit; +import org.apache.paimon.flink.source.operator.ReadOperator; +import org.apache.paimon.flink.utils.JavaTypeInfo; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.Split; +import org.apache.paimon.utils.Pair; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; + +import javax.annotation.Nullable; + +import java.util.Map; + +/** Source for Incremental Clustering. 
*/ +public class IncrementalClusterSplitSource extends AbstractNonCoordinatedSource<Split> { + private static final long serialVersionUID = 1L; + + private final Split[] splits; + + public IncrementalClusterSplitSource(Split[] splits) { + this.splits = splits; + } + + @Override + public Boundedness getBoundedness() { + return Boundedness.BOUNDED; + } + + @Override + public SourceReader<Split, SimpleSourceSplit> createReader(SourceReaderContext readerContext) + throws Exception { + return new IncrementalClusterSplitSource.Reader(); + } + + private class Reader extends AbstractNonCoordinatedSourceReader<Split> { + + @Override + public InputStatus pollNext(ReaderOutput<Split> output) throws Exception { + for (Split split : splits) { + DataSplit dataSplit = (DataSplit) split; + output.collect(dataSplit); + } + return InputStatus.END_OF_INPUT; + } + } + + public static Pair<DataStream<RowData>, DataStream<Committable>> buildSource( + StreamExecutionEnvironment env, + FileStoreTable table, + Map<String, String> partitionSpec, + DataSplit[] splits, + @Nullable Integer parallelism) { + DataStreamSource<Split> source = + env.fromSource( + new IncrementalClusterSplitSource(splits), + WatermarkStrategy.noWatermarks(), + String.format( + "Incremental-cluster split generator: %s - %s", + table.fullName(), partitionSpec), + new JavaTypeInfo<>(Split.class)); + + if (parallelism != null) { + source.setParallelism(parallelism); + } + + return Pair.of( + new DataStream<>(source.getExecutionEnvironment(), source.getTransformation()) + .transform( + String.format( + "Incremental-cluster reader: %s - %s", + table.fullName(), partitionSpec), + InternalTypeInfo.of( + LogicalTypeConversion.toLogicalType(table.rowType())), + new ReadOperator(table::newRead, null, null)), + source.forward() + .transform( + "Remove files to be clustered", + new CommittableTypeInfo(), + new RemoveClusterBeforeFilesOperator()) + .forceNonParallel()); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RemoveClusterBeforeFilesOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RemoveClusterBeforeFilesOperator.java new file mode 100644 index 0000000000..907a69ba12 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RemoveClusterBeforeFilesOperator.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.flink.cluster; + +import org.apache.paimon.flink.sink.Committable; +import org.apache.paimon.flink.utils.BoundedOneInputOperator; +import org.apache.paimon.io.CompactIncrement; +import org.apache.paimon.io.DataIncrement; +import org.apache.paimon.table.sink.CommitMessageImpl; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.Split; + +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import java.util.Collections; + +/** Operator used with {@link IncrementalClusterSplitSource}, to remove files to be clustered. */ +public class RemoveClusterBeforeFilesOperator extends BoundedOneInputOperator<Split, Committable> { + + private static final long serialVersionUID = 1L; + + @Override + public void processElement(StreamRecord<Split> element) throws Exception { + DataSplit dataSplit = (DataSplit) element.getValue(); + CommitMessageImpl message = + new CommitMessageImpl( + dataSplit.partition(), + dataSplit.bucket(), + dataSplit.totalBuckets(), + DataIncrement.emptyIncrement(), + new CompactIncrement( + dataSplit.dataFiles(), + Collections.emptyList(), + Collections.emptyList())); + output.collect( + new StreamRecord<>( + new Committable(Long.MAX_VALUE, Committable.Kind.FILE, message))); + } + + @Override + public void endInput() throws Exception {} +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RewriteIncrementalClusterCommittableOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RewriteIncrementalClusterCommittableOperator.java new file mode 100644 index 0000000000..6b0ef47f3f --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RewriteIncrementalClusterCommittableOperator.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.cluster; + +import org.apache.paimon.append.cluster.IncrementalClusterManager; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.flink.sink.Committable; +import org.apache.paimon.flink.utils.BoundedOneInputOperator; +import org.apache.paimon.io.CompactIncrement; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.DataIncrement; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.CommitMessageImpl; + +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.paimon.utils.Preconditions.checkArgument; + +/** Rewrite committable for new files written after clustered. 
*/ +public class RewriteIncrementalClusterCommittableOperator + extends BoundedOneInputOperator<Committable, Committable> { + private static final long serialVersionUID = 1L; + + private final FileStoreTable table; + private final Map<BinaryRow, Integer> outputLevels; + + private transient Map<BinaryRow, List<DataFileMeta>> partitionFiles; + + public RewriteIncrementalClusterCommittableOperator( + FileStoreTable table, Map<BinaryRow, Integer> outputLevels) { + this.table = table; + this.outputLevels = outputLevels; + } + + @Override + public void open() throws Exception { + partitionFiles = new HashMap<>(); + } + + @Override + public void processElement(StreamRecord<Committable> element) throws Exception { + Committable committable = element.getValue(); + if (committable.kind() != Committable.Kind.FILE) { + output.collect(element); + } + + CommitMessageImpl message = (CommitMessageImpl) committable.wrappedCommittable(); + checkArgument(message.bucket() == 0); + BinaryRow partition = message.partition(); + partitionFiles + .computeIfAbsent(partition, file -> new ArrayList<>()) + .addAll(message.newFilesIncrement().newFiles()); + } + + @Override + public void endInput() throws Exception { + emitAll(Long.MAX_VALUE); + } + + protected void emitAll(long checkpointId) { + for (Map.Entry<BinaryRow, List<DataFileMeta>> partitionEntry : partitionFiles.entrySet()) { + BinaryRow partition = partitionEntry.getKey(); + // upgrade the clustered file to outputLevel + List<DataFileMeta> clusterAfter = + IncrementalClusterManager.upgrade( + partitionEntry.getValue(), outputLevels.get(partition)); + LOG.info( + "Partition {}: upgrade file level to {}", + partition, + outputLevels.get(partition)); + CompactIncrement compactIncrement = + new CompactIncrement( + Collections.emptyList(), clusterAfter, Collections.emptyList()); + CommitMessageImpl clusterMessage = + new CommitMessageImpl( + partition, + // bucket 0 is bucket for unaware-bucket table + // for compatibility with the old design + 0, + table.coreOptions().bucket(), + DataIncrement.emptyIncrement(), + compactIncrement); + output.collect( + new StreamRecord<>( + new Committable(checkpointId, Committable.Kind.FILE, clusterMessage))); + } + + partitionFiles.clear(); + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java new file mode 100644 index 0000000000..29f02adfd5 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java @@ -0,0 +1,413 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.flink.action; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.BatchTableCommit; +import org.apache.paimon.table.sink.BatchTableWrite; +import org.apache.paimon.table.sink.BatchWriteBuilder; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.table.source.Split; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.utils.StringUtils; + +import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; + +/** IT cases for incremental clustering action. */ +public class IncrementalClusterActionITCase extends ActionITCaseBase { + + @Test + public void testClusterUnpartitionedTable() throws Exception { + FileStoreTable table = createTable(null, 1); + + BinaryString randomStr = BinaryString.fromString(randomString(150)); + List<CommitMessage> messages = new ArrayList<>(); + + // first write + for (int i = 0; i < 3; i++) { + for (int j = 0; j < 3; j++) { + messages.addAll(write(GenericRow.of(i, j, randomStr, 0))); + } + } + commit(messages); + ReadBuilder readBuilder = table.newReadBuilder().withProjection(new int[] {0, 1}); + List<String> result1 = + getResult( + readBuilder.newRead(), + readBuilder.newScan().plan().splits(), + readBuilder.readType()); + List<String> expected1 = + Lists.newArrayList( + "+I[0, 0]", + "+I[0, 1]", + "+I[0, 2]", + "+I[1, 0]", + "+I[1, 1]", + "+I[1, 2]", + "+I[2, 0]", + "+I[2, 1]", + "+I[2, 2]"); + assertThat(result1).containsExactlyElementsOf(expected1); + + // first cluster + runAction(Collections.emptyList()); + checkSnapshot(table); + List<Split> splits = readBuilder.newScan().plan().splits(); + assertThat(splits.size()).isEqualTo(1); + assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(1); + assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5); + List<String> result2 = getResult(readBuilder.newRead(), splits, readBuilder.readType()); + List<String> expected2 = + Lists.newArrayList( + "+I[0, 0]", + "+I[0, 1]", + "+I[1, 0]", + "+I[1, 1]", + "+I[0, 2]", + "+I[1, 2]", + "+I[2, 0]", + "+I[2, 1]", + "+I[2, 2]"); + assertThat(result2).containsExactlyElementsOf(expected2); + + // second write + messages.clear(); + messages.addAll( + write( + GenericRow.of(0, 3, null, 0), + GenericRow.of(1, 3, null, 0), + GenericRow.of(2, 3, null, 0))); + messages.addAll( + write( + GenericRow.of(3, 0, null, 0), + GenericRow.of(3, 1, null, 0), + GenericRow.of(3, 2, null, 0), + GenericRow.of(3, 3, null, 0))); + commit(messages); + + List<String> result3 = + getResult( + readBuilder.newRead(), + readBuilder.newScan().plan().splits(), + readBuilder.readType()); + List<String> expected3 = new ArrayList<>(expected2); + expected3.addAll( + Lists.newArrayList( + "+I[0, 3]", + "+I[1, 3]", + "+I[2, 3]", + "+I[3, 0]", + "+I[3, 1]", + "+I[3, 2]", + "+I[3, 3]")); 
+ assertThat(result3).containsExactlyElementsOf(expected3); + + // second cluster + runAction(Collections.emptyList()); + checkSnapshot(table); + splits = readBuilder.newScan().plan().splits(); + List<String> result4 = getResult(readBuilder.newRead(), splits, readBuilder.readType()); + List<String> expected4 = new ArrayList<>(expected2); + expected4.addAll( + Lists.newArrayList( + "+I[0, 3]", + "+I[1, 3]", + "+I[3, 0]", + "+I[3, 1]", + "+I[2, 3]", + "+I[3, 2]", + "+I[3, 3]")); + assertThat(splits.size()).isEqualTo(1); + assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(2); + assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5); + assertThat(((DataSplit) splits.get(0)).dataFiles().get(1).level()).isEqualTo(4); + assertThat(result4).containsExactlyElementsOf(expected4); + + // full cluster + runAction(Lists.newArrayList("--compact_strategy", "full")); + checkSnapshot(table); + splits = readBuilder.newScan().plan().splits(); + List<String> result5 = getResult(readBuilder.newRead(), splits, readBuilder.readType()); + List<String> expected5 = + new ArrayList<>( + Lists.newArrayList( + "+I[0, 0]", + "+I[0, 1]", + "+I[1, 0]", + "+I[1, 1]", + "+I[0, 2]", + "+I[0, 3]", + "+I[1, 2]", + "+I[1, 3]", + "+I[2, 0]", + "+I[2, 1]", + "+I[3, 0]", + "+I[3, 1]", + "+I[2, 2]", + "+I[2, 3]", + "+I[3, 2]", + "+I[3, 3]")); + assertThat(splits.size()).isEqualTo(1); + assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(1); + assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5); + assertThat(result5).containsExactlyElementsOf(expected5); + } + + @Test + public void testClusterPartitionedTable() throws Exception { + FileStoreTable table = createTable("pt", 1); + + BinaryString randomStr = BinaryString.fromString(randomString(150)); + List<CommitMessage> messages = new ArrayList<>(); + + // first write + List<String> expected1 = new ArrayList<>(); + for (int pt = 0; pt < 2; pt++) { + for (int i = 0; i < 3; i++) { + for (int j = 0; j < 3; j++) { + messages.addAll(write(GenericRow.of(i, j, (pt == 0) ? 
randomStr : null, pt))); + expected1.add(String.format("+I[%s, %s, %s]", i, j, pt)); + } + } + } + commit(messages); + ReadBuilder readBuilder = table.newReadBuilder().withProjection(new int[] {0, 1, 3}); + List<String> result1 = + getResult( + readBuilder.newRead(), + readBuilder.newScan().plan().splits(), + readBuilder.readType()); + assertThat(result1).containsExactlyElementsOf(expected1); + + // first cluster + runAction(Collections.emptyList()); + checkSnapshot(table); + List<Split> splits = readBuilder.newScan().plan().splits(); + assertThat(splits.size()).isEqualTo(2); + assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(1); + assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5); + List<String> result2 = getResult(readBuilder.newRead(), splits, readBuilder.readType()); + List<String> expected2 = new ArrayList<>(); + for (int pt = 0; pt < 2; pt++) { + expected2.add(String.format("+I[0, 0, %s]", pt)); + expected2.add(String.format("+I[0, 1, %s]", pt)); + expected2.add(String.format("+I[1, 0, %s]", pt)); + expected2.add(String.format("+I[1, 1, %s]", pt)); + expected2.add(String.format("+I[0, 2, %s]", pt)); + expected2.add(String.format("+I[1, 2, %s]", pt)); + expected2.add(String.format("+I[2, 0, %s]", pt)); + expected2.add(String.format("+I[2, 1, %s]", pt)); + expected2.add(String.format("+I[2, 2, %s]", pt)); + } + assertThat(result2).containsExactlyElementsOf(expected2); + + // second write + messages.clear(); + for (int pt = 0; pt < 2; pt++) { + messages.addAll( + write( + GenericRow.of(0, 3, null, pt), + GenericRow.of(1, 3, null, pt), + GenericRow.of(2, 3, null, pt))); + messages.addAll( + write( + GenericRow.of(3, 0, null, pt), + GenericRow.of(3, 1, null, pt), + GenericRow.of(3, 2, null, pt), + GenericRow.of(3, 3, null, pt))); + } + commit(messages); + + List<String> result3 = + getResult( + readBuilder.newRead(), + readBuilder.newScan().plan().splits(), + readBuilder.readType()); + List<String> expected3 = new ArrayList<>(); + for (int pt = 0; pt < 2; pt++) { + expected3.addAll(expected2.subList(9 * pt, 9 * pt + 9)); + expected3.add(String.format("+I[0, 3, %s]", pt)); + expected3.add(String.format("+I[1, 3, %s]", pt)); + expected3.add(String.format("+I[2, 3, %s]", pt)); + expected3.add(String.format("+I[3, 0, %s]", pt)); + expected3.add(String.format("+I[3, 1, %s]", pt)); + expected3.add(String.format("+I[3, 2, %s]", pt)); + expected3.add(String.format("+I[3, 3, %s]", pt)); + } + assertThat(result3).containsExactlyElementsOf(expected3); + + // second cluster + runAction(Collections.emptyList()); + checkSnapshot(table); + splits = readBuilder.newScan().plan().splits(); + List<String> result4 = getResult(readBuilder.newRead(), splits, readBuilder.readType()); + List<String> expected4 = new ArrayList<>(); + // for partition-0: only file in level-0 will be picked for clustering, outputLevel is 4 + expected4.add("+I[0, 0, 0]"); + expected4.add("+I[0, 1, 0]"); + expected4.add("+I[1, 0, 0]"); + expected4.add("+I[1, 1, 0]"); + expected4.add("+I[0, 2, 0]"); + expected4.add("+I[1, 2, 0]"); + expected4.add("+I[2, 0, 0]"); + expected4.add("+I[2, 1, 0]"); + expected4.add("+I[2, 2, 0]"); + expected4.add("+I[0, 3, 0]"); + expected4.add("+I[1, 3, 0]"); + expected4.add("+I[3, 0, 0]"); + expected4.add("+I[3, 1, 0]"); + expected4.add("+I[2, 3, 0]"); + expected4.add("+I[3, 2, 0]"); + expected4.add("+I[3, 3, 0]"); + // for partition-1:all files will be picked for clustering, outputLevel is 5 + expected4.add("+I[0, 0, 1]"); + expected4.add("+I[0, 1, 1]"); + 
expected4.add("+I[1, 0, 1]"); + expected4.add("+I[1, 1, 1]"); + expected4.add("+I[0, 2, 1]"); + expected4.add("+I[0, 3, 1]"); + expected4.add("+I[1, 2, 1]"); + expected4.add("+I[1, 3, 1]"); + expected4.add("+I[2, 0, 1]"); + expected4.add("+I[2, 1, 1]"); + expected4.add("+I[3, 0, 1]"); + expected4.add("+I[3, 1, 1]"); + expected4.add("+I[2, 2, 1]"); + expected4.add("+I[2, 3, 1]"); + expected4.add("+I[3, 2, 1]"); + expected4.add("+I[3, 3, 1]"); + assertThat(splits.size()).isEqualTo(2); + assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(2); + assertThat(((DataSplit) splits.get(1)).dataFiles().size()).isEqualTo(1); + assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5); + assertThat(((DataSplit) splits.get(0)).dataFiles().get(1).level()).isEqualTo(4); + assertThat(((DataSplit) splits.get(1)).dataFiles().get(0).level()).isEqualTo(5); + assertThat(result4).containsExactlyElementsOf(expected4); + } + + @Test + public void testClusterOnEmptyData() throws Exception { + createTable("pt", 1); + assertThatCode(() -> runAction(Collections.emptyList())).doesNotThrowAnyException(); + } + + protected FileStoreTable createTable(String partitionKeys, int sinkParallelism) + throws Exception { + catalog.createDatabase(database, true); + catalog.createTable(identifier(), schema(partitionKeys, sinkParallelism), true); + return (FileStoreTable) catalog.getTable(identifier()); + } + + private FileStoreTable getTable() throws Exception { + return (FileStoreTable) catalog.getTable(identifier()); + } + + private Identifier identifier() { + return Identifier.create(database, tableName); + } + + private List<CommitMessage> write(GenericRow... data) throws Exception { + BatchWriteBuilder builder = getTable().newBatchWriteBuilder(); + try (BatchTableWrite batchTableWrite = builder.newWrite()) { + for (GenericRow row : data) { + batchTableWrite.write(row); + } + return batchTableWrite.prepareCommit(); + } + } + + private void commit(List<CommitMessage> messages) throws Exception { + BatchTableCommit commit = getTable().newBatchWriteBuilder().newCommit(); + commit.commit(messages); + commit.close(); + } + + private static Schema schema(String partitionKeys, int sinkParallelism) { + Schema.Builder schemaBuilder = Schema.newBuilder(); + schemaBuilder.column("a", DataTypes.INT()); + schemaBuilder.column("b", DataTypes.INT()); + schemaBuilder.column("c", DataTypes.STRING()); + schemaBuilder.column("pt", DataTypes.INT()); + schemaBuilder.option("bucket", "-1"); + schemaBuilder.option("num-levels", "6"); + schemaBuilder.option("num-sorted-run.compaction-trigger", "2"); + schemaBuilder.option("scan.plan-sort-partition", "true"); + schemaBuilder.option("clustering.columns", "a,b"); + schemaBuilder.option("clustering.strategy", "zorder"); + schemaBuilder.option("clustering.incremental", "true"); + schemaBuilder.option("scan.parallelism", "1"); + schemaBuilder.option("sink.parallelism", String.valueOf(sinkParallelism)); + if (!StringUtils.isNullOrWhitespaceOnly(partitionKeys)) { + schemaBuilder.partitionKeys(partitionKeys); + } + return schemaBuilder.build(); + } + + private static String randomString(int length) { + String chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"; + StringBuilder sb = new StringBuilder(length); + ThreadLocalRandom random = ThreadLocalRandom.current(); + for (int i = 0; i < length; i++) { + sb.append(chars.charAt(random.nextInt(chars.length()))); + } + return sb.toString(); + } + + private void checkSnapshot(FileStoreTable table) { + 
assertThat(table.latestSnapshot().get().commitKind()) + .isEqualTo(Snapshot.CommitKind.COMPACT); + } + + private void runAction(List<String> extra) throws Exception { + StreamExecutionEnvironment env = streamExecutionEnvironmentBuilder().batchMode().build(); + ArrayList<String> baseArgs = + Lists.newArrayList("compact", "--database", database, "--table", tableName); + ThreadLocalRandom random = ThreadLocalRandom.current(); + if (random.nextBoolean()) { + baseArgs.addAll(Lists.newArrayList("--warehouse", warehouse)); + } else { + baseArgs.addAll(Lists.newArrayList("--catalog_conf", "warehouse=" + warehouse)); + } + baseArgs.addAll(extra); + + CompactAction action = createAction(CompactAction.class, baseArgs.toArray(new String[0])); + // action.withStreamExecutionEnvironment(env).build(); + // env.execute(); + action.withStreamExecutionEnvironment(env); + action.run(); + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java index 7788a5a0be..7cd20be759 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java @@ -617,7 +617,7 @@ public class CompactProcedure extends BaseProcedure { List<DataFileMeta> clusterBefore = compactUnits.get(partition).files(); // upgrade the clustered file to outputLevel List<DataFileMeta> clusterAfter = - incrementalClusterManager.upgrade( + IncrementalClusterManager.upgrade( entry.getValue(), compactUnits.get(partition).outputLevel()); LOG.info( "Partition {}: upgrade file level to {}",
