This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 59120b77a8 [core] Log a warning for invalid partition values instead 
of throwing an exception when enable partition mark done.  (#5978)
59120b77a8 is described below

commit 59120b77a8c088aa368753dbebfc3582e43c5d9b
Author: zhoulii <[email protected]>
AuthorDate: Thu Jul 31 13:35:43 2025 +0800

    [core] Log a warning for invalid partition values instead of throwing an 
exception when enable partition mark done.  (#5978)
---
 .../sink/listener/PartitionMarkDoneTrigger.java    | 28 ++++++++++++++++++----
 .../listener/PartitionMarkDoneTriggerTest.java     | 11 ++++-----
 2 files changed, 27 insertions(+), 12 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTrigger.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTrigger.java
index bd524acb66..439c4db3fd 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTrigger.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTrigger.java
@@ -30,6 +30,8 @@ import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.common.typeutils.base.ListSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
@@ -43,6 +45,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_IDLE_TIME_TO_DONE;
@@ -52,6 +55,7 @@ import static 
org.apache.paimon.utils.PartitionPathUtils.extractPartitionSpecFro
 /** Trigger to mark partitions done with streaming job. */
 public class PartitionMarkDoneTrigger {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(PartitionMarkDoneTrigger.class);
     private static final ListStateDescriptor<List<String>> 
PENDING_PARTITIONS_STATE_DESC =
             new ListStateDescriptor<>(
                     "mark-done-pending-partitions",
@@ -137,16 +141,26 @@ public class PartitionMarkDoneTrigger {
 
             long lastUpdateTime = entry.getValue();
             long partitionStartTime;
+
+            Optional<LocalDateTime> partitionLocalDateTimeOpt = 
extractDateTime(partition);
+            // skip illegal partition
+            if (!partitionLocalDateTimeOpt.isPresent()) {
+                iter.remove();
+                continue;
+            }
+
             if (watermarkEnabled) {
                 // watermark should be compared as UTC time
                 partitionStartTime =
-                        extractDateTime(partition)
+                        partitionLocalDateTimeOpt
+                                .get()
                                 .atZone(ZoneId.of("UTC"))
                                 .toInstant()
                                 .toEpochMilli();
             } else {
                 partitionStartTime =
-                        extractDateTime(partition)
+                        partitionLocalDateTimeOpt
+                                .get()
                                 .atZone(ZoneId.systemDefault())
                                 .toInstant()
                                 .toEpochMilli();
@@ -163,11 +177,15 @@ public class PartitionMarkDoneTrigger {
     }
 
     @VisibleForTesting
-    LocalDateTime extractDateTime(String partition) {
+    Optional<LocalDateTime> extractDateTime(String partition) {
         try {
-            return timeExtractor.extract(extractPartitionSpecFromPath(new 
Path(partition)));
+            return Optional.of(
+                    timeExtractor.extract(extractPartitionSpecFromPath(new 
Path(partition))));
         } catch (DateTimeParseException e) {
-            throw new RuntimeException("Can't extract datetime from partition 
" + partition, e);
+            LOG.warn(
+                    "Can't extract datetime from partition {}, please check 
configuration items 'partition.timestamp-formatter' and 
'partition.timestamp-pattern'.",
+                    partition);
+            return Optional.empty();
         }
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTriggerTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTriggerTest.java
index acc9552ceb..84914253fc 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTriggerTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTriggerTest.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.flink.sink.listener;
 
 import org.apache.paimon.partition.PartitionTimeExtractor;
-import org.apache.paimon.testutils.assertj.PaimonAssertions;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -33,7 +32,6 @@ import java.util.ArrayList;
 import java.util.List;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 class PartitionMarkDoneTriggerTest {
 
@@ -155,11 +153,10 @@ class PartitionMarkDoneTriggerTest {
                         toEpochMillis("2024-02-01"),
                         true);
 
-        assertThatThrownBy(() -> trigger.extractDateTime("unknown"))
-                .satisfies(
-                        PaimonAssertions.anyCauseMatches(
-                                RuntimeException.class,
-                                "Can't extract datetime from partition 
unknown"));
+        assertThat(trigger.extractDateTime("unknown")).isEmpty();
+        trigger.notifyPartition("dt=__DEFAULT_PARTITION__", 
toEpochMillis("2024-02-01"));
+        List<String> partitions = trigger.donePartitions(false, 
toEpochMillis("2024-02-03"));
+        assertThat(partitions).isEmpty();
     }
 
     private long toEpochMillis(String dt) {

Reply via email to