This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new fc60308ef [lake] Avoid to generate empty split in tiering source 
enumerator (#1925)
fc60308ef is described below

commit fc60308eff14f3bae80fa2b0c91d9652494972ee
Author: Liebing <[email protected]>
AuthorDate: Wed Nov 5 18:12:33 2025 +0800

    [lake] Avoid to generate empty split in tiering source enumerator (#1925)
---
 .../source/split/TieringSplitGenerator.java        |  19 ++++
 .../enumerator/TieringSourceEnumeratorTest.java    | 121 +++++++++++----------
 2 files changed, 80 insertions(+), 60 deletions(-)

diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplitGenerator.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplitGenerator.java
index deefda382..eb207f8ea 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplitGenerator.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplitGenerator.java
@@ -243,6 +243,13 @@ public class TieringSplitGenerator {
             @Nullable Long latestOffsetOfSnapshot,
             @Nullable Long lastCommittedBucketOffset,
             long latestBucketOffset) {
+        if (latestBucketOffset <= 0) {
+            LOG.debug(
                    "The latestBucketOffset {} is equal to or less than 0, skip 
generating split for bucket {}",
+                    latestBucketOffset,
+                    tableBucket);
+            return Optional.empty();
+        }
 
         // the bucket is never been tiered, read kv snapshot is more efficient
         if (lastCommittedBucketOffset == null) {
@@ -280,6 +287,11 @@ public class TieringSplitGenerator {
                                 latestBucketOffset,
                                 0));
             } else {
+                LOG.info(
                        "The lastCommittedBucketOffset {} is equal to or greater 
than latestBucketOffset {}, skip generating split for bucket {}",
+                        lastCommittedBucketOffset,
+                        latestBucketOffset,
+                        tableBucket);
                 return Optional.empty();
             }
         }
@@ -291,6 +303,13 @@ public class TieringSplitGenerator {
             @Nullable String partitionName,
             @Nullable Long lastCommittedBucketOffset,
             long latestBucketOffset) {
+        if (latestBucketOffset <= 0) {
+            LOG.debug(
                    "The latestBucketOffset {} is equal to or less than 0, skip 
generating split for bucket {}",
+                    latestBucketOffset,
+                    tableBucket);
+            return Optional.empty();
+        }
 
         // the bucket is never been tiered
         if (lastCommittedBucketOffset == null) {
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java
index 79c49c45f..b93791660 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java
@@ -89,23 +89,16 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
                 registerReader(context, enumerator, subtaskId, "localhost-" + 
subtaskId);
                 enumerator.handleSplitRequest(subtaskId, "localhost-" + 
subtaskId);
             }
-            waitUntilTieringTableSplitAssignmentReady(context, 
DEFAULT_BUCKET_NUM, 200L);
-            List<TieringSplit> expectedAssignment = new ArrayList<>();
-            for (int bucketId = 0; bucketId < DEFAULT_BUCKET_NUM; bucketId++) {
-                expectedAssignment.add(
-                        new TieringLogSplit(
-                                tablePath,
-                                new TableBucket(tableId, bucketId),
-                                null,
-                                EARLIEST_OFFSET,
-                                0,
-                                expectNumberOfSplits));
-            }
+
+            // try to assign splits
+            context.runPeriodicCallable(0);
+
             List<TieringSplit> actualAssignment = new ArrayList<>();
             context.getSplitsAssignmentSequence()
                     .forEach(a -> 
a.assignment().values().forEach(actualAssignment::addAll));
 
-            assertThat(actualAssignment).isEqualTo(expectedAssignment);
+            // no split assignment for empty buckets
+            assertThat(actualAssignment).isEmpty();
 
             // mock finished tiered this round, check second round
             context.getSplitsAssignmentSequence().clear();
@@ -282,18 +275,23 @@ class TieringSourceEnumeratorTest extends TieringTestBase 
{
                 registerReader(context, enumerator, subtaskId, "localhost-" + 
subtaskId);
                 enumerator.handleSplitRequest(subtaskId, "localhost-" + 
subtaskId);
             }
-            waitUntilTieringTableSplitAssignmentReady(context, 
DEFAULT_BUCKET_NUM, 200);
 
+            // write one row to one bucket, keep the other buckets empty
+            Map<Integer, Long> bucketOffsetOfFirstWrite =
+                    appendRow(tablePath, DEFAULT_LOG_TABLE_DESCRIPTOR, 0, 1);
+
+            // only non-empty buckets should generate splits
+            waitUntilTieringTableSplitAssignmentReady(context, 1, 200L);
             List<TieringSplit> expectedAssignment = new ArrayList<>();
-            for (int bucketId = 0; bucketId < DEFAULT_BUCKET_NUM; bucketId++) {
+            for (int bucketId : bucketOffsetOfFirstWrite.keySet()) {
                 expectedAssignment.add(
                         new TieringLogSplit(
                                 tablePath,
                                 new TableBucket(tableId, bucketId),
                                 null,
                                 EARLIEST_OFFSET,
-                                0L,
-                                expectNumberOfSplits));
+                                bucketOffsetOfFirstWrite.get(bucketId),
+                                bucketOffsetOfFirstWrite.size()));
             }
             List<TieringSplit> actualAssignment = new ArrayList<>();
             context.getSplitsAssignmentSequence()
@@ -307,7 +305,8 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
             final Map<Integer, Long> bucketOffsetOfInitialWrite = new 
HashMap<>();
             for (int tableBucket = 0; tableBucket < DEFAULT_BUCKET_NUM; 
tableBucket++) {
                 bucketOffsetOfEarliest.put(tableBucket, EARLIEST_OFFSET);
-                bucketOffsetOfInitialWrite.put(tableBucket, 0L);
+                bucketOffsetOfInitialWrite.put(
+                        tableBucket, 
bucketOffsetOfFirstWrite.getOrDefault(tableBucket, 0L));
             }
             // commit and notify this table tiering task finished
             coordinatorGateway
@@ -321,8 +320,9 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
                     .get();
             enumerator.handleSourceEvent(1, new FinishedTieringEvent(tableId));
 
+            // write rows to every bucket
             Map<Integer, Long> bucketOffsetOfSecondWrite =
-                    appendRow(tablePath, DEFAULT_LOG_TABLE_DESCRIPTOR, 0, 10);
+                    appendRow(tablePath, DEFAULT_LOG_TABLE_DESCRIPTOR, 1, 10);
 
             // request tiering table splits
             for (int subtaskId = 0; subtaskId < numSubtasks; subtaskId++) {
@@ -362,12 +362,6 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
                 FLUSS_CLUSTER_EXTENSION.waitUntilPartitionsCreated(
                         tablePath, 
TABLE_AUTO_PARTITION_NUM_PRECREATE.defaultValue());
 
-        final Map<Long, Map<Integer, Long>> bucketOffsetOfInitialWrite =
-                upsertRowForPartitionedTable(
-                        tablePath, DEFAULT_PK_TABLE_DESCRIPTOR, 
partitionNameByIds, 0, 10);
-        long snapshotId = 0;
-        waitUntilPartitionTableSnapshot(tableId, partitionNameByIds, 
snapshotId);
-
         int numSubtasks = 6;
         int expectNumberOfSplits = 6;
         // test get snapshot split assignment
@@ -384,38 +378,31 @@ class TieringSourceEnumeratorTest extends TieringTestBase 
{
                 registerReader(context, enumerator, subtaskId, "localhost-" + 
subtaskId);
                 enumerator.handleSplitRequest(subtaskId, "localhost-" + 
subtaskId);
             }
-            waitUntilTieringTableSplitAssignmentReady(
-                    context, DEFAULT_BUCKET_NUM * partitionNameByIds.size(), 
3000L);
 
-            List<TieringSplit> expectedSnapshotAssignment = new ArrayList<>();
-            for (Map.Entry<String, Long> partitionNameById : 
partitionNameByIds.entrySet()) {
-                for (int tableBucket = 0; tableBucket < DEFAULT_BUCKET_NUM; 
tableBucket++) {
-                    long partitionId = partitionNameById.getValue();
-                    expectedSnapshotAssignment.add(
-                            new TieringSnapshotSplit(
-                                    tablePath,
-                                    new TableBucket(tableId, partitionId, 
tableBucket),
-                                    partitionNameById.getKey(),
-                                    snapshotId,
-                                    
bucketOffsetOfInitialWrite.get(partitionId).get(tableBucket),
-                                    expectNumberOfSplits));
-                }
-            }
+            // try to assign splits
+            context.runPeriodicCallable(0);
+
             List<TieringSplit> actualSnapshotAssignment = new ArrayList<>();
             for (SplitsAssignment<TieringSplit> splitsAssignment :
                     context.getSplitsAssignmentSequence()) {
                 
splitsAssignment.assignment().values().forEach(actualSnapshotAssignment::addAll);
             }
-            assertThat(sortSplits(actualSnapshotAssignment))
-                    .isEqualTo(sortSplits(expectedSnapshotAssignment));
+
+            // no snapshot split should be assigned for empty buckets
+            assertThat(actualSnapshotAssignment).isEmpty();
 
             // mock finished tiered this round, check second round
             context.getSplitsAssignmentSequence().clear();
+            final Map<Long, Map<Integer, Long>> bucketOffsetOfInitialWrite = 
new HashMap<>();
             for (Map.Entry<String, Long> partitionNameById : 
partitionNameByIds.entrySet()) {
-                Map<Integer, Long> partitionInitialBucketOffsets = new 
HashMap<>();
+                Map<Integer, Long> partitionBucketOffsetOfEarliest = new 
HashMap<>();
+                Map<Integer, Long> partitionBucketOffsetOfInitialWrite = new 
HashMap<>();
                 for (int tableBucket = 0; tableBucket < DEFAULT_BUCKET_NUM; 
tableBucket++) {
-                    partitionInitialBucketOffsets.put(tableBucket, 
EARLIEST_OFFSET);
+                    partitionBucketOffsetOfEarliest.put(tableBucket, 
EARLIEST_OFFSET);
+                    partitionBucketOffsetOfInitialWrite.put(tableBucket, 0L);
                 }
+                bucketOffsetOfInitialWrite.put(
+                        partitionNameById.getValue(), 
partitionBucketOffsetOfInitialWrite);
                 // commit lake table partition
                 coordinatorGateway
                         .commitLakeTableSnapshot(
@@ -423,7 +410,7 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
                                         tableId,
                                         partitionNameById.getValue(),
                                         1,
-                                        partitionInitialBucketOffsets,
+                                        partitionBucketOffsetOfEarliest,
                                         bucketOffsetOfInitialWrite.get(
                                                 partitionNameById.getValue())))
                         .get();
@@ -434,7 +421,7 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
             Map<Long, Map<Integer, Long>> bucketOffsetOfSecondWrite =
                     upsertRowForPartitionedTable(
                             tablePath, DEFAULT_PK_TABLE_DESCRIPTOR, 
partitionNameByIds, 10, 20);
-            snapshotId = 1;
+            long snapshotId = 0;
             waitUntilPartitionTableSnapshot(tableId, partitionNameByIds, 
snapshotId);
 
             // request tiering table splits
@@ -447,16 +434,16 @@ class TieringSourceEnumeratorTest extends TieringTestBase 
{
             List<TieringSplit> expectedLogAssignment = new ArrayList<>();
             for (Map.Entry<String, Long> partitionNameById : 
partitionNameByIds.entrySet()) {
                 for (int tableBucket = 0; tableBucket < DEFAULT_BUCKET_NUM; 
tableBucket++) {
-                    long partionId = partitionNameById.getValue();
+                    long partitionId = partitionNameById.getValue();
                     expectedLogAssignment.add(
                             new TieringLogSplit(
                                     tablePath,
-                                    new TableBucket(tableId, partionId, 
tableBucket),
+                                    new TableBucket(tableId, partitionId, 
tableBucket),
                                     partitionNameById.getKey(),
-                                    
bucketOffsetOfInitialWrite.get(partionId).get(tableBucket),
-                                    
bucketOffsetOfInitialWrite.get(partionId).get(tableBucket)
+                                    
bucketOffsetOfInitialWrite.get(partitionId).get(tableBucket),
+                                    
bucketOffsetOfInitialWrite.get(partitionId).get(tableBucket)
                                             + bucketOffsetOfSecondWrite
-                                                    .get(partionId)
+                                                    .get(partitionId)
                                                     .get(tableBucket),
                                     expectNumberOfSplits));
                 }
@@ -496,21 +483,29 @@ class TieringSourceEnumeratorTest extends TieringTestBase 
{
                 registerReader(context, enumerator, subtaskId, "localhost-" + 
subtaskId);
                 enumerator.handleSplitRequest(subtaskId, "localhost-" + 
subtaskId);
             }
-            waitUntilTieringTableSplitAssignmentReady(
-                    context, DEFAULT_BUCKET_NUM * partitionNameByIds.size(), 
3000L);
+
+            Map<Long, Map<Integer, Long>> bucketOffsetOfFirstWrite =
+                    appendRowForPartitionedTable(
+                            tablePath,
+                            DEFAULT_AUTO_PARTITIONED_LOG_TABLE_DESCRIPTOR,
+                            partitionNameByIds,
+                            0,
+                            1);
+
+            waitUntilTieringTableSplitAssignmentReady(context, 
partitionNameByIds.size(), 3000L);
 
             List<TieringSplit> expectedAssignment = new ArrayList<>();
             for (Map.Entry<String, Long> partitionNameById : 
partitionNameByIds.entrySet()) {
-                for (int tableBucket = 0; tableBucket < DEFAULT_BUCKET_NUM; 
tableBucket++) {
-                    long partitionId = partitionNameById.getValue();
+                long partitionId = partitionNameById.getValue();
+                for (int tableBucket : 
bucketOffsetOfFirstWrite.get(partitionId).keySet()) {
                     expectedAssignment.add(
                             new TieringLogSplit(
                                     tablePath,
                                     new TableBucket(tableId, partitionId, 
tableBucket),
                                     partitionNameById.getKey(),
                                     EARLIEST_OFFSET,
-                                    0L,
-                                    expectNumberOfSplits));
+                                    
bucketOffsetOfFirstWrite.get(partitionId).get(tableBucket),
+                                    bucketOffsetOfFirstWrite.size()));
                 }
             }
             List<TieringSplit> actualAssignment = new ArrayList<>();
@@ -529,7 +524,11 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
                 Map<Integer, Long> partitionBucketOffsetOfInitialWrite = new 
HashMap<>();
                 for (int tableBucket = 0; tableBucket < DEFAULT_BUCKET_NUM; 
tableBucket++) {
                     partitionInitialBucketOffsets.put(tableBucket, 
EARLIEST_OFFSET);
-                    partitionBucketOffsetOfInitialWrite.put(tableBucket, 0L);
+                    partitionBucketOffsetOfInitialWrite.put(
+                            tableBucket,
+                            bucketOffsetOfFirstWrite
+                                    .getOrDefault(partitionId, 
Collections.emptyMap())
+                                    .getOrDefault(tableBucket, 0L));
                 }
                 bucketOffsetOfInitialWrite.put(partitionId, 
partitionBucketOffsetOfInitialWrite);
                 // commit lake table partition
@@ -551,7 +550,7 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
                             tablePath,
                             DEFAULT_AUTO_PARTITIONED_LOG_TABLE_DESCRIPTOR,
                             partitionNameByIds,
-                            0,
+                            1,
                             10);
 
             // request tiering table splits
@@ -649,9 +648,11 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
     void testHandleFailOverEvent() throws Throwable {
         TablePath tablePath1 = TablePath.of(DEFAULT_DB, 
"tiering-failover-test-log-table1");
         createTable(tablePath1, DEFAULT_LOG_TABLE_DESCRIPTOR);
+        appendRow(tablePath1, DEFAULT_LOG_TABLE_DESCRIPTOR, 0, 10);
 
         TablePath tablePath2 = TablePath.of(DEFAULT_DB, 
"tiering-failover-test-log-table2");
         createTable(tablePath2, DEFAULT_LOG_TABLE_DESCRIPTOR);
+        appendRow(tablePath2, DEFAULT_LOG_TABLE_DESCRIPTOR, 0, 10);
 
         int numSubtasks = 1;
         try (MockSplitEnumeratorContext<TieringSplit> context =

Reply via email to