This is an automated email from the ASF dual-hosted git repository.

curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 6a60a1393 feat(csharp/src/Drivers/Databricks): Support server side 
property passthrough (#2692)
6a60a1393 is described below

commit 6a60a1393104066f13274bdad41452b89d6f4db9
Author: Alex Guo <[email protected]>
AuthorDate: Fri Apr 18 12:42:10 2025 -0700

    feat(csharp/src/Drivers/Databricks): Support server side property 
passthrough (#2692)
    
    We want to be able to pass server-side properties (SSP) through the driver,
    e.g. use_cached_result.
    In the ODBC driver, the behavior is as follows:
    - If ApplySSPWithQueries = 1, server-side properties are set by executing
    "set x=y" commands when the session is opened.
    - If ApplySSPWithQueries = 0, server-side properties are set through the
    Thrift session configuration.
    
    This PR adds the same support to the ADBC driver.
    
    Tested E2E by setting
    ```
    { "adbc.databricks.apply_ssp_with_queries", "false" },
    { "adbc.databricks.SSP_use_cached_result", "false"},
    ```
    which disabled the result cache for the executed query.
    
    When using `{ "adbc.databricks.apply_ssp_with_queries", "true" }`, I see
    a SET query in the query history:
    <img width="620" alt="Query history showing the SET query"
    src="https://github.com/user-attachments/assets/cabb2708-04d7-40e7-a120-cd956c527fb8"
    />
---
 .../src/Drivers/Databricks/DatabricksConnection.cs | 95 ++++++++++++++++++++++
 .../src/Drivers/Databricks/DatabricksDatabase.cs   |  1 +
 .../src/Drivers/Databricks/DatabricksParameters.cs | 15 ++++
 .../Databricks/ServerSidePropertyE2ETest.cs        | 85 +++++++++++++++++++
 4 files changed, 196 insertions(+)

diff --git a/csharp/src/Drivers/Databricks/DatabricksConnection.cs 
b/csharp/src/Drivers/Databricks/DatabricksConnection.cs
index a6ab5c6ff..a12f9bde9 100644
--- a/csharp/src/Drivers/Databricks/DatabricksConnection.cs
+++ b/csharp/src/Drivers/Databricks/DatabricksConnection.cs
@@ -17,6 +17,8 @@
 
 using System;
 using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
 using System.Threading;
 using System.Threading.Tasks;
 using Apache.Arrow.Adbc.Drivers.Apache;
@@ -29,10 +31,22 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
 {
     internal class DatabricksConnection : SparkHttpConnection
     {
+        /// <summary>
+        /// Backing field for <see cref="ApplySSPWithQueries"/>. It is assigned only in the
+        /// constructor, so it is readonly; it stays false when the parameter is absent or
+        /// cannot be parsed as a boolean.
+        /// </summary>
+        private readonly bool _applySSPWithQueries;
+
         public DatabricksConnection(IReadOnlyDictionary<string, string> properties) : base(properties)
         {
+            // bool.TryParse accepts "true"/"false" (case-insensitive); any other value is
+            // ignored and the default of false is kept.
+            if (Properties.TryGetValue(DatabricksParameters.ApplySSPWithQueries, out string? applySSPWithQueriesStr) &&
+                bool.TryParse(applySSPWithQueriesStr, out bool applySSPWithQueriesValue))
+            {
+                _applySSPWithQueries = applySSPWithQueriesValue;
+            }
         }
 
+        /// <summary>
+        /// Gets whether server side properties should be applied using queries.
+        /// </summary>
+        internal bool ApplySSPWithQueries => _applySSPWithQueries;
+
         internal override IArrowArrayStream NewReader<T>(T statement, Schema 
schema, TGetResultSetMetadataResp? metadataResp = null)
         {
             // Get result format from metadata response if available
@@ -86,9 +100,90 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
                 Client_protocol_i64 = 
(long)TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V7,
                 CanUseMultipleCatalogs = true,
             };
+
+            // If not using queries to set server-side properties, include 
them in Configuration
+            if (!_applySSPWithQueries)
+            {
+                req.Configuration = new Dictionary<string, string>();
+                var serverSideProperties = GetServerSideProperties();
+                foreach (var property in serverSideProperties)
+                {
+                    req.Configuration[property.Key] = property.Value;
+                }
+            }
             return req;
         }
 
+        /// <summary>
+        /// Gets a dictionary of server-side properties extracted from connection properties.
+        /// </summary>
+        /// <remarks>
+        /// The prefix is matched ordinally. A culture-sensitive <c>StartsWith</c> can match a key
+        /// whose matched span is not exactly <c>ServerSidePropertyPrefix.Length</c> characters long
+        /// (for example when ignorable characters such as a soft hyphen are present). The
+        /// <c>Substring</c> call below would then cut the key at the wrong position.
+        /// </remarks>
+        /// <returns>Dictionary of server-side properties with prefix removed from keys.</returns>
+        private Dictionary<string, string> GetServerSideProperties()
+        {
+            string prefix = DatabricksParameters.ServerSidePropertyPrefix;
+            return Properties
+                .Where(p => p.Key.StartsWith(prefix, StringComparison.Ordinal))
+                .ToDictionary(
+                    p => p.Key.Substring(prefix.Length),
+                    p => p.Value);
+        }
+
+        /// <summary>
+        /// Applies server-side properties by executing "set key=value" queries.
+        /// Returns immediately when <see cref="ApplySSPWithQueries"/> is false or when no
+        /// server-side properties were supplied.
+        /// </summary>
+        /// <remarks>
+        /// Best effort: property names that fail validation are skipped, and a property the server
+        /// rejects is reported via <see cref="Debug.WriteLine(string)"/> (debug builds only) without
+        /// stopping the remaining properties from being applied.
+        /// </remarks>
+        /// <returns>A task representing the asynchronous operation.</returns>
+        public async Task ApplyServerSidePropertiesAsync()
+        {
+            if (!_applySSPWithQueries)
+            {
+                return;
+            }
+
+            var serverSideProperties = GetServerSideProperties();
+
+            if (serverSideProperties.Count == 0)
+            {
+                return;
+            }
+
+            using var statement = new DatabricksStatement(this);
+
+            foreach (var property in serverSideProperties)
+            {
+                // The key is interpolated verbatim into the SQL text, so only validated names are sent.
+                if (!IsValidPropertyName(property.Key))
+                {
+                    Debug.WriteLine($"Skipping invalid property name: {property.Key}");
+                    continue;
+                }
+
+                string escapedValue = EscapeSqlString(property.Value);
+                string query = $"SET {property.Key}={escapedValue}";
+                statement.SqlQuery = query;
+
+                try
+                {
+                    // ConfigureAwait(false): callers block on this task synchronously (DatabricksDatabase
+                    // calls .Wait() on it), which can deadlock if the continuation is posted back to a
+                    // single-threaded SynchronizationContext.
+                    await statement.ExecuteUpdateAsync().ConfigureAwait(false);
+                }
+                catch (Exception ex)
+                {
+                    Debug.WriteLine($"Error setting server-side property '{property.Key}': {ex.Message}");
+                }
+            }
+        }
+
+        /// <summary>
+        /// Returns true when <paramref name="propertyName"/> is non-empty and consists only of ASCII
+        /// letters and underscores; only such names are interpolated into a SET statement.
+        /// </summary>
+        private bool IsValidPropertyName(string propertyName)
+        {
+            // Allow only letters and underscores in property names.
+            // \z (not $) anchors at the true end of the input: in .NET, $ also matches just before a
+            // trailing '\n', which would let a name such as "use_cached_result\n" reach the SQL text.
+            return System.Text.RegularExpressions.Regex.IsMatch(
+                propertyName,
+                @"^[a-zA-Z_]+\z");
+        }
+
+        /// <summary>
+        /// Quotes <paramref name="value"/> with backticks for use as the value of a SET statement,
+        /// doubling every backtick already present in the value.
+        /// </summary>
+        private string EscapeSqlString(string value)
+        {
+            string doubledBackticks = value.Replace("`", "``");
+            return $"`{doubledBackticks}`";
+        }
+
         protected override Task<TGetResultSetMetadataResp> 
GetResultSetMetadataAsync(TGetSchemasResp response, CancellationToken 
cancellationToken = default) =>
             Task.FromResult(response.DirectResults.ResultSetMetadata);
         protected override Task<TGetResultSetMetadataResp> 
GetResultSetMetadataAsync(TGetCatalogsResp response, CancellationToken 
cancellationToken = default) =>
diff --git a/csharp/src/Drivers/Databricks/DatabricksDatabase.cs 
b/csharp/src/Drivers/Databricks/DatabricksDatabase.cs
index 53a8027e0..2150b8938 100644
--- a/csharp/src/Drivers/Databricks/DatabricksDatabase.cs
+++ b/csharp/src/Drivers/Databricks/DatabricksDatabase.cs
@@ -41,6 +41,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
                     .ToDictionary(kvp => kvp.Key, kvp => kvp.Value);
             DatabricksConnection connection = new 
DatabricksConnection(mergedProperties);
             connection.OpenAsync().Wait();
+            connection.ApplyServerSidePropertiesAsync().Wait();
             return connection;
         }
     }
diff --git a/csharp/src/Drivers/Databricks/DatabricksParameters.cs 
b/csharp/src/Drivers/Databricks/DatabricksParameters.cs
index 833dc3e7d..9d06a9479 100644
--- a/csharp/src/Drivers/Databricks/DatabricksParameters.cs
+++ b/csharp/src/Drivers/Databricks/DatabricksParameters.cs
@@ -42,6 +42,21 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
         /// Default value is 5 minutes if not specified.
         /// </summary>
         public const string CloudFetchTimeoutMinutes = 
"adbc.databricks.cloudfetch.timeout_minutes";
+
+        /// <summary>
+        /// Whether to apply server-side properties (SSP) with queries. If true, each SSP is applied by
+        /// executing a "set key=value" query after the session is opened. If false, SSP will be applied
+        /// by setting the Thrift configuration when the session is opened.
+        /// Default value is false if not specified.
+        /// </summary>
+        public const string ApplySSPWithQueries = "adbc.databricks.apply_ssp_with_queries";
+
+        /// <summary>
+        /// Prefix for server-side properties. Properties whose key starts with this prefix
+        /// (case-sensitive) are passed to the server with the prefix removed from the key. They are
+        /// sent either by executing a "set key=value" query after the session is opened (when
+        /// <see cref="ApplySSPWithQueries"/> is true) or through the Thrift session configuration
+        /// (otherwise).
+        /// For example, a property with key "adbc.databricks.SSP_use_cached_result"
+        /// and value "true" will result in the server-side property use_cached_result=true.
+        /// </summary>
+        public const string ServerSidePropertyPrefix = "adbc.databricks.SSP_";
     }
 
     /// <summary>
diff --git a/csharp/test/Drivers/Databricks/ServerSidePropertyE2ETest.cs 
b/csharp/test/Drivers/Databricks/ServerSidePropertyE2ETest.cs
new file mode 100644
index 000000000..c5f722ccb
--- /dev/null
+++ b/csharp/test/Drivers/Databricks/ServerSidePropertyE2ETest.cs
@@ -0,0 +1,85 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+using System;
+using System.Collections.Generic;
+using System.Threading.Tasks;
+using Apache.Arrow.Adbc.Drivers.Databricks;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks
+{
+    /// <summary>
+    /// End-to-end tests for the server-side property passthrough feature in the Databricks ADBC driver.
+    /// </summary>
+    public class ServerSidePropertyE2ETest : TestBase<DatabricksTestConfiguration, DatabricksTestEnvironment>
+    {
+        public ServerSidePropertyE2ETest(ITestOutputHelper? outputHelper)
+            : base(outputHelper, new DatabricksTestEnvironment.Factory())
+        {
+            // Skip the test if the DATABRICKS_TEST_CONFIG_FILE environment variable is not set
+            Skip.IfNot(Utils.CanExecuteTestConfig(TestConfigVariable));
+        }
+
+        /// <summary>
+        /// Sets two server-side properties, either through "SET" queries
+        /// (<paramref name="applyWithQueries"/> = true) or through the Thrift session
+        /// configuration (false), then reads them back with a bare "SET" query.
+        /// </summary>
+        /// <remarks>
+        /// Uses [SkippableTheory] rather than [Theory]: Skip.IfNot in the constructor throws a skip
+        /// exception, which a plain xUnit theory reports as a failure instead of a skip.
+        /// </remarks>
+        [SkippableTheory]
+        [InlineData(true)]
+        [InlineData(false)]
+        public async Task TestServerSideProperty(bool applyWithQueries)
+        {
+            var additionalConnectionParams = new Dictionary<string, string>()
+            {
+                [DatabricksParameters.ServerSidePropertyPrefix + "use_cached_result"] = "false",
+                [DatabricksParameters.ServerSidePropertyPrefix + "statement_timeout"] = "12345",
+                // Literal lowercase value, independent of the current culture.
+                [DatabricksParameters.ApplySSPWithQueries] = applyWithQueries ? "true" : "false"
+            };
+            using var connection = NewConnection(TestConfiguration, additionalConnectionParams);
+
+            // Verify the server-side properties were set by querying them back
+            using var statement = connection.CreateStatement();
+            statement.SqlQuery = "SET";
+
+            var result = await statement.ExecuteQueryAsync();
+            Assert.NotNull(result.Stream);
+            using var stream = result.Stream;
+
+            // Collect key/value pairs from every batch, not only the first one.
+            var returnedProperties = new Dictionary<string, string>();
+            while (await stream.ReadNextRecordBatchAsync() is { } batch)
+            {
+                using (batch)
+                {
+                    Assert.Equal(2, batch.ColumnCount);
+                    var keys = (StringArray)batch.Column(0);
+                    var values = (StringArray)batch.Column(1);
+                    for (int i = 0; i < batch.Length; i++)
+                    {
+                        string? key = keys.GetString(i);
+                        if (key is null)
+                        {
+                            continue;
+                        }
+
+                        string value = values.GetString(i) ?? string.Empty;
+                        returnedProperties[key] = value;
+                        OutputHelper?.WriteLine($"Property: {key} = {value}");
+                    }
+                }
+            }
+
+            Assert.NotEmpty(returnedProperties);
+
+            Assert.True(returnedProperties.TryGetValue("use_cached_result", out string? useCachedResult));
+            Assert.Equal("false", useCachedResult);
+
+            Assert.True(returnedProperties.TryGetValue("statement_timeout", out string? statementTimeout));
+            Assert.Equal("12345", statementTimeout);
+        }
+    }
+}

Reply via email to