This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new d6aacdd94 [server] Fix LogTieringTask to commit successfully copied 
segments on partial failure (#2767)
d6aacdd94 is described below

commit d6aacdd94ad7227fa7152bea69432a784c2b9947
Author: yunhong <[email protected]>
AuthorDate: Thu Mar 5 18:18:21 2026 +0800

    [server] Fix LogTieringTask to commit successfully copied segments on 
partial failure (#2767)
---
 .../org/apache/fluss/config/ConfigOptions.java     | 10 +++
 .../fluss/server/log/remote/LogTieringTask.java    | 41 ++++++++--
 .../fluss/server/log/remote/RemoteLogManager.java  |  6 +-
 .../server/log/remote/RemoteLogManagerTest.java    | 28 +++++++
 .../log/remote/RemoteLogMaxUploadSegmentsTest.java | 92 ++++++++++++++++++++++
 .../fluss/server/log/remote/RemoteLogTestBase.java |  1 +
 .../server/log/remote/TestingRemoteLogStorage.java | 24 ++++++
 website/docs/maintenance/configuration.md          | 13 +--
 8 files changed, 201 insertions(+), 14 deletions(-)

diff --git 
a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java 
b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
index 08742ebc2..90f262b07 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
@@ -798,6 +798,16 @@ public class ConfigOptions {
                                     + "copy segments, clean up remote log 
segments, delete local log segments etc. "
                                     + "If the value is set to 0, it means that 
the remote log storage is disabled.");
 
+    public static final ConfigOption<Integer> 
REMOTE_LOG_TASK_MAX_UPLOAD_SEGMENTS =
+            key("remote.log.task-max-upload-segments")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription(
+                            "The maximum number of log segments to upload to 
remote storage per "
+                                    + "tiering task execution. This limits the 
upload batch size to "
+                                    + "prevent overwhelming the remote storage 
when there is a large "
+                                    + "backlog of segments to upload.");
+
     public static final ConfigOption<MemorySize> 
REMOTE_LOG_INDEX_FILE_CACHE_SIZE =
             key("remote.log.index-file-cache-size")
                     .memoryType()
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java
index ac1c2f39b..8c7d0d883 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java
@@ -58,6 +58,7 @@ public class LogTieringTask implements Runnable {
     private final RemoteLogStorage remoteLogStorage;
     private final CoordinatorGateway coordinatorGateway;
     private final Clock clock;
+    private final int maxUploadSegmentsPerTask;
 
     // The copied offset is empty initially for a new leader LogTieringTask, 
and needs to
     // be fetched inside the task's run() method.
@@ -70,7 +71,8 @@ public class LogTieringTask implements Runnable {
             RemoteLogTablet remoteLog,
             RemoteLogStorage remoteLogStorage,
             CoordinatorGateway coordinatorGateway,
-            Clock clock) {
+            Clock clock,
+            int maxUploadSegmentsPerTask) {
         this.replica = replica;
         this.remoteLog = remoteLog;
         this.physicalTablePath = replica.getPhysicalTablePath();
@@ -78,6 +80,7 @@ public class LogTieringTask implements Runnable {
         this.remoteLogStorage = remoteLogStorage;
         this.coordinatorGateway = coordinatorGateway;
         this.clock = clock;
+        this.maxUploadSegmentsPerTask = maxUploadSegmentsPerTask;
     }
 
     @Override
@@ -233,7 +236,12 @@ public class LogTieringTask implements Runnable {
      * Copy the given log segments to remote and add the successfully copied 
segment to the {@code
      * copiedSegments} parameter.
      *
-     * @return the end offset of the last segment copied to remote.
+     * <p>If a segment copy fails (e.g., due to rate limiting or transient 
errors), the method stops
+     * copying further segments but retains all previously successful copies 
so they can still be
+     * committed, avoiding wasted uploads.
+     *
+     * @return the end offset of the last segment successfully copied to 
remote, or -1 if no
+     *     segments were copied.
      */
     private long copyLogSegmentFilesToRemote(
             LogTablet log,
@@ -251,10 +259,10 @@ public class LogTieringTask implements Runnable {
                     logFileName,
                     physicalTablePath,
                     tableBucket.getBucket());
-            endOffset = enrichedSegment.nextSegmentOffset;
+            long segmentEndOffset = enrichedSegment.nextSegmentOffset;
 
             File writerIdSnapshotFile =
-                    
log.writerStateManager().fetchSnapshot(endOffset).orElse(null);
+                    
log.writerStateManager().fetchSnapshot(segmentEndOffset).orElse(null);
             LogSegmentFiles logSegmentFiles =
                     new LogSegmentFiles(
                             logFile.toPath(),
@@ -270,7 +278,7 @@ public class LogTieringTask implements Runnable {
                             .tableBucket(tableBucket)
                             .remoteLogSegmentId(remoteLogSegmentId)
                             .remoteLogStartOffset(segment.getBaseOffset())
-                            .remoteLogEndOffset(endOffset)
+                            .remoteLogEndOffset(segmentEndOffset)
                             .maxTimestamp(segment.maxTimestampSoFar())
                             .segmentSizeInBytes(sizeInBytes)
                             .build();
@@ -278,7 +286,16 @@ public class LogTieringTask implements Runnable {
                 remoteLogStorage.copyLogSegmentFiles(copyRemoteLogSegment, 
logSegmentFiles);
             } catch (RemoteStorageException e) {
                 metricGroup.remoteLogCopyErrors().inc();
-                throw e;
+                LOG.warn(
+                        "Failed to copy {} of table {} bucket {} to remote 
storage. "
+                                + "Stopping further segment copies. "
+                                + "{} segment(s) already copied successfully 
will be committed.",
+                        logFileName,
+                        physicalTablePath,
+                        tableBucket.getBucket(),
+                        copiedSegments.size(),
+                        e);
+                break;
             }
             LOG.info(
                     "Copied {} of table {} bucket {} to remote storage as 
remote log segment: {}.",
@@ -289,6 +306,7 @@ public class LogTieringTask implements Runnable {
             metricGroup.remoteLogCopyRequests().inc();
             metricGroup.remoteLogCopyBytes().inc(sizeInBytes);
             copiedSegments.add(copyRemoteLogSegment);
+            endOffset = segmentEndOffset;
         }
         return endOffset;
     }
@@ -433,12 +451,16 @@ public class LogTieringTask implements Runnable {
     }
 
     /**
-     * Segments which match the following criteria are eligible for copying to 
remote storage:
+     * Returns up to {@code maxUploadSegmentsPerTask} segments eligible for 
copying to remote
+     * storage. A segment is eligible if it meets the following criteria:
      *
      * <p>1. Segment is not the active segment.
      *
      * <p>2. Segment end-offset is less than the highWatermark as remote 
storage should contain only
      * committed/acked records.
+     *
+     * <p>The number of returned segments is capped at {@code 
maxUploadSegmentsPerTask} to prevent
+     * overwhelming the remote storage when there is a large backlog.
      */
     private List<EnrichedLogSegment> candidateLogSegments(
             LogTablet log, long fromOffset, long highWatermark) {
@@ -451,6 +473,11 @@ public class LogTieringTask implements Runnable {
                 long curSegBaseOffset = currentSeg.getBaseOffset();
                 if (curSegBaseOffset <= highWatermark) {
                     candidateLogSegments.add(new 
EnrichedLogSegment(previousSeg, curSegBaseOffset));
+                    // Limit the number of segments to upload per task 
execution to prevent
+                    // overwhelming the remote storage when there is a large 
backlog.
+                    if (candidateLogSegments.size() >= 
maxUploadSegmentsPerTask) {
+                        break;
+                    }
                 }
             }
             // Discard the last active segment
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java
index c85d5ae65..ba34143af 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java
@@ -70,6 +70,7 @@ public class RemoteLogManager implements Closeable {
     public static final String RLM_SCHEDULED_THREAD_PREFIX = 
"fluss-remote-log-manager-thread-pool";
 
     private final long taskInterval;
+    private final int maxUploadSegmentsPerTask;
     private final RemoteLogIndexCache remoteLogIndexCache;
     private final RemoteLogStorage remoteLogStorage;
     private final CoordinatorGateway coordinatorGateway;
@@ -118,6 +119,8 @@ public class RemoteLogManager implements Closeable {
                         remoteLogStorage,
                         dataDir);
         this.taskInterval = 
conf.get(ConfigOptions.REMOTE_LOG_TASK_INTERVAL_DURATION).toMillis();
+        this.maxUploadSegmentsPerTask =
+                conf.getInt(ConfigOptions.REMOTE_LOG_TASK_MAX_UPLOAD_SEGMENTS);
         this.rlManagerScheduledThreadPool = scheduledExecutor;
         this.clock = clock;
     }
@@ -290,7 +293,8 @@ public class RemoteLogManager implements Closeable {
                                     remoteLog,
                                     remoteLogStorage,
                                     coordinatorGateway,
-                                    clock);
+                                    clock,
+                                    maxUploadSegmentsPerTask);
                     LOG.info(
                             "Created a new remote log task for table-bucket{}: 
{} and getting scheduled",
                             tableBucket,
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java
index 2834505c9..2ac4e2096 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java
@@ -664,6 +664,34 @@ class RemoteLogManagerTest extends RemoteLogTestBase {
         assertThat(logTablet.getSegments()).hasSize(3);
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testCopySegmentPartialFailureCommitsSuccessfulOnes(boolean 
partitionTable)
+            throws Exception {
+        TableBucket tb = makeTableBucket(partitionTable);
+        // Need to make leader by ReplicaManager.
+        makeLogTableAsLeader(tb, partitionTable);
+        
addMultiSegmentsToLogTablet(replicaManager.getReplicaOrException(tb).getLogTablet(),
 5);
+        // 5 segments total, 4 candidates (1 active segment excluded).
+
+        // Inject failure: let the first 2 segment copies succeed, then fail 
on the 3rd.
+        remoteLogStorage.copySegmentFailAfterNCopies.set(2);
+
+        remoteLogTaskScheduler.triggerPeriodicScheduledTasks();
+
+        // Verify: The manifest should contain exactly 2 segments (the 
successfully copied ones).
+        RemoteLogTablet remoteLog = remoteLogManager.remoteLogTablet(tb);
+        List<RemoteLogSegment> manifestSegments = 
remoteLog.allRemoteLogSegments();
+        assertThat(manifestSegments).hasSize(2);
+
+        // Verify: Remote storage should contain exactly the 2 committed 
segment files.
+        assertThat(listRemoteLogFiles(tb))
+                .isEqualTo(
+                        manifestSegments.stream()
+                                .map(s -> s.remoteLogSegmentId().toString())
+                                .collect(Collectors.toSet()));
+    }
+
     private TableBucket makeTableBucket(boolean partitionTable) {
         return makeTableBucket(DATA1_TABLE_ID, partitionTable);
     }
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogMaxUploadSegmentsTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogMaxUploadSegmentsTest.java
new file mode 100644
index 000000000..5144fc703
--- /dev/null
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogMaxUploadSegmentsTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.log.remote;
+
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.config.MemorySize;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.remote.RemoteLogSegment;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link LogTieringTask} max upload segments per task limit. */
+class RemoteLogMaxUploadSegmentsTest extends RemoteLogTestBase {
+
+    @Override
+    public Configuration getServerConf() {
+        Configuration conf = new Configuration();
+        conf.set(ConfigOptions.LOG_INDEX_INTERVAL_SIZE, 
MemorySize.parse("1b"));
+        conf.set(ConfigOptions.REMOTE_LOG_INDEX_FILE_CACHE_SIZE, 
MemorySize.parse("1mb"));
+        conf.set(ConfigOptions.REMOTE_FS_WRITE_BUFFER_SIZE, 
MemorySize.parse("10b"));
+        // Use default value (5) for REMOTE_LOG_TASK_MAX_UPLOAD_SEGMENTS.
+        return conf;
+    }
+
+    @BeforeEach
+    public void setup() throws Exception {
+        super.setup();
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testMaxUploadSegmentsPerTaskLimit(boolean partitionTable) throws 
Exception {
+        // Default maxUploadSegmentsPerTask is 5, so with 10 segments (9 
candidates),
+        // only 5 should be uploaded per task execution.
+        TableBucket tb = makeTableBucket(partitionTable);
+        makeLogTableAsLeader(tb, partitionTable);
+        
addMultiSegmentsToLogTablet(replicaManager.getReplicaOrException(tb).getLogTablet(),
 10);
+        // 10 segments total, 9 candidates (1 active segment excluded).
+
+        // First tiering task execution - should upload only 5 segments.
+        remoteLogTaskScheduler.triggerPeriodicScheduledTasks();
+        RemoteLogTablet remoteLog = remoteLogManager.remoteLogTablet(tb);
+        List<RemoteLogSegment> manifestSegments = 
remoteLog.allRemoteLogSegments();
+        assertThat(manifestSegments).hasSize(5);
+        assertThat(remoteLog.getRemoteLogStartOffset()).isEqualTo(0L);
+        assertThat(remoteLog.getRemoteLogEndOffset()).hasValue(50L);
+
+        // Second tiering task execution - should upload the remaining 4 
segments.
+        remoteLogTaskScheduler.triggerPeriodicScheduledTasks();
+        manifestSegments = remoteLog.allRemoteLogSegments();
+        assertThat(manifestSegments).hasSize(9);
+        assertThat(remoteLog.getRemoteLogEndOffset()).hasValue(90L);
+        // Verify remote storage has all 9 segment files.
+        assertThat(listRemoteLogFiles(tb))
+                .isEqualTo(
+                        manifestSegments.stream()
+                                .map(s -> s.remoteLogSegmentId().toString())
+                                .collect(Collectors.toSet()));
+    }
+
+    private TableBucket makeTableBucket(boolean partitionTable) {
+        if (partitionTable) {
+            return new TableBucket(DATA1_TABLE_ID, 0L, 0);
+        } else {
+            return new TableBucket(DATA1_TABLE_ID, 0);
+        }
+    }
+}
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogTestBase.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogTestBase.java
index 7a0a8a125..13ab2f7ba 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogTestBase.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogTestBase.java
@@ -56,6 +56,7 @@ public class RemoteLogTestBase extends ReplicaTestBase {
 
         conf.set(ConfigOptions.REMOTE_LOG_INDEX_FILE_CACHE_SIZE, 
MemorySize.parse("1mb"));
         conf.set(ConfigOptions.REMOTE_FS_WRITE_BUFFER_SIZE, 
MemorySize.parse("10b"));
+        conf.setInt(ConfigOptions.REMOTE_LOG_TASK_MAX_UPLOAD_SEGMENTS, 
Integer.MAX_VALUE);
         return conf;
     }
 
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/TestingRemoteLogStorage.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/TestingRemoteLogStorage.java
index 75802dc02..ef0b63336 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/TestingRemoteLogStorage.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/TestingRemoteLogStorage.java
@@ -20,10 +20,12 @@ package org.apache.fluss.server.log.remote;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.exception.RemoteStorageException;
 import org.apache.fluss.fs.FsPath;
+import org.apache.fluss.remote.RemoteLogSegment;
 
 import java.io.IOException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * A testing implementation of {@link 
org.apache.fluss.server.log.remote.RemoteLogStorage} which can
@@ -33,11 +35,33 @@ public class TestingRemoteLogStorage extends 
DefaultRemoteLogStorage {
 
     public final AtomicBoolean writeManifestFail = new AtomicBoolean(false);
 
+    /**
+     * When set to a non-negative value N, calls to {@link #copyLogSegmentFiles} succeed until N
+     * copies have completed; every later call throws a {@link RemoteStorageException}. A negative
+     * value (default) disables this failure injection.
+     */
+    public final AtomicInteger copySegmentFailAfterNCopies = new 
AtomicInteger(-1);
+
+    private final AtomicInteger copySegmentCount = new AtomicInteger(0);
+
     public TestingRemoteLogStorage(Configuration conf, ExecutorService 
ioExecutor)
             throws IOException {
         super(conf, ioExecutor);
     }
 
+    @Override
+    public void copyLogSegmentFiles(
+            RemoteLogSegment remoteLogSegment, LogSegmentFiles logSegmentFiles)
+            throws RemoteStorageException {
+        int failAfter = copySegmentFailAfterNCopies.get();
+        if (failAfter >= 0 && copySegmentCount.get() >= failAfter) {
+            throw new RemoteStorageException(
+                    "Simulated copy failure after " + failAfter + " successful 
copies");
+        }
+        super.copyLogSegmentFiles(remoteLogSegment, logSegmentFiles);
+        copySegmentCount.incrementAndGet();
+    }
+
     @Override
     public FsPath writeRemoteLogManifestSnapshot(RemoteLogManifest manifest)
             throws RemoteStorageException {
diff --git a/website/docs/maintenance/configuration.md 
b/website/docs/maintenance/configuration.md
index ae9116a7c..6304e439a 100644
--- a/website/docs/maintenance/configuration.md
+++ b/website/docs/maintenance/configuration.md
@@ -124,12 +124,13 @@ during the Fluss cluster working.
 
 ## Log Tiered Storage
 
-| Option                              | Type       | Default | Description     
                                                                                
                                                                                
                                                        |
-|-------------------------------------|------------|---------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| remote.log.task-interval-duration   | Duration   | 1min    | Interval at 
which remote log manager runs the scheduled tasks like copy segments, clean up 
remote log segments, delete local log segments etc. If the value is set to 0s, 
it means that the remote log storage is disabled.             |
-| remote.log.index-file-cache-size    | MemorySize | 1gb     | The total size 
of the space allocated to store index files fetched from remote storage in the 
local storage.                                                                  
                                                          |
-| remote.log-manager.thread-pool-size | Integer    | 4       | Size of the 
thread pool used in scheduling tasks to copy segments, fetch remote log indexes 
and clean up remote log segments.                                               
                                                            |
-| remote.log.data-transfer-thread-num | Integer    | 4       | **Deprecated**: 
This option is deprecated. Please use `server.io-pool.size` instead. The number 
of threads the server uses to transfer (download and upload) remote log file 
can be data file, index file and remote log metadata file. |
+| Option                              | Type       | Default | Description     
                                                                                
                                                                                
                                                          |
+|-------------------------------------|------------|---------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| remote.log.task-interval-duration   | Duration   | 1min    | Interval at 
which remote log manager runs the scheduled tasks like copy segments, clean up 
remote log segments, delete local log segments etc. If the value is set to 0s, 
it means that the remote log storage is disabled.               |
+| remote.log.task-max-upload-segments | Integer    | 5       | The maximum 
number of log segments to upload to remote storage per tiering task execution. 
This limits the upload batch size to prevent overwhelming the remote storage 
when there is a large backlog of segments to upload.              |
+| remote.log.index-file-cache-size    | MemorySize | 1gb     | The total size 
of the space allocated to store index files fetched from remote storage in the 
local storage.                                                                  
                                                            |
+| remote.log-manager.thread-pool-size | Integer    | 4       | Size of the 
thread pool used in scheduling tasks to copy segments, fetch remote log indexes 
and clean up remote log segments.                                               
                                                              |
+| remote.log.data-transfer-thread-num | Integer    | 4       | **Deprecated**: 
This option is deprecated. Please use `server.io-pool.size` instead. The number 
of threads the server uses to transfer (download and upload) remote log file 
can be data file, index file and remote log metadata file.   |
 
 ## Kv
 

Reply via email to