This is an automated email from the ASF dual-hosted git repository.
curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 2dfd11069 feat(csharp/src/Drivers/Databricks): Added support for
user-configurable Fetch heartbeat interval param (#3472)
2dfd11069 is described below
commit 2dfd110699ee0506f14dc8d820dc1a1f4e0df1ad
Author: msrathore-db <[email protected]>
AuthorDate: Thu Sep 25 17:58:29 2025 +0530
feat(csharp/src/Drivers/Databricks): Added support for user-configurable
Fetch heartbeat interval param (#3472)
## Description
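Added support for two user-configurable connection parameters: the fetch
heartbeat interval (`adbc.databricks.fetch_heartbeat_interval`) and the
operation status request timeout
(`adbc.databricks.operation_status_request_timeout`). The default values
(60 seconds and 30 seconds, respectively) are the same as before; users
can now override them, and each value must be a positive integer.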
[PECO-2736](https://databricks.atlassian.net/browse/PECO-2736)
---
.../src/Drivers/Databricks/DatabricksConnection.cs | 50 ++++++++++++++++++++++
.../src/Drivers/Databricks/DatabricksParameters.cs | 20 ++++++++-
.../Databricks/Reader/DatabricksCompositeReader.cs | 38 +++++++++++++++-
.../Databricks/E2E/DatabricksConnectionTest.cs | 10 +++++
.../Databricks/E2E/DatabricksTestConfiguration.cs | 6 +++
.../Databricks/E2E/DatabricksTestEnvironment.cs | 8 ++++
.../Unit/DatabricksOperationStatusPollerTests.cs | 42 ++++++++++++++++++
7 files changed, 171 insertions(+), 3 deletions(-)
diff --git a/csharp/src/Drivers/Databricks/DatabricksConnection.cs
b/csharp/src/Drivers/Databricks/DatabricksConnection.cs
index 07f31ceed..a9f4cd0f4 100644
--- a/csharp/src/Drivers/Databricks/DatabricksConnection.cs
+++ b/csharp/src/Drivers/Databricks/DatabricksConnection.cs
@@ -83,6 +83,12 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
// Identity federation client ID for token exchange
private string? _identityFederationClientId;
+ // Heartbeat interval configuration
+ private int _fetchHeartbeatIntervalSeconds =
DatabricksConstants.DefaultOperationStatusPollingIntervalSeconds;
+
+ // Request timeout configuration
+ private int _operationStatusRequestTimeoutSeconds =
DatabricksConstants.DefaultOperationStatusRequestTimeoutSeconds;
+
// Default namespace
private TNamespace? _defaultNamespace;
@@ -386,6 +392,40 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
{
_identityFederationClientId = identityFederationClientId;
}
+
+ if
(Properties.TryGetValue(DatabricksParameters.FetchHeartbeatInterval, out
string? fetchHeartbeatIntervalStr))
+ {
+ if (!int.TryParse(fetchHeartbeatIntervalStr, out int
fetchHeartbeatIntervalValue))
+ {
+ throw new ArgumentException($"Parameter
'{DatabricksParameters.FetchHeartbeatInterval}' value
'{fetchHeartbeatIntervalStr}' could not be parsed. Valid values are positive
integers.");
+ }
+
+ if (fetchHeartbeatIntervalValue <= 0)
+ {
+ throw new ArgumentOutOfRangeException(
+ nameof(Properties),
+ fetchHeartbeatIntervalValue,
+ $"Parameter
'{DatabricksParameters.FetchHeartbeatInterval}' value must be a positive
integer.");
+ }
+ _fetchHeartbeatIntervalSeconds = fetchHeartbeatIntervalValue;
+ }
+
+ if
(Properties.TryGetValue(DatabricksParameters.OperationStatusRequestTimeout, out
string? operationStatusRequestTimeoutStr))
+ {
+ if (!int.TryParse(operationStatusRequestTimeoutStr, out int
operationStatusRequestTimeoutValue))
+ {
+ throw new ArgumentException($"Parameter
'{DatabricksParameters.OperationStatusRequestTimeout}' value
'{operationStatusRequestTimeoutStr}' could not be parsed. Valid values are
positive integers.");
+ }
+
+ if (operationStatusRequestTimeoutValue <= 0)
+ {
+ throw new ArgumentOutOfRangeException(
+ nameof(Properties),
+ operationStatusRequestTimeoutValue,
+ $"Parameter
'{DatabricksParameters.OperationStatusRequestTimeout}' value must be a positive
integer.");
+ }
+ _operationStatusRequestTimeoutSeconds =
operationStatusRequestTimeoutValue;
+ }
}
/// <summary>
@@ -428,6 +468,16 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
/// </summary>
internal TNamespace? DefaultNamespace => _defaultNamespace;
+ /// <summary>
+ /// Gets the heartbeat interval in seconds for long-running operations.
+ /// </summary>
+ internal int FetchHeartbeatIntervalSeconds =>
_fetchHeartbeatIntervalSeconds;
+
+ /// <summary>
+ /// Gets the request timeout in seconds for operation status polling
requests.
+ /// </summary>
+ internal int OperationStatusRequestTimeoutSeconds =>
_operationStatusRequestTimeoutSeconds;
+
/// <summary>
/// Gets whether multiple catalog is supported
/// </summary>
diff --git a/csharp/src/Drivers/Databricks/DatabricksParameters.cs
b/csharp/src/Drivers/Databricks/DatabricksParameters.cs
index f27993b37..f79b0002a 100644
--- a/csharp/src/Drivers/Databricks/DatabricksParameters.cs
+++ b/csharp/src/Drivers/Databricks/DatabricksParameters.cs
@@ -228,6 +228,22 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
/// Default value is false if not specified.
/// </summary>
public const string DriverConfigTakePrecedence =
"adbc.databricks.driver_config_take_precedence";
+
+ /// <summary>
+ /// The interval in seconds for heartbeat polling during long-running
operations.
+ /// This prevents queries from timing out by periodically checking
operation status.
+ /// Default value is 60 seconds if not specified.
+ /// Must be a positive integer value.
+ /// </summary>
+ public const string FetchHeartbeatInterval =
"adbc.databricks.fetch_heartbeat_interval";
+
+ /// <summary>
+ /// The timeout in seconds for operation status polling requests.
+ /// This controls how long to wait for each individual polling request
to complete.
+ /// Default value is 30 seconds if not specified.
+ /// Must be a positive integer value.
+ /// </summary>
+ public const string OperationStatusRequestTimeout =
"adbc.databricks.operation_status_request_timeout";
}
/// <summary>
@@ -236,12 +252,12 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
public class DatabricksConstants
{
/// <summary>
- /// Default heartbeat interval in seconds for long-running operations.
TODO: make this user-configurable
+ /// Default heartbeat interval in seconds for long-running operations.
/// </summary>
public const int DefaultOperationStatusPollingIntervalSeconds = 60;
/// <summary>
- /// Default timeout in seconds for operation status polling requests.
TODO: make this user-configurable
+ /// Default timeout in seconds for operation status polling requests.
/// </summary>
public const int DefaultOperationStatusRequestTimeoutSeconds = 30;
diff --git a/csharp/src/Drivers/Databricks/Reader/DatabricksCompositeReader.cs
b/csharp/src/Drivers/Databricks/Reader/DatabricksCompositeReader.cs
index 4b7c1dbeb..eb2c07294 100644
--- a/csharp/src/Drivers/Databricks/Reader/DatabricksCompositeReader.cs
+++ b/csharp/src/Drivers/Databricks/Reader/DatabricksCompositeReader.cs
@@ -80,7 +80,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader
}
if (_response.DirectResults?.ResultSet?.HasMoreRows ?? true)
{
- operationStatusPoller = operationPoller ?? new
DatabricksOperationStatusPoller(_statement, response);
+ operationStatusPoller = operationPoller ?? new
DatabricksOperationStatusPoller(_statement, response,
GetHeartbeatIntervalFromConnection(), GetRequestTimeoutFromConnection());
operationStatusPoller.Start();
}
}
@@ -197,5 +197,41 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader
operationStatusPoller?.Dispose();
operationStatusPoller = null;
}
+
+ /// <summary>
+ /// Gets the heartbeat interval from the statement's connection.
+ /// </summary>
+ /// <returns>The heartbeat interval in seconds, or default if not
available.</returns>
+ private int GetHeartbeatIntervalFromConnection()
+ {
+ if (_statement is DatabricksStatement databricksStatement)
+ {
+ var connection = databricksStatement.Connection;
+ if (connection is DatabricksConnection databricksConnection)
+ {
+ return databricksConnection.FetchHeartbeatIntervalSeconds;
+ }
+ }
+
+ return
DatabricksConstants.DefaultOperationStatusPollingIntervalSeconds;
+ }
+
+ /// <summary>
+ /// Gets the request timeout from the statement's connection.
+ /// </summary>
+ /// <returns>The request timeout in seconds, or default if not
available.</returns>
+ private int GetRequestTimeoutFromConnection()
+ {
+ if (_statement is DatabricksStatement databricksStatement)
+ {
+ var connection = databricksStatement.Connection;
+ if (connection is DatabricksConnection databricksConnection)
+ {
+ return
databricksConnection.OperationStatusRequestTimeoutSeconds;
+ }
+ }
+
+ return
DatabricksConstants.DefaultOperationStatusRequestTimeoutSeconds;
+ }
}
}
diff --git a/csharp/test/Drivers/Databricks/E2E/DatabricksConnectionTest.cs
b/csharp/test/Drivers/Databricks/E2E/DatabricksConnectionTest.cs
index b564ad381..3d41c2c6a 100644
--- a/csharp/test/Drivers/Databricks/E2E/DatabricksConnectionTest.cs
+++ b/csharp/test/Drivers/Databricks/E2E/DatabricksConnectionTest.cs
@@ -325,6 +325,16 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks
Add(new(new() { [SparkParameters.Type] =
SparkServerTypeConstants.Http, [SparkParameters.HostName] = "valid.server.com",
[AdbcOptions.Username] = "user", [AdbcOptions.Password] = "myPassword",
[DatabricksParameters.TracePropagationEnabled] = "notabool" },
typeof(ArgumentException)));
Add(new(new() { [SparkParameters.Type] =
SparkServerTypeConstants.Http, [SparkParameters.HostName] = "valid.server.com",
[AdbcOptions.Username] = "user", [AdbcOptions.Password] = "myPassword",
[DatabricksParameters.TraceParentHeaderName] = "" },
typeof(ArgumentException)));
Add(new(new() { [SparkParameters.Type] =
SparkServerTypeConstants.Http, [SparkParameters.HostName] = "valid.server.com",
[AdbcOptions.Username] = "user", [AdbcOptions.Password] = "myPassword",
[DatabricksParameters.TraceStateEnabled] = "notabool" },
typeof(ArgumentException)));
+
+ // Tests for fetch heartbeat interval parameter
+ Add(new(new() { [SparkParameters.Type] =
SparkServerTypeConstants.Http, [SparkParameters.HostName] = "valid.server.com",
[AdbcOptions.Username] = "user", [AdbcOptions.Password] = "myPassword",
[DatabricksParameters.FetchHeartbeatInterval] = "notanumber" },
typeof(ArgumentException)));
+ Add(new(new() { [SparkParameters.Type] =
SparkServerTypeConstants.Http, [SparkParameters.HostName] = "valid.server.com",
[AdbcOptions.Username] = "user", [AdbcOptions.Password] = "myPassword",
[DatabricksParameters.FetchHeartbeatInterval] = "0" },
typeof(ArgumentOutOfRangeException)));
+ Add(new(new() { [SparkParameters.Type] =
SparkServerTypeConstants.Http, [SparkParameters.HostName] = "valid.server.com",
[AdbcOptions.Username] = "user", [AdbcOptions.Password] = "myPassword",
[DatabricksParameters.FetchHeartbeatInterval] = "-1" },
typeof(ArgumentOutOfRangeException)));
+
+ // Tests for operation status request timeout parameter
+ Add(new(new() { [SparkParameters.Type] =
SparkServerTypeConstants.Http, [SparkParameters.HostName] = "valid.server.com",
[AdbcOptions.Username] = "user", [AdbcOptions.Password] = "myPassword",
[DatabricksParameters.OperationStatusRequestTimeout] = "notanumber" },
typeof(ArgumentException)));
+ Add(new(new() { [SparkParameters.Type] =
SparkServerTypeConstants.Http, [SparkParameters.HostName] = "valid.server.com",
[AdbcOptions.Username] = "user", [AdbcOptions.Password] = "myPassword",
[DatabricksParameters.OperationStatusRequestTimeout] = "0" },
typeof(ArgumentOutOfRangeException)));
+ Add(new(new() { [SparkParameters.Type] =
SparkServerTypeConstants.Http, [SparkParameters.HostName] = "valid.server.com",
[AdbcOptions.Username] = "user", [AdbcOptions.Password] = "myPassword",
[DatabricksParameters.OperationStatusRequestTimeout] = "-1" },
typeof(ArgumentOutOfRangeException)));
}
}
diff --git a/csharp/test/Drivers/Databricks/E2E/DatabricksTestConfiguration.cs
b/csharp/test/Drivers/Databricks/E2E/DatabricksTestConfiguration.cs
index ab2ae5a42..baf323484 100644
--- a/csharp/test/Drivers/Databricks/E2E/DatabricksTestConfiguration.cs
+++ b/csharp/test/Drivers/Databricks/E2E/DatabricksTestConfiguration.cs
@@ -55,6 +55,12 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks
[JsonPropertyName("enableRunAsyncInThriftOp"), JsonIgnore(Condition =
JsonIgnoreCondition.WhenWritingDefault)]
public string EnableRunAsyncInThriftOp { get; set; } = string.Empty;
+ [JsonPropertyName("fetchHeartbeatInterval"), JsonIgnore(Condition =
JsonIgnoreCondition.WhenWritingDefault)]
+ public string FetchHeartbeatInterval { get; set; } = string.Empty;
+
+ [JsonPropertyName("operationStatusRequestTimeout"),
JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
+ public string OperationStatusRequestTimeout { get; set; } =
string.Empty;
+
[JsonPropertyName("enableDirectResults"), JsonIgnore(Condition =
JsonIgnoreCondition.WhenWritingDefault)]
public string EnableDirectResults { get; set; } = string.Empty;
}
diff --git a/csharp/test/Drivers/Databricks/E2E/DatabricksTestEnvironment.cs
b/csharp/test/Drivers/Databricks/E2E/DatabricksTestEnvironment.cs
index d93d7823a..a553856c5 100644
--- a/csharp/test/Drivers/Databricks/E2E/DatabricksTestEnvironment.cs
+++ b/csharp/test/Drivers/Databricks/E2E/DatabricksTestEnvironment.cs
@@ -141,6 +141,14 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks
{
parameters.Add(ApacheParameters.QueryTimeoutSeconds,
testConfiguration.QueryTimeoutSeconds!);
}
+ if
(!string.IsNullOrEmpty(testConfiguration.FetchHeartbeatInterval))
+ {
+ parameters.Add(DatabricksParameters.FetchHeartbeatInterval,
testConfiguration.FetchHeartbeatInterval!);
+ }
+ if
(!string.IsNullOrEmpty(testConfiguration.OperationStatusRequestTimeout))
+ {
+
parameters.Add(DatabricksParameters.OperationStatusRequestTimeout,
testConfiguration.OperationStatusRequestTimeout!);
+ }
if
(!string.IsNullOrEmpty(testConfiguration.EnableMultipleCatalogSupport))
{
parameters.Add(DatabricksParameters.EnableMultipleCatalogSupport,
testConfiguration.EnableMultipleCatalogSupport!);
diff --git
a/csharp/test/Drivers/Databricks/Unit/DatabricksOperationStatusPollerTests.cs
b/csharp/test/Drivers/Databricks/Unit/DatabricksOperationStatusPollerTests.cs
index 02eb2ad35..8650ea23a 100644
---
a/csharp/test/Drivers/Databricks/Unit/DatabricksOperationStatusPollerTests.cs
+++
b/csharp/test/Drivers/Databricks/Unit/DatabricksOperationStatusPollerTests.cs
@@ -190,5 +190,47 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks.Unit
{
}
}
+
+ [Fact]
+ public async Task UsesCustomHeartbeatInterval()
+ {
+ // Arrange
+ int customHeartbeatInterval = 2; // 2 seconds
+ using var poller = new
DatabricksOperationStatusPoller(_mockStatement.Object, _mockResponse.Object,
customHeartbeatInterval);
+ var pollCount = 0;
+ _mockClient.Setup(c =>
c.GetOperationStatus(It.IsAny<TGetOperationStatusReq>(),
It.IsAny<CancellationToken>()))
+ .ReturnsAsync(new TGetOperationStatusResp())
+ .Callback(() => pollCount++);
+
+ // Act
+ poller.Start();
+ await Task.Delay(TimeSpan.FromSeconds(customHeartbeatInterval *
3)); // Wait for 3 intervals
+
+ // Assert
+ Assert.True(pollCount > 0, "Should have polled at least once");
+ _mockClient.Verify(c =>
c.GetOperationStatus(It.IsAny<TGetOperationStatusReq>(),
It.IsAny<CancellationToken>()), Times.AtLeastOnce);
+ }
+
+ [Fact]
+ public async Task UsesCustomRequestTimeout()
+ {
+ // Arrange
+ int customRequestTimeout = 5;
+ using var poller = new
DatabricksOperationStatusPoller(_mockStatement.Object, _mockResponse.Object,
_heartbeatIntervalSeconds, customRequestTimeout);
+ var pollCount = 0;
+
+ _mockClient.Setup(c =>
c.GetOperationStatus(It.IsAny<TGetOperationStatusReq>(),
It.IsAny<CancellationToken>()))
+ .ReturnsAsync(new TGetOperationStatusResp())
+ .Callback(() => pollCount++);
+
+ // Act
+ poller.Start();
+ await Task.Delay(TimeSpan.FromSeconds(_heartbeatIntervalSeconds *
2));
+
+ // Assert: This test verifies that the poller can be instantiated
with a custom request timeout
+
+ Assert.True(pollCount > 0, "Should have polled at least once");
+ _mockClient.Verify(c =>
c.GetOperationStatus(It.IsAny<TGetOperationStatusReq>(),
It.IsAny<CancellationToken>()), Times.AtLeastOnce);
+ }
}
}