This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 380f367236 [flink] Move FlinkSink#createWriteProvider to a common place so CDC sink can also use it (#5824)
380f367236 is described below
commit 380f367236affbe544445752c97245b9fb4d861f
Author: tsreaper <[email protected]>
AuthorDate: Tue Jul 1 22:09:34 2025 +0800
[flink] Move FlinkSink#createWriteProvider to a common place so CDC sink can also use it (#5824)
---
.../org/apache/paimon/flink/sink/FlinkSink.java | 94 +--------------------
.../sink/MultiTablesStoreCompactOperator.java | 76 +----------------
.../apache/paimon/flink/sink/StoreSinkWrite.java | 98 ++++++++++++++++++++++
3 files changed, 103 insertions(+), 165 deletions(-)
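
The practical effect of this commit is that the provider-selection logic becomes a static helper on StoreSinkWrite, callable by any sink rather than only by FlinkSink. A minimal sketch of a CDC-side call, assuming a hypothetical caller class (only StoreSinkWrite.createWriteProvider and its five parameters come from this commit; the class name, method name, and argument values are illustrative):

    package org.apache.paimon.flink.sink;

    import org.apache.paimon.table.FileStoreTable;

    import org.apache.flink.streaming.api.environment.CheckpointConfig;

    // Hypothetical CDC call site for the relocated helper.
    public class CdcWriteProviderSketch {

        StoreSinkWrite.Provider writeProvider(
                FileStoreTable table, CheckpointConfig checkpointConfig) {
            // The shared helper picks GlobalFullCompactionSinkWrite,
            // LookupSinkWrite or StoreSinkWriteImpl from the table's core
            // options, exactly as the moved method in the diff below does.
            return StoreSinkWrite.createWriteProvider(
                    table,
                    checkpointConfig,
                    true, // isStreaming: CDC ingestion runs as a streaming job
                    false, // ignorePreviousFiles
                    false); // hasSinkMaterializer: none in CDC pipelines
        }
    }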
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 6349dc3972..6ce295b31d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -18,8 +18,6 @@
package org.apache.paimon.flink.sink;
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.CoreOptions.ChangelogProducer;
import org.apache.paimon.CoreOptions.TagCreationMode;
import org.apache.paimon.flink.compact.changelog.ChangelogCompactCoordinateOperator;
import org.apache.paimon.flink.compact.changelog.ChangelogCompactSortOperator;
@@ -29,8 +27,6 @@ import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.utils.Preconditions;
-import org.apache.paimon.utils.SerializableRunnable;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.operators.SlotSharingGroup;
@@ -46,7 +42,6 @@ import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
import javax.annotation.Nullable;
@@ -56,9 +51,7 @@ import java.util.LinkedList;
import java.util.Queue;
import java.util.Set;
-import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
import static org.apache.paimon.CoreOptions.createCommitUser;
-import static org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL;
import static org.apache.paimon.flink.FlinkConnectorOptions.END_INPUT_WATERMARK;
import static org.apache.paimon.flink.FlinkConnectorOptions.PRECOMMIT_COMPACT;
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_AUTO_TAG_FOR_SAVEPOINT;
@@ -92,89 +85,6 @@ public abstract class FlinkSink<T> implements Serializable {
this.ignorePreviousFiles = ignorePreviousFiles;
}
-    private StoreSinkWrite.Provider createWriteProvider(
-            CheckpointConfig checkpointConfig, boolean isStreaming, boolean hasSinkMaterializer) {
-        SerializableRunnable assertNoSinkMaterializer =
-                () ->
-                        Preconditions.checkArgument(
-                                !hasSinkMaterializer,
-                                String.format(
-                                        "Sink materializer must not be used with Paimon sink. "
-                                                + "Please set '%s' to '%s' in Flink's config.",
-                                        ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE
-                                                .key(),
-                                        ExecutionConfigOptions.UpsertMaterialize.NONE.name()));
-
-        Options options = table.coreOptions().toConfiguration();
-        ChangelogProducer changelogProducer = table.coreOptions().changelogProducer();
-        boolean waitCompaction;
-        CoreOptions coreOptions = table.coreOptions();
-        if (coreOptions.writeOnly()) {
-            waitCompaction = false;
-        } else {
-            waitCompaction = coreOptions.prepareCommitWaitCompaction();
-            int deltaCommits = -1;
-            if (options.contains(FULL_COMPACTION_DELTA_COMMITS)) {
-                deltaCommits = options.get(FULL_COMPACTION_DELTA_COMMITS);
-            } else if (options.contains(CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL)) {
-                long fullCompactionThresholdMs =
-                        options.get(CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL).toMillis();
-                deltaCommits =
-                        (int)
-                                (fullCompactionThresholdMs
-                                        / checkpointConfig.getCheckpointInterval());
-            }
-
-            if (changelogProducer == ChangelogProducer.FULL_COMPACTION || deltaCommits >= 0) {
-                int finalDeltaCommits = Math.max(deltaCommits, 1);
-                return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> {
-                    assertNoSinkMaterializer.run();
-                    return new GlobalFullCompactionSinkWrite(
-                            table,
-                            commitUser,
-                            state,
-                            ioManager,
-                            ignorePreviousFiles,
-                            waitCompaction,
-                            finalDeltaCommits,
-                            isStreaming,
-                            memoryPool,
-                            metricGroup);
-                };
-            }
-
-            if (coreOptions.needLookup()) {
-                return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> {
-                    assertNoSinkMaterializer.run();
-                    return new LookupSinkWrite(
-                            table,
-                            commitUser,
-                            state,
-                            ioManager,
-                            ignorePreviousFiles,
-                            waitCompaction,
-                            isStreaming,
-                            memoryPool,
-                            metricGroup);
-                };
-            }
-        }
-
-        return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> {
-            assertNoSinkMaterializer.run();
-            return new StoreSinkWriteImpl(
-                    table,
-                    commitUser,
-                    state,
-                    ioManager,
-                    ignorePreviousFiles,
-                    waitCompaction,
-                    isStreaming,
-                    memoryPool,
-                    metricGroup);
-        };
-    }
-
     public DataStreamSink<?> sinkFrom(DataStream<T> input) {
         // This commitUser is valid only for new jobs.
         // After the job starts, this commitUser will be recorded into the states of write and
@@ -223,9 +133,11 @@ public abstract class FlinkSink<T> implements Serializable {
                         (writeOnly ? WRITER_WRITE_ONLY_NAME : WRITER_NAME) + " : " + table.name(),
                         new CommittableTypeInfo(),
                         createWriteOperatorFactory(
-                                createWriteProvider(
+                                StoreSinkWrite.createWriteProvider(
+                                        table,
                                         env.getCheckpointConfig(),
                                         isStreaming,
+                                        ignorePreviousFiles,
                                         hasSinkMaterializer(input)),
                                 commitUser));
         if (parallelism == null) {
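
One step in the moved logic deserves a worked example: when only the full-compaction trigger interval is configured, deltaCommits is derived from the checkpoint interval and clamped to at least 1. A sketch with assumed, illustrative values (a 10-minute trigger interval and a 3-minute checkpoint interval; neither is a default):

    public class DeltaCommitsExample {
        public static void main(String[] args) {
            // Illustrative values, not defaults.
            long fullCompactionThresholdMs = 10 * 60 * 1000L; // trigger interval
            long checkpointIntervalMs = 3 * 60 * 1000L; // checkpoint interval

            // Same arithmetic as createWriteProvider: integer division,
            // clamped to a minimum of 1.
            int deltaCommits = (int) (fullCompactionThresholdMs / checkpointIntervalMs);
            int finalDeltaCommits = Math.max(deltaCommits, 1);

            // Prints 3: a full compaction is forced every 3 checkpoint commits.
            System.out.println(finalDeltaCommits);
        }
    }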
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
index 0dad216ab0..3687fea6f1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
@@ -47,8 +47,6 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
-import static org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL;
import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
/**
@@ -155,7 +153,8 @@ public class MultiTablesStoreCompactOperator
                         + " should not be true for MultiTablesStoreCompactOperator.");
 
         storeSinkWriteProvider =
-                createWriteProvider(table, checkpointConfig, isStreaming, ignorePreviousFiles);
+                StoreSinkWrite.createWriteProvider(
+                        table, checkpointConfig, isStreaming, ignorePreviousFiles, false);
 
         StoreSinkWrite write =
                 writes.computeIfAbsent(
@@ -253,77 +252,6 @@ public class MultiTablesStoreCompactOperator
return table;
}
-    private StoreSinkWrite.Provider createWriteProvider(
-            FileStoreTable fileStoreTable,
-            CheckpointConfig checkpointConfig,
-            boolean isStreaming,
-            boolean ignorePreviousFiles) {
-        Options options = fileStoreTable.coreOptions().toConfiguration();
-        CoreOptions.ChangelogProducer changelogProducer =
-                fileStoreTable.coreOptions().changelogProducer();
-        boolean waitCompaction;
-        CoreOptions coreOptions = fileStoreTable.coreOptions();
-        if (coreOptions.writeOnly()) {
-            waitCompaction = false;
-        } else {
-            waitCompaction = coreOptions.prepareCommitWaitCompaction();
-            int deltaCommits = -1;
-            if (options.contains(FULL_COMPACTION_DELTA_COMMITS)) {
-                deltaCommits = options.get(FULL_COMPACTION_DELTA_COMMITS);
-            } else if (options.contains(CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL)) {
-                long fullCompactionThresholdMs =
-                        options.get(CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL).toMillis();
-                deltaCommits =
-                        (int)
-                                (fullCompactionThresholdMs
-                                        / checkpointConfig.getCheckpointInterval());
-            }
-
-            if (changelogProducer == CoreOptions.ChangelogProducer.FULL_COMPACTION
-                    || deltaCommits >= 0) {
-                int finalDeltaCommits = Math.max(deltaCommits, 1);
-                return (table, commitUser, state, ioManager, memoryPool, metricGroup) ->
-                        new GlobalFullCompactionSinkWrite(
-                                table,
-                                commitUser,
-                                state,
-                                ioManager,
-                                ignorePreviousFiles,
-                                waitCompaction,
-                                finalDeltaCommits,
-                                isStreaming,
-                                memoryPool,
-                                metricGroup);
-            }
-        }
-
-        if (coreOptions.needLookup()) {
-            return (table, commitUser, state, ioManager, memoryPool, metricGroup) ->
-                    new LookupSinkWrite(
-                            table,
-                            commitUser,
-                            state,
-                            ioManager,
-                            ignorePreviousFiles,
-                            waitCompaction,
-                            isStreaming,
-                            memoryPool,
-                            metricGroup);
-        }
-
-        return (table, commitUser, state, ioManager, memoryPool, metricGroup) ->
-                new StoreSinkWriteImpl(
-                        table,
-                        commitUser,
-                        state,
-                        ioManager,
-                        ignorePreviousFiles,
-                        waitCompaction,
-                        isStreaming,
-                        memoryPool,
-                        metricGroup);
-    }
-
     /** {@link StreamOperatorFactory} of {@link MultiTablesStoreCompactOperator}. */
     public static class Factory
             extends PrepareCommitOperator.Factory<RowData, MultiTableCommittable> {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
index 5bd2e9f9e1..d88cbea6f0 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
@@ -18,18 +18,24 @@
package org.apache.paimon.flink.sink;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.operation.WriteRestore;
+import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.SinkRecord;
import org.apache.paimon.table.sink.TableWriteImpl;
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.SerializableRunnable;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
import javax.annotation.Nullable;
@@ -37,6 +43,9 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.List;
+import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
+import static org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL;
+
/** Helper class of {@link PrepareCommitOperator} for different types of paimon sinks. */
public interface StoreSinkWrite {
@@ -91,6 +100,95 @@ public interface StoreSinkWrite {
@Nullable MetricGroup metricGroup);
}
+    static StoreSinkWrite.Provider createWriteProvider(
+            FileStoreTable fileStoreTable,
+            CheckpointConfig checkpointConfig,
+            boolean isStreaming,
+            boolean ignorePreviousFiles,
+            boolean hasSinkMaterializer) {
+        SerializableRunnable assertNoSinkMaterializer =
+                () ->
+                        Preconditions.checkArgument(
+                                !hasSinkMaterializer,
+                                String.format(
+                                        "Sink materializer must not be used with Paimon sink. "
+                                                + "Please set '%s' to '%s' in Flink's config.",
+                                        ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE
+                                                .key(),
+                                        ExecutionConfigOptions.UpsertMaterialize.NONE.name()));
+
+        Options options = fileStoreTable.coreOptions().toConfiguration();
+        CoreOptions.ChangelogProducer changelogProducer =
+                fileStoreTable.coreOptions().changelogProducer();
+        boolean waitCompaction;
+        CoreOptions coreOptions = fileStoreTable.coreOptions();
+        if (coreOptions.writeOnly()) {
+            waitCompaction = false;
+        } else {
+            waitCompaction = coreOptions.prepareCommitWaitCompaction();
+            int deltaCommits = -1;
+            if (options.contains(FULL_COMPACTION_DELTA_COMMITS)) {
+                deltaCommits = options.get(FULL_COMPACTION_DELTA_COMMITS);
+            } else if (options.contains(CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL)) {
+                long fullCompactionThresholdMs =
+                        options.get(CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL).toMillis();
+                deltaCommits =
+                        (int)
+                                (fullCompactionThresholdMs
+                                        / checkpointConfig.getCheckpointInterval());
+            }
+
+            if (changelogProducer == CoreOptions.ChangelogProducer.FULL_COMPACTION
+                    || deltaCommits >= 0) {
+                int finalDeltaCommits = Math.max(deltaCommits, 1);
+                return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> {
+                    assertNoSinkMaterializer.run();
+                    return new GlobalFullCompactionSinkWrite(
+                            table,
+                            commitUser,
+                            state,
+                            ioManager,
+                            ignorePreviousFiles,
+                            waitCompaction,
+                            finalDeltaCommits,
+                            isStreaming,
+                            memoryPool,
+                            metricGroup);
+                };
+            }
+
+            if (coreOptions.needLookup()) {
+                return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> {
+                    assertNoSinkMaterializer.run();
+                    return new LookupSinkWrite(
+                            table,
+                            commitUser,
+                            state,
+                            ioManager,
+                            ignorePreviousFiles,
+                            waitCompaction,
+                            isStreaming,
+                            memoryPool,
+                            metricGroup);
+                };
+            }
+        }
+
+        return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> {
+            assertNoSinkMaterializer.run();
+            return new StoreSinkWriteImpl(
+                    table,
+                    commitUser,
+                    state,
+                    ioManager,
+                    ignorePreviousFiles,
+                    waitCompaction,
+                    isStreaming,
+                    memoryPool,
+                    metricGroup);
+        };
+    }
+
/** Provider of {@link StoreSinkWrite} that uses given write buffer. */
@FunctionalInterface
interface WithWriteBufferProvider extends Serializable {
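
For reference, StoreSinkWrite.Provider, whose closing lines appear in the hunk context above, is the six-argument functional interface that every branch of the new helper returns, so a custom provider is just a lambda. A minimal sketch, assuming the StoreSinkWriteImpl constructor is accessible from this package and using placeholder boolean flags:

    package org.apache.paimon.flink.sink;

    public class ProviderSketch {
        // Same lambda shape as the branches of createWriteProvider above;
        // the three boolean literals stand in for ignorePreviousFiles,
        // waitCompaction and isStreaming.
        static final StoreSinkWrite.Provider PROVIDER =
                (table, commitUser, state, ioManager, memoryPool, metricGroup) ->
                        new StoreSinkWriteImpl(
                                table,
                                commitUser,
                                state,
                                ioManager,
                                false, // ignorePreviousFiles
                                false, // waitCompaction
                                true, // isStreaming
                                memoryPool,
                                metricGroup);
    }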