This is an automated email from the ASF dual-hosted git repository.
curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 6a60a1393 feat(csharp/src/Drivers/Databricks): Support server side
property passthrough (#2692)
6a60a1393 is described below
commit 6a60a1393104066f13274bdad41452b89d6f4db9
Author: Alex Guo <[email protected]>
AuthorDate: Fri Apr 18 12:42:10 2025 -0700
feat(csharp/src/Drivers/Databricks): Support server side property
passthrough (#2692)
We want to be able to pass server side properties through the driver,
e.g. use_cached_result.
In the ODBC driver, the behavior is as follows:
- If ApplySSPWithQueries = 1, server-side properties are set by running
"set x=y" commands when the session is opened
- If ApplySSPWithQueries = 0, server-side properties are set through the
Thrift configuration
This PR adds support for the same behavior to the ADBC driver.
Tested E2E by setting
```
{ "adbc.databricks.apply_ssp_with_queries", "false" },
{ "adbc.databricks.SSP_use_cached_result", "false"},
```
which disabled the result cache for the executed query.
When using `{ "adbc.databricks.apply_ssp_with_queries", "true" }`, a
`set` query appears in the query history:
<img width="620" alt="image"
src="https://github.com/user-attachments/assets/cabb2708-04d7-40e7-a120-cd956c527fb8"
/>
---
.../src/Drivers/Databricks/DatabricksConnection.cs | 95 ++++++++++++++++++++++
.../src/Drivers/Databricks/DatabricksDatabase.cs | 1 +
.../src/Drivers/Databricks/DatabricksParameters.cs | 15 ++++
.../Databricks/ServerSidePropertyE2ETest.cs | 85 +++++++++++++++++++
4 files changed, 196 insertions(+)
diff --git a/csharp/src/Drivers/Databricks/DatabricksConnection.cs
b/csharp/src/Drivers/Databricks/DatabricksConnection.cs
index a6ab5c6ff..a12f9bde9 100644
--- a/csharp/src/Drivers/Databricks/DatabricksConnection.cs
+++ b/csharp/src/Drivers/Databricks/DatabricksConnection.cs
@@ -17,6 +17,8 @@
using System;
using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Apache.Arrow.Adbc.Drivers.Apache;
@@ -29,10 +31,22 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
{
internal class DatabricksConnection : SparkHttpConnection
{
+ private bool _applySSPWithQueries = false;
+
public DatabricksConnection(IReadOnlyDictionary<string, string>
properties) : base(properties)
{
+ if
(Properties.TryGetValue(DatabricksParameters.ApplySSPWithQueries, out string?
applySSPWithQueriesStr) &&
+ bool.TryParse(applySSPWithQueriesStr, out bool
applySSPWithQueriesValue))
+ {
+ _applySSPWithQueries = applySSPWithQueriesValue;
+ }
}
+ /// <summary>
+ /// Gets whether server side properties should be applied using
queries.
+ /// </summary>
+ internal bool ApplySSPWithQueries => _applySSPWithQueries;
+
internal override IArrowArrayStream NewReader<T>(T statement, Schema
schema, TGetResultSetMetadataResp? metadataResp = null)
{
// Get result format from metadata response if available
@@ -86,9 +100,90 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
Client_protocol_i64 =
(long)TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V7,
CanUseMultipleCatalogs = true,
};
+
+ // If not using queries to set server-side properties, include
them in Configuration
+ if (!_applySSPWithQueries)
+ {
+ req.Configuration = new Dictionary<string, string>();
+ var serverSideProperties = GetServerSideProperties();
+ foreach (var property in serverSideProperties)
+ {
+ req.Configuration[property.Key] = property.Value;
+ }
+ }
return req;
}
+ /// <summary>
+ /// Gets a dictionary of server-side properties extracted from
connection properties.
+ /// </summary>
+ /// <returns>Dictionary of server-side properties with prefix removed
from keys.</returns>
+ private Dictionary<string, string> GetServerSideProperties()
+ {
+ return Properties
+ .Where(p =>
p.Key.StartsWith(DatabricksParameters.ServerSidePropertyPrefix))
+ .ToDictionary(
+ p =>
p.Key.Substring(DatabricksParameters.ServerSidePropertyPrefix.Length),
+ p => p.Value
+ );
+ }
+
+ /// <summary>
+ /// Applies server-side properties by executing "set key=value"
queries.
+ /// </summary>
+ /// <returns>A task representing the asynchronous operation.</returns>
+ public async Task ApplyServerSidePropertiesAsync()
+ {
+ if (!_applySSPWithQueries)
+ {
+ return;
+ }
+
+ var serverSideProperties = GetServerSideProperties();
+
+ if (serverSideProperties.Count == 0)
+ {
+ return;
+ }
+
+ using var statement = new DatabricksStatement(this);
+
+ foreach (var property in serverSideProperties)
+ {
+ if (!IsValidPropertyName(property.Key))
+ {
+ Debug.WriteLine($"Skipping invalid property name:
{property.Key}");
+ continue;
+ }
+
+ string escapedValue = EscapeSqlString(property.Value);
+ string query = $"SET {property.Key}={escapedValue}";
+ statement.SqlQuery = query;
+
+ try
+ {
+ await statement.ExecuteUpdateAsync();
+ }
+ catch (Exception ex)
+ {
+ Debug.WriteLine($"Error setting server-side property
'{property.Key}': {ex.Message}");
+ }
+ }
+ }
+
+ private bool IsValidPropertyName(string propertyName)
+ {
+ // Allow only letters and underscores in property names
+ return System.Text.RegularExpressions.Regex.IsMatch(
+ propertyName,
+ @"^[a-zA-Z_]+$");
+ }
+
+ private string EscapeSqlString(string value)
+ {
+ return "`" + value.Replace("`", "``") + "`";
+ }
+
protected override Task<TGetResultSetMetadataResp>
GetResultSetMetadataAsync(TGetSchemasResp response, CancellationToken
cancellationToken = default) =>
Task.FromResult(response.DirectResults.ResultSetMetadata);
protected override Task<TGetResultSetMetadataResp>
GetResultSetMetadataAsync(TGetCatalogsResp response, CancellationToken
cancellationToken = default) =>
diff --git a/csharp/src/Drivers/Databricks/DatabricksDatabase.cs
b/csharp/src/Drivers/Databricks/DatabricksDatabase.cs
index 53a8027e0..2150b8938 100644
--- a/csharp/src/Drivers/Databricks/DatabricksDatabase.cs
+++ b/csharp/src/Drivers/Databricks/DatabricksDatabase.cs
@@ -41,6 +41,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
.ToDictionary(kvp => kvp.Key, kvp => kvp.Value);
DatabricksConnection connection = new
DatabricksConnection(mergedProperties);
connection.OpenAsync().Wait();
+ connection.ApplyServerSidePropertiesAsync().Wait();
return connection;
}
}
diff --git a/csharp/src/Drivers/Databricks/DatabricksParameters.cs
b/csharp/src/Drivers/Databricks/DatabricksParameters.cs
index 833dc3e7d..9d06a9479 100644
--- a/csharp/src/Drivers/Databricks/DatabricksParameters.cs
+++ b/csharp/src/Drivers/Databricks/DatabricksParameters.cs
@@ -42,6 +42,21 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
/// Default value is 5 minutes if not specified.
/// </summary>
public const string CloudFetchTimeoutMinutes =
"adbc.databricks.cloudfetch.timeout_minutes";
+
+ /// <summary>
+ /// Whether to apply server-side properties (SSP) with queries. If
false, SSP will be applied
+ /// by setting the Thrift configuration when the session is opened.
+ /// Default value is false if not specified.
+ /// </summary>
+ public const string ApplySSPWithQueries =
"adbc.databricks.apply_ssp_with_queries";
+
+ /// <summary>
+ /// Prefix for server-side properties. Properties with this prefix
will be passed to the server
+ /// through the Thrift open-session configuration or, when
+ /// adbc.databricks.apply_ssp_with_queries is true, by executing a
"set key=value" query.
+ /// For example, key "adbc.databricks.SSP_use_cached_result" with
value "true" sets use_cached_result to true on the server.
+ /// </summary>
+ public const string ServerSidePropertyPrefix = "adbc.databricks.SSP_";
}
/// <summary>
diff --git a/csharp/test/Drivers/Databricks/ServerSidePropertyE2ETest.cs
b/csharp/test/Drivers/Databricks/ServerSidePropertyE2ETest.cs
new file mode 100644
index 000000000..c5f722ccb
--- /dev/null
+++ b/csharp/test/Drivers/Databricks/ServerSidePropertyE2ETest.cs
@@ -0,0 +1,85 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+using System;
+using System.Collections.Generic;
+using System.Threading.Tasks;
+using Apache.Arrow.Adbc.Drivers.Databricks;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks
+{
+ /// <summary>
+ /// End-to-end tests for the server-side property passthrough feature in
the Databricks ADBC driver.
+ /// </summary>
+ public class ServerSidePropertyE2ETest :
TestBase<DatabricksTestConfiguration, DatabricksTestEnvironment>
+ {
+ public ServerSidePropertyE2ETest(ITestOutputHelper? outputHelper)
+ : base(outputHelper, new DatabricksTestEnvironment.Factory())
+ {
+ // Skip the test if the DATABRICKS_TEST_CONFIG_FILE environment
variable is not set
+ Skip.IfNot(Utils.CanExecuteTestConfig(TestConfigVariable));
+ }
+
+ /// <summary>
+ /// Tests setting server-side properties.
+ /// </summary>
+ [Theory]
+ [InlineData(true)]
+ [InlineData(false)]
+ public async Task TestServerSideProperty(bool applyWithQueries)
+ {
+ var additionalConnectionParams = new Dictionary<string, string>()
+ {
+ [DatabricksParameters.ServerSidePropertyPrefix +
"use_cached_result"] = "false",
+ [DatabricksParameters.ServerSidePropertyPrefix +
"statement_timeout"] = "12345",
+ [DatabricksParameters.ApplySSPWithQueries] =
applyWithQueries.ToString().ToLower()
+ };
+ using var connection = NewConnection(TestConfiguration,
additionalConnectionParams);
+
+ // Verify the server-side property was set by querying it
+ using var statement = connection.CreateStatement();
+ statement.SqlQuery = "SET";
+
+ var result = await statement.ExecuteQueryAsync();
+ Assert.NotNull(result.Stream);
+
+ var batch = await result.Stream.ReadNextRecordBatchAsync();
+ Assert.NotNull(batch);
+ Assert.True(batch.Length > 0);
+ Assert.Equal(2, batch.ColumnCount);
+
+ var returnedProperties = new Dictionary<string, string>();
+ var keys = (StringArray)batch.Column(0);
+ var values = (StringArray)batch.Column(1);
+ for (int i = 0; i < batch.Length; i++)
+ {
+ string key = keys.GetString(i);
+ string value = values.GetString(i);
+ returnedProperties[key] = value;
+ Console.WriteLine($"Property: {key} = {value}");
+ }
+
+ Assert.True(returnedProperties.ContainsKey("use_cached_result"));
+ Assert.Equal("false", returnedProperties["use_cached_result"]);
+
+ Assert.True(returnedProperties.ContainsKey("statement_timeout"));
+ Assert.Equal("12345", returnedProperties["statement_timeout"]);
+ }
+ }
+}