This is an automated email from the ASF dual-hosted git repository.
hongshun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 0e265ad65 [fluss-common] Validate partition time when creating
partition on auto partition table. (#2940)
0e265ad65 is described below
commit 0e265ad65db1b29098a95162f1da38814d2abe54
Author: Hongshun Wang <[email protected]>
AuthorDate: Fri Mar 27 14:24:24 2026 +0800
[fluss-common] Validate partition time when creating partition on auto
partition table. (#2940)
---
.../client/write/DynamicPartitionCreator.java | 17 ++++-
.../apache/fluss/client/write/WriterClient.java | 4 +-
.../fluss/client/admin/FlussAdminITCase.java | 66 ++++++++++++++++++++
.../client/table/AutoPartitionedTableITCase.java | 35 +++++++++++
.../org/apache/fluss/utils/PartitionUtils.java | 72 ++++++++++++++++++++++
.../fluss/flink/catalog/FlinkCatalogITCase.java | 17 +++--
.../fluss/flink/source/FlinkTableSourceITCase.java | 15 ++++-
.../server/coordinator/AutoPartitionManager.java | 1 +
.../server/coordinator/CoordinatorService.java | 10 ++-
9 files changed, 223 insertions(+), 14 deletions(-)
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/write/DynamicPartitionCreator.java
b/fluss-client/src/main/java/org/apache/fluss/client/write/DynamicPartitionCreator.java
index d1d23d39f..7a392f4af 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/write/DynamicPartitionCreator.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/write/DynamicPartitionCreator.java
@@ -24,6 +24,7 @@ import org.apache.fluss.exception.PartitionNotExistException;
import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.metadata.ResolvedPartitionSpec;
import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.utils.AutoPartitionStrategy;
import org.apache.fluss.utils.ExceptionUtils;
import org.slf4j.Logger;
@@ -38,6 +39,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import static org.apache.fluss.utils.ExceptionUtils.stripCompletionException;
+import static org.apache.fluss.utils.PartitionUtils.validateAutoPartitionTime;
import static org.apache.fluss.utils.Preconditions.checkArgument;
/** A creator to create partition when dynamic partition create enable for
table. */
@@ -64,7 +66,9 @@ public class DynamicPartitionCreator {
}
public void checkAndCreatePartitionAsync(
- PhysicalTablePath physicalTablePath, List<String> partitionKeys) {
+ PhysicalTablePath physicalTablePath,
+ List<String> partitionKeys,
+ AutoPartitionStrategy autoPartitionStrategy) {
String partitionName = physicalTablePath.getPartitionName();
if (partitionName == null) {
// no need to check and create partition
@@ -83,12 +87,20 @@ public class DynamicPartitionCreator {
// if the partition exists, we should skip creating it.
LOG.debug("Partition {} already exists, skipping.",
physicalTablePath);
} else {
+ // Validate early, before touching any state.
+ ResolvedPartitionSpec resolvedPartitionSpec =
+ ResolvedPartitionSpec.fromPartitionName(partitionKeys,
partitionName);
+ validateAutoPartitionTime(
+ resolvedPartitionSpec.toPartitionSpec(),
+ partitionKeys,
+ autoPartitionStrategy);
+
// create partition if not exists.
// partition may not exist, we should try to create it.
if (inflightPartitionsToCreate.add(physicalTablePath)) {
// if the partition is not in inflightPartitionsToCreate,
we should create it.
// this means that the partition is not being created by
other threads.
- LOG.info("Dynamically creating partition partition for
{}", physicalTablePath);
+ LOG.info("Dynamically creating partition for {}",
physicalTablePath);
createPartition(physicalTablePath, partitionKeys);
} else {
// if the partition is already in
inflightPartitionsToCreate, we should skip
@@ -104,7 +116,6 @@ public class DynamicPartitionCreator {
boolean idExist = false;
// force an IO to check whether the partition exists
try {
- // force an IO to check whether the partition exists
idExist =
metadataUpdater.checkAndUpdatePartitionMetadata(physicalTablePath);
} catch (Exception e) {
Throwable t = ExceptionUtils.stripExecutionException(e);
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java
b/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java
index 0e184f09f..351c7c065 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java
@@ -177,7 +177,9 @@ public class WriterClient {
TableInfo tableInfo = record.getTableInfo();
PhysicalTablePath physicalTablePath =
record.getPhysicalTablePath();
dynamicPartitionCreator.checkAndCreatePartitionAsync(
- physicalTablePath, tableInfo.getPartitionKeys());
+ physicalTablePath,
+ tableInfo.getPartitionKeys(),
+ tableInfo.getTableConfig().getAutoPartitionStrategy());
// maybe create bucket assigner.
Cluster cluster = metadataUpdater.getCluster();
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
index ad3bfd775..19baf8cee 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
@@ -90,6 +90,7 @@ import javax.annotation.Nullable;
import java.time.Duration;
import java.time.LocalDate;
+import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -1340,6 +1341,71 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
String.valueOf(currentYear)));
}
+ @Test
+ void testCreateInvalidPartitionForAutoPartitionedTable() throws Exception {
+ String dbName = DEFAULT_TABLE_PATH.getDatabaseName();
+ // numToRetain defaults to 7; with DAY unit, anything older than
(today - 7 days) is
+ // out-of-date.
+ TableDescriptor partitionedTable =
+ TableDescriptor.builder()
+ .schema(
+ Schema.newBuilder()
+ .column("id", DataTypes.STRING())
+ .column("name", DataTypes.STRING())
+ .column("pt", DataTypes.STRING())
+ .build())
+ .distributedBy(3, "id")
+ .partitionedBy("pt")
+ .property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED,
true)
+ .property(
+ ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT,
+ AutoPartitionTimeUnit.DAY)
+ .build();
+ TablePath tablePath = TablePath.of(dbName,
"test_create_invalid_partition_auto_table");
+ admin.createTable(tablePath, partitionedTable, true).get();
+ FLUSS_CLUSTER_EXTENSION.waitUntilPartitionAllReady(tablePath);
+
+ LocalDate today = LocalDate.now();
+ DateTimeFormatter dayFormatter =
DateTimeFormatter.ofPattern("yyyyMMdd");
+
+ // partition value does not match expected DAY format 'yyyyMMdd'
+ assertThatThrownBy(
+ () ->
+ admin.createPartition(
+ tablePath,
+ newPartitionSpec("pt",
"2024-03-25"),
+ false)
+ .get())
+ .cause()
+ .isInstanceOf(InvalidPartitionException.class)
+ .hasMessageContaining("does not match the expected format
'yyyyMMdd'")
+ .hasMessageContaining("DAY");
+
+ // (today - 8 days) is beyond the retention window of 7, should be
rejected.
+ String outOfDatePartition = today.minusDays(8).format(dayFormatter);
+ assertThatThrownBy(
+ () ->
+ admin.createPartition(
+ tablePath,
+ newPartitionSpec("pt",
outOfDatePartition),
+ false)
+ .get())
+ .cause()
+ .isInstanceOf(InvalidPartitionException.class)
+ .hasMessageContaining("is out-of-date")
+ .hasMessageContaining("earliest retained partition");
+
+ // (today - 7 days) is exactly at the retention boundary and should be
accepted.
+ String boundaryPartition = today.minusDays(7).format(dayFormatter);
+ admin.createPartition(tablePath, newPartitionSpec("pt",
boundaryPartition), false).get();
+ assertPartitionInfo(
+ admin.listPartitionInfos(tablePath).get(),
+ Arrays.asList(
+ boundaryPartition,
+ today.format(dayFormatter),
+ today.plusDays(1).format(dayFormatter)));
+ }
+
@Test
void testBootstrapServerConfigAsTabletServer() throws Exception {
Configuration newConf = clientConf;
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/table/AutoPartitionedTableITCase.java
b/fluss-client/src/test/java/org/apache/fluss/client/table/AutoPartitionedTableITCase.java
index b4f36da42..8cf4014c3 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/table/AutoPartitionedTableITCase.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/table/AutoPartitionedTableITCase.java
@@ -27,6 +27,7 @@ import org.apache.fluss.client.table.writer.AppendWriter;
import org.apache.fluss.client.table.writer.UpsertWriter;
import org.apache.fluss.config.AutoPartitionTimeUnit;
import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.exception.InvalidPartitionException;
import org.apache.fluss.exception.PartitionNotExistException;
import org.apache.fluss.metadata.PartitionInfo;
import org.apache.fluss.metadata.PhysicalTablePath;
@@ -45,7 +46,9 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.time.Duration;
+import java.time.LocalDate;
import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -379,6 +382,38 @@ class AutoPartitionedTableITCase extends
ClientToServerITCaseBase {
verifyPartitionLogs(table, schema.getRowType(),
expectPartitionAppendRows);
}
+ @Test
+ void testWriteToInvalidPartitionShouldThrowException() throws Exception {
+ // Enable dynamic partition creation so the writer attempts to create
the partition.
+
clientConf.set(ConfigOptions.CLIENT_WRITER_DYNAMIC_CREATE_PARTITION_ENABLED,
true);
+ TablePath tablePath = TablePath.of("test_db_1",
"test_write_invalid_format_partition");
+ createPartitionedTable(tablePath, true);
+ FLUSS_CLUSTER_EXTENSION.waitUntilPartitionAllReady(tablePath);
+
+ Table table = conn.getTable(tablePath);
+ UpsertWriter upsertWriter = table.newUpsert().createWriter();
+
+ // Write a row with a partition value that doesn't match the YEAR
format 'yyyy'.
+ assertThatThrownBy(() -> upsertWriter.upsert(row(1, "a",
"2024-03-25")))
+ .rootCause()
+ .isInstanceOf(InvalidPartitionException.class)
+ .hasMessageContaining(
+ "does not match the expected format 'yyyy' for
auto-partition time unit 'YEAR'");
+
+ // (this year - 8 years) is beyond the default retention window of 7,
should be rejected.
+ String outOfYearPartition =
+
LocalDate.now().minusYears(8).format(DateTimeFormatter.ofPattern("yyyy"));
+ String earliestRetainedPartition =
+
LocalDate.now().minusYears(7).format(DateTimeFormatter.ofPattern("yyyy"));
+ assertThatThrownBy(() -> upsertWriter.upsert(row(1, "a",
outOfYearPartition)))
+ .rootCause()
+ .isInstanceOf(InvalidPartitionException.class)
+ .hasMessageContaining(
+ String.format(
+ "Partition value '%s' is out-of-date. The
earliest retained partition is '%s'",
+ outOfYearPartition,
earliestRetainedPartition));
+ }
+
private Schema createPartitionedTable(TablePath tablePath, boolean
isPrimaryTable)
throws Exception {
Schema.Builder schemaBuilder =
diff --git
a/fluss-common/src/main/java/org/apache/fluss/utils/PartitionUtils.java
b/fluss-common/src/main/java/org/apache/fluss/utils/PartitionUtils.java
index 9b6f4cd6f..47df48318 100644
--- a/fluss-common/src/main/java/org/apache/fluss/utils/PartitionUtils.java
+++ b/fluss-common/src/main/java/org/apache/fluss/utils/PartitionUtils.java
@@ -28,8 +28,10 @@ import org.apache.fluss.row.TimestampLtz;
import org.apache.fluss.row.TimestampNtz;
import org.apache.fluss.types.DataTypeRoot;
+import java.time.Instant;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -109,6 +111,46 @@ public class PartitionUtils {
}
}
+ /**
+ * Validates that the partition time value in the given {@link
PartitionSpec} is valid and not
+ * out-of-date when auto-partition is enabled. Throws {@link
InvalidPartitionException} if the
+ * format doesn't match or the partition is older than the earliest
retained one.
+ */
+ public static void validateAutoPartitionTime(
+ PartitionSpec partitionSpec,
+ List<String> partitionKeys,
+ AutoPartitionStrategy autoPartitionStrategy) {
+ if (!autoPartitionStrategy.isAutoPartitionEnabled()) {
+ return;
+ }
+ String autoPartitionKey =
+ autoPartitionStrategy.key() != null
+ ? autoPartitionStrategy.key()
+ : partitionKeys.get(0);
+ String partitionTime =
partitionSpec.getSpecMap().get(autoPartitionKey);
+ AutoPartitionTimeUnit timeUnit = autoPartitionStrategy.timeUnit();
+ if (partitionTime == null || !isValidPartitionTime(partitionTime,
timeUnit)) {
+ throw new InvalidPartitionException(
+ String.format(
+ "Partition value '%s' does not match the expected
format '%s' "
+ + "for auto-partition time unit '%s'.",
+ partitionTime, getPartitionTimeFormat(timeUnit),
timeUnit));
+ }
+ ZonedDateTime currentZonedDateTime =
+ ZonedDateTime.ofInstant(Instant.now(),
autoPartitionStrategy.timeZone().toZoneId());
+ // Get the earliest partition time that needs to be retained.
+ String lastRetainPartitionTime =
+ generateAutoPartitionTime(
+ currentZonedDateTime,
-autoPartitionStrategy.numToRetain(), timeUnit);
+ if (lastRetainPartitionTime.compareTo(partitionTime) > 0) {
+ throw new InvalidPartitionException(
+ String.format(
+ "Partition value '%s' is out-of-date. The earliest
retained "
+ + "partition is '%s'.",
+ partitionTime, lastRetainPartitionTime));
+ }
+ }
+
/**
* Generate {@link ResolvedPartitionSpec} for auto partition in server.
When we auto creating a
* partition, we need to first generate a {@link ResolvedPartitionSpec}.
@@ -157,6 +199,36 @@ public class PartitionUtils {
return autoPartitionFieldSpec;
}
+ /** Returns the time string format pattern for the given time unit. */
+ private static String getPartitionTimeFormat(AutoPartitionTimeUnit
timeUnit) {
+ switch (timeUnit) {
+ case YEAR:
+ return YEAR_FORMAT;
+ case QUARTER:
+ return QUARTER_FORMAT;
+ case MONTH:
+ return MONTH_FORMAT;
+ case DAY:
+ return DAY_FORMAT;
+ case HOUR:
+ return HOUR_FORMAT;
+ default:
+ throw new IllegalArgumentException("Unsupported time unit: " +
timeUnit);
+ }
+ }
+
+ /**
+ * Returns true if the given time string matches the format expected for
the given time unit.
+ */
+ private static boolean isValidPartitionTime(String time,
AutoPartitionTimeUnit timeUnit) {
+ try {
+
DateTimeFormatter.ofPattern(getPartitionTimeFormat(timeUnit)).parse(time);
+ return true;
+ } catch (DateTimeParseException e) {
+ return false;
+ }
+ }
+
private static String getFormattedTime(ZonedDateTime zonedDateTime, String
format) {
return DateTimeFormatter.ofPattern(format).format(zonedDateTime);
}
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
index d9516821e..e427ed324 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
@@ -26,6 +26,7 @@ import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.FlussRuntimeException;
import org.apache.fluss.exception.InvalidAlterTableException;
import org.apache.fluss.exception.InvalidConfigException;
+import org.apache.fluss.exception.InvalidPartitionException;
import org.apache.fluss.exception.InvalidTableException;
import org.apache.fluss.flink.FlinkConnectorOptions;
import org.apache.fluss.metadata.DataLakeFormat;
@@ -539,12 +540,16 @@ abstract class FlinkCatalogITCase {
.format(DateTimeFormatter.ofPattern(datetimePattern));
// 2. test add partitions.
- tEnv.executeSql(
- String.format(
- "alter table %s add partition (b = 1,c = 1,hh = %s)",
tblName, minus3hour));
- tEnv.executeSql(
- String.format(
- "alter table %s add partition (b = 1,c = 2,hh = %s)",
tblName, minus3hour));
+ assertThatThrownBy(
+ () ->
+ tEnv.executeSql(
+ String.format(
+ "alter table %s add partition
(b = 1,c = 1,hh = %s)",
+ tblName, minus3hour)))
+ .rootCause()
+ .isInstanceOf(InvalidPartitionException.class)
+ .hasMessageContaining(
+ String.format("Partition value '%s' is out-of-date.",
minus3hour));
tEnv.executeSql(
String.format(
"alter table %s add partition (b = 1,c = 1,hh = %s)",
tblName, minus2hour));
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
index d13dcc8db..224798443 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
@@ -59,6 +59,8 @@ import javax.annotation.Nullable;
import java.time.Duration;
import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -675,12 +677,19 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
tableName))
.collect();
assertResultsIgnoreOrder(rowIter, expectedRowValues, false);
+ String partition1 =
+
LocalDateTime.now().minusYears(1).format(DateTimeFormatter.ofPattern("yyyy"));
+ String partition2 =
+
LocalDateTime.now().minusYears(2).format(DateTimeFormatter.ofPattern("yyyy"));
// then create some new partitions, and write rows to the new
partitions
- tEnv.executeSql(String.format("alter table %s add partition (c =
'2000')", tableName));
- tEnv.executeSql(String.format("alter table %s add partition (c =
'2001')", tableName));
+ tEnv.executeSql(
+ String.format("alter table %s add partition (c = '%s')",
tableName, partition1));
+ tEnv.executeSql(
+ String.format("alter table %s add partition (c = '%s')",
tableName, partition2));
// write data to the new partitions
- expectedRowValues = writeRowsToPartition(conn, tablePath,
Arrays.asList("2000", "2001"));
+ expectedRowValues =
+ writeRowsToPartition(conn, tablePath,
Arrays.asList(partition1, partition2));
assertResultsIgnoreOrder(rowIter, expectedRowValues, true);
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/AutoPartitionManager.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/AutoPartitionManager.java
index 5fb39037f..5f1673974 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/AutoPartitionManager.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/AutoPartitionManager.java
@@ -338,6 +338,7 @@ public class AutoPartitionManager implements AutoCloseable {
}
TablePath tablePath = tableInfo.getTablePath();
+
for (ResolvedPartitionSpec partition : partitionsToPreCreate) {
long tableId = tableInfo.getTableId();
int replicaFactor =
tableInfo.getTableConfig().getReplicationFactor();
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
index fbb71a7b8..1cce283af 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
@@ -218,6 +218,7 @@ import static
org.apache.fluss.server.utils.ServerRpcMessageUtils.toDatabaseChan
import static
org.apache.fluss.server.utils.ServerRpcMessageUtils.toTableBucketOffsets;
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.toTablePath;
import static
org.apache.fluss.server.utils.TableAssignmentUtils.generateAssignment;
+import static org.apache.fluss.utils.PartitionUtils.validateAutoPartitionTime;
import static org.apache.fluss.utils.PartitionUtils.validatePartitionSpec;
import static org.apache.fluss.utils.Preconditions.checkNotNull;
@@ -678,10 +679,17 @@ public final class CoordinatorService extends
RpcServiceBase implements Coordina
// first, validate the partition spec, and get resolved partition spec.
PartitionSpec partitionSpec =
getPartitionSpec(request.getPartitionSpec());
validatePartitionSpec(tablePath, table.partitionKeys, partitionSpec,
true);
+
+ // second, check whether the partition is out-of-date.
+ validateAutoPartitionTime(
+ partitionSpec,
+ table.partitionKeys,
+ table.getTableConfig().getAutoPartitionStrategy());
+
ResolvedPartitionSpec partitionToCreate =
ResolvedPartitionSpec.fromPartitionSpec(table.partitionKeys,
partitionSpec);
- // second, generate the PartitionAssignment.
+ // third, generate the PartitionAssignment.
int replicaFactor = table.getTableConfig().getReplicationFactor();
TabletServerInfo[] servers = metadataCache.getLiveServers();
Map<Integer, BucketAssignment> bucketAssignments =