Re: [PR] feat(csharp/src/Drivers/Databricks): Implement CloudFetchUrlManager to handle presigned URL expiration in CloudFetch [arrow-adbc]

2025-05-28 Thread via GitHub


CurtHagenlocher merged PR #2855:
URL: https://github.com/apache/arrow-adbc/pull/2855


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] feat(csharp/src/Drivers/Databricks): Implement CloudFetchUrlManager to handle presigned URL expiration in CloudFetch [arrow-adbc]

2025-05-28 Thread via GitHub


CurtHagenlocher commented on code in PR #2855:
URL: https://github.com/apache/arrow-adbc/pull/2855#discussion_r2112768120


##
csharp/src/Drivers/Databricks/CloudFetch/Clock.cs:
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch
+{
+    /// <summary>
+    /// Abstraction for time operations to enable testing with controlled time.
+    /// </summary>
+    public interface IClock

Review Comment:
   I think these should probably all be `internal`, relying on 
`InternalsVisibleToAttribute` to be able to use them from tests. Or is there a 
use case for exposing them publicly?
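
   A minimal sketch of what that could look like. The test assembly name `Hypothetical.Databricks.Tests` is an assumption for illustration only, and the real project may declare the attribute elsewhere (for example in the project file):

```csharp
using System;
using System.Runtime.CompilerServices;

// Assumption: "Hypothetical.Databricks.Tests" is a placeholder for the real test assembly name.
[assembly: InternalsVisibleTo("Hypothetical.Databricks.Tests")]

namespace Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch
{
    // Internal rather than public: visible to the driver assembly and,
    // through InternalsVisibleTo, to the named test assembly only.
    internal interface IClock
    {
        DateTime UtcNow { get; }
    }

    // Production implementation backed by the system clock.
    internal sealed class SystemClock : IClock
    {
        public DateTime UtcNow => DateTime.UtcNow;
    }
}
```

   With this in place, test code in the named assembly can implement `IClock` with its own fake without the type appearing in the driver's public API.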



##
csharp/src/Drivers/Databricks/CloudFetch/CloudFetchResultFetcher.cs:
##
@@ -84,9 +96,11 @@ public async Task StartAsync(CancellationToken cancellationToken)
 
 // Reset state

Review Comment:
   To this point, it looks like the value of `_lastFetchedOffset` is only 
written, never read.



##
csharp/src/Drivers/Databricks/DatabricksConnection.cs:
##
@@ -32,6 +32,8 @@
 using Apache.Arrow.Ipc;
 using Apache.Hive.Service.Rpc.Thrift;
 using Thrift.Protocol;
+using Thrift.Transport;
+using Thrift.Transport.Client;

Review Comment:
   Are these needed? There are no other changes in this file.



##
csharp/src/Drivers/Databricks/CloudFetch/Clock.cs:
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch
+{
+    /// <summary>
+    /// Abstraction for time operations to enable testing with controlled time.
+    /// </summary>
+    public interface IClock
+    {
+        /// <summary>
+        /// Gets the current UTC time.
+        /// </summary>
+        DateTime UtcNow { get; }
+    }
+
+    /// <summary>
+    /// Default implementation that uses system time.
+    /// </summary>
+    internal class SystemClock : IClock
+    {
+        public DateTime UtcNow => DateTime.UtcNow;
+    }
+
+    /// <summary>
+    /// Test implementation that allows controlling time for testing scenarios.
+    /// </summary>
+    public class ControllableClock : IClock

Review Comment:
   Can this be moved to test code? It also doesn't seem like it's currently 
being used, as the test code is creating a `MockClock` instead of a 
`ControllableClock`.






Re: [PR] feat(csharp/src/Drivers/Databricks): Implement CloudFetchUrlManager to handle presigned URL expiration in CloudFetch [arrow-adbc]

2025-05-28 Thread via GitHub


toddmeng-db commented on code in PR #2855:
URL: https://github.com/apache/arrow-adbc/pull/2855#discussion_r2112523371


##
csharp/src/Drivers/Databricks/CloudFetch/CloudFetchResultFetcher.cs:
##
@@ -84,9 +96,11 @@ public async Task StartAsync(CancellationToken cancellationToken)
 
 // Reset state

Review Comment:
   Are `_lastFetchedOffset` and `_startOffset` serving the same purpose?






Re: [PR] feat(csharp/src/Drivers/Databricks): Implement CloudFetchUrlManager to handle presigned URL expiration in CloudFetch [arrow-adbc]

2025-05-28 Thread via GitHub


toddmeng-db commented on code in PR #2855:
URL: https://github.com/apache/arrow-adbc/pull/2855#discussion_r2112516206


##
csharp/src/Drivers/Databricks/CloudFetch/CloudFetchResultFetcher.cs:
##
@@ -182,15 +265,16 @@ private async Task FetchResultsAsync(CancellationToken cancellationToken)
 }
 }

Review Comment:
   Is `FetchNextResultBatchAsync` with the offset parameter used anywhere?






Re: [PR] feat(csharp/src/Drivers/Databricks): Implement CloudFetchUrlManager to handle presigned URL expiration in CloudFetch [arrow-adbc]

2025-05-28 Thread via GitHub


toddmeng-db commented on code in PR #2855:
URL: https://github.com/apache/arrow-adbc/pull/2855#discussion_r2112516206


##
csharp/src/Drivers/Databricks/CloudFetch/CloudFetchResultFetcher.cs:
##
@@ -182,15 +265,16 @@ private async Task FetchResultsAsync(CancellationToken cancellationToken)
 }
 }

Review Comment:
   Is the offset parameter used anywhere?






Re: [PR] feat(csharp/src/Drivers/Databricks): Implement CloudFetchUrlManager to handle presigned URL expiration in CloudFetch [arrow-adbc]

2025-05-28 Thread via GitHub


toddmeng-db commented on code in PR #2855:
URL: https://github.com/apache/arrow-adbc/pull/2855#discussion_r2112498580


##
csharp/src/Drivers/Databricks/CloudFetch/CloudFetchDownloader.cs:
##
@@ -341,6 +365,37 @@ private async Task DownloadFileAsync(IDownloadResult downloadResult, Cancellatio
 HttpCompletionOption.ResponseHeadersRead,
 cancellationToken).ConfigureAwait(false);
 
+// Check if the response indicates an expired URL (typically 403 or 401)

Review Comment:
   Would it make sense to make this a generic retry-on-failure attempt?
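
   One way to read this suggestion, as a rough sketch only: retry on any unsuccessful response, refreshing the URL between attempts, instead of checking for particular status codes. `DownloadRetrySketch`, `GetWithRefreshAsync`, the `refreshUrl` delegate, and `maxRefreshAttempts` are hypothetical names for illustration and are not part of the driver:

```csharp
using System;
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

internal static class DownloadRetrySketch
{
    // Hypothetical helper: refreshUrl stands in for whatever the driver uses
    // to obtain a fresh presigned URL; it returns null if none is available.
    public static async Task<HttpResponseMessage> GetWithRefreshAsync(
        HttpClient client,
        string url,
        Func<CancellationToken, Task<string?>> refreshUrl,
        int maxRefreshAttempts,
        CancellationToken cancellationToken)
    {
        for (int attempt = 0; ; attempt++)
        {
            HttpResponseMessage response = await client.GetAsync(
                url,
                HttpCompletionOption.ResponseHeadersRead,
                cancellationToken).ConfigureAwait(false);

            if (response.IsSuccessStatusCode)
            {
                return response; // The caller owns and disposes the response.
            }

            // Any failure, not only 401 or 403, takes the retry path.
            HttpStatusCode status = response.StatusCode;
            response.Dispose();

            if (attempt >= maxRefreshAttempts)
            {
                throw new InvalidOperationException(
                    $"Download failed with status {(int)status} after {attempt} URL refresh attempts.");
            }

            string? refreshed = await refreshUrl(cancellationToken).ConfigureAwait(false);
            if (refreshed == null)
            {
                throw new InvalidOperationException("No refreshed URL was available.");
            }

            url = refreshed;
        }
    }
}
```

   A production version would probably also want a delay between attempts and a way to tell retryable failures from permanent ones; both are left out here to keep the sketch short.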






Re: [PR] feat(csharp/src/Drivers/Databricks): Implement CloudFetchUrlManager to handle presigned URL expiration in CloudFetch [arrow-adbc]

2025-05-23 Thread via GitHub


jadewang-db commented on code in PR #2855:
URL: https://github.com/apache/arrow-adbc/pull/2855#discussion_r2105442755


##
csharp/src/Drivers/Databricks/CloudFetch/CloudFetchDownloader.cs:
##
@@ -341,6 +365,40 @@ private async Task DownloadFileAsync(IDownloadResult downloadResult, Cancellatio
 HttpCompletionOption.ResponseHeadersRead,
 cancellationToken).ConfigureAwait(false);
 
+// Check if the response indicates an expired URL (typically 403 or 401)
+if (response.StatusCode == System.Net.HttpStatusCode.Forbidden ||
+    response.StatusCode == System.Net.HttpStatusCode.Unauthorized)
+{
+    // If we've already tried refreshing too many times, fail
+    if (downloadResult.RefreshAttempts >= _maxUrlRefreshAttempts)
+    {
+        throw new InvalidOperationException($"Failed to download file after {downloadResult.RefreshAttempts} URL refresh attempts.");
+    }
+
+    // Try to refresh the URL
+    var refreshedLink = await _resultFetcher.GetUrlAsync(downloadResult.Link.StartRowOffset, cancellationToken);
+    if (refreshedLink != null)
+    {
+        // Update the download result with the refreshed link
+        downloadResult.UpdateWithRefreshedLink(refreshedLink);
+        url = refreshedLink.FileLink;
+        sanitizedUrl = SanitizeUrl(url);
+
+        Trace.TraceInformation($"URL for file at offset {refreshedLink.StartRowOffset} was refreshed after expired URL response");
+
+        // Also refresh other potentially expired URLs

Review Comment:
   this is not needed here.



##
csharp/src/Drivers/Databricks/CloudFetch/CloudFetchResultFetcher.cs:
##
@@ -125,6 +139,111 @@ public async Task StopAsync()
 }
 }
 
+/// <summary>
+/// Gets a URL for the specified offset, fetching or refreshing as needed.
+/// </summary>
+/// <param name="offset">The row offset for which to get a URL.</param>
+/// <param name="cancellationToken">The cancellation token.</param>
+/// <returns>The URL link for the specified offset, or null if not available.</returns>
+public async Task<TSparkArrowResultLink?> GetUrlAsync(long offset, CancellationToken cancellationToken)
+{
+    // Need to fetch or refresh the URL
+    await _fetchLock.WaitAsync(cancellationToken);
+    try
+    {
+        // Determine if we need to fetch new URLs or refresh existing ones
+        if (!_urlsByOffset.ContainsKey(offset) && _hasMoreResults)
+        {
+            // This is a new offset we haven't seen before - fetch new URLs
+            var links = await FetchUrlBatchAsync(offset, 100, cancellationToken);
+            return links.FirstOrDefault(l => l.StartRowOffset == offset);
+        }
+        else
+        {
+            // We have the URL but it's expired - refresh it
+            return await RefreshUrlAsync(offset, cancellationToken);
+        }
+    }
+    finally
+    {
+        _fetchLock.Release();
+    }
+}
+
+/// <summary>
+/// Checks if any URLs are expired or about to expire.
+/// </summary>
+/// <returns>True if any URLs are expired or about to expire, false otherwise.</returns>
+public bool HasExpiredOrExpiringSoonUrls()
+{
+    return _urlsByOffset.Values.Any(IsUrlExpiredOrExpiringSoon);
+}
+
+/// <summary>
+/// Proactively refreshes URLs that are expired or about to expire.
+/// </summary>
+/// <param name="cancellationToken">The cancellation token.</param>
+public async Task RefreshExpiredUrlsAsync(CancellationToken cancellationToken)

Review Comment:
   this can be removed



##
csharp/src/Drivers/Databricks/CloudFetch/ICloudFetchInterfaces.cs:
##
@@ -142,6 +160,26 @@ internal interface ICloudFetchResultFetcher
 /// Gets the error encountered by the fetcher, if any.
 /// </summary>
 Exception? Error { get; }
+
+/// <summary>
+/// Gets a URL for the specified offset, fetching or refreshing as needed.
+/// </summary>
+/// <param name="offset">The row offset for which to get a URL.</param>
+/// <param name="cancellationToken">The cancellation token.</param>
+/// <returns>The URL link for the specified offset, or null if not available.</returns>
+Task<TSparkArrowResultLink?> GetUrlAsync(long offset, CancellationToken cancellationToken);
+
+/// <summary>

Review Comment:
   not needed



##
csharp/src/Drivers/Databricks/CloudFetch/CloudFetchResultFetcher.cs:
##
@@ -125,6 +139,111 @@ public async Task StopAsync()
 }
 }
 
+/// <summary>
+/// Gets a URL for the specified offset, fetching or refreshing as needed.
+

Re: [PR] feat(csharp/src/Drivers/Databricks): Implement CloudFetchUrlManager to handle presigned URL expiration in CloudFetch [arrow-adbc]

2025-05-23 Thread via GitHub


jadewang-db commented on code in PR #2855:
URL: https://github.com/apache/arrow-adbc/pull/2855#discussion_r2105146012


##
csharp/src/Drivers/Databricks/CloudFetch/CloudFetchDownloader.cs:
##
@@ -237,6 +248,25 @@ private async Task DownloadFilesAsync(CancellationToken cancellationToken)
 break;
 }
 
+// Check if there are any expired URLs that need refreshing

Review Comment:
   do we really need this?



##
csharp/src/Drivers/Databricks/DatabricksConnection.cs:
##
@@ -468,6 +471,27 @@ protected internal override Task GetRowSetAsync(TGetPrimaryKeysResp res
 return base.GetAuthenticationHeaderValue(authType);
 }
 
+/// <summary>
+/// Gets a fresh Thrift client for fetching results.
+/// This helps avoid "stream already consumed" errors when a Thrift transport has already been used.
+/// </summary>
+/// <returns>A fresh Thrift client instance.</returns>
+internal TCLIService.IAsync GetFreshClient()

Review Comment:
   this should be removed



##
csharp/src/Drivers/Databricks/CloudFetch/CloudFetchUrlManager.cs:
##
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.Hive.Service.Rpc.Thrift;
+
+namespace Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch
+{
+    /// <summary>
+    /// Abstraction for time operations to enable testing with controlled time.
+    /// </summary>
+    public interface IClock
+    {
+        /// <summary>
+        /// Gets the current UTC time.
+        /// </summary>
+        DateTime UtcNow { get; }
+    }
+
+    /// <summary>
+    /// Default implementation that uses system time.
+    /// </summary>
+    internal class SystemClock : IClock
+    {
+        public DateTime UtcNow => DateTime.UtcNow;
+    }
+
+    /// <summary>
+    /// Test implementation that allows controlling time for testing scenarios.
+    /// </summary>
+    public class ControllableClock : IClock
+    {
+        private DateTime _currentTime;
+
+        public ControllableClock(DateTime? initialTime = null)
+        {
+            _currentTime = initialTime ?? DateTime.UtcNow;
+        }
+
+        public DateTime UtcNow => _currentTime;
+
+        /// <summary>
+        /// Advances the clock by the specified time span.
+        /// </summary>
+        /// <param name="timeSpan">The amount of time to advance.</param>
+        public void AdvanceTime(TimeSpan timeSpan)
+        {
+            _currentTime = _currentTime.Add(timeSpan);
+        }
+
+        /// <summary>
+        /// Sets the clock to a specific time.
+        /// </summary>
+        /// <param name="time">The time to set.</param>
+        public void SetTime(DateTime time)
+        {
+            _currentTime = time;
+        }
+
+        /// <summary>
+        /// Resets the clock to the current system time.
+        /// </summary>
+        public void Reset()
+        {
+            _currentTime = DateTime.UtcNow;
+        }
+    }
+
+    /// <summary>
+    /// Manages CloudFetch URLs, handling both initial fetching and refreshing of expired URLs.
+    /// </summary>
+    internal class CloudFetchUrlManager
+    {
+        private readonly IHiveServer2Statement _statement;
+        private readonly SemaphoreSlim _fetchLock = new SemaphoreSlim(1, 1);
+        private readonly ConcurrentDictionary<long, TSparkArrowResultLink> _urlsByOffset = new ConcurrentDictionary<long, TSparkArrowResultLink>();
+        private readonly int _expirationBufferSeconds;
+        private readonly IClock _clock;
+        private long _lastFetchedOffset = 0;
+        private bool _hasMoreResults = true;
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="CloudFetchUrlManager"/> class.
+        /// </summary>
+        /// <param name="statement">The HiveServer2 statement to use for fetching URLs.</param>
+        /// <param name="expirationBufferSeconds">Buffer time in seconds before URL expiration to trigger refresh.</param>
+        /// <param name="clock">Clock implementation for time operations. If null, uses system clock.</param>
+        public CloudFetchUrlManager(IHiveServer2Statement statement, int expirationBufferSeconds = 60, IClock? clock = null)
+        {
+            _statement = statement ?? throw new ArgumentNullException(nameof(statement));
+            _expirationBufferSeconds = expirationBufferSeconds;
+