This is an automated email from the ASF dual-hosted git repository.

curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 2dfd11069 feat(csharp/src/Drivers/Databricks): Added support for 
user-configurable Fetch heartbeat interval param (#3472)
2dfd11069 is described below

commit 2dfd110699ee0506f14dc8d820dc1a1f4e0df1ad
Author: msrathore-db <[email protected]>
AuthorDate: Thu Sep 25 17:58:29 2025 +0530

    feat(csharp/src/Drivers/Databricks): Added support for user-configurable 
Fetch heartbeat interval param (#3472)
    
    ## Description
    Added support for the Fetch heartbeat interval connection param. The
    default values are kept same as before. Users can now configure this
    param.
    
    [PECO-2736](https://databricks.atlassian.net/browse/PECO-2736)
---
 .../src/Drivers/Databricks/DatabricksConnection.cs | 50 ++++++++++++++++++++++
 .../src/Drivers/Databricks/DatabricksParameters.cs | 20 ++++++++-
 .../Databricks/Reader/DatabricksCompositeReader.cs | 38 +++++++++++++++-
 .../Databricks/E2E/DatabricksConnectionTest.cs     | 10 +++++
 .../Databricks/E2E/DatabricksTestConfiguration.cs  |  6 +++
 .../Databricks/E2E/DatabricksTestEnvironment.cs    |  8 ++++
 .../Unit/DatabricksOperationStatusPollerTests.cs   | 42 ++++++++++++++++++
 7 files changed, 171 insertions(+), 3 deletions(-)

diff --git a/csharp/src/Drivers/Databricks/DatabricksConnection.cs 
b/csharp/src/Drivers/Databricks/DatabricksConnection.cs
index 07f31ceed..a9f4cd0f4 100644
--- a/csharp/src/Drivers/Databricks/DatabricksConnection.cs
+++ b/csharp/src/Drivers/Databricks/DatabricksConnection.cs
@@ -83,6 +83,12 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
         // Identity federation client ID for token exchange
         private string? _identityFederationClientId;
 
+        // Heartbeat interval configuration
+        private int _fetchHeartbeatIntervalSeconds = 
DatabricksConstants.DefaultOperationStatusPollingIntervalSeconds;
+
+        // Request timeout configuration
+        private int _operationStatusRequestTimeoutSeconds = 
DatabricksConstants.DefaultOperationStatusRequestTimeoutSeconds;
+
         // Default namespace
         private TNamespace? _defaultNamespace;
 
@@ -386,6 +392,40 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
             {
                 _identityFederationClientId = identityFederationClientId;
             }
+
+            if 
(Properties.TryGetValue(DatabricksParameters.FetchHeartbeatInterval, out 
string? fetchHeartbeatIntervalStr))
+            {
+                if (!int.TryParse(fetchHeartbeatIntervalStr, out int 
fetchHeartbeatIntervalValue))
+                {
+                    throw new ArgumentException($"Parameter 
'{DatabricksParameters.FetchHeartbeatInterval}' value 
'{fetchHeartbeatIntervalStr}' could not be parsed. Valid values are positive 
integers.");
+                }
+
+                if (fetchHeartbeatIntervalValue <= 0)
+                {
+                    throw new ArgumentOutOfRangeException(
+                        nameof(Properties),
+                        fetchHeartbeatIntervalValue,
+                        $"Parameter 
'{DatabricksParameters.FetchHeartbeatInterval}' value must be a positive 
integer.");
+                }
+                _fetchHeartbeatIntervalSeconds = fetchHeartbeatIntervalValue;
+            }
+
+            if 
(Properties.TryGetValue(DatabricksParameters.OperationStatusRequestTimeout, out 
string? operationStatusRequestTimeoutStr))
+            {
+                if (!int.TryParse(operationStatusRequestTimeoutStr, out int 
operationStatusRequestTimeoutValue))
+                {
+                    throw new ArgumentException($"Parameter 
'{DatabricksParameters.OperationStatusRequestTimeout}' value 
'{operationStatusRequestTimeoutStr}' could not be parsed. Valid values are 
positive integers.");
+                }
+
+                if (operationStatusRequestTimeoutValue <= 0)
+                {
+                    throw new ArgumentOutOfRangeException(
+                        nameof(Properties),
+                        operationStatusRequestTimeoutValue,
+                        $"Parameter 
'{DatabricksParameters.OperationStatusRequestTimeout}' value must be a positive 
integer.");
+                }
+                _operationStatusRequestTimeoutSeconds = 
operationStatusRequestTimeoutValue;
+            }
         }
 
         /// <summary>
@@ -428,6 +468,16 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
         /// </summary>
         internal TNamespace? DefaultNamespace => _defaultNamespace;
 
+        /// <summary>
+        /// Gets the heartbeat interval in seconds for long-running operations.
+        /// </summary>
+        internal int FetchHeartbeatIntervalSeconds => 
_fetchHeartbeatIntervalSeconds;
+
+        /// <summary>
+        /// Gets the request timeout in seconds for operation status polling 
requests.
+        /// </summary>
+        internal int OperationStatusRequestTimeoutSeconds => 
_operationStatusRequestTimeoutSeconds;
+
         /// <summary>
         /// Gets whether multiple catalog is supported
         /// </summary>
diff --git a/csharp/src/Drivers/Databricks/DatabricksParameters.cs 
b/csharp/src/Drivers/Databricks/DatabricksParameters.cs
index f27993b37..f79b0002a 100644
--- a/csharp/src/Drivers/Databricks/DatabricksParameters.cs
+++ b/csharp/src/Drivers/Databricks/DatabricksParameters.cs
@@ -228,6 +228,22 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
         /// Default value is false if not specified.
         /// </summary>
         public const string DriverConfigTakePrecedence = 
"adbc.databricks.driver_config_take_precedence";
+
+        /// <summary>
+        /// The interval in seconds for heartbeat polling during long-running 
operations.
+        /// This prevents queries from timing out by periodically checking 
operation status.
+        /// Default value is 60 seconds if not specified.
+        /// Must be a positive integer value.
+        /// </summary>
+        public const string FetchHeartbeatInterval = 
"adbc.databricks.fetch_heartbeat_interval";
+
+        /// <summary>
+        /// The timeout in seconds for operation status polling requests.
+        /// This controls how long to wait for each individual polling request 
to complete.
+        /// Default value is 30 seconds if not specified.
+        /// Must be a positive integer value.
+        /// </summary>
+        public const string OperationStatusRequestTimeout = 
"adbc.databricks.operation_status_request_timeout";
     }
 
     /// <summary>
@@ -236,12 +252,12 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
     public class DatabricksConstants
     {
         /// <summary>
-        /// Default heartbeat interval in seconds for long-running operations. 
TODO: make this user-configurable
+        /// Default heartbeat interval in seconds for long-running operations.
         /// </summary>
         public const int DefaultOperationStatusPollingIntervalSeconds = 60;
 
         /// <summary>
-        /// Default timeout in seconds for operation status polling requests. 
TODO: make this user-configurable
+        /// Default timeout in seconds for operation status polling requests.
         /// </summary>
         public const int DefaultOperationStatusRequestTimeoutSeconds = 30;
 
diff --git a/csharp/src/Drivers/Databricks/Reader/DatabricksCompositeReader.cs 
b/csharp/src/Drivers/Databricks/Reader/DatabricksCompositeReader.cs
index 4b7c1dbeb..eb2c07294 100644
--- a/csharp/src/Drivers/Databricks/Reader/DatabricksCompositeReader.cs
+++ b/csharp/src/Drivers/Databricks/Reader/DatabricksCompositeReader.cs
@@ -80,7 +80,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader
             }
             if (_response.DirectResults?.ResultSet?.HasMoreRows ?? true)
             {
-                operationStatusPoller = operationPoller ?? new 
DatabricksOperationStatusPoller(_statement, response);
+                operationStatusPoller = operationPoller ?? new 
DatabricksOperationStatusPoller(_statement, response, 
GetHeartbeatIntervalFromConnection(), GetRequestTimeoutFromConnection());
                 operationStatusPoller.Start();
             }
         }
@@ -197,5 +197,41 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader
             operationStatusPoller?.Dispose();
             operationStatusPoller = null;
         }
+
+        /// <summary>
+        /// Gets the heartbeat interval from the statement's connection.
+        /// </summary>
+        /// <returns>The heartbeat interval in seconds, or default if not 
available.</returns>
+        private int GetHeartbeatIntervalFromConnection()
+        {
+            if (_statement is DatabricksStatement databricksStatement)
+            {
+                var connection = databricksStatement.Connection;
+                if (connection is DatabricksConnection databricksConnection)
+                {
+                    return databricksConnection.FetchHeartbeatIntervalSeconds;
+                }
+            }
+
+            return 
DatabricksConstants.DefaultOperationStatusPollingIntervalSeconds;
+        }
+
+        /// <summary>
+        /// Gets the request timeout from the statement's connection.
+        /// </summary>
+        /// <returns>The request timeout in seconds, or default if not 
available.</returns>
+        private int GetRequestTimeoutFromConnection()
+        {
+            if (_statement is DatabricksStatement databricksStatement)
+            {
+                var connection = databricksStatement.Connection;
+                if (connection is DatabricksConnection databricksConnection)
+                {
+                    return 
databricksConnection.OperationStatusRequestTimeoutSeconds;
+                }
+            }
+
+            return 
DatabricksConstants.DefaultOperationStatusRequestTimeoutSeconds;
+        }
     }
 }
diff --git a/csharp/test/Drivers/Databricks/E2E/DatabricksConnectionTest.cs 
b/csharp/test/Drivers/Databricks/E2E/DatabricksConnectionTest.cs
index b564ad381..3d41c2c6a 100644
--- a/csharp/test/Drivers/Databricks/E2E/DatabricksConnectionTest.cs
+++ b/csharp/test/Drivers/Databricks/E2E/DatabricksConnectionTest.cs
@@ -325,6 +325,16 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks
                 Add(new(new() { [SparkParameters.Type] = 
SparkServerTypeConstants.Http, [SparkParameters.HostName] = "valid.server.com", 
[AdbcOptions.Username] = "user", [AdbcOptions.Password] = "myPassword", 
[DatabricksParameters.TracePropagationEnabled] = "notabool" }, 
typeof(ArgumentException)));
                 Add(new(new() { [SparkParameters.Type] = 
SparkServerTypeConstants.Http, [SparkParameters.HostName] = "valid.server.com", 
[AdbcOptions.Username] = "user", [AdbcOptions.Password] = "myPassword", 
[DatabricksParameters.TraceParentHeaderName] = "" }, 
typeof(ArgumentException)));
                 Add(new(new() { [SparkParameters.Type] = 
SparkServerTypeConstants.Http, [SparkParameters.HostName] = "valid.server.com", 
[AdbcOptions.Username] = "user", [AdbcOptions.Password] = "myPassword", 
[DatabricksParameters.TraceStateEnabled] = "notabool" }, 
typeof(ArgumentException)));
+
+                // Tests for fetch heartbeat interval parameter
+                Add(new(new() { [SparkParameters.Type] = 
SparkServerTypeConstants.Http, [SparkParameters.HostName] = "valid.server.com", 
[AdbcOptions.Username] = "user", [AdbcOptions.Password] = "myPassword", 
[DatabricksParameters.FetchHeartbeatInterval] = "notanumber" }, 
typeof(ArgumentException)));
+                Add(new(new() { [SparkParameters.Type] = 
SparkServerTypeConstants.Http, [SparkParameters.HostName] = "valid.server.com", 
[AdbcOptions.Username] = "user", [AdbcOptions.Password] = "myPassword", 
[DatabricksParameters.FetchHeartbeatInterval] = "0" }, 
typeof(ArgumentOutOfRangeException)));
+                Add(new(new() { [SparkParameters.Type] = 
SparkServerTypeConstants.Http, [SparkParameters.HostName] = "valid.server.com", 
[AdbcOptions.Username] = "user", [AdbcOptions.Password] = "myPassword", 
[DatabricksParameters.FetchHeartbeatInterval] = "-1" }, 
typeof(ArgumentOutOfRangeException)));
+
+                // Tests for operation status request timeout parameter
+                Add(new(new() { [SparkParameters.Type] = 
SparkServerTypeConstants.Http, [SparkParameters.HostName] = "valid.server.com", 
[AdbcOptions.Username] = "user", [AdbcOptions.Password] = "myPassword", 
[DatabricksParameters.OperationStatusRequestTimeout] = "notanumber" }, 
typeof(ArgumentException)));
+                Add(new(new() { [SparkParameters.Type] = 
SparkServerTypeConstants.Http, [SparkParameters.HostName] = "valid.server.com", 
[AdbcOptions.Username] = "user", [AdbcOptions.Password] = "myPassword", 
[DatabricksParameters.OperationStatusRequestTimeout] = "0" }, 
typeof(ArgumentOutOfRangeException)));
+                Add(new(new() { [SparkParameters.Type] = 
SparkServerTypeConstants.Http, [SparkParameters.HostName] = "valid.server.com", 
[AdbcOptions.Username] = "user", [AdbcOptions.Password] = "myPassword", 
[DatabricksParameters.OperationStatusRequestTimeout] = "-1" }, 
typeof(ArgumentOutOfRangeException)));
             }
         }
 
diff --git a/csharp/test/Drivers/Databricks/E2E/DatabricksTestConfiguration.cs 
b/csharp/test/Drivers/Databricks/E2E/DatabricksTestConfiguration.cs
index ab2ae5a42..baf323484 100644
--- a/csharp/test/Drivers/Databricks/E2E/DatabricksTestConfiguration.cs
+++ b/csharp/test/Drivers/Databricks/E2E/DatabricksTestConfiguration.cs
@@ -55,6 +55,12 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks
         [JsonPropertyName("enableRunAsyncInThriftOp"), JsonIgnore(Condition = 
JsonIgnoreCondition.WhenWritingDefault)]
         public string EnableRunAsyncInThriftOp { get; set; } = string.Empty;
 
+        [JsonPropertyName("fetchHeartbeatInterval"), JsonIgnore(Condition = 
JsonIgnoreCondition.WhenWritingDefault)]
+        public string FetchHeartbeatInterval { get; set; } = string.Empty;
+
+        [JsonPropertyName("operationStatusRequestTimeout"), 
JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
+        public string OperationStatusRequestTimeout { get; set; } = 
string.Empty;
+
         [JsonPropertyName("enableDirectResults"), JsonIgnore(Condition = 
JsonIgnoreCondition.WhenWritingDefault)]
         public string EnableDirectResults { get; set; } = string.Empty;
     }
diff --git a/csharp/test/Drivers/Databricks/E2E/DatabricksTestEnvironment.cs 
b/csharp/test/Drivers/Databricks/E2E/DatabricksTestEnvironment.cs
index d93d7823a..a553856c5 100644
--- a/csharp/test/Drivers/Databricks/E2E/DatabricksTestEnvironment.cs
+++ b/csharp/test/Drivers/Databricks/E2E/DatabricksTestEnvironment.cs
@@ -141,6 +141,14 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks
             {
                 parameters.Add(ApacheParameters.QueryTimeoutSeconds, 
testConfiguration.QueryTimeoutSeconds!);
             }
+            if 
(!string.IsNullOrEmpty(testConfiguration.FetchHeartbeatInterval))
+            {
+                parameters.Add(DatabricksParameters.FetchHeartbeatInterval, 
testConfiguration.FetchHeartbeatInterval!);
+            }
+            if 
(!string.IsNullOrEmpty(testConfiguration.OperationStatusRequestTimeout))
+            {
+                
parameters.Add(DatabricksParameters.OperationStatusRequestTimeout, 
testConfiguration.OperationStatusRequestTimeout!);
+            }
             if 
(!string.IsNullOrEmpty(testConfiguration.EnableMultipleCatalogSupport))
             {
                 
parameters.Add(DatabricksParameters.EnableMultipleCatalogSupport, 
testConfiguration.EnableMultipleCatalogSupport!);
diff --git 
a/csharp/test/Drivers/Databricks/Unit/DatabricksOperationStatusPollerTests.cs 
b/csharp/test/Drivers/Databricks/Unit/DatabricksOperationStatusPollerTests.cs
index 02eb2ad35..8650ea23a 100644
--- 
a/csharp/test/Drivers/Databricks/Unit/DatabricksOperationStatusPollerTests.cs
+++ 
b/csharp/test/Drivers/Databricks/Unit/DatabricksOperationStatusPollerTests.cs
@@ -190,5 +190,47 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks.Unit
             {
             }
         }
+
+        [Fact]
+        public async Task UsesCustomHeartbeatInterval()
+        {
+            // Arrange
+            int customHeartbeatInterval = 2; // 2 seconds
+            using var poller = new 
DatabricksOperationStatusPoller(_mockStatement.Object, _mockResponse.Object, 
customHeartbeatInterval);
+            var pollCount = 0;
+            _mockClient.Setup(c => 
c.GetOperationStatus(It.IsAny<TGetOperationStatusReq>(), 
It.IsAny<CancellationToken>()))
+                .ReturnsAsync(new TGetOperationStatusResp())
+                .Callback(() => pollCount++);
+
+            // Act
+            poller.Start();
+            await Task.Delay(TimeSpan.FromSeconds(customHeartbeatInterval * 
3)); // Wait for 3 intervals
+
+            // Assert
+            Assert.True(pollCount > 0, "Should have polled at least once");
+            _mockClient.Verify(c => 
c.GetOperationStatus(It.IsAny<TGetOperationStatusReq>(), 
It.IsAny<CancellationToken>()), Times.AtLeastOnce);
+        }
+
+        [Fact]
+        public async Task UsesCustomRequestTimeout()
+        {
+            // Arrange
+            int customRequestTimeout = 5;
+            using var poller = new 
DatabricksOperationStatusPoller(_mockStatement.Object, _mockResponse.Object, 
_heartbeatIntervalSeconds, customRequestTimeout);
+            var pollCount = 0;
+
+            _mockClient.Setup(c => 
c.GetOperationStatus(It.IsAny<TGetOperationStatusReq>(), 
It.IsAny<CancellationToken>()))
+                .ReturnsAsync(new TGetOperationStatusResp())
+                .Callback(() => pollCount++);
+
+            // Act
+            poller.Start();
+            await Task.Delay(TimeSpan.FromSeconds(_heartbeatIntervalSeconds * 
2));
+
+            // Assert: This test verifies that the poller can be instantiated 
with a custom request timeout
+
+            Assert.True(pollCount > 0, "Should have polled at least once");
+            _mockClient.Verify(c => 
c.GetOperationStatus(It.IsAny<TGetOperationStatusReq>(), 
It.IsAny<CancellationToken>()), Times.AtLeastOnce);
+        }
     }
 }

Reply via email to