This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 0ba4b951e1 [core] Support watermark partition markdone mode (#5284)
0ba4b951e1 is described below
commit 0ba4b951e112ccb308e574c46c293e11fab47837
Author: xiangyu0xf <[email protected]>
AuthorDate: Tue Mar 25 15:53:58 2025 +0800
[core] Support watermark partition markdone mode (#5284)
---
.../generated/flink_connector_configuration.html | 6 +
.../apache/paimon/flink/FlinkConnectorOptions.java | 34 ++++
.../flink/sink/partition/PartitionMarkDone.java | 81 +++++++++-
.../sink/partition/PartitionMarkDoneTrigger.java | 29 +++-
.../partition/WatermarkPartitionMarkDoneTest.java | 172 +++++++++++++++++++++
5 files changed, 313 insertions(+), 9 deletions(-)
diff --git
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 85cfdae55d..6c273e91e1 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -86,6 +86,12 @@ under the License.
<td>Duration</td>
<td>Set a time duration when a partition has no new data after
this time duration, mark the done status to indicate that the data is
ready.</td>
</tr>
+ <tr>
+ <td><h5>partition.mark-done-action.mode</h5></td>
+ <td style="word-wrap: break-word;">process-time</td>
+ <td><p>Enum</p></td>
+ <td>How to trigger partition mark done action.<br /><br />Possible
values:<ul><li>"process-time": Based on the time of the machine, mark the
partition done once the processing time passes period time plus
delay.</li><li>"watermark": Based on the watermark of the input, mark the
partition done once the watermark passes period time plus delay.</li></ul></td>
+ </tr>
<tr>
<td><h5>partition.time-interval</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index a9eaddae69..e63356add8 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -356,6 +356,12 @@ public class FlinkConnectorOptions {
.withDescription(
"Allow sink committer and writer operator to be
chained together");
+ public static final ConfigOption<PartitionMarkDoneActionMode>
PARTITION_MARK_DONE_MODE =
+ key("partition.mark-done-action.mode") // selects the time source used to decide that a partition is done
+ .enumType(PartitionMarkDoneActionMode.class)
+ .defaultValue(PartitionMarkDoneActionMode.PROCESS_TIME) // default is the machine-time mode
+ .withDescription("How to trigger partition mark done
action.");
+
public static final ConfigOption<Duration> PARTITION_IDLE_TIME_TO_DONE =
key("partition.idle-time-to-done")
.durationType()
@@ -544,4 +550,32 @@ public class FlinkConnectorOptions {
return text(description);
}
}
+
+ /**
+  * The mode for partition mark done: selects which notion of time (machine processing time or
+  * input watermark) must pass a partition period time plus delay before the partition is
+  * marked done.
+  */
+ public enum PartitionMarkDoneActionMode implements DescribedEnum {
+ PROCESS_TIME( // decision is based on the time of the machine
+ "process-time",
+ "Based on the time of the machine, mark the partition done
once the processing time passes period time plus delay."),
+ WATERMARK( // decision is based on the watermark of the input
+ "watermark",
+ "Based on the watermark of the input, mark the partition done
once the watermark passes period time plus delay.");
+
+ private final String value; // string form returned by toString()
+ private final String description; // text exposed through getDescription()
+
+ PartitionMarkDoneActionMode(String value, String description) {
+ this.value = value;
+ this.description = description;
+ }
+
+ @Override
+ public String toString() {
+ return value; // process-time or watermark, not the constant name
+ }
+
+ @Override
+ public InlineElement getDescription() {
+ return text(description); // wraps the plain description string as an InlineElement
+ }
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
index 6f360c7823..7e7aa57495 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.sink.partition;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.MergeEngine;
import org.apache.paimon.data.BinaryRow;
+import
org.apache.paimon.flink.FlinkConnectorOptions.PartitionMarkDoneActionMode;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.actions.PartitionMarkDoneAction;
@@ -32,24 +33,33 @@ import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.PartitionPathUtils;
import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Duration;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT;
import static
org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_IDLE_TIME_TO_DONE;
+import static
org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_MARK_DONE_MODE;
/** Mark partition done. */
public class PartitionMarkDone implements PartitionListener {
+ private static final Logger LOG =
LoggerFactory.getLogger(PartitionMarkDone.class);
+
private final InternalRowPartitionComputer partitionComputer;
private final PartitionMarkDoneTrigger trigger;
private final List<PartitionMarkDoneAction> actions;
private final boolean waitCompaction;
+ private final PartitionMarkDoneActionMode partitionMarkDoneActionMode;
public static Optional<PartitionMarkDone> create(
ClassLoader cl,
@@ -86,7 +96,12 @@ public class PartitionMarkDone implements PartitionListener {
|| coreOptions.mergeEngine() ==
MergeEngine.FIRST_ROW);
return Optional.of(
- new PartitionMarkDone(partitionComputer, trigger, actions,
waitCompaction));
+ new PartitionMarkDone(
+ partitionComputer,
+ trigger,
+ actions,
+ waitCompaction,
+ options.get(PARTITION_MARK_DONE_MODE)));
}
private static boolean disablePartitionMarkDone(
@@ -108,15 +123,25 @@ public class PartitionMarkDone implements
PartitionListener {
InternalRowPartitionComputer partitionComputer,
PartitionMarkDoneTrigger trigger,
List<PartitionMarkDoneAction> actions,
- boolean waitCompaction) {
+ boolean waitCompaction,
+ PartitionMarkDoneActionMode partitionMarkDoneActionMode) {
this.partitionComputer = partitionComputer;
this.trigger = trigger;
this.actions = actions;
this.waitCompaction = waitCompaction;
+ this.partitionMarkDoneActionMode = partitionMarkDoneActionMode;
}
@Override
public void notifyCommittable(List<ManifestCommittable> committables) {
+ if (partitionMarkDoneActionMode == // dispatch on the mark-done mode given to the constructor
PartitionMarkDoneActionMode.WATERMARK) {
+ markDoneByWatermark(committables); // uses the watermarks carried by the committables as the clock
+ } else {
+ markDoneByProcessTime(committables); // every other mode; this helper holds the former body of this method
+ }
}
+
+ private void markDoneByProcessTime(List<ManifestCommittable> committables)
{
Set<BinaryRow> partitions = new HashSet<>();
boolean endInput = false;
for (ManifestCommittable committable : committables) {
@@ -141,6 +166,58 @@ public class PartitionMarkDone implements
PartitionListener {
markDone(trigger.donePartitions(endInput), actions);
}
+ private void markDoneByWatermark(List<ManifestCommittable> committables) {
+ /*
+  * Watermark mode. Extracts the watermark of every qualifying partition from the committables,
+  * reports each partition with its watermark to the trigger, and then runs the done actions
+  * for the partitions the trigger returns when the largest extracted watermark is passed as
+  * the current time.
+  */
+ Tuple2<Map<BinaryRow, Long>, Boolean> extractedWatermarks =
+ extractPartitionWatermarks(committables);
+ Map<BinaryRow, Long> partitionWatermarks = extractedWatermarks.f0; // partition -> largest watermark in this batch
+ boolean endInput = extractedWatermarks.f1; // true if any committable has identifier Long.MAX_VALUE
+ Optional<Long> latestWatermark = // largest watermark among the extracted partitions, empty if none qualified
partitionWatermarks.values().stream().max(Long::compareTo);
+
+ if (!latestWatermark.isPresent()) { // the map is empty: no committable had both a watermark and a qualifying commit message
+ LOG.warn("No watermark found in this batch of committables, skip
partition mark done.");
+ return; // NOTE(review): this also returns when endInput is true, so donePartitions is not called for such a batch
+ }
+
+ partitionWatermarks.forEach(
+ (row, value) -> {
+ String partition = // partition path string built from the partition values of the row
+ PartitionPathUtils.generatePartitionPath(
+ partitionComputer.generatePartValues(row));
+ trigger.notifyPartition(partition, value); // presumably recorded as the last update time of the partition; confirm in PartitionMarkDoneTrigger
+ });
+
+ markDone(trigger.donePartitions(endInput, latestWatermark.get(), // the watermark takes the place of the current time
true), actions);
+ }
+
+ private Tuple2<Map<BinaryRow, Long>, Boolean> extractPartitionWatermarks(
+ List<ManifestCommittable> committables) { // returns f0 = partition -> largest watermark, f1 = end-of-input flag
+ boolean endInput = false;
+ Map<BinaryRow, Long> partitionWatermarks = new HashMap<>();
+ for (ManifestCommittable committable : committables) {
+ Long watermark = committable.watermark();
+ if (watermark != null) { // committables without a watermark contribute no partitions
+ for (CommitMessage commitMessage :
committable.fileCommittables()) {
+ CommitMessageImpl message = (CommitMessageImpl)
commitMessage;
+ if (waitCompaction // when false, only messages with a non-empty index or new-files increment are counted
+ || !message.indexIncrement().isEmpty()
+ || !message.newFilesIncrement().isEmpty()) {
+ partitionWatermarks.compute( // keep the largest watermark seen for each partition
+ message.partition(),
+ (partition, old) ->
+ old == null ? watermark :
Math.max(old, watermark));
+ }
+ }
+ }
+
+ if (committable.identifier() == Long.MAX_VALUE) { // evaluated whether or not the committable has a watermark
+ endInput = true; // identifier Long.MAX_VALUE is treated as the end-of-input marker
+ }
+ }
+
+ return Tuple2.of(partitionWatermarks, endInput);
+ }
+
public static void markDone(List<String> partitions,
List<PartitionMarkDoneAction> actions) {
for (String partition : partitions) {
try {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java
index 0094c55118..1a6605cd1c 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java
@@ -111,11 +111,16 @@ public class PartitionMarkDoneTrigger {
}
public List<String> donePartitions(boolean endInput) {
- return donePartitions(endInput, System.currentTimeMillis());
+ return donePartitions(endInput, System.currentTimeMillis(), false); // wall-clock time with watermarkEnabled=false
}
- @VisibleForTesting
List<String> donePartitions(boolean endInput, long currentTimeMillis) {
+ return donePartitions(endInput, currentTimeMillis, false); // watermarkEnabled=false selects the ZoneId.systemDefault() conversion
+ }
+
+ @VisibleForTesting
+ List<String> donePartitions(
+ boolean endInput, long currentTimeMillis, boolean
watermarkEnabled) {
if (endInput && markDoneWhenEndInput) {
return new ArrayList<>(pendingPartitions.keySet());
}
@@ -131,11 +136,21 @@ public class PartitionMarkDoneTrigger {
String partition = entry.getKey();
long lastUpdateTime = entry.getValue();
- long partitionStartTime =
- extractDateTime(partition)
- .atZone(ZoneId.systemDefault())
- .toInstant()
- .toEpochMilli();
+ long partitionStartTime;
+ if (watermarkEnabled) {
+ // watermark should be compared as UTC time
+ partitionStartTime =
+ extractDateTime(partition)
+ .atZone(ZoneId.of("UTC"))
+ .toInstant()
+ .toEpochMilli();
+ } else {
+ partitionStartTime =
+ extractDateTime(partition)
+ .atZone(ZoneId.systemDefault())
+ .toInstant()
+ .toEpochMilli();
+ }
long partitionEndTime = partitionStartTime + timeInterval;
lastUpdateTime = Math.max(lastUpdateTime, partitionEndTime);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/WatermarkPartitionMarkDoneTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/WatermarkPartitionMarkDoneTest.java
new file mode 100644
index 0000000000..0b292d4da6
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/WatermarkPartitionMarkDoneTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.partition;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.flink.sink.Committer;
+import org.apache.paimon.flink.sink.StoreCommitter;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.partition.file.SuccessFile;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.sink.TableWriteImpl;
+import org.apache.paimon.types.DataTypes;
+
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.junit.jupiter.api.Test;
+
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static java.util.Collections.emptyList;
+import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.apache.paimon.CoreOptions.FILE_FORMAT;
+import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_ACTION;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test for {@link FlinkConnectorOptions.PartitionMarkDoneActionMode}: commits three batches whose
+ * committables are built with increasing UTC timestamps and checks, after each commit, which
+ * hourly partitions contain a success file and which do not.
+ */
+public class WatermarkPartitionMarkDoneTest extends TableTestBase {
+ @Test
+ public void testWaterMarkPartitionMarkDone() throws Exception {
+ Identifier identifier = identifier("T");
+ Schema schema =
+ Schema.newBuilder()
+ .column("a", DataTypes.STRING())
+ .column("b", DataTypes.INT())
+ .column("c", DataTypes.INT())
+ .partitionKeys("a")
+ .primaryKey("a", "b")
+ .option(PARTITION_MARK_DONE_ACTION.key(),
"success-file")
+ .option(FILE_FORMAT.key(),
CoreOptions.FILE_FORMAT_AVRO)
+
.option(FlinkConnectorOptions.PARTITION_MARK_DONE_MODE.key(), "watermark")
+
.option(FlinkConnectorOptions.PARTITION_TIME_INTERVAL.key(), "1 h")
+
.option(FlinkConnectorOptions.PARTITION_IDLE_TIME_TO_DONE.key(), "15 min")
+
.option(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), "yyyy-MM-dd HH")
+ .option(BUCKET.key(), "1")
+ .build();
+ catalog.createTable(identifier, schema, true);
+ FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+
+ TableWriteImpl<?> write = table.newWrite("user1");
+ TableCommitImpl commit = table.newCommit("user1");
+ OperatorMetricGroup metricGroup =
UnregisteredMetricsGroup.createOperatorMetricGroup();
+ StoreCommitter committer =
+ new StoreCommitter(
+ table,
+ commit,
+ Committer.createContext(
+ "user1",
+ metricGroup,
+ true,
+ false,
+ new
PartitionMarkDoneTest.MockOperatorStateStore(),
+ 1,
+ 1));
+
+ write.write(GenericRow.of(BinaryString.fromString("2025-03-01 12"), 1,
1));
+ write.write(GenericRow.of(BinaryString.fromString("2025-03-01 13"), 1,
1));
+ write.write(GenericRow.of(BinaryString.fromString("2025-03-01 14"), 1,
1));
+ ManifestCommittable committable1 =
+ new ManifestCommittable(
+ 1L,
+ LocalDateTime.parse("2025-03-01T12:50:30.00") // presumably the watermark argument, as UTC epoch millis; confirm in ManifestCommittable
+ .atZone(ZoneId.of("UTC"))
+ .toInstant()
+ .toEpochMilli());
+
+ write.prepareCommit(true,
1L).forEach(committable1::addFileCommittable);
+ committer.commit(Collections.singletonList(committable1));
+
+ // Watermark 12:50:30 has not passed period end plus 15 min for any partition (the
+ // earliest, partition 12, needs 13:15), so no partition is marked done.
+ validatePartitions(
+ table,
+ emptyList(),
+ Arrays.asList("2025-03-01 12", "2025-03-01 13", "2025-03-01
14"));
+
+ write.write(GenericRow.of(BinaryString.fromString("2025-03-01 15"), 1,
1));
+ ManifestCommittable committable2 =
+ new ManifestCommittable(
+ 2L,
+ LocalDateTime.parse("2025-03-01T14:30:30.00")
+ .atZone(ZoneId.of("UTC"))
+ .toInstant()
+ .toEpochMilli());
+ write.prepareCommit(true,
2L).forEach(committable2::addFileCommittable);
+ committer.commit(Collections.singletonList(committable2));
+
+ // Watermark 14:30:30 has passed period end plus 15 min for partitions 12 (13:15) and
+ // 13 (14:15), but not for 14 (15:15) or 15 (16:15).
+ validatePartitions(
+ table,
+ Arrays.asList("2025-03-01 12", "2025-03-01 13"),
+ Arrays.asList("2025-03-01 14", "2025-03-01 15"));
+
+ write.write(GenericRow.of(BinaryString.fromString("2025-03-01 16"), 2,
1));
+ ManifestCommittable committable3 =
+ new ManifestCommittable(
+ 3L,
+ LocalDateTime.parse("2025-03-01T16:20:30.00")
+ .atZone(ZoneId.of("UTC"))
+ .toInstant()
+ .toEpochMilli());
+ write.prepareCommit(true,
3L).forEach(committable3::addFileCommittable);
+ committer.commit(Collections.singletonList(committable3));
+
+ // Watermark 16:20:30 has passed period end plus 15 min for partitions 14 (15:15) and
+ // 15 (16:15) as well; partition 16 (17:15) stays pending.
+ validatePartitions(
+ table,
+ Arrays.asList("2025-03-01 12", "2025-03-01 13", "2025-03-01
14", "2025-03-01 15"),
+ Collections.singletonList("2025-03-01 16"));
+
+ committer.close();
+ }
+
+ private void validatePartitions(
+ FileStoreTable table, List<String> donePartitions, List<String>
pendingPartitions)
+ throws Exception {
+ for (String partition : donePartitions) { // each done partition must yield a non-null success file
+ LocalFileIO fileIO = new LocalFileIO();
+ Path successPath =
+ new Path(table.location(), String.format("a=%s/_SUCCESS",
partition));
+ SuccessFile successFile = SuccessFile.safelyFromPath(fileIO,
successPath);
+ assertThat(successFile).isNotNull();
+ }
+
+ for (String partition : pendingPartitions) { // each pending partition must exist on disk but yield no success file
+ LocalFileIO fileIO = new LocalFileIO();
+ Path dir = new Path(table.location(), String.format("a=%s",
partition));
+ assertThat(fileIO.exists(dir)).isTrue(); // the partition directory was written
+ Path successPath =
+ new Path(table.location(), String.format("a=%s/_SUCCESS",
partition));
+ SuccessFile successFile = SuccessFile.safelyFromPath(fileIO,
successPath);
+ assertThat(successFile).isNull();
+ }
+ }
+}