This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 420194a6e [client] RemoteLogDownloader supports downloading log files 
in parallel and in priority order (#1579)
420194a6e is described below

commit 420194a6e182f9eda317c378a56df5fe0dd93cf1
Author: Jark Wu <[email protected]>
AuthorDate: Mon Aug 25 19:01:25 2025 +0800

    [client] RemoteLogDownloader supports downloading log files in parallel 
and in priority order (#1579)
---
 .../client/table/scanner/RemoteFileDownloader.java |  68 ++++-
 .../client/table/scanner/log/LogFetchBuffer.java   |  69 ++---
 .../table/scanner/log/LogFetchCollector.java       |   4 +-
 .../fluss/client/table/scanner/log/LogFetcher.java |   5 +-
 .../table/scanner/log/RemoteLogDownloader.java     | 160 +++++++-----
 .../table/scanner/log/RemotePendingFetch.java      |  16 +-
 .../table/scanner/log/LogFetchBufferTest.java      |  45 ++--
 .../table/scanner/log/RemoteLogDownloaderTest.java | 286 ++++++++++++++++-----
 .../fluss/rpc/util/CommonRpcMessageUtils.java      |   6 +-
 fluss-rpc/src/main/proto/FlussApi.proto            |   1 +
 .../fluss/server/utils/ServerRpcMessageUtils.java  |   3 +-
 11 files changed, 463 insertions(+), 200 deletions(-)

diff --git 
a/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/RemoteFileDownloader.java
 
b/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/RemoteFileDownloader.java
index d2c81b9a5..bcde64769 100644
--- 
a/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/RemoteFileDownloader.java
+++ 
b/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/RemoteFileDownloader.java
@@ -17,30 +17,38 @@
 
 package com.alibaba.fluss.client.table.scanner;
 
+import com.alibaba.fluss.fs.FSDataInputStream;
+import com.alibaba.fluss.fs.FileSystem;
+import com.alibaba.fluss.fs.FsPath;
 import com.alibaba.fluss.fs.FsPathAndFileName;
 import com.alibaba.fluss.fs.utils.FileDownloadSpec;
 import com.alibaba.fluss.fs.utils.FileDownloadUtils;
 import com.alibaba.fluss.utils.CloseableRegistry;
+import com.alibaba.fluss.utils.IOUtils;
 import com.alibaba.fluss.utils.concurrent.ExecutorThreadFactory;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 /**
- * The downloader to download the remote files (like kv snapshots files, log 
segment files) to be
- * read.
+ * The downloader that has an I/O thread pool to download the remote files (like 
kv snapshots files,
+ * log segment files).
  */
 public class RemoteFileDownloader implements Closeable {
 
-    protected final ExecutorService snapshotDownLoadThreadPool;
+    protected final ExecutorService downloadThreadPool;
 
     public RemoteFileDownloader(int threadNum) {
-        snapshotDownLoadThreadPool =
+        downloadThreadPool =
                 Executors.newFixedThreadPool(
                         threadNum,
                         new ExecutorThreadFactory(
@@ -52,9 +60,55 @@ public class RemoteFileDownloader implements Closeable {
                                 
Thread.currentThread().getContextClassLoader()));
     }
 
+    /**
+     * Downloads the file from the given remote file path to the target 
directory asynchronously,
+     * returns a Future object of the number of downloaded bytes. The Future 
will fail if the
+     * download fails after retrying for RETRY_COUNT times.
+     */
+    public CompletableFuture<Long> downloadFileAsync(
+            FsPathAndFileName fsPathAndFileName, Path targetDirectory) {
+        CompletableFuture<Long> future = new CompletableFuture<>();
+        downloadThreadPool.submit(
+                () -> {
+                    try {
+                        Path targetFilePath =
+                                
targetDirectory.resolve(fsPathAndFileName.getFileName());
+                        FsPath remoteFilePath = fsPathAndFileName.getPath();
+                        long downloadBytes = downloadFile(targetFilePath, 
remoteFilePath);
+                        future.complete(downloadBytes);
+                    } catch (Exception e) {
+                        future.completeExceptionally(e);
+                    }
+                });
+        return future;
+    }
+
+    /**
+     * Copies the file from a remote file path to the given target file path, 
returns the number of
+     * downloaded bytes.
+     */
+    protected long downloadFile(Path targetFilePath, FsPath remoteFilePath) 
throws IOException {
+        List<Closeable> closeableRegistry = new ArrayList<>(2);
+        try {
+            FileSystem fileSystem = remoteFilePath.getFileSystem();
+            FSDataInputStream inputStream = fileSystem.open(remoteFilePath);
+            closeableRegistry.add(inputStream);
+
+            Files.createDirectories(targetFilePath.getParent());
+            OutputStream outputStream = Files.newOutputStream(targetFilePath);
+            closeableRegistry.add(outputStream);
+
+            return IOUtils.copyBytes(inputStream, outputStream, false);
+        } catch (Exception ex) {
+            throw new IOException(ex);
+        } finally {
+            closeableRegistry.forEach(IOUtils::closeQuietly);
+        }
+    }
+
     @Override
     public void close() throws IOException {
-        snapshotDownLoadThreadPool.shutdownNow();
+        downloadThreadPool.shutdownNow();
     }
 
     public void transferAllToDirectory(
@@ -65,8 +119,6 @@ public class RemoteFileDownloader implements Closeable {
         FileDownloadSpec fileDownloadSpec =
                 new FileDownloadSpec(fsPathAndFileNames, targetDirectory);
         FileDownloadUtils.transferAllDataToDirectory(
-                Collections.singleton(fileDownloadSpec),
-                closeableRegistry,
-                snapshotDownLoadThreadPool);
+                Collections.singleton(fileDownloadSpec), closeableRegistry, 
downloadThreadPool);
     }
 }
diff --git 
a/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/LogFetchBuffer.java
 
b/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/LogFetchBuffer.java
index 5441bd90e..de2b09175 100644
--- 
a/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/LogFetchBuffer.java
+++ 
b/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/LogFetchBuffer.java
@@ -30,8 +30,10 @@ import javax.annotation.concurrent.ThreadSafe;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -66,7 +68,7 @@ public class LogFetchBuffer implements AutoCloseable {
     private final LinkedList<CompletedFetch> completedFetches;
 
     @GuardedBy("lock")
-    private final LinkedList<PendingFetch> pendingFetches = new LinkedList<>();
+    private final Map<TableBucket, LinkedList<PendingFetch>> pendingFetches = 
new HashMap<>();
 
     @GuardedBy("lock")
     private @Nullable CompletedFetch nextInLineFetch;
@@ -88,7 +90,9 @@ public class LogFetchBuffer implements AutoCloseable {
         inLock(
                 lock,
                 () -> {
-                    pendingFetches.add(pendingFetch);
+                    pendingFetches
+                            .computeIfAbsent(pendingFetch.tableBucket(), k -> 
new LinkedList<>())
+                            .add(pendingFetch);
                 });
     }
 
@@ -96,17 +100,18 @@ public class LogFetchBuffer implements AutoCloseable {
      * Tries to complete the pending fetches in order, convert them into 
completed fetches in the
      * buffer.
      */
-    void tryComplete() {
+    void tryComplete(TableBucket tableBucket) {
         inLock(
                 lock,
                 () -> {
                     boolean hasCompleted = false;
-                    while (!pendingFetches.isEmpty()) {
-                        PendingFetch pendingFetch = pendingFetches.peek();
+                    LinkedList<PendingFetch> pendings = 
this.pendingFetches.get(tableBucket);
+                    while (pendings != null && !pendings.isEmpty()) {
+                        PendingFetch pendingFetch = pendings.peek();
                         if (pendingFetch.isCompleted()) {
                             CompletedFetch completedFetch = 
pendingFetch.toCompletedFetch();
                             completedFetches.add(completedFetch);
-                            pendingFetches.poll();
+                            pendings.poll();
                             hasCompleted = true;
                         } else {
                             break;
@@ -114,6 +119,10 @@ public class LogFetchBuffer implements AutoCloseable {
                     }
                     if (hasCompleted) {
                         notEmptyCondition.signalAll();
+                        // clear the bucket entry if there are no pending 
fetches for the bucket.
+                        if (pendings.isEmpty()) {
+                            this.pendingFetches.remove(tableBucket);
+                        }
                     }
                 });
     }
@@ -122,11 +131,13 @@ public class LogFetchBuffer implements AutoCloseable {
         inLock(
                 lock,
                 () -> {
-                    if (pendingFetches.isEmpty()) {
+                    LinkedList<PendingFetch> pendings =
+                            pendingFetches.get(completedFetch.tableBucket);
+                    if (pendings == null || pendings.isEmpty()) {
                         completedFetches.add(completedFetch);
                         notEmptyCondition.signalAll();
                     } else {
-                        pendingFetches.add(new 
CompletedPendingFetch(completedFetch));
+                        pendings.add(new 
CompletedPendingFetch(completedFetch));
                     }
                 });
     }
@@ -135,17 +146,7 @@ public class LogFetchBuffer implements AutoCloseable {
         if (completedFetches == null || completedFetches.isEmpty()) {
             return;
         }
-        inLock(
-                lock,
-                () -> {
-                    if (pendingFetches.isEmpty()) {
-                        this.completedFetches.addAll(completedFetches);
-                        notEmptyCondition.signalAll();
-                    } else {
-                        completedFetches.forEach(
-                                cf -> pendingFetches.add(new 
CompletedPendingFetch(cf)));
-                    }
-                });
+        inLock(lock, () -> completedFetches.forEach(this::add));
     }
 
     CompletedFetch nextInLineFetch() {
@@ -225,7 +226,8 @@ public class LogFetchBuffer implements AutoCloseable {
                         nextInLineFetch = null;
                     }
 
-                    pendingFetches.removeIf(pf -> 
!buckets.contains(pf.tableBucket()));
+                    // remove entries that do not match the buckets from 
pendingFetches
+                    pendingFetches.entrySet().removeIf(entry -> 
!buckets.contains(entry.getKey()));
                 });
     }
 
@@ -233,12 +235,12 @@ public class LogFetchBuffer implements AutoCloseable {
      * Drains (i.e. <em>removes</em>) the contents of the given {@link 
CompletedFetch} as its data
      * should not be returned to the user.
      */
-    private boolean maybeDrain(Set<TableBucket> buckets, CompletedFetch 
completedFetch) {
-        if (completedFetch != null && 
!buckets.contains(completedFetch.tableBucket)) {
+    private boolean maybeDrain(Set<TableBucket> excludedBuckets, 
CompletedFetch completedFetch) {
+        if (completedFetch != null && 
!excludedBuckets.contains(completedFetch.tableBucket)) {
             LOG.debug(
                     "Removing {} from buffered fetch data as it is not in the 
set of buckets to retain ({})",
                     completedFetch.tableBucket,
-                    buckets);
+                    excludedBuckets);
             completedFetch.drain();
             return true;
         } else {
@@ -256,36 +258,19 @@ public class LogFetchBuffer implements AutoCloseable {
         return inLock(
                 lock,
                 () -> {
-                    // If there are any pending fetches which have not been 
added to
-                    // completedFetches, we will return null. For example, a 
possible scenario is
-                    // that the remote log downloader can not download remote 
log as soon as
-                    // possible. In this case, we can't return any buckets to 
avoid OOM cause by the
-                    // frequently fetch log request send to server to fetch 
log back, which the
-                    // fetch data can not consume timely and will be buffered 
in memory.
-                    // TODO this is a hack logic to avoid OOM, we should fix 
it later to refactor
-                    // the remote log download logic.
-                    if (!pendingFetches.isEmpty()) {
-                        return null;
-                    }
-
                     final Set<TableBucket> buckets = new HashSet<>();
                     if (nextInLineFetch != null && 
!nextInLineFetch.isConsumed()) {
                         buckets.add(nextInLineFetch.tableBucket);
                     }
                     completedFetches.forEach(cf -> 
buckets.add(cf.tableBucket));
+                    buckets.addAll(pendingFetches.keySet());
                     return buckets;
                 });
     }
 
     /** Return the set of {@link TableBucket buckets} for which we have 
pending fetches. */
     Set<TableBucket> pendedBuckets() {
-        return inLock(
-                lock,
-                () -> {
-                    final Set<TableBucket> buckets = new HashSet<>();
-                    pendingFetches.forEach(pf -> 
buckets.add(pf.tableBucket()));
-                    return buckets;
-                });
+        return inLock(lock, pendingFetches::keySet);
     }
 
     @Override
diff --git 
a/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/LogFetchCollector.java
 
b/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/LogFetchCollector.java
index f847ac04c..fb6095f2a 100644
--- 
a/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/LogFetchCollector.java
+++ 
b/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/LogFetchCollector.java
@@ -177,7 +177,7 @@ public class LogFetchCollector {
             } else {
                 // these records aren't next in line based on the last 
consumed offset, ignore them
                 // they must be from an obsolete request
-                LOG.debug(
+                LOG.warn(
                         "Ignoring fetched records for {} at offset {} since 
the current offset is {}",
                         nextInLineFetch.tableBucket,
                         nextInLineFetch.nextFetchOffset(),
@@ -228,7 +228,7 @@ public class LogFetchCollector {
             return null;
         }
         if (offset != fetchOffset) {
-            LOG.debug(
+            LOG.warn(
                     "Discarding stale fetch response for bucket {} since its 
offset {} does not match the expected offset {}.",
                     tb,
                     fetchOffset,
diff --git 
a/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/LogFetcher.java
 
b/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/LogFetcher.java
index 4c939f9b5..a11a71ca6 100644
--- 
a/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/LogFetcher.java
+++ 
b/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/LogFetcher.java
@@ -142,6 +142,7 @@ public class LogFetcher implements Closeable {
         this.scannerMetricGroup = scannerMetricGroup;
         this.remoteLogDownloader =
                 new RemoteLogDownloader(tablePath, conf, remoteFileDownloader, 
scannerMetricGroup);
+        remoteLogDownloader.start();
     }
 
     /**
@@ -364,7 +365,7 @@ public class LogFetcher implements Closeable {
             }
             RemoteLogDownloadFuture downloadFuture =
                     remoteLogDownloader.requestRemoteLog(remoteLogTabletDir, 
segment);
-            PendingFetch pendingFetch =
+            RemotePendingFetch pendingFetch =
                     new RemotePendingFetch(
                             segment,
                             downloadFuture,
@@ -375,7 +376,7 @@ public class LogFetcher implements Closeable {
                             logScannerStatus,
                             isCheckCrcs);
             logFetchBuffer.pend(pendingFetch);
-            downloadFuture.onComplete(logFetchBuffer::tryComplete);
+            downloadFuture.onComplete(() -> 
logFetchBuffer.tryComplete(segment.tableBucket()));
         }
     }
 
diff --git 
a/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/RemoteLogDownloader.java
 
b/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/RemoteLogDownloader.java
index e36e9a04c..c3f357eac 100644
--- 
a/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/RemoteLogDownloader.java
+++ 
b/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/RemoteLogDownloader.java
@@ -27,9 +27,8 @@ import com.alibaba.fluss.fs.FsPath;
 import com.alibaba.fluss.fs.FsPathAndFileName;
 import com.alibaba.fluss.metadata.TablePath;
 import com.alibaba.fluss.remote.RemoteLogSegment;
-import com.alibaba.fluss.utils.CloseableRegistry;
+import com.alibaba.fluss.utils.ExceptionUtils;
 import com.alibaba.fluss.utils.FlussPaths;
-import com.alibaba.fluss.utils.MapUtils;
 import com.alibaba.fluss.utils.concurrent.ShutdownableThread;
 
 import org.slf4j.Logger;
@@ -43,18 +42,16 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.util.Collections;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
-import static com.alibaba.fluss.utils.FileUtils.deleteFileOrDirectory;
+import static com.alibaba.fluss.utils.FileUtils.deleteDirectoryQuietly;
 import static com.alibaba.fluss.utils.FlussPaths.LOG_FILE_SUFFIX;
-import static com.alibaba.fluss.utils.FlussPaths.filenamePrefixFromOffset;
 import static com.alibaba.fluss.utils.FlussPaths.remoteLogSegmentDir;
 import static com.alibaba.fluss.utils.FlussPaths.remoteLogSegmentFile;
 
@@ -68,13 +65,15 @@ public class RemoteLogDownloader implements Closeable {
 
     private final Path localLogDir;
 
-    private final BlockingQueue<RemoteLogDownloadRequest> segmentsToFetch;
+    /**
+     * A queue to hold the remote log segment files to be fetched. The queue 
is ordered by the
+     * max_timestamp of the remote log segment. So we download the remote log 
segments from the
+     * older to the newer.
+     */
+    private final PriorityBlockingQueue<RemoteLogDownloadRequest> 
segmentsToFetch;
 
     private final BlockingQueue<RemoteLogSegment> segmentsToRecycle;
 
-    // <log_segment_id -> segment_uuid_path>
-    private final ConcurrentHashMap<String, Path> fetchedFiles;
-
     private final Semaphore prefetchSemaphore;
 
     private final DownloadRemoteLogThread downloadThread;
@@ -101,9 +100,8 @@ public class RemoteLogDownloader implements Closeable {
             RemoteFileDownloader remoteFileDownloader,
             ScannerMetricGroup scannerMetricGroup,
             long pollTimeout) {
-        this.segmentsToFetch = new LinkedBlockingQueue<>();
+        this.segmentsToFetch = new PriorityBlockingQueue<>();
         this.segmentsToRecycle = new LinkedBlockingQueue<>();
-        this.fetchedFiles = MapUtils.newConcurrentHashMap();
         this.remoteFileDownloader = remoteFileDownloader;
         this.scannerMetricGroup = scannerMetricGroup;
         this.pollTimeout = pollTimeout;
@@ -116,6 +114,9 @@ public class RemoteLogDownloader implements Closeable {
                         conf.get(ConfigOptions.CLIENT_SCANNER_IO_TMP_DIR),
                         "remote-logs-" + UUID.randomUUID());
         this.downloadThread = new DownloadRemoteLogThread(tablePath);
+    }
+
+    public void start() {
         downloadThread.start();
     }
 
@@ -140,39 +141,54 @@ public class RemoteLogDownloader implements Closeable {
      * to fetch.
      */
     void fetchOnce() throws Exception {
+        // blocks until there is capacity (the fetched file is consumed)
+        prefetchSemaphore.acquire();
+
         // wait until there is a remote fetch request
         RemoteLogDownloadRequest request = segmentsToFetch.poll(pollTimeout, 
TimeUnit.MILLISECONDS);
         if (request == null) {
+            prefetchSemaphore.release();
             return;
         }
-        // blocks until there is capacity (the fetched file is consumed)
-        prefetchSemaphore.acquire();
+
         try {
             // 1. cleanup the finished logs first to free up disk space
             cleanupRemoteLogs();
 
             // 2. do the actual download work
             FsPathAndFileName fsPathAndFileName = 
request.getFsPathAndFileName();
-            Path segmentPath = 
localLogDir.resolve(request.segment.remoteLogSegmentId().toString());
             scannerMetricGroup.remoteFetchRequestCount().inc();
-            // download the remote file to local
-            LOG.info(
-                    "Start to download remote log segment file {} to local.",
-                    fsPathAndFileName.getFileName());
+
             long startTime = System.currentTimeMillis();
-            remoteFileDownloader.transferAllToDirectory(
-                    Collections.singletonList(fsPathAndFileName),
-                    segmentPath,
-                    new CloseableRegistry());
-            LOG.info(
-                    "Download remote log segment file {} to local cost {} ms.",
-                    fsPathAndFileName.getFileName(),
-                    System.currentTimeMillis() - startTime);
-            File localFile = new File(segmentPath.toFile(), 
fsPathAndFileName.getFileName());
-            scannerMetricGroup.remoteFetchBytes().inc(localFile.length());
-            String segmentId = request.segment.remoteLogSegmentId().toString();
-            fetchedFiles.put(segmentId, segmentPath);
-            request.future.complete(localFile);
+            // download the remote file to local
+            remoteFileDownloader
+                    .downloadFileAsync(fsPathAndFileName, localLogDir)
+                    .whenComplete(
+                            (bytes, throwable) -> {
+                                if (throwable != null) {
+                                    LOG.error(
+                                            "Failed to download remote log 
segment file {}.",
+                                            fsPathAndFileName.getFileName(),
+                                            
ExceptionUtils.stripExecutionException(throwable));
+                                    // release the semaphore for the failed 
request
+                                    prefetchSemaphore.release();
+                                    // add back the request to the queue,
+                                    // so we do not complete the 
request.future here
+                                    segmentsToFetch.add(request);
+                                    
scannerMetricGroup.remoteFetchErrorCount().inc();
+                                } else {
+                                    LOG.info(
+                                            "Successfully downloaded remote 
log segment file {} to local cost {} ms.",
+                                            fsPathAndFileName.getFileName(),
+                                            System.currentTimeMillis() - 
startTime);
+                                    File localFile =
+                                            new File(
+                                                    localLogDir.toFile(),
+                                                    
fsPathAndFileName.getFileName());
+                                    
scannerMetricGroup.remoteFetchBytes().inc(bytes);
+                                    request.future.complete(localFile);
+                                }
+                            });
         } catch (Throwable t) {
             prefetchSemaphore.release();
             // add back the request to the queue
@@ -191,24 +207,15 @@ public class RemoteLogDownloader implements Closeable {
     }
 
     private void cleanupFinishedRemoteLog(RemoteLogSegment segment) {
-        String segmentId = segment.remoteLogSegmentId().toString();
-        Path segmentPath = fetchedFiles.remove(segmentId);
-        if (segmentPath != null) {
-            try {
-                Path logFile =
-                        segmentPath.resolve(
-                                
filenamePrefixFromOffset(segment.remoteLogStartOffset())
-                                        + LOG_FILE_SUFFIX);
-                Files.deleteIfExists(logFile);
-                Files.deleteIfExists(segmentPath);
-                LOG.info(
-                        "Consumed and deleted the fetched log segment file 
{}/{} for bucket {}.",
-                        segmentPath.getFileName(),
-                        logFile.getFileName(),
-                        segment.tableBucket());
-            } catch (IOException e) {
-                LOG.warn("Failed to delete the fetch segment path {}.", 
segmentPath, e);
-            }
+        try {
+            Path logFile = 
localLogDir.resolve(getLocalFileNameOfRemoteSegment(segment));
+            Files.deleteIfExists(logFile);
+            LOG.info(
+                    "Consumed and deleted the fetched log segment file {} for 
bucket {}.",
+                    logFile.getFileName(),
+                    segment.tableBucket());
+        } catch (IOException e) {
+            LOG.warn("Failed to delete the local fetch segment file {}.", 
localLogDir, e);
         }
     }
 
@@ -219,11 +226,8 @@ public class RemoteLogDownloader implements Closeable {
         } catch (InterruptedException e) {
             // ignore
         }
-        // cleanup all downloaded files
-        for (Path segmentPath : fetchedFiles.values()) {
-            deleteFileOrDirectory(segmentPath.toFile());
-        }
-        fetchedFiles.clear();
+
+        deleteDirectoryQuietly(localLogDir.toFile());
     }
 
     @VisibleForTesting
@@ -236,16 +240,34 @@ public class RemoteLogDownloader implements Closeable {
         return localLogDir;
     }
 
+    @VisibleForTesting
+    int getSizeOfSegmentsToFetch() {
+        return segmentsToFetch.size();
+    }
+
     protected static FsPathAndFileName getFsPathAndFileName(
             FsPath remoteLogTabletDir, RemoteLogSegment segment) {
         FsPath remotePath =
                 remoteLogSegmentFile(
                         remoteLogSegmentDir(remoteLogTabletDir, 
segment.remoteLogSegmentId()),
                         segment.remoteLogStartOffset());
-        return new FsPathAndFileName(
-                remotePath,
-                
FlussPaths.filenamePrefixFromOffset(segment.remoteLogStartOffset())
-                        + LOG_FILE_SUFFIX);
+        return new FsPathAndFileName(remotePath, 
getLocalFileNameOfRemoteSegment(segment));
+    }
+
+    /**
+     * Get the local file name of the remote log segment.
+     *
+     * <p>The file name is in pattern:
+     *
+     * <pre>
+     *     ${remote_segment_id}_${offset_prefix}.log
+     * </pre>
+     */
+    private static String getLocalFileNameOfRemoteSegment(RemoteLogSegment 
segment) {
+        return segment.remoteLogSegmentId()
+                + "_"
+                + 
FlussPaths.filenamePrefixFromOffset(segment.remoteLogStartOffset())
+                + LOG_FILE_SUFFIX;
     }
 
     /**
@@ -265,10 +287,10 @@ public class RemoteLogDownloader implements Closeable {
     }
 
     /** Represents a request to download a remote log segment file to local. */
-    private static class RemoteLogDownloadRequest {
-        private final RemoteLogSegment segment;
-        private final FsPath remoteLogTabletDir;
-        private final CompletableFuture<File> future = new 
CompletableFuture<>();
+    static class RemoteLogDownloadRequest implements 
Comparable<RemoteLogDownloadRequest> {
+        final RemoteLogSegment segment;
+        final FsPath remoteLogTabletDir;
+        final CompletableFuture<File> future = new CompletableFuture<>();
 
         public RemoteLogDownloadRequest(RemoteLogSegment segment, FsPath 
remoteLogTabletDir) {
             this.segment = segment;
@@ -278,5 +300,17 @@ public class RemoteLogDownloader implements Closeable {
         public FsPathAndFileName getFsPathAndFileName() {
             return 
RemoteLogDownloader.getFsPathAndFileName(remoteLogTabletDir, segment);
         }
+
+        @Override
+        public int compareTo(RemoteLogDownloadRequest o) {
+            if (segment.tableBucket().equals(o.segment.tableBucket())) {
+                // strictly download in the offset order if they belong to the 
same bucket
+                return Long.compare(
+                        segment.remoteLogStartOffset(), 
o.segment.remoteLogStartOffset());
+            } else {
+                // download segment from old to new across buckets
+                return Long.compare(segment.maxTimestamp(), 
o.segment.maxTimestamp());
+            }
+        }
     }
 }
diff --git 
a/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/RemotePendingFetch.java
 
b/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/RemotePendingFetch.java
index eae93a606..c61fcc56a 100644
--- 
a/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/RemotePendingFetch.java
+++ 
b/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/RemotePendingFetch.java
@@ -28,7 +28,7 @@ import com.alibaba.fluss.remote.RemoteLogSegment;
  */
 class RemotePendingFetch implements PendingFetch {
 
-    private final RemoteLogSegment remoteLogSegment;
+    final RemoteLogSegment remoteLogSegment;
     private final RemoteLogDownloadFuture downloadFuture;
 
     private final int posInLogSegment;
@@ -80,4 +80,18 @@ class RemotePendingFetch implements PendingFetch {
                 fetchOffset,
                 downloadFuture.getRecycleCallback());
     }
+
+    @Override
+    public String toString() {
+        return "RemotePendingFetch{"
+                + "remoteLogSegment="
+                + remoteLogSegment
+                + ", fetchOffset="
+                + fetchOffset
+                + ", posInLogSegment="
+                + posInLogSegment
+                + ", highWatermark="
+                + highWatermark
+                + '}';
+    }
 }
diff --git 
a/fluss-client/src/test/java/com/alibaba/fluss/client/table/scanner/log/LogFetchBufferTest.java
 
b/fluss-client/src/test/java/com/alibaba/fluss/client/table/scanner/log/LogFetchBufferTest.java
index 777b094a2..eb976fab2 100644
--- 
a/fluss-client/src/test/java/com/alibaba/fluss/client/table/scanner/log/LogFetchBufferTest.java
+++ 
b/fluss-client/src/test/java/com/alibaba/fluss/client/table/scanner/log/LogFetchBufferTest.java
@@ -142,24 +142,20 @@ public class LogFetchBufferTest {
             logFetchBuffer.pend(makePendingFetch(tableBucket1));
             logFetchBuffer.pend(makePendingFetch(tableBucket2));
             logFetchBuffer.pend(makePendingFetch(tableBucket3));
-            // TODO these tests need to add back after remove the hack logic in
-            // LogFetchBuffer#bufferedBuckets()
 
-            //            assertThat(logFetchBuffer.bufferedBuckets())
-            //                    .containsExactlyInAnyOrder(tableBucket1, 
tableBucket2,
-            // tableBucket3);
+            assertThat(logFetchBuffer.bufferedBuckets())
+                    .containsExactlyInAnyOrder(tableBucket1, tableBucket2, 
tableBucket3);
             assertThat(logFetchBuffer.pendedBuckets())
                     .containsExactlyInAnyOrder(tableBucket1, tableBucket2, 
tableBucket3);
 
             logFetchBuffer.retainAll(new HashSet<>(Arrays.asList(tableBucket2, 
tableBucket3)));
-            //            assertThat(logFetchBuffer.bufferedBuckets())
-            //                    .containsExactlyInAnyOrder(tableBucket2, 
tableBucket3);
+            assertThat(logFetchBuffer.bufferedBuckets())
+                    .containsExactlyInAnyOrder(tableBucket2, tableBucket3);
             assertThat(logFetchBuffer.pendedBuckets())
                     .containsExactlyInAnyOrder(tableBucket2, tableBucket3);
 
             logFetchBuffer.retainAll(Collections.singleton(tableBucket3));
-            //
-            // 
assertThat(logFetchBuffer.bufferedBuckets()).containsExactlyInAnyOrder(tableBucket3);
+            
assertThat(logFetchBuffer.bufferedBuckets()).containsExactlyInAnyOrder(tableBucket3);
             
assertThat(logFetchBuffer.pendedBuckets()).containsExactlyInAnyOrder(tableBucket3);
 
             logFetchBuffer.retainAll(Collections.emptySet());
@@ -202,41 +198,52 @@ public class LogFetchBufferTest {
 
             assertThat(logFetchBuffer.isEmpty()).isTrue();
             AtomicBoolean completed1 = new AtomicBoolean(false);
-            logFetchBuffer.pend(makePendingFetch(tableBucket1, completed1));
+            PendingFetch pending1 = makePendingFetch(tableBucket1, completed1);
+            logFetchBuffer.pend(pending1);
             // pending fetches are not counted as completed fetches.
             assertThat(logFetchBuffer.isEmpty()).isTrue();
 
+            // tableBucket3 completed fetch will not be pended
             logFetchBuffer.add(makeCompletedFetch(tableBucket3));
-            // competed fetches will be pended if there is any pending fetches
+            assertThat(logFetchBuffer.isEmpty()).isFalse();
+            
assertThat(logFetchBuffer.poll().tableBucket).isEqualTo(tableBucket3);
+            // tableBucket1 completed fetches will be pended
+            logFetchBuffer.add(makeCompletedFetch(tableBucket1));
             assertThat(logFetchBuffer.isEmpty()).isTrue();
 
             AtomicBoolean completed2 = new AtomicBoolean(false);
-            logFetchBuffer.pend(makePendingFetch(tableBucket2, completed2));
-            logFetchBuffer.pend(makePendingFetch(tableBucket3));
-            logFetchBuffer.pend(makePendingFetch(tableBucket3));
+            PendingFetch pending2 = makePendingFetch(tableBucket2, completed2);
+            PendingFetch pending3 = makePendingFetch(tableBucket3);
+            PendingFetch pending4 = makePendingFetch(tableBucket3);
+            logFetchBuffer.pend(pending2);
+            logFetchBuffer.pend(pending3);
+            logFetchBuffer.pend(pending4);
 
             Future<Boolean> signal =
                     service.submit(() -> await(logFetchBuffer, 
Duration.ofSeconds(1)));
-            logFetchBuffer.tryComplete();
-            // nothing happen
+            logFetchBuffer.tryComplete(pending1.tableBucket());
+            // nothing happens, as pending1 is not completed
             assertThat(logFetchBuffer.isEmpty()).isTrue();
             // no condition signal
             assertThat(signal.get()).isFalse();
 
             signal = service.submit(() -> await(logFetchBuffer, 
Duration.ofMinutes(1)));
             completed1.set(true);
-            logFetchBuffer.tryComplete();
+            logFetchBuffer.tryComplete(pending1.tableBucket());
             assertThat(signal.get()).isTrue();
             assertThat(logFetchBuffer.isEmpty()).isFalse();
             
assertThat(logFetchBuffer.poll().tableBucket).isEqualTo(tableBucket1);
-            
assertThat(logFetchBuffer.poll().tableBucket).isEqualTo(tableBucket3);
+            // the buffered completed fetch will be available now
+            
assertThat(logFetchBuffer.poll().tableBucket).isEqualTo(tableBucket1);
             assertThat(logFetchBuffer.isEmpty()).isTrue();
 
             signal = service.submit(() -> await(logFetchBuffer, 
Duration.ofMinutes(1)));
             completed2.set(true);
-            logFetchBuffer.tryComplete();
+            logFetchBuffer.tryComplete(pending2.tableBucket());
             assertThat(signal.get()).isTrue();
             assertThat(logFetchBuffer.isEmpty()).isFalse();
+            logFetchBuffer.tryComplete(pending3.tableBucket());
+            logFetchBuffer.tryComplete(pending4.tableBucket());
             
assertThat(logFetchBuffer.poll().tableBucket).isEqualTo(tableBucket2);
             
assertThat(logFetchBuffer.poll().tableBucket).isEqualTo(tableBucket3);
             
assertThat(logFetchBuffer.poll().tableBucket).isEqualTo(tableBucket3);
diff --git 
a/fluss-client/src/test/java/com/alibaba/fluss/client/table/scanner/log/RemoteLogDownloaderTest.java
 
b/fluss-client/src/test/java/com/alibaba/fluss/client/table/scanner/log/RemoteLogDownloaderTest.java
index fe740863d..61e649d3f 100644
--- 
a/fluss-client/src/test/java/com/alibaba/fluss/client/table/scanner/log/RemoteLogDownloaderTest.java
+++ 
b/fluss-client/src/test/java/com/alibaba/fluss/client/table/scanner/log/RemoteLogDownloaderTest.java
@@ -20,6 +20,7 @@ package com.alibaba.fluss.client.table.scanner.log;
 import com.alibaba.fluss.client.metrics.ScannerMetricGroup;
 import com.alibaba.fluss.client.metrics.TestingScannerMetricGroup;
 import com.alibaba.fluss.client.table.scanner.RemoteFileDownloader;
+import 
com.alibaba.fluss.client.table.scanner.log.RemoteLogDownloader.RemoteLogDownloadRequest;
 import com.alibaba.fluss.config.ConfigOptions;
 import com.alibaba.fluss.config.Configuration;
 import com.alibaba.fluss.fs.FsPath;
@@ -29,23 +30,32 @@ import com.alibaba.fluss.remote.RemoteLogSegment;
 import com.alibaba.fluss.utils.FileUtils;
 import com.alibaba.fluss.utils.IOUtils;
 
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
+import java.io.IOException;
 import java.nio.file.Path;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
+import java.util.stream.Collectors;
 
 import static com.alibaba.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH;
 import static com.alibaba.fluss.record.TestData.DATA1_TABLE_ID;
 import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH;
 import static 
com.alibaba.fluss.testutils.DataTestUtils.genRemoteLogSegmentFile;
 import static com.alibaba.fluss.testutils.common.CommonTestUtils.retry;
+import static com.alibaba.fluss.testutils.common.CommonTestUtils.waitUntil;
 import static com.alibaba.fluss.utils.FlussPaths.remoteLogDir;
 import static com.alibaba.fluss.utils.FlussPaths.remoteLogTabletDir;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -57,9 +67,7 @@ class RemoteLogDownloaderTest {
     private @TempDir File localDir;
     private FsPath remoteLogDir;
     private Configuration conf;
-    private RemoteFileDownloader remoteFileDownloader;
     private ScannerMetricGroup scannerMetricGroup;
-    private RemoteLogDownloader remoteLogDownloader;
 
     @BeforeEach
     void beforeEach() {
@@ -68,77 +76,231 @@ class RemoteLogDownloaderTest {
         conf.set(ConfigOptions.CLIENT_SCANNER_IO_TMP_DIR, 
localDir.getAbsolutePath());
         conf.set(ConfigOptions.CLIENT_SCANNER_REMOTE_LOG_PREFETCH_NUM, 4);
         remoteLogDir = remoteLogDir(conf);
-        remoteFileDownloader = new RemoteFileDownloader(1);
         scannerMetricGroup = TestingScannerMetricGroup.newInstance();
-        remoteLogDownloader =
+    }
+
+    @Test
+    void testPrefetchNum() throws Exception {
+        RemoteFileDownloader remoteFileDownloader = new 
RemoteFileDownloader(1);
+        RemoteLogDownloader remoteLogDownloader =
+                new RemoteLogDownloader(
+                        DATA1_TABLE_PATH, conf, remoteFileDownloader, 
scannerMetricGroup, 10L);
+        try {
+            // trigger auto download.
+            remoteLogDownloader.start();
+
+            Path localLogDir = remoteLogDownloader.getLocalLogDir();
+            TableBucket tb = new TableBucket(DATA1_TABLE_ID, 0);
+            List<RemoteLogSegment> remoteLogSegments =
+                    buildRemoteLogSegmentList(tb, DATA1_PHYSICAL_TABLE_PATH, 
5, conf, 10);
+            FsPath remoteLogTabletDir =
+                    remoteLogTabletDir(remoteLogDir, 
DATA1_PHYSICAL_TABLE_PATH, tb);
+            List<RemoteLogDownloadFuture> futures =
+                    requestRemoteLogs(remoteLogDownloader, remoteLogTabletDir, 
remoteLogSegments);
+
+            // the first 4 segments should success.
+            retry(
+                    Duration.ofMinutes(1),
+                    () -> {
+                        for (int i = 0; i < 4; i++) {
+                            assertThat(futures.get(i).isDone()).isTrue();
+                        }
+                    });
+
+            
assertThat(FileUtils.listDirectory(localLogDir).length).isEqualTo(4);
+            
assertThat(scannerMetricGroup.remoteFetchRequestCount().getCount()).isEqualTo(4);
+            assertThat(scannerMetricGroup.remoteFetchBytes().getCount())
+                    .isEqualTo(
+                            remoteLogSegmentFilesLength(remoteLogSegments, 
remoteLogTabletDir, 4));
+            
assertThat(remoteLogDownloader.getPrefetchSemaphore().availablePermits()).isEqualTo(0);
+
+            futures.get(0).getRecycleCallback().run();
+            // the 5th segment should success.
+            retry(Duration.ofMinutes(1), () -> 
assertThat(futures.get(4).isDone()).isTrue());
+            
assertThat(FileUtils.listDirectory(localLogDir).length).isEqualTo(4);
+            
assertThat(scannerMetricGroup.remoteFetchRequestCount().getCount()).isEqualTo(5);
+            assertThat(scannerMetricGroup.remoteFetchBytes().getCount())
+                    .isEqualTo(
+                            remoteLogSegmentFilesLength(remoteLogSegments, 
remoteLogTabletDir, 5));
+            
assertThat(remoteLogDownloader.getPrefetchSemaphore().availablePermits()).isEqualTo(0);
+
+            futures.get(1).getRecycleCallback().run();
+            futures.get(2).getRecycleCallback().run();
+            
assertThat(remoteLogDownloader.getPrefetchSemaphore().availablePermits()).isEqualTo(2);
+            // the removal of log files is async, so we need to wait for the 
removal.
+            retry(
+                    Duration.ofMinutes(1),
+                    () -> 
assertThat(FileUtils.listDirectory(localLogDir).length).isEqualTo(2));
+
+            // test cleanup
+            remoteLogDownloader.close();
+            assertThat(localLogDir.toFile().exists()).isFalse();
+        } finally {
+            IOUtils.closeQuietly(remoteLogDownloader);
+            IOUtils.closeQuietly(remoteFileDownloader);
+        }
+    }
+
+    @Test
+    void testDownloadLogInParallelAndInPriority() throws Exception {
+        class TestRemoteFileDownloader extends RemoteFileDownloader {
+            final Set<String> threadNames = Collections.synchronizedSet(new 
HashSet<>());
+
+            private TestRemoteFileDownloader(int threadNum) {
+                super(threadNum);
+            }
+
+            @Override
+            protected long downloadFile(Path targetFilePath, FsPath 
remoteFilePath)
+                    throws IOException {
+                threadNames.add(Thread.currentThread().getName());
+                return super.downloadFile(targetFilePath, remoteFilePath);
+            }
+        }
+
+        // prepare the environment, 4 download threads, pre-fetch 4 segments, 
10 segments to fetch.
+        TestRemoteFileDownloader fileDownloader = new 
TestRemoteFileDownloader(4);
+        RemoteLogDownloader remoteLogDownloader =
                 new RemoteLogDownloader(
                         DATA1_TABLE_PATH,
-                        conf,
-                        remoteFileDownloader,
+                        conf, // max 4 pre-fetch num
+                        fileDownloader,
                         scannerMetricGroup,
-                        // use a short timout for faster testing
                         10L);
-    }
+        TableBucket bucket1 = new TableBucket(DATA1_TABLE_ID, 1);
+        TableBucket bucket2 = new TableBucket(DATA1_TABLE_ID, 2);
+        TableBucket bucket3 = new TableBucket(DATA1_TABLE_ID, 3);
+        TableBucket bucket4 = new TableBucket(DATA1_TABLE_ID, 4);
+        try {
+            // prepare segments, 4 buckets with different maxTimestamp, total 
10 segments
+            int totalSegments = 10;
+            List<RemoteLogSegment> remoteLogSegments =
+                    buildRemoteLogSegmentList(bucket1, 
DATA1_PHYSICAL_TABLE_PATH, 6, conf, 10);
+            remoteLogSegments.addAll(
+                    buildRemoteLogSegmentList(bucket3, 
DATA1_PHYSICAL_TABLE_PATH, 1, conf, 5));
+            remoteLogSegments.addAll(
+                    buildRemoteLogSegmentList(bucket2, 
DATA1_PHYSICAL_TABLE_PATH, 1, conf, 1));
+            remoteLogSegments.addAll(
+                    buildRemoteLogSegmentList(bucket3, 
DATA1_PHYSICAL_TABLE_PATH, 1, conf, 15));
+            remoteLogSegments.addAll(
+                    buildRemoteLogSegmentList(bucket4, 
DATA1_PHYSICAL_TABLE_PATH, 1, conf, 8));
+
+            Map<UUID, RemoteLogDownloadFuture> futures = new HashMap<>();
+            for (RemoteLogSegment segment : remoteLogSegments) {
+                FsPath remoteLogTabletDir =
+                        remoteLogTabletDir(
+                                remoteLogDir, DATA1_PHYSICAL_TABLE_PATH, 
segment.tableBucket());
+                RemoteLogDownloadFuture future =
+                        
remoteLogDownloader.requestRemoteLog(remoteLogTabletDir, segment);
+                futures.put(segment.remoteLogSegmentId(), future);
+            }
+
+            // start the downloader after requests are added to have 
deterministic request order.
+            remoteLogDownloader.start();
+
+            // check the segments are fetched in priority order.
+            
remoteLogSegments.sort(Comparator.comparingLong(RemoteLogSegment::maxTimestamp));
+            List<RemoteLogDownloadFuture> top4Futures = new ArrayList<>();
+            for (int i = 0; i < 4; i++) {
+                RemoteLogSegment segment = remoteLogSegments.get(i);
+                top4Futures.add(futures.get(segment.remoteLogSegmentId()));
+            }
+
+            // 4 to fetch.
+            retry(
+                    Duration.ofMinutes(1),
+                    () -> {
+                        for (RemoteLogDownloadFuture future : top4Futures) {
+                            assertThat(future.isDone()).isTrue();
+                        }
+                    });
+            // make sure 4 threads are used.
+            assertThat(fileDownloader.threadNames.size()).isEqualTo(4);
+            // only 4 segments are pre-fetched.
+            
assertThat(remoteLogDownloader.getSizeOfSegmentsToFetch()).isEqualTo(totalSegments
 - 4);
 
-    @AfterEach
-    void afterEach() {
-        if (remoteLogDownloader != null) {
+            for (int i = 3; i < totalSegments; i++) {
+                RemoteLogSegment segment = remoteLogSegments.get(i);
+                RemoteLogDownloadFuture future = 
futures.get(segment.remoteLogSegmentId());
+                waitUntil(future::isDone, Duration.ofMinutes(1), "segment 
download timeout");
+                // recycle the one segment to trigger download next segment
+                future.getRecycleCallback().run();
+            }
+
+            // all segments are fetched.
+            
assertThat(remoteLogDownloader.getSizeOfSegmentsToFetch()).isEqualTo(0);
+        } finally {
+            IOUtils.closeQuietly(fileDownloader);
             IOUtils.closeQuietly(remoteLogDownloader);
         }
-        if (remoteFileDownloader != null) {
-            IOUtils.closeQuietly(remoteFileDownloader);
-        }
     }
 
     @Test
-    void testPrefetchNum() throws Exception {
-        Path localLogDir = remoteLogDownloader.getLocalLogDir();
-        TableBucket tb = new TableBucket(DATA1_TABLE_ID, 0);
-        List<RemoteLogSegment> remoteLogSegments =
-                buildRemoteLogSegmentList(tb, DATA1_PHYSICAL_TABLE_PATH, 5, 
conf);
-        FsPath remoteLogTabletDir = remoteLogTabletDir(remoteLogDir, 
DATA1_PHYSICAL_TABLE_PATH, tb);
-        List<RemoteLogDownloadFuture> futures =
-                requestRemoteLogs(remoteLogTabletDir, remoteLogSegments);
-
-        // the first 4 segments should success.
-        retry(
-                Duration.ofMinutes(1),
-                () -> {
-                    for (int i = 0; i < 4; i++) {
-                        assertThat(futures.get(i).isDone()).isTrue();
-                    }
-                });
-
-        assertThat(FileUtils.listDirectory(localLogDir).length).isEqualTo(4);
-        
assertThat(scannerMetricGroup.remoteFetchRequestCount().getCount()).isEqualTo(4);
-        assertThat(scannerMetricGroup.remoteFetchBytes().getCount())
-                .isEqualTo(remoteLogSegmentFilesLength(remoteLogSegments, 
remoteLogTabletDir, 4));
-        
assertThat(remoteLogDownloader.getPrefetchSemaphore().availablePermits()).isEqualTo(0);
-
-        futures.get(0).getRecycleCallback().run();
-        // the 5th segment should success.
-        retry(Duration.ofMinutes(1), () -> 
assertThat(futures.get(4).isDone()).isTrue());
-        assertThat(FileUtils.listDirectory(localLogDir).length).isEqualTo(4);
-        
assertThat(scannerMetricGroup.remoteFetchRequestCount().getCount()).isEqualTo(5);
-        assertThat(scannerMetricGroup.remoteFetchBytes().getCount())
-                .isEqualTo(remoteLogSegmentFilesLength(remoteLogSegments, 
remoteLogTabletDir, 5));
-        
assertThat(remoteLogDownloader.getPrefetchSemaphore().availablePermits()).isEqualTo(0);
-
-        futures.get(1).getRecycleCallback().run();
-        futures.get(2).getRecycleCallback().run();
-        
assertThat(remoteLogDownloader.getPrefetchSemaphore().availablePermits()).isEqualTo(2);
-        // the removal of log files are async, so we need to wait for the 
removal.
-        retry(
-                Duration.ofMinutes(1),
-                () -> 
assertThat(FileUtils.listDirectory(localLogDir).length).isEqualTo(2));
-
-        // test cleanup
-        remoteLogDownloader.close();
-        assertThat(FileUtils.listDirectory(localLogDir).length).isEqualTo(0);
+    void testOrderOfRemoteLogDownloadRequest() {
+        TableBucket bucket1 = new TableBucket(DATA1_TABLE_ID, 1);
+        TableBucket bucket2 = new TableBucket(DATA1_TABLE_ID, 2);
+        TableBucket bucket3 = new TableBucket(DATA1_TABLE_ID, 3);
+
+        List<RemoteLogDownloadRequest> requests =
+                Arrays.asList(
+                        // different offset, same timestamp and bucket
+                        createDownloadRequest(bucket1, 10, 10),
+                        createDownloadRequest(bucket1, 20, 10),
+                        createDownloadRequest(bucket1, 30, 10),
+                        // -1 timestamp
+                        createDownloadRequest(bucket2, 10, -1),
+                        createDownloadRequest(bucket2, 20, -1),
+                        createDownloadRequest(bucket2, 30, -1),
+                        // 0 offset
+                        createDownloadRequest(bucket3, 0, 5),
+                        createDownloadRequest(bucket3, 0, 15),
+                        createDownloadRequest(bucket3, 0, 25));
+
+        // Sort the requests based on the custom comparator
+        Collections.sort(requests);
+        List<String> results =
+                requests.stream()
+                        .map(
+                                r ->
+                                        String.format(
+                                                "(bucket=%s, offset=%s, 
ts=%s)",
+                                                
r.segment.tableBucket().getBucket(),
+                                                
r.segment.remoteLogStartOffset(),
+                                                r.segment.maxTimestamp()))
+                        .collect(Collectors.toList());
+        List<String> expected =
+                Arrays.asList(
+                        "(bucket=2, offset=10, ts=-1)",
+                        "(bucket=2, offset=20, ts=-1)",
+                        "(bucket=2, offset=30, ts=-1)",
+                        "(bucket=3, offset=0, ts=5)",
+                        "(bucket=1, offset=10, ts=10)",
+                        "(bucket=1, offset=20, ts=10)",
+                        "(bucket=1, offset=30, ts=10)",
+                        "(bucket=3, offset=0, ts=15)",
+                        "(bucket=3, offset=0, ts=25)");
+        assertThat(results).isEqualTo(expected);
+    }
+
+    private RemoteLogDownloadRequest createDownloadRequest(
+            TableBucket tableBucket, long startOffset, long maxTimestamp) {
+        RemoteLogSegment remoteLogSegment =
+                RemoteLogSegment.Builder.builder()
+                        .tableBucket(tableBucket)
+                        .physicalTablePath(DATA1_PHYSICAL_TABLE_PATH)
+                        .remoteLogSegmentId(UUID.randomUUID())
+                        .remoteLogStartOffset(startOffset)
+                        .remoteLogEndOffset(startOffset + 10)
+                        .maxTimestamp(maxTimestamp)
+                        .segmentSizeInBytes(Integer.MAX_VALUE)
+                        .build();
+        return new RemoteLogDownloadRequest(remoteLogSegment, remoteLogDir);
     }
 
     private List<RemoteLogDownloadFuture> requestRemoteLogs(
-            FsPath remoteLogTabletDir, List<RemoteLogSegment> 
remoteLogSegments) {
+            RemoteLogDownloader remoteLogDownloader,
+            FsPath remoteLogTabletDir,
+            List<RemoteLogSegment> remoteLogSegments) {
         List<RemoteLogDownloadFuture> futures = new ArrayList<>();
         for (RemoteLogSegment segment : remoteLogSegments) {
             RemoteLogDownloadFuture future =
@@ -152,7 +314,8 @@ class RemoteLogDownloaderTest {
             TableBucket tableBucket,
             PhysicalTablePath physicalTablePath,
             int num,
-            Configuration conf)
+            Configuration conf,
+            long maxTimestamp)
             throws Exception {
         List<RemoteLogSegment> remoteLogSegmentList = new ArrayList<>();
         for (int i = 0; i < num; i++) {
@@ -165,6 +328,7 @@ class RemoteLogDownloaderTest {
                             .remoteLogSegmentId(segmentId)
                             .remoteLogStartOffset(baseOffset)
                             .remoteLogEndOffset(baseOffset + 9)
+                            .maxTimestamp(maxTimestamp)
                             .segmentSizeInBytes(Integer.MAX_VALUE)
                             .build();
             genRemoteLogSegmentFile(
diff --git 
a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/util/CommonRpcMessageUtils.java 
b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/util/CommonRpcMessageUtils.java
index b3d4a82ac..6513c68eb 100644
--- 
a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/util/CommonRpcMessageUtils.java
+++ 
b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/util/CommonRpcMessageUtils.java
@@ -169,6 +169,10 @@ public class CommonRpcMessageUtils {
                 PhysicalTablePath physicalTablePath = PhysicalTablePath.of(tp, 
partitionName);
                 List<RemoteLogSegment> remoteLogSegmentList = new 
ArrayList<>();
                 for (PbRemoteLogSegment pbRemoteLogSegment : 
pbRlfInfo.getRemoteLogSegmentsList()) {
+                    long maxTimestamp =
+                            pbRemoteLogSegment.hasMaxTimestamp()
+                                    ? pbRemoteLogSegment.getMaxTimestamp()
+                                    : -1;
                     RemoteLogSegment remoteLogSegment =
                             RemoteLogSegment.Builder.builder()
                                     .tableBucket(tb)
@@ -180,7 +184,7 @@ public class CommonRpcMessageUtils {
                                     .remoteLogStartOffset(
                                             
pbRemoteLogSegment.getRemoteLogStartOffset())
                                     
.segmentSizeInBytes(pbRemoteLogSegment.getSegmentSizeInBytes())
-                                    .maxTimestamp(-1L) // not use.
+                                    .maxTimestamp(maxTimestamp)
                                     .build();
                     remoteLogSegmentList.add(remoteLogSegment);
                 }
diff --git a/fluss-rpc/src/main/proto/FlussApi.proto 
b/fluss-rpc/src/main/proto/FlussApi.proto
index bf2f9aaa6..ca1a4a1d7 100644
--- a/fluss-rpc/src/main/proto/FlussApi.proto
+++ b/fluss-rpc/src/main/proto/FlussApi.proto
@@ -799,6 +799,7 @@ message PbRemoteLogSegment {
   required int64 remote_log_start_offset = 2;
   required int64 remote_log_end_offset = 3;
   required int32 segment_size_in_bytes = 4;
+  optional int64 max_timestamp = 5;
 }
 
 message PbPartitionInfo {
diff --git 
a/fluss-server/src/main/java/com/alibaba/fluss/server/utils/ServerRpcMessageUtils.java
 
b/fluss-server/src/main/java/com/alibaba/fluss/server/utils/ServerRpcMessageUtils.java
index a1363e6b2..169fa2318 100644
--- 
a/fluss-server/src/main/java/com/alibaba/fluss/server/utils/ServerRpcMessageUtils.java
+++ 
b/fluss-server/src/main/java/com/alibaba/fluss/server/utils/ServerRpcMessageUtils.java
@@ -747,7 +747,8 @@ public class ServerRpcMessageUtils {
                                         .setRemoteLogSegmentId(
                                                 
logSegment.remoteLogSegmentId().toString())
                                         
.setRemoteLogEndOffset(logSegment.remoteLogEndOffset())
-                                        
.setSegmentSizeInBytes(logSegment.segmentSizeInBytes());
+                                        
.setSegmentSizeInBytes(logSegment.segmentSizeInBytes())
+                                        
.setMaxTimestamp(logSegment.maxTimestamp());
                         remoteLogSegmentList.add(pbRemoteLogSegment);
                     }
                     fetchLogRespForBucket

Reply via email to