This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new d6aacdd94 [server] Fix LogTieringTask to commit successfully copied
segments on partial failure (#2767)
d6aacdd94 is described below
commit d6aacdd94ad7227fa7152bea69432a784c2b9947
Author: yunhong <[email protected]>
AuthorDate: Thu Mar 5 18:18:21 2026 +0800
[server] Fix LogTieringTask to commit successfully copied segments on
partial failure (#2767)
---
.../org/apache/fluss/config/ConfigOptions.java | 10 +++
.../fluss/server/log/remote/LogTieringTask.java | 41 ++++++++--
.../fluss/server/log/remote/RemoteLogManager.java | 6 +-
.../server/log/remote/RemoteLogManagerTest.java | 28 +++++++
.../log/remote/RemoteLogMaxUploadSegmentsTest.java | 92 ++++++++++++++++++++++
.../fluss/server/log/remote/RemoteLogTestBase.java | 1 +
.../server/log/remote/TestingRemoteLogStorage.java | 24 ++++++
website/docs/maintenance/configuration.md | 13 +--
8 files changed, 201 insertions(+), 14 deletions(-)
diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
index 08742ebc2..90f262b07 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
@@ -798,6 +798,16 @@ public class ConfigOptions {
+ "copy segments, clean up remote log
segments, delete local log segments etc. "
+ "If the value is set to 0, it means that
the remote log storage is disabled.");
+ public static final ConfigOption<Integer>
REMOTE_LOG_TASK_MAX_UPLOAD_SEGMENTS =
+ key("remote.log.task-max-upload-segments")
+ .intType()
+ .defaultValue(5)
+ .withDescription(
+ "The maximum number of log segments to upload to
remote storage per "
+ + "tiering task execution. This limits the
upload batch size to "
+ + "prevent overwhelming the remote storage
when there is a large "
+ + "backlog of segments to upload.");
+
public static final ConfigOption<MemorySize>
REMOTE_LOG_INDEX_FILE_CACHE_SIZE =
key("remote.log.index-file-cache-size")
.memoryType()
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java
index ac1c2f39b..8c7d0d883 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java
@@ -58,6 +58,7 @@ public class LogTieringTask implements Runnable {
private final RemoteLogStorage remoteLogStorage;
private final CoordinatorGateway coordinatorGateway;
private final Clock clock;
+ private final int maxUploadSegmentsPerTask;
// The copied offset is empty initially for a new leader LogTieringTask,
and needs to
// be fetched inside the task's run() method.
@@ -70,7 +71,8 @@ public class LogTieringTask implements Runnable {
RemoteLogTablet remoteLog,
RemoteLogStorage remoteLogStorage,
CoordinatorGateway coordinatorGateway,
- Clock clock) {
+ Clock clock,
+ int maxUploadSegmentsPerTask) {
this.replica = replica;
this.remoteLog = remoteLog;
this.physicalTablePath = replica.getPhysicalTablePath();
@@ -78,6 +80,7 @@ public class LogTieringTask implements Runnable {
this.remoteLogStorage = remoteLogStorage;
this.coordinatorGateway = coordinatorGateway;
this.clock = clock;
+ this.maxUploadSegmentsPerTask = maxUploadSegmentsPerTask;
}
@Override
@@ -233,7 +236,12 @@ public class LogTieringTask implements Runnable {
* Copy the given log segments to remote and add the successfully copied
segment to the {@code
* copiedSegments} parameter.
*
- * @return the end offset of the last segment copied to remote.
+ * <p>If a segment copy fails (e.g., due to rate limiting or transient
errors), the method stops
+ * copying further segments but retains all previously successful copies
so they can still be
+ * committed, avoiding wasted uploads.
+ *
+ * @return the end offset of the last segment successfully copied to
remote, or -1 if no
+ * segments were copied.
*/
private long copyLogSegmentFilesToRemote(
LogTablet log,
@@ -251,10 +259,10 @@ public class LogTieringTask implements Runnable {
logFileName,
physicalTablePath,
tableBucket.getBucket());
- endOffset = enrichedSegment.nextSegmentOffset;
+ long segmentEndOffset = enrichedSegment.nextSegmentOffset;
File writerIdSnapshotFile =
-
log.writerStateManager().fetchSnapshot(endOffset).orElse(null);
+
log.writerStateManager().fetchSnapshot(segmentEndOffset).orElse(null);
LogSegmentFiles logSegmentFiles =
new LogSegmentFiles(
logFile.toPath(),
@@ -270,7 +278,7 @@ public class LogTieringTask implements Runnable {
.tableBucket(tableBucket)
.remoteLogSegmentId(remoteLogSegmentId)
.remoteLogStartOffset(segment.getBaseOffset())
- .remoteLogEndOffset(endOffset)
+ .remoteLogEndOffset(segmentEndOffset)
.maxTimestamp(segment.maxTimestampSoFar())
.segmentSizeInBytes(sizeInBytes)
.build();
@@ -278,7 +286,16 @@ public class LogTieringTask implements Runnable {
remoteLogStorage.copyLogSegmentFiles(copyRemoteLogSegment,
logSegmentFiles);
} catch (RemoteStorageException e) {
metricGroup.remoteLogCopyErrors().inc();
- throw e;
+ LOG.warn(
+ "Failed to copy {} of table {} bucket {} to remote
storage. "
+ + "Stopping further segment copies. "
+ + "{} segment(s) already copied successfully
will be committed.",
+ logFileName,
+ physicalTablePath,
+ tableBucket.getBucket(),
+ copiedSegments.size(),
+ e);
+ break;
}
LOG.info(
"Copied {} of table {} bucket {} to remote storage as
remote log segment: {}.",
@@ -289,6 +306,7 @@ public class LogTieringTask implements Runnable {
metricGroup.remoteLogCopyRequests().inc();
metricGroup.remoteLogCopyBytes().inc(sizeInBytes);
copiedSegments.add(copyRemoteLogSegment);
+ endOffset = segmentEndOffset;
}
return endOffset;
}
@@ -433,12 +451,16 @@ public class LogTieringTask implements Runnable {
}
/**
- * Segments which match the following criteria are eligible for copying to
remote storage:
+ * Returns up to {@code maxUploadSegmentsPerTask} segments eligible for
copying to remote
+ * storage. A segment is eligible if it meets the following criteria:
*
* <p>1. Segment is not the active segment.
*
* <p>2. Segment end-offset is less than the highWatermark as remote
storage should contain only
* committed/acked records.
+ *
+ * <p>The number of returned segments is capped at {@code
maxUploadSegmentsPerTask} to prevent
+ * overwhelming the remote storage when there is a large backlog.
*/
private List<EnrichedLogSegment> candidateLogSegments(
LogTablet log, long fromOffset, long highWatermark) {
@@ -451,6 +473,11 @@ public class LogTieringTask implements Runnable {
long curSegBaseOffset = currentSeg.getBaseOffset();
if (curSegBaseOffset <= highWatermark) {
candidateLogSegments.add(new
EnrichedLogSegment(previousSeg, curSegBaseOffset));
+ // Limit the number of segments to upload per task
execution to prevent
+ // overwhelming the remote storage when there is a large
backlog.
+ if (candidateLogSegments.size() >=
maxUploadSegmentsPerTask) {
+ break;
+ }
}
}
// Discard the last active segment
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java
index c85d5ae65..ba34143af 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java
@@ -70,6 +70,7 @@ public class RemoteLogManager implements Closeable {
public static final String RLM_SCHEDULED_THREAD_PREFIX =
"fluss-remote-log-manager-thread-pool";
private final long taskInterval;
+ private final int maxUploadSegmentsPerTask;
private final RemoteLogIndexCache remoteLogIndexCache;
private final RemoteLogStorage remoteLogStorage;
private final CoordinatorGateway coordinatorGateway;
@@ -118,6 +119,8 @@ public class RemoteLogManager implements Closeable {
remoteLogStorage,
dataDir);
this.taskInterval =
conf.get(ConfigOptions.REMOTE_LOG_TASK_INTERVAL_DURATION).toMillis();
+ this.maxUploadSegmentsPerTask =
+ conf.getInt(ConfigOptions.REMOTE_LOG_TASK_MAX_UPLOAD_SEGMENTS);
this.rlManagerScheduledThreadPool = scheduledExecutor;
this.clock = clock;
}
@@ -290,7 +293,8 @@ public class RemoteLogManager implements Closeable {
remoteLog,
remoteLogStorage,
coordinatorGateway,
- clock);
+ clock,
+ maxUploadSegmentsPerTask);
LOG.info(
"Created a new remote log task for table-bucket{}:
{} and getting scheduled",
tableBucket,
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java
index 2834505c9..2ac4e2096 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java
@@ -664,6 +664,34 @@ class RemoteLogManagerTest extends RemoteLogTestBase {
assertThat(logTablet.getSegments()).hasSize(3);
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testCopySegmentPartialFailureCommitsSuccessfulOnes(boolean
partitionTable)
+ throws Exception {
+ TableBucket tb = makeTableBucket(partitionTable);
+ // Need to make leader by ReplicaManager.
+ makeLogTableAsLeader(tb, partitionTable);
+
addMultiSegmentsToLogTablet(replicaManager.getReplicaOrException(tb).getLogTablet(),
5);
+ // 5 segments total, 4 candidates (1 active segment excluded).
+
+ // Inject failure: let the first 2 segment copies succeed, then fail
on the 3rd.
+ remoteLogStorage.copySegmentFailAfterNCopies.set(2);
+
+ remoteLogTaskScheduler.triggerPeriodicScheduledTasks();
+
+ // Verify: The manifest should contain exactly 2 segments (the
successfully copied ones).
+ RemoteLogTablet remoteLog = remoteLogManager.remoteLogTablet(tb);
+ List<RemoteLogSegment> manifestSegments =
remoteLog.allRemoteLogSegments();
+ assertThat(manifestSegments).hasSize(2);
+
+ // Verify: Remote storage should contain exactly the 2 committed
segment files.
+ assertThat(listRemoteLogFiles(tb))
+ .isEqualTo(
+ manifestSegments.stream()
+ .map(s -> s.remoteLogSegmentId().toString())
+ .collect(Collectors.toSet()));
+ }
+
private TableBucket makeTableBucket(boolean partitionTable) {
return makeTableBucket(DATA1_TABLE_ID, partitionTable);
}
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogMaxUploadSegmentsTest.java b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogMaxUploadSegmentsTest.java
new file mode 100644
index 000000000..5144fc703
--- /dev/null
+++ b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogMaxUploadSegmentsTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.log.remote;
+
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.config.MemorySize;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.remote.RemoteLogSegment;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link LogTieringTask} max upload segments per task limit. */
+class RemoteLogMaxUploadSegmentsTest extends RemoteLogTestBase {
+
+ @Override
+ public Configuration getServerConf() {
+ Configuration conf = new Configuration();
+ conf.set(ConfigOptions.LOG_INDEX_INTERVAL_SIZE,
MemorySize.parse("1b"));
+ conf.set(ConfigOptions.REMOTE_LOG_INDEX_FILE_CACHE_SIZE,
MemorySize.parse("1mb"));
+ conf.set(ConfigOptions.REMOTE_FS_WRITE_BUFFER_SIZE,
MemorySize.parse("10b"));
+ // Use default value (5) for REMOTE_LOG_TASK_MAX_UPLOAD_SEGMENTS.
+ return conf;
+ }
+
+ @BeforeEach
+ public void setup() throws Exception {
+ super.setup();
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testMaxUploadSegmentsPerTaskLimit(boolean partitionTable) throws
Exception {
+ // Default maxUploadSegmentsPerTask is 5, so with 10 segments (9
candidates),
+ // only 5 should be uploaded per task execution.
+ TableBucket tb = makeTableBucket(partitionTable);
+ makeLogTableAsLeader(tb, partitionTable);
+
addMultiSegmentsToLogTablet(replicaManager.getReplicaOrException(tb).getLogTablet(),
10);
+ // 10 segments total, 9 candidates (1 active segment excluded).
+
+ // First tiering task execution - should upload only 5 segments.
+ remoteLogTaskScheduler.triggerPeriodicScheduledTasks();
+ RemoteLogTablet remoteLog = remoteLogManager.remoteLogTablet(tb);
+ List<RemoteLogSegment> manifestSegments =
remoteLog.allRemoteLogSegments();
+ assertThat(manifestSegments).hasSize(5);
+ assertThat(remoteLog.getRemoteLogStartOffset()).isEqualTo(0L);
+ assertThat(remoteLog.getRemoteLogEndOffset()).hasValue(50L);
+
+ // Second tiering task execution - should upload the remaining 4
segments.
+ remoteLogTaskScheduler.triggerPeriodicScheduledTasks();
+ manifestSegments = remoteLog.allRemoteLogSegments();
+ assertThat(manifestSegments).hasSize(9);
+ assertThat(remoteLog.getRemoteLogEndOffset()).hasValue(90L);
+ // Verify remote storage has all 9 segment files.
+ assertThat(listRemoteLogFiles(tb))
+ .isEqualTo(
+ manifestSegments.stream()
+ .map(s -> s.remoteLogSegmentId().toString())
+ .collect(Collectors.toSet()));
+ }
+
+ private TableBucket makeTableBucket(boolean partitionTable) {
+ if (partitionTable) {
+ return new TableBucket(DATA1_TABLE_ID, 0L, 0);
+ } else {
+ return new TableBucket(DATA1_TABLE_ID, 0);
+ }
+ }
+}
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogTestBase.java b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogTestBase.java
index 7a0a8a125..13ab2f7ba 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogTestBase.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogTestBase.java
@@ -56,6 +56,7 @@ public class RemoteLogTestBase extends ReplicaTestBase {
conf.set(ConfigOptions.REMOTE_LOG_INDEX_FILE_CACHE_SIZE,
MemorySize.parse("1mb"));
conf.set(ConfigOptions.REMOTE_FS_WRITE_BUFFER_SIZE,
MemorySize.parse("10b"));
+ conf.setInt(ConfigOptions.REMOTE_LOG_TASK_MAX_UPLOAD_SEGMENTS,
Integer.MAX_VALUE);
return conf;
}
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/TestingRemoteLogStorage.java b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/TestingRemoteLogStorage.java
index 75802dc02..ef0b63336 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/TestingRemoteLogStorage.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/TestingRemoteLogStorage.java
@@ -20,10 +20,12 @@ package org.apache.fluss.server.log.remote;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.RemoteStorageException;
import org.apache.fluss.fs.FsPath;
+import org.apache.fluss.remote.RemoteLogSegment;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* A testing implementation of {@link
org.apache.fluss.server.log.remote.RemoteLogStorage} which can
@@ -33,11 +35,33 @@ public class TestingRemoteLogStorage extends
DefaultRemoteLogStorage {
public final AtomicBoolean writeManifestFail = new AtomicBoolean(false);
+ /**
+ * When set to a non-negative value N, the first N calls to {@link #copyLogSegmentFiles} will
+ * succeed and every later call will throw a {@link RemoteStorageException}. A negative value
+ * (default) disables this failure injection.
+ */
+ public final AtomicInteger copySegmentFailAfterNCopies = new
AtomicInteger(-1);
+
+ private final AtomicInteger copySegmentCount = new AtomicInteger(0);
+
public TestingRemoteLogStorage(Configuration conf, ExecutorService
ioExecutor)
throws IOException {
super(conf, ioExecutor);
}
+ @Override
+ public void copyLogSegmentFiles(
+ RemoteLogSegment remoteLogSegment, LogSegmentFiles logSegmentFiles)
+ throws RemoteStorageException {
+ int failAfter = copySegmentFailAfterNCopies.get();
+ if (failAfter >= 0 && copySegmentCount.get() >= failAfter) {
+ throw new RemoteStorageException(
+ "Simulated copy failure after " + failAfter + " successful
copies");
+ }
+ super.copyLogSegmentFiles(remoteLogSegment, logSegmentFiles);
+ copySegmentCount.incrementAndGet();
+ }
+
@Override
public FsPath writeRemoteLogManifestSnapshot(RemoteLogManifest manifest)
throws RemoteStorageException {
diff --git a/website/docs/maintenance/configuration.md b/website/docs/maintenance/configuration.md
index ae9116a7c..6304e439a 100644
--- a/website/docs/maintenance/configuration.md
+++ b/website/docs/maintenance/configuration.md
@@ -124,12 +124,13 @@ during the Fluss cluster working.
## Log Tiered Storage
-| Option | Type | Default | Description |
-|-------------------------------------|------------|---------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| remote.log.task-interval-duration | Duration | 1min | Interval at which remote log manager runs the scheduled tasks like copy segments, clean up remote log segments, delete local log segments etc. If the value is set to 0s, it means that the remote log storage is disabled. |
-| remote.log.index-file-cache-size | MemorySize | 1gb | The total size of the space allocated to store index files fetched from remote storage in the local storage. |
-| remote.log-manager.thread-pool-size | Integer | 4 | Size of the thread pool used in scheduling tasks to copy segments, fetch remote log indexes and clean up remote log segments. |
-| remote.log.data-transfer-thread-num | Integer | 4 | **Deprecated**: This option is deprecated. Please use `server.io-pool.size` instead. The number of threads the server uses to transfer (download and upload) remote log file can be data file, index file and remote log metadata file. |
+| Option | Type | Default | Description |
+|-------------------------------------|------------|---------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| remote.log.task-interval-duration | Duration | 1min | Interval at which remote log manager runs the scheduled tasks like copy segments, clean up remote log segments, delete local log segments etc. If the value is set to 0s, it means that the remote log storage is disabled. |
+| remote.log.task-max-upload-segments | Integer | 5 | The maximum number of log segments to upload to remote storage per tiering task execution. This limits the upload batch size to prevent overwhelming the remote storage when there is a large backlog of segments to upload. |
+| remote.log.index-file-cache-size | MemorySize | 1gb | The total size of the space allocated to store index files fetched from remote storage in the local storage. |
+| remote.log-manager.thread-pool-size | Integer | 4 | Size of the thread pool used in scheduling tasks to copy segments, fetch remote log indexes and clean up remote log segments. |
+| remote.log.data-transfer-thread-num | Integer | 4 | **Deprecated**: This option is deprecated. Please use `server.io-pool.size` instead. The number of threads the server uses to transfer (download and upload) remote log file can be data file, index file and remote log metadata file. |
## Kv