jadewang-db commented on code in PR #2855:
URL: https://github.com/apache/arrow-adbc/pull/2855#discussion_r2105442755


##########
csharp/src/Drivers/Databricks/CloudFetch/CloudFetchDownloader.cs:
##########
@@ -341,6 +365,40 @@ private async Task DownloadFileAsync(IDownloadResult 
downloadResult, Cancellatio
                         HttpCompletionOption.ResponseHeadersRead,
                         cancellationToken).ConfigureAwait(false);
 
+                    // Check if the response indicates an expired URL 
(typically 403 or 401)
+                    if (response.StatusCode == 
System.Net.HttpStatusCode.Forbidden ||
+                        response.StatusCode == 
System.Net.HttpStatusCode.Unauthorized)
+                    {
+                        // If we've already tried refreshing too many times, 
fail
+                        if (downloadResult.RefreshAttempts >= 
_maxUrlRefreshAttempts)
+                        {
+                            throw new InvalidOperationException($"Failed to 
download file after {downloadResult.RefreshAttempts} URL refresh attempts.");
+                        }
+
+                        // Try to refresh the URL
+                        var refreshedLink = await 
_resultFetcher.GetUrlAsync(downloadResult.Link.StartRowOffset, 
cancellationToken);
+                        if (refreshedLink != null)
+                        {
+                            // Update the download result with the refreshed 
link
+                            
downloadResult.UpdateWithRefreshedLink(refreshedLink);
+                            url = refreshedLink.FileLink;
+                            sanitizedUrl = SanitizeUrl(url);
+
+                            Trace.TraceInformation($"URL for file at offset 
{refreshedLink.StartRowOffset} was refreshed after expired URL response");
+
+                            // Also refresh other potentially expired URLs

Review Comment:
   This is not needed here.



##########
csharp/src/Drivers/Databricks/CloudFetch/CloudFetchResultFetcher.cs:
##########
@@ -125,6 +139,111 @@ public async Task StopAsync()
             }
         }
 
+        /// <summary>
+        /// Gets a URL for the specified offset, fetching or refreshing as 
needed.
+        /// </summary>
+        /// <param name="offset">The row offset for which to get a URL.</param>
+        /// <param name="cancellationToken">The cancellation token.</param>
+        /// <returns>The URL link for the specified offset, or null if not 
available.</returns>
+        public async Task<TSparkArrowResultLink?> GetUrlAsync(long offset, 
CancellationToken cancellationToken)
+        {
+            // Need to fetch or refresh the URL
+            await _fetchLock.WaitAsync(cancellationToken);
+            try
+            {
+                // Determine if we need to fetch new URLs or refresh existing 
ones
+                if (!_urlsByOffset.ContainsKey(offset) && _hasMoreResults)
+                {
+                    // This is a new offset we haven't seen before - fetch new 
URLs
+                    var links = await FetchUrlBatchAsync(offset, 100, 
cancellationToken);
+                    return links.FirstOrDefault(l => l.StartRowOffset == 
offset);
+                }
+                else
+                {
+                    // We have the URL but it's expired - refresh it
+                    return await RefreshUrlAsync(offset, cancellationToken);
+                }
+            }
+            finally
+            {
+                _fetchLock.Release();
+            }
+        }
+
+        /// <summary>
+        /// Checks if any URLs are expired or about to expire.
+        /// </summary>
+        /// <returns>True if any URLs are expired or about to expire, false 
otherwise.</returns>
+        public bool HasExpiredOrExpiringSoonUrls()
+        {
+            return _urlsByOffset.Values.Any(IsUrlExpiredOrExpiringSoon);
+        }
+
+        /// <summary>
+        /// Proactively refreshes URLs that are expired or about to expire.
+        /// </summary>
+        /// <param name="cancellationToken">The cancellation token.</param>
+        public async Task RefreshExpiredUrlsAsync(CancellationToken 
cancellationToken)

Review Comment:
   This can be removed.



##########
csharp/src/Drivers/Databricks/CloudFetch/ICloudFetchInterfaces.cs:
##########
@@ -142,6 +160,26 @@ internal interface ICloudFetchResultFetcher
         /// Gets the error encountered by the fetcher, if any.
         /// </summary>
         Exception? Error { get; }
+
+        /// <summary>
+        /// Gets a URL for the specified offset, fetching or refreshing as 
needed.
+        /// </summary>
+        /// <param name="offset">The row offset for which to get a URL.</param>
+        /// <param name="cancellationToken">The cancellation token.</param>
+        /// <returns>The URL link for the specified offset, or null if not 
available.</returns>
+        Task<TSparkArrowResultLink?> GetUrlAsync(long offset, 
CancellationToken cancellationToken);
+
+        /// <summary>

Review Comment:
   This is not needed.



##########
csharp/src/Drivers/Databricks/CloudFetch/CloudFetchResultFetcher.cs:
##########
@@ -125,6 +139,111 @@ public async Task StopAsync()
             }
         }
 
+        /// <summary>
+        /// Gets a URL for the specified offset, fetching or refreshing as 
needed.
+        /// </summary>
+        /// <param name="offset">The row offset for which to get a URL.</param>
+        /// <param name="cancellationToken">The cancellation token.</param>
+        /// <returns>The URL link for the specified offset, or null if not 
available.</returns>
+        public async Task<TSparkArrowResultLink?> GetUrlAsync(long offset, 
CancellationToken cancellationToken)
+        {
+            // Need to fetch or refresh the URL
+            await _fetchLock.WaitAsync(cancellationToken);
+            try
+            {
+                // Determine if we need to fetch new URLs or refresh existing 
ones
+                if (!_urlsByOffset.ContainsKey(offset) && _hasMoreResults)

Review Comment:
   Let's remove this.



##########
csharp/src/Drivers/Databricks/CloudFetch/CloudFetchResultFetcher.cs:
##########
@@ -125,6 +139,111 @@ public async Task StopAsync()
             }
         }
 
+        /// <summary>
+        /// Gets a URL for the specified offset, fetching or refreshing as 
needed.
+        /// </summary>
+        /// <param name="offset">The row offset for which to get a URL.</param>
+        /// <param name="cancellationToken">The cancellation token.</param>
+        /// <returns>The URL link for the specified offset, or null if not 
available.</returns>
+        public async Task<TSparkArrowResultLink?> GetUrlAsync(long offset, 
CancellationToken cancellationToken)
+        {
+            // Need to fetch or refresh the URL
+            await _fetchLock.WaitAsync(cancellationToken);
+            try
+            {
+                // Determine if we need to fetch new URLs or refresh existing 
ones
+                if (!_urlsByOffset.ContainsKey(offset) && _hasMoreResults)
+                {
+                    // This is a new offset we haven't seen before - fetch new 
URLs
+                    var links = await FetchUrlBatchAsync(offset, 100, 
cancellationToken);
+                    return links.FirstOrDefault(l => l.StartRowOffset == 
offset);
+                }
+                else
+                {
+                    // We have the URL but it's expired - refresh it
+                    return await RefreshUrlAsync(offset, cancellationToken);

Review Comment:
   Let's reuse the FetchResultsAsync method and add offset as a parameter to it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to