This is an automated email from the ASF dual-hosted git repository.
curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 5aa9f1ea6 feat(csharp/src/Drivers/BigQuery): Add support for AAD/Entra
authentication (#2655)
5aa9f1ea6 is described below
commit 5aa9f1ea60984c09defdc8d8746db9e2f5bd8b98
Author: davidhcoe <[email protected]>
AuthorDate: Wed Apr 23 17:21:34 2025 -0400
feat(csharp/src/Drivers/BigQuery): Add support for AAD/Entra authentication
(#2655)
- Adds support for users to log in with their Entra / Azure AD account
- Adds a retry mechanism to the driver that checks whether a token
needs to be refreshed and, if so, invokes a delegate so that an outside
caller can perform the token update. This path is taken only if the
caller has defined a handler for UpdateToken.
- Includes long-running tests to demonstrate the concept.

---------
Co-authored-by: David Coe <>
---
csharp/src/Drivers/BigQuery/BigQueryConnection.cs | 293 +++++++++++++++------
csharp/src/Drivers/BigQuery/BigQueryParameters.cs | 16 +-
csharp/src/Drivers/BigQuery/BigQueryStatement.cs | 214 +++++++++++----
...ryTableTypes.cs => BigQueryStsTokenResponse.cs} | 20 +-
csharp/src/Drivers/BigQuery/BigQueryTableTypes.cs | 4 +-
csharp/src/Drivers/BigQuery/BigQueryUtils.cs | 37 +++
.../Drivers/BigQuery/ITokenProtectedResource.cs | 40 +++
csharp/src/Drivers/BigQuery/RetryManager.cs | 82 ++++++
.../Drivers/BigQuery/TokenProtectedReadClient.cs | 60 +++++
csharp/src/Drivers/BigQuery/readme.md | 38 ++-
.../MultiEnvironmentTestUtils.cs | 2 +-
...Apache.Arrow.Adbc.Tests.Drivers.BigQuery.csproj | 1 +
.../test/Drivers/BigQuery/AuthenticationTests.cs | 121 +++++++++
csharp/test/Drivers/BigQuery/BigQueryData.cs | 1 -
.../Drivers/BigQuery/BigQueryTestConfiguration.cs | 22 ++
.../test/Drivers/BigQuery/BigQueryTestingUtils.cs | 83 +++++-
csharp/test/Drivers/BigQuery/StatementTests.cs | 61 +++++
17 files changed, 944 insertions(+), 151 deletions(-)
diff --git a/csharp/src/Drivers/BigQuery/BigQueryConnection.cs
b/csharp/src/Drivers/BigQuery/BigQueryConnection.cs
index 0e84e6959..090016ce9 100644
--- a/csharp/src/Drivers/BigQuery/BigQueryConnection.cs
+++ b/csharp/src/Drivers/BigQuery/BigQueryConnection.cs
@@ -17,12 +17,12 @@
using System;
using System.Collections.Generic;
-using System.Collections.ObjectModel;
using System.Linq;
using System.Net.Http;
using System.Text;
using System.Text.Json;
using System.Text.RegularExpressions;
+using System.Threading.Tasks;
using Apache.Arrow.Adbc.Extensions;
using Apache.Arrow.Ipc;
using Apache.Arrow.Types;
@@ -36,17 +36,15 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
/// <summary>
/// BigQuery-specific implementation of <see cref="AdbcConnection"/>
/// </summary>
- public class BigQueryConnection : AdbcConnection
+ public class BigQueryConnection : AdbcConnection, ITokenProtectedResource
{
- readonly IReadOnlyDictionary<string, string> properties;
- BigQueryClient? client;
- GoogleCredential? credential;
+ readonly Dictionary<string, string> properties;
+ readonly HttpClient httpClient;
bool includePublicProjectIds = false;
-
const string infoDriverName = "ADBC BigQuery Driver";
- const string infoDriverVersion = "1.0.0";
+ const string infoDriverVersion = "1.0.1";
const string infoVendorName = "BigQuery";
- const string infoDriverArrowVersion = "1.0.0";
+ const string infoDriverArrowVersion = "19.0.0";
const string publicProjectId = "bigquery-public-data";
readonly AdbcInfoCode[] infoSupportedCodes = new[] {
@@ -58,14 +56,47 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
public BigQueryConnection(IReadOnlyDictionary<string, string>
properties)
{
- this.properties = properties;
+ if (properties == null)
+ {
+ this.properties = new Dictionary<string, string>();
+ }
+ else
+ {
+ this.properties = properties.ToDictionary(k => k.Key, v =>
v.Value);
+ }
// add the default value for now and set to true until C# has a
BigDecimal
- Dictionary<string, string> modifiedProperties =
this.properties.ToDictionary(k => k.Key, v => v.Value);
- modifiedProperties[BigQueryParameters.LargeDecimalsAsString] =
BigQueryConstants.TreatLargeDecimalAsString;
- this.properties = new ReadOnlyDictionary<string,
string>(modifiedProperties);
+ this.properties[BigQueryParameters.LargeDecimalsAsString] =
BigQueryConstants.TreatLargeDecimalAsString;
+ this.httpClient = new HttpClient();
+
+ if
(this.properties.TryGetValue(BigQueryParameters.MaximumRetryAttempts, out
string? sRetryAttempts) &&
+ int.TryParse(sRetryAttempts, out int retries) &&
+ retries >= 0)
+ {
+ MaxRetryAttempts = retries;
+ }
+
+ if (this.properties.TryGetValue(BigQueryParameters.RetryDelayMs,
out string? sRetryDelay) &&
+ int.TryParse(sRetryDelay, out int delay) &&
+ delay >= 0)
+ {
+ RetryDelayMs = delay;
+ }
}
+ /// <summary>
+ /// The function to call when updating the token.
+ /// </summary>
+ public Func<Task>? UpdateToken { get; set; }
+
+ internal BigQueryClient? Client { get; private set; }
+
+ internal GoogleCredential? Credential { get; private set; }
+
+ internal int MaxRetryAttempts { get; private set; } = 5;
+
+ internal int RetryDelayMs { get; private set; } = 200;
+
/// <summary>
/// Initializes the internal BigQuery connection
/// </summary>
@@ -74,17 +105,8 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
{
string? projectId = null;
string? billingProjectId = null;
- string? clientId = null;
- string? clientSecret = null;
- string? refreshToken = null;
TimeSpan? clientTimeout = null;
- string tokenEndpoint = BigQueryConstants.TokenEndpoint;
-
- string? authenticationType =
BigQueryConstants.UserAuthenticationType;
-
- // TODO: handle token expiration
-
// if the caller doesn't specify a projectId, use the default
if (!this.properties.TryGetValue(BigQueryParameters.ProjectId, out
projectId))
projectId = BigQueryConstants.DetectProjectId;
@@ -95,18 +117,59 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
if
(this.properties.TryGetValue(BigQueryParameters.IncludePublicProjectId, out
string? result))
{
if (!string.IsNullOrEmpty(result))
- includePublicProjectIds = Convert.ToBoolean(result);
+ this.includePublicProjectIds = Convert.ToBoolean(result);
+ }
+
+ if (this.properties.TryGetValue(BigQueryParameters.ClientTimeout,
out string? timeoutSeconds) &&
+ int.TryParse(timeoutSeconds, out int seconds))
+ {
+ clientTimeout = TimeSpan.FromSeconds(seconds);
}
+ SetCredential();
+
+ BigQueryClientBuilder bigQueryClientBuilder = new
BigQueryClientBuilder()
+ {
+ ProjectId = projectId,
+ QuotaProject = billingProjectId,
+ GoogleCredential = Credential
+ };
+
+ BigQueryClient client = bigQueryClientBuilder.Build();
+
+ if (clientTimeout.HasValue)
+ {
+ client.Service.HttpClient.Timeout = clientTimeout.Value;
+ }
+
+ Client = client;
+ return client;
+ }
+
+ internal void SetCredential()
+ {
+ string? clientId = null;
+ string? clientSecret = null;
+ string? refreshToken = null;
+ string? accessToken = null;
+ string? audienceUri = null;
+ string? authenticationType = null;
+
+ string tokenEndpoint = BigQueryConstants.TokenEndpoint;
+
+ if
(!this.properties.TryGetValue(BigQueryParameters.AuthenticationType, out
authenticationType))
+ throw new ArgumentException($"The
{BigQueryParameters.AuthenticationType} parameter is not present");
+
if
(this.properties.TryGetValue(BigQueryParameters.AuthenticationType, out string?
newAuthenticationType))
{
if (!string.IsNullOrEmpty(newAuthenticationType))
authenticationType = newAuthenticationType;
if
(!authenticationType.Equals(BigQueryConstants.UserAuthenticationType,
StringComparison.OrdinalIgnoreCase) &&
-
!authenticationType.Equals(BigQueryConstants.ServiceAccountAuthenticationType,
StringComparison.OrdinalIgnoreCase))
+
!authenticationType.Equals(BigQueryConstants.ServiceAccountAuthenticationType,
StringComparison.OrdinalIgnoreCase) &&
+
!authenticationType.Equals(BigQueryConstants.EntraIdAuthenticationType,
StringComparison.OrdinalIgnoreCase))
{
- throw new ArgumentException($"The
{BigQueryParameters.AuthenticationType} parameter can only be
`{BigQueryConstants.UserAuthenticationType}` or
`{BigQueryConstants.ServiceAccountAuthenticationType}`");
+ throw new ArgumentException($"The
{BigQueryParameters.AuthenticationType} parameter can only be
`{BigQueryConstants.UserAuthenticationType}`,
`{BigQueryConstants.ServiceAccountAuthenticationType}` or
`{BigQueryConstants.EntraIdAuthenticationType}`");
}
}
@@ -121,40 +184,41 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
if
(!this.properties.TryGetValue(BigQueryParameters.RefreshToken, out
refreshToken))
throw new ArgumentException($"The
{BigQueryParameters.RefreshToken} parameter is not present");
- this.credential =
ApplyScopes(GoogleCredential.FromAccessToken(GetAccessToken(clientId,
clientSecret, refreshToken, tokenEndpoint)));
+ Credential =
ApplyScopes(GoogleCredential.FromAccessToken(GetAccessToken(clientId,
clientSecret, refreshToken, tokenEndpoint)));
}
- else
+ else if (!string.IsNullOrEmpty(authenticationType) &&
authenticationType.Equals(BigQueryConstants.EntraIdAuthenticationType,
StringComparison.OrdinalIgnoreCase))
+ {
+ if
(!this.properties.TryGetValue(BigQueryParameters.AccessToken, out accessToken))
+ throw new ArgumentException($"The
{BigQueryParameters.AccessToken} parameter is not present");
+
+ if
(!this.properties.TryGetValue(BigQueryParameters.AudienceUri, out audienceUri))
+ throw new ArgumentException($"The
{BigQueryParameters.AudienceUri} parameter is not present");
+
+ Credential =
ApplyScopes(GoogleCredential.FromAccessToken(TradeEntraIdTokenForBigQueryToken(audienceUri,
accessToken)));
+ }
+ else if (!string.IsNullOrEmpty(authenticationType) &&
authenticationType.Equals(BigQueryConstants.ServiceAccountAuthenticationType,
StringComparison.OrdinalIgnoreCase))
{
string? json = string.Empty;
if
(!this.properties.TryGetValue(BigQueryParameters.JsonCredential, out json))
throw new ArgumentException($"The
{BigQueryParameters.JsonCredential} parameter is not present");
- this.credential = ApplyScopes(GoogleCredential.FromJson(json));
+ Credential = ApplyScopes(GoogleCredential.FromJson(json));
}
-
- if (this.properties.TryGetValue(BigQueryParameters.ClientTimeout,
out string? timeoutSeconds) &&
- int.TryParse(timeoutSeconds, out int seconds))
+ else
{
- clientTimeout = TimeSpan.FromSeconds(seconds);
+ throw new ArgumentException($"{authenticationType} is not a
valid authenticationType");
}
+ }
- BigQueryClientBuilder bigQueryClientBuilder = new
BigQueryClientBuilder()
- {
- ProjectId = projectId,
- QuotaProject = billingProjectId,
- GoogleCredential = this.credential
- };
-
- BigQueryClient client = bigQueryClientBuilder.Build();
+ public override void SetOption(string key, string value)
+ {
+ this.properties[key] = value;
- if (clientTimeout.HasValue)
+ if (key.Equals(BigQueryParameters.AccessToken))
{
- client.Service.HttpClient.Timeout = clientTimeout.Value;
+ UpdateClientToken();
}
-
- this.client = client;
- return client;
}
/// <summary>
@@ -309,6 +373,22 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
return new
BigQueryInfoArrowStream(StandardSchemas.GetObjectsSchema, dataArrays);
}
+ /// <summary>
+ /// Renews the internal BigQueryClient with updated credentials.
+ /// </summary>
+ internal void UpdateClientToken()
+ {
+ // there isn't a way to set the credentials, just need to open a
new client
+ Client = Open();
+ }
+
+ /// <summary>
+ /// Determines if the token needs to be updated.
+ /// </summary>
+ public bool TokenRequiresUpdate(Exception ex) =>
BigQueryUtils.TokenRequiresUpdate(ex);
+
+ private async Task<T> ExecuteWithRetriesAsync<T>(Func<Task<T>> action)
=> await RetryManager.ExecuteWithRetriesAsync<T>(this, action,
MaxRetryAttempts, RetryDelayMs);
+
/// <summary>
/// Executes the query using the BigQueryClient.
/// </summary>
@@ -322,7 +402,11 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
/// </remarks>
private BigQueryResults? ExecuteQuery(string sql,
IEnumerable<BigQueryParameter>? parameters, QueryOptions? queryOptions = null,
GetQueryResultsOptions? resultsOptions = null)
{
- BigQueryResults? result = this.client?.ExecuteQuery(sql,
parameters, queryOptions, resultsOptions);
+ if (Client == null) { Client = Open(); }
+
+ Func<Task<BigQueryResults?>> func = () =>
Client.ExecuteQueryAsync(sql, parameters ??
Enumerable.Empty<BigQueryParameter>(), queryOptions, resultsOptions);
+ BigQueryResults? result =
ExecuteWithRetriesAsync<BigQueryResults?>(func).GetAwaiter().GetResult();
+
return result;
}
@@ -337,33 +421,42 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
StringArray.Builder catalogNameBuilder = new StringArray.Builder();
List<IArrowArray?> catalogDbSchemasValues = new
List<IArrowArray?>();
string catalogRegexp = PatternToRegEx(catalogPattern);
- PagedEnumerable<ProjectList, CloudProject>? catalogs =
this.client?.ListProjects();
+ PagedEnumerable<ProjectList, CloudProject>? catalogs;
+ List<string> projectIds = new List<string>();
+
+ Func<Task<PagedEnumerable<ProjectList, CloudProject>?>> func = ()
=> Task.Run(() =>
+ {
+ // stick with this call because PagedAsyncEnumerable has
different behaviors for selecting items
+ return Client?.ListProjects();
+ });
+
+ catalogs = ExecuteWithRetriesAsync<PagedEnumerable<ProjectList,
CloudProject>?>(func).GetAwaiter().GetResult();
if (catalogs != null)
{
- List<string> projectIds = catalogs.Select(x =>
x.ProjectId).ToList();
+ projectIds = catalogs.Select(x => x.ProjectId).ToList();
+ }
- if (this.includePublicProjectIds &&
!projectIds.Contains(publicProjectId))
- projectIds.Add(publicProjectId);
+ if (this.includePublicProjectIds &&
!projectIds.Contains(publicProjectId))
+ projectIds.Add(publicProjectId);
- projectIds.Sort();
+ projectIds.Sort();
- foreach (string projectId in projectIds)
+ foreach (string projectId in projectIds)
+ {
+ if (Regex.IsMatch(projectId, catalogRegexp,
RegexOptions.IgnoreCase))
{
- if (Regex.IsMatch(projectId, catalogRegexp,
RegexOptions.IgnoreCase))
- {
- catalogNameBuilder.Append(projectId);
+ catalogNameBuilder.Append(projectId);
- if (depth == GetObjectsDepth.Catalogs)
- {
- catalogDbSchemasValues.Add(null);
- }
- else
- {
- catalogDbSchemasValues.Add(GetDbSchemas(
- depth, projectId, dbSchemaPattern,
- tableNamePattern, tableTypes,
columnNamePattern));
- }
+ if (depth == GetObjectsDepth.Catalogs)
+ {
+ catalogDbSchemasValues.Add(null);
+ }
+ else
+ {
+ catalogDbSchemasValues.Add(GetDbSchemas(
+ depth, projectId, dbSchemaPattern,
+ tableNamePattern, tableTypes, columnNamePattern));
}
}
}
@@ -394,7 +487,13 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
string dbSchemaRegexp = PatternToRegEx(dbSchemaPattern);
- PagedEnumerable<DatasetList, BigQueryDataset>? schemas =
this.client?.ListDatasets(catalog);
+ Func<Task<PagedEnumerable<DatasetList, BigQueryDataset>?>> func =
() => Task.Run(() =>
+ {
+ // stick with this call because PagedAsyncEnumerable has
different behaviors for selecting items
+ return Client?.ListDatasets(catalog);
+ });
+
+ PagedEnumerable<DatasetList, BigQueryDataset>? schemas =
ExecuteWithRetriesAsync<PagedEnumerable<DatasetList,
BigQueryDataset>?>(func).GetAwaiter().GetResult();
if (schemas != null)
{
@@ -1033,22 +1132,22 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
public override AdbcStatement CreateStatement()
{
- if (this.credential == null)
+ if (Credential == null)
{
throw new AdbcException("A credential must be set",
AdbcStatusCode.Unauthenticated);
}
- if (this.client == null)
+ if (Client == null)
{
- this.client = Open();
+ Client = Open();
}
- BigQueryStatement statement = new BigQueryStatement(this.client,
this.credential);
+ BigQueryStatement statement = new BigQueryStatement(this);
statement.Options = ParseOptions();
return statement;
}
- private IReadOnlyDictionary<string, string> ParseOptions()
+ private Dictionary<string, string> ParseOptions()
{
Dictionary<string, string> options = new Dictionary<string,
string>();
@@ -1066,19 +1165,20 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
foreach (string key in statementOptions)
{
- if (properties.TryGetValue(key, out string? value))
+ if (this.properties.TryGetValue(key, out string? value))
{
options[key] = value;
}
}
- return new ReadOnlyDictionary<string, string>(options);
+ return options;
}
public override void Dispose()
{
- this.client?.Dispose();
- this.client = null;
+ Client?.Dispose();
+ Client = null;
+ this.httpClient?.Dispose();
}
private static Regex sanitizedInputRegex = new
Regex("^[a-zA-Z0-9_-]+");
@@ -1121,14 +1221,55 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
HttpRequestMessage request = new
HttpRequestMessage(HttpMethod.Post, tokenEndpoint);
request.Headers.Add("Accept", "application/json");
request.Content = new StringContent(body, Encoding.UTF8,
"application/x-www-form-urlencoded");
- HttpResponseMessage response =
httpClient.SendAsync(request).Result;
- string responseBody = response.Content.ReadAsStringAsync().Result;
+ HttpResponseMessage response =
httpClient.SendAsync(request).GetAwaiter().GetResult();
+ string responseBody =
response.Content.ReadAsStringAsync().GetAwaiter().GetResult();
BigQueryTokenResponse? bigQueryTokenResponse =
JsonSerializer.Deserialize<BigQueryTokenResponse>(responseBody);
return bigQueryTokenResponse?.AccessToken;
}
+ /// <summary>
+ /// Gets the access token from the sts endpoint.
+ /// </summary>
+ /// <param name="audience"></param>
+ /// <param name="entraAccessToken"></param>
+ /// <returns></returns>
+ private string? TradeEntraIdTokenForBigQueryToken(string audience,
string entraAccessToken)
+ {
+ try
+ {
+ var requestBody = new
+ {
+ scope = BigQueryConstants.EntraIdScope,
+ subjectToken = entraAccessToken,
+ audience = audience,
+ grantType = BigQueryConstants.EntraGrantType,
+ subjectTokenType = BigQueryConstants.EntraSubjectTokenType,
+ requestedTokenType =
BigQueryConstants.EntraRequestedTokenType
+ };
+
+ string json = JsonSerializer.Serialize(requestBody);
+ StringContent content = new StringContent(json, Encoding.UTF8,
"application/json");
+
+ HttpResponseMessage response =
this.httpClient.PostAsync(BigQueryConstants.EntraStsTokenEndpoint,
content).GetAwaiter().GetResult();
+ response.EnsureSuccessStatusCode();
+
+ string responseBody =
response.Content.ReadAsStringAsync().GetAwaiter().GetResult();
+
+ BigQueryStsTokenResponse? bigQueryTokenResponse =
JsonSerializer.Deserialize<BigQueryStsTokenResponse>(responseBody);
+
+ return bigQueryTokenResponse?.AccessToken;
+ }
+ catch (Exception ex)
+ {
+ throw new AdbcException(
+ "Unable to obtain access token from BigQuery",
+ AdbcStatusCode.Unauthenticated,
+ ex);
+ }
+ }
+
enum XdbcDataType
{
XdbcDataType_XDBC_UNKNOWN_TYPE = 0,
diff --git a/csharp/src/Drivers/BigQuery/BigQueryParameters.cs
b/csharp/src/Drivers/BigQuery/BigQueryParameters.cs
index 02873c2f1..45125856b 100644
--- a/csharp/src/Drivers/BigQuery/BigQueryParameters.cs
+++ b/csharp/src/Drivers/BigQuery/BigQueryParameters.cs
@@ -20,8 +20,10 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
/// <summary>
/// Parameters used for connecting to BigQuery data sources.
/// </summary>
- public class BigQueryParameters
+ internal class BigQueryParameters
{
+ public const string AccessToken = "adbc.bigquery.access_token";
+ public const string AudienceUri = "adbc.bigquery.audience_uri";
public const string ProjectId = "adbc.bigquery.project_id";
public const string BillingProjectId =
"adbc.bigquery.billing_project_id";
public const string ClientId = "adbc.bigquery.client_id";
@@ -36,6 +38,8 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
public const string Scopes = "adbc.bigquery.scopes";
public const string IncludeConstraintsWithGetObjects =
"adbc.bigquery.include_constraints_getobjects";
public const string ClientTimeout = "adbc.bigquery.client.timeout";
+ public const string MaximumRetryAttempts =
"adbc.bigquery.maximum_retries";
+ public const string RetryDelayMs = "adbc.bigquery.retry_delay_ms";
public const string GetQueryResultsOptionsTimeout =
"adbc.bigquery.get_query_results_options.timeout";
public const string MaxFetchConcurrency =
"adbc.bigquery.max_fetch_concurrency";
public const string IncludePublicProjectId =
"adbc.bigquery.include_public_project_id";
@@ -47,13 +51,21 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
/// <summary>
/// Constants used for default parameter values.
/// </summary>
- public class BigQueryConstants
+ internal class BigQueryConstants
{
public const string UserAuthenticationType = "user";
+ public const string EntraIdAuthenticationType = "aad";
public const string ServiceAccountAuthenticationType = "service";
public const string TokenEndpoint =
"https://accounts.google.com/o/oauth2/token";
public const string TreatLargeDecimalAsString = "true";
+ // Entra ID / Azure AD constants
+ public const string EntraGrantType =
"urn:ietf:params:oauth:grant-type:token-exchange";
+ public const string EntraSubjectTokenType =
"urn:ietf:params:oauth:token-type:id_token";
+ public const string EntraRequestedTokenType =
"urn:ietf:params:oauth:token-type:access_token";
+ public const string EntraIdScope =
"https://www.googleapis.com/auth/cloud-platform";
+ public const string EntraStsTokenEndpoint =
"https://sts.googleapis.com/v1/token";
+
// default value per
https://pkg.go.dev/cloud.google.com/go/bigquery#section-readme
public const string DetectProjectId = "*detect-project-id*";
}
diff --git a/csharp/src/Drivers/BigQuery/BigQueryStatement.cs
b/csharp/src/Drivers/BigQuery/BigQueryStatement.cs
index 578f2eef7..e7c6673e1 100644
--- a/csharp/src/Drivers/BigQuery/BigQueryStatement.cs
+++ b/csharp/src/Drivers/BigQuery/BigQueryStatement.cs
@@ -17,6 +17,7 @@
using System;
using System.Collections.Generic;
+using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Threading;
@@ -36,88 +37,158 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
/// <summary>
/// BigQuery-specific implementation of <see cref="AdbcStatement"/>
/// </summary>
- public class BigQueryStatement : AdbcStatement
+ class BigQueryStatement : AdbcStatement, ITokenProtectedResource,
IDisposable
{
- readonly BigQueryClient client;
- readonly GoogleCredential credential;
+ readonly BigQueryConnection bigQueryConnection;
- public BigQueryStatement(BigQueryClient client, GoogleCredential
credential)
+ public BigQueryStatement(BigQueryConnection bigQueryConnection)
{
- this.client = client;
- this.credential = credential;
+ if (bigQueryConnection == null) { throw new
AdbcException($"{nameof(bigQueryConnection)} cannot be null",
AdbcStatusCode.InvalidArgument); }
+
+ // pass on the handler since this isn't accessible publicly
+ UpdateToken = bigQueryConnection.UpdateToken;
+
+ this.bigQueryConnection = bigQueryConnection;
}
- public IReadOnlyDictionary<string, string>? Options { get; set; }
+ public Func<Task>? UpdateToken { get; set; }
+
+ internal Dictionary<string, string>? Options { get; set; }
+
+ private BigQueryClient Client => this.bigQueryConnection.Client ??
throw new AdbcException("Client cannot be null");
+
+ private GoogleCredential Credential =>
this.bigQueryConnection.Credential ?? throw new AdbcException("Credential
cannot be null");
+
+ private int MaxRetryAttempts =>
this.bigQueryConnection.MaxRetryAttempts;
+
+ private int RetryDelayMs => this.bigQueryConnection.RetryDelayMs;
+
+ public override void SetOption(string key, string value)
+ {
+ if (Options == null)
+ {
+ Options = new Dictionary<string, string>();
+ }
+
+ Options[key] = value;
+ }
public override QueryResult ExecuteQuery()
{
- // Create job
+ return ExecuteQueryInternalAsync().GetAwaiter().GetResult();
+ }
+
+ private async Task<QueryResult> ExecuteQueryInternalAsync()
+ {
QueryOptions queryOptions = ValidateOptions();
- BigQueryJob job = this.client.CreateQueryJob(SqlQuery, null,
queryOptions);
+ BigQueryJob job = await Client.CreateQueryJobAsync(SqlQuery, null,
queryOptions);
- // Get results
+ JobReference jobReference = job.Reference;
GetQueryResultsOptions getQueryResultsOptions = new
GetQueryResultsOptions();
- if
(this.Options?.TryGetValue(BigQueryParameters.GetQueryResultsOptionsTimeout,
out string? timeoutSeconds) == true &&
+
+ if
(Options?.TryGetValue(BigQueryParameters.GetQueryResultsOptionsTimeout, out
string? timeoutSeconds) == true &&
int.TryParse(timeoutSeconds, out int seconds) &&
seconds >= 0)
{
getQueryResultsOptions.Timeout = TimeSpan.FromSeconds(seconds);
}
- BigQueryResults results =
job.GetQueryResults(getQueryResultsOptions);
+
+ Func<Task<BigQueryJob>> checkJobStatus = async () =>
+ {
+ while (true)
+ {
+ var jobWithStatus = await Client.GetJobAsync(jobReference);
+
+ if (jobWithStatus.State == JobState.Done)
+ {
+ if (jobWithStatus.Status.ErrorResult != null)
+ {
+ // TODO: log
+ Debug.WriteLine($"Error:
{jobWithStatus.Status.ErrorResult.Message}");
+ }
+
+ return jobWithStatus;
+ }
+ }
+ };
+
+ await ExecuteWithRetriesAsync<BigQueryJob>(checkJobStatus);
+
+ Func<Task<BigQueryResults>> getJobResults = async () =>
+ {
+ // if the authentication token was reset, then we need a new
job with the latest token
+ BigQueryJob completedJob = await
Client.GetJobAsync(jobReference);
+ return await completedJob.GetQueryResultsAsync();
+ };
+
+ BigQueryResults results = await
ExecuteWithRetriesAsync(getJobResults);
+
+ TokenProtectedReadClientManger clientMgr = new
TokenProtectedReadClientManger(Credential);
+ clientMgr.UpdateToken = () => Task.Run(() =>
+ {
+ this.bigQueryConnection.SetCredential();
+ clientMgr.UpdateCredential(Credential);
+ });
// For multi-statement queries, the results.TableReference is null
if (results.TableReference == null)
{
string statementType = string.Empty;
- if
(this.Options?.TryGetValue(BigQueryParameters.StatementType, out string?
statementTypeString) == true)
+ if (Options?.TryGetValue(BigQueryParameters.StatementType, out
string? statementTypeString) == true)
{
statementType = statementTypeString;
}
int statementIndex = 1;
- if
(this.Options?.TryGetValue(BigQueryParameters.StatementIndex, out string?
statementIndexString) == true &&
+ if (Options?.TryGetValue(BigQueryParameters.StatementIndex,
out string? statementIndexString) == true &&
int.TryParse(statementIndexString, out int
statementIndexInt) &&
statementIndexInt > 0)
{
statementIndex = statementIndexInt;
}
string evaluationKind = string.Empty;
- if
(this.Options?.TryGetValue(BigQueryParameters.EvaluationKind, out string?
evaluationKindString) == true)
+ if (Options?.TryGetValue(BigQueryParameters.EvaluationKind,
out string? evaluationKindString) == true)
{
evaluationKind = evaluationKindString;
}
- // To get the results of all statements in a multi-statement
query, enumerate the child jobs. Related public docs:
https://cloud.google.com/bigquery/docs/multi-statement-queries#get_all_executed_statements.
- // Can filter by StatementType and EvaluationKind. Related
public docs:
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobstatistics2,
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#evaluationkind
- ListJobsOptions listJobsOptions = new ListJobsOptions();
- listJobsOptions.ParentJobId = results.JobReference.JobId;
- var joblist = client.ListJobs(listJobsOptions)
- .Select(job => client.GetJob(job.Reference))
- .Where(job => string.IsNullOrEmpty(evaluationKind) ||
job.Statistics.ScriptStatistics.EvaluationKind.Equals(evaluationKind,
StringComparison.OrdinalIgnoreCase))
- .Where(job => string.IsNullOrEmpty(statementType) ||
job.Statistics.Query.StatementType.Equals(statementType,StringComparison.OrdinalIgnoreCase))
- .OrderBy(job => job.Resource.Statistics.CreationTime)
- .ToList();
-
- if (joblist.Count > 0)
+ Func<Task<BigQueryResults>> getMultiJobResults = async () =>
{
- if (statementIndex < 1 || statementIndex > joblist.Count)
+ // To get the results of all statements in a
multi-statement query, enumerate the child jobs. Related public docs:
https://cloud.google.com/bigquery/docs/multi-statement-queries#get_all_executed_statements.
+ // Can filter by StatementType and EvaluationKind. Related
public docs:
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobstatistics2,
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#evaluationkind
+ ListJobsOptions listJobsOptions = new ListJobsOptions();
+ listJobsOptions.ParentJobId = results.JobReference.JobId;
+ var joblist = Client.ListJobs(listJobsOptions)
+ .Select(job => Client.GetJob(job.Reference))
+ .Where(job => string.IsNullOrEmpty(evaluationKind) ||
job.Statistics.ScriptStatistics.EvaluationKind.Equals(evaluationKind,
StringComparison.OrdinalIgnoreCase))
+ .Where(job => string.IsNullOrEmpty(statementType) ||
job.Statistics.Query.StatementType.Equals(statementType,
StringComparison.OrdinalIgnoreCase))
+ .OrderBy(job => job.Resource.Statistics.CreationTime)
+ .ToList();
+
+ if (joblist.Count > 0)
{
- throw new ArgumentOutOfRangeException($"The specified
index {statementIndex} is out of range. There are {joblist.Count} jobs
available.");
+ if (statementIndex < 1 || statementIndex >
joblist.Count)
+ {
+ throw new ArgumentOutOfRangeException($"The
specified index {statementIndex} is out of range. There are {joblist.Count}
jobs available.");
+ }
+ return await joblist[statementIndex -
1].GetQueryResultsAsync(getQueryResultsOptions);
}
- results = joblist[statementIndex -
1].GetQueryResults(getQueryResultsOptions);
- }
+
+ throw new AdbcException($"Unable to obtain result from
statement [{statementIndex}]", AdbcStatusCode.InvalidData);
+ };
+
+ results = await ExecuteWithRetriesAsync(getMultiJobResults);
}
- if (results.TableReference == null)
+
+ if (results?.TableReference == null)
{
throw new AdbcException("There is no query statement");
}
- // BigQuery Read Client for streaming
- BigQueryReadClientBuilder readClientBuilder = new
BigQueryReadClientBuilder();
- readClientBuilder.Credential = this.credential;
- BigQueryReadClient readClient = readClientBuilder.Build();
string table =
$"projects/{results.TableReference.ProjectId}/datasets/{results.TableReference.DatasetId}/tables/{results.TableReference.TableId}";
+
int maxStreamCount = 1;
- if
(this.Options?.TryGetValue(BigQueryParameters.MaxFetchConcurrency, out string?
maxStreamCountString) == true)
+
+ if (Options?.TryGetValue(BigQueryParameters.MaxFetchConcurrency,
out string? maxStreamCountString) == true)
{
if (int.TryParse(maxStreamCountString, out int count))
{
@@ -127,36 +198,45 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
}
}
}
+
ReadSession rs = new ReadSession { Table = table, DataFormat =
DataFormat.Arrow };
- ReadSession rrs = readClient.CreateReadSession("projects/" +
results.TableReference.ProjectId, rs, maxStreamCount);
+
+ Func<Task<ReadSession>> createReadSession = () =>
clientMgr.ReadClient.CreateReadSessionAsync("projects/" +
results.TableReference.ProjectId, rs, maxStreamCount);
+
+ ReadSession rrs = await
ExecuteWithRetriesAsync<ReadSession>(createReadSession);
+
long totalRows = results.TotalRows == null ? -1L :
(long)results.TotalRows.Value;
+
var readers = rrs.Streams
- .Select(s => ReadChunk(readClient, s.Name))
+ .Select(s => ReadChunkWithRetries(clientMgr,
s.Name))
.Where(chunk => chunk != null)
.Cast<IArrowReader>();
+
IArrowArrayStream stream = new
MultiArrowReader(TranslateSchema(results.Schema), readers);
+
return new QueryResult(totalRows, stream);
}
public override UpdateResult ExecuteUpdate()
+ {
+ return ExecuteUpdateInternalAsync().GetAwaiter().GetResult();
+ }
+
+ private async Task<UpdateResult> ExecuteUpdateInternalAsync()
{
QueryOptions options = ValidateOptions();
GetQueryResultsOptions getQueryResultsOptions = new
GetQueryResultsOptions();
- if
(this.Options?.TryGetValue(BigQueryParameters.GetQueryResultsOptionsTimeout,
out string? timeoutSeconds) == true &&
+ if
(Options?.TryGetValue(BigQueryParameters.GetQueryResultsOptionsTimeout, out
string? timeoutSeconds) == true &&
int.TryParse(timeoutSeconds, out int seconds) &&
seconds >= 0)
{
getQueryResultsOptions.Timeout = TimeSpan.FromSeconds(seconds);
}
- BigQueryResults result = this.client.ExecuteQuery(
- SqlQuery,
- parameters: null,
- queryOptions: options,
- resultsOptions: getQueryResultsOptions);
-
- long updatedRows = result.NumDmlAffectedRows == null ? -1L :
result.NumDmlAffectedRows.Value;
+ Func<Task<BigQueryResults?>> func = () =>
Client.ExecuteQueryAsync(SqlQuery, null, options, getQueryResultsOptions);
+ BigQueryResults? result = await
ExecuteWithRetriesAsync<BigQueryResults?>(func);
+ long updatedRows = result?.NumDmlAffectedRows.HasValue == true ?
result.NumDmlAffectedRows.Value : -1L;
return new UpdateResult(updatedRows);
}
@@ -211,8 +291,8 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
return GetType(field, new Decimal128Type(38, 9));
case "BIGNUMERIC" or "BIGDECIMAL":
- if (this.Options != null)
- return
bool.Parse(this.Options[BigQueryParameters.LargeDecimalsAsString]) ?
GetType(field, StringType.Default) : GetType(field, new Decimal256Type(76, 38));
+ if (Options != null)
+ return
bool.Parse(Options[BigQueryParameters.LargeDecimalsAsString]) ? GetType(field,
StringType.Default) : GetType(field, new Decimal256Type(76, 38));
else
return GetType(field, StringType.Default);
@@ -241,11 +321,22 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
return type;
}
- static IArrowReader? ReadChunk(BigQueryReadClient readClient, string
streamName)
+ private IArrowReader?
ReadChunkWithRetries(TokenProtectedReadClientManger clientMgr, string
streamName)
+ {
+ Func<Task<IArrowReader?>> func = () =>
Task.FromResult<IArrowReader?>(ReadChunk(clientMgr, streamName));
+ return
RetryManager.ExecuteWithRetriesAsync<IArrowReader?>(clientMgr, func,
MaxRetryAttempts, RetryDelayMs).GetAwaiter().GetResult();
+ }
+
+ private static IArrowReader? ReadChunk(TokenProtectedReadClientManger
clientMgr, string streamName)
+ {
+ return ReadChunk(clientMgr.ReadClient, streamName);
+ }
+
+ private static IArrowReader? ReadChunk(BigQueryReadClient client,
string streamName)
{
// Ideally we wouldn't need to indirect through a stream, but the
necessary APIs in Arrow
// are internal. (TODO: consider changing Arrow).
- BigQueryReadClient.ReadRowsStream readRowsStream =
readClient.ReadRows(new ReadRowsRequest { ReadStream = streamName });
+ BigQueryReadClient.ReadRowsStream readRowsStream =
client.ReadRows(new ReadRowsRequest { ReadStream = streamName });
IAsyncEnumerator<ReadRowsResponse> enumerator =
readRowsStream.GetResponseStream().GetAsyncEnumerator();
ReadRowsStream stream = new ReadRowsStream(enumerator);
@@ -264,12 +355,17 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
{
QueryOptions options = new QueryOptions();
- if (this.client.ProjectId == BigQueryConstants.DetectProjectId)
+ if (Client.ProjectId == BigQueryConstants.DetectProjectId)
{
// An error occurs when calling CreateQueryJob without the ID
set,
// so use the first one that is found. This does not prevent
from calling
// to other 'project IDs' (catalogs) with a query.
- PagedEnumerable<ProjectList, CloudProject>? projects =
this.client.ListProjects();
+ Func<Task<PagedEnumerable<ProjectList, CloudProject>?>> func =
() => Task.Run(() =>
+ {
+ return Client?.ListProjects();
+ });
+
+ PagedEnumerable<ProjectList, CloudProject>? projects =
ExecuteWithRetriesAsync<PagedEnumerable<ProjectList,
CloudProject>?>(func).GetAwaiter().GetResult();
if (projects != null)
{
@@ -282,10 +378,10 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
}
}
- if (this.Options == null || this.Options.Count == 0)
+ if (Options == null || Options.Count == 0)
return options;
- foreach (KeyValuePair<string, string> keyValuePair in this.Options)
+ foreach (KeyValuePair<string, string> keyValuePair in Options)
{
if (keyValuePair.Key == BigQueryParameters.AllowLargeResults)
{
@@ -329,7 +425,11 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
return options;
}
- class MultiArrowReader : IArrowArrayStream
+ public bool TokenRequiresUpdate(Exception ex) =>
BigQueryUtils.TokenRequiresUpdate(ex);
+
+ private async Task<T> ExecuteWithRetriesAsync<T>(Func<Task<T>> action)
=> await RetryManager.ExecuteWithRetriesAsync<T>(this, action,
MaxRetryAttempts, RetryDelayMs);
+
+ private class MultiArrowReader : IArrowArrayStream
{
readonly Schema schema;
IEnumerator<IArrowReader>? readers;
@@ -341,7 +441,7 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
this.readers = readers.GetEnumerator();
}
- public Schema Schema { get { return schema; } }
+ public Schema Schema { get { return this.schema; } }
public async ValueTask<RecordBatch?>
ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
{
diff --git a/csharp/src/Drivers/BigQuery/BigQueryTableTypes.cs
b/csharp/src/Drivers/BigQuery/BigQueryStsTokenResponse.cs
similarity index 61%
copy from csharp/src/Drivers/BigQuery/BigQueryTableTypes.cs
copy to csharp/src/Drivers/BigQuery/BigQueryStsTokenResponse.cs
index e2d47fc7a..3770b30ce 100644
--- a/csharp/src/Drivers/BigQuery/BigQueryTableTypes.cs
+++ b/csharp/src/Drivers/BigQuery/BigQueryStsTokenResponse.cs
@@ -14,12 +14,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-using System.Collections.Generic;
+
+using System.Text.Json.Serialization;
namespace Apache.Arrow.Adbc.Drivers.BigQuery
{
- internal static class BigQueryTableTypes
+ /// <summary>
+ /// The token response from BigQuery
+ /// </summary>
+ internal class BigQueryStsTokenResponse
{
- public static readonly string[] TableTypes = new string[]{ "BASE
TABLE", "VIEW", "CLONE", "SNAPSHOT" };
+ [JsonPropertyName("access_token")]
+ public string? AccessToken { get; set; }
+
+ [JsonPropertyName("issued_token_type")]
+ public string? IssuedTokenType { get; set; }
+
+ [JsonPropertyName("token_type")]
+ public string? TokenType { get; set; }
+
+ [JsonPropertyName("expires_in")]
+ public int? ExpiresIn { get; set; }
}
}
diff --git a/csharp/src/Drivers/BigQuery/BigQueryTableTypes.cs
b/csharp/src/Drivers/BigQuery/BigQueryTableTypes.cs
index e2d47fc7a..d0372e60a 100644
--- a/csharp/src/Drivers/BigQuery/BigQueryTableTypes.cs
+++ b/csharp/src/Drivers/BigQuery/BigQueryTableTypes.cs
@@ -14,12 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-using System.Collections.Generic;
-
namespace Apache.Arrow.Adbc.Drivers.BigQuery
{
internal static class BigQueryTableTypes
{
- public static readonly string[] TableTypes = new string[]{ "BASE
TABLE", "VIEW", "CLONE", "SNAPSHOT" };
+ public static readonly string[] TableTypes = new string[] { "BASE
TABLE", "VIEW", "CLONE", "SNAPSHOT" };
}
}
diff --git a/csharp/src/Drivers/BigQuery/BigQueryUtils.cs
b/csharp/src/Drivers/BigQuery/BigQueryUtils.cs
new file mode 100644
index 000000000..3b3167b4e
--- /dev/null
+++ b/csharp/src/Drivers/BigQuery/BigQueryUtils.cs
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Google;
+
+namespace Apache.Arrow.Adbc.Drivers.BigQuery
+{
+ internal class BigQueryUtils
+ {
+ public static bool TokenRequiresUpdate(Exception ex)
+ {
+ bool result = false;
+
+ if (ex is GoogleApiException gaex && gaex.HttpStatusCode ==
System.Net.HttpStatusCode.Unauthorized)
+ {
+ result = true;
+ }
+
+ return result;
+ }
+ }
+}
diff --git a/csharp/src/Drivers/BigQuery/ITokenProtectedResource.cs
b/csharp/src/Drivers/BigQuery/ITokenProtectedResource.cs
new file mode 100644
index 000000000..d501dd506
--- /dev/null
+++ b/csharp/src/Drivers/BigQuery/ITokenProtectedResource.cs
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading.Tasks;
+
+namespace Apache.Arrow.Adbc.Drivers.BigQuery
+{
+ /// <summary>
+ /// Common interface for a token protected resource.
+ /// </summary>
+ internal interface ITokenProtectedResource
+ {
+ /// <summary>
+ /// The function to call when updating the token.
+ /// </summary>
+ Func<Task>? UpdateToken { get; set; }
+
+ /// <summary>
+ /// Determines whether the token needs to be updated.
+ /// </summary>
+ /// <param name="ex">The exception that occurred.</param>
+ /// <returns>True/False indicating a refresh is needed.</returns>
+ bool TokenRequiresUpdate(Exception ex);
+ }
+}
diff --git a/csharp/src/Drivers/BigQuery/RetryManager.cs
b/csharp/src/Drivers/BigQuery/RetryManager.cs
new file mode 100644
index 000000000..a205e62ef
--- /dev/null
+++ b/csharp/src/Drivers/BigQuery/RetryManager.cs
@@ -0,0 +1,82 @@
+
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+using System;
+using System.Threading.Tasks;
+
+namespace Apache.Arrow.Adbc.Drivers.BigQuery
+{
+ /// <summary>
+ /// Class that will retry calling a method with a backoff.
+ /// </summary>
+ internal class RetryManager
+ {
+ public static async Task<T> ExecuteWithRetriesAsync<T>(
+ ITokenProtectedResource tokenProtectedResource,
+ Func<Task<T>> action,
+ int maxRetries = 5,
+ int initialDelayMilliseconds = 200)
+ {
+ if (action == null)
+ {
+ throw new AdbcException("There is no method to retry",
AdbcStatusCode.InvalidArgument);
+ }
+
+ int retryCount = 0;
+ int delay = initialDelayMilliseconds;
+
+ while (retryCount < maxRetries)
+ {
+ try
+ {
+ T result = await action();
+ return result;
+ }
+ catch (Exception ex)
+ {
+ retryCount++;
+ if (retryCount >= maxRetries)
+ {
+ if ((tokenProtectedResource?.UpdateToken != null))
+ {
+ if
(tokenProtectedResource?.TokenRequiresUpdate(ex) == true)
+ {
+ throw new AdbcException($"Cannot update access
token after {maxRetries} tries", AdbcStatusCode.Unauthenticated, ex);
+ }
+ }
+
+ throw new AdbcException($"Cannot execute
{action.Method.Name} after {maxRetries} tries", AdbcStatusCode.UnknownError,
ex);
+ }
+
+ if ((tokenProtectedResource?.UpdateToken != null))
+ {
+ if (tokenProtectedResource.TokenRequiresUpdate(ex) ==
true)
+ {
+ await tokenProtectedResource.UpdateToken();
+ }
+ }
+
+ await Task.Delay(delay);
+ delay = Math.Min(2 * delay, 5000);
+ }
+ }
+
+ throw new AdbcException($"Could not successfully call
{action.Method.Name}", AdbcStatusCode.UnknownError);
+ }
+ }
+}
diff --git a/csharp/src/Drivers/BigQuery/TokenProtectedReadClient.cs
b/csharp/src/Drivers/BigQuery/TokenProtectedReadClient.cs
new file mode 100644
index 000000000..831464e26
--- /dev/null
+++ b/csharp/src/Drivers/BigQuery/TokenProtectedReadClient.cs
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading.Tasks;
+using Google.Apis.Auth.OAuth2;
+using Google.Cloud.BigQuery.Storage.V1;
+
+namespace Apache.Arrow.Adbc.Drivers.BigQuery
+{
+ /// <summary>
+ /// Manages a <see cref="BigQueryReadClient"/> that is protected by a
token.
+ /// </summary>
+ internal class TokenProtectedReadClientManger : ITokenProtectedResource
+ {
+ BigQueryReadClient bigQueryReadClient;
+
+ public TokenProtectedReadClientManger(GoogleCredential credential)
+ {
+ UpdateCredential(credential);
+
+ if (bigQueryReadClient == null)
+ {
+ throw new InvalidOperationException("could not create a read
client");
+ }
+ }
+
+ public BigQueryReadClient ReadClient => bigQueryReadClient;
+
+ public void UpdateCredential(GoogleCredential? credential)
+ {
+ if (credential == null)
+ {
+ throw new ArgumentNullException(nameof(credential));
+ }
+
+ BigQueryReadClientBuilder readClientBuilder = new
BigQueryReadClientBuilder();
+ readClientBuilder.Credential = credential;
+ this.bigQueryReadClient = readClientBuilder.Build();
+ }
+
+ public Func<Task>? UpdateToken { get; set; }
+
+ public bool TokenRequiresUpdate(Exception ex) =>
BigQueryUtils.TokenRequiresUpdate(ex);
+ }
+}
diff --git a/csharp/src/Drivers/BigQuery/readme.md
b/csharp/src/Drivers/BigQuery/readme.md
index 6b733ca2f..1ff14d816 100644
--- a/csharp/src/Drivers/BigQuery/readme.md
+++ b/csharp/src/Drivers/BigQuery/readme.md
@@ -34,13 +34,17 @@ The ADBC driver passes the configured credentials to
BigQuery, but you may need
The following parameters can be used to configure the driver behavior. The
parameters are case sensitive.
+**adbc.bigquery.access_token**<br>
+ Sets the access token to use as the credential.
Currently, this is for Microsoft Entra, but this could be used for other OAuth
implementations as well.
+
+**adbc.bigquery.audience_uri**<br>
+ Sets the audience URI for the authentication token.
Currently, this is for Microsoft Entra, but this could be used for other OAuth
implementations as well.
+
**adbc.bigquery.allow_large_results**<br>
Sets the
[AllowLargeResults](https://cloud.google.com/dotnet/docs/reference/Google.Cloud.BigQuery.V2/latest/Google.Cloud.BigQuery.V2.QueryOptions#Google_Cloud_BigQuery_V2_QueryOptions_AllowLargeResults)
value of the QueryOptions to `true` if configured; otherwise, the default is
`false`.
**adbc.bigquery.auth_type**<br>
- Required. Must be `user` or `service`
-
-https://cloud.google.com/dotnet/docs/reference/Google.Cloud.BigQuery.V2/latest/Google.Cloud.BigQuery.V2.QueryOptions#Google_Cloud_BigQuery_V2_QueryOptions_AllowLargeResults
+ Required. Must be `user`, `aad` (for Microsoft Entra)
or `service`.
**adbc.bigquery.billing_project_id**<br>
The [Project
ID](https://cloud.google.com/resource-manager/docs/creating-managing-projects)
used for accessing billing BigQuery. If not specified, will default to the
detected project ID.
@@ -60,6 +64,9 @@
https://cloud.google.com/dotnet/docs/reference/Google.Cloud.BigQuery.V2/latest/G
**adbc.bigquery.get_query_results_options.timeout**<br>
Optional. Sets the timeout (in seconds) for the
GetQueryResultsOptions value. If not set, defaults to 5 minutes. Similar to a
CommandTimeout.
+**adbc.bigquery.maximum_retries**<br>
+ Optional. The maximum number of retries. This is the total number of attempts, including the initial call. Defaults to 5.
+
**adbc.bigquery.max_fetch_concurrency**<br>
Optional. Sets the
[maxStreamCount](https://cloud.google.com/dotnet/docs/reference/Google.Cloud.BigQuery.Storage.V1/latest/Google.Cloud.BigQuery.Storage.V1.BigQueryReadClient#Google_Cloud_BigQuery_Storage_V1_BigQueryReadClient_CreateReadSession_System_String_Google_Cloud_BigQuery_Storage_V1_ReadSession_System_Int32_Google_Api_Gax_Grpc_CallSettings_)
for the CreateReadSession method. If not set, defaults to 1.
@@ -75,18 +82,21 @@
https://cloud.google.com/dotnet/docs/reference/Google.Cloud.BigQuery.V2/latest/G
**adbc.bigquery.include_constraints_getobjects**<br>
Optional. Some callers do not need the constraint
details when they get the table information and can improve the speed of
obtaining the results. Setting this value to `"false"` will not include the
constraint details. The default value is `"true"`.
+**adbc.bigquery.include_public_project_id**<br>
+ Include the `bigquery-public-data` project ID with the
list of project IDs.
+
**adbc.bigquery.large_results_destination_table**<br>
Optional. Sets the
[DestinationTable](https://cloud.google.com/dotnet/docs/reference/Google.Cloud.BigQuery.V2/latest/Google.Cloud.BigQuery.V2.QueryOptions#Google_Cloud_BigQuery_V2_QueryOptions_DestinationTable)
value of the QueryOptions if configured. Expects the format to be
`{projectId}.{datasetId}.{tableId}` to set the corresponding values in the
[TableReference](https://github.com/googleapis/google-api-dotnet-client/blob/6c415c73788b848711e47c6dd33c2f93c76faf9
[...]
**adbc.bigquery.project_id**<br>
The [Project
ID](https://cloud.google.com/resource-manager/docs/creating-managing-projects)
used for accessing BigQuery. If not specified, will default to detect the
projectIds the credentials have access to.
-**adbc.bigquery.include_public_project_id**<br>
- Include the `bigquery-public-data` project ID with the
list of project IDs.
-
**adbc.bigquery.refresh_token**<br>
The refresh token used for when the generated OAuth
token expires. Required for `user` authentication.
+**adbc.bigquery.retry_delay_ms**<br>
+ Optional. The initial delay between retries, in milliseconds. Defaults to 200ms.
The delay doubles after each failed attempt (capped at 5000ms), so the retries can take
longer than `adbc.bigquery.maximum_retries` x `adbc.bigquery.retry_delay_ms` to complete.
+
**adbc.bigquery.scopes**<br>
Optional. Comma separated list of scopes to include
for the credential.
@@ -119,3 +129,19 @@ The following table depicts how the BigQuery ADBC driver
converts a BigQuery typ
+A JSON string
See [Arrow Schema
Details](https://cloud.google.com/bigquery/docs/reference/storage/#arrow_schema_details)
for how BigQuery handles Arrow types.
+
+## Microsoft Entra
+The driver supports authenticating with a [Microsoft
Entra](https://learn.microsoft.com/en-us/entra/fundamentals/what-is-entra) ID.
For long-running operations, the Entra token may time out if the operation takes
longer than the Entra token's lifetime. The driver has the ability to perform
token refreshes by subscribing to the `UpdateToken` delegate on the
`BigQueryConnection`. In this scenario, the driver will attempt to perform an
operation. If that operation fails due to an Unauthorize [...]
+
+Sample code to refresh the token:
+
+```
+Dictionary<string,string> properties = ...;
+BigQueryConnection connection = new BigQueryConnection(properties);
+connection.UpdateToken = () => Task.Run(() =>
+{
+ connection.SetOption(BigQueryParameters.AccessToken, GetAccessToken());
+});
+```
+
+In the sample above, when a new token is needed, the delegate is invoked and
updates the `adbc.bigquery.access_token` parameter on the connection object.
diff --git a/csharp/test/Apache.Arrow.Adbc.Tests/MultiEnvironmentTestUtils.cs
b/csharp/test/Apache.Arrow.Adbc.Tests/MultiEnvironmentTestUtils.cs
index 8aaee722a..38d7f51c5 100644
--- a/csharp/test/Apache.Arrow.Adbc.Tests/MultiEnvironmentTestUtils.cs
+++ b/csharp/test/Apache.Arrow.Adbc.Tests/MultiEnvironmentTestUtils.cs
@@ -39,7 +39,7 @@ namespace Apache.Arrow.Adbc.Tests
{
// use a JSON file for the various settings
string json = File.ReadAllText(environmentValue);
- testConfiguration =
JsonSerializer.Deserialize<T>(json)!;
+ testConfiguration =
JsonSerializer.Deserialize<T>(json);
}
}
}
diff --git
a/csharp/test/Drivers/BigQuery/Apache.Arrow.Adbc.Tests.Drivers.BigQuery.csproj
b/csharp/test/Drivers/BigQuery/Apache.Arrow.Adbc.Tests.Drivers.BigQuery.csproj
index 71d6b4ade..274e1386e 100644
---
a/csharp/test/Drivers/BigQuery/Apache.Arrow.Adbc.Tests.Drivers.BigQuery.csproj
+++
b/csharp/test/Drivers/BigQuery/Apache.Arrow.Adbc.Tests.Drivers.BigQuery.csproj
@@ -4,6 +4,7 @@
<TargetFrameworks
Condition="'$(TargetFrameworks)'==''">net8.0</TargetFrameworks>
</PropertyGroup>
<ItemGroup>
+ <PackageReference Include="Azure.Identity" Version="1.13.2" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.12.0" />
<PackageReference Include="System.Net.Http.WinHttpHandler"
Version="8.0.2" Condition="'$(TargetFrameworkIdentifier)' == '.NETStandard'" />
<PackageReference Include="xunit" Version="2.9.3" />
diff --git a/csharp/test/Drivers/BigQuery/AuthenticationTests.cs
b/csharp/test/Drivers/BigQuery/AuthenticationTests.cs
new file mode 100644
index 000000000..3936cac85
--- /dev/null
+++ b/csharp/test/Drivers/BigQuery/AuthenticationTests.cs
@@ -0,0 +1,121 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading.Tasks;
+using Apache.Arrow.Adbc.Drivers.BigQuery;
+using Apache.Arrow.Adbc.Tests.Xunit;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace Apache.Arrow.Adbc.Tests.Drivers.BigQuery
+{
+ [TestCaseOrderer("Apache.Arrow.Adbc.Tests.Xunit.TestOrderer",
"Apache.Arrow.Adbc.Tests")]
+ public class AuthenticationTests
+ {
+ private BigQueryTestConfiguration _testConfiguration;
+ readonly List<BigQueryTestEnvironment> _environments;
+ readonly ITestOutputHelper _outputHelper;
+
+ public AuthenticationTests(ITestOutputHelper outputHelper)
+ {
+
Skip.IfNot(Utils.CanExecuteTestConfig(BigQueryTestingUtils.BIGQUERY_TEST_CONFIG_VARIABLE));
+
+ _testConfiguration =
MultiEnvironmentTestUtils.LoadMultiEnvironmentTestConfiguration<BigQueryTestConfiguration>(BigQueryTestingUtils.BIGQUERY_TEST_CONFIG_VARIABLE);
+ _environments =
MultiEnvironmentTestUtils.GetTestEnvironments<BigQueryTestEnvironment>(_testConfiguration);
+ _outputHelper = outputHelper;
+ }
+
+ /// <summary>
+ /// Validates if the Entra token can sign in.
+ /// </summary>
+ [SkippableFact, Order(1)]
+ public void CanSignInWithEntraToken()
+ {
+ BigQueryTestEnvironment? environment = _environments.Where(x =>
x.AuthenticationType ==
BigQueryConstants.EntraIdAuthenticationType).FirstOrDefault();
+ Assert.NotNull(environment);
+
+ BigQueryConnection? connection =
BigQueryTestingUtils.GetEntraProtectedBigQueryAdbcConnection(environment,
BigQueryTestingUtils.GetAccessToken(environment)) as BigQueryConnection;
+ Assert.NotNull(connection);
+
+ AdbcStatement statement = connection.CreateStatement();
+ statement.SqlQuery = environment.Query;
+
+ QueryResult queryResult = statement.ExecuteQuery();
+
+ Tests.DriverTests.CanExecuteQuery(queryResult,
environment.ExpectedResultsCount, environment.Name);
+ }
+
+ /// <summary>
+ /// Validates the behavior of a long-running operation using an Entra
token.
+ /// </summary>
+ /// <param name="withRefresh">
+ /// Indicates if a refresh should be performed. If true, the operation
should succeed.
+ /// If false, indicates that an error will be thrown after the number
of retries.
+ /// </param>
+ [SkippableTheory, Order(2)]
+ [InlineData(false)]
+ [InlineData(true)]
+ public void ValidateLongRunningQueryExpectedBehavior(bool withRefresh)
+ {
+ BigQueryTestEnvironment? environment = _environments.Where(x =>
x.AuthenticationType ==
BigQueryConstants.EntraIdAuthenticationType).FirstOrDefault();
+ Assert.NotNull(environment);
+
+ BigQueryConnection? connection;
+
+ if (withRefresh)
+ {
+ connection =
(BigQueryConnection)BigQueryTestingUtils.GetEntraProtectedBigQueryAdbcConnection(environment,
BigQueryTestingUtils.GetAccessToken(environment));
+ connection.UpdateToken = () => Task.Run(() =>
+ {
+ connection.SetOption(BigQueryParameters.AccessToken,
BigQueryTestingUtils.GetAccessToken(environment));
+ _outputHelper.WriteLine("Successfully set a new token");
+ });
+ }
+ else
+ {
+ // use two retries to shorten the time it takes to run the test
+ connection =
(BigQueryConnection)BigQueryTestingUtils.GetEntraProtectedBigQueryAdbcConnection(environment,
BigQueryTestingUtils.GetAccessToken(environment), 2);
+ }
+
+ Assert.NotNull(connection);
+
+ // create a query that takes 75 minutes because Entra tokens
typically expire in 60 minutes
+ AdbcStatement statement = connection.CreateStatement();
+ statement.SqlQuery = @"
+ DECLARE end_time TIMESTAMP;
+ SET end_time = TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 75
MINUTE);
+
+ WHILE CURRENT_TIMESTAMP() < end_time DO
+ END WHILE;
+
+ SELECT 'Query completed after 75 minutes' AS result;";
+
+ if (withRefresh)
+ {
+ QueryResult queryResult = statement.ExecuteQuery();
+ _outputHelper.WriteLine($"Retrieve query result with
{queryResult.RowCount} rows");
+ }
+ else
+ {
+ // throws AdbcException with the status as Unauthorized
+ Assert.ThrowsAny<AdbcException>(() =>
statement.ExecuteQuery());
+ }
+ }
+ }
+}
diff --git a/csharp/test/Drivers/BigQuery/BigQueryData.cs
b/csharp/test/Drivers/BigQuery/BigQueryData.cs
index ed0df8db5..7ef72391a 100644
--- a/csharp/test/Drivers/BigQuery/BigQueryData.cs
+++ b/csharp/test/Drivers/BigQuery/BigQueryData.cs
@@ -18,7 +18,6 @@
using System;
using System.Collections.Generic;
using System.Data.SqlTypes;
-using System.Dynamic;
using System.Text;
using System.Text.Json;
using Apache.Arrow.Types;
diff --git a/csharp/test/Drivers/BigQuery/BigQueryTestConfiguration.cs
b/csharp/test/Drivers/BigQuery/BigQueryTestConfiguration.cs
index 8b36e1db8..66a8dcd69 100644
--- a/csharp/test/Drivers/BigQuery/BigQueryTestConfiguration.cs
+++ b/csharp/test/Drivers/BigQuery/BigQueryTestConfiguration.cs
@@ -15,6 +15,7 @@
* limitations under the License.
*/
+using System.Collections.Generic;
using System.Text.Json.Serialization;
namespace Apache.Arrow.Adbc.Tests.Drivers.BigQuery
@@ -40,6 +41,15 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.BigQuery
[JsonPropertyName("projectId")]
public string? ProjectId { get; set; }
+ [JsonPropertyName("authenticationType")]
+ public string AuthenticationType { get; set; } = string.Empty;
+
+ [JsonPropertyName("accessToken")]
+ public string AccessToken { get; set; } = string.Empty;
+
+ [JsonPropertyName("audience")]
+ public string Audience { get; set; } = string.Empty;
+
[JsonPropertyName("billingProjectId")]
public string? BillingProjectId { get; set; }
@@ -114,5 +124,17 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.BigQuery
/// </remarks>
[JsonPropertyName("structBehavior")]
public string? StructBehavior { get; set; }
+
+ [JsonPropertyName("entraConfiguration")]
+ public EntraConfiguration? EntraConfiguration { get; set; }
+ }
+
+ class EntraConfiguration
+ {
+ [JsonPropertyName("scopes")]
+ public string[]? Scopes { get; set; }
+
+ [JsonPropertyName("claims")]
+ public Dictionary<string, string>? Claims { get; set; }
}
}
diff --git a/csharp/test/Drivers/BigQuery/BigQueryTestingUtils.cs
b/csharp/test/Drivers/BigQuery/BigQueryTestingUtils.cs
index 07e1876b2..070ce8fef 100644
--- a/csharp/test/Drivers/BigQuery/BigQueryTestingUtils.cs
+++ b/csharp/test/Drivers/BigQuery/BigQueryTestingUtils.cs
@@ -15,11 +15,15 @@
* limitations under the License.
*/
+using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
+using System.Text.Json;
using Apache.Arrow.Adbc.Drivers.BigQuery;
+using Azure.Core;
+using Azure.Identity;
namespace Apache.Arrow.Adbc.Tests.Drivers.BigQuery
{
@@ -44,6 +48,26 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.BigQuery
return connection;
}
+ internal static AdbcConnection GetEntraProtectedBigQueryAdbcConnection(
+ BigQueryTestEnvironment testEnvironment,
+ string accessToken,
+ int? maxRetries = null
+ )
+ {
+ testEnvironment.AccessToken = accessToken;
+ Dictionary<string, string> parameters =
GetBigQueryParameters(testEnvironment);
+
+ if (maxRetries.HasValue)
+ {
+ parameters.Add(BigQueryParameters.MaximumRetryAttempts,
maxRetries.Value.ToString());
+ }
+
+ AdbcDatabase database = new BigQueryDriver().Open(parameters);
+ AdbcConnection connection = database.Connect(new
Dictionary<string, string>());
+
+ return connection;
+ }
+
/// <summary>
/// Gets the parameters for connecting to BigQuery.
/// </summary>
@@ -51,7 +75,7 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.BigQuery
/// <returns></returns>
internal static Dictionary<string, string>
GetBigQueryParameters(BigQueryTestEnvironment testEnvironment)
{
- Dictionary<string, string> parameters = new Dictionary<string,
string>{};
+ Dictionary<string, string> parameters = new Dictionary<string,
string> { };
if (!string.IsNullOrEmpty(testEnvironment.ProjectId))
{
@@ -63,12 +87,43 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.BigQuery
parameters.Add(BigQueryParameters.BillingProjectId,
testEnvironment.BillingProjectId!);
}
+ if (testEnvironment.AuthenticationType != null)
+ {
+ if
(testEnvironment.AuthenticationType.Equals(BigQueryConstants.ServiceAccountAuthenticationType,
StringComparison.OrdinalIgnoreCase))
+ {
+ parameters.Add(BigQueryParameters.AuthenticationType,
BigQueryConstants.ServiceAccountAuthenticationType);
+ parameters.Add(BigQueryParameters.JsonCredential,
testEnvironment.JsonCredential);
+ }
+ else if
(testEnvironment.AuthenticationType.Equals(BigQueryConstants.EntraIdAuthenticationType,
StringComparison.OrdinalIgnoreCase))
+ {
+ parameters.Add(BigQueryParameters.AuthenticationType,
BigQueryConstants.EntraIdAuthenticationType);
+
+ if (string.IsNullOrEmpty(testEnvironment.AccessToken))
+ {
+ parameters.Add(BigQueryParameters.AccessToken,
GetAccessToken(testEnvironment));
+ }
+ else
+ {
+ parameters.Add(BigQueryParameters.AccessToken,
testEnvironment.AccessToken);
+ }
+
+ parameters.Add(BigQueryParameters.AudienceUri,
testEnvironment.Audience);
+ }
+ else
+ {
+ parameters.Add(BigQueryParameters.AuthenticationType,
BigQueryConstants.UserAuthenticationType);
+ parameters.Add(BigQueryParameters.ClientId,
testEnvironment.ClientId);
+ parameters.Add(BigQueryParameters.ClientSecret,
testEnvironment.ClientSecret);
+ parameters.Add(BigQueryParameters.RefreshToken,
testEnvironment.RefreshToken);
+ }
+ }
+
if (!string.IsNullOrEmpty(testEnvironment.JsonCredential))
{
parameters.Add(BigQueryParameters.AuthenticationType,
BigQueryConstants.ServiceAccountAuthenticationType);
parameters.Add(BigQueryParameters.JsonCredential,
testEnvironment.JsonCredential);
}
- else
+ else if ((testEnvironment.AuthenticationType != null) &&
(testEnvironment.AuthenticationType.Equals(BigQueryConstants.UserAuthenticationType,
StringComparison.OrdinalIgnoreCase)))
{
parameters.Add(BigQueryParameters.AuthenticationType,
BigQueryConstants.UserAuthenticationType);
parameters.Add(BigQueryParameters.ClientId,
testEnvironment.ClientId);
@@ -133,6 +188,30 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.BigQuery
return parameters;
}
+        /// <summary>
+        /// Requests an Entra access token through DefaultAzureCredential, using the
+        /// scopes and claims from the environment's Entra configuration.
+        /// </summary>
+        /// <param name="environment">The test environment whose EntraConfiguration supplies the scopes and claims.</param>
+        /// <returns>The token string of the access token that was obtained.</returns>
+        /// <exception cref="InvalidOperationException">A test environment is not configured correctly.</exception>
+        internal static string GetAccessToken(BigQueryTestEnvironment environment)
+        {
+            var entraConfiguration = environment?.EntraConfiguration;
+
+            // both the scopes and the claims are needed to build the token request
+            if (entraConfiguration?.Scopes == null || entraConfiguration.Claims == null)
+            {
+                throw new InvalidOperationException("The test environment is not configured correctly");
+            }
+
+            // the easiest way to supply a login is Visual Studio:
+            // Tools > Options > Azure Service Authentication
+            var azureCredential = new DefaultAzureCredential();
+
+            // the claims are passed to the token request as a JSON string
+            string serializedClaims = JsonSerializer.Serialize(entraConfiguration.Claims);
+            var tokenRequest = new TokenRequestContext(entraConfiguration.Scopes, claims: serializedClaims);
+
+            return azureCredential.GetToken(tokenRequest).Token;
+        }
+
/// <summary>
/// Parses the queries from resources/BigQueryData.sql
/// </summary>
diff --git a/csharp/test/Drivers/BigQuery/StatementTests.cs
b/csharp/test/Drivers/BigQuery/StatementTests.cs
new file mode 100644
index 000000000..f7ab55c20
--- /dev/null
+++ b/csharp/test/Drivers/BigQuery/StatementTests.cs
@@ -0,0 +1,61 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+using System.Collections.Generic;
+using Apache.Arrow.Adbc.Drivers.BigQuery;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace Apache.Arrow.Adbc.Tests.Drivers.BigQuery
+{
+    /// <summary>
+    /// Tests for statement-level behavior of the BigQuery ADBC driver, run against
+    /// every test environment found in the BigQuery test configuration.
+    /// </summary>
+    [TestCaseOrderer("Apache.Arrow.Adbc.Tests.Xunit.TestOrderer", "Apache.Arrow.Adbc.Tests")]
+    public class StatementTests
+    {
+        readonly BigQueryTestConfiguration _testConfiguration;
+        readonly List<BigQueryTestEnvironment> _environments;
+        readonly ITestOutputHelper? _outputHelper;
+
+        /// <summary>
+        /// Loads the test configuration and its environments. The tests are skipped
+        /// when the configuration named by BIGQUERY_TEST_CONFIG_VARIABLE cannot be executed.
+        /// </summary>
+        /// <param name="outputHelper">The xunit output helper supplied by the test runner.</param>
+        public StatementTests(ITestOutputHelper? outputHelper)
+        {
+            Skip.IfNot(Utils.CanExecuteTestConfig(BigQueryTestingUtils.BIGQUERY_TEST_CONFIG_VARIABLE));
+
+            _testConfiguration = MultiEnvironmentTestUtils.LoadMultiEnvironmentTestConfiguration<BigQueryTestConfiguration>(BigQueryTestingUtils.BIGQUERY_TEST_CONFIG_VARIABLE);
+            _environments = MultiEnvironmentTestUtils.GetTestEnvironments<BigQueryTestEnvironment>(_testConfiguration);
+            _outputHelper = outputHelper;
+        }
+
+        /// <summary>
+        /// Verifies that an option set through SetOption is stored in the statement's options.
+        /// </summary>
+        // SkippableFact (not Fact) so that the Skip.IfNot call in the constructor is
+        // reported as a skipped test instead of a failed one.
+        [SkippableFact]
+        public void CanSetStatementOption()
+        {
+            foreach (BigQueryTestEnvironment environment in _environments)
+            {
+                // ensure the value isn't already set
+                Assert.False(environment.AllowLargeResults);
+
+                // dispose the connection and statement at the end of each iteration
+                using AdbcConnection adbcConnection = BigQueryTestingUtils.GetBigQueryAdbcConnection(environment);
+                using AdbcStatement statement = adbcConnection.CreateStatement();
+                statement.SetOption(BigQueryParameters.AllowLargeResults, "true");
+
+                // BigQuery is currently on ADBC 1.0, so it doesn't have the GetOption interface.
+                // Therefore, use reflection to validate the value is set correctly.
+                var optionsProperty = statement.GetType().GetProperty("Options");
+                Assert.NotNull(optionsProperty);
+
+                // IsAssignableFrom fails on null and on an unexpected type, and returns the typed value
+                IReadOnlyDictionary<string, string> options =
+                    Assert.IsAssignableFrom<IReadOnlyDictionary<string, string>>(optionsProperty!.GetValue(statement));
+                Assert.Equal("true", options[BigQueryParameters.AllowLargeResults]);
+            }
+        }
+    }
+}