Copilot commented on code in PR #1752:
URL: https://github.com/apache/fluss/pull/1752#discussion_r2877068773


##########
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/RemoteLogDownloader.java:
##########
@@ -151,51 +152,56 @@ void fetchOnce() throws Exception {
             return;
         }
 
-        try {
-            // 1. cleanup the finished logs first to free up disk space
-            cleanupRemoteLogs();
+        // the semaphore will only be released after the remote file are downloaded to local
+        // successfully.
+        downloadRemoteLog(request, MAX_RETRY_COUNT, System.currentTimeMillis());
+    }
 
-            // 2. do the actual download work
-            FsPathAndFileName fsPathAndFileName = request.getFsPathAndFileName();
-            scannerMetricGroup.remoteFetchRequestCount().inc();
-
-            long startTime = System.currentTimeMillis();
-            // download the remote file to local
-            remoteFileDownloader
-                    .downloadFileAsync(fsPathAndFileName, localLogDir)
-                    .whenComplete(
-                            (bytes, throwable) -> {
-                                if (throwable != null) {
-                                    LOG.error(
-                                            "Failed to download remote log segment file {}.",
-                                            fsPathAndFileName.getFileName(),
-                                            ExceptionUtils.stripExecutionException(throwable));
-                                    // release the semaphore for the failed request
-                                    prefetchSemaphore.release();
-                                    // add back the request to the queue,
-                                    // so we do not complete the request.future here
-                                    segmentsToFetch.add(request);
-                                    scannerMetricGroup.remoteFetchErrorCount().inc();
-                                } else {
-                                    LOG.info(
-                                            "Successfully downloaded remote log segment file {} to local cost {} ms.",
-                                            fsPathAndFileName.getFileName(),
-                                            System.currentTimeMillis() - startTime);
-                                    File localFile =
-                                            new File(
-                                                    localLogDir.toFile(),
-                                                    fsPathAndFileName.getFileName());
-                                    scannerMetricGroup.remoteFetchBytes().inc(bytes);
-                                    request.future.complete(localFile);
-                                }
-                            });
-        } catch (Throwable t) {
-            prefetchSemaphore.release();
-            // add back the request to the queue
-            segmentsToFetch.add(request);
-            scannerMetricGroup.remoteFetchErrorCount().inc();
-            // log the error and continue instead of shutdown the download thread
-            LOG.error("Failed to download remote log segment.", t);
+    private void downloadRemoteLog(
+            RemoteLogDownloadRequest request, int retryCount, long startTime) {
+        // 1. cleanup the finished logs first to free up disk space
+        cleanupRemoteLogs();
+
+        // 2. do the actual download work
+        FsPathAndFileName fsPathAndFileName = request.getFsPathAndFileName();
+        scannerMetricGroup.remoteFetchRequestCount().inc();
+        remoteFileDownloader
+                .downloadFileAsync(fsPathAndFileName, localLogDir)
+                .whenComplete(
+                        (bytes, throwable) -> {
+                            if (throwable != null) {
+                                handleFetchException(request, throwable, retryCount, startTime);
+                            } else {
+                                LOG.info(
+                                        "Successfully downloaded remote log segment file {} to local cost {} ms.",
+                                        fsPathAndFileName.getFileName(),
+                                        System.currentTimeMillis() - startTime);
+                                File localFile =
+                                        new File(
+                                                localLogDir.toFile(),
+                                                fsPathAndFileName.getFileName());
+                                scannerMetricGroup.remoteFetchBytes().inc(bytes);
+                                request.future.complete(localFile);
+                            }
+                        });

Review Comment:
   `downloadRemoteLog` no longer guards against synchronous exceptions from 
`cleanupRemoteLogs()` / `downloadFileAsync(...)` (e.g., bad path / FS init 
failure). If an exception is thrown before the `whenComplete` is attached, it 
will bubble out of `fetchOnce()` and can terminate the download thread without 
completing `request.future`, leaving callers blocked. Wrap the body in a 
try/catch and route failures through the same error-handling path (completing 
the future exceptionally after retries).
   ```suggestion
           try {
               // 1. cleanup the finished logs first to free up disk space
               cleanupRemoteLogs();
   
               // 2. do the actual download work
               FsPathAndFileName fsPathAndFileName = request.getFsPathAndFileName();
               scannerMetricGroup.remoteFetchRequestCount().inc();
               remoteFileDownloader
                       .downloadFileAsync(fsPathAndFileName, localLogDir)
                       .whenComplete(
                               (bytes, throwable) -> {
                                   if (throwable != null) {
                                       handleFetchException(
                                               request, throwable, retryCount, startTime);
                                   } else {
                                       LOG.info(
                                               "Successfully downloaded remote log segment file {} to local cost {} ms.",
                                               fsPathAndFileName.getFileName(),
                                               System.currentTimeMillis() - startTime);
                                       File localFile =
                                               new File(
                                                       localLogDir.toFile(),
                                                       fsPathAndFileName.getFileName());
                                       scannerMetricGroup.remoteFetchBytes().inc(bytes);
                                       request.future.complete(localFile);
                                   }
                               });
           } catch (Throwable t) {
               handleFetchException(request, t, retryCount, startTime);
           }
   ```



##########
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/RemoteLogDownloader.java:
##########
@@ -151,51 +152,56 @@ void fetchOnce() throws Exception {
             return;
         }
 
-        try {
-            // 1. cleanup the finished logs first to free up disk space
-            cleanupRemoteLogs();
+        // the semaphore will only be released after the remote file are downloaded to local
+        // successfully.
+        downloadRemoteLog(request, MAX_RETRY_COUNT, System.currentTimeMillis());
+    }
 
-            // 2. do the actual download work
-            FsPathAndFileName fsPathAndFileName = request.getFsPathAndFileName();
-            scannerMetricGroup.remoteFetchRequestCount().inc();
-
-            long startTime = System.currentTimeMillis();
-            // download the remote file to local
-            remoteFileDownloader
-                    .downloadFileAsync(fsPathAndFileName, localLogDir)
-                    .whenComplete(
-                            (bytes, throwable) -> {
-                                if (throwable != null) {
-                                    LOG.error(
-                                            "Failed to download remote log segment file {}.",
-                                            fsPathAndFileName.getFileName(),
-                                            ExceptionUtils.stripExecutionException(throwable));
-                                    // release the semaphore for the failed request
-                                    prefetchSemaphore.release();
-                                    // add back the request to the queue,
-                                    // so we do not complete the request.future here
-                                    segmentsToFetch.add(request);
-                                    scannerMetricGroup.remoteFetchErrorCount().inc();
-                                } else {
-                                    LOG.info(
-                                            "Successfully downloaded remote log segment file {} to local cost {} ms.",
-                                            fsPathAndFileName.getFileName(),
-                                            System.currentTimeMillis() - startTime);
-                                    File localFile =
-                                            new File(
-                                                    localLogDir.toFile(),
-                                                    fsPathAndFileName.getFileName());
-                                    scannerMetricGroup.remoteFetchBytes().inc(bytes);
-                                    request.future.complete(localFile);
-                                }
-                            });
-        } catch (Throwable t) {
-            prefetchSemaphore.release();
-            // add back the request to the queue
-            segmentsToFetch.add(request);
-            scannerMetricGroup.remoteFetchErrorCount().inc();
-            // log the error and continue instead of shutdown the download thread
-            LOG.error("Failed to download remote log segment.", t);
+    private void downloadRemoteLog(
+            RemoteLogDownloadRequest request, int retryCount, long startTime) {
+        // 1. cleanup the finished logs first to free up disk space
+        cleanupRemoteLogs();
+
+        // 2. do the actual download work
+        FsPathAndFileName fsPathAndFileName = request.getFsPathAndFileName();
+        scannerMetricGroup.remoteFetchRequestCount().inc();
+        remoteFileDownloader
+                .downloadFileAsync(fsPathAndFileName, localLogDir)
+                .whenComplete(
+                        (bytes, throwable) -> {
+                            if (throwable != null) {
+                                handleFetchException(request, throwable, retryCount, startTime);
+                            } else {
+                                LOG.info(
+                                        "Successfully downloaded remote log segment file {} to local cost {} ms.",
+                                        fsPathAndFileName.getFileName(),
+                                        System.currentTimeMillis() - startTime);
+                                File localFile =
+                                        new File(
+                                                localLogDir.toFile(),
+                                                fsPathAndFileName.getFileName());
+                                scannerMetricGroup.remoteFetchBytes().inc(bytes);
+                                request.future.complete(localFile);
+                            }
+                        });
+    }
+
+    private void handleFetchException(
+            RemoteLogDownloadRequest request, Throwable throwable, int retryCount, long startTime) {
+        LOG.error(
+                "Failed to download remote log segment file {}.",
+                request.getFsPathAndFileName().getFileName(),
+                ExceptionUtils.stripExecutionException(throwable));
+        scannerMetricGroup.remoteFetchErrorCount().inc();
+        if (retryCount <= 0) {
+            downloadRemoteLog(request, retryCount - 1, startTime);
+        } else {
+            request.future.completeExceptionally(
+                    new IOException(
+                            String.format(
+                                    "Failed to download remote log segment file %s, retry count %d",
+                                    request.getFsPathAndFileName().getFileName(), MAX_RETRY_COUNT),
+                            throwable));

Review Comment:
   `handleFetchException` has the retry condition inverted: with 
`MAX_RETRY_COUNT` (5) the code immediately completes the future exceptionally 
and never retries. Also, the current `if (retryCount <= 0) 
downloadRemoteLog(request, retryCount - 1, ...)` path would recurse forever 
with negative retry counts if it were ever reached. Flip the condition to retry 
while `retryCount > 0` (decrementing each attempt), and complete exceptionally 
once retries are exhausted (without scheduling another download with a negative 
count).
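
A minimal sketch of the corrected retry flow, assuming a budget that counts retries left after the first attempt. `RetrySketch`, `download`, `handleFailure` and `attemptsUntilFailure` are hypothetical stand-ins for illustration, not the PR's actual code. The same handler also receives synchronous failures, so a throw before the callback is attached still completes the future.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical stand-in for the retry path; not the actual RemoteLogDownloader. */
public class RetrySketch {

    /** Starts one attempt; sync and async failures both go through handleFailure. */
    static void download(
            Supplier<CompletableFuture<Long>> downloader,
            CompletableFuture<Long> result,
            int retriesLeft,
            int maxRetries) {
        try {
            downloader
                    .get()
                    .whenComplete(
                            (bytes, throwable) -> {
                                if (throwable != null) {
                                    handleFailure(
                                            downloader, result, throwable, retriesLeft, maxRetries);
                                } else {
                                    result.complete(bytes);
                                }
                            });
        } catch (Throwable t) {
            // Failure thrown before the callback could be attached.
            handleFailure(downloader, result, t, retriesLeft, maxRetries);
        }
    }

    /** Retries while budget remains; otherwise fails the caller's future exactly once. */
    static void handleFailure(
            Supplier<CompletableFuture<Long>> downloader,
            CompletableFuture<Long> result,
            Throwable cause,
            int retriesLeft,
            int maxRetries) {
        if (retriesLeft > 0) {
            download(downloader, result, retriesLeft - 1, maxRetries);
        } else {
            result.completeExceptionally(
                    new IOException("Download failed, retry count " + maxRetries, cause));
        }
    }

    /** Runs an always-failing downloader and returns how many attempts were made. */
    static int attemptsUntilFailure(int maxRetries) {
        AtomicInteger attempts = new AtomicInteger();
        Supplier<CompletableFuture<Long>> alwaysFails =
                () -> {
                    attempts.incrementAndGet();
                    CompletableFuture<Long> failed = new CompletableFuture<>();
                    failed.completeExceptionally(new IOException("simulated failure"));
                    return failed;
                };
        CompletableFuture<Long> result = new CompletableFuture<>();
        download(alwaysFails, result, maxRetries, maxRetries);
        if (!result.isCompletedExceptionally()) {
            throw new IllegalStateException("future should have failed");
        }
        return attempts.get();
    }

    public static void main(String[] args) {
        // One initial attempt plus five retries.
        System.out.println(attemptsUntilFailure(5)); // prints 6
    }
}
```

With the inverted condition from the diff, the same scenario would make a single attempt and fail the future immediately.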



##########
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchBuffer.java:
##########
@@ -109,10 +113,20 @@ void tryComplete(TableBucket tableBucket) {
                     while (pendings != null && !pendings.isEmpty()) {
                         PendingFetch pendingFetch = pendings.peek();
                         if (pendingFetch.isCompleted()) {
-                            CompletedFetch completedFetch = pendingFetch.toCompletedFetch();
-                            completedFetches.add(completedFetch);
-                            pendings.poll();
-                            hasCompleted = true;
+                            try {
+                                CompletedFetch completedFetch = pendingFetch.toCompletedFetch();
+                                completedFetches.add(completedFetch);
+                                pendings.poll();
+                                hasCompleted = true;
+                            } catch (Throwable t) {
+                                LOG.error(
+                                        "Failed to complete fetch for tableBucket: {}",
+                                        tableBucket,
+                                        t);
+                                throwable = t;

Review Comment:
   In `tryComplete`, if `pendingFetch.toCompletedFetch()` throws, you set 
`throwable = t` and return without signaling `notEmptyCondition`. Any thread 
blocked in `awaitNotEmpty()` will continue waiting indefinitely (since `pend()` 
does not signal and no completed fetch was added). Signal the condition when 
recording `throwable` so waiters can wake up and observe the failure via 
`peek()`/`poll()`.
   ```suggestion
                                   throwable = t;
                                   notEmptyCondition.signalAll();
   ```
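
The effect of signaling on error can be sketched in isolation. This is a simplified model under assumptions, not the real `LogFetchBuffer`: `SignalOnErrorSketch`, `recordError`, `awaitNotEmpty` and `waiterWakesOnError` are hypothetical names, and the wait predicate treats a recorded error as "something to observe".

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Simplified, hypothetical model of a fetch buffer that records errors. */
public class SignalOnErrorSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmptyCondition = lock.newCondition();
    private int completedCount; // guarded by lock
    private Throwable throwable; // guarded by lock

    /** Records the failure and wakes every waiter so it can observe the error. */
    void recordError(Throwable t) {
        lock.lock();
        try {
            throwable = t;
            // Without this signal, a waiter only notices the error when its timeout
            // expires, or never if it uses an untimed await.
            notEmptyCondition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** Returns true once a fetch or an error is available, false on timeout. */
    boolean awaitNotEmpty(long timeoutMs) throws InterruptedException {
        lock.lock();
        try {
            long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
            while (completedCount == 0 && throwable == null) {
                if (nanos <= 0) {
                    return false;
                }
                nanos = notEmptyCondition.awaitNanos(nanos);
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    /** Starts a waiter thread, records an error, and reports whether the waiter woke. */
    static boolean waiterWakesOnError() {
        SignalOnErrorSketch buffer = new SignalOnErrorSketch();
        AtomicBoolean woke = new AtomicBoolean(false);
        Thread waiter =
                new Thread(
                        () -> {
                            try {
                                woke.set(buffer.awaitNotEmpty(5_000));
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                            }
                        });
        waiter.start();
        buffer.recordError(new RuntimeException("simulated failure"));
        try {
            waiter.join(10_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return woke.get();
    }

    public static void main(String[] args) {
        System.out.println(waiterWakesOnError()); // prints true
    }
}
```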



##########
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchBuffer.java:
##########
@@ -73,17 +75,19 @@ public class LogFetchBuffer implements AutoCloseable {
     @GuardedBy("lock")
     private @Nullable CompletedFetch nextInLineFetch;
 
+    @GuardedBy("lock")
+    private @Nullable Throwable throwable;
+
     public LogFetchBuffer() {
         this.completedFetches = new LinkedList<>();
     }
 
     /**
-     * Returns {@code true} if there are no completed fetches pending to return to the user.
-     *
-     * @return {@code true} if the buffer is empty, {@code false} otherwise
+     * @return {@code true} if there are no completed fetches pending to return to the user or some
+     *     error occurs, {@code false} otherwise

Review Comment:
   The `isEmpty()` JavaDoc is now inconsistent with the implementation. The 
method returns `false` when `throwable != null` (treating an error like “not 
empty” so it can be surfaced), but the JavaDoc says it returns `true` when 
“some error occurs”. Update the JavaDoc to match the actual semantics (or 
adjust the method if the JavaDoc is intended).
   ```suggestion
        * @return {@code true} if there are no completed fetches pending to return to the user and no
        *     error has been recorded, {@code false} otherwise
   ```



##########
fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/RemoteLogDownloaderTest.java:
##########
@@ -282,6 +284,52 @@ void testOrderOfRemoteLogDownloadRequest() {
         assertThat(results).isEqualTo(expected);
     }
 
+    @Test
+    void testFetchException() {
+        RemoteFileDownloader remoteFileDownloader = new RemoteFileDownloader(1);
+        RemoteLogDownloader remoteLogDownloader =
+                new RemoteLogDownloader(
+                        DATA1_TABLE_PATH, conf, remoteFileDownloader, scannerMetricGroup, 10L);
+        try {
+            // trigger auto download.
+            remoteLogDownloader.start();
+
+            TableBucket tb = new TableBucket(DATA1_TABLE_ID, 0);
+            FsPath fsPath = new FsPath("test:///empty");
+            RemoteLogSegment nonExistLogSegment =
+                    RemoteLogSegment.Builder.builder()
+                            .tableBucket(tb)
+                            .physicalTablePath(DATA1_PHYSICAL_TABLE_PATH)
+                            .remoteLogSegmentId(UUID.randomUUID())
+                            .remoteLogStartOffset(1)
+                            .remoteLogEndOffset(2)
+                            .maxTimestamp(2)
+                            .segmentSizeInBytes(Integer.MAX_VALUE)
+                            .build();
+
+            RemoteLogDownloadFuture remoteLogDownloadFuture =
+                    remoteLogDownloader.requestRemoteLog(fsPath, nonExistLogSegment);
+            retry(
+                    Duration.ofMinutes(1),
+                    () -> assertThat(remoteLogDownloadFuture.isDone()).isTrue());
+            AbstractThrowableAssert<?, ?> exactlyInstanceOf =
+                    assertThatThrownBy(() -> remoteLogDownloadFuture.getFileLogRecords(1))
+                            .cause()
+                            .isExactlyInstanceOf(IOException.class)
+                            .hasMessageContaining(
+                                    String.format(
+                                            "Failed to download remote log segment file %s, retry count 5",
+                                            RemoteLogDownloader.getFsPathAndFileName(
+                                                            fsPath, nonExistLogSegment)
+                                                    .getFileName()))
+                            .rootCause();
+

Review Comment:
   `exactlyInstanceOf` is assigned but never used, which adds noise to the test 
and may trigger unused-variable warnings in some builds. Remove the local 
variable (and `AbstractThrowableAssert` import if it becomes unused).



##########
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java:
##########
@@ -161,7 +162,7 @@ public boolean hasAvailableFetches() {
         return !logFetchBuffer.isEmpty();
     }
 
-    public Map<TableBucket, List<ScanRecord>> collectFetch() {
+    public Map<TableBucket, List<ScanRecord>> collectFetch() throws FetchException {
         return logFetchCollector.collectFetch(logFetchBuffer);
     }

Review Comment:
   After moving to “fail fast” semantics (completing the download future 
exceptionally), the existing remote-download completion callback in 
`pendRemoteFetches` is still registered via 
`RemoteLogDownloadFuture.onComplete(...)`, which currently only runs on 
*successful* completion (`thenRun`). This means a failed remote download may 
never trigger `logFetchBuffer.tryComplete(...)`, so the exception can be 
silently stuck in the pending queue and the scanner may hang instead of 
failing. Consider making the callback run for both success and failure (e.g., 
use a `whenComplete`-style hook) so `tryComplete` can surface the failure via 
the new exception path.
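
The difference between the two hooks can be shown with a plain `CompletableFuture`. This is a minimal sketch; `CallbackSketch` and its methods are hypothetical names, and it does not reproduce `RemoteLogDownloadFuture` itself.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical demo of which completion hooks fire on a failed future. */
public class CallbackSketch {

    /** thenRun callbacks are skipped when the future completes exceptionally. */
    static boolean thenRunFiresOnFailure() {
        CompletableFuture<String> download = new CompletableFuture<>();
        AtomicBoolean fired = new AtomicBoolean(false);
        download.thenRun(() -> fired.set(true));
        download.completeExceptionally(new RuntimeException("simulated download failure"));
        return fired.get();
    }

    /** whenComplete callbacks run for both normal and exceptional completion. */
    static boolean whenCompleteFiresOnFailure() {
        CompletableFuture<String> download = new CompletableFuture<>();
        AtomicBoolean fired = new AtomicBoolean(false);
        download.whenComplete((value, throwable) -> fired.set(true));
        download.completeExceptionally(new RuntimeException("simulated download failure"));
        return fired.get();
    }

    public static void main(String[] args) {
        System.out.println(thenRunFiresOnFailure()); // prints false
        System.out.println(whenCompleteFiresOnFailure()); // prints true
    }
}
```

A callback registered with `whenComplete` would therefore let a `tryComplete`-style hook run after a failed download as well as a successful one.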



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
