Re: [PR] feat(csharp/src/Drivers/Databricks): Implement CloudFetchUrlManager to handle presigned URL expiration in CloudFetch [arrow-adbc]
CurtHagenlocher merged PR #2855:
URL: https://github.com/apache/arrow-adbc/pull/2855


--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
CurtHagenlocher commented on code in PR #2855:
URL: https://github.com/apache/arrow-adbc/pull/2855#discussion_r2112768120


## csharp/src/Drivers/Databricks/CloudFetch/Clock.cs:
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch
+{
+    ///
+    /// Abstraction for time operations to enable testing with controlled time.
+    ///
+    public interface IClock

Review Comment:
   I think these should probably all be `internal`, relying on `InternalsVisibleToAttribute` to be able to use them from tests. Or is there a use case for exposing them publicly?

## csharp/src/Drivers/Databricks/CloudFetch/CloudFetchResultFetcher.cs:
@@ -84,9 +96,11 @@ public async Task StartAsync(CancellationToken cancellationToken)
             // Reset state

Review Comment:
   To this point, it looks like the value of `_lastFetchedOffset` is only written, never read.

## csharp/src/Drivers/Databricks/DatabricksConnection.cs:
@@ -32,6 +32,8 @@
 using Apache.Arrow.Ipc;
 using Apache.Hive.Service.Rpc.Thrift;
 using Thrift.Protocol;
+using Thrift.Transport;
+using Thrift.Transport.Client;

Review Comment:
   Are these needed? There are no other changes in this file.

## csharp/src/Drivers/Databricks/CloudFetch/Clock.cs:
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch
+{
+    ///
+    /// Abstraction for time operations to enable testing with controlled time.
+    ///
+    public interface IClock
+    {
+        ///
+        /// Gets the current UTC time.
+        ///
+        DateTime UtcNow { get; }
+    }
+
+    ///
+    /// Default implementation that uses system time.
+    ///
+    internal class SystemClock : IClock
+    {
+        public DateTime UtcNow => DateTime.UtcNow;
+    }
+
+    ///
+    /// Test implementation that allows controlling time for testing scenarios.
+    ///
+    public class ControllableClock : IClock

Review Comment:
   Can this be moved to test code? It also doesn't seem like it's currently being used, as the test code is creating a `MockClock` instead of a `ControllableClock`.
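
A minimal sketch of how the two suggestions in this review could combine: the clock abstraction stays `internal`, the test assembly is granted access through `InternalsVisibleToAttribute`, and the controllable clock lives on the test side. The test assembly name `Apache.Arrow.Adbc.Tests.Drivers.Databricks` and the class name `TestClock` are assumptions for illustration; both halves are shown in one file only for brevity, whereas in practice they would sit in separate projects.

```csharp
using System;
using System.Runtime.CompilerServices;

// Assumed test assembly name; the real one comes from the test project file.
[assembly: InternalsVisibleTo("Apache.Arrow.Adbc.Tests.Drivers.Databricks")]

namespace Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch
{
    // Driver side: the abstraction and its default implementation are internal.
    internal interface IClock
    {
        DateTime UtcNow { get; }
    }

    internal sealed class SystemClock : IClock
    {
        public DateTime UtcNow => DateTime.UtcNow;
    }
}

namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks
{
    using Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch;

    // Test side: a hypothetical controllable clock that only tests need.
    internal sealed class TestClock : IClock
    {
        private DateTime _now;

        public TestClock(DateTime start) => _now = start;

        public DateTime UtcNow => _now;

        // Moves the fake time forward so expiry logic can be exercised deterministically.
        public void Advance(TimeSpan by) => _now = _now.Add(by);
    }
}
```

With this split, nothing time-related leaks into the driver's public API surface.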
toddmeng-db commented on code in PR #2855:
URL: https://github.com/apache/arrow-adbc/pull/2855#discussion_r2112523371


## csharp/src/Drivers/Databricks/CloudFetch/CloudFetchResultFetcher.cs:
@@ -84,9 +96,11 @@ public async Task StartAsync(CancellationToken cancellationToken)
             // Reset state

Review Comment:
   Are `_lastFetchedOffset` and `_startOffset` serving the same purpose?
toddmeng-db commented on code in PR #2855:
URL: https://github.com/apache/arrow-adbc/pull/2855#discussion_r2112516206


## csharp/src/Drivers/Databricks/CloudFetch/CloudFetchResultFetcher.cs:
@@ -182,15 +265,16 @@ private async Task FetchResultsAsync(CancellationToken cancellationToken)
                 }
             }

Review Comment:
   Is `FetchNextResultBatchAsync` with the offset parameter used anywhere?
toddmeng-db commented on code in PR #2855:
URL: https://github.com/apache/arrow-adbc/pull/2855#discussion_r2112516206


## csharp/src/Drivers/Databricks/CloudFetch/CloudFetchResultFetcher.cs:
@@ -182,15 +265,16 @@ private async Task FetchResultsAsync(CancellationToken cancellationToken)
                 }
             }

Review Comment:
   Is the offset parameter used anywhere?
toddmeng-db commented on code in PR #2855:
URL: https://github.com/apache/arrow-adbc/pull/2855#discussion_r2112498580


## csharp/src/Drivers/Databricks/CloudFetch/CloudFetchDownloader.cs:
@@ -341,6 +365,37 @@ private async Task DownloadFileAsync(IDownloadResult downloadResult, Cancellatio
                     HttpCompletionOption.ResponseHeadersRead,
                     cancellationToken).ConfigureAwait(false);
+                // Check if the response indicates an expired URL (typically 403 or 401)

Review Comment:
   Would it make sense to turn this into a generic retry-on-failure attempt?
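
One possible reading of this suggestion, sketched under assumptions: every non-success response is retried up to a fixed limit, and the presigned URL is refreshed only when the status code looks like expiry (401 or 403). `RetrySketch.DownloadWithRetryAsync`, the `send` and `refreshUrl` delegates, and `maxAttempts` are hypothetical names, not part of the driver; in the PR the refresh role is played by `_resultFetcher.GetUrlAsync`.

```csharp
#nullable enable
using System;
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: generic retry around a download, refreshing the URL on auth-style failures.
public static class RetrySketch
{
    public static async Task<HttpResponseMessage> DownloadWithRetryAsync(
        string url,
        Func<string, CancellationToken, Task<HttpResponseMessage>> send,
        Func<CancellationToken, Task<string?>> refreshUrl,
        int maxAttempts,
        CancellationToken cancellationToken)
    {
        for (int attempt = 1; ; attempt++)
        {
            HttpResponseMessage response = await send(url, cancellationToken).ConfigureAwait(false);
            if (response.IsSuccessStatusCode)
            {
                return response;
            }

            HttpStatusCode status = response.StatusCode;
            response.Dispose();
            if (attempt >= maxAttempts)
            {
                throw new InvalidOperationException(
                    $"Download failed with status {(int)status} after {attempt} attempts.");
            }

            // 401/403 from a presigned URL usually means it expired: fetch a fresh URL first.
            if (status == HttpStatusCode.Unauthorized || status == HttpStatusCode.Forbidden)
            {
                url = await refreshUrl(cancellationToken).ConfigureAwait(false) ?? url;
            }
            // Any other failure (e.g. a 5xx) is retried with the same URL.
        }
    }
}
```

A production version would probably also add a delay or backoff between attempts and cap how many of the attempts may be URL refreshes, as the PR's `_maxUrlRefreshAttempts` does.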
jadewang-db commented on code in PR #2855:
URL: https://github.com/apache/arrow-adbc/pull/2855#discussion_r2105442755


## csharp/src/Drivers/Databricks/CloudFetch/CloudFetchDownloader.cs:
@@ -341,6 +365,40 @@ private async Task DownloadFileAsync(IDownloadResult downloadResult, Cancellatio
                     HttpCompletionOption.ResponseHeadersRead,
                     cancellationToken).ConfigureAwait(false);
+                // Check if the response indicates an expired URL (typically 403 or 401)
+                if (response.StatusCode == System.Net.HttpStatusCode.Forbidden ||
+                    response.StatusCode == System.Net.HttpStatusCode.Unauthorized)
+                {
+                    // If we've already tried refreshing too many times, fail
+                    if (downloadResult.RefreshAttempts >= _maxUrlRefreshAttempts)
+                    {
+                        throw new InvalidOperationException($"Failed to download file after {downloadResult.RefreshAttempts} URL refresh attempts.");
+                    }
+
+                    // Try to refresh the URL
+                    var refreshedLink = await _resultFetcher.GetUrlAsync(downloadResult.Link.StartRowOffset, cancellationToken);
+                    if (refreshedLink != null)
+                    {
+                        // Update the download result with the refreshed link
+                        downloadResult.UpdateWithRefreshedLink(refreshedLink);
+                        url = refreshedLink.FileLink;
+                        sanitizedUrl = SanitizeUrl(url);
+
+                        Trace.TraceInformation($"URL for file at offset {refreshedLink.StartRowOffset} was refreshed after expired URL response");
+
+                        // Also refresh other potentially expired URLs

Review Comment:
   This is not needed here.

## csharp/src/Drivers/Databricks/CloudFetch/CloudFetchResultFetcher.cs:
@@ -125,6 +139,111 @@ public async Task StopAsync()
             }
         }
+        ///
+        /// Gets a URL for the specified offset, fetching or refreshing as needed.
+        ///
+        /// The row offset for which to get a URL.
+        /// The cancellation token.
+        /// The URL link for the specified offset, or null if not available.
+        public async Task GetUrlAsync(long offset, CancellationToken cancellationToken)
+        {
+            // Need to fetch or refresh the URL
+            await _fetchLock.WaitAsync(cancellationToken);
+            try
+            {
+                // Determine if we need to fetch new URLs or refresh existing ones
+                if (!_urlsByOffset.ContainsKey(offset) && _hasMoreResults)
+                {
+                    // This is a new offset we haven't seen before - fetch new URLs
+                    var links = await FetchUrlBatchAsync(offset, 100, cancellationToken);
+                    return links.FirstOrDefault(l => l.StartRowOffset == offset);
+                }
+                else
+                {
+                    // We have the URL but it's expired - refresh it
+                    return await RefreshUrlAsync(offset, cancellationToken);
+                }
+            }
+            finally
+            {
+                _fetchLock.Release();
+            }
+        }
+
+        ///
+        /// Checks if any URLs are expired or about to expire.
+        ///
+        /// True if any URLs are expired or about to expire, false otherwise.
+        public bool HasExpiredOrExpiringSoonUrls()
+        {
+            return _urlsByOffset.Values.Any(IsUrlExpiredOrExpiringSoon);
+        }
+
+        ///
+        /// Proactively refreshes URLs that are expired or about to expire.
+        ///
+        /// The cancellation token.
+        public async Task RefreshExpiredUrlsAsync(CancellationToken cancellationToken)

Review Comment:
   This can be removed.

## csharp/src/Drivers/Databricks/CloudFetch/ICloudFetchInterfaces.cs:
@@ -142,6 +160,26 @@ internal interface ICloudFetchResultFetcher
         /// Gets the error encountered by the fetcher, if any.
         ///
         Exception? Error { get; }
+
+        ///
+        /// Gets a URL for the specified offset, fetching or refreshing as needed.
+        ///
+        /// The row offset for which to get a URL.
+        /// The cancellation token.
+        /// The URL link for the specified offset, or null if not available.
+        Task GetUrlAsync(long offset, CancellationToken cancellationToken);
+
+        ///

Review Comment:
   Not needed.

## csharp/src/Drivers/Databricks/CloudFetch/CloudFetchResultFetcher.cs:
@@ -125,6 +139,111 @@ public async Task StopAsync()
             }
         }
+        ///
+        /// Gets a URL for the specified offset, fetching or refreshing as needed.
+
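
`HasExpiredOrExpiringSoonUrls` above delegates to an `IsUrlExpiredOrExpiringSoon` predicate whose body is not quoted in this thread. A minimal sketch of such a check, assuming each link's expiry is available as Unix epoch milliseconds in UTC and reusing the idea of an expiration buffer (as in `CloudFetchUrlManager`'s `expirationBufferSeconds`). `UrlExpiryCheck`, `FixedClock`, and the parameter names are hypothetical; the `IClock` here only mirrors the shape of the PR's interface.

```csharp
using System;

// Mirrors the shape of the PR's IClock so the check can be tested with a fixed time.
public interface IClock
{
    DateTime UtcNow { get; }
}

// Hypothetical test clock that always reports the same instant.
public sealed class FixedClock : IClock
{
    public FixedClock(DateTime utcNow) => UtcNow = utcNow;

    public DateTime UtcNow { get; }
}

public static class UrlExpiryCheck
{
    // Assumption: expiryTimeMs is Unix epoch milliseconds (UTC).
    // Returns true if the URL has expired or will expire within the buffer window.
    public static bool IsExpiredOrExpiringSoon(long expiryTimeMs, int bufferSeconds, IClock clock)
    {
        DateTime expiresAt = DateTimeOffset.FromUnixTimeMilliseconds(expiryTimeMs).UtcDateTime;
        return clock.UtcNow.AddSeconds(bufferSeconds) >= expiresAt;
    }
}
```

Using `>=` treats a URL that expires exactly at the edge of the buffer as due for refresh; whether that boundary should be inclusive is a judgment call.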
jadewang-db commented on code in PR #2855:
URL: https://github.com/apache/arrow-adbc/pull/2855#discussion_r2105146012


## csharp/src/Drivers/Databricks/CloudFetch/CloudFetchDownloader.cs:
@@ -237,6 +248,25 @@ private async Task DownloadFilesAsync(CancellationToken cancellationToken)
                         break;
                     }
+                    // Check if there are any expired URLs that need refreshing

Review Comment:
   Do we really need this?

## csharp/src/Drivers/Databricks/DatabricksConnection.cs:
@@ -468,6 +471,27 @@ protected internal override Task GetRowSetAsync(TGetPrimaryKeysResp res
             return base.GetAuthenticationHeaderValue(authType);
         }
+        ///
+        /// Gets a fresh Thrift client for fetching results.
+        /// This helps avoid "stream already consumed" errors when a Thrift transport has already been used.
+        ///
+        /// A fresh Thrift client instance.
+        internal TCLIService.IAsync GetFreshClient()

Review Comment:
   This should be removed.

## csharp/src/Drivers/Databricks/CloudFetch/CloudFetchUrlManager.cs:
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.Hive.Service.Rpc.Thrift;
+
+namespace Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch
+{
+    ///
+    /// Abstraction for time operations to enable testing with controlled time.
+    ///
+    public interface IClock
+    {
+        ///
+        /// Gets the current UTC time.
+        ///
+        DateTime UtcNow { get; }
+    }
+
+    ///
+    /// Default implementation that uses system time.
+    ///
+    internal class SystemClock : IClock
+    {
+        public DateTime UtcNow => DateTime.UtcNow;
+    }
+
+    ///
+    /// Test implementation that allows controlling time for testing scenarios.
+    ///
+    public class ControllableClock : IClock
+    {
+        private DateTime _currentTime;
+
+        public ControllableClock(DateTime? initialTime = null)
+        {
+            _currentTime = initialTime ?? DateTime.UtcNow;
+        }
+
+        public DateTime UtcNow => _currentTime;
+
+        ///
+        /// Advances the clock by the specified time span.
+        ///
+        /// The amount of time to advance.
+        public void AdvanceTime(TimeSpan timeSpan)
+        {
+            _currentTime = _currentTime.Add(timeSpan);
+        }
+
+        ///
+        /// Sets the clock to a specific time.
+        ///
+        /// The time to set.
+        public void SetTime(DateTime time)
+        {
+            _currentTime = time;
+        }
+
+        ///
+        /// Resets the clock to the current system time.
+        ///
+        public void Reset()
+        {
+            _currentTime = DateTime.UtcNow;
+        }
+    }
+
+    ///
+    /// Manages CloudFetch URLs, handling both initial fetching and refreshing of expired URLs.
+    ///
+    internal class CloudFetchUrlManager
+    {
+        private readonly IHiveServer2Statement _statement;
+        private readonly SemaphoreSlim _fetchLock = new SemaphoreSlim(1, 1);
+        private readonly ConcurrentDictionary _urlsByOffset = new ConcurrentDictionary();
+        private readonly int _expirationBufferSeconds;
+        private readonly IClock _clock;
+        private long _lastFetchedOffset = 0;
+        private bool _hasMoreResults = true;
+
+        ///
+        /// Initializes a new instance of the class.
+        ///
+        /// The HiveServer2 statement to use for fetching URLs.
+        /// Buffer time in seconds before URL expiration to trigger refresh.
+        /// Clock implementation for time operations. If null, uses system clock.
+        public CloudFetchUrlManager(IHiveServer2Statement statement, int expirationBufferSeconds = 60, IClock? clock = null)
+        {
+            _statement = statement ?? throw new ArgumentNullException(nameof(statement));
+            _expirationBufferSeconds = expirationBufferSeconds;
+