This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 41cc4140 [server] Optimize auto drop partition perfomance (#1047)
41cc4140 is described below

commit 41cc41408372d1e7ff62ad6deef1e43a26b2c340
Author: Liebing <[email protected]>
AuthorDate: Fri Jul 4 14:40:57 2025 +0800

    [server] Optimize auto drop partition perfomance (#1047)
---
 .../server/coordinator/AutoPartitionManager.java   | 100 ++++++---
 .../coordinator/AutoPartitionManagerTest.java      | 228 +++++++++++++++++----
 2 files changed, 265 insertions(+), 63 deletions(-)

diff --git 
a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/AutoPartitionManager.java
 
b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/AutoPartitionManager.java
index 502af8f0..ed2fac5f 100644
--- 
a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/AutoPartitionManager.java
+++ 
b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/AutoPartitionManager.java
@@ -50,11 +50,13 @@ import java.time.ZonedDateTime;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.NavigableSet;
+import java.util.NavigableMap;
 import java.util.Set;
-import java.util.TreeSet;
+import java.util.TreeMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadLocalRandom;
@@ -96,8 +98,10 @@ public class AutoPartitionManager implements AutoCloseable {
     // now, only consider day partition, todo: need to consider all partition 
unit
     private final Map<Long, Integer> autoCreateDayPartitionDelayMinutes = new 
HashMap<>();
 
+    // table id -> (value of auto partition time key -> partition name set)
+    // for single partition key, the partition name set will be null to reduce 
memory usage
     @GuardedBy("lock")
-    private final Map<Long, TreeSet<String>> partitionsByTable = new 
HashMap<>();
+    private final Map<Long, TreeMap<String, Set<String>>> partitionsByTable = 
new HashMap<>();
 
     private final Lock lock = new ReentrantLock();
 
@@ -140,11 +144,14 @@ public class AutoPartitionManager implements 
AutoCloseable {
                 lock,
                 () -> {
                     autoPartitionTables.put(tableId, tableInfo);
-                    Set<String> partitionSet =
+                    TreeMap<String, Set<String>> partitionMap =
                             partitionsByTable.computeIfAbsent(
-                                    tableInfo.getTableId(), k -> new 
TreeSet<>());
-                    checkNotNull(partitionSet, "Partition set is null.");
-                    partitionSet.addAll(partitions);
+                                    tableInfo.getTableId(), k -> new 
TreeMap<>());
+                    checkNotNull(partitionMap, "Partition map is null.");
+                    partitions.forEach(
+                            partitionName ->
+                                    addPartitionToPartitionsByTable(
+                                            tableInfo, partitionMap, 
partitionName));
                     if 
(tableInfo.getTableConfig().getAutoPartitionStrategy().timeUnit()
                             == AutoPartitionTimeUnit.DAY) {
                         // get the delay minutes to create partition
@@ -191,7 +198,10 @@ public class AutoPartitionManager implements AutoCloseable 
{
                 lock,
                 () -> {
                     if (autoPartitionTables.containsKey(tableId)) {
-                        partitionsByTable.get(tableId).add(partitionName);
+                        addPartitionToPartitionsByTable(
+                                autoPartitionTables.get(tableId),
+                                partitionsByTable.get(tableId),
+                                partitionName);
                     }
                 });
     }
@@ -224,6 +234,33 @@ public class AutoPartitionManager implements AutoCloseable 
{
         }
     }
 
+    private String extractAutoPartitionValue(TableInfo tableInfo, String 
partitionName) {
+        // for single partition key table, the full partition name is the auto 
partition value
+        if (tableInfo.getPartitionKeys().size() == 1) {
+            return partitionName;
+        }
+
+        String autoPartitionKey = 
tableInfo.getTableConfig().getAutoPartitionStrategy().key();
+        int autoPartitionKeyIndex = 
tableInfo.getPartitionKeys().indexOf(autoPartitionKey);
+        return partitionName.split("\\$")[autoPartitionKeyIndex];
+    }
+
+    private void addPartitionToPartitionsByTable(
+            TableInfo tableInfo,
+            NavigableMap<String, Set<String>> partitionMap,
+            String partitionName) {
+        if (tableInfo.getPartitionKeys().size() > 1) {
+            Set<String> partitionSet =
+                    partitionMap.computeIfAbsent(
+                            extractAutoPartitionValue(tableInfo, 
partitionName),
+                            k -> new HashSet<>());
+            checkNotNull(partitionSet, "Partition set is null.");
+            partitionSet.add(partitionName);
+        } else {
+            partitionMap.put(partitionName, null);
+        }
+    }
+
     private void doAutoPartition() {
         Instant now = clock.instant();
         inLock(lock, () -> doAutoPartition(now, autoPartitionTables.keySet(), 
false));
@@ -248,10 +285,15 @@ public class AutoPartitionManager implements 
AutoCloseable {
                     createPartitionInstant = 
now.minus(Duration.ofMinutes(delayMinutes));
                 }
             }
-            TreeSet<String> currentPartitions =
-                    partitionsByTable.computeIfAbsent(tableId, k -> new 
TreeSet<>());
+
             TableInfo tableInfo = autoPartitionTables.get(tableId);
             TablePath tablePath = tableInfo.getTablePath();
+            TreeMap<String, Set<String>> currentPartitions =
+                    partitionsByTable.computeIfAbsent(
+                            tableId,
+                            tableInfo.getPartitionKeys().size() > 1
+                                    ? k -> new TreeMap<>()
+                                    : k -> null);
             TableRegistration table;
             try {
                 table = metadataManager.getTableRegistration(tablePath);
@@ -281,7 +323,9 @@ public class AutoPartitionManager implements AutoCloseable {
     }
 
     private void createPartitions(
-            TableInfo tableInfo, Instant currentInstant, TreeSet<String> 
currentPartitions) {
+            TableInfo tableInfo,
+            Instant currentInstant,
+            TreeMap<String, Set<String>> currentPartitions) {
         // get the partitions needed to create
         List<ResolvedPartitionSpec> partitionsToPreCreate =
                 partitionNamesToPreCreate(
@@ -307,7 +351,8 @@ public class AutoPartitionManager implements AutoCloseable {
 
                 metadataManager.createPartition(
                         tablePath, tableId, partitionAssignment, partition, 
false);
-                currentPartitions.add(partition.getPartitionName());
+                // only single partition key table supports automatic creation 
of partitions
+                currentPartitions.put(partition.getPartitionName(), null);
                 LOG.info(
                         "Auto partitioning created partition {} for table 
[{}].",
                         partition,
@@ -343,7 +388,7 @@ public class AutoPartitionManager implements AutoCloseable {
             List<String> partitionKeys,
             Instant currentInstant,
             AutoPartitionStrategy autoPartitionStrategy,
-            TreeSet<String> currentPartitions) {
+            TreeMap<String, Set<String>> currentPartitions) {
         AutoPartitionTimeUnit autoPartitionTimeUnit = 
autoPartitionStrategy.timeUnit();
         ZonedDateTime currentZonedDateTime =
                 ZonedDateTime.ofInstant(
@@ -356,7 +401,7 @@ public class AutoPartitionManager implements AutoCloseable {
                     generateAutoPartition(
                             partitionKeys, currentZonedDateTime, idx, 
autoPartitionTimeUnit);
             // if the partition already exists, we don't need to create it, 
otherwise, create it
-            if (!currentPartitions.contains(partition.getPartitionName())) {
+            if (!currentPartitions.containsKey(partition.getPartitionName())) {
                 partitionsToCreate.add(partition);
             }
         }
@@ -368,14 +413,12 @@ public class AutoPartitionManager implements 
AutoCloseable {
             List<String> partitionKeys,
             Instant currentInstant,
             AutoPartitionStrategy autoPartitionStrategy,
-            NavigableSet<String> currentPartitions) {
+            NavigableMap<String, Set<String>> currentPartitions) {
         int numToRetain = autoPartitionStrategy.numToRetain();
         // negative value means not to drop partitions
         if (numToRetain < 0) {
             return;
         }
-        String autoPartitionKey =
-                partitionKeys.size() == 1 ? partitionKeys.get(0) : 
autoPartitionStrategy.key();
 
         ZonedDateTime currentZonedDateTime =
                 ZonedDateTime.ofInstant(
@@ -398,12 +441,20 @@ public class AutoPartitionManager implements 
AutoCloseable {
         // (a=2,dt=20250505,b=1) (a=2,dt=20250506,b=1) (a=2,dt=20250507,b=1)
         // then partition of pattern:
         // (a=?,dt=20250506,b=?) (a=?,dt=20250507,b=?) will be retained.
-        int timePartitionKeyIndex = partitionKeys.indexOf(autoPartitionKey);
-        // Todo: refactoring currentPartitions to sort by the partition time 
key, then it is
-        // efficient to handle large partition set.
-        for (String partitionName : new ArrayList<>(currentPartitions)) {
-            String currentTime = 
partitionName.split("\\$")[timePartitionKeyIndex];
-            if (currentTime.compareTo(lastRetainPartitionTime) < 0) {
+        Iterator<Map.Entry<String, Set<String>>> iterator =
+                
currentPartitions.headMap(lastRetainPartitionTime).entrySet().iterator();
+        while (iterator.hasNext()) {
+            Map.Entry<String, Set<String>> entry = iterator.next();
+
+            Iterator<String> dropIterator;
+            if (entry.getValue() == null) {
+                dropIterator = new 
HashSet<>(Collections.singleton(entry.getKey())).iterator();
+            } else {
+                dropIterator = entry.getValue().iterator();
+            }
+
+            while (dropIterator.hasNext()) {
+                String partitionName = dropIterator.next();
                 // drop the partition
                 try {
                     metadataManager.dropPartition(
@@ -418,12 +469,13 @@ public class AutoPartitionManager implements 
AutoCloseable {
                 }
 
                 // only remove when zk success, this reflects to the 
partitionsByTable
-                currentPartitions.remove(partitionName);
+                dropIterator.remove();
                 LOG.info(
                         "Auto partitioning deleted partition {} for table 
[{}].",
                         partitionName,
                         tablePath);
             }
+            iterator.remove();
         }
     }
 
diff --git 
a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/AutoPartitionManagerTest.java
 
b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/AutoPartitionManagerTest.java
index 5b94546e..b015f709 100644
--- 
a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/AutoPartitionManagerTest.java
+++ 
b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/AutoPartitionManagerTest.java
@@ -83,7 +83,7 @@ class AutoPartitionManagerTest {
     }
 
     static Stream<Arguments> parameters() {
-        // numPreCreate = 4, numRetention = 2
+        // numPreCreate = 4 (for table with single partition key only), 
numRetention = 2
         return Stream.of(
                 Arguments.of(
                         TestParams.builder(AutoPartitionTimeUnit.HOUR)
@@ -110,6 +110,33 @@ class AutoPartitionManagerTest {
                                         "2024091008",
                                         "2024091009")
                                 .build()),
+                Arguments.of(
+                        TestParams.builder(AutoPartitionTimeUnit.HOUR, true)
+                                .startTime("2024-09-10T01:00:00")
+                                // table with multiple partition keys not 
supports automatic
+                                // creation
+                                .expectedPartitions()
+                                .manualCreatedPartitions(
+                                        "2024091001$A",
+                                        "2024091001$B",
+                                        "2024091001$C",
+                                        "2024091002$A",
+                                        "2024091002$B",
+                                        "2024091003$A",
+                                        "2024091004$A",
+                                        "2024091005$A")
+                                .manualDroppedPartitions("2024091002$A")
+                                .advanceClock(c -> c.plusHours(3))
+                                // current time partition is "2024091004"
+                                .expectedPartitionsAfterAdvance(
+                                        "2024091002$B",
+                                        "2024091003$A",
+                                        "2024091004$A",
+                                        "2024091005$A")
+                                .advanceClock2(c -> c.plusHours(2))
+                                // current time partition is "2024091006"
+                                .expectedPartitionsFinal("2024091004$A", 
"2024091005$A")
+                                .build()),
                 Arguments.of(
                         TestParams.builder(AutoPartitionTimeUnit.DAY)
                                 .startTime("2024-09-10T00:00:00")
@@ -136,6 +163,26 @@ class AutoPartitionManagerTest {
                                         "20240917",
                                         "20240918")
                                 .build()),
+                Arguments.of(
+                        TestParams.builder(AutoPartitionTimeUnit.DAY, true)
+                                .startTime("2024-09-10T00:00:00")
+                                .expectedPartitions()
+                                .manualCreatedPartitions(
+                                        "20240910$A",
+                                        "20240910$B",
+                                        "20240910$C",
+                                        "20240911$A",
+                                        "20240911$B",
+                                        "20240912$A",
+                                        "20240913$A",
+                                        "20240914$A")
+                                .manualDroppedPartition("20240911$A")
+                                .advanceClock(c -> 
c.plusDays(3).plus(Duration.ofHours(23)))
+                                .expectedPartitionsAfterAdvance(
+                                        "20240911$B", "20240912$A", 
"20240913$A", "20240914$A")
+                                .advanceClock2(c -> c.plusDays(2))
+                                .expectedPartitionsFinal("20240913$A", 
"20240914$A")
+                                .build()),
                 Arguments.of(
                         TestParams.builder(AutoPartitionTimeUnit.MONTH)
                                 .startTime("2024-09-10T00:00:00")
@@ -150,6 +197,27 @@ class AutoPartitionManagerTest {
                                 .expectedPartitionsFinal(
                                         "202412", "202501", "202502", 
"202503", "202504", "202505")
                                 .build()),
+                Arguments.of(
+                        TestParams.builder(AutoPartitionTimeUnit.MONTH, true)
+                                .startTime("2024-09-10T00:00:00")
+                                .expectedPartitions()
+                                .manualCreatedPartitions(
+                                        "202409$A",
+                                        "202409$B",
+                                        "202409$C",
+                                        "202410$A",
+                                        "202410$B",
+                                        "202411$A",
+                                        "202412$A",
+                                        "202413$A")
+                                .manualDroppedPartition("202410$A")
+                                .advanceClock(c -> c.plusMonths(3))
+                                // current partition is "202412"
+                                .expectedPartitionsAfterAdvance(
+                                        "202410$B", "202411$A", "202412$A", 
"202413$A")
+                                .advanceClock2(c -> c.plusMonths(2))
+                                .expectedPartitionsFinal("202412$A", 
"202413$A")
+                                .build()),
                 Arguments.of(
                         TestParams.builder(AutoPartitionTimeUnit.QUARTER)
                                 .startTime("2024-09-10T00:00:00")
@@ -164,6 +232,22 @@ class AutoPartitionManagerTest {
                                 .expectedPartitionsFinal(
                                         "20252", "20253", "20254", "20261", 
"20262", "20263")
                                 .build()),
+                Arguments.of(
+                        TestParams.builder(AutoPartitionTimeUnit.QUARTER, true)
+                                .startTime("2024-09-10T00:00:00")
+                                .expectedPartitions()
+                                .manualCreatedPartitions(
+                                        "20243$A", "20243$B", "20243$C", 
"20244$A", "20244$B",
+                                        "20251$A", "20252$A", "20253$B", 
"20254$C")
+                                .manualDroppedPartition("20243$A")
+                                .advanceClock(c -> c.plusMonths(3 * 3))
+                                // current partition is "20252"
+                                .expectedPartitionsAfterAdvance(
+                                        "20244$A", "20244$B", "20251$A", 
"20252$A", "20253$B",
+                                        "20254$C")
+                                .advanceClock2(c -> c.plusMonths(2 * 3))
+                                .expectedPartitionsFinal("20252$A", "20253$B", 
"20254$C")
+                                .build()),
                 Arguments.of(
                         TestParams.builder(AutoPartitionTimeUnit.YEAR)
                                 .startTime("2024-09-10T00:00:00")
@@ -177,6 +261,21 @@ class AutoPartitionManagerTest {
                                 .advanceClock2(c -> c.plusYears(2))
                                 .expectedPartitionsFinal(
                                         "2027", "2028", "2029", "2030", 
"2031", "2032")
+                                .build()),
+                Arguments.of(
+                        TestParams.builder(AutoPartitionTimeUnit.YEAR, true)
+                                .startTime("2024-09-10T00:00:00")
+                                .expectedPartitions()
+                                .manualCreatedPartitions(
+                                        "2024$A", "2024$B", "2024$C", 
"2025$A", "2025$B", "2026$A",
+                                        "2027$B", "2028$C")
+                                .manualDroppedPartition("2025$B")
+                                .advanceClock(c -> c.plusYears(3))
+                                // current partition is "2027", retain "2025", 
"2026"
+                                .expectedPartitionsAfterAdvance(
+                                        "2025$A", "2026$A", "2027$B", "2028$C")
+                                .advanceClock2(c -> c.plusYears(2))
+                                .expectedPartitionsFinal("2027$B", "2028$C")
                                 .build()));
     }
 
@@ -196,7 +295,8 @@ class AutoPartitionManagerTest {
                         periodicExecutor);
         autoPartitionManager.start();
 
-        TableInfo table = createPartitionedTable(2, 4, params.timeUnit);
+        TableInfo table =
+                createPartitionedTable(2, 4, params.timeUnit, 
params.multiplePartitionKeys);
         TablePath tablePath = table.getTablePath();
         autoPartitionManager.addAutoPartitionTable(table, true);
         // the first auto-partition task is a non-periodic task
@@ -206,7 +306,6 @@ class AutoPartitionManagerTest {
         // pre-create 4 partitions including current partition
         
assertThat(partitions.keySet()).containsExactlyInAnyOrder(params.expectedPartitions);
 
-        // manually create a partition.
         int replicaFactor = table.getTableConfig().getReplicationFactor();
         Map<Integer, BucketAssignment> bucketAssignments =
                 generateAssignment(
@@ -221,22 +320,26 @@ class AutoPartitionManagerTest {
         long tableId = table.getTableId();
         PartitionAssignment partitionAssignment =
                 new PartitionAssignment(tableId, bucketAssignments);
-        metadataManager.createPartition(
-                tablePath,
-                tableId,
-                partitionAssignment,
-                fromPartitionName(table.getPartitionKeys(), 
params.manualCreatedPartition),
-                false);
-        // mock the partition is created in zk.
-        autoPartitionManager.addPartition(tableId, 
params.manualCreatedPartition);
-
-        // manually drop a partition.
-        metadataManager.dropPartition(
-                tablePath,
-                fromPartitionName(table.getPartitionKeys(), 
params.manualDroppedPartition),
-                false);
-        // mock the partition is dropped in zk.
-        autoPartitionManager.removePartition(tableId, 
params.manualDroppedPartition);
+
+        // manually create partitions.
+        for (String partitionName : params.manualCreatedPartitions) {
+            metadataManager.createPartition(
+                    tablePath,
+                    tableId,
+                    partitionAssignment,
+                    fromPartitionName(table.getPartitionKeys(), partitionName),
+                    false);
+            // mock the partition is created in zk.
+            autoPartitionManager.addPartition(tableId, partitionName);
+        }
+
+        // manually drop partitions.
+        for (String partitionName : params.manualDroppedPartitions) {
+            metadataManager.dropPartition(
+                    tablePath, fromPartitionName(table.getPartitionKeys(), 
partitionName), false);
+            // mock the partition is dropped in zk.
+            autoPartitionManager.removePartition(tableId, partitionName);
+        }
 
         clock.advanceTime(params.advanceDuration);
         periodicExecutor.triggerPeriodicScheduledTasks();
@@ -438,9 +541,10 @@ class AutoPartitionManagerTest {
 
     private static class TestParams {
         final AutoPartitionTimeUnit timeUnit;
+        final boolean multiplePartitionKeys;
         final long startTimeMs;
-        final String manualCreatedPartition;
-        final String manualDroppedPartition;
+        final String[] manualCreatedPartitions;
+        final String[] manualDroppedPartitions;
         final String[] expectedPartitions;
         final Duration advanceDuration;
         final String[] expectedPartitionsAfterAdvance;
@@ -449,18 +553,20 @@ class AutoPartitionManagerTest {
 
         private TestParams(
                 AutoPartitionTimeUnit timeUnit,
+                boolean multiplePartitionKeys,
                 long startTimeMs,
-                String manualCreatedPartition,
-                String manualDroppedPartition,
+                String[] manualCreatedPartitions,
+                String[] manualDroppedPartitions,
                 String[] expectedPartitions,
                 Duration advanceDuration,
                 String[] expectedPartitionsAfterAdvance,
                 Duration advanceDuration2,
                 String[] expectedPartitionsFinal) {
             this.timeUnit = timeUnit;
+            this.multiplePartitionKeys = multiplePartitionKeys;
             this.startTimeMs = startTimeMs;
-            this.manualCreatedPartition = manualCreatedPartition;
-            this.manualDroppedPartition = manualDroppedPartition;
+            this.manualCreatedPartitions = manualCreatedPartitions;
+            this.manualDroppedPartitions = manualDroppedPartitions;
             this.expectedPartitions = expectedPartitions;
             this.advanceDuration = advanceDuration;
             this.expectedPartitionsAfterAdvance = 
expectedPartitionsAfterAdvance;
@@ -470,27 +576,36 @@ class AutoPartitionManagerTest {
 
         @Override
         public String toString() {
-            return timeUnit.toString();
+            return timeUnit.toString()
+                    + " | "
+                    + (multiplePartitionKeys ? "Multiple Partition Keys" : 
"Single Partition Key");
         }
 
         static TestParamsBuilder builder(AutoPartitionTimeUnit timeUnit) {
-            return new TestParamsBuilder(timeUnit);
+            return new TestParamsBuilder(timeUnit, false);
+        }
+
+        static TestParamsBuilder builder(
+                AutoPartitionTimeUnit timeUnit, boolean multiplePartitionKeys) 
{
+            return new TestParamsBuilder(timeUnit, multiplePartitionKeys);
         }
     }
 
     private static class TestParamsBuilder {
         AutoPartitionTimeUnit timeUnit;
+        boolean multiplePartitionKeys;
         ZonedDateTime startTime;
         String[] expectedPartitions;
-        String manualCreatedPartition;
-        String manualDroppedPartition;
+        String[] manualCreatedPartitions;
+        String[] manualDroppedPartitions;
         long advanceSeconds;
         String[] expectedPartitionsAfterAdvance;
         long advanceSeconds2;
         String[] expectedPartitionsFinal;
 
-        TestParamsBuilder(AutoPartitionTimeUnit timeUnit) {
+        TestParamsBuilder(AutoPartitionTimeUnit timeUnit, boolean 
multiplePartitionKeys) {
             this.timeUnit = timeUnit;
+            this.multiplePartitionKeys = multiplePartitionKeys;
         }
 
         public TestParamsBuilder startTime(String startTime) {
@@ -504,12 +619,22 @@ class AutoPartitionManagerTest {
         }
 
         public TestParamsBuilder manualCreatedPartition(String 
manualCreatedPartition) {
-            this.manualCreatedPartition = manualCreatedPartition;
+            this.manualCreatedPartitions = new String[] 
{manualCreatedPartition};
+            return this;
+        }
+
+        public TestParamsBuilder manualCreatedPartitions(String... 
manualCreatedPartitions) {
+            this.manualCreatedPartitions = manualCreatedPartitions;
             return this;
         }
 
         public TestParamsBuilder manualDroppedPartition(String 
manualDroppedPartition) {
-            this.manualDroppedPartition = manualDroppedPartition;
+            this.manualDroppedPartitions = new String[] 
{manualDroppedPartition};
+            return this;
+        }
+
+        public TestParamsBuilder manualDroppedPartitions(String... 
manualDroppedPartitions) {
+            this.manualDroppedPartitions = manualDroppedPartitions;
             return this;
         }
 
@@ -544,9 +669,10 @@ class AutoPartitionManagerTest {
         public TestParams build() {
             return new TestParams(
                     timeUnit,
+                    multiplePartitionKeys,
                     startTime.toInstant().toEpochMilli(),
-                    manualCreatedPartition,
-                    manualDroppedPartition,
+                    manualCreatedPartitions,
+                    manualDroppedPartitions,
                     expectedPartitions,
                     Duration.ofSeconds(advanceSeconds),
                     expectedPartitionsAfterAdvance,
@@ -560,8 +686,21 @@ class AutoPartitionManagerTest {
     private TableInfo createPartitionedTable(
             int partitionRetentionNum, int partitionPreCreateNum, 
AutoPartitionTimeUnit timeUnit)
             throws Exception {
+        return createPartitionedTable(
+                partitionRetentionNum, partitionPreCreateNum, timeUnit, false);
+    }
+
+    private TableInfo createPartitionedTable(
+            int partitionRetentionNum,
+            int partitionPreCreateNum,
+            AutoPartitionTimeUnit timeUnit,
+            boolean multiplePartitionKeys)
+            throws Exception {
         long tableId = 1;
-        TablePath tablePath = TablePath.of("db", "test_partition_" + 
UUID.randomUUID());
+        TablePath tablePath =
+                multiplePartitionKeys
+                        ? TablePath.of("db", "test_multiple_partition_keys_" + 
UUID.randomUUID())
+                        : TablePath.of("db", "test_partition_" + 
UUID.randomUUID());
         TableDescriptor descriptor =
                 TableDescriptor.builder()
                         .schema(
@@ -569,21 +708,32 @@ class AutoPartitionManagerTest {
                                         .column("id", DataTypes.INT())
                                         .column("dt", DataTypes.STRING())
                                         .column("a", DataTypes.BIGINT())
+                                        .column("b", DataTypes.BIGINT())
                                         .column("ts", DataTypes.TIMESTAMP())
-                                        .primaryKey("id", "dt")
+                                        .primaryKey(
+                                                multiplePartitionKeys
+                                                        ? new String[] {"id", 
"dt", "a", "b"}
+                                                        : new String[] {"id", 
"dt"})
                                         .build())
-                        .comment("partitioned table")
+                        .comment(
+                                multiplePartitionKeys
+                                        ? "partitioned table with multiple 
partition keys"
+                                        : "partitioned table")
                         .distributedBy(16)
-                        .partitionedBy("dt")
+                        .partitionedBy(
+                                multiplePartitionKeys
+                                        ? new String[] {"dt", "a"}
+                                        : new String[] {"dt"})
                         .property(ConfigOptions.TABLE_REPLICATION_FACTOR, 3)
                         .property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, 
true)
+                        .property(ConfigOptions.TABLE_AUTO_PARTITION_KEY, "dt")
                         
.property(ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT, timeUnit)
                         .property(
                                 
ConfigOptions.TABLE_AUTO_PARTITION_NUM_RETENTION,
                                 partitionRetentionNum)
                         .property(
                                 
ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE,
-                                partitionPreCreateNum)
+                                multiplePartitionKeys ? 0 : 
partitionPreCreateNum)
                         .build();
         long currentMillis = System.currentTimeMillis();
         TableInfo tableInfo =

Reply via email to