This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c907544c4 [flink] Refactor compactorSink to support extended compact
type. (#4569)
c907544c4 is described below
commit c907544c4218e1ce28259915a9a6fbd18f0fb5a4
Author: HunterXHunter <[email protected]>
AuthorDate: Fri Nov 22 14:53:57 2024 +0800
[flink] Refactor compactorSink to support extended compact type. (#4569)
---
.../java/org/apache/paimon/flink/action/CompactAction.java | 3 ++-
.../org/apache/paimon/flink/action/CompactDatabaseAction.java | 3 ++-
.../main/java/org/apache/paimon/flink/sink/CompactorSink.java | 7 +++++--
.../org/apache/paimon/flink/sink/CompactorSinkBuilder.java | 9 ++++++++-
.../org/apache/paimon/flink/sink/StoreCompactOperator.java | 10 +++++-----
.../java/org/apache/paimon/flink/sink/CompactorSinkITCase.java | 6 ++++--
.../org/apache/paimon/flink/sink/StoreCompactOperatorTest.java | 3 ++-
7 files changed, 28 insertions(+), 13 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index 8ea120015..ce88857f1 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -138,7 +138,8 @@ public class CompactAction extends TableActionBase {
}
CompactorSourceBuilder sourceBuilder =
new CompactorSourceBuilder(identifier.getFullName(), table);
- CompactorSinkBuilder sinkBuilder = new CompactorSinkBuilder(table);
+ CompactorSinkBuilder sinkBuilder =
+ new
CompactorSinkBuilder(table).withFullCompaction(!isStreaming);
sourceBuilder.withPartitionPredicate(getPredicate());
DataStreamSource<RowData> source =
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java
index fda9ff695..471c6fdd4 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java
@@ -259,7 +259,8 @@ public class CompactDatabaseAction extends ActionBase {
CompactorSourceBuilder sourceBuilder =
new CompactorSourceBuilder(fullName, table)
.withPartitionIdleTime(partitionIdleTime);
- CompactorSinkBuilder sinkBuilder = new CompactorSinkBuilder(table);
+ CompactorSinkBuilder sinkBuilder =
+ new
CompactorSinkBuilder(table).withFullCompaction(!isStreaming);
DataStreamSource<RowData> source =
sourceBuilder.withEnv(env).withContinuousMode(isStreaming).build();
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
index 7dc3ab115..a0c830d73 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
@@ -29,14 +29,17 @@ public class CompactorSink extends FlinkSink<RowData> {
private static final long serialVersionUID = 1L;
- public CompactorSink(FileStoreTable table) {
+ private final boolean fullCompaction;
+
+ public CompactorSink(FileStoreTable table, boolean fullCompaction) {
super(table, false);
+ this.fullCompaction = fullCompaction;
}
@Override
protected OneInputStreamOperator<RowData, Committable> createWriteOperator(
StoreSinkWrite.Provider writeProvider, String commitUser) {
- return new StoreCompactOperator(table, writeProvider, commitUser);
+ return new StoreCompactOperator(table, writeProvider, commitUser,
fullCompaction);
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSinkBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSinkBuilder.java
index 926155cab..2173b1d34 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSinkBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSinkBuilder.java
@@ -37,6 +37,8 @@ public class CompactorSinkBuilder {
private DataStream<RowData> input;
+ private boolean fullCompaction;
+
public CompactorSinkBuilder(FileStoreTable table) {
this.table = table;
}
@@ -46,6 +48,11 @@ public class CompactorSinkBuilder {
return this;
}
+ public CompactorSinkBuilder withFullCompaction(boolean fullCompaction) {
+ this.fullCompaction = fullCompaction;
+ return this;
+ }
+
public DataStreamSink<?> build() {
BucketMode bucketMode = table.bucketMode();
switch (bucketMode) {
@@ -66,6 +73,6 @@ public class CompactorSinkBuilder {
.orElse(null);
DataStream<RowData> partitioned =
partition(input, new BucketsRowChannelComputer(), parallelism);
- return new CompactorSink(table).sinkFrom(partitioned);
+ return new CompactorSink(table, fullCompaction).sinkFrom(partitioned);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
index bc7bb350d..9b152a81c 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
@@ -52,6 +52,7 @@ public class StoreCompactOperator extends
PrepareCommitOperator<RowData, Committ
private final FileStoreTable table;
private final StoreSinkWrite.Provider storeSinkWriteProvider;
private final String initialCommitUser;
+ private final boolean fullCompaction;
private transient StoreSinkWriteState state;
private transient StoreSinkWrite write;
@@ -61,7 +62,8 @@ public class StoreCompactOperator extends
PrepareCommitOperator<RowData, Committ
public StoreCompactOperator(
FileStoreTable table,
StoreSinkWrite.Provider storeSinkWriteProvider,
- String initialCommitUser) {
+ String initialCommitUser,
+ boolean fullCompaction) {
super(Options.fromMap(table.options()));
Preconditions.checkArgument(
!table.coreOptions().writeOnly(),
@@ -69,6 +71,7 @@ public class StoreCompactOperator extends
PrepareCommitOperator<RowData, Committ
this.table = table;
this.storeSinkWriteProvider = storeSinkWriteProvider;
this.initialCommitUser = initialCommitUser;
+ this.fullCompaction = fullCompaction;
}
@Override
@@ -136,10 +139,7 @@ public class StoreCompactOperator extends
PrepareCommitOperator<RowData, Committ
try {
for (Pair<BinaryRow, Integer> partitionBucket : waitToCompact) {
- write.compact(
- partitionBucket.getKey(),
- partitionBucket.getRight(),
- !write.streamingMode());
+ write.compact(partitionBucket.getKey(),
partitionBucket.getRight(), fullCompaction);
}
} catch (Exception e) {
throw new RuntimeException("Exception happens while executing
compaction.", e);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
index a5f260fb2..c38ac4b3d 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
@@ -132,7 +132,7 @@ public class CompactorSinkITCase extends AbstractTestBase {
.withContinuousMode(false)
.withPartitionPredicate(predicate)
.build();
- new CompactorSinkBuilder(table).withInput(source).build();
+ new
CompactorSinkBuilder(table).withFullCompaction(true).withInput(source).build();
env.execute();
snapshot =
snapshotManager.snapshot(snapshotManager.latestSnapshotId());
@@ -182,6 +182,7 @@ public class CompactorSinkITCase extends AbstractTestBase {
String.valueOf(sinkParalellism));
}
}))
+ .withFullCompaction(false)
.withInput(source)
.build();
@@ -267,7 +268,8 @@ public class CompactorSinkITCase extends AbstractTestBase {
false,
memoryPool,
metricGroup),
- "test");
+ "test",
+ true);
}
protected MultiTablesStoreCompactOperator createMultiTablesCompactOperator(
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
index 3f2daedff..f8387e1fc 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
@@ -53,7 +53,8 @@ public class StoreCompactOperatorTest extends TableTestBase {
getTableDefault(),
(table, commitUser, state, ioManager, memoryPool,
metricGroup) ->
compactRememberStoreWrite,
- "10086");
+ "10086",
+ !streamingMode);
TypeSerializer<Committable> serializer =
new CommittableTypeInfo().createSerializer(new
ExecutionConfig());