This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 24b61b4fd [core] Ignore partition expire exception (#2156)
24b61b4fd is described below

commit 24b61b4fd6fcfcd9134b79f4b324788698515287
Author: wgcn <[email protected]>
AuthorDate: Mon Oct 23 09:49:44 2023 +0800

    [core] Ignore partition expire exception (#2156)
---
 .../apache/paimon/operation/PartitionExpire.java   |  4 +++
 .../paimon/partition/PartitionTimeExtractor.java   | 41 ++++++++++++++++------
 .../paimon/operation/PartitionExpireTest.java      | 24 +++++++++++++
 3 files changed, 59 insertions(+), 10 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
index e20c47eea..47b9f31d7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
@@ -92,7 +92,11 @@ public class PartitionExpire {
         List<Map<String, String>> expired = new ArrayList<>();
         for (BinaryRow partition : partitions) {
             Object[] array = toObjectArrayConverter.convert(partition);
+
             LocalDateTime partTime = timeExtractor.extract(partitionKeys, Arrays.asList(array));
+            if (partTime == null) {
+                continue;
+            }
             if (expireDateTime.isAfter(partTime)) {
                 expired.add(toPartitionString(array));
             }
diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java
index 7b6fe5ece..caf0e7a1f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java
+++ b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java
@@ -18,6 +18,11 @@
 
 package org.apache.paimon.partition;
 
+import org.apache.paimon.operation.FileStoreCommitImpl;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
 
 import java.time.LocalDate;
@@ -32,6 +37,8 @@ import java.time.temporal.ChronoField;
 import java.util.List;
 import java.util.Locale;
 import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static java.time.temporal.ChronoField.DAY_OF_MONTH;
 import static java.time.temporal.ChronoField.HOUR_OF_DAY;
@@ -43,6 +50,7 @@ import static java.time.temporal.ChronoField.YEAR;
 /** Time extractor to extract time from partition values. */
 public class PartitionTimeExtractor {
 
+    private static final Logger LOG = LoggerFactory.getLogger(FileStoreCommitImpl.class);
     private static final DateTimeFormatter TIMESTAMP_FORMATTER =
             new DateTimeFormatterBuilder()
                     .appendValue(YEAR, 1, 10, SignStyle.NORMAL)
@@ -83,18 +91,31 @@ public class PartitionTimeExtractor {
     }
 
     public LocalDateTime extract(List<String> partitionKeys, List<Object> partitionValues) {
-        String timestampString;
-        if (pattern == null) {
-            timestampString = partitionValues.get(0).toString();
-        } else {
-            timestampString = pattern;
-            for (int i = 0; i < partitionKeys.size(); i++) {
-                timestampString =
-                        timestampString.replaceAll(
-                                "\\$" + partitionKeys.get(i), partitionValues.get(i).toString());
+        LocalDateTime dateTime = null;
+        try {
+            String timestampString;
+            if (pattern == null) {
+                timestampString = partitionValues.get(0).toString();
+            } else {
+                timestampString = pattern;
+                for (int i = 0; i < partitionKeys.size(); i++) {
+                    timestampString =
+                            timestampString.replaceAll(
+                                    "\\$" + partitionKeys.get(i),
+                                    partitionValues.get(i).toString());
+                }
             }
+            dateTime = toLocalDateTime(timestampString, this.formatter);
+        } catch (Exception e) {
+            String paritionInfos =
+                    IntStream.range(0, partitionKeys.size())
+                            .mapToObj(i -> partitionKeys.get(i) + ":" + partitionValues.get(i))
+                            .collect(Collectors.joining(","));
+            LOG.warn(
+                    "Parition {} can't be extract datetime to expire,Please check the partition expiration configuration or manually delete the partition using the drop-partition command. ",
+                    paritionInfos);
         }
-        return toLocalDateTime(timestampString, this.formatter);
+        return dateTime;
     }
 
     private static LocalDateTime toLocalDateTime(
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
index 19018d93b..564d17787 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
@@ -36,6 +36,7 @@ import org.apache.paimon.table.sink.TableCommitImpl;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.VarCharType;
 
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -95,6 +96,29 @@ public class PartitionExpireTest {
                         "Can not set 'partition.expiration-time' for non-partitioned table");
     }
 
+    @Test
+    public void testIllegalPartition() throws Exception {
+        SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), path);
+        schemaManager.createTable(
+                new Schema(
+                        RowType.of(VarCharType.STRING_TYPE, VarCharType.STRING_TYPE).getFields(),
+                        Collections.singletonList("f0"),
+                        Collections.emptyList(),
+                        Collections.emptyMap(),
+                        ""));
+        table = FileStoreTableFactory.create(LocalFileIO.create(), path);
+        write("20230101", "11");
+        write("abcd", "12");
+        write("20230101", "12");
+        write("20230103", "31");
+        write("20230103", "32");
+        write("20230105", "51");
+        PartitionExpire expire = newExpire();
+        expire.setLastCheck(date(1));
+        Assertions.assertDoesNotThrow(() -> expire.expire(date(8), Long.MAX_VALUE));
+        assertThat(read()).containsExactlyInAnyOrder("abcd:12");
+    }
+
     @Test
     public void test() throws Exception {
         SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), path);

Reply via email to