This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2cb22c3e68 [flink] Refactor WriterRefresher to better abstraction (#6070)
2cb22c3e68 is described below
commit 2cb22c3e68c50c44831b341197409baabacfc9b8
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Aug 13 13:54:01 2025 +0800
[flink] Refactor WriterRefresher to better abstraction (#6070)
---
.../main/java/org/apache/paimon/CoreOptions.java | 12 ---
.../paimon/flink/compact/AppendTableCompactor.java | 37 ++++----
.../paimon/flink/sink/StoreCompactOperator.java | 15 +---
.../paimon/flink/sink/TableWriteOperator.java | 15 +---
.../apache/paimon/flink/sink/WriterRefresher.java | 76 ++++++++++------
.../paimon/flink/sink/WriterRefresherTest.java | 100 ++++++---------------
6 files changed, 98 insertions(+), 157 deletions(-)
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 488445352c..e0f9af088d 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2683,18 +2683,6 @@ public class CoreOptions implements Serializable {
return options.get(DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS);
}
- public Map<String, String> configGroups(Set<String> groups) {
- Map<String, String> configs = new HashMap<>();
- // external-paths config group
- String externalPaths = "external-paths";
- if (groups.contains(externalPaths)) {
- configs.put(DATA_FILE_EXTERNAL_PATHS.key(),
dataFileExternalPaths());
- configs.put(DATA_FILE_EXTERNAL_PATHS_STRATEGY.key(),
externalPathStrategy().toString());
- configs.put(DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS.key(),
externalSpecificFS());
- }
- return configs;
- }
-
public Boolean forceRewriteAllFiles() {
return options.get(COMPACTION_FORCE_REWRITE_ALL_FILES);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompactor.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompactor.java
index 45d60cac24..4b85eb5169 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompactor.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompactor.java
@@ -22,11 +22,12 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.append.AppendCompactTask;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.WriterRefresher;
import org.apache.paimon.operation.BaseAppendFileStoreWrite;
-import org.apache.paimon.operation.FileStoreWrite;
+import org.apache.paimon.operation.FileStoreWrite.State;
import org.apache.paimon.operation.metrics.CompactionMetrics;
import org.apache.paimon.operation.metrics.MetricUtils;
import org.apache.paimon.table.FileStoreTable;
@@ -56,17 +57,15 @@ public class AppendTableCompactor {
private static final Logger LOG =
LoggerFactory.getLogger(AppendTableCompactor.class);
private FileStoreTable table;
- private final String commitUser;
-
- private transient BaseAppendFileStoreWrite write;
+ private BaseAppendFileStoreWrite write;
- protected final transient Queue<Future<CommitMessage>> result;
-
- private final transient Supplier<ExecutorService> compactExecutorsupplier;
- @Nullable private final transient CompactionMetrics compactionMetrics;
- @Nullable private final transient CompactionMetrics.Reporter
metricsReporter;
+ private final String commitUser;
+ protected final Queue<Future<CommitMessage>> result;
+ private final Supplier<ExecutorService> compactExecutorsupplier;
+ @Nullable private final CompactionMetrics compactionMetrics;
+ @Nullable private final CompactionMetrics.Reporter metricsReporter;
- @Nullable protected final transient
WriterRefresher<BaseAppendFileStoreWrite> writeRefresher;
+ @Nullable protected final WriterRefresher writeRefresher;
public AppendTableCompactor(
FileStoreTable table,
@@ -92,12 +91,7 @@ public class AppendTableCompactor {
? null
// partition and bucket fields are no use.
:
this.compactionMetrics.createReporter(BinaryRow.EMPTY_ROW, 0);
- if (isStreaming) {
- this.writeRefresher =
- new WriterRefresher<>(table, write, (newTable, writer) ->
replace(newTable));
- } else {
- this.writeRefresher = null;
- }
+ this.writeRefresher = WriterRefresher.create(isStreaming, table,
this::replace);
}
public void processElement(AppendCompactTask task) throws Exception {
@@ -218,18 +212,17 @@ public class AppendTableCompactor {
}
private void replace(FileStoreTable newTable) throws Exception {
-
- List<? extends FileStoreWrite.State<?>> states = write.checkpoint();
- write.close();
- write = (BaseAppendFileStoreWrite)
newTable.store().newWrite(commitUser);
- write.restore((List) states);
+ this.table = newTable;
+ List<State<InternalRow>> states = write.checkpoint();
+ this.write.close();
+ this.write = (BaseAppendFileStoreWrite)
newTable.store().newWrite(commitUser);
+ this.write.restore(states);
}
public void tryRefreshWrite() {
if (commitUser == null) {
return;
}
-
if (writeRefresher != null) {
writeRefresher.tryRefresh();
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
index 47398db852..7676084b94 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
@@ -70,7 +70,7 @@ public class StoreCompactOperator extends
PrepareCommitOperator<RowData, Committ
private transient DataFileMetaSerializer dataFileMetaSerializer;
private transient Set<Pair<BinaryRow, Integer>> waitToCompact;
- protected transient @Nullable WriterRefresher<StoreSinkWrite>
writeRefresher;
+ protected transient @Nullable WriterRefresher writeRefresher;
public StoreCompactOperator(
StreamOperatorParameters<Committable> parameters,
@@ -119,18 +119,7 @@ public class StoreCompactOperator extends
PrepareCommitOperator<RowData, Committ
getContainingTask().getEnvironment().getIOManager(),
memoryPool,
getMetricGroup());
-
- if (write.streamingMode()) {
- writeRefresher =
- new WriterRefresher<>(
- table,
- write,
- (newTable, writer) -> {
- writer.replace(newTable);
- });
- } else {
- writeRefresher = null;
- }
+ this.writeRefresher = WriterRefresher.create(write.streamingMode(),
table, write::replace);
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
index bdce88e2ea..ca27059265 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
@@ -58,7 +58,7 @@ public abstract class TableWriteOperator<IN> extends
PrepareCommitOperator<IN, C
protected transient StoreSinkWriteState state;
protected transient StoreSinkWrite write;
- protected transient @Nullable WriterRefresher<StoreSinkWrite>
writeRefresher;
+ protected transient @Nullable WriterRefresher writeRefresher;
public TableWriteOperator(
StreamOperatorParameters<Committable> parameters,
@@ -99,18 +99,7 @@ public abstract class TableWriteOperator<IN> extends
PrepareCommitOperator<IN, C
if (writeRestore != null) {
write.setWriteRestore(writeRestore);
}
-
- if (write.streamingMode()) {
- writeRefresher =
- new WriterRefresher<>(
- table,
- write,
- (newTable, writer) -> {
- writer.replace(newTable);
- });
- } else {
- writeRefresher = null;
- }
+ this.writeRefresher = WriterRefresher.create(write.streamingMode(),
table, write::replace);
}
public void setWriteRestore(@Nullable WriteRestore writeRestore) {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/WriterRefresher.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/WriterRefresher.java
index 37d3a5cc9d..5dca871af5 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/WriterRefresher.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/WriterRefresher.java
@@ -23,7 +23,6 @@ import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,49 +30,66 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
+import static org.apache.paimon.CoreOptions.DATA_FILE_EXTERNAL_PATHS;
+import static
org.apache.paimon.CoreOptions.DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS;
+import static org.apache.paimon.CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY;
+import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly;
+
/** Writer refresher for refresh write when configs changed. */
-public class WriterRefresher<T> {
+public class WriterRefresher {
private static final Logger LOG =
LoggerFactory.getLogger(WriterRefresher.class);
- @Nullable private final Set<String> configGroups;
- private final Refresher<T> refresher;
private FileStoreTable table;
- private T write;
+ private final Refresher refresher;
+ private final Set<String> configGroups;
- public WriterRefresher(FileStoreTable table, T write, Refresher<T>
refresher) {
+ private WriterRefresher(FileStoreTable table, Refresher refresher,
Set<String> configGroups) {
this.table = table;
- this.write = write;
this.refresher = refresher;
+ this.configGroups = configGroups;
+ }
+
+ @Nullable
+ public static WriterRefresher create(
+ boolean isStreaming, FileStoreTable table, Refresher refresher) {
+ if (!isStreaming) {
+ return null;
+ }
+
String refreshDetectors =
Options.fromMap(table.options())
.get(FlinkConnectorOptions.SINK_WRITER_REFRESH_DETECTORS);
- if (StringUtils.isNullOrWhitespaceOnly(refreshDetectors)) {
- configGroups = null;
- } else {
- configGroups =
Arrays.stream(refreshDetectors.split(",")).collect(Collectors.toSet());
+ Set<String> configGroups =
+ isNullOrWhitespaceOnly(refreshDetectors)
+ ? null
+ :
Arrays.stream(refreshDetectors.split(",")).collect(Collectors.toSet());
+ if (configGroups == null || configGroups.isEmpty()) {
+ return null;
}
+ return new WriterRefresher(table, refresher, configGroups);
}
public void tryRefresh() {
- if (configGroups == null || configGroups.isEmpty()) {
+ Optional<TableSchema> latestSchema = table.schemaManager().latest();
+ if (!latestSchema.isPresent()) {
return;
}
- Optional<TableSchema> latestSchema = table.schemaManager().latest();
- if (latestSchema.isPresent() && latestSchema.get().id() >
table.schema().id()) {
+ TableSchema latest = latestSchema.get();
+ if (latest.id() > table.schema().id()) {
try {
Map<String, String> currentOptions =
-
CoreOptions.fromMap(table.schema().options()).configGroups(configGroups);
+ configGroups(configGroups, table.coreOptions());
Map<String, String> newOptions =
- CoreOptions.fromMap(latestSchema.get().options())
- .configGroups(configGroups);
+ configGroups(configGroups,
CoreOptions.fromMap(latest.options()));
if (!Objects.equals(newOptions, currentOptions)) {
if (LOG.isDebugEnabled()) {
@@ -86,7 +102,7 @@ public class WriterRefresher<T> {
newOptions);
}
table = table.copy(newOptions);
- refresher.refresh(table, write);
+ refresher.refresh(table);
}
} catch (Exception e) {
throw new RuntimeException("update write failed.", e);
@@ -94,12 +110,22 @@ public class WriterRefresher<T> {
}
}
- /**
- * Refresher for refresh write when configs changed.
- *
- * @param <T> the type of writer.
- */
- public interface Refresher<T> {
- void refresh(FileStoreTable table, T writer) throws Exception;
+ /** Refresher when configs changed. */
+ public interface Refresher {
+ void refresh(FileStoreTable table) throws Exception;
+ }
+
+ public static Map<String, String> configGroups(Set<String> groups,
CoreOptions options) {
+ Map<String, String> configs = new HashMap<>();
+ // external-paths config group
+ String externalPaths = "external-paths";
+ if (groups.contains(externalPaths)) {
+ configs.put(DATA_FILE_EXTERNAL_PATHS.key(),
options.dataFileExternalPaths());
+ configs.put(
+ DATA_FILE_EXTERNAL_PATHS_STRATEGY.key(),
+ options.externalPathStrategy().toString());
+ configs.put(DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS.key(),
options.externalSpecificFS());
+ }
+ return configs;
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterRefresherTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterRefresherTest.java
index ca4e3a76b6..10a793753e 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterRefresherTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterRefresherTest.java
@@ -40,6 +40,7 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import static org.apache.paimon.flink.sink.WriterRefresher.configGroups;
import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED;
import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
import static org.assertj.core.api.Assertions.assertThat;
@@ -80,10 +81,11 @@ public class WriterRefresherTest {
Map<String, String> refreshedOptions = new HashMap<>();
Set<String> groups =
Arrays.stream(detectGroups.split(",")).collect(Collectors.toSet());
- WriterRefresher<?> writerRefresher =
- new WriterRefresher<>(table1, refreshedOptions, new
TestWriteRefresher(groups));
+ WriterRefresher writerRefresher =
+ WriterRefresher.create(
+ true, table1, new TestWriteRefresher(groups,
refreshedOptions));
writerRefresher.tryRefresh();
-
assertThat(refreshedOptions).isEqualTo(table2.coreOptions().configGroups(groups));
+ assertThat(refreshedOptions).isEqualTo(configGroups(groups,
table2.coreOptions()));
writerRefresher.tryRefresh();
}
@@ -92,27 +94,10 @@ public class WriterRefresherTest {
// Create table without SINK_WRITER_REFRESH_DETECTORS option
Map<String, String> options = new HashMap<>();
createTable(options);
-
FileStoreTable table1 = getTable();
-
- // Make schema changes
- table1.schemaManager()
- .commitChanges(
- SchemaChange.setOption(
- CoreOptions.DATA_FILE_EXTERNAL_PATHS.key(),
"external-path1"));
-
- Map<String, String> refreshedOptions = new HashMap<>();
- refreshedOptions.put("initial", "value");
-
- WriterRefresher<?> writerRefresher =
- new WriterRefresher<>(table1, refreshedOptions, new
TestWriteRefresher(null));
-
- // Should not refresh when configGroups is null
- writerRefresher.tryRefresh();
-
- // Options should remain unchanged
- assertThat(refreshedOptions).containsEntry("initial", "value");
- assertThat(refreshedOptions).hasSize(1);
+ WriterRefresher writerRefresher =
+ WriterRefresher.create(true, table1, new
TestWriteRefresher(null, null));
+ assertThat(writerRefresher).isNull();
}
@Test
@@ -121,27 +106,10 @@ public class WriterRefresherTest {
Map<String, String> options = new HashMap<>();
options.put(FlinkConnectorOptions.SINK_WRITER_REFRESH_DETECTORS.key(),
"");
createTable(options);
-
FileStoreTable table1 = getTable();
-
- // Make schema changes
- table1.schemaManager()
- .commitChanges(
- SchemaChange.setOption(
- CoreOptions.DATA_FILE_EXTERNAL_PATHS.key(),
"external-path1"));
-
- Map<String, String> refreshedOptions = new HashMap<>();
- refreshedOptions.put("initial", "value");
-
- WriterRefresher<?> writerRefresher =
- new WriterRefresher<>(table1, refreshedOptions, new
TestWriteRefresher(null));
-
- // Should not refresh when configGroups is empty
- writerRefresher.tryRefresh();
-
- // Options should remain unchanged since empty configGroups should
trigger early return
- assertThat(refreshedOptions).containsEntry("initial", "value");
- assertThat(refreshedOptions).hasSize(1);
+ WriterRefresher writerRefresher =
+ WriterRefresher.create(true, table1, new
TestWriteRefresher(null, null));
+ assertThat(writerRefresher).isNull();
}
@Test
@@ -153,30 +121,14 @@ public class WriterRefresherTest {
FileStoreTable table1 = getTable();
- // Make schema changes
- table1.schemaManager()
- .commitChanges(
- SchemaChange.setOption(
- CoreOptions.DATA_FILE_EXTERNAL_PATHS.key(),
"external-path1"));
-
- Map<String, String> refreshedOptions = new HashMap<>();
- refreshedOptions.put("initial", "value");
-
Set<String> emptyGroups =
Arrays.stream(",,,".split(","))
.filter(s -> !s.trim().isEmpty())
.collect(Collectors.toSet());
- WriterRefresher<?> writerRefresher =
- new WriterRefresher<>(
- table1, refreshedOptions, new
TestWriteRefresher(emptyGroups));
-
- // Should not refresh when configGroups is effectively empty
- writerRefresher.tryRefresh();
-
- // Options should remain unchanged
- assertThat(refreshedOptions).containsEntry("initial", "value");
- assertThat(refreshedOptions).hasSize(1);
+ WriterRefresher writerRefresher =
+ WriterRefresher.create(true, table1, new
TestWriteRefresher(emptyGroups, null));
+ assertThat(writerRefresher).isNull();
}
@Test
@@ -192,8 +144,9 @@ public class WriterRefresherTest {
refreshedOptions.put("initial", "value");
Set<String> groups =
Arrays.stream(detectGroups.split(",")).collect(Collectors.toSet());
- WriterRefresher<?> writerRefresher =
- new WriterRefresher<>(table1, refreshedOptions, new
TestWriteRefresher(groups));
+ WriterRefresher writerRefresher =
+ WriterRefresher.create(
+ true, table1, new TestWriteRefresher(groups,
refreshedOptions));
// No schema changes made, should not refresh
writerRefresher.tryRefresh();
@@ -227,8 +180,9 @@ public class WriterRefresherTest {
refreshedOptions.put("initial", "value");
Set<String> groups =
Arrays.stream(detectGroups.split(",")).collect(Collectors.toSet());
- WriterRefresher<?> writerRefresher =
- new WriterRefresher<>(table1, refreshedOptions, new
TestWriteRefresher(groups));
+ WriterRefresher writerRefresher =
+ WriterRefresher.create(
+ true, table1, new TestWriteRefresher(groups,
refreshedOptions));
// Should not refresh when monitored config groups haven't changed
writerRefresher.tryRefresh();
@@ -253,19 +207,21 @@ public class WriterRefresherTest {
return (FileStoreTable) catalog.getTable(Identifier.create("default",
"T"));
}
- private static class TestWriteRefresher
- implements WriterRefresher.Refresher<Map<String, String>> {
- Set<String> groups;
+ private static class TestWriteRefresher implements
WriterRefresher.Refresher {
+
+ private final Set<String> groups;
+ private final Map<String, String> options;
- TestWriteRefresher(Set<String> groups) {
+ TestWriteRefresher(Set<String> groups, Map<String, String> options) {
this.groups = groups;
+ this.options = options;
}
@Override
- public void refresh(FileStoreTable table, Map<String, String> options)
throws Exception {
+ public void refresh(FileStoreTable table) {
options.clear();
if (groups != null) {
- options.putAll(table.coreOptions().configGroups(groups));
+ options.putAll(configGroups(groups, table.coreOptions()));
}
}
}