This is an automated email from the ASF dual-hosted git repository.
curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 332e145d9 feat(csharp): fix PowerBI hang when reading CloudFetch
result in Databricks driver (#2747)
332e145d9 is described below
commit 332e145d9aaf63b18a94f0c6d24a83ff4bf873b8
Author: Jade Wang <[email protected]>
AuthorDate: Tue Apr 29 05:08:15 2025 -0700
feat(csharp): fix PowerBI hang when reading CloudFetch result in Databricks
driver (#2747)
### Summary
This PR fixes an issue where PowerBI would hang when reading CloudFetch
results and significantly improves the logging capabilities in the
CloudFetch downloader component.
### Problem
1. The CloudFetchReader was not properly disposing of the download
manager after completing downloads, causing resource leaks that led to
PowerBI hanging.
2. The CloudFetchDownloader was using Debug.WriteLine for logging, which
is inadequate for production scenarios and doesn't provide sufficient
diagnostic information.
### Solution
- Fixed resource management in CloudFetchReader by properly disposing
the download manager after all files are processed
- Replaced Debug.WriteLine calls with more comprehensive Trace logging
- Added detailed performance metrics and diagnostics:
- Download start/completion timestamps
- File sizes and throughput calculations
- Decompression metrics
- Overall download statistics (total files, success/failure counts)
- Added URL sanitization for secure logging
- Added proper error tracking and reporting
### Testing
- Enhanced CloudFetchE2ETest to verify that the reader properly
completes after all data is consumed
- Verified that PowerBI no longer hangs when reading CloudFetch results
---
.../Databricks/CloudFetch/CloudFetchDownloader.cs | 75 ++++++++++++++++++++--
.../Databricks/CloudFetch/CloudFetchReader.cs | 4 +-
.../test/Drivers/Databricks/CloudFetchE2ETest.cs | 2 +
3 files changed, 74 insertions(+), 7 deletions(-)
diff --git a/csharp/src/Drivers/Databricks/CloudFetch/CloudFetchDownloader.cs
b/csharp/src/Drivers/Databricks/CloudFetch/CloudFetchDownloader.cs
index e3cec4a2f..8aadf58f2 100644
--- a/csharp/src/Drivers/Databricks/CloudFetch/CloudFetchDownloader.cs
+++ b/csharp/src/Drivers/Databricks/CloudFetch/CloudFetchDownloader.cs
@@ -55,6 +55,7 @@ namespace
Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch
/// <param name="httpClient">The HTTP client to use for
downloads.</param>
/// <param name="maxParallelDownloads">The maximum number of parallel
downloads.</param>
/// <param name="isLz4Compressed">Whether the results are LZ4
compressed.</param>
+ /// <param name="logger">The logger instance.</param>
/// <param name="maxRetries">The maximum number of retry
attempts.</param>
/// <param name="retryDelayMs">The delay between retry attempts in
milliseconds.</param>
public CloudFetchDownloader(
@@ -184,6 +185,12 @@ namespace
Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch
{
await Task.Yield();
+ int totalFiles = 0;
+ int successfulDownloads = 0;
+ int failedDownloads = 0;
+ long totalBytes = 0;
+ var overallStopwatch = Stopwatch.StartNew();
+
try
{
// Keep track of active download tasks
@@ -193,6 +200,8 @@ namespace
Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch
// Process items from the download queue until it's completed
foreach (var downloadResult in
_downloadQueue.GetConsumingEnumerable(cancellationToken))
{
+ totalFiles++;
+
// Check if there's an error before processing more
downloads
if (HasError)
{
@@ -213,7 +222,7 @@ namespace
Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch
}
catch (Exception ex)
{
- Debug.WriteLine($"Error waiting for downloads
to complete: {ex.Message}");
+ Trace.TraceWarning($"Error waiting for
downloads to complete: {ex.Message}");
// Don't set error here, as individual
download tasks will handle their own errors
}
}
@@ -245,10 +254,11 @@ namespace
Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch
if (t.IsFaulted)
{
Exception ex = t.Exception?.InnerException ??
new Exception("Unknown error");
- Debug.WriteLine($"Download failed:
{ex.Message}");
+ Trace.TraceError($"Download failed for file
{SanitizeUrl(downloadResult.Link.FileLink)}: {ex.Message}");
// Set the download as failed
downloadResult.SetFailed(ex);
+ failedDownloads++;
// Set the error state to stop the download
process
SetError(ex);
@@ -256,6 +266,11 @@ namespace
Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch
// Signal that we should stop processing
downloads
downloadTaskCompletionSource.TrySetException(ex);
}
+ else if (!t.IsFaulted && !t.IsCanceled)
+ {
+ successfulDownloads++;
+ totalBytes += downloadResult.Size;
+ }
}, cancellationToken);
// Add the task to the dictionary
@@ -274,14 +289,21 @@ namespace
Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch
catch (OperationCanceledException) when
(cancellationToken.IsCancellationRequested)
{
// Expected when cancellation is requested
+ Trace.TraceInformation("Download process was cancelled");
}
catch (Exception ex)
{
- Debug.WriteLine($"Error in download loop: {ex.Message}");
+ Trace.TraceError($"Error in download loop: {ex.Message}");
SetError(ex);
}
finally
{
+ overallStopwatch.Stop();
+
+ Trace.TraceInformation(
+ $"Download process completed. Total files: {totalFiles},
Successful: {successfulDownloads}, " +
+ $"Failed: {failedDownloads}, Total size: {totalBytes /
1024.0 / 1024.0:F2} MB, Total time: {overallStopwatch.ElapsedMilliseconds /
1000.0:F2} sec");
+
// If there's an error, add the error to the result queue
if (HasError)
{
@@ -293,11 +315,18 @@ namespace
Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch
private async Task DownloadFileAsync(IDownloadResult downloadResult,
CancellationToken cancellationToken)
{
string url = downloadResult.Link.FileLink;
+ string sanitizedUrl = SanitizeUrl(downloadResult.Link.FileLink);
byte[]? fileData = null;
// Use the size directly from the download result
long size = downloadResult.Size;
+ // Create a stopwatch to track download time
+ var stopwatch = Stopwatch.StartNew();
+
+ // Log download start
+ Trace.TraceInformation($"Starting download of file {sanitizedUrl},
expected size: {size / 1024.0:F2} KB");
+
// Acquire memory before downloading
await _memoryManager.AcquireMemoryAsync(size,
cancellationToken).ConfigureAwait(false);
@@ -318,7 +347,7 @@ namespace
Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch
long? contentLength =
response.Content.Headers.ContentLength;
if (contentLength.HasValue && contentLength.Value > 0)
{
- Debug.WriteLine($"Downloading file of size:
{contentLength.Value / 1024.0 / 1024.0:F2} MB");
+ Trace.TraceInformation($"Actual file size for
{sanitizedUrl}: {contentLength.Value / 1024.0 / 1024.0:F2} MB");
}
// Read the file data
@@ -328,13 +357,17 @@ namespace
Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch
catch (Exception ex) when (retry < _maxRetries - 1 &&
!cancellationToken.IsCancellationRequested)
{
// Log the error and retry
- Debug.WriteLine($"Error downloading file (attempt {retry +
1}/{_maxRetries}): {ex.Message}");
+ Trace.TraceError($"Error downloading file
{SanitizeUrl(url)} (attempt {retry + 1}/{_maxRetries}): {ex.Message}");
+
await Task.Delay(_retryDelayMs * (retry + 1),
cancellationToken).ConfigureAwait(false);
}
}
if (fileData == null)
{
+ stopwatch.Stop();
+ Trace.TraceError($"Failed to download file {sanitizedUrl}
after {_maxRetries} attempts. Elapsed time: {stopwatch.ElapsedMilliseconds}
ms");
+
// Release the memory we acquired
_memoryManager.ReleaseMemory(size);
throw new InvalidOperationException($"Failed to download file
from {url} after {_maxRetries} attempts.");
@@ -342,12 +375,14 @@ namespace
Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch
// Process the downloaded file data
MemoryStream dataStream;
+ long actualSize = fileData.Length;
// If the data is LZ4 compressed, decompress it
if (_isLz4Compressed)
{
try
{
+ var decompressStopwatch = Stopwatch.StartNew();
dataStream = new MemoryStream();
using (var inputStream = new MemoryStream(fileData))
using (var decompressor = LZ4Stream.Decode(inputStream))
@@ -355,9 +390,17 @@ namespace
Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch
await decompressor.CopyToAsync(dataStream, 81920,
cancellationToken).ConfigureAwait(false);
}
dataStream.Position = 0;
+ decompressStopwatch.Stop();
+
+ Trace.TraceInformation($"Decompressed file {sanitizedUrl}
in {decompressStopwatch.ElapsedMilliseconds} ms. Compressed size: {actualSize /
1024.0:F2} KB, Decompressed size: {dataStream.Length / 1024.0:F2} KB");
+
+ actualSize = dataStream.Length;
}
catch (Exception ex)
{
+ stopwatch.Stop();
+ Trace.TraceError($"Error decompressing data for file
{sanitizedUrl}: {ex.Message}. Elapsed time: {stopwatch.ElapsedMilliseconds}
ms");
+
// Release the memory we acquired
_memoryManager.ReleaseMemory(size);
throw new InvalidOperationException($"Error decompressing
data: {ex.Message}", ex);
@@ -368,6 +411,10 @@ namespace
Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch
dataStream = new MemoryStream(fileData);
}
+ // Stop the stopwatch and log download completion
+ stopwatch.Stop();
+ Trace.TraceInformation($"Completed download of file
{sanitizedUrl}. Size: {actualSize / 1024.0:F2} KB, Latency:
{stopwatch.ElapsedMilliseconds} ms, Throughput: {(actualSize / 1024.0 / 1024.0)
/ (stopwatch.ElapsedMilliseconds / 1000.0):F2} MB/s");
+
// Set the download as completed with the original size
downloadResult.SetCompleted(dataStream, size);
}
@@ -378,6 +425,7 @@ namespace
Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch
{
if (_error == null)
{
+ Trace.TraceError($"Setting error state: {ex.Message}");
_error = ex;
}
}
@@ -395,7 +443,22 @@ namespace
Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch
}
catch (Exception ex)
{
- Debug.WriteLine($"Error completing with error: {ex.Message}");
+ Trace.TraceError($"Error completing with error: {ex.Message}");
+ }
+ }
+
+ // Helper method to sanitize URLs for logging (to avoid exposing
sensitive information)
+ private string SanitizeUrl(string url)
+ {
+ try
+ {
+ var uri = new Uri(url);
+ return
$"{uri.Scheme}://{uri.Host}/{Path.GetFileName(uri.LocalPath)}";
+ }
+ catch
+ {
+ // If URL parsing fails, return a generic identifier
+ return "cloud-storage-url";
}
}
}
diff --git a/csharp/src/Drivers/Databricks/CloudFetch/CloudFetchReader.cs
b/csharp/src/Drivers/Databricks/CloudFetch/CloudFetchReader.cs
index 1e3861833..6a8d3d24b 100644
--- a/csharp/src/Drivers/Databricks/CloudFetch/CloudFetchReader.cs
+++ b/csharp/src/Drivers/Databricks/CloudFetch/CloudFetchReader.cs
@@ -36,7 +36,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch
{
private readonly Schema schema;
private readonly bool isLz4Compressed;
- private readonly ICloudFetchDownloadManager downloadManager;
+ private ICloudFetchDownloadManager? downloadManager;
private ArrowStreamReader? currentReader;
private IDownloadResult? currentDownloadResult;
private bool isPrefetchEnabled;
@@ -136,6 +136,8 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.CloudFetch
this.currentDownloadResult = await
this.downloadManager.GetNextDownloadedFileAsync(cancellationToken);
if (this.currentDownloadResult == null)
{
+ this.downloadManager.Dispose();
+ this.downloadManager = null;
// No more files
return null;
}
diff --git a/csharp/test/Drivers/Databricks/CloudFetchE2ETest.cs
b/csharp/test/Drivers/Databricks/CloudFetchE2ETest.cs
index 96b040274..a96b88cbf 100644
--- a/csharp/test/Drivers/Databricks/CloudFetchE2ETest.cs
+++ b/csharp/test/Drivers/Databricks/CloudFetchE2ETest.cs
@@ -90,6 +90,8 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks
Assert.True(totalRows >= rowCount);
+ Assert.Null(await result.Stream.ReadNextRecordBatchAsync());
+
// Also log to the test output helper if available
OutputHelper?.WriteLine($"Read {totalRows} rows from range
function");
}