This is an automated email from the ASF dual-hosted git repository.

hongshun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 0e265ad65 [fluss-common] Validate partition time when creating 
partition on auto partition table. (#2940)
0e265ad65 is described below

commit 0e265ad65db1b29098a95162f1da38814d2abe54
Author: Hongshun Wang <[email protected]>
AuthorDate: Fri Mar 27 14:24:24 2026 +0800

    [fluss-common] Validate partition time when creating partition on auto 
partition table. (#2940)
---
 .../client/write/DynamicPartitionCreator.java      | 17 ++++-
 .../apache/fluss/client/write/WriterClient.java    |  4 +-
 .../fluss/client/admin/FlussAdminITCase.java       | 66 ++++++++++++++++++++
 .../client/table/AutoPartitionedTableITCase.java   | 35 +++++++++++
 .../org/apache/fluss/utils/PartitionUtils.java     | 72 ++++++++++++++++++++++
 .../fluss/flink/catalog/FlinkCatalogITCase.java    | 17 +++--
 .../fluss/flink/source/FlinkTableSourceITCase.java | 15 ++++-
 .../server/coordinator/AutoPartitionManager.java   |  1 +
 .../server/coordinator/CoordinatorService.java     | 10 ++-
 9 files changed, 223 insertions(+), 14 deletions(-)

diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/write/DynamicPartitionCreator.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/write/DynamicPartitionCreator.java
index d1d23d39f..7a392f4af 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/write/DynamicPartitionCreator.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/write/DynamicPartitionCreator.java
@@ -24,6 +24,7 @@ import org.apache.fluss.exception.PartitionNotExistException;
 import org.apache.fluss.metadata.PhysicalTablePath;
 import org.apache.fluss.metadata.ResolvedPartitionSpec;
 import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.utils.AutoPartitionStrategy;
 import org.apache.fluss.utils.ExceptionUtils;
 
 import org.slf4j.Logger;
@@ -38,6 +39,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Consumer;
 
 import static org.apache.fluss.utils.ExceptionUtils.stripCompletionException;
+import static org.apache.fluss.utils.PartitionUtils.validateAutoPartitionTime;
 import static org.apache.fluss.utils.Preconditions.checkArgument;
 
 /** A creator to create partition when dynamic partition create enable for 
table. */
@@ -64,7 +66,9 @@ public class DynamicPartitionCreator {
     }
 
     public void checkAndCreatePartitionAsync(
-            PhysicalTablePath physicalTablePath, List<String> partitionKeys) {
+            PhysicalTablePath physicalTablePath,
+            List<String> partitionKeys,
+            AutoPartitionStrategy autoPartitionStrategy) {
         String partitionName = physicalTablePath.getPartitionName();
         if (partitionName == null) {
             // no need to check and create partition
@@ -83,12 +87,20 @@ public class DynamicPartitionCreator {
                 // if the partition exists, we should skip creating it.
                 LOG.debug("Partition {} already exists, skipping.", 
physicalTablePath);
             } else {
+                // Validate early, before touching any state.
+                ResolvedPartitionSpec resolvedPartitionSpec =
+                        ResolvedPartitionSpec.fromPartitionName(partitionKeys, 
partitionName);
+                validateAutoPartitionTime(
+                        resolvedPartitionSpec.toPartitionSpec(),
+                        partitionKeys,
+                        autoPartitionStrategy);
+
                 // create partition if not exists.
                 // partition may not exist, we should try to create it.
                 if (inflightPartitionsToCreate.add(physicalTablePath)) {
                     // if the partition is not in inflightPartitionsToCreate, 
we should create it.
                     // this means that the partition is not being created by 
other threads.
-                    LOG.info("Dynamically creating partition partition for 
{}", physicalTablePath);
+                    LOG.info("Dynamically creating partition for {}", 
physicalTablePath);
                     createPartition(physicalTablePath, partitionKeys);
                 } else {
                     // if the partition is already in 
inflightPartitionsToCreate, we should skip
@@ -104,7 +116,6 @@ public class DynamicPartitionCreator {
         boolean idExist = false;
         // force an IO to check whether the partition exists
         try {
-            // force an IO to check whether the partition exists
             idExist = 
metadataUpdater.checkAndUpdatePartitionMetadata(physicalTablePath);
         } catch (Exception e) {
             Throwable t = ExceptionUtils.stripExecutionException(e);
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java 
b/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java
index 0e184f09f..351c7c065 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java
@@ -177,7 +177,9 @@ public class WriterClient {
             TableInfo tableInfo = record.getTableInfo();
             PhysicalTablePath physicalTablePath = 
record.getPhysicalTablePath();
             dynamicPartitionCreator.checkAndCreatePartitionAsync(
-                    physicalTablePath, tableInfo.getPartitionKeys());
+                    physicalTablePath,
+                    tableInfo.getPartitionKeys(),
+                    tableInfo.getTableConfig().getAutoPartitionStrategy());
 
             // maybe create bucket assigner.
             Cluster cluster = metadataUpdater.getCluster();
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
index ad3bfd775..19baf8cee 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
@@ -90,6 +90,7 @@ import javax.annotation.Nullable;
 
 import java.time.Duration;
 import java.time.LocalDate;
+import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -1340,6 +1341,71 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
                         String.valueOf(currentYear)));
     }
 
+    @Test
+    void testCreateInvalidPartitionForAutoPartitionedTable() throws Exception {
+        String dbName = DEFAULT_TABLE_PATH.getDatabaseName();
+        // numToRetain defaults to 7; with DAY unit, anything older than 
(today - 7 days) is
+        // out-of-date.
+        TableDescriptor partitionedTable =
+                TableDescriptor.builder()
+                        .schema(
+                                Schema.newBuilder()
+                                        .column("id", DataTypes.STRING())
+                                        .column("name", DataTypes.STRING())
+                                        .column("pt", DataTypes.STRING())
+                                        .build())
+                        .distributedBy(3, "id")
+                        .partitionedBy("pt")
+                        .property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, 
true)
+                        .property(
+                                ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT,
+                                AutoPartitionTimeUnit.DAY)
+                        .build();
+        TablePath tablePath = TablePath.of(dbName, 
"test_create_invalid_partition_auto_table");
+        admin.createTable(tablePath, partitionedTable, true).get();
+        FLUSS_CLUSTER_EXTENSION.waitUntilPartitionAllReady(tablePath);
+
+        LocalDate today = LocalDate.now();
+        DateTimeFormatter dayFormatter = 
DateTimeFormatter.ofPattern("yyyyMMdd");
+
+        // partition value does not match expected DAY format 'yyyyMMdd'
+        assertThatThrownBy(
+                        () ->
+                                admin.createPartition(
+                                                tablePath,
+                                                newPartitionSpec("pt", 
"2024-03-25"),
+                                                false)
+                                        .get())
+                .cause()
+                .isInstanceOf(InvalidPartitionException.class)
+                .hasMessageContaining("does not match the expected format 
'yyyyMMdd'")
+                .hasMessageContaining("DAY");
+
+        // (today - 8 days) is beyond the retention window of 7, should be 
rejected.
+        String outOfDatePartition = today.minusDays(8).format(dayFormatter);
+        assertThatThrownBy(
+                        () ->
+                                admin.createPartition(
+                                                tablePath,
+                                                newPartitionSpec("pt", 
outOfDatePartition),
+                                                false)
+                                        .get())
+                .cause()
+                .isInstanceOf(InvalidPartitionException.class)
+                .hasMessageContaining("is out-of-date")
+                .hasMessageContaining("earliest retained partition");
+
+        // (today - 7 days) is exactly at the retention boundary and should be 
accepted.
+        String boundaryPartition = today.minusDays(7).format(dayFormatter);
+        admin.createPartition(tablePath, newPartitionSpec("pt", 
boundaryPartition), false).get();
+        assertPartitionInfo(
+                admin.listPartitionInfos(tablePath).get(),
+                Arrays.asList(
+                        boundaryPartition,
+                        today.format(dayFormatter),
+                        today.plusDays(1).format(dayFormatter)));
+    }
+
     @Test
     void testBootstrapServerConfigAsTabletServer() throws Exception {
         Configuration newConf = clientConf;
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/table/AutoPartitionedTableITCase.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/table/AutoPartitionedTableITCase.java
index b4f36da42..8cf4014c3 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/table/AutoPartitionedTableITCase.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/table/AutoPartitionedTableITCase.java
@@ -27,6 +27,7 @@ import org.apache.fluss.client.table.writer.AppendWriter;
 import org.apache.fluss.client.table.writer.UpsertWriter;
 import org.apache.fluss.config.AutoPartitionTimeUnit;
 import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.exception.InvalidPartitionException;
 import org.apache.fluss.exception.PartitionNotExistException;
 import org.apache.fluss.metadata.PartitionInfo;
 import org.apache.fluss.metadata.PhysicalTablePath;
@@ -45,7 +46,9 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.time.Duration;
+import java.time.LocalDate;
 import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -379,6 +382,38 @@ class AutoPartitionedTableITCase extends 
ClientToServerITCaseBase {
         verifyPartitionLogs(table, schema.getRowType(), 
expectPartitionAppendRows);
     }
 
+    @Test
+    void testWriteToInvalidPartitionShouldThrowException() throws Exception {
+        // Enable dynamic partition creation so the writer attempts to create 
the partition.
+        
clientConf.set(ConfigOptions.CLIENT_WRITER_DYNAMIC_CREATE_PARTITION_ENABLED, 
true);
+        TablePath tablePath = TablePath.of("test_db_1", 
"test_write_invalid_format_partition");
+        createPartitionedTable(tablePath, true);
+        FLUSS_CLUSTER_EXTENSION.waitUntilPartitionAllReady(tablePath);
+
+        Table table = conn.getTable(tablePath);
+        UpsertWriter upsertWriter = table.newUpsert().createWriter();
+
+        // Write a row with a partition value that doesn't match the YEAR 
format 'yyyy'.
+        assertThatThrownBy(() -> upsertWriter.upsert(row(1, "a", 
"2024-03-25")))
+                .rootCause()
+                .isInstanceOf(InvalidPartitionException.class)
+                .hasMessageContaining(
+                        "does not match the expected format 'yyyy' for 
auto-partition time unit 'YEAR'");
+
+        // (today - 8 days) is beyond the default retention window of 7, 
should be rejected.
+        String outOfYearPartition =
+                
LocalDate.now().minusYears(8).format(DateTimeFormatter.ofPattern("yyyy"));
+        String earliestRetainedPartition =
+                
LocalDate.now().minusYears(7).format(DateTimeFormatter.ofPattern("yyyy"));
+        assertThatThrownBy(() -> upsertWriter.upsert(row(1, "a", 
outOfYearPartition)))
+                .rootCause()
+                .isInstanceOf(InvalidPartitionException.class)
+                .hasMessageContaining(
+                        String.format(
+                                "Partition value '%s' is out-of-date. The 
earliest retained partition is '%s'",
+                                outOfYearPartition, 
earliestRetainedPartition));
+    }
+
     private Schema createPartitionedTable(TablePath tablePath, boolean 
isPrimaryTable)
             throws Exception {
         Schema.Builder schemaBuilder =
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/utils/PartitionUtils.java 
b/fluss-common/src/main/java/org/apache/fluss/utils/PartitionUtils.java
index 9b6f4cd6f..47df48318 100644
--- a/fluss-common/src/main/java/org/apache/fluss/utils/PartitionUtils.java
+++ b/fluss-common/src/main/java/org/apache/fluss/utils/PartitionUtils.java
@@ -28,8 +28,10 @@ import org.apache.fluss.row.TimestampLtz;
 import org.apache.fluss.row.TimestampNtz;
 import org.apache.fluss.types.DataTypeRoot;
 
+import java.time.Instant;
 import java.time.ZonedDateTime;
 import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeParseException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -109,6 +111,46 @@ public class PartitionUtils {
         }
     }
 
+    /**
+     * Validates that the partition time value in the given {@link 
PartitionSpec} is valid and not
+     * out-of-date when auto-partition is enabled. Throws {@link 
InvalidPartitionException} if the
+     * format doesn't match or the partition is older than the earliest 
retained one.
+     */
+    public static void validateAutoPartitionTime(
+            PartitionSpec partitionSpec,
+            List<String> partitionKeys,
+            AutoPartitionStrategy autoPartitionStrategy) {
+        if (!autoPartitionStrategy.isAutoPartitionEnabled()) {
+            return;
+        }
+        String autoPartitionKey =
+                autoPartitionStrategy.key() != null
+                        ? autoPartitionStrategy.key()
+                        : partitionKeys.get(0);
+        String partitionTime = 
partitionSpec.getSpecMap().get(autoPartitionKey);
+        AutoPartitionTimeUnit timeUnit = autoPartitionStrategy.timeUnit();
+        if (partitionTime == null || !isValidPartitionTime(partitionTime, 
timeUnit)) {
+            throw new InvalidPartitionException(
+                    String.format(
+                            "Partition value '%s' does not match the expected 
format '%s' "
+                                    + "for auto-partition time unit '%s'.",
+                            partitionTime, getPartitionTimeFormat(timeUnit), 
timeUnit));
+        }
+        ZonedDateTime currentZonedDateTime =
+                ZonedDateTime.ofInstant(Instant.now(), 
autoPartitionStrategy.timeZone().toZoneId());
+        // Get the earliest partition time that needs to be retained.
+        String lastRetainPartitionTime =
+                generateAutoPartitionTime(
+                        currentZonedDateTime, 
-autoPartitionStrategy.numToRetain(), timeUnit);
+        if (lastRetainPartitionTime.compareTo(partitionTime) > 0) {
+            throw new InvalidPartitionException(
+                    String.format(
+                            "Partition value '%s' is out-of-date. The earliest 
retained "
+                                    + "partition is '%s'.",
+                            partitionTime, lastRetainPartitionTime));
+        }
+    }
+
     /**
      * Generate {@link ResolvedPartitionSpec} for auto partition in server. 
When we auto creating a
      * partition, we need to first generate a {@link ResolvedPartitionSpec}.
@@ -157,6 +199,36 @@ public class PartitionUtils {
         return autoPartitionFieldSpec;
     }
 
+    /** Returns the time string format pattern for the given time unit. */
+    private static String getPartitionTimeFormat(AutoPartitionTimeUnit 
timeUnit) {
+        switch (timeUnit) {
+            case YEAR:
+                return YEAR_FORMAT;
+            case QUARTER:
+                return QUARTER_FORMAT;
+            case MONTH:
+                return MONTH_FORMAT;
+            case DAY:
+                return DAY_FORMAT;
+            case HOUR:
+                return HOUR_FORMAT;
+            default:
+                throw new IllegalArgumentException("Unsupported time unit: " + 
timeUnit);
+        }
+    }
+
+    /**
+     * Returns true if the given time string matches the format expected for 
the given time unit.
+     */
+    private static boolean isValidPartitionTime(String time, 
AutoPartitionTimeUnit timeUnit) {
+        try {
+            
DateTimeFormatter.ofPattern(getPartitionTimeFormat(timeUnit)).parse(time);
+            return true;
+        } catch (DateTimeParseException e) {
+            return false;
+        }
+    }
+
     private static String getFormattedTime(ZonedDateTime zonedDateTime, String 
format) {
         return DateTimeFormatter.ofPattern(format).format(zonedDateTime);
     }
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
index d9516821e..e427ed324 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
@@ -26,6 +26,7 @@ import org.apache.fluss.config.Configuration;
 import org.apache.fluss.exception.FlussRuntimeException;
 import org.apache.fluss.exception.InvalidAlterTableException;
 import org.apache.fluss.exception.InvalidConfigException;
+import org.apache.fluss.exception.InvalidPartitionException;
 import org.apache.fluss.exception.InvalidTableException;
 import org.apache.fluss.flink.FlinkConnectorOptions;
 import org.apache.fluss.metadata.DataLakeFormat;
@@ -539,12 +540,16 @@ abstract class FlinkCatalogITCase {
                         .format(DateTimeFormatter.ofPattern(datetimePattern));
 
         // 2. test add partitions.
-        tEnv.executeSql(
-                String.format(
-                        "alter table %s add partition (b = 1,c = 1,hh = %s)", 
tblName, minus3hour));
-        tEnv.executeSql(
-                String.format(
-                        "alter table %s add partition (b = 1,c = 2,hh = %s)", 
tblName, minus3hour));
+        assertThatThrownBy(
+                        () ->
+                                tEnv.executeSql(
+                                        String.format(
+                                                "alter table %s add partition 
(b = 1,c = 1,hh = %s)",
+                                                tblName, minus3hour)))
+                .rootCause()
+                .isInstanceOf(InvalidPartitionException.class)
+                .hasMessageContaining(
+                        String.format("Partition value '%s' is out-of-date.", 
minus3hour));
         tEnv.executeSql(
                 String.format(
                         "alter table %s add partition (b = 1,c = 1,hh = %s)", 
tblName, minus2hour));
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
index d13dcc8db..224798443 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
@@ -59,6 +59,8 @@ import javax.annotation.Nullable;
 
 import java.time.Duration;
 import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -675,12 +677,19 @@ abstract class FlinkTableSourceITCase extends 
AbstractTestBase {
                                         tableName))
                         .collect();
         assertResultsIgnoreOrder(rowIter, expectedRowValues, false);
+        String partition1 =
+                
LocalDateTime.now().minusYears(1).format(DateTimeFormatter.ofPattern("yyyy"));
+        String partition2 =
+                
LocalDateTime.now().minusYears(2).format(DateTimeFormatter.ofPattern("yyyy"));
 
         // then create some new partitions, and write rows to the new 
partitions
-        tEnv.executeSql(String.format("alter table %s add partition (c = 
'2000')", tableName));
-        tEnv.executeSql(String.format("alter table %s add partition (c = 
'2001')", tableName));
+        tEnv.executeSql(
+                String.format("alter table %s add partition (c = '%s')", 
tableName, partition1));
+        tEnv.executeSql(
+                String.format("alter table %s add partition (c = '%s')", 
tableName, partition2));
         // write data to the new partitions
-        expectedRowValues = writeRowsToPartition(conn, tablePath, 
Arrays.asList("2000", "2001"));
+        expectedRowValues =
+                writeRowsToPartition(conn, tablePath, 
Arrays.asList(partition1, partition2));
         assertResultsIgnoreOrder(rowIter, expectedRowValues, true);
     }
 
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/AutoPartitionManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/AutoPartitionManager.java
index 5fb39037f..5f1673974 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/AutoPartitionManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/AutoPartitionManager.java
@@ -338,6 +338,7 @@ public class AutoPartitionManager implements AutoCloseable {
         }
 
         TablePath tablePath = tableInfo.getTablePath();
+
         for (ResolvedPartitionSpec partition : partitionsToPreCreate) {
             long tableId = tableInfo.getTableId();
             int replicaFactor = 
tableInfo.getTableConfig().getReplicationFactor();
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
index fbb71a7b8..1cce283af 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
@@ -218,6 +218,7 @@ import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.toDatabaseChan
 import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.toTableBucketOffsets;
 import static org.apache.fluss.server.utils.ServerRpcMessageUtils.toTablePath;
 import static 
org.apache.fluss.server.utils.TableAssignmentUtils.generateAssignment;
+import static org.apache.fluss.utils.PartitionUtils.validateAutoPartitionTime;
 import static org.apache.fluss.utils.PartitionUtils.validatePartitionSpec;
 import static org.apache.fluss.utils.Preconditions.checkNotNull;
 
@@ -678,10 +679,17 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
         // first, validate the partition spec, and get resolved partition spec.
         PartitionSpec partitionSpec = 
getPartitionSpec(request.getPartitionSpec());
         validatePartitionSpec(tablePath, table.partitionKeys, partitionSpec, 
true);
+
+        // second, check whether the partition is out-of-date.
+        validateAutoPartitionTime(
+                partitionSpec,
+                table.partitionKeys,
+                table.getTableConfig().getAutoPartitionStrategy());
+
         ResolvedPartitionSpec partitionToCreate =
                 ResolvedPartitionSpec.fromPartitionSpec(table.partitionKeys, 
partitionSpec);
 
-        // second, generate the PartitionAssignment.
+        // third, generate the PartitionAssignment.
         int replicaFactor = table.getTableConfig().getReplicationFactor();
         TabletServerInfo[] servers = metadataCache.getLiveServers();
         Map<Integer, BucketAssignment> bucketAssignments =

Reply via email to