This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 1cc7cf2961 [flink] support detecting field changes for dedicated compaction in streaming mode (#6690)
1cc7cf2961 is described below
commit 1cc7cf2961277c53a0b26519eef99bfc8bed1450
Author: LsomeYeah <[email protected]>
AuthorDate: Wed Dec 17 10:18:53 2025 +0800
[flink] support detecting field changes for dedicated compaction in streaming mode (#6690)
---
.../paimon/flink/compact/AppendTableCompactor.java | 13 +-
.../flink/sink/AppendCompactWorkerOperator.java | 5 +-
...pendOnlyMultiTableCompactionWorkerOperator.java | 7 +-
...endOnlySingleTableCompactionWorkerOperator.java | 4 +-
.../apache/paimon/flink/sink/CompactRefresher.java | 106 ++++++++++++
.../{WriterRefresher.java => ConfigRefresher.java} | 43 ++---
.../paimon/flink/sink/StoreCompactOperator.java | 17 +-
.../paimon/flink/sink/TableWriteOperator.java | 8 +-
.../apache/paimon/flink/sink/WriteRefresher.java | 26 +++
.../paimon/flink/action/CompactActionITCase.java | 108 +++++++++++++
.../flink/action/CompactActionITCaseBase.java | 17 ++
.../paimon/flink/sink/CompactRefresherTest.java | 177 +++++++++++++++++++++
...RefresherTest.java => ConfigRefresherTest.java} | 60 ++++---
13 files changed, 521 insertions(+), 70 deletions(-)
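In short, each compaction task now carries the metadata of the files it is about to rewrite (compactBefore), and the new CompactRefresher compares the highest schema id among those files with the schema id of the table the writer was built from, refreshing the writer from the latest schema when the files are newer. A minimal sketch of that check, assuming a FileStoreTable `table`, the task's input files `files`, and a `refresher` callback (illustrative only, not part of the patch):

    long fileSchemaId =
            files.stream().mapToLong(DataFileMeta::schemaId).max().orElse(table.schema().id());
    if (fileSchemaId > table.schema().id()) {
        // the files were written with a newer schema; rebuild the writer on the latest schema
        table = table.copyWithLatestSchema();
        refresher.refresh(table);
    }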
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompactor.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompactor.java
index 1efbccbc19..0a45514669 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompactor.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompactor.java
@@ -25,7 +25,8 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
import org.apache.paimon.flink.sink.Committable;
-import org.apache.paimon.flink.sink.WriterRefresher;
+import org.apache.paimon.flink.sink.CompactRefresher;
+import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.operation.BaseAppendFileStoreWrite;
import org.apache.paimon.operation.FileStoreWrite.State;
import org.apache.paimon.operation.metrics.CompactionMetrics;
@@ -65,7 +66,7 @@ public class AppendTableCompactor {
@Nullable private final CompactionMetrics compactionMetrics;
@Nullable private final CompactionMetrics.Reporter metricsReporter;
- @Nullable protected final WriterRefresher writeRefresher;
+ @Nullable protected final CompactRefresher compactRefresher;
public AppendTableCompactor(
FileStoreTable table,
@@ -91,7 +92,7 @@ public class AppendTableCompactor {
? null
// partition and bucket fields are no use.
: this.compactionMetrics.createReporter(BinaryRow.EMPTY_ROW, 0);
- this.writeRefresher = WriterRefresher.create(isStreaming, table, this::replace);
+ this.compactRefresher = CompactRefresher.create(isStreaming, table, this::replace);
}
public void processElement(AppendCompactTask task) throws Exception {
@@ -219,12 +220,12 @@ public class AppendTableCompactor {
this.write.restore(states);
}
- public void tryRefreshWrite() {
+ public void tryRefreshWrite(List<DataFileMeta> files) {
if (commitUser == null) {
return;
}
- if (writeRefresher != null) {
- writeRefresher.tryRefresh();
+ if (compactRefresher != null && !files.isEmpty()) {
+ compactRefresher.tryRefresh(files);
}
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendCompactWorkerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendCompactWorkerOperator.java
index aa912a9eb3..e7f6ece177 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendCompactWorkerOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendCompactWorkerOperator.java
@@ -84,10 +84,7 @@ public abstract class AppendCompactWorkerOperator<IN>
@Override
protected List<Committable> prepareCommit(boolean waitCompaction, long checkpointId)
throws IOException {
- List<Committable> committables =
- this.unawareBucketCompactor.prepareCommit(waitCompaction, checkpointId);
- this.unawareBucketCompactor.tryRefreshWrite();
- return committables;
+ return this.unawareBucketCompactor.prepareCommit(waitCompaction, checkpointId);
}
private ExecutorService workerExecutor() {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java
index 4febb8d907..3450d80c30 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java
@@ -112,9 +112,10 @@ public class AppendOnlyMultiTableCompactionWorkerOperator
@Override
public void processElement(StreamRecord<MultiTableAppendCompactTask> element) throws Exception {
Identifier identifier = element.getValue().tableIdentifier();
- compactorContainer
- .computeIfAbsent(identifier, this::compactor)
- .processElement(element.getValue());
+ AppendTableCompactor compactor =
+ compactorContainer.computeIfAbsent(identifier, this::compactor);
+ compactor.tryRefreshWrite(element.getValue().compactBefore());
+ compactor.processElement(element.getValue());
}
private AppendTableCompactor compactor(Identifier tableId) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperator.java
index c1a4ce82da..6ad1de669b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperator.java
@@ -44,7 +44,9 @@ public class AppendOnlySingleTableCompactionWorkerOperator
@Override
public void processElement(StreamRecord<AppendCompactTask> element) throws Exception {
- this.unawareBucketCompactor.processElement(element.getValue());
+ AppendCompactTask task = element.getValue();
+ this.unawareBucketCompactor.tryRefreshWrite(task.compactBefore());
+ this.unawareBucketCompactor.processElement(task);
}
/** {@link StreamOperatorFactory} of {@link AppendOnlySingleTableCompactionWorkerOperator}. */
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactRefresher.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactRefresher.java
new file mode 100644
index 0000000000..e59e086754
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactRefresher.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.paimon.flink.sink.ConfigRefresher.configGroups;
+
+/** Refreshes the write when the table schema has changed. This is for dedicated compaction. */
+public class CompactRefresher {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CompactRefresher.class);
+
+ private FileStoreTable table;
+ private final WriteRefresher refresher;
+ private final @Nullable ConfigRefresher configRefresher;
+
+ private CompactRefresher(FileStoreTable table, WriteRefresher refresher) {
+ this.table = table;
+ this.refresher = refresher;
+ this.configRefresher = ConfigRefresher.create(true, table, refresher);
+ }
+
+ @Nullable
+ public static CompactRefresher create(
+ boolean isStreaming, FileStoreTable table, WriteRefresher refresher) {
+ if (!isStreaming) {
+ return null;
+ }
+ return new CompactRefresher(table, refresher);
+ }
+
+ /**
+ * This is used for dedicated compaction in streaming mode. When the schema-id of newly added
+ * data files exceeds the current schema-id, the writer needs to be refreshed to prevent data
+ * loss.
+ */
+ public void tryRefresh(List<DataFileMeta> files) {
+ long fileSchemaId =
+ files.stream().mapToLong(DataFileMeta::schemaId).max().orElse(table.schema().id());
+ if (fileSchemaId > table.schema().id()) {
+ Optional<TableSchema> latestSchema = table.schemaManager().latest();
+ if (!latestSchema.isPresent()) {
+ return;
+ }
+ TableSchema latest = latestSchema.get();
+
+ try {
+ // here we cannot use table.copy(latestSchema), because the table used for
+ // dedicated compaction has some dynamic options that we should not overwrite.
+ // we just copy the latest fields and the options allowed to be refreshed.
+ table = table.copyWithLatestSchema();
+
+ // also refresh the configs that are allowed to be updated
+ if (configRefresher != null) {
+ table =
+ table.copy(
+ configGroups(
+ configRefresher.configGroups(),
+ CoreOptions.fromMap(latest.options())));
+ configRefresher.updateTable(table);
+ }
+
+ refresher.refresh(table);
+ LOG.info(
+ "write has been refreshed due to schema in data files
changed. new schema id:{}.",
+ table.schema().id());
+ } catch (Exception e) {
+ throw new RuntimeException("update write failed.", e);
+ }
+
+ } else {
+ // try refresh for configs
+ if (configRefresher != null) {
+ configRefresher.tryRefresh();
+ }
+ }
+ }
+}
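For context, call sites create the refresher with a callback that swaps the underlying write, and feed it the input files of each compaction task; a hedged usage sketch mirroring the operator changes in this patch (names as in the patch):

    // returns null in batch mode, so the call site guards against null
    CompactRefresher compactRefresher =
            CompactRefresher.create(isStreaming, table, write::replace);
    ...
    // before processing a compaction task, check the schema ids of its input files
    if (compactRefresher != null && !task.compactBefore().isEmpty()) {
        compactRefresher.tryRefresh(task.compactBefore());
    }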
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/WriterRefresher.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/ConfigRefresher.java
similarity index 77%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/WriterRefresher.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/ConfigRefresher.java
index 5dca871af5..1d4b8deb08 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/WriterRefresher.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/ConfigRefresher.java
@@ -42,24 +42,25 @@ import static org.apache.paimon.CoreOptions.DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS
import static org.apache.paimon.CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY;
import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly;
-/** Writer refresher for refresh write when configs changed. */
-public class WriterRefresher {
+/** refresh write when configs changed. */
+public class ConfigRefresher {
- private static final Logger LOG = LoggerFactory.getLogger(WriterRefresher.class);
+ private static final Logger LOG = LoggerFactory.getLogger(ConfigRefresher.class);
private FileStoreTable table;
- private final Refresher refresher;
+ private final WriteRefresher refresher;
private final Set<String> configGroups;
- private WriterRefresher(FileStoreTable table, Refresher refresher, Set<String> configGroups) {
+ private ConfigRefresher(
+ FileStoreTable table, WriteRefresher refresher, Set<String> configGroups) {
this.table = table;
this.refresher = refresher;
this.configGroups = configGroups;
}
@Nullable
- public static WriterRefresher create(
- boolean isStreaming, FileStoreTable table, Refresher refresher) {
+ public static ConfigRefresher create(
+ boolean isStreaming, FileStoreTable table, WriteRefresher refresher) {
if (!isStreaming) {
return null;
}
@@ -74,9 +75,13 @@ public class WriterRefresher {
if (configGroups == null || configGroups.isEmpty()) {
return null;
}
- return new WriterRefresher(table, refresher, configGroups);
+ return new ConfigRefresher(table, refresher, configGroups);
}
+ /**
+ * Try to refresh the write when configs that are expected to be refreshed in streaming mode
+ * have changed.
+ */
public void tryRefresh() {
Optional<TableSchema> latestSchema = table.schemaManager().latest();
if (!latestSchema.isPresent()) {
@@ -92,17 +97,12 @@ public class WriterRefresher {
configGroups(configGroups, CoreOptions.fromMap(latest.options()));
if (!Objects.equals(newOptions, currentOptions)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "table schema has changed, current
schema-id:{}, try to update write with new schema-id:{}. "
- + "current options:{}, new
options:{}.",
- table.schema().id(),
- latestSchema.get().id(),
- currentOptions,
- newOptions);
- }
table = table.copy(newOptions);
refresher.refresh(table);
+ LOG.info(
+ "write has been refreshed due to configs changed.
old options:{}, new options:{}.",
+ currentOptions,
+ newOptions);
}
} catch (Exception e) {
throw new RuntimeException("update write failed.", e);
@@ -110,9 +110,12 @@ public class WriterRefresher {
}
}
- /** Refresher when configs changed. */
- public interface Refresher {
- void refresh(FileStoreTable table) throws Exception;
+ public void updateTable(FileStoreTable table) {
+ this.table = table;
+ }
+
+ public Set<String> configGroups() {
+ return configGroups;
}
public static Map<String, String> configGroups(Set<String> groups, CoreOptions options) {
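ConfigRefresher only watches the option groups named in FlinkConnectorOptions.SINK_WRITER_REFRESH_DETECTORS (the tests below enable the external-paths group); when no groups are configured, create(...) returns null and no config refresh happens. A hedged sketch of enabling it via table options, based on the test setup in this patch:

    Map<String, String> options = new HashMap<>();
    // watch only the external-paths config group for changes
    options.put(FlinkConnectorOptions.SINK_WRITER_REFRESH_DETECTORS.key(), "external-paths");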
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
index 989c8e91dc..0b7984d2f4 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
@@ -70,7 +70,7 @@ public class StoreCompactOperator extends PrepareCommitOperator<RowData, Committ
private transient DataFileMetaSerializer dataFileMetaSerializer;
private transient Set<Pair<BinaryRow, Integer>> waitToCompact;
- protected transient @Nullable WriterRefresher writeRefresher;
+ protected transient @Nullable CompactRefresher compactRefresher;
public StoreCompactOperator(
StreamOperatorParameters<Committable> parameters,
@@ -119,7 +119,8 @@ public class StoreCompactOperator extends PrepareCommitOperator<RowData, Committ
getContainingTask().getEnvironment().getIOManager(),
memoryPoolFactory,
getMetricGroup());
- this.writeRefresher = WriterRefresher.create(write.streamingMode(), table, write::replace);
+ this.compactRefresher =
+ CompactRefresher.create(write.streamingMode(), table, write::replace);
}
@Override
@@ -150,6 +151,7 @@ public class StoreCompactOperator extends PrepareCommitOperator<RowData, Committ
if (write.streamingMode()) {
write.notifyNewFiles(snapshotId, partition, bucket, files);
+ tryRefreshWrite(files);
} else {
Preconditions.checkArgument(
files.isEmpty(),
@@ -173,10 +175,7 @@ public class StoreCompactOperator extends PrepareCommitOperator<RowData, Committ
}
waitToCompact.clear();
- List<Committable> committables = write.prepareCommit(waitCompaction, checkpointId);
-
- tryRefreshWrite();
- return committables;
+ return write.prepareCommit(waitCompaction, checkpointId);
}
@Override
@@ -197,9 +196,9 @@ public class StoreCompactOperator extends PrepareCommitOperator<RowData, Committ
return waitToCompact;
}
- private void tryRefreshWrite() {
- if (writeRefresher != null) {
- writeRefresher.tryRefresh();
+ private void tryRefreshWrite(List<DataFileMeta> files) {
+ if (compactRefresher != null) {
+ compactRefresher.tryRefresh(files);
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
index caf92cb742..70a6af8a25 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
@@ -58,7 +58,7 @@ public abstract class TableWriteOperator<IN> extends PrepareCommitOperator<IN, C
protected transient StoreSinkWriteState state;
protected transient StoreSinkWrite write;
- protected transient @Nullable WriterRefresher writeRefresher;
+ protected transient @Nullable ConfigRefresher configRefresher;
public TableWriteOperator(
StreamOperatorParameters<Committable> parameters,
@@ -99,7 +99,7 @@ public abstract class TableWriteOperator<IN> extends PrepareCommitOperator<IN, C
if (writeRestore != null) {
write.setWriteRestore(writeRestore);
}
- this.writeRefresher = WriterRefresher.create(write.streamingMode(), table, write::replace);
+ this.configRefresher = ConfigRefresher.create(write.streamingMode(), table, write::replace);
}
public void setWriteRestore(@Nullable WriteRestore writeRestore) {
@@ -157,8 +157,8 @@ public abstract class TableWriteOperator<IN> extends PrepareCommitOperator<IN, C
}
protected void tryRefreshWrite() {
- if (writeRefresher != null) {
- writeRefresher.tryRefresh();
+ if (configRefresher != null) {
+ configRefresher.tryRefresh();
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/WriteRefresher.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/WriteRefresher.java
new file mode 100644
index 0000000000..83fb376556
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/WriteRefresher.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.table.FileStoreTable;
+
+/** Refresher for refreshing the write in streaming mode. */
+public interface WriteRefresher {
+ void refresh(FileStoreTable table) throws Exception;
+}
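Since WriteRefresher is a single-method interface, call sites can pass a method reference (this patch uses write::replace and this::replace); an equivalent lambda, shown only as a sketch:

    WriteRefresher refresher = refreshedTable -> {
        // swap the underlying write so it picks up the refreshed table
        write.replace(refreshedTable);
    };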
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
index b5409e22af..fb941a73d5 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
@@ -26,9 +26,11 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.CommonTestUtils;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TraceableFileIO;
@@ -367,6 +369,62 @@ public class CompactActionITCase extends CompactActionITCaseBase {
assertThat(fileIO.listStatus(new Path(externalPath2)).length).isGreaterThanOrEqualTo(1);
}
+ @Test
+ public void testStreamingCompactWithAddingColumns() throws Exception {
+ Map<String, String> tableOptions = new HashMap<>();
+ tableOptions.put(CoreOptions.CHANGELOG_PRODUCER.key(), "full-compaction");
+ tableOptions.put(CoreOptions.FULL_COMPACTION_DELTA_COMMITS.key(), "1");
+ tableOptions.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s");
+ tableOptions.put(CoreOptions.WRITE_ONLY.key(), "true");
+
+ FileStoreTable table =
+ prepareTable(
+ Arrays.asList("dt", "hh"),
+ Arrays.asList("dt", "hh", "k"),
+ Collections.emptyList(),
+ tableOptions);
+
+ // base records
+ writeData(
+ rowData(1, 100, 15, BinaryString.fromString("20221208")),
+ rowData(1, 100, 16, BinaryString.fromString("20221208")),
+ rowData(1, 100, 15, BinaryString.fromString("20221209")));
+
+ checkLatestSnapshot(table, 1, Snapshot.CommitKind.APPEND);
+ runAction(true);
+ checkLatestSnapshot(table, 2, Snapshot.CommitKind.COMPACT, 60_000);
+
+ SchemaChange schemaChange = SchemaChange.addColumn("new_col",
DataTypes.INT());
+ table.schemaManager().commitChanges(schemaChange);
+ // wait for at least one checkpoint so the writer can refresh
+ Thread.sleep(1000);
+
+ // incremental records
+ table = getFileStoreTable(tableName);
+ StreamWriteBuilder streamWriteBuilder =
+ table.newStreamWriteBuilder().withCommitUser(commitUser);
+ write = streamWriteBuilder.newWrite();
+ commit = streamWriteBuilder.newCommit();
+ writeData(
+ rowData(1, 101, 15, BinaryString.fromString("20221208"), 1),
+ rowData(1, 101, 16, BinaryString.fromString("20221208"), 1),
+ rowData(2, 101, 15, BinaryString.fromString("20221209"), 2));
+ checkLatestSnapshot(table, 3, Snapshot.CommitKind.APPEND);
+ checkLatestSnapshot(table, 4, Snapshot.CommitKind.COMPACT, 60_000);
+
+ List<String> res =
+ getResult(
+ table.newRead(),
+ table.newSnapshotReader().read().splits(),
+ table.rowType());
+ assertThat(res)
+ .containsExactlyInAnyOrder(
+ "+I[1, 101, 15, 20221208, 1]",
+ "+I[1, 101, 16, 20221208, 1]",
+ "+I[1, 100, 15, 20221209, NULL]",
+ "+I[2, 101, 15, 20221209, 2]");
+ }
+
@Test
public void testUnawareBucketStreamingCompact() throws Exception {
Map<String, String> tableOptions = new HashMap<>();
@@ -474,6 +532,56 @@ public class CompactActionITCase extends CompactActionITCaseBase {
assertThat(fileIO.listStatus(new Path(externalPath2)).length).isGreaterThanOrEqualTo(1);
}
+ @Test
+ public void testUnawareBucketStreamingCompactWithAddingColumns() throws Exception {
+ Map<String, String> tableOptions = new HashMap<>();
+ tableOptions.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s");
+ tableOptions.put(CoreOptions.BUCKET.key(), "-1");
+ tableOptions.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
+ tableOptions.put(CoreOptions.WRITE_ONLY.key(), "true");
+
+ FileStoreTable table =
+ prepareTable(
+ Collections.singletonList("k"),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ tableOptions);
+
+ // base records
+ writeData(rowData(1, 100, 15, BinaryString.fromString("20221208")));
+ writeData(rowData(1, 100, 16, BinaryString.fromString("20221208")));
+
+ checkLatestSnapshot(table, 2, Snapshot.CommitKind.APPEND);
+ // note that this UT does not specify the real partition of the table
+ runActionForUnawareTable(true);
+
+ checkLatestSnapshot(table, 3, Snapshot.CommitKind.COMPACT, 60_000);
+
+ SchemaChange schemaChange = SchemaChange.addColumn("new_col",
DataTypes.INT());
+ table.schemaManager().commitChanges(schemaChange);
+ Thread.sleep(1000);
+
+ table = getFileStoreTable(tableName);
+ StreamWriteBuilder streamWriteBuilder =
+ table.newStreamWriteBuilder().withCommitUser(commitUser);
+ write = streamWriteBuilder.newWrite();
+ commit = streamWriteBuilder.newCommit();
+ writeData(rowData(1, 101, 15, BinaryString.fromString("20221208"), 1));
+ checkLatestSnapshot(table, 4, Snapshot.CommitKind.APPEND);
+ checkLatestSnapshot(table, 5, Snapshot.CommitKind.COMPACT, 60_000);
+
+ List<String> res =
+ getResult(
+ table.newRead(),
+ table.newSnapshotReader().read().splits(),
+ table.rowType());
+ assertThat(res)
+ .containsExactlyInAnyOrder(
+ "+I[1, 100, 15, 20221208, NULL]",
+ "+I[1, 100, 16, 20221208, NULL]",
+ "+I[1, 101, 15, 20221208, 1]");
+ }
+
@Test
public void testUnawareBucketBatchCompact() throws Exception {
Map<String, String> tableOptions = new HashMap<>();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCaseBase.java
index 41d01bdf7f..c246f6a4bc 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCaseBase.java
@@ -107,6 +107,23 @@ public class CompactActionITCaseBase extends ActionITCaseBase {
assertThat(snapshot.commitKind()).isEqualTo(commitKind);
}
+ protected void checkLatestSnapshot(
+ FileStoreTable table, long snapshotId, Snapshot.CommitKind commitKind, long timeout)
+ throws Exception {
+ SnapshotManager snapshotManager = table.snapshotManager();
+ long start = System.currentTimeMillis();
+ while (!Objects.equals(snapshotManager.latestSnapshotId(), snapshotId)) {
+ Thread.sleep(500);
+ if (System.currentTimeMillis() - start > timeout) {
+ throw new RuntimeException("can't wait for a compaction.");
+ }
+ }
+
+ Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
+ assertThat(snapshot.id()).isEqualTo(snapshotId);
+ assertThat(snapshot.commitKind()).isEqualTo(commitKind);
+ }
+
protected FileStoreTable prepareTable(
List<String> partitionKeys,
List<String> primaryKeys,
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactRefresherTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactRefresherTest.java
new file mode 100644
index 0000000000..aed46ba74c
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactRefresherTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.flink.sink.ConfigRefresher.configGroups;
+import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED;
+import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link CompactRefresher}. */
+public class CompactRefresherTest {
+
+ @TempDir public java.nio.file.Path tempDir;
+
+ Catalog catalog;
+
+ @BeforeEach
+ public void before() throws Exception {
+ Options options = new Options();
+ options.set(WAREHOUSE, tempDir.toString());
+ options.set(CACHE_ENABLED, false);
+ CatalogContext context = CatalogContext.create(options);
+ catalog = CatalogFactory.createCatalog(context);
+ catalog.createDatabase("default", true);
+ }
+
+ @Test
+ public void testRefreshFieldsAndConfigs() throws Exception {
+ String detectGroups = "external-paths";
+ Map<String, String> options = new HashMap<>();
+ options.put(FlinkConnectorOptions.SINK_WRITER_REFRESH_DETECTORS.key(), detectGroups);
+ createTable(options);
+ FileStoreTable table1 = getTable();
+
+ table1.schemaManager()
+ .commitChanges(
+ SchemaChange.addColumn("c", DataTypes.INT()),
+ SchemaChange.setOption(
+ CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY.key(),
+ "round-robin"));
+ FileStoreTable table2 = getTable();
+ try (BatchTableWrite write = table2.newBatchWriteBuilder().newWrite();
+ BatchTableCommit commit = table2.newBatchWriteBuilder().newCommit()) {
+ write.write(GenericRow.of(1, 1, 1));
+ commit.commit(write.prepareCommit());
+ }
+
+ List<DataField> dataFields = new ArrayList<>();
+ Map<String, String> refreshedOptions = new HashMap<>();
+ Set<String> groups = Arrays.stream(detectGroups.split(",")).collect(Collectors.toSet());
+ CompactRefresher writerRefresher =
+ CompactRefresher.create(
+ true,
+ table1,
+ new ConfigRefresherTest.TestWriteRefresher(
+ groups, refreshedOptions, dataFields));
+ writerRefresher.tryRefresh(
+ table2.newSnapshotReader().read().dataSplits().get(0).dataFiles());
+ assertThat(dataFields).isEqualTo(table2.schema().fields());
+ assertThat(refreshedOptions).isEqualTo(configGroups(groups, table2.coreOptions()));
+
+ // should not refresh again
+ writerRefresher.tryRefresh(Collections.emptyList());
+ }
+
+ @Test
+ public void testRefreshOnlyFields() throws Exception {
+ Map<String, String> options = new HashMap<>();
+ createTable(options);
+ FileStoreTable table1 = getTable();
+
+ table1.schemaManager().commitChanges(SchemaChange.addColumn("c",
DataTypes.INT()));
+ FileStoreTable table2 = getTable();
+ try (BatchTableWrite write = table2.newBatchWriteBuilder().newWrite();
+ BatchTableCommit commit = table2.newBatchWriteBuilder().newCommit()) {
+ write.write(GenericRow.of(1, 1, 1));
+ commit.commit(write.prepareCommit());
+ }
+
+ List<DataField> dataFields = new ArrayList<>();
+ CompactRefresher writerRefresher =
+ CompactRefresher.create(
+ true,
+ table1,
+ new ConfigRefresherTest.TestWriteRefresher(
+ null, Collections.emptyMap(), dataFields));
+ writerRefresher.tryRefresh(
+ table2.newSnapshotReader().read().dataSplits().get(0).dataFiles());
+ assertThat(dataFields).isEqualTo(table2.schema().fields());
+ }
+
+ @Test
+ public void testRefreshOnlyConfigs() throws Exception {
+ String detectGroups = "external-paths";
+ Map<String, String> options = new HashMap<>();
+ options.put(FlinkConnectorOptions.SINK_WRITER_REFRESH_DETECTORS.key(), detectGroups);
+ createTable(options);
+ FileStoreTable table1 = getTable();
+
+ table1.schemaManager()
+ .commitChanges(
+ SchemaChange.setOption(
+ CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY.key(),
+ "round-robin"));
+ FileStoreTable table2 = getTable();
+
+ Map<String, String> refreshedOptions = new HashMap<>();
+ Set<String> groups = Arrays.stream(detectGroups.split(",")).collect(Collectors.toSet());
+ CompactRefresher writerRefresher =
+ CompactRefresher.create(
+ true,
+ table1,
+ new ConfigRefresherTest.TestWriteRefresher(groups, refreshedOptions, null));
+ writerRefresher.tryRefresh(Collections.emptyList());
+ assertThat(refreshedOptions).isEqualTo(configGroups(groups, table2.coreOptions()));
+ }
+
+ private void createTable(Map<String, String> options) throws Exception {
+ catalog.createTable(
+ Identifier.create("default", "T"),
+ Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .column("b", DataTypes.INT())
+ .options(options)
+ .build(),
+ false);
+ }
+
+ private FileStoreTable getTable() throws Exception {
+ return (FileStoreTable) catalog.getTable(Identifier.create("default",
"T"));
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterRefresherTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/ConfigRefresherTest.java
similarity index 83%
rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterRefresherTest.java
rename to paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/ConfigRefresherTest.java
index 10a793753e..9e51a1cc8d 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterRefresherTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/ConfigRefresherTest.java
@@ -28,6 +28,7 @@ import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.junit.jupiter.api.BeforeEach;
@@ -36,17 +37,18 @@ import org.junit.jupiter.api.io.TempDir;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
-import static org.apache.paimon.flink.sink.WriterRefresher.configGroups;
+import static org.apache.paimon.flink.sink.ConfigRefresher.configGroups;
import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED;
import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
import static org.assertj.core.api.Assertions.assertThat;
-/** Test for {@link WriterRefresher}. */
-public class WriterRefresherTest {
+/** Test for {@link ConfigRefresher}. */
+public class ConfigRefresherTest {
@TempDir public java.nio.file.Path tempDir;
Catalog catalog;
@@ -81,12 +83,12 @@ public class WriterRefresherTest {
Map<String, String> refreshedOptions = new HashMap<>();
Set<String> groups = Arrays.stream(detectGroups.split(",")).collect(Collectors.toSet());
- WriterRefresher writerRefresher =
- WriterRefresher.create(
+ ConfigRefresher configRefresher =
+ ConfigRefresher.create(
true, table1, new TestWriteRefresher(groups, refreshedOptions));
- writerRefresher.tryRefresh();
+ configRefresher.tryRefresh();
assertThat(refreshedOptions).isEqualTo(configGroups(groups, table2.coreOptions()));
- writerRefresher.tryRefresh();
+ configRefresher.tryRefresh();
}
@Test
@@ -95,9 +97,9 @@ public class WriterRefresherTest {
Map<String, String> options = new HashMap<>();
createTable(options);
FileStoreTable table1 = getTable();
- WriterRefresher writerRefresher =
- WriterRefresher.create(true, table1, new TestWriteRefresher(null, null));
- assertThat(writerRefresher).isNull();
+ ConfigRefresher configRefresher =
+ ConfigRefresher.create(true, table1, new TestWriteRefresher(null, null));
+ assertThat(configRefresher).isNull();
}
@Test
@@ -107,9 +109,9 @@ public class WriterRefresherTest {
options.put(FlinkConnectorOptions.SINK_WRITER_REFRESH_DETECTORS.key(), "");
createTable(options);
FileStoreTable table1 = getTable();
- WriterRefresher writerRefresher =
- WriterRefresher.create(true, table1, new TestWriteRefresher(null, null));
- assertThat(writerRefresher).isNull();
+ ConfigRefresher configRefresher =
+ ConfigRefresher.create(true, table1, new TestWriteRefresher(null, null));
+ assertThat(configRefresher).isNull();
}
@Test
@@ -126,9 +128,9 @@ public class WriterRefresherTest {
.filter(s -> !s.trim().isEmpty())
.collect(Collectors.toSet());
- WriterRefresher writerRefresher =
- WriterRefresher.create(true, table1, new TestWriteRefresher(emptyGroups, null));
- assertThat(writerRefresher).isNull();
+ ConfigRefresher configRefresher =
+ ConfigRefresher.create(true, table1, new TestWriteRefresher(emptyGroups, null));
+ assertThat(configRefresher).isNull();
}
@Test
@@ -144,12 +146,12 @@ public class WriterRefresherTest {
refreshedOptions.put("initial", "value");
Set<String> groups = Arrays.stream(detectGroups.split(",")).collect(Collectors.toSet());
- WriterRefresher writerRefresher =
- WriterRefresher.create(
+ ConfigRefresher configRefresher =
+ ConfigRefresher.create(
true, table1, new TestWriteRefresher(groups, refreshedOptions));
// No schema changes made, should not refresh
- writerRefresher.tryRefresh();
+ configRefresher.tryRefresh();
// Options should remain unchanged
assertThat(refreshedOptions).containsEntry("initial", "value");
@@ -180,12 +182,12 @@ public class WriterRefresherTest {
refreshedOptions.put("initial", "value");
Set<String> groups = Arrays.stream(detectGroups.split(",")).collect(Collectors.toSet());
- WriterRefresher writerRefresher =
- WriterRefresher.create(
+ ConfigRefresher configRefresher =
+ ConfigRefresher.create(
true, table1, new TestWriteRefresher(groups, refreshedOptions));
// Should not refresh when monitored config groups haven't changed
- writerRefresher.tryRefresh();
+ configRefresher.tryRefresh();
// Options should remain unchanged
assertThat(refreshedOptions).containsEntry("initial", "value");
@@ -207,14 +209,22 @@ public class WriterRefresherTest {
return (FileStoreTable) catalog.getTable(Identifier.create("default",
"T"));
}
- private static class TestWriteRefresher implements WriterRefresher.Refresher {
+ /** Test implementation of {@link WriteRefresher}. */
+ protected static class TestWriteRefresher implements WriteRefresher {
private final Set<String> groups;
private final Map<String, String> options;
+ private final List<DataField> dataFields;
TestWriteRefresher(Set<String> groups, Map<String, String> options) {
+ this(groups, options, null);
+ }
+
+ TestWriteRefresher(
+ Set<String> groups, Map<String, String> options, List<DataField> fields) {
this.groups = groups;
this.options = options;
+ this.dataFields = fields;
}
@Override
@@ -223,6 +233,10 @@ public class WriterRefresherTest {
if (groups != null) {
options.putAll(configGroups(groups, table.coreOptions()));
}
+ if (dataFields != null) {
+ dataFields.clear();
+ dataFields.addAll(table.schema().fields());
+ }
}
}
}