This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 24b61b4fd [core] Ignore partition expire exception (#2156)
24b61b4fd is described below
commit 24b61b4fd6fcfcd9134b79f4b324788698515287
Author: wgcn <[email protected]>
AuthorDate: Mon Oct 23 09:49:44 2023 +0800
[core] Ignore partition expire exception (#2156)
---
.../apache/paimon/operation/PartitionExpire.java | 4 +++
.../paimon/partition/PartitionTimeExtractor.java | 41 ++++++++++++++++------
.../paimon/operation/PartitionExpireTest.java | 24 +++++++++++++
3 files changed, 59 insertions(+), 10 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
index e20c47eea..47b9f31d7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
@@ -92,7 +92,11 @@ public class PartitionExpire {
List<Map<String, String>> expired = new ArrayList<>();
for (BinaryRow partition : partitions) {
Object[] array = toObjectArrayConverter.convert(partition);
+
LocalDateTime partTime = timeExtractor.extract(partitionKeys,
Arrays.asList(array));
+ if (partTime == null) {
+ continue;
+ }
if (expireDateTime.isAfter(partTime)) {
expired.add(toPartitionString(array));
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java
index 7b6fe5ece..caf0e7a1f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java
+++ b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java
@@ -18,6 +18,11 @@
package org.apache.paimon.partition;
+import org.apache.paimon.operation.FileStoreCommitImpl;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import javax.annotation.Nullable;
import java.time.LocalDate;
@@ -32,6 +37,8 @@ import java.time.temporal.ChronoField;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import static java.time.temporal.ChronoField.DAY_OF_MONTH;
import static java.time.temporal.ChronoField.HOUR_OF_DAY;
@@ -43,6 +50,7 @@ import static java.time.temporal.ChronoField.YEAR;
/** Time extractor to extract time from partition values. */
public class PartitionTimeExtractor {
+ private static final Logger LOG =
LoggerFactory.getLogger(FileStoreCommitImpl.class);
private static final DateTimeFormatter TIMESTAMP_FORMATTER =
new DateTimeFormatterBuilder()
.appendValue(YEAR, 1, 10, SignStyle.NORMAL)
@@ -83,18 +91,31 @@ public class PartitionTimeExtractor {
}
public LocalDateTime extract(List<String> partitionKeys, List<Object>
partitionValues) {
- String timestampString;
- if (pattern == null) {
- timestampString = partitionValues.get(0).toString();
- } else {
- timestampString = pattern;
- for (int i = 0; i < partitionKeys.size(); i++) {
- timestampString =
- timestampString.replaceAll(
- "\\$" + partitionKeys.get(i),
partitionValues.get(i).toString());
+ LocalDateTime dateTime = null;
+ try {
+ String timestampString;
+ if (pattern == null) {
+ timestampString = partitionValues.get(0).toString();
+ } else {
+ timestampString = pattern;
+ for (int i = 0; i < partitionKeys.size(); i++) {
+ timestampString =
+ timestampString.replaceAll(
+ "\\$" + partitionKeys.get(i),
+ partitionValues.get(i).toString());
+ }
}
+ dateTime = toLocalDateTime(timestampString, this.formatter);
+ } catch (Exception e) {
+ String paritionInfos =
+ IntStream.range(0, partitionKeys.size())
+ .mapToObj(i -> partitionKeys.get(i) + ":" +
partitionValues.get(i))
+ .collect(Collectors.joining(","));
+ LOG.warn(
+ "Parition {} can't be extract datetime to expire,Please
check the partition expiration configuration or manually delete the partition
using the drop-partition command. ",
+ paritionInfos);
}
- return toLocalDateTime(timestampString, this.formatter);
+ return dateTime;
}
private static LocalDateTime toLocalDateTime(
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
index 19018d93b..564d17787 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
@@ -36,6 +36,7 @@ import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VarCharType;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -95,6 +96,29 @@ public class PartitionExpireTest {
"Can not set 'partition.expiration-time' for
non-partitioned table");
}
+ @Test
+ public void testIllegalPartition() throws Exception {
+ SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(),
path);
+ schemaManager.createTable(
+ new Schema(
+ RowType.of(VarCharType.STRING_TYPE,
VarCharType.STRING_TYPE).getFields(),
+ Collections.singletonList("f0"),
+ Collections.emptyList(),
+ Collections.emptyMap(),
+ ""));
+ table = FileStoreTableFactory.create(LocalFileIO.create(), path);
+ write("20230101", "11");
+ write("abcd", "12");
+ write("20230101", "12");
+ write("20230103", "31");
+ write("20230103", "32");
+ write("20230105", "51");
+ PartitionExpire expire = newExpire();
+ expire.setLastCheck(date(1));
+ Assertions.assertDoesNotThrow(() -> expire.expire(date(8),
Long.MAX_VALUE));
+ assertThat(read()).containsExactlyInAnyOrder("abcd:12");
+ }
+
@Test
public void test() throws Exception {
SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(),
path);