Copilot commented on code in PR #1752:
URL: https://github.com/apache/fluss/pull/1752#discussion_r2877068773
##########
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/RemoteLogDownloader.java:
##########
@@ -151,51 +152,56 @@ void fetchOnce() throws Exception {
return;
}
- try {
- // 1. cleanup the finished logs first to free up disk space
- cleanupRemoteLogs();
+ // the semaphore will only be released after the remote file are
downloaded to local
+ // successfully.
+ downloadRemoteLog(request, MAX_RETRY_COUNT,
System.currentTimeMillis());
+ }
- // 2. do the actual download work
- FsPathAndFileName fsPathAndFileName =
request.getFsPathAndFileName();
- scannerMetricGroup.remoteFetchRequestCount().inc();
-
- long startTime = System.currentTimeMillis();
- // download the remote file to local
- remoteFileDownloader
- .downloadFileAsync(fsPathAndFileName, localLogDir)
- .whenComplete(
- (bytes, throwable) -> {
- if (throwable != null) {
- LOG.error(
- "Failed to download remote log
segment file {}.",
- fsPathAndFileName.getFileName(),
-
ExceptionUtils.stripExecutionException(throwable));
- // release the semaphore for the failed
request
- prefetchSemaphore.release();
- // add back the request to the queue,
- // so we do not complete the
request.future here
- segmentsToFetch.add(request);
-
scannerMetricGroup.remoteFetchErrorCount().inc();
- } else {
- LOG.info(
- "Successfully downloaded remote
log segment file {} to local cost {} ms.",
- fsPathAndFileName.getFileName(),
- System.currentTimeMillis() -
startTime);
- File localFile =
- new File(
- localLogDir.toFile(),
-
fsPathAndFileName.getFileName());
-
scannerMetricGroup.remoteFetchBytes().inc(bytes);
- request.future.complete(localFile);
- }
- });
- } catch (Throwable t) {
- prefetchSemaphore.release();
- // add back the request to the queue
- segmentsToFetch.add(request);
- scannerMetricGroup.remoteFetchErrorCount().inc();
- // log the error and continue instead of shutdown the download
thread
- LOG.error("Failed to download remote log segment.", t);
+ private void downloadRemoteLog(
+ RemoteLogDownloadRequest request, int retryCount, long startTime) {
+ // 1. cleanup the finished logs first to free up disk space
+ cleanupRemoteLogs();
+
+ // 2. do the actual download work
+ FsPathAndFileName fsPathAndFileName = request.getFsPathAndFileName();
+ scannerMetricGroup.remoteFetchRequestCount().inc();
+ remoteFileDownloader
+ .downloadFileAsync(fsPathAndFileName, localLogDir)
+ .whenComplete(
+ (bytes, throwable) -> {
+ if (throwable != null) {
+ handleFetchException(request, throwable,
retryCount, startTime);
+ } else {
+ LOG.info(
+ "Successfully downloaded remote log
segment file {} to local cost {} ms.",
+ fsPathAndFileName.getFileName(),
+ System.currentTimeMillis() -
startTime);
+ File localFile =
+ new File(
+ localLogDir.toFile(),
+
fsPathAndFileName.getFileName());
+
scannerMetricGroup.remoteFetchBytes().inc(bytes);
+ request.future.complete(localFile);
+ }
+ });
Review Comment:
`downloadRemoteLog` no longer guards against synchronous exceptions from
`cleanupRemoteLogs()` / `downloadFileAsync(...)` (e.g., bad path / FS init
failure). If an exception is thrown before the `whenComplete` is attached, it
will bubble out of `fetchOnce()` and can terminate the download thread without
completing `request.future`, leaving callers blocked. Wrap the body in a
try/catch and route failures through the same error-handling path (completing
the future exceptionally after retries).
```suggestion
        try {
            // 1. cleanup the finished logs first to free up disk space
            cleanupRemoteLogs();
            // 2. do the actual download work
            FsPathAndFileName fsPathAndFileName = request.getFsPathAndFileName();
            scannerMetricGroup.remoteFetchRequestCount().inc();
            remoteFileDownloader
                    .downloadFileAsync(fsPathAndFileName, localLogDir)
                    .whenComplete(
                            (bytes, throwable) -> {
                                if (throwable != null) {
                                    handleFetchException(
                                            request, throwable, retryCount, startTime);
                                } else {
                                    LOG.info(
                                            "Successfully downloaded remote log segment file {} to local cost {} ms.",
                                            fsPathAndFileName.getFileName(),
                                            System.currentTimeMillis() - startTime);
                                    File localFile =
                                            new File(
                                                    localLogDir.toFile(),
                                                    fsPathAndFileName.getFileName());
                                    scannerMetricGroup.remoteFetchBytes().inc(bytes);
                                    request.future.complete(localFile);
                                }
                            });
        } catch (Throwable t) {
            handleFetchException(request, t, retryCount, startTime);
        }
```
##########
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/RemoteLogDownloader.java:
##########
@@ -151,51 +152,56 @@ void fetchOnce() throws Exception {
return;
}
- try {
- // 1. cleanup the finished logs first to free up disk space
- cleanupRemoteLogs();
+ // the semaphore will only be released after the remote file are
downloaded to local
+ // successfully.
+ downloadRemoteLog(request, MAX_RETRY_COUNT,
System.currentTimeMillis());
+ }
- // 2. do the actual download work
- FsPathAndFileName fsPathAndFileName =
request.getFsPathAndFileName();
- scannerMetricGroup.remoteFetchRequestCount().inc();
-
- long startTime = System.currentTimeMillis();
- // download the remote file to local
- remoteFileDownloader
- .downloadFileAsync(fsPathAndFileName, localLogDir)
- .whenComplete(
- (bytes, throwable) -> {
- if (throwable != null) {
- LOG.error(
- "Failed to download remote log
segment file {}.",
- fsPathAndFileName.getFileName(),
-
ExceptionUtils.stripExecutionException(throwable));
- // release the semaphore for the failed
request
- prefetchSemaphore.release();
- // add back the request to the queue,
- // so we do not complete the
request.future here
- segmentsToFetch.add(request);
-
scannerMetricGroup.remoteFetchErrorCount().inc();
- } else {
- LOG.info(
- "Successfully downloaded remote
log segment file {} to local cost {} ms.",
- fsPathAndFileName.getFileName(),
- System.currentTimeMillis() -
startTime);
- File localFile =
- new File(
- localLogDir.toFile(),
-
fsPathAndFileName.getFileName());
-
scannerMetricGroup.remoteFetchBytes().inc(bytes);
- request.future.complete(localFile);
- }
- });
- } catch (Throwable t) {
- prefetchSemaphore.release();
- // add back the request to the queue
- segmentsToFetch.add(request);
- scannerMetricGroup.remoteFetchErrorCount().inc();
- // log the error and continue instead of shutdown the download
thread
- LOG.error("Failed to download remote log segment.", t);
+ private void downloadRemoteLog(
+ RemoteLogDownloadRequest request, int retryCount, long startTime) {
+ // 1. cleanup the finished logs first to free up disk space
+ cleanupRemoteLogs();
+
+ // 2. do the actual download work
+ FsPathAndFileName fsPathAndFileName = request.getFsPathAndFileName();
+ scannerMetricGroup.remoteFetchRequestCount().inc();
+ remoteFileDownloader
+ .downloadFileAsync(fsPathAndFileName, localLogDir)
+ .whenComplete(
+ (bytes, throwable) -> {
+ if (throwable != null) {
+ handleFetchException(request, throwable,
retryCount, startTime);
+ } else {
+ LOG.info(
+ "Successfully downloaded remote log
segment file {} to local cost {} ms.",
+ fsPathAndFileName.getFileName(),
+ System.currentTimeMillis() -
startTime);
+ File localFile =
+ new File(
+ localLogDir.toFile(),
+
fsPathAndFileName.getFileName());
+
scannerMetricGroup.remoteFetchBytes().inc(bytes);
+ request.future.complete(localFile);
+ }
+ });
+ }
+
+ private void handleFetchException(
+ RemoteLogDownloadRequest request, Throwable throwable, int
retryCount, long startTime) {
+ LOG.error(
+ "Failed to download remote log segment file {}.",
+ request.getFsPathAndFileName().getFileName(),
+ ExceptionUtils.stripExecutionException(throwable));
+ scannerMetricGroup.remoteFetchErrorCount().inc();
+ if (retryCount <= 0) {
+ downloadRemoteLog(request, retryCount - 1, startTime);
+ } else {
+ request.future.completeExceptionally(
+ new IOException(
+ String.format(
+ "Failed to download remote log segment
file %s, retry count %d",
+
request.getFsPathAndFileName().getFileName(), MAX_RETRY_COUNT),
+ throwable));
Review Comment:
`handleFetchException` has the retry condition inverted. `retryCount` starts at
`MAX_RETRY_COUNT` (5), so `retryCount <= 0` is false on the first failure and
the code completes the future exceptionally without ever retrying. If the
`if (retryCount <= 0) downloadRemoteLog(request, retryCount - 1, ...)` branch
were ever reached, it would keep retrying forever with increasingly negative
counts. Flip the condition to retry while `retryCount > 0` (decrementing on
each attempt), and complete exceptionally once retries are exhausted, without
scheduling another download with a negative count.
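The intended control flow can be sketched in isolation. This is a minimal,
self-contained illustration rather than the PR's code: `BoundedRetrySketch`,
`download`, `handleFailure`, and the `attempt` supplier are hypothetical
stand-ins for `downloadRemoteLog`, `handleFetchException`, and
`remoteFileDownloader.downloadFileAsync(...)`, and a plain `CompletableFuture`
stands in for `request.future`.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical sketch; none of these names come from the Fluss code base.
class BoundedRetrySketch {
    static final int MAX_RETRY_COUNT = 5;

    // Runs one attempt; on failure, delegates to handleFailure with the remaining budget.
    static void download(
            Supplier<CompletableFuture<Long>> attempt,
            CompletableFuture<Long> result,
            int retryCount) {
        CompletableFuture<Long> inFlight;
        try {
            inFlight = attempt.get();
        } catch (Throwable t) {
            // Synchronous failures take the same path as asynchronous ones.
            handleFailure(attempt, result, t, retryCount);
            return;
        }
        inFlight.whenComplete(
                (bytes, throwable) -> {
                    if (throwable != null) {
                        handleFailure(attempt, result, throwable, retryCount);
                    } else {
                        result.complete(bytes);
                    }
                });
    }

    // Retries while budget remains, otherwise fails the caller's future exactly once.
    static void handleFailure(
            Supplier<CompletableFuture<Long>> attempt,
            CompletableFuture<Long> result,
            Throwable cause,
            int retryCount) {
        if (retryCount > 0) {
            download(attempt, result, retryCount - 1);
        } else {
            result.completeExceptionally(
                    new IOException(
                            "Failed to download, retry count " + MAX_RETRY_COUNT, cause));
        }
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        CompletableFuture<Long> result = new CompletableFuture<>();
        download(
                () -> {
                    attempts.incrementAndGet();
                    CompletableFuture<Long> failed = new CompletableFuture<>();
                    failed.completeExceptionally(new IOException("simulated failure"));
                    return failed;
                },
                result,
                MAX_RETRY_COUNT);
        // One initial attempt plus MAX_RETRY_COUNT retries, then the future fails.
        System.out.println(
                attempts.get() + " attempts, failed=" + result.isCompletedExceptionally());
    }
}
```

With an attempt that always fails, the sketch makes one initial attempt plus
`MAX_RETRY_COUNT` retries and then completes the future exceptionally, so the
retry budget can never go negative.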
##########
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchBuffer.java:
##########
@@ -109,10 +113,20 @@ void tryComplete(TableBucket tableBucket) {
while (pendings != null && !pendings.isEmpty()) {
PendingFetch pendingFetch = pendings.peek();
if (pendingFetch.isCompleted()) {
- CompletedFetch completedFetch =
pendingFetch.toCompletedFetch();
- completedFetches.add(completedFetch);
- pendings.poll();
- hasCompleted = true;
+ try {
+ CompletedFetch completedFetch =
pendingFetch.toCompletedFetch();
+ completedFetches.add(completedFetch);
+ pendings.poll();
+ hasCompleted = true;
+ } catch (Throwable t) {
+ LOG.error(
+ "Failed to complete fetch for
tableBucket: {}",
+ tableBucket,
+ t);
+ throwable = t;
Review Comment:
In `tryComplete`, if `pendingFetch.toCompletedFetch()` throws, you set
`throwable = t` and return without signaling `notEmptyCondition`. Any thread
blocked in `awaitNotEmpty()` will continue waiting indefinitely (since `pend()`
does not signal and no completed fetch was added). Signal the condition when
recording `throwable` so waiters can wake up and observe the failure via
`peek()`/`poll()`.
```suggestion
throwable = t;
notEmptyCondition.signalAll();
```
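To illustrate why the signal matters, here is a minimal, self-contained sketch
of a lock/condition pair where recording an error wakes blocked waiters. It is
not the `LogFetchBuffer` implementation: the class name, `recordError`, the
timeout parameter, and the wait predicate (which assumes waiters also check
the recorded `throwable`) are all assumptions made for the example.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch; only the field names mirror the review comment.
class SignalOnErrorSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmptyCondition = lock.newCondition();
    private final Queue<String> completedFetches = new ArrayDeque<>();
    private Throwable throwable; // guarded by lock

    // Records the failure and wakes every waiter so it can observe the error.
    void recordError(Throwable t) {
        lock.lock();
        try {
            throwable = t;
            notEmptyCondition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Returns true if data or an error became visible before the timeout elapsed.
    boolean awaitNotEmpty(long timeoutMillis) throws InterruptedException {
        long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        lock.lock();
        try {
            // Assumed predicate: an error counts as "something to observe".
            while (completedFetches.isEmpty() && throwable == null) {
                if (nanos <= 0L) {
                    return false;
                }
                nanos = notEmptyCondition.awaitNanos(nanos);
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    // Starts a waiter, records an error, and reports whether the waiter woke up.
    static boolean waiterWakesOnError() {
        SignalOnErrorSketch buffer = new SignalOnErrorSketch();
        AtomicBoolean woke = new AtomicBoolean(false);
        Thread waiter =
                new Thread(
                        () -> {
                            try {
                                woke.set(buffer.awaitNotEmpty(5_000));
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                            }
                        });
        waiter.start();
        buffer.recordError(new IllegalStateException("simulated conversion failure"));
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return woke.get();
    }

    public static void main(String[] args) {
        System.out.println("waiter woke up: " + waiterWakesOnError());
    }
}
```

Without the `signalAll()` call in `recordError`, a waiter that is already
parked in this sketch would only return once its timeout expired.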
##########
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchBuffer.java:
##########
@@ -73,17 +75,19 @@ public class LogFetchBuffer implements AutoCloseable {
@GuardedBy("lock")
private @Nullable CompletedFetch nextInLineFetch;
+ @GuardedBy("lock")
+ private @Nullable Throwable throwable;
+
public LogFetchBuffer() {
this.completedFetches = new LinkedList<>();
}
/**
- * Returns {@code true} if there are no completed fetches pending to
return to the user.
- *
- * @return {@code true} if the buffer is empty, {@code false} otherwise
+ * @return {@code true} if there are no completed fetches pending to
return to the user or some
+ * error occurs, {@code false} otherwise
Review Comment:
The `isEmpty()` JavaDoc is now inconsistent with the implementation. The
method returns `false` when `throwable != null` (treating an error like “not
empty” so it can be surfaced), but the JavaDoc says it returns `true` when
“some error occurs”. Update the JavaDoc to match the actual semantics (or
adjust the method if the JavaDoc is intended).
```suggestion
     * @return {@code true} if there are no completed fetches pending to return to the user and no
     *     error has been recorded, {@code false} otherwise
```
##########
fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/RemoteLogDownloaderTest.java:
##########
@@ -282,6 +284,52 @@ void testOrderOfRemoteLogDownloadRequest() {
assertThat(results).isEqualTo(expected);
}
+ @Test
+ void testFetchException() {
+ RemoteFileDownloader remoteFileDownloader = new
RemoteFileDownloader(1);
+ RemoteLogDownloader remoteLogDownloader =
+ new RemoteLogDownloader(
+ DATA1_TABLE_PATH, conf, remoteFileDownloader,
scannerMetricGroup, 10L);
+ try {
+ // trigger auto download.
+ remoteLogDownloader.start();
+
+ TableBucket tb = new TableBucket(DATA1_TABLE_ID, 0);
+ FsPath fsPath = new FsPath("test:///empty");
+ RemoteLogSegment nonExistLogSegment =
+ RemoteLogSegment.Builder.builder()
+ .tableBucket(tb)
+ .physicalTablePath(DATA1_PHYSICAL_TABLE_PATH)
+ .remoteLogSegmentId(UUID.randomUUID())
+ .remoteLogStartOffset(1)
+ .remoteLogEndOffset(2)
+ .maxTimestamp(2)
+ .segmentSizeInBytes(Integer.MAX_VALUE)
+ .build();
+
+ RemoteLogDownloadFuture remoteLogDownloadFuture =
+ remoteLogDownloader.requestRemoteLog(fsPath,
nonExistLogSegment);
+ retry(
+ Duration.ofMinutes(1),
+ () ->
assertThat(remoteLogDownloadFuture.isDone()).isTrue());
+ AbstractThrowableAssert<?, ?> exactlyInstanceOf =
+ assertThatThrownBy(() ->
remoteLogDownloadFuture.getFileLogRecords(1))
+ .cause()
+ .isExactlyInstanceOf(IOException.class)
+ .hasMessageContaining(
+ String.format(
+ "Failed to download remote log
segment file %s, retry count 5",
+
RemoteLogDownloader.getFsPathAndFileName(
+ fsPath,
nonExistLogSegment)
+ .getFileName()))
+ .rootCause();
+
Review Comment:
`exactlyInstanceOf` is assigned but never used, which adds noise to the test
and may trigger unused-variable warnings in some builds. Remove the local
variable (and `AbstractThrowableAssert` import if it becomes unused).
##########
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java:
##########
@@ -161,7 +162,7 @@ public boolean hasAvailableFetches() {
return !logFetchBuffer.isEmpty();
}
- public Map<TableBucket, List<ScanRecord>> collectFetch() {
+ public Map<TableBucket, List<ScanRecord>> collectFetch() throws
FetchException {
return logFetchCollector.collectFetch(logFetchBuffer);
}
Review Comment:
After moving to “fail fast” semantics (completing the download future
exceptionally), the existing remote-download completion callback in
`pendRemoteFetches` is still registered via
`RemoteLogDownloadFuture.onComplete(...)`, which currently only runs on
*successful* completion (`thenRun`). This means a failed remote download may
never trigger `logFetchBuffer.tryComplete(...)`, so the exception can be
silently stuck in the pending queue and the scanner may hang instead of
failing. Consider making the callback run for both success and failure (e.g.,
use a `whenComplete`-style hook) so `tryComplete` can surface the failure via
the new exception path.
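The difference between the two hooks can be seen with plain
`CompletableFuture` calls. This is a minimal sketch using only the JDK; the
class and method names are hypothetical, and it does not reproduce
`RemoteLogDownloadFuture.onComplete(...)` itself.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch comparing thenRun and whenComplete on a failed future.
class CompletionCallbackSketch {

    // Returns {thenRunFired, whenCompleteFired} for a future that fails.
    static boolean[] callbacksFiredOnFailure() {
        CompletableFuture<String> download = new CompletableFuture<>();
        AtomicBoolean thenRunFired = new AtomicBoolean(false);
        AtomicBoolean whenCompleteFired = new AtomicBoolean(false);

        // thenRun only executes its action when the source completes normally.
        download.thenRun(() -> thenRunFired.set(true));
        // whenComplete executes for both normal and exceptional completion.
        download.whenComplete((file, error) -> whenCompleteFired.set(true));

        download.completeExceptionally(new IOException("simulated download failure"));
        return new boolean[] {thenRunFired.get(), whenCompleteFired.get()};
    }

    public static void main(String[] args) {
        boolean[] fired = callbacksFiredOnFailure();
        // prints "thenRun fired: false, whenComplete fired: true"
        System.out.println(
                "thenRun fired: " + fired[0] + ", whenComplete fired: " + fired[1]);
    }
}
```

A callback registered with a `whenComplete`-style hook would therefore still
get the chance to call `tryComplete` after a failed download, whereas a
`thenRun`-based one is skipped.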