This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ba4b951e1 [core] Support watermark partition markdone mode (#5284)
0ba4b951e1 is described below

commit 0ba4b951e112ccb308e574c46c293e11fab47837
Author: xiangyu0xf <[email protected]>
AuthorDate: Tue Mar 25 15:53:58 2025 +0800

    [core] Support watermark partition markdone mode (#5284)
---
 .../generated/flink_connector_configuration.html   |   6 +
 .../apache/paimon/flink/FlinkConnectorOptions.java |  34 ++++
 .../flink/sink/partition/PartitionMarkDone.java    |  81 +++++++++-
 .../sink/partition/PartitionMarkDoneTrigger.java   |  29 +++-
 .../partition/WatermarkPartitionMarkDoneTest.java  | 172 +++++++++++++++++++++
 5 files changed, 313 insertions(+), 9 deletions(-)

diff --git 
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html 
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 85cfdae55d..6c273e91e1 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -86,6 +86,12 @@ under the License.
             <td>Duration</td>
             <td>Set a time duration when a partition has no new data after 
this time duration, mark the done status to indicate that the data is 
ready.</td>
         </tr>
+        <tr>
+            <td><h5>partition.mark-done-action.mode</h5></td>
+            <td style="word-wrap: break-word;">process-time</td>
+            <td><p>Enum</p></td>
+            <td>How to trigger partition mark done action.<br /><br />Possible 
values:<ul><li>"process-time": Based on the time of the machine, mark the 
partition done once the processing time passes period time plus 
delay.</li><li>"watermark": Based on the watermark of the input, mark the 
partition done once the watermark passes period time plus delay.</li></ul></td>
+        </tr>
         <tr>
             <td><h5>partition.time-interval</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index a9eaddae69..e63356add8 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -356,6 +356,12 @@ public class FlinkConnectorOptions {
                     .withDescription(
                             "Allow sink committer and writer operator to be 
chained together");
 
+    public static final ConfigOption<PartitionMarkDoneActionMode> 
PARTITION_MARK_DONE_MODE =
+            key("partition.mark-done-action.mode")
+                    .enumType(PartitionMarkDoneActionMode.class)
+                    .defaultValue(PartitionMarkDoneActionMode.PROCESS_TIME)
+                    .withDescription("How to trigger partition mark done 
action.");
+
     public static final ConfigOption<Duration> PARTITION_IDLE_TIME_TO_DONE =
             key("partition.idle-time-to-done")
                     .durationType()
@@ -544,4 +550,32 @@ public class FlinkConnectorOptions {
             return text(description);
         }
     }
+
+    /** The mode for partition mark done. */
+    public enum PartitionMarkDoneActionMode implements DescribedEnum {
+        PROCESS_TIME(
+                "process-time",
+                "Based on the time of the machine, mark the partition done 
once the processing time passes period time plus delay."),
+        WATERMARK(
+                "watermark",
+                "Based on the watermark of the input, mark the partition done 
once the watermark passes period time plus delay.");
+
+        private final String value;
+        private final String description;
+
+        PartitionMarkDoneActionMode(String value, String description) {
+            this.value = value;
+            this.description = description;
+        }
+
+        @Override
+        public String toString() {
+            return value;
+        }
+
+        @Override
+        public InlineElement getDescription() {
+            return text(description);
+        }
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
index 6f360c7823..7e7aa57495 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.sink.partition;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.CoreOptions.MergeEngine;
 import org.apache.paimon.data.BinaryRow;
+import 
org.apache.paimon.flink.FlinkConnectorOptions.PartitionMarkDoneActionMode;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.actions.PartitionMarkDoneAction;
@@ -32,24 +33,33 @@ import org.apache.paimon.utils.InternalRowPartitionComputer;
 import org.apache.paimon.utils.PartitionPathUtils;
 
 import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.time.Duration;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 
 import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_IDLE_TIME_TO_DONE;
+import static 
org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_MARK_DONE_MODE;
 
 /** Mark partition done. */
 public class PartitionMarkDone implements PartitionListener {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(PartitionMarkDone.class);
+
     private final InternalRowPartitionComputer partitionComputer;
     private final PartitionMarkDoneTrigger trigger;
     private final List<PartitionMarkDoneAction> actions;
     private final boolean waitCompaction;
+    private final PartitionMarkDoneActionMode partitionMarkDoneActionMode;
 
     public static Optional<PartitionMarkDone> create(
             ClassLoader cl,
@@ -86,7 +96,12 @@ public class PartitionMarkDone implements PartitionListener {
                                 || coreOptions.mergeEngine() == 
MergeEngine.FIRST_ROW);
 
         return Optional.of(
-                new PartitionMarkDone(partitionComputer, trigger, actions, 
waitCompaction));
+                new PartitionMarkDone(
+                        partitionComputer,
+                        trigger,
+                        actions,
+                        waitCompaction,
+                        options.get(PARTITION_MARK_DONE_MODE)));
     }
 
     private static boolean disablePartitionMarkDone(
@@ -108,15 +123,25 @@ public class PartitionMarkDone implements 
PartitionListener {
             InternalRowPartitionComputer partitionComputer,
             PartitionMarkDoneTrigger trigger,
             List<PartitionMarkDoneAction> actions,
-            boolean waitCompaction) {
+            boolean waitCompaction,
+            PartitionMarkDoneActionMode partitionMarkDoneActionMode) {
         this.partitionComputer = partitionComputer;
         this.trigger = trigger;
         this.actions = actions;
         this.waitCompaction = waitCompaction;
+        this.partitionMarkDoneActionMode = partitionMarkDoneActionMode;
     }
 
     @Override
     public void notifyCommittable(List<ManifestCommittable> committables) {
+        if (partitionMarkDoneActionMode == 
PartitionMarkDoneActionMode.WATERMARK) {
+            markDoneByWatermark(committables);
+        } else {
+            markDoneByProcessTime(committables);
+        }
+    }
+
+    private void markDoneByProcessTime(List<ManifestCommittable> committables) 
{
         Set<BinaryRow> partitions = new HashSet<>();
         boolean endInput = false;
         for (ManifestCommittable committable : committables) {
@@ -141,6 +166,58 @@ public class PartitionMarkDone implements 
PartitionListener {
         markDone(trigger.donePartitions(endInput), actions);
     }
 
+    private void markDoneByWatermark(List<ManifestCommittable> committables) {
+        // extract watermarks from committables and update partition watermarks
+        Tuple2<Map<BinaryRow, Long>, Boolean> extractedWatermarks =
+                extractPartitionWatermarks(committables);
+        Map<BinaryRow, Long> partitionWatermarks = extractedWatermarks.f0;
+        boolean endInput = extractedWatermarks.f1;
+        Optional<Long> latestWatermark = 
partitionWatermarks.values().stream().max(Long::compareTo);
+
+        if (!latestWatermark.isPresent()) {
+            LOG.warn("No watermark found in this batch of committables, skip 
partition mark done.");
+            return;
+        }
+
+        partitionWatermarks.forEach(
+                (row, value) -> {
+                    String partition =
+                            PartitionPathUtils.generatePartitionPath(
+                                    partitionComputer.generatePartValues(row));
+                    trigger.notifyPartition(partition, value);
+                });
+
+        markDone(trigger.donePartitions(endInput, latestWatermark.get(), 
true), actions);
+    }
+
+    private Tuple2<Map<BinaryRow, Long>, Boolean> extractPartitionWatermarks(
+            List<ManifestCommittable> committables) {
+        boolean endInput = false;
+        Map<BinaryRow, Long> partitionWatermarks = new HashMap<>();
+        for (ManifestCommittable committable : committables) {
+            Long watermark = committable.watermark();
+            if (watermark != null) {
+                for (CommitMessage commitMessage : 
committable.fileCommittables()) {
+                    CommitMessageImpl message = (CommitMessageImpl) 
commitMessage;
+                    if (waitCompaction
+                            || !message.indexIncrement().isEmpty()
+                            || !message.newFilesIncrement().isEmpty()) {
+                        partitionWatermarks.compute(
+                                message.partition(),
+                                (partition, old) ->
+                                        old == null ? watermark : 
Math.max(old, watermark));
+                    }
+                }
+            }
+
+            if (committable.identifier() == Long.MAX_VALUE) {
+                endInput = true;
+            }
+        }
+
+        return Tuple2.of(partitionWatermarks, endInput);
+    }
+
     public static void markDone(List<String> partitions, 
List<PartitionMarkDoneAction> actions) {
         for (String partition : partitions) {
             try {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java
index 0094c55118..1a6605cd1c 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java
@@ -111,11 +111,16 @@ public class PartitionMarkDoneTrigger {
     }
 
     public List<String> donePartitions(boolean endInput) {
-        return donePartitions(endInput, System.currentTimeMillis());
+        return donePartitions(endInput, System.currentTimeMillis(), false);
     }
 
-    @VisibleForTesting
     List<String> donePartitions(boolean endInput, long currentTimeMillis) {
+        return donePartitions(endInput, currentTimeMillis, false);
+    }
+
+    @VisibleForTesting
+    List<String> donePartitions(
+            boolean endInput, long currentTimeMillis, boolean 
watermarkEnabled) {
         if (endInput && markDoneWhenEndInput) {
             return new ArrayList<>(pendingPartitions.keySet());
         }
@@ -131,11 +136,21 @@ public class PartitionMarkDoneTrigger {
             String partition = entry.getKey();
 
             long lastUpdateTime = entry.getValue();
-            long partitionStartTime =
-                    extractDateTime(partition)
-                            .atZone(ZoneId.systemDefault())
-                            .toInstant()
-                            .toEpochMilli();
+            long partitionStartTime;
+            if (watermarkEnabled) {
+                // watermark should be compared as UTC time
+                partitionStartTime =
+                        extractDateTime(partition)
+                                .atZone(ZoneId.of("UTC"))
+                                .toInstant()
+                                .toEpochMilli();
+            } else {
+                partitionStartTime =
+                        extractDateTime(partition)
+                                .atZone(ZoneId.systemDefault())
+                                .toInstant()
+                                .toEpochMilli();
+            }
             long partitionEndTime = partitionStartTime + timeInterval;
             lastUpdateTime = Math.max(lastUpdateTime, partitionEndTime);
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/WatermarkPartitionMarkDoneTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/WatermarkPartitionMarkDoneTest.java
new file mode 100644
index 0000000000..0b292d4da6
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/WatermarkPartitionMarkDoneTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.partition;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.flink.sink.Committer;
+import org.apache.paimon.flink.sink.StoreCommitter;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.partition.file.SuccessFile;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.sink.TableWriteImpl;
+import org.apache.paimon.types.DataTypes;
+
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.junit.jupiter.api.Test;
+
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static java.util.Collections.emptyList;
+import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.apache.paimon.CoreOptions.FILE_FORMAT;
+import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_ACTION;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link FlinkConnectorOptions.PartitionMarkDoneActionMode}. */
+public class WatermarkPartitionMarkDoneTest extends TableTestBase {
+    @Test
+    public void testWaterMarkPartitionMarkDone() throws Exception {
+        Identifier identifier = identifier("T");
+        Schema schema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.STRING())
+                        .column("b", DataTypes.INT())
+                        .column("c", DataTypes.INT())
+                        .partitionKeys("a")
+                        .primaryKey("a", "b")
+                        .option(PARTITION_MARK_DONE_ACTION.key(), 
"success-file")
+                        .option(FILE_FORMAT.key(), 
CoreOptions.FILE_FORMAT_AVRO)
+                        
.option(FlinkConnectorOptions.PARTITION_MARK_DONE_MODE.key(), "watermark")
+                        
.option(FlinkConnectorOptions.PARTITION_TIME_INTERVAL.key(), "1 h")
+                        
.option(FlinkConnectorOptions.PARTITION_IDLE_TIME_TO_DONE.key(), "15 min")
+                        
.option(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), "yyyy-MM-dd HH")
+                        .option(BUCKET.key(), "1")
+                        .build();
+        catalog.createTable(identifier, schema, true);
+        FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+
+        TableWriteImpl<?> write = table.newWrite("user1");
+        TableCommitImpl commit = table.newCommit("user1");
+        OperatorMetricGroup metricGroup = 
UnregisteredMetricsGroup.createOperatorMetricGroup();
+        StoreCommitter committer =
+                new StoreCommitter(
+                        table,
+                        commit,
+                        Committer.createContext(
+                                "user1",
+                                metricGroup,
+                                true,
+                                false,
+                                new 
PartitionMarkDoneTest.MockOperatorStateStore(),
+                                1,
+                                1));
+
+        write.write(GenericRow.of(BinaryString.fromString("2025-03-01 12"), 1, 
1));
+        write.write(GenericRow.of(BinaryString.fromString("2025-03-01 13"), 1, 
1));
+        write.write(GenericRow.of(BinaryString.fromString("2025-03-01 14"), 1, 
1));
+        ManifestCommittable committable1 =
+                new ManifestCommittable(
+                        1L,
+                        LocalDateTime.parse("2025-03-01T12:50:30.00")
+                                .atZone(ZoneId.of("UTC"))
+                                .toInstant()
+                                .toEpochMilli());
+
+        write.prepareCommit(true, 
1L).forEach(committable1::addFileCommittable);
+        committer.commit(Collections.singletonList(committable1));
+
+        // No partition will be marked done.
+        validatePartitions(
+                table,
+                emptyList(),
+                Arrays.asList("2025-03-01 12", "2025-03-01 13", "2025-03-01 
14"));
+
+        write.write(GenericRow.of(BinaryString.fromString("2025-03-01 15"), 1, 
1));
+        ManifestCommittable committable2 =
+                new ManifestCommittable(
+                        2L,
+                        LocalDateTime.parse("2025-03-01T14:30:30.00")
+                                .atZone(ZoneId.of("UTC"))
+                                .toInstant()
+                                .toEpochMilli());
+        write.prepareCommit(true, 
2L).forEach(committable2::addFileCommittable);
+        committer.commit(Collections.singletonList(committable2));
+
+        // Partitions before 2025-03-01T13:15 will be marked done.
+        validatePartitions(
+                table,
+                Arrays.asList("2025-03-01 12", "2025-03-01 13"),
+                Arrays.asList("2025-03-01 14", "2025-03-01 15"));
+
+        write.write(GenericRow.of(BinaryString.fromString("2025-03-01 16"), 2, 
1));
+        ManifestCommittable committable3 =
+                new ManifestCommittable(
+                        3L,
+                        LocalDateTime.parse("2025-03-01T16:20:30.00")
+                                .atZone(ZoneId.of("UTC"))
+                                .toInstant()
+                                .toEpochMilli());
+        write.prepareCommit(true, 
3L).forEach(committable3::addFileCommittable);
+        committer.commit(Collections.singletonList(committable3));
+
+        // Partitions before 2025-03-01T15:05 will be marked done.
+        validatePartitions(
+                table,
+                Arrays.asList("2025-03-01 12", "2025-03-01 13", "2025-03-01 
14", "2025-03-01 15"),
+                Collections.singletonList("2025-03-01 16"));
+
+        committer.close();
+    }
+
+    private void validatePartitions(
+            FileStoreTable table, List<String> donePartitions, List<String> 
pendingPartitions)
+            throws Exception {
+        for (String partition : donePartitions) {
+            LocalFileIO fileIO = new LocalFileIO();
+            Path successPath =
+                    new Path(table.location(), String.format("a=%s/_SUCCESS", 
partition));
+            SuccessFile successFile = SuccessFile.safelyFromPath(fileIO, 
successPath);
+            assertThat(successFile).isNotNull();
+        }
+
+        for (String partition : pendingPartitions) {
+            LocalFileIO fileIO = new LocalFileIO();
+            Path dir = new Path(table.location(), String.format("a=%s", 
partition));
+            assertThat(fileIO.exists(dir)).isTrue();
+            Path successPath =
+                    new Path(table.location(), String.format("a=%s/_SUCCESS", 
partition));
+            SuccessFile successFile = SuccessFile.safelyFromPath(fileIO, 
successPath);
+            assertThat(successFile).isNull();
+        }
+    }
+}

Reply via email to