This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 41cc4140 [server] Optimize auto drop partition perfomance (#1047)
41cc4140 is described below
commit 41cc41408372d1e7ff62ad6deef1e43a26b2c340
Author: Liebing <[email protected]>
AuthorDate: Fri Jul 4 14:40:57 2025 +0800
[server] Optimize auto drop partition perfomance (#1047)
---
.../server/coordinator/AutoPartitionManager.java | 100 ++++++---
.../coordinator/AutoPartitionManagerTest.java | 228 +++++++++++++++++----
2 files changed, 265 insertions(+), 63 deletions(-)
diff --git
a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/AutoPartitionManager.java
b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/AutoPartitionManager.java
index 502af8f0..ed2fac5f 100644
---
a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/AutoPartitionManager.java
+++
b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/AutoPartitionManager.java
@@ -50,11 +50,13 @@ import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.NavigableSet;
+import java.util.NavigableMap;
import java.util.Set;
-import java.util.TreeSet;
+import java.util.TreeMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
@@ -96,8 +98,10 @@ public class AutoPartitionManager implements AutoCloseable {
// now, only consider day partition, todo: need to consider all partition
unit
private final Map<Long, Integer> autoCreateDayPartitionDelayMinutes = new
HashMap<>();
+ // table id -> (value of auto partition time key -> partition name set)
+ // for single partition key, the partition name set will be null to reduce
memory usage
@GuardedBy("lock")
- private final Map<Long, TreeSet<String>> partitionsByTable = new
HashMap<>();
+ private final Map<Long, TreeMap<String, Set<String>>> partitionsByTable =
new HashMap<>();
private final Lock lock = new ReentrantLock();
@@ -140,11 +144,14 @@ public class AutoPartitionManager implements
AutoCloseable {
lock,
() -> {
autoPartitionTables.put(tableId, tableInfo);
- Set<String> partitionSet =
+ TreeMap<String, Set<String>> partitionMap =
partitionsByTable.computeIfAbsent(
- tableInfo.getTableId(), k -> new
TreeSet<>());
- checkNotNull(partitionSet, "Partition set is null.");
- partitionSet.addAll(partitions);
+ tableInfo.getTableId(), k -> new
TreeMap<>());
+ checkNotNull(partitionMap, "Partition map is null.");
+ partitions.forEach(
+ partitionName ->
+ addPartitionToPartitionsByTable(
+ tableInfo, partitionMap,
partitionName));
if
(tableInfo.getTableConfig().getAutoPartitionStrategy().timeUnit()
== AutoPartitionTimeUnit.DAY) {
// get the delay minutes to create partition
@@ -191,7 +198,10 @@ public class AutoPartitionManager implements AutoCloseable
{
lock,
() -> {
if (autoPartitionTables.containsKey(tableId)) {
- partitionsByTable.get(tableId).add(partitionName);
+ addPartitionToPartitionsByTable(
+ autoPartitionTables.get(tableId),
+ partitionsByTable.get(tableId),
+ partitionName);
}
});
}
@@ -224,6 +234,33 @@ public class AutoPartitionManager implements AutoCloseable
{
}
}
+ private String extractAutoPartitionValue(TableInfo tableInfo, String
partitionName) {
+ // for single partition key table, the full partition name is the auto
partition value
+ if (tableInfo.getPartitionKeys().size() == 1) {
+ return partitionName;
+ }
+
+ String autoPartitionKey =
tableInfo.getTableConfig().getAutoPartitionStrategy().key();
+ int autoPartitionKeyIndex =
tableInfo.getPartitionKeys().indexOf(autoPartitionKey);
+ return partitionName.split("\\$")[autoPartitionKeyIndex];
+ }
+
+ private void addPartitionToPartitionsByTable(
+ TableInfo tableInfo,
+ NavigableMap<String, Set<String>> partitionMap,
+ String partitionName) {
+ if (tableInfo.getPartitionKeys().size() > 1) {
+ Set<String> partitionSet =
+ partitionMap.computeIfAbsent(
+ extractAutoPartitionValue(tableInfo,
partitionName),
+ k -> new HashSet<>());
+ checkNotNull(partitionSet, "Partition set is null.");
+ partitionSet.add(partitionName);
+ } else {
+ partitionMap.put(partitionName, null);
+ }
+ }
+
private void doAutoPartition() {
Instant now = clock.instant();
inLock(lock, () -> doAutoPartition(now, autoPartitionTables.keySet(),
false));
@@ -248,10 +285,15 @@ public class AutoPartitionManager implements
AutoCloseable {
createPartitionInstant =
now.minus(Duration.ofMinutes(delayMinutes));
}
}
- TreeSet<String> currentPartitions =
- partitionsByTable.computeIfAbsent(tableId, k -> new
TreeSet<>());
+
TableInfo tableInfo = autoPartitionTables.get(tableId);
TablePath tablePath = tableInfo.getTablePath();
+ TreeMap<String, Set<String>> currentPartitions =
+ partitionsByTable.computeIfAbsent(
+ tableId,
+ tableInfo.getPartitionKeys().size() > 1
+ ? k -> new TreeMap<>()
+ : k -> null);
TableRegistration table;
try {
table = metadataManager.getTableRegistration(tablePath);
@@ -281,7 +323,9 @@ public class AutoPartitionManager implements AutoCloseable {
}
private void createPartitions(
- TableInfo tableInfo, Instant currentInstant, TreeSet<String>
currentPartitions) {
+ TableInfo tableInfo,
+ Instant currentInstant,
+ TreeMap<String, Set<String>> currentPartitions) {
// get the partitions needed to create
List<ResolvedPartitionSpec> partitionsToPreCreate =
partitionNamesToPreCreate(
@@ -307,7 +351,8 @@ public class AutoPartitionManager implements AutoCloseable {
metadataManager.createPartition(
tablePath, tableId, partitionAssignment, partition,
false);
- currentPartitions.add(partition.getPartitionName());
+ // only single partition key table supports automatic creation
of partitions
+ currentPartitions.put(partition.getPartitionName(), null);
LOG.info(
"Auto partitioning created partition {} for table
[{}].",
partition,
@@ -343,7 +388,7 @@ public class AutoPartitionManager implements AutoCloseable {
List<String> partitionKeys,
Instant currentInstant,
AutoPartitionStrategy autoPartitionStrategy,
- TreeSet<String> currentPartitions) {
+ TreeMap<String, Set<String>> currentPartitions) {
AutoPartitionTimeUnit autoPartitionTimeUnit =
autoPartitionStrategy.timeUnit();
ZonedDateTime currentZonedDateTime =
ZonedDateTime.ofInstant(
@@ -356,7 +401,7 @@ public class AutoPartitionManager implements AutoCloseable {
generateAutoPartition(
partitionKeys, currentZonedDateTime, idx,
autoPartitionTimeUnit);
// if the partition already exists, we don't need to create it,
otherwise, create it
- if (!currentPartitions.contains(partition.getPartitionName())) {
+ if (!currentPartitions.containsKey(partition.getPartitionName())) {
partitionsToCreate.add(partition);
}
}
@@ -368,14 +413,12 @@ public class AutoPartitionManager implements
AutoCloseable {
List<String> partitionKeys,
Instant currentInstant,
AutoPartitionStrategy autoPartitionStrategy,
- NavigableSet<String> currentPartitions) {
+ NavigableMap<String, Set<String>> currentPartitions) {
int numToRetain = autoPartitionStrategy.numToRetain();
// negative value means not to drop partitions
if (numToRetain < 0) {
return;
}
- String autoPartitionKey =
- partitionKeys.size() == 1 ? partitionKeys.get(0) :
autoPartitionStrategy.key();
ZonedDateTime currentZonedDateTime =
ZonedDateTime.ofInstant(
@@ -398,12 +441,20 @@ public class AutoPartitionManager implements
AutoCloseable {
// (a=2,dt=20250505,b=1) (a=2,dt=20250506,b=1) (a=2,dt=20250507,b=1)
// then partition of pattern:
// (a=?,dt=20250506,b=?) (a=?,dt=20250507,b=?) will be retained.
- int timePartitionKeyIndex = partitionKeys.indexOf(autoPartitionKey);
- // Todo: refactoring currentPartitions to sort by the partition time
key, then it is
- // efficient to handle large partition set.
- for (String partitionName : new ArrayList<>(currentPartitions)) {
- String currentTime =
partitionName.split("\\$")[timePartitionKeyIndex];
- if (currentTime.compareTo(lastRetainPartitionTime) < 0) {
+ Iterator<Map.Entry<String, Set<String>>> iterator =
+
currentPartitions.headMap(lastRetainPartitionTime).entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<String, Set<String>> entry = iterator.next();
+
+ Iterator<String> dropIterator;
+ if (entry.getValue() == null) {
+ dropIterator = new
HashSet<>(Collections.singleton(entry.getKey())).iterator();
+ } else {
+ dropIterator = entry.getValue().iterator();
+ }
+
+ while (dropIterator.hasNext()) {
+ String partitionName = dropIterator.next();
// drop the partition
try {
metadataManager.dropPartition(
@@ -418,12 +469,13 @@ public class AutoPartitionManager implements
AutoCloseable {
}
// only remove when zk success, this reflects to the
partitionsByTable
- currentPartitions.remove(partitionName);
+ dropIterator.remove();
LOG.info(
"Auto partitioning deleted partition {} for table
[{}].",
partitionName,
tablePath);
}
+ iterator.remove();
}
}
diff --git
a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/AutoPartitionManagerTest.java
b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/AutoPartitionManagerTest.java
index 5b94546e..b015f709 100644
---
a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/AutoPartitionManagerTest.java
+++
b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/AutoPartitionManagerTest.java
@@ -83,7 +83,7 @@ class AutoPartitionManagerTest {
}
static Stream<Arguments> parameters() {
- // numPreCreate = 4, numRetention = 2
+ // numPreCreate = 4 (for table with single partition key only),
numRetention = 2
return Stream.of(
Arguments.of(
TestParams.builder(AutoPartitionTimeUnit.HOUR)
@@ -110,6 +110,33 @@ class AutoPartitionManagerTest {
"2024091008",
"2024091009")
.build()),
+ Arguments.of(
+ TestParams.builder(AutoPartitionTimeUnit.HOUR, true)
+ .startTime("2024-09-10T01:00:00")
+ // table with multiple partition keys not
supports automatic
+ // creation
+ .expectedPartitions()
+ .manualCreatedPartitions(
+ "2024091001$A",
+ "2024091001$B",
+ "2024091001$C",
+ "2024091002$A",
+ "2024091002$B",
+ "2024091003$A",
+ "2024091004$A",
+ "2024091005$A")
+ .manualDroppedPartitions("2024091002$A")
+ .advanceClock(c -> c.plusHours(3))
+ // current time partition is "2024091004"
+ .expectedPartitionsAfterAdvance(
+ "2024091002$B",
+ "2024091003$A",
+ "2024091004$A",
+ "2024091005$A")
+ .advanceClock2(c -> c.plusHours(2))
+ // current time partition is "2024091006"
+ .expectedPartitionsFinal("2024091004$A",
"2024091005$A")
+ .build()),
Arguments.of(
TestParams.builder(AutoPartitionTimeUnit.DAY)
.startTime("2024-09-10T00:00:00")
@@ -136,6 +163,26 @@ class AutoPartitionManagerTest {
"20240917",
"20240918")
.build()),
+ Arguments.of(
+ TestParams.builder(AutoPartitionTimeUnit.DAY, true)
+ .startTime("2024-09-10T00:00:00")
+ .expectedPartitions()
+ .manualCreatedPartitions(
+ "20240910$A",
+ "20240910$B",
+ "20240910$C",
+ "20240911$A",
+ "20240911$B",
+ "20240912$A",
+ "20240913$A",
+ "20240914$A")
+ .manualDroppedPartition("20240911$A")
+ .advanceClock(c ->
c.plusDays(3).plus(Duration.ofHours(23)))
+ .expectedPartitionsAfterAdvance(
+ "20240911$B", "20240912$A",
"20240913$A", "20240914$A")
+ .advanceClock2(c -> c.plusDays(2))
+ .expectedPartitionsFinal("20240913$A",
"20240914$A")
+ .build()),
Arguments.of(
TestParams.builder(AutoPartitionTimeUnit.MONTH)
.startTime("2024-09-10T00:00:00")
@@ -150,6 +197,27 @@ class AutoPartitionManagerTest {
.expectedPartitionsFinal(
"202412", "202501", "202502",
"202503", "202504", "202505")
.build()),
+ Arguments.of(
+ TestParams.builder(AutoPartitionTimeUnit.MONTH, true)
+ .startTime("2024-09-10T00:00:00")
+ .expectedPartitions()
+ .manualCreatedPartitions(
+ "202409$A",
+ "202409$B",
+ "202409$C",
+ "202410$A",
+ "202410$B",
+ "202411$A",
+ "202412$A",
+ "202413$A")
+ .manualDroppedPartition("202410$A")
+ .advanceClock(c -> c.plusMonths(3))
+ // current partition is "202412"
+ .expectedPartitionsAfterAdvance(
+ "202410$B", "202411$A", "202412$A",
"202413$A")
+ .advanceClock2(c -> c.plusMonths(2))
+ .expectedPartitionsFinal("202412$A",
"202413$A")
+ .build()),
Arguments.of(
TestParams.builder(AutoPartitionTimeUnit.QUARTER)
.startTime("2024-09-10T00:00:00")
@@ -164,6 +232,22 @@ class AutoPartitionManagerTest {
.expectedPartitionsFinal(
"20252", "20253", "20254", "20261",
"20262", "20263")
.build()),
+ Arguments.of(
+ TestParams.builder(AutoPartitionTimeUnit.QUARTER, true)
+ .startTime("2024-09-10T00:00:00")
+ .expectedPartitions()
+ .manualCreatedPartitions(
+ "20243$A", "20243$B", "20243$C",
"20244$A", "20244$B",
+ "20251$A", "20252$A", "20253$B",
"20254$C")
+ .manualDroppedPartition("20243$A")
+ .advanceClock(c -> c.plusMonths(3 * 3))
+ // current partition is "20252"
+ .expectedPartitionsAfterAdvance(
+ "20244$A", "20244$B", "20251$A",
"20252$A", "20253$B",
+ "20254$C")
+ .advanceClock2(c -> c.plusMonths(2 * 3))
+ .expectedPartitionsFinal("20252$A", "20253$B",
"20254$C")
+ .build()),
Arguments.of(
TestParams.builder(AutoPartitionTimeUnit.YEAR)
.startTime("2024-09-10T00:00:00")
@@ -177,6 +261,21 @@ class AutoPartitionManagerTest {
.advanceClock2(c -> c.plusYears(2))
.expectedPartitionsFinal(
"2027", "2028", "2029", "2030",
"2031", "2032")
+ .build()),
+ Arguments.of(
+ TestParams.builder(AutoPartitionTimeUnit.YEAR, true)
+ .startTime("2024-09-10T00:00:00")
+ .expectedPartitions()
+ .manualCreatedPartitions(
+ "2024$A", "2024$B", "2024$C",
"2025$A", "2025$B", "2026$A",
+ "2027$B", "2028$C")
+ .manualDroppedPartition("2025$B")
+ .advanceClock(c -> c.plusYears(3))
+ // current partition is "2027", retain "2025",
"2026"
+ .expectedPartitionsAfterAdvance(
+ "2025$A", "2026$A", "2027$B", "2028$C")
+ .advanceClock2(c -> c.plusYears(2))
+ .expectedPartitionsFinal("2027$B", "2028$C")
.build()));
}
@@ -196,7 +295,8 @@ class AutoPartitionManagerTest {
periodicExecutor);
autoPartitionManager.start();
- TableInfo table = createPartitionedTable(2, 4, params.timeUnit);
+ TableInfo table =
+ createPartitionedTable(2, 4, params.timeUnit,
params.multiplePartitionKeys);
TablePath tablePath = table.getTablePath();
autoPartitionManager.addAutoPartitionTable(table, true);
// the first auto-partition task is a non-periodic task
@@ -206,7 +306,6 @@ class AutoPartitionManagerTest {
// pre-create 4 partitions including current partition
assertThat(partitions.keySet()).containsExactlyInAnyOrder(params.expectedPartitions);
- // manually create a partition.
int replicaFactor = table.getTableConfig().getReplicationFactor();
Map<Integer, BucketAssignment> bucketAssignments =
generateAssignment(
@@ -221,22 +320,26 @@ class AutoPartitionManagerTest {
long tableId = table.getTableId();
PartitionAssignment partitionAssignment =
new PartitionAssignment(tableId, bucketAssignments);
- metadataManager.createPartition(
- tablePath,
- tableId,
- partitionAssignment,
- fromPartitionName(table.getPartitionKeys(),
params.manualCreatedPartition),
- false);
- // mock the partition is created in zk.
- autoPartitionManager.addPartition(tableId,
params.manualCreatedPartition);
-
- // manually drop a partition.
- metadataManager.dropPartition(
- tablePath,
- fromPartitionName(table.getPartitionKeys(),
params.manualDroppedPartition),
- false);
- // mock the partition is dropped in zk.
- autoPartitionManager.removePartition(tableId,
params.manualDroppedPartition);
+
+ // manually create partitions.
+ for (String partitionName : params.manualCreatedPartitions) {
+ metadataManager.createPartition(
+ tablePath,
+ tableId,
+ partitionAssignment,
+ fromPartitionName(table.getPartitionKeys(), partitionName),
+ false);
+ // mock the partition is created in zk.
+ autoPartitionManager.addPartition(tableId, partitionName);
+ }
+
+ // manually drop partitions.
+ for (String partitionName : params.manualDroppedPartitions) {
+ metadataManager.dropPartition(
+ tablePath, fromPartitionName(table.getPartitionKeys(),
partitionName), false);
+ // mock the partition is dropped in zk.
+ autoPartitionManager.removePartition(tableId, partitionName);
+ }
clock.advanceTime(params.advanceDuration);
periodicExecutor.triggerPeriodicScheduledTasks();
@@ -438,9 +541,10 @@ class AutoPartitionManagerTest {
private static class TestParams {
final AutoPartitionTimeUnit timeUnit;
+ final boolean multiplePartitionKeys;
final long startTimeMs;
- final String manualCreatedPartition;
- final String manualDroppedPartition;
+ final String[] manualCreatedPartitions;
+ final String[] manualDroppedPartitions;
final String[] expectedPartitions;
final Duration advanceDuration;
final String[] expectedPartitionsAfterAdvance;
@@ -449,18 +553,20 @@ class AutoPartitionManagerTest {
private TestParams(
AutoPartitionTimeUnit timeUnit,
+ boolean multiplePartitionKeys,
long startTimeMs,
- String manualCreatedPartition,
- String manualDroppedPartition,
+ String[] manualCreatedPartitions,
+ String[] manualDroppedPartitions,
String[] expectedPartitions,
Duration advanceDuration,
String[] expectedPartitionsAfterAdvance,
Duration advanceDuration2,
String[] expectedPartitionsFinal) {
this.timeUnit = timeUnit;
+ this.multiplePartitionKeys = multiplePartitionKeys;
this.startTimeMs = startTimeMs;
- this.manualCreatedPartition = manualCreatedPartition;
- this.manualDroppedPartition = manualDroppedPartition;
+ this.manualCreatedPartitions = manualCreatedPartitions;
+ this.manualDroppedPartitions = manualDroppedPartitions;
this.expectedPartitions = expectedPartitions;
this.advanceDuration = advanceDuration;
this.expectedPartitionsAfterAdvance =
expectedPartitionsAfterAdvance;
@@ -470,27 +576,36 @@ class AutoPartitionManagerTest {
@Override
public String toString() {
- return timeUnit.toString();
+ return timeUnit.toString()
+ + " | "
+ + (multiplePartitionKeys ? "Multiple Partition Keys" :
"Single Partition Key");
}
static TestParamsBuilder builder(AutoPartitionTimeUnit timeUnit) {
- return new TestParamsBuilder(timeUnit);
+ return new TestParamsBuilder(timeUnit, false);
+ }
+
+ static TestParamsBuilder builder(
+ AutoPartitionTimeUnit timeUnit, boolean multiplePartitionKeys)
{
+ return new TestParamsBuilder(timeUnit, multiplePartitionKeys);
}
}
private static class TestParamsBuilder {
AutoPartitionTimeUnit timeUnit;
+ boolean multiplePartitionKeys;
ZonedDateTime startTime;
String[] expectedPartitions;
- String manualCreatedPartition;
- String manualDroppedPartition;
+ String[] manualCreatedPartitions;
+ String[] manualDroppedPartitions;
long advanceSeconds;
String[] expectedPartitionsAfterAdvance;
long advanceSeconds2;
String[] expectedPartitionsFinal;
- TestParamsBuilder(AutoPartitionTimeUnit timeUnit) {
+ TestParamsBuilder(AutoPartitionTimeUnit timeUnit, boolean
multiplePartitionKeys) {
this.timeUnit = timeUnit;
+ this.multiplePartitionKeys = multiplePartitionKeys;
}
public TestParamsBuilder startTime(String startTime) {
@@ -504,12 +619,22 @@ class AutoPartitionManagerTest {
}
public TestParamsBuilder manualCreatedPartition(String
manualCreatedPartition) {
- this.manualCreatedPartition = manualCreatedPartition;
+ this.manualCreatedPartitions = new String[]
{manualCreatedPartition};
+ return this;
+ }
+
+ public TestParamsBuilder manualCreatedPartitions(String...
manualCreatedPartitions) {
+ this.manualCreatedPartitions = manualCreatedPartitions;
return this;
}
public TestParamsBuilder manualDroppedPartition(String
manualDroppedPartition) {
- this.manualDroppedPartition = manualDroppedPartition;
+ this.manualDroppedPartitions = new String[]
{manualDroppedPartition};
+ return this;
+ }
+
+ public TestParamsBuilder manualDroppedPartitions(String...
manualDroppedPartitions) {
+ this.manualDroppedPartitions = manualDroppedPartitions;
return this;
}
@@ -544,9 +669,10 @@ class AutoPartitionManagerTest {
public TestParams build() {
return new TestParams(
timeUnit,
+ multiplePartitionKeys,
startTime.toInstant().toEpochMilli(),
- manualCreatedPartition,
- manualDroppedPartition,
+ manualCreatedPartitions,
+ manualDroppedPartitions,
expectedPartitions,
Duration.ofSeconds(advanceSeconds),
expectedPartitionsAfterAdvance,
@@ -560,8 +686,21 @@ class AutoPartitionManagerTest {
private TableInfo createPartitionedTable(
int partitionRetentionNum, int partitionPreCreateNum,
AutoPartitionTimeUnit timeUnit)
throws Exception {
+ return createPartitionedTable(
+ partitionRetentionNum, partitionPreCreateNum, timeUnit, false);
+ }
+
+ private TableInfo createPartitionedTable(
+ int partitionRetentionNum,
+ int partitionPreCreateNum,
+ AutoPartitionTimeUnit timeUnit,
+ boolean multiplePartitionKeys)
+ throws Exception {
long tableId = 1;
- TablePath tablePath = TablePath.of("db", "test_partition_" +
UUID.randomUUID());
+ TablePath tablePath =
+ multiplePartitionKeys
+ ? TablePath.of("db", "test_multiple_partition_keys_" +
UUID.randomUUID())
+ : TablePath.of("db", "test_partition_" +
UUID.randomUUID());
TableDescriptor descriptor =
TableDescriptor.builder()
.schema(
@@ -569,21 +708,32 @@ class AutoPartitionManagerTest {
.column("id", DataTypes.INT())
.column("dt", DataTypes.STRING())
.column("a", DataTypes.BIGINT())
+ .column("b", DataTypes.BIGINT())
.column("ts", DataTypes.TIMESTAMP())
- .primaryKey("id", "dt")
+ .primaryKey(
+ multiplePartitionKeys
+ ? new String[] {"id",
"dt", "a", "b"}
+ : new String[] {"id",
"dt"})
.build())
- .comment("partitioned table")
+ .comment(
+ multiplePartitionKeys
+ ? "partitioned table with multiple
partition keys"
+ : "partitioned table")
.distributedBy(16)
- .partitionedBy("dt")
+ .partitionedBy(
+ multiplePartitionKeys
+ ? new String[] {"dt", "a"}
+ : new String[] {"dt"})
.property(ConfigOptions.TABLE_REPLICATION_FACTOR, 3)
.property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED,
true)
+ .property(ConfigOptions.TABLE_AUTO_PARTITION_KEY, "dt")
.property(ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT, timeUnit)
.property(
ConfigOptions.TABLE_AUTO_PARTITION_NUM_RETENTION,
partitionRetentionNum)
.property(
ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE,
- partitionPreCreateNum)
+ multiplePartitionKeys ? 0 :
partitionPreCreateNum)
.build();
long currentMillis = System.currentTimeMillis();
TableInfo tableInfo =