jadewang-db commented on code in PR #2855: URL: https://github.com/apache/arrow-adbc/pull/2855#discussion_r2105442755
########## csharp/src/Drivers/Databricks/CloudFetch/CloudFetchDownloader.cs: ########## @@ -341,6 +365,40 @@ private async Task DownloadFileAsync(IDownloadResult downloadResult, Cancellatio HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false); + // Check if the response indicates an expired URL (typically 403 or 401) + if (response.StatusCode == System.Net.HttpStatusCode.Forbidden || + response.StatusCode == System.Net.HttpStatusCode.Unauthorized) + { + // If we've already tried refreshing too many times, fail + if (downloadResult.RefreshAttempts >= _maxUrlRefreshAttempts) + { + throw new InvalidOperationException($"Failed to download file after {downloadResult.RefreshAttempts} URL refresh attempts."); + } + + // Try to refresh the URL + var refreshedLink = await _resultFetcher.GetUrlAsync(downloadResult.Link.StartRowOffset, cancellationToken); + if (refreshedLink != null) + { + // Update the download result with the refreshed link + downloadResult.UpdateWithRefreshedLink(refreshedLink); + url = refreshedLink.FileLink; + sanitizedUrl = SanitizeUrl(url); + + Trace.TraceInformation($"URL for file at offset {refreshedLink.StartRowOffset} was refreshed after expired URL response"); + + // Also refresh other potentially expired URLs Review Comment: this is not needed here. ########## csharp/src/Drivers/Databricks/CloudFetch/CloudFetchResultFetcher.cs: ########## @@ -125,6 +139,111 @@ public async Task StopAsync() } } + /// <summary> + /// Gets a URL for the specified offset, fetching or refreshing as needed. + /// </summary> + /// <param name="offset">The row offset for which to get a URL.</param> + /// <param name="cancellationToken">The cancellation token.</param> + /// <returns>The URL link for the specified offset, or null if not available.</returns> + public async Task<TSparkArrowResultLink?> GetUrlAsync(long offset, CancellationToken cancellationToken) + { + // Need to fetch or refresh the URL + await _fetchLock.WaitAsync(cancellationToken); + try + { + // Determine if we need to fetch new URLs or refresh existing ones + if (!_urlsByOffset.ContainsKey(offset) && _hasMoreResults) + { + // This is a new offset we haven't seen before - fetch new URLs + var links = await FetchUrlBatchAsync(offset, 100, cancellationToken); + return links.FirstOrDefault(l => l.StartRowOffset == offset); + } + else + { + // We have the URL but it's expired - refresh it + return await RefreshUrlAsync(offset, cancellationToken); + } + } + finally + { + _fetchLock.Release(); + } + } + + /// <summary> + /// Checks if any URLs are expired or about to expire. + /// </summary> + /// <returns>True if any URLs are expired or about to expire, false otherwise.</returns> + public bool HasExpiredOrExpiringSoonUrls() + { + return _urlsByOffset.Values.Any(IsUrlExpiredOrExpiringSoon); + } + + /// <summary> + /// Proactively refreshes URLs that are expired or about to expire. + /// </summary> + /// <param name="cancellationToken">The cancellation token.</param> + public async Task RefreshExpiredUrlsAsync(CancellationToken cancellationToken) Review Comment: this can be removed ########## csharp/src/Drivers/Databricks/CloudFetch/ICloudFetchInterfaces.cs: ########## @@ -142,6 +160,26 @@ internal interface ICloudFetchResultFetcher /// Gets the error encountered by the fetcher, if any. /// </summary> Exception? Error { get; } + + /// <summary> + /// Gets a URL for the specified offset, fetching or refreshing as needed. + /// </summary> + /// <param name="offset">The row offset for which to get a URL.</param> + /// <param name="cancellationToken">The cancellation token.</param> + /// <returns>The URL link for the specified offset, or null if not available.</returns> + Task<TSparkArrowResultLink?> GetUrlAsync(long offset, CancellationToken cancellationToken); + + /// <summary> Review Comment: not needed ########## csharp/src/Drivers/Databricks/CloudFetch/CloudFetchResultFetcher.cs: ########## @@ -125,6 +139,111 @@ public async Task StopAsync() } } + /// <summary> + /// Gets a URL for the specified offset, fetching or refreshing as needed. + /// </summary> + /// <param name="offset">The row offset for which to get a URL.</param> + /// <param name="cancellationToken">The cancellation token.</param> + /// <returns>The URL link for the specified offset, or null if not available.</returns> + public async Task<TSparkArrowResultLink?> GetUrlAsync(long offset, CancellationToken cancellationToken) + { + // Need to fetch or refresh the URL + await _fetchLock.WaitAsync(cancellationToken); + try + { + // Determine if we need to fetch new URLs or refresh existing ones + if (!_urlsByOffset.ContainsKey(offset) && _hasMoreResults) Review Comment: let's remove this ########## csharp/src/Drivers/Databricks/CloudFetch/CloudFetchResultFetcher.cs: ########## @@ -125,6 +139,111 @@ public async Task StopAsync() } } + /// <summary> + /// Gets a URL for the specified offset, fetching or refreshing as needed. + /// </summary> + /// <param name="offset">The row offset for which to get a URL.</param> + /// <param name="cancellationToken">The cancellation token.</param> + /// <returns>The URL link for the specified offset, or null if not available.</returns> + public async Task<TSparkArrowResultLink?> GetUrlAsync(long offset, CancellationToken cancellationToken) + { + // Need to fetch or refresh the URL + await _fetchLock.WaitAsync(cancellationToken); + try + { + // Determine if we need to fetch new URLs or refresh existing ones + if (!_urlsByOffset.ContainsKey(offset) && _hasMoreResults) + { + // This is a new offset we haven't seen before - fetch new URLs + var links = await FetchUrlBatchAsync(offset, 100, cancellationToken); + return links.FirstOrDefault(l => l.StartRowOffset == offset); + } + else + { + // We have the URL but it's expired - refresh it + return await RefreshUrlAsync(offset, cancellationToken); Review Comment: let's reuse FetchResultsAsync method and add offset to the parameter -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org