This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 420194a6e [client] RemoteLogDownloader supports download log files in
parallel and in priority (#1579)
420194a6e is described below
commit 420194a6e182f9eda317c378a56df5fe0dd93cf1
Author: Jark Wu <[email protected]>
AuthorDate: Mon Aug 25 19:01:25 2025 +0800
[client] RemoteLogDownloader supports download log files in parallel and in
priority (#1579)
---
.../client/table/scanner/RemoteFileDownloader.java | 68 ++++-
.../client/table/scanner/log/LogFetchBuffer.java | 69 ++---
.../table/scanner/log/LogFetchCollector.java | 4 +-
.../fluss/client/table/scanner/log/LogFetcher.java | 5 +-
.../table/scanner/log/RemoteLogDownloader.java | 160 +++++++-----
.../table/scanner/log/RemotePendingFetch.java | 16 +-
.../table/scanner/log/LogFetchBufferTest.java | 45 ++--
.../table/scanner/log/RemoteLogDownloaderTest.java | 286 ++++++++++++++++-----
.../fluss/rpc/util/CommonRpcMessageUtils.java | 6 +-
fluss-rpc/src/main/proto/FlussApi.proto | 1 +
.../fluss/server/utils/ServerRpcMessageUtils.java | 3 +-
11 files changed, 463 insertions(+), 200 deletions(-)
diff --git
a/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/RemoteFileDownloader.java
b/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/RemoteFileDownloader.java
index d2c81b9a5..bcde64769 100644
---
a/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/RemoteFileDownloader.java
+++
b/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/RemoteFileDownloader.java
@@ -17,30 +17,38 @@
package com.alibaba.fluss.client.table.scanner;
+import com.alibaba.fluss.fs.FSDataInputStream;
+import com.alibaba.fluss.fs.FileSystem;
+import com.alibaba.fluss.fs.FsPath;
import com.alibaba.fluss.fs.FsPathAndFileName;
import com.alibaba.fluss.fs.utils.FileDownloadSpec;
import com.alibaba.fluss.fs.utils.FileDownloadUtils;
import com.alibaba.fluss.utils.CloseableRegistry;
+import com.alibaba.fluss.utils.IOUtils;
import com.alibaba.fluss.utils.concurrent.ExecutorThreadFactory;
import java.io.Closeable;
import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
import java.nio.file.Path;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
- * The downloader to download the remote files (like kv snapshots files, log
segment files) to be
- * read.
+ * The downloader that has an IO thread pool to download the remote files (like
kv snapshots files,
+ * log segment files).
*/
public class RemoteFileDownloader implements Closeable {
- protected final ExecutorService snapshotDownLoadThreadPool;
+ protected final ExecutorService downloadThreadPool;
public RemoteFileDownloader(int threadNum) {
- snapshotDownLoadThreadPool =
+ downloadThreadPool =
Executors.newFixedThreadPool(
threadNum,
new ExecutorThreadFactory(
@@ -52,9 +60,55 @@ public class RemoteFileDownloader implements Closeable {
Thread.currentThread().getContextClassLoader()));
}
+ /**
+ * Downloads the file from the given remote file path to the target
directory asynchronously,
+ * returns a Future object of the number of downloaded bytes. The Future
will fail if the
+ * download fails after retrying for RETRY_COUNT times.
+ */
+ public CompletableFuture<Long> downloadFileAsync(
+ FsPathAndFileName fsPathAndFileName, Path targetDirectory) {
+ CompletableFuture<Long> future = new CompletableFuture<>();
+ downloadThreadPool.submit(
+ () -> {
+ try {
+ Path targetFilePath =
+
targetDirectory.resolve(fsPathAndFileName.getFileName());
+ FsPath remoteFilePath = fsPathAndFileName.getPath();
+ long downloadBytes = downloadFile(targetFilePath,
remoteFilePath);
+ future.complete(downloadBytes);
+ } catch (Exception e) {
+ future.completeExceptionally(e);
+ }
+ });
+ return future;
+ }
+
+ /**
+ * Copies the file from a remote file path to the given target file path,
returns the number of
+ * downloaded bytes.
+ */
+ protected long downloadFile(Path targetFilePath, FsPath remoteFilePath)
throws IOException {
+ List<Closeable> closeableRegistry = new ArrayList<>(2);
+ try {
+ FileSystem fileSystem = remoteFilePath.getFileSystem();
+ FSDataInputStream inputStream = fileSystem.open(remoteFilePath);
+ closeableRegistry.add(inputStream);
+
+ Files.createDirectories(targetFilePath.getParent());
+ OutputStream outputStream = Files.newOutputStream(targetFilePath);
+ closeableRegistry.add(outputStream);
+
+ return IOUtils.copyBytes(inputStream, outputStream, false);
+ } catch (Exception ex) {
+ throw new IOException(ex);
+ } finally {
+ closeableRegistry.forEach(IOUtils::closeQuietly);
+ }
+ }
+
@Override
public void close() throws IOException {
- snapshotDownLoadThreadPool.shutdownNow();
+ downloadThreadPool.shutdownNow();
}
public void transferAllToDirectory(
@@ -65,8 +119,6 @@ public class RemoteFileDownloader implements Closeable {
FileDownloadSpec fileDownloadSpec =
new FileDownloadSpec(fsPathAndFileNames, targetDirectory);
FileDownloadUtils.transferAllDataToDirectory(
- Collections.singleton(fileDownloadSpec),
- closeableRegistry,
- snapshotDownLoadThreadPool);
+ Collections.singleton(fileDownloadSpec), closeableRegistry,
downloadThreadPool);
}
}
diff --git
a/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/LogFetchBuffer.java
b/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/LogFetchBuffer.java
index 5441bd90e..de2b09175 100644
---
a/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/LogFetchBuffer.java
+++
b/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/LogFetchBuffer.java
@@ -30,8 +30,10 @@ import javax.annotation.concurrent.ThreadSafe;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -66,7 +68,7 @@ public class LogFetchBuffer implements AutoCloseable {
private final LinkedList<CompletedFetch> completedFetches;
@GuardedBy("lock")
- private final LinkedList<PendingFetch> pendingFetches = new LinkedList<>();
+ private final Map<TableBucket, LinkedList<PendingFetch>> pendingFetches =
new HashMap<>();
@GuardedBy("lock")
private @Nullable CompletedFetch nextInLineFetch;
@@ -88,7 +90,9 @@ public class LogFetchBuffer implements AutoCloseable {
inLock(
lock,
() -> {
- pendingFetches.add(pendingFetch);
+ pendingFetches
+ .computeIfAbsent(pendingFetch.tableBucket(), k ->
new LinkedList<>())
+ .add(pendingFetch);
});
}
@@ -96,17 +100,18 @@ public class LogFetchBuffer implements AutoCloseable {
* Tries to complete the pending fetches in order, convert them into
completed fetches in the
* buffer.
*/
- void tryComplete() {
+ void tryComplete(TableBucket tableBucket) {
inLock(
lock,
() -> {
boolean hasCompleted = false;
- while (!pendingFetches.isEmpty()) {
- PendingFetch pendingFetch = pendingFetches.peek();
+ LinkedList<PendingFetch> pendings =
this.pendingFetches.get(tableBucket);
+ while (pendings != null && !pendings.isEmpty()) {
+ PendingFetch pendingFetch = pendings.peek();
if (pendingFetch.isCompleted()) {
CompletedFetch completedFetch =
pendingFetch.toCompletedFetch();
completedFetches.add(completedFetch);
- pendingFetches.poll();
+ pendings.poll();
hasCompleted = true;
} else {
break;
@@ -114,6 +119,10 @@ public class LogFetchBuffer implements AutoCloseable {
}
if (hasCompleted) {
notEmptyCondition.signalAll();
+                        // clear the bucket entry if there are no pending
fetches for the bucket.
+ if (pendings.isEmpty()) {
+ this.pendingFetches.remove(tableBucket);
+ }
}
});
}
@@ -122,11 +131,13 @@ public class LogFetchBuffer implements AutoCloseable {
inLock(
lock,
() -> {
- if (pendingFetches.isEmpty()) {
+ LinkedList<PendingFetch> pendings =
+ pendingFetches.get(completedFetch.tableBucket);
+ if (pendings == null || pendings.isEmpty()) {
completedFetches.add(completedFetch);
notEmptyCondition.signalAll();
} else {
- pendingFetches.add(new
CompletedPendingFetch(completedFetch));
+ pendings.add(new
CompletedPendingFetch(completedFetch));
}
});
}
@@ -135,17 +146,7 @@ public class LogFetchBuffer implements AutoCloseable {
if (completedFetches == null || completedFetches.isEmpty()) {
return;
}
- inLock(
- lock,
- () -> {
- if (pendingFetches.isEmpty()) {
- this.completedFetches.addAll(completedFetches);
- notEmptyCondition.signalAll();
- } else {
- completedFetches.forEach(
- cf -> pendingFetches.add(new
CompletedPendingFetch(cf)));
- }
- });
+ inLock(lock, () -> completedFetches.forEach(this::add));
}
CompletedFetch nextInLineFetch() {
@@ -225,7 +226,8 @@ public class LogFetchBuffer implements AutoCloseable {
nextInLineFetch = null;
}
- pendingFetches.removeIf(pf ->
!buckets.contains(pf.tableBucket()));
+                    // remove entries that do not match the buckets from
pendingFetches
+ pendingFetches.entrySet().removeIf(entry ->
!buckets.contains(entry.getKey()));
});
}
@@ -233,12 +235,12 @@ public class LogFetchBuffer implements AutoCloseable {
* Drains (i.e. <em>removes</em>) the contents of the given {@link
CompletedFetch} as its data
* should not be returned to the user.
*/
- private boolean maybeDrain(Set<TableBucket> buckets, CompletedFetch
completedFetch) {
- if (completedFetch != null &&
!buckets.contains(completedFetch.tableBucket)) {
+ private boolean maybeDrain(Set<TableBucket> excludedBuckets,
CompletedFetch completedFetch) {
+ if (completedFetch != null &&
!excludedBuckets.contains(completedFetch.tableBucket)) {
LOG.debug(
"Removing {} from buffered fetch data as it is not in the
set of buckets to retain ({})",
completedFetch.tableBucket,
- buckets);
+ excludedBuckets);
completedFetch.drain();
return true;
} else {
@@ -256,36 +258,19 @@ public class LogFetchBuffer implements AutoCloseable {
return inLock(
lock,
() -> {
- // If there are any pending fetches which have not been
added to
- // completedFetches, we will return null. For example, a
possible scenario is
- // that the remote log downloader can not download remote
log as soon as
- // possible. In this case, we can't return any buckets to
avoid OOM cause by the
- // frequently fetch log request send to server to fetch
log back, which the
- // fetch data can not consume timely and will be buffered
in memory.
- // TODO this is a hack logic to avoid OOM, we should fix
it later to refactor
- // the remote log download logic.
- if (!pendingFetches.isEmpty()) {
- return null;
- }
-
final Set<TableBucket> buckets = new HashSet<>();
if (nextInLineFetch != null &&
!nextInLineFetch.isConsumed()) {
buckets.add(nextInLineFetch.tableBucket);
}
completedFetches.forEach(cf ->
buckets.add(cf.tableBucket));
+ buckets.addAll(pendingFetches.keySet());
return buckets;
});
}
/** Return the set of {@link TableBucket buckets} for which we have
pending fetches. */
Set<TableBucket> pendedBuckets() {
- return inLock(
- lock,
- () -> {
- final Set<TableBucket> buckets = new HashSet<>();
- pendingFetches.forEach(pf ->
buckets.add(pf.tableBucket()));
- return buckets;
- });
+ return inLock(lock, pendingFetches::keySet);
}
@Override
diff --git
a/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/LogFetchCollector.java
b/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/LogFetchCollector.java
index f847ac04c..fb6095f2a 100644
---
a/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/LogFetchCollector.java
+++
b/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/LogFetchCollector.java
@@ -177,7 +177,7 @@ public class LogFetchCollector {
} else {
// these records aren't next in line based on the last
consumed offset, ignore them
// they must be from an obsolete request
- LOG.debug(
+ LOG.warn(
"Ignoring fetched records for {} at offset {} since
the current offset is {}",
nextInLineFetch.tableBucket,
nextInLineFetch.nextFetchOffset(),
@@ -228,7 +228,7 @@ public class LogFetchCollector {
return null;
}
if (offset != fetchOffset) {
- LOG.debug(
+ LOG.warn(
"Discarding stale fetch response for bucket {} since its
offset {} does not match the expected offset {}.",
tb,
fetchOffset,
diff --git
a/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/LogFetcher.java
b/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/LogFetcher.java
index 4c939f9b5..a11a71ca6 100644
---
a/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/LogFetcher.java
+++
b/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/LogFetcher.java
@@ -142,6 +142,7 @@ public class LogFetcher implements Closeable {
this.scannerMetricGroup = scannerMetricGroup;
this.remoteLogDownloader =
new RemoteLogDownloader(tablePath, conf, remoteFileDownloader,
scannerMetricGroup);
+ remoteLogDownloader.start();
}
/**
@@ -364,7 +365,7 @@ public class LogFetcher implements Closeable {
}
RemoteLogDownloadFuture downloadFuture =
remoteLogDownloader.requestRemoteLog(remoteLogTabletDir,
segment);
- PendingFetch pendingFetch =
+ RemotePendingFetch pendingFetch =
new RemotePendingFetch(
segment,
downloadFuture,
@@ -375,7 +376,7 @@ public class LogFetcher implements Closeable {
logScannerStatus,
isCheckCrcs);
logFetchBuffer.pend(pendingFetch);
- downloadFuture.onComplete(logFetchBuffer::tryComplete);
+ downloadFuture.onComplete(() ->
logFetchBuffer.tryComplete(segment.tableBucket()));
}
}
diff --git
a/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/RemoteLogDownloader.java
b/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/RemoteLogDownloader.java
index e36e9a04c..c3f357eac 100644
---
a/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/RemoteLogDownloader.java
+++
b/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/RemoteLogDownloader.java
@@ -27,9 +27,8 @@ import com.alibaba.fluss.fs.FsPath;
import com.alibaba.fluss.fs.FsPathAndFileName;
import com.alibaba.fluss.metadata.TablePath;
import com.alibaba.fluss.remote.RemoteLogSegment;
-import com.alibaba.fluss.utils.CloseableRegistry;
+import com.alibaba.fluss.utils.ExceptionUtils;
import com.alibaba.fluss.utils.FlussPaths;
-import com.alibaba.fluss.utils.MapUtils;
import com.alibaba.fluss.utils.concurrent.ShutdownableThread;
import org.slf4j.Logger;
@@ -43,18 +42,16 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
-import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
-import static com.alibaba.fluss.utils.FileUtils.deleteFileOrDirectory;
+import static com.alibaba.fluss.utils.FileUtils.deleteDirectoryQuietly;
import static com.alibaba.fluss.utils.FlussPaths.LOG_FILE_SUFFIX;
-import static com.alibaba.fluss.utils.FlussPaths.filenamePrefixFromOffset;
import static com.alibaba.fluss.utils.FlussPaths.remoteLogSegmentDir;
import static com.alibaba.fluss.utils.FlussPaths.remoteLogSegmentFile;
@@ -68,13 +65,15 @@ public class RemoteLogDownloader implements Closeable {
private final Path localLogDir;
- private final BlockingQueue<RemoteLogDownloadRequest> segmentsToFetch;
+ /**
+ * A queue to hold the remote log segment files to be fetched. The queue
is ordered by the
+ * max_timestamp of the remote log segment. So we download the remote log
segments from the
+ * older to the newer.
+ */
+ private final PriorityBlockingQueue<RemoteLogDownloadRequest>
segmentsToFetch;
private final BlockingQueue<RemoteLogSegment> segmentsToRecycle;
- // <log_segment_id -> segment_uuid_path>
- private final ConcurrentHashMap<String, Path> fetchedFiles;
-
private final Semaphore prefetchSemaphore;
private final DownloadRemoteLogThread downloadThread;
@@ -101,9 +100,8 @@ public class RemoteLogDownloader implements Closeable {
RemoteFileDownloader remoteFileDownloader,
ScannerMetricGroup scannerMetricGroup,
long pollTimeout) {
- this.segmentsToFetch = new LinkedBlockingQueue<>();
+ this.segmentsToFetch = new PriorityBlockingQueue<>();
this.segmentsToRecycle = new LinkedBlockingQueue<>();
- this.fetchedFiles = MapUtils.newConcurrentHashMap();
this.remoteFileDownloader = remoteFileDownloader;
this.scannerMetricGroup = scannerMetricGroup;
this.pollTimeout = pollTimeout;
@@ -116,6 +114,9 @@ public class RemoteLogDownloader implements Closeable {
conf.get(ConfigOptions.CLIENT_SCANNER_IO_TMP_DIR),
"remote-logs-" + UUID.randomUUID());
this.downloadThread = new DownloadRemoteLogThread(tablePath);
+ }
+
+ public void start() {
downloadThread.start();
}
@@ -140,39 +141,54 @@ public class RemoteLogDownloader implements Closeable {
* to fetch.
*/
void fetchOnce() throws Exception {
+ // blocks until there is capacity (the fetched file is consumed)
+ prefetchSemaphore.acquire();
+
// wait until there is a remote fetch request
RemoteLogDownloadRequest request = segmentsToFetch.poll(pollTimeout,
TimeUnit.MILLISECONDS);
if (request == null) {
+ prefetchSemaphore.release();
return;
}
- // blocks until there is capacity (the fetched file is consumed)
- prefetchSemaphore.acquire();
+
try {
// 1. cleanup the finished logs first to free up disk space
cleanupRemoteLogs();
// 2. do the actual download work
FsPathAndFileName fsPathAndFileName =
request.getFsPathAndFileName();
- Path segmentPath =
localLogDir.resolve(request.segment.remoteLogSegmentId().toString());
scannerMetricGroup.remoteFetchRequestCount().inc();
- // download the remote file to local
- LOG.info(
- "Start to download remote log segment file {} to local.",
- fsPathAndFileName.getFileName());
+
long startTime = System.currentTimeMillis();
- remoteFileDownloader.transferAllToDirectory(
- Collections.singletonList(fsPathAndFileName),
- segmentPath,
- new CloseableRegistry());
- LOG.info(
- "Download remote log segment file {} to local cost {} ms.",
- fsPathAndFileName.getFileName(),
- System.currentTimeMillis() - startTime);
- File localFile = new File(segmentPath.toFile(),
fsPathAndFileName.getFileName());
- scannerMetricGroup.remoteFetchBytes().inc(localFile.length());
- String segmentId = request.segment.remoteLogSegmentId().toString();
- fetchedFiles.put(segmentId, segmentPath);
- request.future.complete(localFile);
+ // download the remote file to local
+ remoteFileDownloader
+ .downloadFileAsync(fsPathAndFileName, localLogDir)
+ .whenComplete(
+ (bytes, throwable) -> {
+ if (throwable != null) {
+ LOG.error(
+ "Failed to download remote log
segment file {}.",
+ fsPathAndFileName.getFileName(),
+
ExceptionUtils.stripExecutionException(throwable));
+ // release the semaphore for the failed
request
+ prefetchSemaphore.release();
+ // add back the request to the queue,
+ // so we do not complete the
request.future here
+ segmentsToFetch.add(request);
+
scannerMetricGroup.remoteFetchErrorCount().inc();
+ } else {
+ LOG.info(
+ "Successfully downloaded remote
log segment file {} to local cost {} ms.",
+ fsPathAndFileName.getFileName(),
+ System.currentTimeMillis() -
startTime);
+ File localFile =
+ new File(
+ localLogDir.toFile(),
+
fsPathAndFileName.getFileName());
+
scannerMetricGroup.remoteFetchBytes().inc(bytes);
+ request.future.complete(localFile);
+ }
+ });
} catch (Throwable t) {
prefetchSemaphore.release();
// add back the request to the queue
@@ -191,24 +207,15 @@ public class RemoteLogDownloader implements Closeable {
}
private void cleanupFinishedRemoteLog(RemoteLogSegment segment) {
- String segmentId = segment.remoteLogSegmentId().toString();
- Path segmentPath = fetchedFiles.remove(segmentId);
- if (segmentPath != null) {
- try {
- Path logFile =
- segmentPath.resolve(
-
filenamePrefixFromOffset(segment.remoteLogStartOffset())
- + LOG_FILE_SUFFIX);
- Files.deleteIfExists(logFile);
- Files.deleteIfExists(segmentPath);
- LOG.info(
- "Consumed and deleted the fetched log segment file
{}/{} for bucket {}.",
- segmentPath.getFileName(),
- logFile.getFileName(),
- segment.tableBucket());
- } catch (IOException e) {
- LOG.warn("Failed to delete the fetch segment path {}.",
segmentPath, e);
- }
+ try {
+ Path logFile =
localLogDir.resolve(getLocalFileNameOfRemoteSegment(segment));
+ Files.deleteIfExists(logFile);
+ LOG.info(
+ "Consumed and deleted the fetched log segment file {} for
bucket {}.",
+ logFile.getFileName(),
+ segment.tableBucket());
+ } catch (IOException e) {
+ LOG.warn("Failed to delete the local fetch segment file {}.",
localLogDir, e);
}
}
@@ -219,11 +226,8 @@ public class RemoteLogDownloader implements Closeable {
} catch (InterruptedException e) {
// ignore
}
- // cleanup all downloaded files
- for (Path segmentPath : fetchedFiles.values()) {
- deleteFileOrDirectory(segmentPath.toFile());
- }
- fetchedFiles.clear();
+
+ deleteDirectoryQuietly(localLogDir.toFile());
}
@VisibleForTesting
@@ -236,16 +240,34 @@ public class RemoteLogDownloader implements Closeable {
return localLogDir;
}
+ @VisibleForTesting
+ int getSizeOfSegmentsToFetch() {
+ return segmentsToFetch.size();
+ }
+
protected static FsPathAndFileName getFsPathAndFileName(
FsPath remoteLogTabletDir, RemoteLogSegment segment) {
FsPath remotePath =
remoteLogSegmentFile(
remoteLogSegmentDir(remoteLogTabletDir,
segment.remoteLogSegmentId()),
segment.remoteLogStartOffset());
- return new FsPathAndFileName(
- remotePath,
-
FlussPaths.filenamePrefixFromOffset(segment.remoteLogStartOffset())
- + LOG_FILE_SUFFIX);
+ return new FsPathAndFileName(remotePath,
getLocalFileNameOfRemoteSegment(segment));
+ }
+
+ /**
+ * Get the local file name of the remote log segment.
+ *
+ * <p>The file name is in pattern:
+ *
+ * <pre>
+ * ${remote_segment_id}_${offset_prefix}.log
+ * </pre>
+ */
+ private static String getLocalFileNameOfRemoteSegment(RemoteLogSegment
segment) {
+ return segment.remoteLogSegmentId()
+ + "_"
+ +
FlussPaths.filenamePrefixFromOffset(segment.remoteLogStartOffset())
+ + LOG_FILE_SUFFIX;
}
/**
@@ -265,10 +287,10 @@ public class RemoteLogDownloader implements Closeable {
}
/** Represents a request to download a remote log segment file to local. */
- private static class RemoteLogDownloadRequest {
- private final RemoteLogSegment segment;
- private final FsPath remoteLogTabletDir;
- private final CompletableFuture<File> future = new
CompletableFuture<>();
+ static class RemoteLogDownloadRequest implements
Comparable<RemoteLogDownloadRequest> {
+ final RemoteLogSegment segment;
+ final FsPath remoteLogTabletDir;
+ final CompletableFuture<File> future = new CompletableFuture<>();
public RemoteLogDownloadRequest(RemoteLogSegment segment, FsPath
remoteLogTabletDir) {
this.segment = segment;
@@ -278,5 +300,17 @@ public class RemoteLogDownloader implements Closeable {
public FsPathAndFileName getFsPathAndFileName() {
return
RemoteLogDownloader.getFsPathAndFileName(remoteLogTabletDir, segment);
}
+
+ @Override
+ public int compareTo(RemoteLogDownloadRequest o) {
+ if (segment.tableBucket().equals(o.segment.tableBucket())) {
+ // strictly download in the offset order if they belong to the
same bucket
+ return Long.compare(
+ segment.remoteLogStartOffset(),
o.segment.remoteLogStartOffset());
+ } else {
+ // download segment from old to new across buckets
+ return Long.compare(segment.maxTimestamp(),
o.segment.maxTimestamp());
+ }
+ }
}
}
diff --git
a/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/RemotePendingFetch.java
b/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/RemotePendingFetch.java
index eae93a606..c61fcc56a 100644
---
a/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/RemotePendingFetch.java
+++
b/fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/RemotePendingFetch.java
@@ -28,7 +28,7 @@ import com.alibaba.fluss.remote.RemoteLogSegment;
*/
class RemotePendingFetch implements PendingFetch {
- private final RemoteLogSegment remoteLogSegment;
+ final RemoteLogSegment remoteLogSegment;
private final RemoteLogDownloadFuture downloadFuture;
private final int posInLogSegment;
@@ -80,4 +80,18 @@ class RemotePendingFetch implements PendingFetch {
fetchOffset,
downloadFuture.getRecycleCallback());
}
+
+ @Override
+ public String toString() {
+ return "RemotePendingFetch{"
+ + "remoteLogSegment="
+ + remoteLogSegment
+ + ", fetchOffset="
+ + fetchOffset
+ + ", posInLogSegment="
+ + posInLogSegment
+ + ", highWatermark="
+ + highWatermark
+ + '}';
+ }
}
diff --git
a/fluss-client/src/test/java/com/alibaba/fluss/client/table/scanner/log/LogFetchBufferTest.java
b/fluss-client/src/test/java/com/alibaba/fluss/client/table/scanner/log/LogFetchBufferTest.java
index 777b094a2..eb976fab2 100644
---
a/fluss-client/src/test/java/com/alibaba/fluss/client/table/scanner/log/LogFetchBufferTest.java
+++
b/fluss-client/src/test/java/com/alibaba/fluss/client/table/scanner/log/LogFetchBufferTest.java
@@ -142,24 +142,20 @@ public class LogFetchBufferTest {
logFetchBuffer.pend(makePendingFetch(tableBucket1));
logFetchBuffer.pend(makePendingFetch(tableBucket2));
logFetchBuffer.pend(makePendingFetch(tableBucket3));
- // TODO these tests need to add back after remove the hack logic in
- // LogFetchBuffer#bufferedBuckets()
- // assertThat(logFetchBuffer.bufferedBuckets())
- // .containsExactlyInAnyOrder(tableBucket1,
tableBucket2,
- // tableBucket3);
+ assertThat(logFetchBuffer.bufferedBuckets())
+ .containsExactlyInAnyOrder(tableBucket1, tableBucket2,
tableBucket3);
assertThat(logFetchBuffer.pendedBuckets())
.containsExactlyInAnyOrder(tableBucket1, tableBucket2,
tableBucket3);
logFetchBuffer.retainAll(new HashSet<>(Arrays.asList(tableBucket2,
tableBucket3)));
- // assertThat(logFetchBuffer.bufferedBuckets())
- // .containsExactlyInAnyOrder(tableBucket2,
tableBucket3);
+ assertThat(logFetchBuffer.bufferedBuckets())
+ .containsExactlyInAnyOrder(tableBucket2, tableBucket3);
assertThat(logFetchBuffer.pendedBuckets())
.containsExactlyInAnyOrder(tableBucket2, tableBucket3);
logFetchBuffer.retainAll(Collections.singleton(tableBucket3));
- //
- //
assertThat(logFetchBuffer.bufferedBuckets()).containsExactlyInAnyOrder(tableBucket3);
+
assertThat(logFetchBuffer.bufferedBuckets()).containsExactlyInAnyOrder(tableBucket3);
assertThat(logFetchBuffer.pendedBuckets()).containsExactlyInAnyOrder(tableBucket3);
logFetchBuffer.retainAll(Collections.emptySet());
@@ -202,41 +198,52 @@ public class LogFetchBufferTest {
assertThat(logFetchBuffer.isEmpty()).isTrue();
AtomicBoolean completed1 = new AtomicBoolean(false);
- logFetchBuffer.pend(makePendingFetch(tableBucket1, completed1));
+ PendingFetch pending1 = makePendingFetch(tableBucket1, completed1);
+ logFetchBuffer.pend(pending1);
// pending fetches are not counted as completed fetches.
assertThat(logFetchBuffer.isEmpty()).isTrue();
+        // tableBucket3 completed fetch will not be pended
logFetchBuffer.add(makeCompletedFetch(tableBucket3));
- // competed fetches will be pended if there is any pending fetches
+ assertThat(logFetchBuffer.isEmpty()).isFalse();
+
assertThat(logFetchBuffer.poll().tableBucket).isEqualTo(tableBucket3);
+        // tableBucket1 completed fetches will be pended
+ logFetchBuffer.add(makeCompletedFetch(tableBucket1));
assertThat(logFetchBuffer.isEmpty()).isTrue();
AtomicBoolean completed2 = new AtomicBoolean(false);
- logFetchBuffer.pend(makePendingFetch(tableBucket2, completed2));
- logFetchBuffer.pend(makePendingFetch(tableBucket3));
- logFetchBuffer.pend(makePendingFetch(tableBucket3));
+ PendingFetch pending2 = makePendingFetch(tableBucket2, completed2);
+ PendingFetch pending3 = makePendingFetch(tableBucket3);
+ PendingFetch pending4 = makePendingFetch(tableBucket3);
+ logFetchBuffer.pend(pending2);
+ logFetchBuffer.pend(pending3);
+ logFetchBuffer.pend(pending4);
Future<Boolean> signal =
service.submit(() -> await(logFetchBuffer,
Duration.ofSeconds(1)));
- logFetchBuffer.tryComplete();
- // nothing happen
+ logFetchBuffer.tryComplete(pending1.tableBucket());
+        // nothing happens, as pending1 is not completed
assertThat(logFetchBuffer.isEmpty()).isTrue();
// no condition signal
assertThat(signal.get()).isFalse();
signal = service.submit(() -> await(logFetchBuffer,
Duration.ofMinutes(1)));
completed1.set(true);
- logFetchBuffer.tryComplete();
+ logFetchBuffer.tryComplete(pending1.tableBucket());
assertThat(signal.get()).isTrue();
assertThat(logFetchBuffer.isEmpty()).isFalse();
assertThat(logFetchBuffer.poll().tableBucket).isEqualTo(tableBucket1);
-
assertThat(logFetchBuffer.poll().tableBucket).isEqualTo(tableBucket3);
+        // the buffered completed fetch will be available now
+
assertThat(logFetchBuffer.poll().tableBucket).isEqualTo(tableBucket1);
assertThat(logFetchBuffer.isEmpty()).isTrue();
signal = service.submit(() -> await(logFetchBuffer,
Duration.ofMinutes(1)));
completed2.set(true);
- logFetchBuffer.tryComplete();
+ logFetchBuffer.tryComplete(pending2.tableBucket());
assertThat(signal.get()).isTrue();
assertThat(logFetchBuffer.isEmpty()).isFalse();
+ logFetchBuffer.tryComplete(pending3.tableBucket());
+ logFetchBuffer.tryComplete(pending4.tableBucket());
assertThat(logFetchBuffer.poll().tableBucket).isEqualTo(tableBucket2);
assertThat(logFetchBuffer.poll().tableBucket).isEqualTo(tableBucket3);
assertThat(logFetchBuffer.poll().tableBucket).isEqualTo(tableBucket3);
diff --git
a/fluss-client/src/test/java/com/alibaba/fluss/client/table/scanner/log/RemoteLogDownloaderTest.java
b/fluss-client/src/test/java/com/alibaba/fluss/client/table/scanner/log/RemoteLogDownloaderTest.java
index fe740863d..61e649d3f 100644
---
a/fluss-client/src/test/java/com/alibaba/fluss/client/table/scanner/log/RemoteLogDownloaderTest.java
+++
b/fluss-client/src/test/java/com/alibaba/fluss/client/table/scanner/log/RemoteLogDownloaderTest.java
@@ -20,6 +20,7 @@ package com.alibaba.fluss.client.table.scanner.log;
import com.alibaba.fluss.client.metrics.ScannerMetricGroup;
import com.alibaba.fluss.client.metrics.TestingScannerMetricGroup;
import com.alibaba.fluss.client.table.scanner.RemoteFileDownloader;
+import
com.alibaba.fluss.client.table.scanner.log.RemoteLogDownloader.RemoteLogDownloadRequest;
import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.fs.FsPath;
@@ -29,23 +30,32 @@ import com.alibaba.fluss.remote.RemoteLogSegment;
import com.alibaba.fluss.utils.FileUtils;
import com.alibaba.fluss.utils.IOUtils;
-import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.File;
+import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.UUID;
+import java.util.stream.Collectors;
import static com.alibaba.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH;
import static com.alibaba.fluss.record.TestData.DATA1_TABLE_ID;
import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH;
import static
com.alibaba.fluss.testutils.DataTestUtils.genRemoteLogSegmentFile;
import static com.alibaba.fluss.testutils.common.CommonTestUtils.retry;
+import static com.alibaba.fluss.testutils.common.CommonTestUtils.waitUntil;
import static com.alibaba.fluss.utils.FlussPaths.remoteLogDir;
import static com.alibaba.fluss.utils.FlussPaths.remoteLogTabletDir;
import static org.assertj.core.api.Assertions.assertThat;
@@ -57,9 +67,7 @@ class RemoteLogDownloaderTest {
private @TempDir File localDir;
private FsPath remoteLogDir;
private Configuration conf;
- private RemoteFileDownloader remoteFileDownloader;
private ScannerMetricGroup scannerMetricGroup;
- private RemoteLogDownloader remoteLogDownloader;
@BeforeEach
void beforeEach() {
@@ -68,77 +76,231 @@ class RemoteLogDownloaderTest {
conf.set(ConfigOptions.CLIENT_SCANNER_IO_TMP_DIR,
localDir.getAbsolutePath());
conf.set(ConfigOptions.CLIENT_SCANNER_REMOTE_LOG_PREFETCH_NUM, 4);
remoteLogDir = remoteLogDir(conf);
- remoteFileDownloader = new RemoteFileDownloader(1);
scannerMetricGroup = TestingScannerMetricGroup.newInstance();
- remoteLogDownloader =
+ }
+
+ @Test
+ void testPrefetchNum() throws Exception {
+ RemoteFileDownloader remoteFileDownloader = new
RemoteFileDownloader(1);
+ RemoteLogDownloader remoteLogDownloader =
+ new RemoteLogDownloader(
+ DATA1_TABLE_PATH, conf, remoteFileDownloader,
scannerMetricGroup, 10L);
+ try {
+ // trigger auto download.
+ remoteLogDownloader.start();
+
+ Path localLogDir = remoteLogDownloader.getLocalLogDir();
+ TableBucket tb = new TableBucket(DATA1_TABLE_ID, 0);
+ List<RemoteLogSegment> remoteLogSegments =
+ buildRemoteLogSegmentList(tb, DATA1_PHYSICAL_TABLE_PATH,
5, conf, 10);
+ FsPath remoteLogTabletDir =
+ remoteLogTabletDir(remoteLogDir,
DATA1_PHYSICAL_TABLE_PATH, tb);
+ List<RemoteLogDownloadFuture> futures =
+ requestRemoteLogs(remoteLogDownloader, remoteLogTabletDir,
remoteLogSegments);
+
+            // the first 4 segments should succeed.
+ retry(
+ Duration.ofMinutes(1),
+ () -> {
+ for (int i = 0; i < 4; i++) {
+ assertThat(futures.get(i).isDone()).isTrue();
+ }
+ });
+
+
assertThat(FileUtils.listDirectory(localLogDir).length).isEqualTo(4);
+
assertThat(scannerMetricGroup.remoteFetchRequestCount().getCount()).isEqualTo(4);
+ assertThat(scannerMetricGroup.remoteFetchBytes().getCount())
+ .isEqualTo(
+ remoteLogSegmentFilesLength(remoteLogSegments,
remoteLogTabletDir, 4));
+
assertThat(remoteLogDownloader.getPrefetchSemaphore().availablePermits()).isEqualTo(0);
+
+ futures.get(0).getRecycleCallback().run();
+            // the 5th segment should succeed.
+ retry(Duration.ofMinutes(1), () ->
assertThat(futures.get(4).isDone()).isTrue());
+
assertThat(FileUtils.listDirectory(localLogDir).length).isEqualTo(4);
+
assertThat(scannerMetricGroup.remoteFetchRequestCount().getCount()).isEqualTo(5);
+ assertThat(scannerMetricGroup.remoteFetchBytes().getCount())
+ .isEqualTo(
+ remoteLogSegmentFilesLength(remoteLogSegments,
remoteLogTabletDir, 5));
+
assertThat(remoteLogDownloader.getPrefetchSemaphore().availablePermits()).isEqualTo(0);
+
+ futures.get(1).getRecycleCallback().run();
+ futures.get(2).getRecycleCallback().run();
+
assertThat(remoteLogDownloader.getPrefetchSemaphore().availablePermits()).isEqualTo(2);
+            // the removal of log files is async, so we need to wait for the
removal.
+ retry(
+ Duration.ofMinutes(1),
+ () ->
assertThat(FileUtils.listDirectory(localLogDir).length).isEqualTo(2));
+
+ // test cleanup
+ remoteLogDownloader.close();
+ assertThat(localLogDir.toFile().exists()).isFalse();
+ } finally {
+ IOUtils.closeQuietly(remoteLogDownloader);
+ IOUtils.closeQuietly(remoteFileDownloader);
+ }
+ }
+
+ @Test
+ void testDownloadLogInParallelAndInPriority() throws Exception {
+ class TestRemoteFileDownloader extends RemoteFileDownloader {
+ final Set<String> threadNames = Collections.synchronizedSet(new
HashSet<>());
+
+ private TestRemoteFileDownloader(int threadNum) {
+ super(threadNum);
+ }
+
+ @Override
+ protected long downloadFile(Path targetFilePath, FsPath
remoteFilePath)
+ throws IOException {
+ threadNames.add(Thread.currentThread().getName());
+ return super.downloadFile(targetFilePath, remoteFilePath);
+ }
+ }
+
+ // prepare the environment, 4 download threads, pre-fetch 4 segments,
10 segments to fetch.
+ TestRemoteFileDownloader fileDownloader = new
TestRemoteFileDownloader(4);
+ RemoteLogDownloader remoteLogDownloader =
new RemoteLogDownloader(
DATA1_TABLE_PATH,
- conf,
- remoteFileDownloader,
+ conf, // max 4 pre-fetch num
+ fileDownloader,
scannerMetricGroup,
- // use a short timout for faster testing
10L);
- }
+ TableBucket bucket1 = new TableBucket(DATA1_TABLE_ID, 1);
+ TableBucket bucket2 = new TableBucket(DATA1_TABLE_ID, 2);
+ TableBucket bucket3 = new TableBucket(DATA1_TABLE_ID, 3);
+ TableBucket bucket4 = new TableBucket(DATA1_TABLE_ID, 4);
+ try {
+ // prepare segments, 4 buckets with different maxTimestamp, total
10 segments
+ int totalSegments = 10;
+ List<RemoteLogSegment> remoteLogSegments =
+ buildRemoteLogSegmentList(bucket1,
DATA1_PHYSICAL_TABLE_PATH, 6, conf, 10);
+ remoteLogSegments.addAll(
+ buildRemoteLogSegmentList(bucket3,
DATA1_PHYSICAL_TABLE_PATH, 1, conf, 5));
+ remoteLogSegments.addAll(
+ buildRemoteLogSegmentList(bucket2,
DATA1_PHYSICAL_TABLE_PATH, 1, conf, 1));
+ remoteLogSegments.addAll(
+ buildRemoteLogSegmentList(bucket3,
DATA1_PHYSICAL_TABLE_PATH, 1, conf, 15));
+ remoteLogSegments.addAll(
+ buildRemoteLogSegmentList(bucket4,
DATA1_PHYSICAL_TABLE_PATH, 1, conf, 8));
+
+ Map<UUID, RemoteLogDownloadFuture> futures = new HashMap<>();
+ for (RemoteLogSegment segment : remoteLogSegments) {
+ FsPath remoteLogTabletDir =
+ remoteLogTabletDir(
+ remoteLogDir, DATA1_PHYSICAL_TABLE_PATH,
segment.tableBucket());
+ RemoteLogDownloadFuture future =
+
remoteLogDownloader.requestRemoteLog(remoteLogTabletDir, segment);
+ futures.put(segment.remoteLogSegmentId(), future);
+ }
+
+ // start the downloader after requests are added to have
deterministic request order.
+ remoteLogDownloader.start();
+
+ // check the segments are fetched in priority order.
+
remoteLogSegments.sort(Comparator.comparingLong(RemoteLogSegment::maxTimestamp));
+ List<RemoteLogDownloadFuture> top4Futures = new ArrayList<>();
+ for (int i = 0; i < 4; i++) {
+ RemoteLogSegment segment = remoteLogSegments.get(i);
+ top4Futures.add(futures.get(segment.remoteLogSegmentId()));
+ }
+
+ // 4 to fetch.
+ retry(
+ Duration.ofMinutes(1),
+ () -> {
+ for (RemoteLogDownloadFuture future : top4Futures) {
+ assertThat(future.isDone()).isTrue();
+ }
+ });
+ // make sure 4 threads are used.
+ assertThat(fileDownloader.threadNames.size()).isEqualTo(4);
+ // only 4 segments are pre-fetched.
+
assertThat(remoteLogDownloader.getSizeOfSegmentsToFetch()).isEqualTo(totalSegments
- 4);
- @AfterEach
- void afterEach() {
- if (remoteLogDownloader != null) {
+ for (int i = 3; i < totalSegments; i++) {
+ RemoteLogSegment segment = remoteLogSegments.get(i);
+ RemoteLogDownloadFuture future =
futures.get(segment.remoteLogSegmentId());
+ waitUntil(future::isDone, Duration.ofMinutes(1), "segment
download timeout");
+ // recycle the one segment to trigger download next segment
+ future.getRecycleCallback().run();
+ }
+
+ // all segments are fetched.
+
assertThat(remoteLogDownloader.getSizeOfSegmentsToFetch()).isEqualTo(0);
+ } finally {
+ IOUtils.closeQuietly(fileDownloader);
IOUtils.closeQuietly(remoteLogDownloader);
}
- if (remoteFileDownloader != null) {
- IOUtils.closeQuietly(remoteFileDownloader);
- }
}
@Test
- void testPrefetchNum() throws Exception {
- Path localLogDir = remoteLogDownloader.getLocalLogDir();
- TableBucket tb = new TableBucket(DATA1_TABLE_ID, 0);
- List<RemoteLogSegment> remoteLogSegments =
- buildRemoteLogSegmentList(tb, DATA1_PHYSICAL_TABLE_PATH, 5,
conf);
- FsPath remoteLogTabletDir = remoteLogTabletDir(remoteLogDir,
DATA1_PHYSICAL_TABLE_PATH, tb);
- List<RemoteLogDownloadFuture> futures =
- requestRemoteLogs(remoteLogTabletDir, remoteLogSegments);
-
- // the first 4 segments should success.
- retry(
- Duration.ofMinutes(1),
- () -> {
- for (int i = 0; i < 4; i++) {
- assertThat(futures.get(i).isDone()).isTrue();
- }
- });
-
- assertThat(FileUtils.listDirectory(localLogDir).length).isEqualTo(4);
-
assertThat(scannerMetricGroup.remoteFetchRequestCount().getCount()).isEqualTo(4);
- assertThat(scannerMetricGroup.remoteFetchBytes().getCount())
- .isEqualTo(remoteLogSegmentFilesLength(remoteLogSegments,
remoteLogTabletDir, 4));
-
assertThat(remoteLogDownloader.getPrefetchSemaphore().availablePermits()).isEqualTo(0);
-
- futures.get(0).getRecycleCallback().run();
- // the 5th segment should success.
- retry(Duration.ofMinutes(1), () ->
assertThat(futures.get(4).isDone()).isTrue());
- assertThat(FileUtils.listDirectory(localLogDir).length).isEqualTo(4);
-
assertThat(scannerMetricGroup.remoteFetchRequestCount().getCount()).isEqualTo(5);
- assertThat(scannerMetricGroup.remoteFetchBytes().getCount())
- .isEqualTo(remoteLogSegmentFilesLength(remoteLogSegments,
remoteLogTabletDir, 5));
-
assertThat(remoteLogDownloader.getPrefetchSemaphore().availablePermits()).isEqualTo(0);
-
- futures.get(1).getRecycleCallback().run();
- futures.get(2).getRecycleCallback().run();
-
assertThat(remoteLogDownloader.getPrefetchSemaphore().availablePermits()).isEqualTo(2);
- // the removal of log files are async, so we need to wait for the
removal.
- retry(
- Duration.ofMinutes(1),
- () ->
assertThat(FileUtils.listDirectory(localLogDir).length).isEqualTo(2));
-
- // test cleanup
- remoteLogDownloader.close();
- assertThat(FileUtils.listDirectory(localLogDir).length).isEqualTo(0);
+ void testOrderOfRemoteLogDownloadRequest() {
+ TableBucket bucket1 = new TableBucket(DATA1_TABLE_ID, 1);
+ TableBucket bucket2 = new TableBucket(DATA1_TABLE_ID, 2);
+ TableBucket bucket3 = new TableBucket(DATA1_TABLE_ID, 3);
+
+ List<RemoteLogDownloadRequest> requests =
+ Arrays.asList(
+ // different offset, same timestamp and bucket
+ createDownloadRequest(bucket1, 10, 10),
+ createDownloadRequest(bucket1, 20, 10),
+ createDownloadRequest(bucket1, 30, 10),
+ // -1 timestamp
+ createDownloadRequest(bucket2, 10, -1),
+ createDownloadRequest(bucket2, 20, -1),
+ createDownloadRequest(bucket2, 30, -1),
+ // 0 offset
+ createDownloadRequest(bucket3, 0, 5),
+ createDownloadRequest(bucket3, 0, 15),
+ createDownloadRequest(bucket3, 0, 25));
+
+ // Sort the requests based on the custom comparator
+ Collections.sort(requests);
+ List<String> results =
+ requests.stream()
+ .map(
+ r ->
+ String.format(
+ "(bucket=%s, offset=%s,
ts=%s)",
+
r.segment.tableBucket().getBucket(),
+
r.segment.remoteLogStartOffset(),
+ r.segment.maxTimestamp()))
+ .collect(Collectors.toList());
+ List<String> expected =
+ Arrays.asList(
+ "(bucket=2, offset=10, ts=-1)",
+ "(bucket=2, offset=20, ts=-1)",
+ "(bucket=2, offset=30, ts=-1)",
+ "(bucket=3, offset=0, ts=5)",
+ "(bucket=1, offset=10, ts=10)",
+ "(bucket=1, offset=20, ts=10)",
+ "(bucket=1, offset=30, ts=10)",
+ "(bucket=3, offset=0, ts=15)",
+ "(bucket=3, offset=0, ts=25)");
+ assertThat(results).isEqualTo(expected);
+ }
+
+ private RemoteLogDownloadRequest createDownloadRequest(
+ TableBucket tableBucket, long startOffset, long maxTimestamp) {
+ RemoteLogSegment remoteLogSegment =
+ RemoteLogSegment.Builder.builder()
+ .tableBucket(tableBucket)
+ .physicalTablePath(DATA1_PHYSICAL_TABLE_PATH)
+ .remoteLogSegmentId(UUID.randomUUID())
+ .remoteLogStartOffset(startOffset)
+ .remoteLogEndOffset(startOffset + 10)
+ .maxTimestamp(maxTimestamp)
+ .segmentSizeInBytes(Integer.MAX_VALUE)
+ .build();
+ return new RemoteLogDownloadRequest(remoteLogSegment, remoteLogDir);
}
private List<RemoteLogDownloadFuture> requestRemoteLogs(
- FsPath remoteLogTabletDir, List<RemoteLogSegment>
remoteLogSegments) {
+ RemoteLogDownloader remoteLogDownloader,
+ FsPath remoteLogTabletDir,
+ List<RemoteLogSegment> remoteLogSegments) {
List<RemoteLogDownloadFuture> futures = new ArrayList<>();
for (RemoteLogSegment segment : remoteLogSegments) {
RemoteLogDownloadFuture future =
@@ -152,7 +314,8 @@ class RemoteLogDownloaderTest {
TableBucket tableBucket,
PhysicalTablePath physicalTablePath,
int num,
- Configuration conf)
+ Configuration conf,
+ long maxTimestamp)
throws Exception {
List<RemoteLogSegment> remoteLogSegmentList = new ArrayList<>();
for (int i = 0; i < num; i++) {
@@ -165,6 +328,7 @@ class RemoteLogDownloaderTest {
.remoteLogSegmentId(segmentId)
.remoteLogStartOffset(baseOffset)
.remoteLogEndOffset(baseOffset + 9)
+ .maxTimestamp(maxTimestamp)
.segmentSizeInBytes(Integer.MAX_VALUE)
.build();
genRemoteLogSegmentFile(
diff --git
a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/util/CommonRpcMessageUtils.java
b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/util/CommonRpcMessageUtils.java
index b3d4a82ac..6513c68eb 100644
---
a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/util/CommonRpcMessageUtils.java
+++
b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/util/CommonRpcMessageUtils.java
@@ -169,6 +169,10 @@ public class CommonRpcMessageUtils {
PhysicalTablePath physicalTablePath = PhysicalTablePath.of(tp,
partitionName);
List<RemoteLogSegment> remoteLogSegmentList = new
ArrayList<>();
for (PbRemoteLogSegment pbRemoteLogSegment :
pbRlfInfo.getRemoteLogSegmentsList()) {
+ long maxTimestamp =
+ pbRemoteLogSegment.hasMaxTimestamp()
+ ? pbRemoteLogSegment.getMaxTimestamp()
+ : -1;
RemoteLogSegment remoteLogSegment =
RemoteLogSegment.Builder.builder()
.tableBucket(tb)
@@ -180,7 +184,7 @@ public class CommonRpcMessageUtils {
.remoteLogStartOffset(
pbRemoteLogSegment.getRemoteLogStartOffset())
.segmentSizeInBytes(pbRemoteLogSegment.getSegmentSizeInBytes())
- .maxTimestamp(-1L) // not use.
+ .maxTimestamp(maxTimestamp)
.build();
remoteLogSegmentList.add(remoteLogSegment);
}
diff --git a/fluss-rpc/src/main/proto/FlussApi.proto
b/fluss-rpc/src/main/proto/FlussApi.proto
index bf2f9aaa6..ca1a4a1d7 100644
--- a/fluss-rpc/src/main/proto/FlussApi.proto
+++ b/fluss-rpc/src/main/proto/FlussApi.proto
@@ -799,6 +799,7 @@ message PbRemoteLogSegment {
required int64 remote_log_start_offset = 2;
required int64 remote_log_end_offset = 3;
required int32 segment_size_in_bytes = 4;
+ optional int64 max_timestamp = 5;
}
message PbPartitionInfo {
diff --git
a/fluss-server/src/main/java/com/alibaba/fluss/server/utils/ServerRpcMessageUtils.java
b/fluss-server/src/main/java/com/alibaba/fluss/server/utils/ServerRpcMessageUtils.java
index a1363e6b2..169fa2318 100644
---
a/fluss-server/src/main/java/com/alibaba/fluss/server/utils/ServerRpcMessageUtils.java
+++
b/fluss-server/src/main/java/com/alibaba/fluss/server/utils/ServerRpcMessageUtils.java
@@ -747,7 +747,8 @@ public class ServerRpcMessageUtils {
.setRemoteLogSegmentId(
logSegment.remoteLogSegmentId().toString())
.setRemoteLogEndOffset(logSegment.remoteLogEndOffset())
-
.setSegmentSizeInBytes(logSegment.segmentSizeInBytes());
+
.setSegmentSizeInBytes(logSegment.segmentSizeInBytes())
+
.setMaxTimestamp(logSegment.maxTimestamp());
remoteLogSegmentList.add(pbRemoteLogSegment);
}
fetchLogRespForBucket