This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new fc60308ef [lake] Avoid to generate empty split in tiering source
enumerator (#1925)
fc60308ef is described below
commit fc60308eff14f3bae80fa2b0c91d9652494972ee
Author: Liebing <[email protected]>
AuthorDate: Wed Nov 5 18:12:33 2025 +0800
[lake] Avoid to generate empty split in tiering source enumerator (#1925)
---
.../source/split/TieringSplitGenerator.java | 19 ++++
.../enumerator/TieringSourceEnumeratorTest.java | 121 +++++++++++----------
2 files changed, 80 insertions(+), 60 deletions(-)
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplitGenerator.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplitGenerator.java
index deefda382..eb207f8ea 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplitGenerator.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplitGenerator.java
@@ -243,6 +243,13 @@ public class TieringSplitGenerator {
@Nullable Long latestOffsetOfSnapshot,
@Nullable Long lastCommittedBucketOffset,
long latestBucketOffset) {
+ if (latestBucketOffset <= 0) {
+ LOG.debug(
+                     "The latestBucketOffset {} is equal to or less than 0, skip
generating split for bucket {}",
+ latestBucketOffset,
+ tableBucket);
+ return Optional.empty();
+ }
// the bucket is never been tiered, read kv snapshot is more efficient
if (lastCommittedBucketOffset == null) {
@@ -280,6 +287,11 @@ public class TieringSplitGenerator {
latestBucketOffset,
0));
} else {
+ LOG.info(
+                         "The lastCommittedBucketOffset {} is equal to or greater
than latestBucketOffset {}, skip generating split for bucket {}",
+ lastCommittedBucketOffset,
+ latestBucketOffset,
+ tableBucket);
return Optional.empty();
}
}
@@ -291,6 +303,13 @@ public class TieringSplitGenerator {
@Nullable String partitionName,
@Nullable Long lastCommittedBucketOffset,
long latestBucketOffset) {
+ if (latestBucketOffset <= 0) {
+ LOG.debug(
+                     "The latestBucketOffset {} is equal to or less than 0, skip
generating split for bucket {}",
+ latestBucketOffset,
+ tableBucket);
+ return Optional.empty();
+ }
// the bucket is never been tiered
if (lastCommittedBucketOffset == null) {
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java
index 79c49c45f..b93791660 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java
@@ -89,23 +89,16 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
registerReader(context, enumerator, subtaskId, "localhost-" +
subtaskId);
enumerator.handleSplitRequest(subtaskId, "localhost-" +
subtaskId);
}
- waitUntilTieringTableSplitAssignmentReady(context,
DEFAULT_BUCKET_NUM, 200L);
- List<TieringSplit> expectedAssignment = new ArrayList<>();
- for (int bucketId = 0; bucketId < DEFAULT_BUCKET_NUM; bucketId++) {
- expectedAssignment.add(
- new TieringLogSplit(
- tablePath,
- new TableBucket(tableId, bucketId),
- null,
- EARLIEST_OFFSET,
- 0,
- expectNumberOfSplits));
- }
+
+ // try to assign splits
+ context.runPeriodicCallable(0);
+
List<TieringSplit> actualAssignment = new ArrayList<>();
context.getSplitsAssignmentSequence()
.forEach(a ->
a.assignment().values().forEach(actualAssignment::addAll));
- assertThat(actualAssignment).isEqualTo(expectedAssignment);
+ // no split assignment for empty buckets
+ assertThat(actualAssignment).isEmpty();
// mock finished tiered this round, check second round
context.getSplitsAssignmentSequence().clear();
@@ -282,18 +275,23 @@ class TieringSourceEnumeratorTest extends TieringTestBase
{
registerReader(context, enumerator, subtaskId, "localhost-" +
subtaskId);
enumerator.handleSplitRequest(subtaskId, "localhost-" +
subtaskId);
}
- waitUntilTieringTableSplitAssignmentReady(context,
DEFAULT_BUCKET_NUM, 200);
+ // write one row to one bucket, keep the other buckets empty
+ Map<Integer, Long> bucketOffsetOfFirstWrite =
+ appendRow(tablePath, DEFAULT_LOG_TABLE_DESCRIPTOR, 0, 1);
+
+ // only non-empty buckets should generate splits
+ waitUntilTieringTableSplitAssignmentReady(context, 1, 200L);
List<TieringSplit> expectedAssignment = new ArrayList<>();
- for (int bucketId = 0; bucketId < DEFAULT_BUCKET_NUM; bucketId++) {
+ for (int bucketId : bucketOffsetOfFirstWrite.keySet()) {
expectedAssignment.add(
new TieringLogSplit(
tablePath,
new TableBucket(tableId, bucketId),
null,
EARLIEST_OFFSET,
- 0L,
- expectNumberOfSplits));
+ bucketOffsetOfFirstWrite.get(bucketId),
+ bucketOffsetOfFirstWrite.size()));
}
List<TieringSplit> actualAssignment = new ArrayList<>();
context.getSplitsAssignmentSequence()
@@ -307,7 +305,8 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
final Map<Integer, Long> bucketOffsetOfInitialWrite = new
HashMap<>();
for (int tableBucket = 0; tableBucket < DEFAULT_BUCKET_NUM;
tableBucket++) {
bucketOffsetOfEarliest.put(tableBucket, EARLIEST_OFFSET);
- bucketOffsetOfInitialWrite.put(tableBucket, 0L);
+ bucketOffsetOfInitialWrite.put(
+ tableBucket,
bucketOffsetOfFirstWrite.getOrDefault(tableBucket, 0L));
}
// commit and notify this table tiering task finished
coordinatorGateway
@@ -321,8 +320,9 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
.get();
enumerator.handleSourceEvent(1, new FinishedTieringEvent(tableId));
+ // write rows to every bucket
Map<Integer, Long> bucketOffsetOfSecondWrite =
- appendRow(tablePath, DEFAULT_LOG_TABLE_DESCRIPTOR, 0, 10);
+ appendRow(tablePath, DEFAULT_LOG_TABLE_DESCRIPTOR, 1, 10);
// request tiering table splits
for (int subtaskId = 0; subtaskId < numSubtasks; subtaskId++) {
@@ -362,12 +362,6 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
FLUSS_CLUSTER_EXTENSION.waitUntilPartitionsCreated(
tablePath,
TABLE_AUTO_PARTITION_NUM_PRECREATE.defaultValue());
- final Map<Long, Map<Integer, Long>> bucketOffsetOfInitialWrite =
- upsertRowForPartitionedTable(
- tablePath, DEFAULT_PK_TABLE_DESCRIPTOR,
partitionNameByIds, 0, 10);
- long snapshotId = 0;
- waitUntilPartitionTableSnapshot(tableId, partitionNameByIds,
snapshotId);
-
int numSubtasks = 6;
int expectNumberOfSplits = 6;
// test get snapshot split assignment
@@ -384,38 +378,31 @@ class TieringSourceEnumeratorTest extends TieringTestBase
{
registerReader(context, enumerator, subtaskId, "localhost-" +
subtaskId);
enumerator.handleSplitRequest(subtaskId, "localhost-" +
subtaskId);
}
- waitUntilTieringTableSplitAssignmentReady(
- context, DEFAULT_BUCKET_NUM * partitionNameByIds.size(),
3000L);
- List<TieringSplit> expectedSnapshotAssignment = new ArrayList<>();
- for (Map.Entry<String, Long> partitionNameById :
partitionNameByIds.entrySet()) {
- for (int tableBucket = 0; tableBucket < DEFAULT_BUCKET_NUM;
tableBucket++) {
- long partitionId = partitionNameById.getValue();
- expectedSnapshotAssignment.add(
- new TieringSnapshotSplit(
- tablePath,
- new TableBucket(tableId, partitionId,
tableBucket),
- partitionNameById.getKey(),
- snapshotId,
-
bucketOffsetOfInitialWrite.get(partitionId).get(tableBucket),
- expectNumberOfSplits));
- }
- }
+ // try to assign splits
+ context.runPeriodicCallable(0);
+
List<TieringSplit> actualSnapshotAssignment = new ArrayList<>();
for (SplitsAssignment<TieringSplit> splitsAssignment :
context.getSplitsAssignmentSequence()) {
splitsAssignment.assignment().values().forEach(actualSnapshotAssignment::addAll);
}
- assertThat(sortSplits(actualSnapshotAssignment))
- .isEqualTo(sortSplits(expectedSnapshotAssignment));
+
+ // no snapshot split should be assigned for empty buckets
+ assertThat(actualSnapshotAssignment).isEmpty();
// mock finished tiered this round, check second round
context.getSplitsAssignmentSequence().clear();
+ final Map<Long, Map<Integer, Long>> bucketOffsetOfInitialWrite =
new HashMap<>();
for (Map.Entry<String, Long> partitionNameById :
partitionNameByIds.entrySet()) {
- Map<Integer, Long> partitionInitialBucketOffsets = new
HashMap<>();
+ Map<Integer, Long> partitionBucketOffsetOfEarliest = new
HashMap<>();
+ Map<Integer, Long> partitionBucketOffsetOfInitialWrite = new
HashMap<>();
for (int tableBucket = 0; tableBucket < DEFAULT_BUCKET_NUM;
tableBucket++) {
- partitionInitialBucketOffsets.put(tableBucket,
EARLIEST_OFFSET);
+ partitionBucketOffsetOfEarliest.put(tableBucket,
EARLIEST_OFFSET);
+ partitionBucketOffsetOfInitialWrite.put(tableBucket, 0L);
}
+ bucketOffsetOfInitialWrite.put(
+ partitionNameById.getValue(),
partitionBucketOffsetOfInitialWrite);
// commit lake table partition
coordinatorGateway
.commitLakeTableSnapshot(
@@ -423,7 +410,7 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
tableId,
partitionNameById.getValue(),
1,
- partitionInitialBucketOffsets,
+ partitionBucketOffsetOfEarliest,
bucketOffsetOfInitialWrite.get(
partitionNameById.getValue())))
.get();
@@ -434,7 +421,7 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
Map<Long, Map<Integer, Long>> bucketOffsetOfSecondWrite =
upsertRowForPartitionedTable(
tablePath, DEFAULT_PK_TABLE_DESCRIPTOR,
partitionNameByIds, 10, 20);
- snapshotId = 1;
+ long snapshotId = 0;
waitUntilPartitionTableSnapshot(tableId, partitionNameByIds,
snapshotId);
// request tiering table splits
@@ -447,16 +434,16 @@ class TieringSourceEnumeratorTest extends TieringTestBase
{
List<TieringSplit> expectedLogAssignment = new ArrayList<>();
for (Map.Entry<String, Long> partitionNameById :
partitionNameByIds.entrySet()) {
for (int tableBucket = 0; tableBucket < DEFAULT_BUCKET_NUM;
tableBucket++) {
- long partionId = partitionNameById.getValue();
+ long partitionId = partitionNameById.getValue();
expectedLogAssignment.add(
new TieringLogSplit(
tablePath,
- new TableBucket(tableId, partionId,
tableBucket),
+ new TableBucket(tableId, partitionId,
tableBucket),
partitionNameById.getKey(),
-
bucketOffsetOfInitialWrite.get(partionId).get(tableBucket),
-
bucketOffsetOfInitialWrite.get(partionId).get(tableBucket)
+
bucketOffsetOfInitialWrite.get(partitionId).get(tableBucket),
+
bucketOffsetOfInitialWrite.get(partitionId).get(tableBucket)
+ bucketOffsetOfSecondWrite
- .get(partionId)
+ .get(partitionId)
.get(tableBucket),
expectNumberOfSplits));
}
@@ -496,21 +483,29 @@ class TieringSourceEnumeratorTest extends TieringTestBase
{
registerReader(context, enumerator, subtaskId, "localhost-" +
subtaskId);
enumerator.handleSplitRequest(subtaskId, "localhost-" +
subtaskId);
}
- waitUntilTieringTableSplitAssignmentReady(
- context, DEFAULT_BUCKET_NUM * partitionNameByIds.size(),
3000L);
+
+ Map<Long, Map<Integer, Long>> bucketOffsetOfFirstWrite =
+ appendRowForPartitionedTable(
+ tablePath,
+ DEFAULT_AUTO_PARTITIONED_LOG_TABLE_DESCRIPTOR,
+ partitionNameByIds,
+ 0,
+ 1);
+
+ waitUntilTieringTableSplitAssignmentReady(context,
partitionNameByIds.size(), 3000L);
List<TieringSplit> expectedAssignment = new ArrayList<>();
for (Map.Entry<String, Long> partitionNameById :
partitionNameByIds.entrySet()) {
- for (int tableBucket = 0; tableBucket < DEFAULT_BUCKET_NUM;
tableBucket++) {
- long partitionId = partitionNameById.getValue();
+ long partitionId = partitionNameById.getValue();
+ for (int tableBucket :
bucketOffsetOfFirstWrite.get(partitionId).keySet()) {
expectedAssignment.add(
new TieringLogSplit(
tablePath,
new TableBucket(tableId, partitionId,
tableBucket),
partitionNameById.getKey(),
EARLIEST_OFFSET,
- 0L,
- expectNumberOfSplits));
+
bucketOffsetOfFirstWrite.get(partitionId).get(tableBucket),
+ bucketOffsetOfFirstWrite.size()));
}
}
List<TieringSplit> actualAssignment = new ArrayList<>();
@@ -529,7 +524,11 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
Map<Integer, Long> partitionBucketOffsetOfInitialWrite = new
HashMap<>();
for (int tableBucket = 0; tableBucket < DEFAULT_BUCKET_NUM;
tableBucket++) {
partitionInitialBucketOffsets.put(tableBucket,
EARLIEST_OFFSET);
- partitionBucketOffsetOfInitialWrite.put(tableBucket, 0L);
+ partitionBucketOffsetOfInitialWrite.put(
+ tableBucket,
+ bucketOffsetOfFirstWrite
+ .getOrDefault(partitionId,
Collections.emptyMap())
+ .getOrDefault(tableBucket, 0L));
}
bucketOffsetOfInitialWrite.put(partitionId,
partitionBucketOffsetOfInitialWrite);
// commit lake table partition
@@ -551,7 +550,7 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
tablePath,
DEFAULT_AUTO_PARTITIONED_LOG_TABLE_DESCRIPTOR,
partitionNameByIds,
- 0,
+ 1,
10);
// request tiering table splits
@@ -649,9 +648,11 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
void testHandleFailOverEvent() throws Throwable {
TablePath tablePath1 = TablePath.of(DEFAULT_DB,
"tiering-failover-test-log-table1");
createTable(tablePath1, DEFAULT_LOG_TABLE_DESCRIPTOR);
+ appendRow(tablePath1, DEFAULT_LOG_TABLE_DESCRIPTOR, 0, 10);
TablePath tablePath2 = TablePath.of(DEFAULT_DB,
"tiering-failover-test-log-table2");
createTable(tablePath2, DEFAULT_LOG_TABLE_DESCRIPTOR);
+ appendRow(tablePath2, DEFAULT_LOG_TABLE_DESCRIPTOR, 0, 10);
int numSubtasks = 1;
try (MockSplitEnumeratorContext<TieringSplit> context =