Re: [PR] feat(csharp/src/Drivers/Databricks): Implement CloudFetchUrlManager to handle presigned URL expiration in CloudFetch [arrow-adbc]
CurtHagenlocher merged PR #2855: URL: https://github.com/apache/arrow-adbc/pull/2855 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] feat(csharp/src/Drivers/Databricks): Implement CloudFetchUrlManager to handle presigned URL expiration in CloudFetch [arrow-adbc]
CurtHagenlocher commented on code in PR #2855:
URL: https://github.com/apache/arrow-adbc/pull/2855#discussion_r2112768120
##
csharp/src/Drivers/Databricks/CloudFetch/Clock.cs:
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch
+{
+///
+/// Abstraction for time operations to enable testing with controlled time.
+///
+public interface IClock
Review Comment:
I think these should probably all be `internal`, relying on
`InternalsVisibleToAttribute` to be able to use them from tests. Or is there a
use case for exposing them publicly?
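For reference, a minimal sketch of the arrangement suggested here. The namespace and the test assembly name `Example.Tests` are placeholders for illustration, not the project's actual names; a signed assembly would also need the public key in the attribute string.

```csharp
using System;
using System.Runtime.CompilerServices;

// Hypothetical test assembly name; substitute the real test project's assembly.
[assembly: InternalsVisibleTo("Example.Tests")]

namespace Example.CloudFetch
{
    // Internal rather than public: visible to the driver and, through the
    // attribute above, to the named test assembly only.
    internal interface IClock
    {
        DateTime UtcNow { get; }
    }

    internal sealed class SystemClock : IClock
    {
        public DateTime UtcNow => DateTime.UtcNow;
    }
}
```

With this in place the types stay out of the public API surface while tests can still construct and substitute them.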
##
csharp/src/Drivers/Databricks/CloudFetch/CloudFetchResultFetcher.cs:
##
@@ -84,9 +96,11 @@ public async Task StartAsync(CancellationToken cancellationToken)
// Reset state
Review Comment:
To this point, it looks like the value of `_lastFetchedOffset` is
only written, not read.
##
csharp/src/Drivers/Databricks/DatabricksConnection.cs:
##
@@ -32,6 +32,8 @@
using Apache.Arrow.Ipc;
using Apache.Hive.Service.Rpc.Thrift;
using Thrift.Protocol;
+using Thrift.Transport;
+using Thrift.Transport.Client;
Review Comment:
Are these needed? There are no other changes in this file.
##
csharp/src/Drivers/Databricks/CloudFetch/Clock.cs:
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch
+{
+///
+/// Abstraction for time operations to enable testing with controlled time.
+///
+public interface IClock
+{
+///
+/// Gets the current UTC time.
+///
+DateTime UtcNow { get; }
+}
+
+///
+/// Default implementation that uses system time.
+///
+internal class SystemClock : IClock
+{
+public DateTime UtcNow => DateTime.UtcNow;
+}
+
+///
+/// Test implementation that allows controlling time for testing scenarios.
+///
+public class ControllableClock : IClock
Review Comment:
Can this be moved to test code? It also doesn't seem like it's currently
being used, as the test code is creating a `MockClock` instead of a
`ControllableClock`.
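A sketch of what a test-side clock could look like if it lives in the test project instead. The `IClock` declaration below is a stand-in so the snippet compiles on its own, and `FakeClock` is a hypothetical name; neither is taken from the PR's test code.

```csharp
using System;

namespace Example.Tests
{
    // Stand-in for the driver's clock abstraction, declared here only so
    // the sketch is self-contained.
    internal interface IClock
    {
        DateTime UtcNow { get; }
    }

    // Test-only clock: time moves only when the test advances it.
    internal sealed class FakeClock : IClock
    {
        private DateTime _now;

        public FakeClock(DateTime start)
        {
            _now = start;
        }

        public DateTime UtcNow => _now;

        public void Advance(TimeSpan delta)
        {
            _now = _now.Add(delta);
        }
    }
}
```

A test can then create the clock at a fixed instant, call `Advance` past a link's expiry time, and assert that the code under test treats the link as expired.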
Re: [PR] feat(csharp/src/Drivers/Databricks): Implement CloudFetchUrlManager to handle presigned URL expiration in CloudFetch [arrow-adbc]
toddmeng-db commented on code in PR #2855:
URL: https://github.com/apache/arrow-adbc/pull/2855#discussion_r2112523371
##
csharp/src/Drivers/Databricks/CloudFetch/CloudFetchResultFetcher.cs:
##
@@ -84,9 +96,11 @@ public async Task StartAsync(CancellationToken cancellationToken)
// Reset state
Review Comment:
Are `_lastFetchedOffset` and `_startOffset` serving the same purpose?
Re: [PR] feat(csharp/src/Drivers/Databricks): Implement CloudFetchUrlManager to handle presigned URL expiration in CloudFetch [arrow-adbc]
toddmeng-db commented on code in PR #2855:
URL: https://github.com/apache/arrow-adbc/pull/2855#discussion_r2112516206
##
csharp/src/Drivers/Databricks/CloudFetch/CloudFetchResultFetcher.cs:
##
@@ -182,15 +265,16 @@ private async Task FetchResultsAsync(CancellationToken cancellationToken)
}
}
Review Comment:
Is `FetchNextResultBatchAsync` with the offset parameter used anywhere?
Re: [PR] feat(csharp/src/Drivers/Databricks): Implement CloudFetchUrlManager to handle presigned URL expiration in CloudFetch [arrow-adbc]
toddmeng-db commented on code in PR #2855:
URL: https://github.com/apache/arrow-adbc/pull/2855#discussion_r2112516206
##
csharp/src/Drivers/Databricks/CloudFetch/CloudFetchResultFetcher.cs:
##
@@ -182,15 +265,16 @@ private async Task FetchResultsAsync(CancellationToken cancellationToken)
}
}
Review Comment:
Is the offset parameter used anywhere?
Re: [PR] feat(csharp/src/Drivers/Databricks): Implement CloudFetchUrlManager to handle presigned URL expiration in CloudFetch [arrow-adbc]
toddmeng-db commented on code in PR #2855:
URL: https://github.com/apache/arrow-adbc/pull/2855#discussion_r2112498580
##
csharp/src/Drivers/Databricks/CloudFetch/CloudFetchDownloader.cs:
##
@@ -341,6 +365,37 @@ private async Task DownloadFileAsync(IDownloadResult downloadResult, Cancellatio
HttpCompletionOption.ResponseHeadersRead,
cancellationToken).ConfigureAwait(false);
+// Check if the response indicates an expired URL (typically 403 or 401)
Review Comment:
Would it make sense to make this a generic retry-on-failure path, rather
than one specific to 403/401 responses?
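One possible reading of this suggestion, as a sketch only and not the PR's code: retry on any non-success status, asking for a fresh URL before each retry. `DownloadRetry`, `GetWithRefreshAsync`, and both delegate parameters are hypothetical names introduced for illustration; the send step is passed in as a delegate so the loop can be exercised without a network.

```csharp
using System;
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

internal static class DownloadRetry
{
    // Hypothetical helper: sends the request, and on any non-success status
    // refreshes the URL and tries again, up to maxRefreshAttempts refreshes.
    public static async Task<HttpResponseMessage> GetWithRefreshAsync(
        Func<string, CancellationToken, Task<HttpResponseMessage>> sendAsync,
        string url,
        Func<CancellationToken, Task<string?>> refreshUrlAsync,
        int maxRefreshAttempts,
        CancellationToken cancellationToken)
    {
        for (int attempt = 0; ; attempt++)
        {
            HttpResponseMessage response =
                await sendAsync(url, cancellationToken).ConfigureAwait(false);
            if (response.IsSuccessStatusCode)
            {
                return response;
            }

            HttpStatusCode status = response.StatusCode;
            response.Dispose();

            if (attempt >= maxRefreshAttempts)
            {
                throw new InvalidOperationException(
                    $"Download failed with status {(int)status} after {attempt} URL refresh attempts.");
            }

            // Keep the previous URL if no refreshed link is available.
            url = await refreshUrlAsync(cancellationToken).ConfigureAwait(false) ?? url;
        }
    }
}
```

The trade-off is that a generic path also retries failures a refreshed URL cannot fix (for example a 404), so a real implementation would probably still want to restrict which status codes trigger a refresh.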
Re: [PR] feat(csharp/src/Drivers/Databricks): Implement CloudFetchUrlManager to handle presigned URL expiration in CloudFetch [arrow-adbc]
jadewang-db commented on code in PR #2855:
URL: https://github.com/apache/arrow-adbc/pull/2855#discussion_r2105442755
##
csharp/src/Drivers/Databricks/CloudFetch/CloudFetchDownloader.cs:
##
@@ -341,6 +365,40 @@ private async Task DownloadFileAsync(IDownloadResult downloadResult, Cancellatio
HttpCompletionOption.ResponseHeadersRead,
cancellationToken).ConfigureAwait(false);
+// Check if the response indicates an expired URL (typically 403 or 401)
+if (response.StatusCode == System.Net.HttpStatusCode.Forbidden ||
+response.StatusCode == System.Net.HttpStatusCode.Unauthorized)
+{
+// If we've already tried refreshing too many times, fail
+if (downloadResult.RefreshAttempts >= _maxUrlRefreshAttempts)
+{
+throw new InvalidOperationException($"Failed to download file after {downloadResult.RefreshAttempts} URL refresh attempts.");
+}
+
+// Try to refresh the URL
+var refreshedLink = await _resultFetcher.GetUrlAsync(downloadResult.Link.StartRowOffset, cancellationToken);
+if (refreshedLink != null)
+{
+// Update the download result with the refreshed link
+downloadResult.UpdateWithRefreshedLink(refreshedLink);
+url = refreshedLink.FileLink;
+sanitizedUrl = SanitizeUrl(url);
+
+Trace.TraceInformation($"URL for file at offset {refreshedLink.StartRowOffset} was refreshed after expired URL response");
+
+// Also refresh other potentially expired URLs
Review Comment:
this is not needed here.
##
csharp/src/Drivers/Databricks/CloudFetch/CloudFetchResultFetcher.cs:
##
@@ -125,6 +139,111 @@ public async Task StopAsync()
}
}
+///
+/// Gets a URL for the specified offset, fetching or refreshing as needed.
+///
+/// The row offset for which to get a URL.
+/// The cancellation token.
+/// The URL link for the specified offset, or null if not available.
+public async Task GetUrlAsync(long offset, CancellationToken cancellationToken)
+{
+// Need to fetch or refresh the URL
+await _fetchLock.WaitAsync(cancellationToken);
+try
+{
+// Determine if we need to fetch new URLs or refresh existing ones
+if (!_urlsByOffset.ContainsKey(offset) && _hasMoreResults)
+{
+// This is a new offset we haven't seen before - fetch new URLs
+var links = await FetchUrlBatchAsync(offset, 100, cancellationToken);
+return links.FirstOrDefault(l => l.StartRowOffset == offset);
+}
+else
+{
+// We have the URL but it's expired - refresh it
+return await RefreshUrlAsync(offset, cancellationToken);
+}
+}
+finally
+{
+_fetchLock.Release();
+}
+}
+
+///
+/// Checks if any URLs are expired or about to expire.
+///
+/// True if any URLs are expired or about to expire, false otherwise.
+public bool HasExpiredOrExpiringSoonUrls()
+{
+return _urlsByOffset.Values.Any(IsUrlExpiredOrExpiringSoon);
+}
+
+///
+/// Proactively refreshes URLs that are expired or about to expire.
+///
+/// The cancellation token.
+public async Task RefreshExpiredUrlsAsync(CancellationToken cancellationToken)
Review Comment:
this can be removed
##
csharp/src/Drivers/Databricks/CloudFetch/ICloudFetchInterfaces.cs:
##
@@ -142,6 +160,26 @@ internal interface ICloudFetchResultFetcher
/// Gets the error encountered by the fetcher, if any.
///
Exception? Error { get; }
+
+///
+/// Gets a URL for the specified offset, fetching or refreshing as needed.
+///
+/// The row offset for which to get a URL.
+/// The cancellation token.
+/// The URL link for the specified offset, or null if not available.
+Task GetUrlAsync(long offset, CancellationToken cancellationToken);
+
+///
Review Comment:
not needed
##
csharp/src/Drivers/Databricks/CloudFetch/CloudFetchResultFetcher.cs:
##
@@ -125,6 +139,111 @@ public async Task StopAsync()
}
}
+///
+/// Gets a URL for the specified offset, fetching or refreshing as needed.
+
Re: [PR] feat(csharp/src/Drivers/Databricks): Implement CloudFetchUrlManager to handle presigned URL expiration in CloudFetch [arrow-adbc]
jadewang-db commented on code in PR #2855:
URL: https://github.com/apache/arrow-adbc/pull/2855#discussion_r2105146012
##
csharp/src/Drivers/Databricks/CloudFetch/CloudFetchDownloader.cs:
##
@@ -237,6 +248,25 @@ private async Task DownloadFilesAsync(CancellationToken cancellationToken)
break;
}
+// Check if there are any expired URLs that need refreshing
Review Comment:
do we really need this?
##
csharp/src/Drivers/Databricks/DatabricksConnection.cs:
##
@@ -468,6 +471,27 @@ protected internal override Task GetRowSetAsync(TGetPrimaryKeysResp res
return base.GetAuthenticationHeaderValue(authType);
}
+///
+/// Gets a fresh Thrift client for fetching results.
+/// This helps avoid "stream already consumed" errors when a Thrift transport has already been used.
+///
+/// A fresh Thrift client instance.
+internal TCLIService.IAsync GetFreshClient()
Review Comment:
this should be removed
##
csharp/src/Drivers/Databricks/CloudFetch/CloudFetchUrlManager.cs:
##
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.Hive.Service.Rpc.Thrift;
+
+namespace Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch
+{
+///
+/// Abstraction for time operations to enable testing with controlled time.
+///
+public interface IClock
+{
+///
+/// Gets the current UTC time.
+///
+DateTime UtcNow { get; }
+}
+
+///
+/// Default implementation that uses system time.
+///
+internal class SystemClock : IClock
+{
+public DateTime UtcNow => DateTime.UtcNow;
+}
+
+///
+/// Test implementation that allows controlling time for testing scenarios.
+///
+public class ControllableClock : IClock
+{
+private DateTime _currentTime;
+
+public ControllableClock(DateTime? initialTime = null)
+{
+_currentTime = initialTime ?? DateTime.UtcNow;
+}
+
+public DateTime UtcNow => _currentTime;
+
+///
+/// Advances the clock by the specified time span.
+///
+/// The amount of time to advance.
+public void AdvanceTime(TimeSpan timeSpan)
+{
+_currentTime = _currentTime.Add(timeSpan);
+}
+
+///
+/// Sets the clock to a specific time.
+///
+/// The time to set.
+public void SetTime(DateTime time)
+{
+_currentTime = time;
+}
+
+///
+/// Resets the clock to the current system time.
+///
+public void Reset()
+{
+_currentTime = DateTime.UtcNow;
+}
+}
+
+///
+/// Manages CloudFetch URLs, handling both initial fetching and refreshing of expired URLs.
+///
+internal class CloudFetchUrlManager
+{
+private readonly IHiveServer2Statement _statement;
+private readonly SemaphoreSlim _fetchLock = new SemaphoreSlim(1, 1);
+private readonly ConcurrentDictionary _urlsByOffset = new ConcurrentDictionary();
+private readonly int _expirationBufferSeconds;
+private readonly IClock _clock;
+private long _lastFetchedOffset = 0;
+private bool _hasMoreResults = true;
+
+///
+/// Initializes a new instance of the class.
+///
+/// The HiveServer2 statement to use for fetching URLs.
+/// Buffer time in seconds before URL expiration to trigger refresh.
+/// Clock implementation for time operations. If null, uses system clock.
+public CloudFetchUrlManager(IHiveServer2Statement statement, int expirationBufferSeconds = 60, IClock? clock = null)
+{
+_statement = statement ?? throw new ArgumentNullException(nameof(statement));
+_expirationBufferSeconds = expirationBufferSeconds;
+
