This is an automated email from the ASF dual-hosted git repository.
curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 3e481a686 feat(csharp/src/Drivers/BigQuery): Enhanced tracing and
large resultset improvements (#3022)
3e481a686 is described below
commit 3e481a686bc2617e91a416f2f30c106ff101af1d
Author: davidhcoe <[email protected]>
AuthorDate: Mon Jul 7 14:27:57 2025 -0400
feat(csharp/src/Drivers/BigQuery): Enhanced tracing and large resultset
improvements (#3022)
- Adds tracing for the BigQuery driver
- Modifies the behavior for large result sets:
  - Uses a default dataset ID for large result sets if one is not
    specified.
  - The caller can specify their own dataset. If it does not exist, the
    driver will attempt to create it.
---------
Co-authored-by: David Coe <>
---
.../Tracing/ActivityExtensions.cs | 19 +
csharp/src/Drivers/BigQuery/ActivityExtensions.cs | 51 +
csharp/src/Drivers/BigQuery/BigQueryConnection.cs | 1108 +++++++++++---------
csharp/src/Drivers/BigQuery/BigQueryParameters.cs | 56 +-
csharp/src/Drivers/BigQuery/BigQueryStatement.cs | 460 +++++---
csharp/src/Drivers/BigQuery/BigQueryUtils.cs | 22 +
csharp/src/Drivers/BigQuery/RetryManager.cs | 8 +
csharp/src/Drivers/BigQuery/readme.md | 52 +
.../Drivers/BigQuery/BigQueryTestConfiguration.cs | 3 +
.../test/Drivers/BigQuery/BigQueryTestingUtils.cs | 5 +
10 files changed, 1096 insertions(+), 688 deletions(-)
diff --git a/csharp/src/Apache.Arrow.Adbc/Tracing/ActivityExtensions.cs
b/csharp/src/Apache.Arrow.Adbc/Tracing/ActivityExtensions.cs
index f279f815f..1d5a0beb1 100644
--- a/csharp/src/Apache.Arrow.Adbc/Tracing/ActivityExtensions.cs
+++ b/csharp/src/Apache.Arrow.Adbc/Tracing/ActivityExtensions.cs
@@ -63,6 +63,25 @@ namespace Apache.Arrow.Adbc.Tracing
return activity?.AddTag(key, value());
}
+        /// <summary>
+        /// Update the Activity to have a tag with an additional 'key' and value 'value',
+        /// but only when <paramref name="condition"/> is <see langword="true"/>.
+        /// This shows up in the <see cref="TagObjects"/> enumeration. It is meant for information that
+        /// is useful to log but not needed for runtime control (for the latter, <see cref="Baggage"/>)
+        /// </summary>
+        /// <param name="activity">The activity to update; may be <see langword="null"/>.</param>
+        /// <param name="key">The tag key name</param>
+        /// <param name="value">The tag value mapped to the input key</param>
+        /// <param name="condition">The condition to check before adding the tag</param>
+        /// <returns>
+        /// <see langword="this" /> for convenient chaining, or <see langword="null"/> when
+        /// <paramref name="activity"/> is <see langword="null"/>.
+        /// </returns>
+        public static Activity? AddConditionalTag(this Activity? activity, string key, string? value, bool condition)
+        {
+            // When the condition is false the activity is returned untouched (no tag, not even a null-valued one).
+            return condition ? activity?.AddTag(key, value) : activity;
+        }
+
/// <summary>
/// Update the Activity to have a tag with an additional 'key' and
value 'value'.
/// This shows up in the <see cref="TagObjects"/> enumeration. It is
meant for information that
diff --git a/csharp/src/Drivers/BigQuery/ActivityExtensions.cs
b/csharp/src/Drivers/BigQuery/ActivityExtensions.cs
new file mode 100644
index 000000000..11e1db79c
--- /dev/null
+++ b/csharp/src/Drivers/BigQuery/ActivityExtensions.cs
@@ -0,0 +1,51 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+using System.Diagnostics;
+using Apache.Arrow.Adbc.Tracing;
+
+namespace Apache.Arrow.Adbc.Drivers.BigQuery
+{
+    /// <summary>
+    /// BigQuery-specific helpers for adding tags to an <see cref="Activity"/>.
+    /// Keys passed to <see cref="AddBigQueryTag"/> and <see cref="AddConditionalBigQueryTag"/>
+    /// are prefixed with <c>adbc.bigquery.tracing.</c>.
+    /// </summary>
+    internal static class ActivityExtensions
+    {
+        private const string bigQueryKeyPrefix = "adbc.bigquery.tracing.";
+        private const string bigQueryParameterKeyValueSuffix = ".value";
+
+        /// <summary>
+        /// Adds a tag whose key is <paramref name="key"/> prefixed with <c>adbc.bigquery.tracing.</c>.
+        /// </summary>
+        /// <returns>The same activity, for chaining.</returns>
+        public static Activity AddBigQueryTag(this Activity activity, string key, object? value)
+        {
+            string bigQueryKey = bigQueryKeyPrefix + key;
+            return activity.AddTag(bigQueryKey, value);
+        }
+
+        /// <summary>
+        /// Adds a tag whose key is <paramref name="key"/> prefixed with <c>adbc.bigquery.tracing.</c>,
+        /// but only when <paramref name="condition"/> is <see langword="true"/>.
+        /// </summary>
+        /// <returns>The same activity, for chaining.</returns>
+        public static Activity AddConditionalBigQueryTag(this Activity activity, string key, string? value, bool condition)
+        {
+            // Use the prefixed key so conditional tags are named the same way as AddBigQueryTag's.
+            string bigQueryKey = bigQueryKeyPrefix + key;
+            return activity.AddConditionalTag(bigQueryKey, value, condition)!;
+        }
+
+        /// <summary>
+        /// Adds a tag named <c>{parameterName}.value</c> holding <paramref name="value"/>, but only when
+        /// <c>BigQueryParameters.IsSafeToLog</c> approves <paramref name="parameterName"/>; otherwise the
+        /// activity is returned without a new tag.
+        /// </summary>
+        /// <returns>The same activity, for chaining.</returns>
+        public static Activity AddBigQueryParameterTag(this Activity activity, string parameterName, object? value)
+        {
+            if (BigQueryParameters.IsSafeToLog(parameterName))
+            {
+                string bigQueryParameterValueKey = parameterName + bigQueryParameterKeyValueSuffix;
+                return activity.AddTag(bigQueryParameterValueKey, value);
+            }
+
+            return activity;
+        }
+    }
+}
diff --git a/csharp/src/Drivers/BigQuery/BigQueryConnection.cs
b/csharp/src/Drivers/BigQuery/BigQueryConnection.cs
index f480c5bc5..9855e76be 100644
--- a/csharp/src/Drivers/BigQuery/BigQueryConnection.cs
+++ b/csharp/src/Drivers/BigQuery/BigQueryConnection.cs
@@ -17,6 +17,7 @@
using System;
using System.Collections.Generic;
+using System.Diagnostics;
using System.Linq;
using System.Net.Http;
using System.Text;
@@ -24,6 +25,7 @@ using System.Text.Json;
using System.Text.RegularExpressions;
using System.Threading.Tasks;
using Apache.Arrow.Adbc.Extensions;
+using Apache.Arrow.Adbc.Tracing;
using Apache.Arrow.Ipc;
using Apache.Arrow.Types;
using Google.Api.Gax;
@@ -36,16 +38,15 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
/// <summary>
/// BigQuery-specific implementation of <see cref="AdbcConnection"/>
/// </summary>
- public class BigQueryConnection : AdbcConnection, ITokenProtectedResource
+ public class BigQueryConnection : TracingConnection,
ITokenProtectedResource
{
readonly Dictionary<string, string> properties;
readonly HttpClient httpClient;
bool includePublicProjectIds = false;
const string infoDriverName = "ADBC BigQuery Driver";
- const string infoDriverVersion = "1.0.1";
const string infoVendorName = "BigQuery";
- const string infoDriverArrowVersion = "19.0.0";
- const string publicProjectId = "bigquery-public-data";
+
+ // Reported by GetInfo for DriverArrowVersion. Computed by BigQueryUtils.GetAssemblyVersion
+ // from the assembly that defines IArrowArray (presumably Apache.Arrow) instead of being hard-coded.
+ private readonly string infoDriverArrowVersion =
BigQueryUtils.GetAssemblyVersion(typeof(IArrowArray));
readonly AdbcInfoCode[] infoSupportedCodes = new[] {
AdbcInfoCode.DriverName,
@@ -54,7 +55,7 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
AdbcInfoCode.VendorName
};
- public BigQueryConnection(IReadOnlyDictionary<string, string>
properties)
+ public BigQueryConnection(IReadOnlyDictionary<string, string>
properties) : base(properties)
{
if (properties == null)
{
@@ -89,6 +90,8 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
/// </summary>
public Func<Task>? UpdateToken { get; set; }
+ // Same constant that GetInfo reports for AdbcInfoCode.DriverName.
+ internal string DriverName => infoDriverName;
+
+ // Assigned by Open().
internal BigQueryClient? Client { get; private set; }
+ // Assigned by SetCredential().
internal GoogleCredential? Credential { get; private set; }
@@ -97,136 +100,176 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
+ // Passed, together with MaxRetryAttempts, to RetryManager.ExecuteWithRetriesAsync.
internal int RetryDelayMs { get; private set; } = 200;
+ /// <summary>
+ /// Version from BigQueryUtils.BigQueryAssemblyVersion; GetInfo also reports this value for DriverVersion.
+ /// </summary>
+ public override string AssemblyVersion =>
BigQueryUtils.BigQueryAssemblyVersion;
+
+ /// <summary>
+ /// Assembly name from BigQueryUtils.BigQueryAssemblyName.
+ /// </summary>
+ public override string AssemblyName =>
BigQueryUtils.BigQueryAssemblyName;
+
/// <summary>
/// Initializes the internal BigQuery connection
/// </summary>
+ /// <param name="projectId">A project ID that has been specified by
the caller, not a user.</param>
+ /// <returns>The newly built <see cref="BigQueryClient"/>, which is also assigned to <see cref="Client"/>.</returns>
/// <exception cref="ArgumentException"></exception>
- internal BigQueryClient Open()
+ internal BigQueryClient Open(string? projectId = null)
{
- string? projectId = null;
- string? billingProjectId = null;
- TimeSpan? clientTimeout = null;
-
- // if the caller doesn't specify a projectId, use the default
- if (!this.properties.TryGetValue(BigQueryParameters.ProjectId, out
projectId))
- projectId = BigQueryConstants.DetectProjectId;
-
- // in some situations, the publicProjectId gets passed and causes
an error when we try to create a query job:
- // Google.GoogleApiException : The service bigquery has thrown
an exception. HttpStatusCode is Forbidden.
- // Access Denied: Project bigquery-public-data: User does not
have bigquery.jobs.create permission in
- // project bigquery-public-data.
- // so if that is the case, treat it as if we need to detect the
projectId
- if (projectId.Equals(publicProjectId,
StringComparison.OrdinalIgnoreCase))
- projectId = BigQueryConstants.DetectProjectId;
-
- // the billing project can be null if it's not specified
- this.properties.TryGetValue(BigQueryParameters.BillingProjectId,
out billingProjectId);
-
- if
(this.properties.TryGetValue(BigQueryParameters.IncludePublicProjectId, out
string? result))
+ return this.TraceActivity(activity =>
{
- if (!string.IsNullOrEmpty(result))
- this.includePublicProjectIds = Convert.ToBoolean(result);
- }
+ string? billingProjectId = null;
+ TimeSpan? clientTimeout = null;
- if (this.properties.TryGetValue(BigQueryParameters.ClientTimeout,
out string? timeoutSeconds) &&
- int.TryParse(timeoutSeconds, out int seconds))
- {
- clientTimeout = TimeSpan.FromSeconds(seconds);
- }
+ // An explicit projectId from the caller bypasses both the ProjectId property
+ // lookup and the public-project substitution inside this branch.
+ if (string.IsNullOrEmpty(projectId))
+ {
+ // if the caller doesn't specify a projectId, use the
default
+ if
(!this.properties.TryGetValue(BigQueryParameters.ProjectId, out projectId))
+ {
+ projectId = BigQueryConstants.DetectProjectId;
+ }
+ else
+ {
+
activity?.AddBigQueryParameterTag(BigQueryParameters.ProjectId, projectId);
+ }
- SetCredential();
+ // in some situations, the publicProjectId gets passed and
causes an error when we try to create a query job:
+ // Google.GoogleApiException : The service bigquery
has thrown an exception. HttpStatusCode is Forbidden.
+ // Access Denied: Project bigquery-public-data: User
does not have bigquery.jobs.create permission in
+ // project bigquery-public-data.
+ // so if that is the case, treat it as if we need to
detect the projectId
+ if (projectId.Equals(BigQueryConstants.PublicProjectId,
StringComparison.OrdinalIgnoreCase))
+ {
+ projectId = BigQueryConstants.DetectProjectId;
+
activity?.AddBigQueryTag("change_public_projectId_to_detect_project_id",
projectId);
+ }
+ }
- BigQueryClientBuilder bigQueryClientBuilder = new
BigQueryClientBuilder()
- {
- ProjectId = projectId,
- QuotaProject = billingProjectId,
- GoogleCredential = Credential
- };
+ // the billing project can be null if it's not specified
+ if
(this.properties.TryGetValue(BigQueryParameters.BillingProjectId, out
billingProjectId))
+ {
+
activity?.AddBigQueryParameterTag((BigQueryParameters.BillingProjectId),
billingProjectId);
+ }
- BigQueryClient client = bigQueryClientBuilder.Build();
+ if
(this.properties.TryGetValue(BigQueryParameters.IncludePublicProjectId, out
string? result))
+ {
+ if (!string.IsNullOrEmpty(result))
+ {
+ // NOTE(review): Convert.ToBoolean throws FormatException for values other than "true"/"false".
+ this.includePublicProjectIds =
Convert.ToBoolean(result);
+
activity?.AddBigQueryParameterTag(BigQueryParameters.IncludePublicProjectId,
this.includePublicProjectIds);
+ }
+ }
- if (clientTimeout.HasValue)
- {
- client.Service.HttpClient.Timeout = clientTimeout.Value;
- }
+ // A non-numeric ClientTimeout is ignored and the client's default HTTP timeout is kept.
+ if
(this.properties.TryGetValue(BigQueryParameters.ClientTimeout, out string?
timeoutSeconds) &&
+ int.TryParse(timeoutSeconds, out int seconds))
+ {
+ clientTimeout = TimeSpan.FromSeconds(seconds);
+
activity?.AddBigQueryParameterTag(BigQueryParameters.ClientTimeout, seconds);
+ }
+
+ // Populates Credential, which the builder below consumes.
+ SetCredential();
+
+ BigQueryClientBuilder bigQueryClientBuilder = new
BigQueryClientBuilder()
+ {
+ ProjectId = projectId,
+ QuotaProject = billingProjectId,
+ GoogleCredential = Credential
+ };
+
+ BigQueryClient client = bigQueryClientBuilder.Build();
- Client = client;
- return client;
+ if (clientTimeout.HasValue)
+ {
+ client.Service.HttpClient.Timeout = clientTimeout.Value;
+ }
+
+ Client = client;
+ return client;
+ });
}
internal void SetCredential()
{
- string? clientId = null;
- string? clientSecret = null;
- string? refreshToken = null;
- string? accessToken = null;
- string? audienceUri = null;
- string? authenticationType = null;
-
- string tokenEndpoint = BigQueryConstants.TokenEndpoint;
+ this.TraceActivity(activity =>
+ {
+ // Resolves Credential from the AuthenticationType property:
+ // - user type: reads ClientId, ClientSecret and RefreshToken;
+ // - Entra ID type: reads AccessToken and AudienceUri;
+ // - service-account type: reads JsonCredential.
+ // Missing properties or an unrecognized type throw ArgumentException.
+ string? clientId = null;
+ string? clientSecret = null;
+ string? refreshToken = null;
+ string? accessToken = null;
+ string? audienceUri = null;
+ string? authenticationType = null;
- if
(!this.properties.TryGetValue(BigQueryParameters.AuthenticationType, out
authenticationType))
- throw new ArgumentException($"The
{BigQueryParameters.AuthenticationType} parameter is not present");
+ string tokenEndpoint = BigQueryConstants.TokenEndpoint;
- if
(this.properties.TryGetValue(BigQueryParameters.AuthenticationType, out string?
newAuthenticationType))
- {
- if (!string.IsNullOrEmpty(newAuthenticationType))
- authenticationType = newAuthenticationType;
+ if
(!this.properties.TryGetValue(BigQueryParameters.AuthenticationType, out
authenticationType))
+ {
+ throw new ArgumentException($"The
{BigQueryParameters.AuthenticationType} parameter is not present");
+ }
- if
(!authenticationType.Equals(BigQueryConstants.UserAuthenticationType,
StringComparison.OrdinalIgnoreCase) &&
-
!authenticationType.Equals(BigQueryConstants.ServiceAccountAuthenticationType,
StringComparison.OrdinalIgnoreCase) &&
-
!authenticationType.Equals(BigQueryConstants.EntraIdAuthenticationType,
StringComparison.OrdinalIgnoreCase))
+ // NOTE(review): this repeats the lookup above, which already throws when
+ // AuthenticationType is missing, so this branch always runs; the allowed-value
+ // validation and tagging inside it are the only new work.
+ if
(this.properties.TryGetValue(BigQueryParameters.AuthenticationType, out string?
newAuthenticationType))
{
- throw new ArgumentException($"The
{BigQueryParameters.AuthenticationType} parameter can only be
`{BigQueryConstants.UserAuthenticationType}`,
`{BigQueryConstants.ServiceAccountAuthenticationType}` or
`{BigQueryConstants.EntraIdAuthenticationType}`");
+ if (!string.IsNullOrEmpty(newAuthenticationType))
+ authenticationType = newAuthenticationType;
+
+ if
(!authenticationType.Equals(BigQueryConstants.UserAuthenticationType,
StringComparison.OrdinalIgnoreCase) &&
+
!authenticationType.Equals(BigQueryConstants.ServiceAccountAuthenticationType,
StringComparison.OrdinalIgnoreCase) &&
+
!authenticationType.Equals(BigQueryConstants.EntraIdAuthenticationType,
StringComparison.OrdinalIgnoreCase))
+ {
+ throw new ArgumentException($"The
{BigQueryParameters.AuthenticationType} parameter can only be
`{BigQueryConstants.UserAuthenticationType}`,
`{BigQueryConstants.ServiceAccountAuthenticationType}` or
`{BigQueryConstants.EntraIdAuthenticationType}`");
+ }
+ else
+ {
+
activity?.AddBigQueryParameterTag((BigQueryParameters.AuthenticationType),
authenticationType);
+ }
}
- }
- if (!string.IsNullOrEmpty(authenticationType) &&
authenticationType.Equals(BigQueryConstants.UserAuthenticationType,
StringComparison.OrdinalIgnoreCase))
- {
- if (!this.properties.TryGetValue(BigQueryParameters.ClientId,
out clientId))
- throw new ArgumentException($"The
{BigQueryParameters.ClientId} parameter is not present");
+ if (!string.IsNullOrEmpty(authenticationType) &&
authenticationType.Equals(BigQueryConstants.UserAuthenticationType,
StringComparison.OrdinalIgnoreCase))
+ {
+ if
(!this.properties.TryGetValue(BigQueryParameters.ClientId, out clientId))
+ throw new ArgumentException($"The
{BigQueryParameters.ClientId} parameter is not present");
- if
(!this.properties.TryGetValue(BigQueryParameters.ClientSecret, out
clientSecret))
- throw new ArgumentException($"The
{BigQueryParameters.ClientSecret} parameter is not present");
+ if
(!this.properties.TryGetValue(BigQueryParameters.ClientSecret, out
clientSecret))
+ throw new ArgumentException($"The
{BigQueryParameters.ClientSecret} parameter is not present");
- if
(!this.properties.TryGetValue(BigQueryParameters.RefreshToken, out
refreshToken))
- throw new ArgumentException($"The
{BigQueryParameters.RefreshToken} parameter is not present");
+ if
(!this.properties.TryGetValue(BigQueryParameters.RefreshToken, out
refreshToken))
+ throw new ArgumentException($"The
{BigQueryParameters.RefreshToken} parameter is not present");
- Credential =
ApplyScopes(GoogleCredential.FromAccessToken(GetAccessToken(clientId,
clientSecret, refreshToken, tokenEndpoint)));
- }
- else if (!string.IsNullOrEmpty(authenticationType) &&
authenticationType.Equals(BigQueryConstants.EntraIdAuthenticationType,
StringComparison.OrdinalIgnoreCase))
- {
- if
(!this.properties.TryGetValue(BigQueryParameters.AccessToken, out accessToken))
- throw new ArgumentException($"The
{BigQueryParameters.AccessToken} parameter is not present");
+ Credential =
ApplyScopes(GoogleCredential.FromAccessToken(GetAccessToken(clientId,
clientSecret, refreshToken, tokenEndpoint)));
+ }
+ else if (!string.IsNullOrEmpty(authenticationType) &&
authenticationType.Equals(BigQueryConstants.EntraIdAuthenticationType,
StringComparison.OrdinalIgnoreCase))
+ {
+ if
(!this.properties.TryGetValue(BigQueryParameters.AccessToken, out accessToken))
+ throw new ArgumentException($"The
{BigQueryParameters.AccessToken} parameter is not present");
- if
(!this.properties.TryGetValue(BigQueryParameters.AudienceUri, out audienceUri))
- throw new ArgumentException($"The
{BigQueryParameters.AudienceUri} parameter is not present");
+ if
(!this.properties.TryGetValue(BigQueryParameters.AudienceUri, out audienceUri))
+ throw new ArgumentException($"The
{BigQueryParameters.AudienceUri} parameter is not present");
- Credential =
ApplyScopes(GoogleCredential.FromAccessToken(TradeEntraIdTokenForBigQueryToken(audienceUri,
accessToken)));
- }
- else if (!string.IsNullOrEmpty(authenticationType) &&
authenticationType.Equals(BigQueryConstants.ServiceAccountAuthenticationType,
StringComparison.OrdinalIgnoreCase))
- {
- string? json = string.Empty;
+ Credential =
ApplyScopes(GoogleCredential.FromAccessToken(TradeEntraIdTokenForBigQueryToken(audienceUri,
accessToken)));
+ }
+ else if (!string.IsNullOrEmpty(authenticationType) &&
authenticationType.Equals(BigQueryConstants.ServiceAccountAuthenticationType,
StringComparison.OrdinalIgnoreCase))
+ {
+ string? json = string.Empty;
- if
(!this.properties.TryGetValue(BigQueryParameters.JsonCredential, out json))
- throw new ArgumentException($"The
{BigQueryParameters.JsonCredential} parameter is not present");
+ if
(!this.properties.TryGetValue(BigQueryParameters.JsonCredential, out json))
+ throw new ArgumentException($"The
{BigQueryParameters.JsonCredential} parameter is not present");
- Credential = ApplyScopes(GoogleCredential.FromJson(json));
- }
- else
- {
- throw new ArgumentException($"{authenticationType} is not a
valid authenticationType");
- }
+ Credential = ApplyScopes(GoogleCredential.FromJson(json));
+ }
+ else
+ {
+ throw new ArgumentException($"{authenticationType} is not
a valid authenticationType");
+ }
+ });
}
public override void SetOption(string key, string value)
{
- this.properties[key] = value;
-
- if (key.Equals(BigQueryParameters.AccessToken))
+ this.TraceActivity(activity =>
{
- UpdateClientToken();
- }
+ // Record the option value only when BigQueryParameters.IsSafeToLog permits it
+ // (the same gate AddBigQueryParameterTag uses). SetOption receives AccessToken
+ // (see below) and other credential options, whose values must not reach traces.
+ if (BigQueryParameters.IsSafeToLog(key))
+ {
+ activity?.AddTag(key + ".set", value);
+ }
+
+ this.properties[key] = value;
+
+ // A new access token must be pushed into the already-built client.
+ if (key.Equals(BigQueryParameters.AccessToken))
+ {
+ UpdateClientToken();
+ }
+ });
}
/// <summary>
@@ -253,11 +296,13 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
public override IArrowArrayStream GetInfo(IReadOnlyList<AdbcInfoCode>
codes)
{
- const int strValTypeID = 0;
+ return this.TraceActivity(activity =>
+ {
+ const int strValTypeID = 0;
- UnionType infoUnionType = new UnionType(
- new Field[]
- {
+ UnionType infoUnionType = new UnionType(
+ new Field[]
+ {
new Field("string_value", StringType.Default, true),
new Field("bool_value", BooleanType.Default, true),
new Field("int64_value", Int64Type.Default, true),
@@ -282,89 +327,97 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
),
true
)
- },
- new int[] { 0, 1, 2, 3, 4, 5 }.ToArray(),
- UnionMode.Dense);
+ },
+ new int[] { 0, 1, 2, 3, 4, 5 }.ToArray(),
+ UnionMode.Dense);
- if (codes.Count == 0)
- {
- codes = infoSupportedCodes;
- }
+ if (codes.Count == 0)
+ {
+ codes = infoSupportedCodes;
+ }
- UInt32Array.Builder infoNameBuilder = new UInt32Array.Builder();
- ArrowBuffer.Builder<byte> typeBuilder = new
ArrowBuffer.Builder<byte>();
- ArrowBuffer.Builder<int> offsetBuilder = new
ArrowBuffer.Builder<int>();
- StringArray.Builder stringInfoBuilder = new StringArray.Builder();
- int nullCount = 0;
- int arrayLength = codes.Count;
+ UInt32Array.Builder infoNameBuilder = new
UInt32Array.Builder();
+ ArrowBuffer.Builder<byte> typeBuilder = new
ArrowBuffer.Builder<byte>();
+ ArrowBuffer.Builder<int> offsetBuilder = new
ArrowBuffer.Builder<int>();
+ StringArray.Builder stringInfoBuilder = new
StringArray.Builder();
+ int nullCount = 0;
+ int arrayLength = codes.Count;
- foreach (AdbcInfoCode code in codes)
- {
- switch (code)
+ foreach (AdbcInfoCode code in codes)
{
- case AdbcInfoCode.DriverName:
- infoNameBuilder.Append((UInt32)code);
- typeBuilder.Append(strValTypeID);
- offsetBuilder.Append(stringInfoBuilder.Length);
- stringInfoBuilder.Append(infoDriverName);
- break;
- case AdbcInfoCode.DriverVersion:
- infoNameBuilder.Append((UInt32)code);
- typeBuilder.Append(strValTypeID);
- offsetBuilder.Append(stringInfoBuilder.Length);
- stringInfoBuilder.Append(infoDriverVersion);
- break;
- case AdbcInfoCode.DriverArrowVersion:
- infoNameBuilder.Append((UInt32)code);
- typeBuilder.Append(strValTypeID);
- offsetBuilder.Append(stringInfoBuilder.Length);
- stringInfoBuilder.Append(infoDriverArrowVersion);
- break;
- case AdbcInfoCode.VendorName:
- infoNameBuilder.Append((UInt32)code);
- typeBuilder.Append(strValTypeID);
- offsetBuilder.Append(stringInfoBuilder.Length);
- stringInfoBuilder.Append(infoVendorName);
- break;
- default:
- infoNameBuilder.Append((UInt32)code);
- typeBuilder.Append(strValTypeID);
- offsetBuilder.Append(stringInfoBuilder.Length);
- stringInfoBuilder.AppendNull();
- nullCount++;
- break;
+ string tagKey =
SemanticConventions.Db.Operation.Parameter(code.ToString().ToLowerInvariant());
+ string? tagValue = null;
+ switch (code)
+ {
+ case AdbcInfoCode.DriverName:
+ infoNameBuilder.Append((UInt32)code);
+ typeBuilder.Append(strValTypeID);
+ offsetBuilder.Append(stringInfoBuilder.Length);
+ stringInfoBuilder.Append(infoDriverName);
+ tagValue = infoDriverName;
+ break;
+ case AdbcInfoCode.DriverVersion:
+ infoNameBuilder.Append((UInt32)code);
+ typeBuilder.Append(strValTypeID);
+ offsetBuilder.Append(stringInfoBuilder.Length);
+
stringInfoBuilder.Append(BigQueryUtils.BigQueryAssemblyVersion);
+ tagValue = BigQueryUtils.BigQueryAssemblyVersion;
+ break;
+ case AdbcInfoCode.DriverArrowVersion:
+ infoNameBuilder.Append((UInt32)code);
+ typeBuilder.Append(strValTypeID);
+ offsetBuilder.Append(stringInfoBuilder.Length);
+ stringInfoBuilder.Append(infoDriverArrowVersion);
+ tagValue = infoDriverArrowVersion;
+ break;
+ case AdbcInfoCode.VendorName:
+ infoNameBuilder.Append((UInt32)code);
+ typeBuilder.Append(strValTypeID);
+ offsetBuilder.Append(stringInfoBuilder.Length);
+ stringInfoBuilder.Append(infoVendorName);
+ tagValue = infoVendorName;
+ break;
+ default:
+ infoNameBuilder.Append((UInt32)code);
+ typeBuilder.Append(strValTypeID);
+ offsetBuilder.Append(stringInfoBuilder.Length);
+ stringInfoBuilder.AppendNull();
+ nullCount++;
+ break;
+ }
+ activity?.AddTag(tagKey, tagValue);
}
- }
- StructType entryType = new StructType(
- new Field[] {
+ StructType entryType = new StructType(
+ new Field[] {
new Field("key", Int32Type.Default, false),
new Field("value", Int32Type.Default, true)});
- StructArray entriesDataArray = new StructArray(entryType, 0,
- new[] { new Int32Array.Builder().Build(), new
Int32Array.Builder().Build() },
- new ArrowBuffer.BitmapBuilder().Build());
+ StructArray entriesDataArray = new StructArray(entryType, 0,
+ new[] { new Int32Array.Builder().Build(), new
Int32Array.Builder().Build() },
+ new ArrowBuffer.BitmapBuilder().Build());
- IArrowArray[] childrenArrays = new IArrowArray[]
- {
+ IArrowArray[] childrenArrays = new IArrowArray[]
+ {
stringInfoBuilder.Build(),
new BooleanArray.Builder().Build(),
new Int64Array.Builder().Build(),
new Int32Array.Builder().Build(),
new ListArray.Builder(StringType.Default).Build(),
new List<IArrowArray?>(){ entriesDataArray
}.BuildListArrayForType(entryType)
- };
+ };
- DenseUnionArray infoValue = new DenseUnionArray(infoUnionType,
arrayLength, childrenArrays, typeBuilder.Build(), offsetBuilder.Build(),
nullCount);
+ DenseUnionArray infoValue = new DenseUnionArray(infoUnionType,
arrayLength, childrenArrays, typeBuilder.Build(), offsetBuilder.Build(),
nullCount);
- IArrowArray[] dataArrays = new IArrowArray[]
- {
+ IArrowArray[] dataArrays = new IArrowArray[]
+ {
infoNameBuilder.Build(),
infoValue
- };
- StandardSchemas.GetInfoSchema.Validate(dataArrays);
+ };
+ StandardSchemas.GetInfoSchema.Validate(dataArrays);
- return new BigQueryInfoArrowStream(StandardSchemas.GetInfoSchema,
dataArrays);
+ return new
BigQueryInfoArrowStream(StandardSchemas.GetInfoSchema, dataArrays);
+ });
}
public override IArrowArrayStream GetObjects(
@@ -375,10 +428,13 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
IReadOnlyList<string>? tableTypes,
string? columnNamePattern)
{
- IArrowArray[] dataArrays = GetCatalogs(depth, catalogPattern,
dbSchemaPattern,
- tableNamePattern, tableTypes, columnNamePattern);
+ return this.TraceActivity(activity =>
+ {
+ IArrowArray[] dataArrays = GetCatalogs(depth, catalogPattern,
dbSchemaPattern,
+ tableNamePattern, tableTypes, columnNamePattern);
- return new
BigQueryInfoArrowStream(StandardSchemas.GetObjectsSchema, dataArrays);
+ return new
BigQueryInfoArrowStream(StandardSchemas.GetObjectsSchema, dataArrays);
+ });
}
/// <summary>
@@ -395,7 +451,7 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
/// </summary>
public bool TokenRequiresUpdate(Exception ex) =>
BigQueryUtils.TokenRequiresUpdate(ex);
- private async Task<T> ExecuteWithRetriesAsync<T>(Func<Task<T>> action)
=> await RetryManager.ExecuteWithRetriesAsync<T>(this, action,
MaxRetryAttempts, RetryDelayMs);
+ /// <summary>
+ /// Runs <paramref name="action"/> through RetryManager.ExecuteWithRetriesAsync using this
+ /// connection's MaxRetryAttempts and RetryDelayMs, forwarding <paramref name="activity"/> to it.
+ /// </summary>
+ private async Task<T> ExecuteWithRetriesAsync<T>(Func<Task<T>> action,
Activity? activity) => await RetryManager.ExecuteWithRetriesAsync<T>(this,
action, activity, MaxRetryAttempts, RetryDelayMs);
/// <summary>
/// Executes the query using the BigQueryClient.
@@ -412,10 +468,15 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
{
if (Client == null) { Client = Open(); }
- Func<Task<BigQueryResults?>> func = () =>
Client.ExecuteQueryAsync(sql, parameters ??
Enumerable.Empty<BigQueryParameter>(), queryOptions, resultsOptions);
- BigQueryResults? result =
ExecuteWithRetriesAsync<BigQueryResults?>(func).GetAwaiter().GetResult();
+ return this.TraceActivity(activity =>
+ {
+ activity?.AddConditionalTag(SemanticConventions.Db.Query.Text,
sql, BigQueryUtils.IsSafeToTrace());
+
+ Func<Task<BigQueryResults?>> func = () =>
Client.ExecuteQueryAsync(sql, parameters ??
Enumerable.Empty<BigQueryParameter>(), queryOptions, resultsOptions);
+ BigQueryResults? result =
ExecuteWithRetriesAsync<BigQueryResults?>(func,
activity).GetAwaiter().GetResult();
- return result;
+ return result;
+ });
}
private IArrowArray[] GetCatalogs(
@@ -426,58 +487,61 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
IReadOnlyList<string>? tableTypes,
string? columnNamePattern)
{
- StringArray.Builder catalogNameBuilder = new StringArray.Builder();
- List<IArrowArray?> catalogDbSchemasValues = new
List<IArrowArray?>();
- string catalogRegexp = PatternToRegEx(catalogPattern);
- PagedEnumerable<ProjectList, CloudProject>? catalogs;
- List<string> projectIds = new List<string>();
-
- Func<Task<PagedEnumerable<ProjectList, CloudProject>?>> func = ()
=> Task.Run(() =>
+ return this.TraceActivity(activity =>
{
- // stick with this call because PagedAsyncEnumerable has
different behaviors for selecting items
- return Client?.ListProjects();
- });
+ StringArray.Builder catalogNameBuilder = new
StringArray.Builder();
+ List<IArrowArray?> catalogDbSchemasValues = new
List<IArrowArray?>();
+ string catalogRegexp = PatternToRegEx(catalogPattern);
+ PagedEnumerable<ProjectList, CloudProject>? catalogs;
+ List<string> projectIds = new List<string>();
- catalogs = ExecuteWithRetriesAsync<PagedEnumerable<ProjectList,
CloudProject>?>(func).GetAwaiter().GetResult();
+ Func<Task<PagedEnumerable<ProjectList, CloudProject>?>> func =
() => Task.Run(() =>
+ {
+ // stick with this call because PagedAsyncEnumerable has
different behaviors for selecting items
+ return Client?.ListProjects();
+ });
- if (catalogs != null)
- {
- projectIds = catalogs.Select(x => x.ProjectId).ToList();
- }
+ catalogs =
ExecuteWithRetriesAsync<PagedEnumerable<ProjectList, CloudProject>?>(func,
activity).GetAwaiter().GetResult();
- if (this.includePublicProjectIds &&
!projectIds.Contains(publicProjectId))
- projectIds.Add(publicProjectId);
+ if (catalogs != null)
+ {
+ projectIds = catalogs.Select(x => x.ProjectId).ToList();
+ }
- projectIds.Sort();
+ if (this.includePublicProjectIds &&
!projectIds.Contains(BigQueryConstants.PublicProjectId))
+ projectIds.Add(BigQueryConstants.PublicProjectId);
- foreach (string projectId in projectIds)
- {
- if (Regex.IsMatch(projectId, catalogRegexp,
RegexOptions.IgnoreCase))
- {
- catalogNameBuilder.Append(projectId);
+ projectIds.Sort();
- if (depth == GetObjectsDepth.Catalogs)
- {
- catalogDbSchemasValues.Add(null);
- }
- else
+ foreach (string projectId in projectIds)
+ {
+ if (Regex.IsMatch(projectId, catalogRegexp,
RegexOptions.IgnoreCase))
{
- catalogDbSchemasValues.Add(GetDbSchemas(
- depth, projectId, dbSchemaPattern,
- tableNamePattern, tableTypes, columnNamePattern));
+ catalogNameBuilder.Append(projectId);
+
+ if (depth == GetObjectsDepth.Catalogs)
+ {
+ catalogDbSchemasValues.Add(null);
+ }
+ else
+ {
+ catalogDbSchemasValues.Add(GetDbSchemas(
+ depth, projectId, dbSchemaPattern,
+ tableNamePattern, tableTypes,
columnNamePattern));
+ }
}
}
- }
- IArrowArray[] dataArrays = new IArrowArray[]
- {
+ IArrowArray[] dataArrays = new IArrowArray[]
+ {
catalogNameBuilder.Build(),
catalogDbSchemasValues.BuildListArrayForType(new
StructType(StandardSchemas.DbSchemaSchema)),
- };
+ };
- StandardSchemas.GetObjectsSchema.Validate(dataArrays);
+ StandardSchemas.GetObjectsSchema.Validate(dataArrays);
- return dataArrays;
+ return dataArrays;
+ });
}
private StructArray GetDbSchemas(
@@ -488,57 +552,60 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
IReadOnlyList<string>? tableTypes,
string? columnNamePattern)
{
- StringArray.Builder dbSchemaNameBuilder = new
StringArray.Builder();
- List<IArrowArray?> dbSchemaTablesValues = new List<IArrowArray?>();
- ArrowBuffer.BitmapBuilder nullBitmapBuffer = new
ArrowBuffer.BitmapBuilder();
- int length = 0;
+ return this.TraceActivity(activity =>
+ {
+ StringArray.Builder dbSchemaNameBuilder = new
StringArray.Builder();
+ List<IArrowArray?> dbSchemaTablesValues = new
List<IArrowArray?>();
+ ArrowBuffer.BitmapBuilder nullBitmapBuffer = new
ArrowBuffer.BitmapBuilder();
+ int length = 0;
- string dbSchemaRegexp = PatternToRegEx(dbSchemaPattern);
+ string dbSchemaRegexp = PatternToRegEx(dbSchemaPattern);
- Func<Task<PagedEnumerable<DatasetList, BigQueryDataset>?>> func =
() => Task.Run(() =>
- {
- // stick with this call because PagedAsyncEnumerable has
different behaviors for selecting items
- return Client?.ListDatasets(catalog);
- });
+ Func<Task<PagedEnumerable<DatasetList, BigQueryDataset>?>>
func = () => Task.Run(() =>
+ {
+ // stick with this call because PagedAsyncEnumerable has
different behaviors for selecting items
+ return Client?.ListDatasets(catalog);
+ });
- PagedEnumerable<DatasetList, BigQueryDataset>? schemas =
ExecuteWithRetriesAsync<PagedEnumerable<DatasetList,
BigQueryDataset>?>(func).GetAwaiter().GetResult();
+ PagedEnumerable<DatasetList, BigQueryDataset>? schemas =
ExecuteWithRetriesAsync<PagedEnumerable<DatasetList, BigQueryDataset>?>(func,
activity).GetAwaiter().GetResult();
- if (schemas != null)
- {
- foreach (BigQueryDataset schema in schemas)
+ if (schemas != null)
{
- if (Regex.IsMatch(schema.Reference.DatasetId,
dbSchemaRegexp, RegexOptions.IgnoreCase))
+ foreach (BigQueryDataset schema in schemas)
{
- dbSchemaNameBuilder.Append(schema.Reference.DatasetId);
- length++;
- nullBitmapBuffer.Append(true);
-
- if (depth == GetObjectsDepth.DbSchemas)
- {
- dbSchemaTablesValues.Add(null);
- }
- else
+ if (Regex.IsMatch(schema.Reference.DatasetId,
dbSchemaRegexp, RegexOptions.IgnoreCase))
{
- dbSchemaTablesValues.Add(GetTableSchemas(
- depth, catalog, schema.Reference.DatasetId,
- tableNamePattern, tableTypes,
columnNamePattern));
+
dbSchemaNameBuilder.Append(schema.Reference.DatasetId);
+ length++;
+ nullBitmapBuffer.Append(true);
+
+ if (depth == GetObjectsDepth.DbSchemas)
+ {
+ dbSchemaTablesValues.Add(null);
+ }
+ else
+ {
+ dbSchemaTablesValues.Add(GetTableSchemas(
+ depth, catalog, schema.Reference.DatasetId,
+ tableNamePattern, tableTypes,
columnNamePattern));
+ }
}
}
}
- }
- IArrowArray[] dataArrays = new IArrowArray[]
- {
- dbSchemaNameBuilder.Build(),
- dbSchemaTablesValues.BuildListArrayForType(new
StructType(StandardSchemas.TableSchema)),
- };
- StandardSchemas.DbSchemaSchema.Validate(dataArrays);
+ IArrowArray[] dataArrays = new IArrowArray[]
+ {
+ dbSchemaNameBuilder.Build(),
+ dbSchemaTablesValues.BuildListArrayForType(new
StructType(StandardSchemas.TableSchema)),
+ };
+ StandardSchemas.DbSchemaSchema.Validate(dataArrays);
- return new StructArray(
- new StructType(StandardSchemas.DbSchemaSchema),
- length,
- dataArrays,
- nullBitmapBuffer.Build());
+ return new StructArray(
+ new StructType(StandardSchemas.DbSchemaSchema),
+ length,
+ dataArrays,
+ nullBitmapBuffer.Build());
+ });
}
private StructArray GetTableSchemas(
@@ -549,87 +616,90 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
IReadOnlyList<string>? tableTypes,
string? columnNamePattern)
{
- StringArray.Builder tableNameBuilder = new StringArray.Builder();
- StringArray.Builder tableTypeBuilder = new StringArray.Builder();
- List<IArrowArray?> tableColumnsValues = new List<IArrowArray?>();
- List<IArrowArray?> tableConstraintsValues = new
List<IArrowArray?>();
- ArrowBuffer.BitmapBuilder nullBitmapBuffer = new
ArrowBuffer.BitmapBuilder();
- int length = 0;
+ return this.TraceActivity(activity =>
+ {
+ StringArray.Builder tableNameBuilder = new
StringArray.Builder();
+ StringArray.Builder tableTypeBuilder = new
StringArray.Builder();
+ List<IArrowArray?> tableColumnsValues = new
List<IArrowArray?>();
+ List<IArrowArray?> tableConstraintsValues = new
List<IArrowArray?>();
+ ArrowBuffer.BitmapBuilder nullBitmapBuffer = new
ArrowBuffer.BitmapBuilder();
+ int length = 0;
- string query = string.Format("SELECT * FROM
`{0}`.`{1}`.INFORMATION_SCHEMA.TABLES",
- Sanitize(catalog), Sanitize(dbSchema));
+ string query = string.Format("SELECT * FROM
`{0}`.`{1}`.INFORMATION_SCHEMA.TABLES",
+ Sanitize(catalog), Sanitize(dbSchema));
- if (tableNamePattern != null)
- {
- query = string.Concat(query, string.Format(" WHERE table_name
LIKE '{0}'", Sanitize(tableNamePattern)));
- if (tableTypes?.Count > 0)
+ if (tableNamePattern != null)
{
- IEnumerable<string> sanitizedTypes = tableTypes.Select(x
=> Sanitize(x));
- query = string.Concat(query, string.Format(" AND
table_type IN ('{0}')", string.Join("', '", sanitizedTypes).ToUpper()));
+ query = string.Concat(query, string.Format(" WHERE
table_name LIKE '{0}'", Sanitize(tableNamePattern)));
+ if (tableTypes?.Count > 0)
+ {
+ IEnumerable<string> sanitizedTypes =
tableTypes.Select(x => Sanitize(x));
+ query = string.Concat(query, string.Format(" AND
table_type IN ('{0}')", string.Join("', '", sanitizedTypes).ToUpper()));
+ }
}
- }
- else
- {
- if (tableTypes?.Count > 0)
+ else
{
- IEnumerable<string> sanitizedTypes = tableTypes.Select(x
=> Sanitize(x));
- query = string.Concat(query, string.Format(" WHERE
table_type IN ('{0}')", string.Join("', '", sanitizedTypes).ToUpper()));
+ if (tableTypes?.Count > 0)
+ {
+ IEnumerable<string> sanitizedTypes =
tableTypes.Select(x => Sanitize(x));
+ query = string.Concat(query, string.Format(" WHERE
table_type IN ('{0}')", string.Join("', '", sanitizedTypes).ToUpper()));
+ }
}
- }
- BigQueryResults? result = ExecuteQuery(query, parameters: null);
-
- if (result != null)
- {
- bool includeConstraints = true;
-
- if
(this.properties.TryGetValue(BigQueryParameters.IncludeConstraintsWithGetObjects,
out string? includeConstraintsValue))
- {
- bool.TryParse(includeConstraintsValue, out
includeConstraints);
- }
+ BigQueryResults? result = ExecuteQuery(query, parameters:
null);
- foreach (BigQueryRow row in result)
+ if (result != null)
{
- tableNameBuilder.Append(GetValue(row["table_name"]));
- tableTypeBuilder.Append(GetValue(row["table_type"]));
- nullBitmapBuffer.Append(true);
- length++;
+ bool includeConstraints = true;
- if (depth == GetObjectsDepth.All && includeConstraints)
- {
- tableConstraintsValues.Add(GetConstraintSchema(
- depth, catalog, dbSchema,
GetValue(row["table_name"]), columnNamePattern));
- }
- else
+ if
(this.properties.TryGetValue(BigQueryParameters.IncludeConstraintsWithGetObjects,
out string? includeConstraintsValue))
{
- tableConstraintsValues.Add(null);
+ bool.TryParse(includeConstraintsValue, out
includeConstraints);
}
- if (depth == GetObjectsDepth.Tables)
+ foreach (BigQueryRow row in result)
{
- tableColumnsValues.Add(null);
- }
- else
- {
- tableColumnsValues.Add(GetColumnSchema(catalog,
dbSchema, GetValue(row["table_name"]), columnNamePattern));
+ tableNameBuilder.Append(GetValue(row["table_name"]));
+ tableTypeBuilder.Append(GetValue(row["table_type"]));
+ nullBitmapBuffer.Append(true);
+ length++;
+
+ if (depth == GetObjectsDepth.All && includeConstraints)
+ {
+ tableConstraintsValues.Add(GetConstraintSchema(
+ depth, catalog, dbSchema,
GetValue(row["table_name"]), columnNamePattern));
+ }
+ else
+ {
+ tableConstraintsValues.Add(null);
+ }
+
+ if (depth == GetObjectsDepth.Tables)
+ {
+ tableColumnsValues.Add(null);
+ }
+ else
+ {
+ tableColumnsValues.Add(GetColumnSchema(catalog,
dbSchema, GetValue(row["table_name"]), columnNamePattern));
+ }
}
}
- }
- IArrowArray[] dataArrays = new IArrowArray[]
- {
- tableNameBuilder.Build(),
- tableTypeBuilder.Build(),
- tableColumnsValues.BuildListArrayForType(new
StructType(StandardSchemas.ColumnSchema)),
- tableConstraintsValues.BuildListArrayForType(new
StructType(StandardSchemas.ConstraintSchema))
- };
- StandardSchemas.TableSchema.Validate(dataArrays);
+ IArrowArray[] dataArrays = new IArrowArray[]
+ {
+ tableNameBuilder.Build(),
+ tableTypeBuilder.Build(),
+ tableColumnsValues.BuildListArrayForType(new
StructType(StandardSchemas.ColumnSchema)),
+ tableConstraintsValues.BuildListArrayForType(new
StructType(StandardSchemas.ConstraintSchema))
+ };
+ StandardSchemas.TableSchema.Validate(dataArrays);
- return new StructArray(
- new StructType(StandardSchemas.TableSchema),
- length,
- dataArrays,
- nullBitmapBuffer.Build());
+ return new StructArray(
+ new StructType(StandardSchemas.TableSchema),
+ length,
+ dataArrays,
+ nullBitmapBuffer.Build());
+ });
}
private StructArray GetColumnSchema(
@@ -638,111 +708,114 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
string table,
string? columnNamePattern)
{
- StringArray.Builder columnNameBuilder = new StringArray.Builder();
- Int32Array.Builder ordinalPositionBuilder = new
Int32Array.Builder();
- StringArray.Builder remarksBuilder = new StringArray.Builder();
- Int16Array.Builder xdbcDataTypeBuilder = new Int16Array.Builder();
- StringArray.Builder xdbcTypeNameBuilder = new
StringArray.Builder();
- Int32Array.Builder xdbcColumnSizeBuilder = new
Int32Array.Builder();
- Int16Array.Builder xdbcDecimalDigitsBuilder = new
Int16Array.Builder();
- Int16Array.Builder xdbcNumPrecRadixBuilder = new
Int16Array.Builder();
- Int16Array.Builder xdbcNullableBuilder = new Int16Array.Builder();
- StringArray.Builder xdbcColumnDefBuilder = new
StringArray.Builder();
- Int16Array.Builder xdbcSqlDataTypeBuilder = new
Int16Array.Builder();
- Int16Array.Builder xdbcDatetimeSubBuilder = new
Int16Array.Builder();
- Int32Array.Builder xdbcCharOctetLengthBuilder = new
Int32Array.Builder();
- StringArray.Builder xdbcIsNullableBuilder = new
StringArray.Builder();
- StringArray.Builder xdbcScopeCatalogBuilder = new
StringArray.Builder();
- StringArray.Builder xdbcScopeSchemaBuilder = new
StringArray.Builder();
- StringArray.Builder xdbcScopeTableBuilder = new
StringArray.Builder();
- BooleanArray.Builder xdbcIsAutoincrementBuilder = new
BooleanArray.Builder();
- BooleanArray.Builder xdbcIsGeneratedcolumnBuilder = new
BooleanArray.Builder();
- ArrowBuffer.BitmapBuilder nullBitmapBuffer = new
ArrowBuffer.BitmapBuilder();
- int length = 0;
-
- string query = string.Format("SELECT * FROM
`{0}`.`{1}`.INFORMATION_SCHEMA.COLUMNS WHERE table_name = '{2}'",
- Sanitize(catalog), Sanitize(dbSchema), Sanitize(table));
-
- if (columnNamePattern != null)
+ return this.TraceActivity(activity =>
{
- query = string.Concat(query, string.Format("AND column_name
LIKE '{0}'", Sanitize(columnNamePattern)));
- }
+ StringArray.Builder columnNameBuilder = new
StringArray.Builder();
+ Int32Array.Builder ordinalPositionBuilder = new
Int32Array.Builder();
+ StringArray.Builder remarksBuilder = new StringArray.Builder();
+ Int16Array.Builder xdbcDataTypeBuilder = new
Int16Array.Builder();
+ StringArray.Builder xdbcTypeNameBuilder = new
StringArray.Builder();
+ Int32Array.Builder xdbcColumnSizeBuilder = new
Int32Array.Builder();
+ Int16Array.Builder xdbcDecimalDigitsBuilder = new
Int16Array.Builder();
+ Int16Array.Builder xdbcNumPrecRadixBuilder = new
Int16Array.Builder();
+ Int16Array.Builder xdbcNullableBuilder = new
Int16Array.Builder();
+ StringArray.Builder xdbcColumnDefBuilder = new
StringArray.Builder();
+ Int16Array.Builder xdbcSqlDataTypeBuilder = new
Int16Array.Builder();
+ Int16Array.Builder xdbcDatetimeSubBuilder = new
Int16Array.Builder();
+ Int32Array.Builder xdbcCharOctetLengthBuilder = new
Int32Array.Builder();
+ StringArray.Builder xdbcIsNullableBuilder = new
StringArray.Builder();
+ StringArray.Builder xdbcScopeCatalogBuilder = new
StringArray.Builder();
+ StringArray.Builder xdbcScopeSchemaBuilder = new
StringArray.Builder();
+ StringArray.Builder xdbcScopeTableBuilder = new
StringArray.Builder();
+ BooleanArray.Builder xdbcIsAutoincrementBuilder = new
BooleanArray.Builder();
+ BooleanArray.Builder xdbcIsGeneratedcolumnBuilder = new
BooleanArray.Builder();
+ ArrowBuffer.BitmapBuilder nullBitmapBuffer = new
ArrowBuffer.BitmapBuilder();
+ int length = 0;
+
+ string query = string.Format("SELECT * FROM
`{0}`.`{1}`.INFORMATION_SCHEMA.COLUMNS WHERE table_name = '{2}'",
+ Sanitize(catalog), Sanitize(dbSchema), Sanitize(table));
+
+ if (columnNamePattern != null)
+ {
+ query = string.Concat(query, string.Format("AND
column_name LIKE '{0}'", Sanitize(columnNamePattern)));
+ }
- BigQueryResults? result = ExecuteQuery(query, parameters: null);
+ BigQueryResults? result = ExecuteQuery(query, parameters:
null);
- if (result != null)
- {
- foreach (BigQueryRow row in result)
+ if (result != null)
{
- columnNameBuilder.Append(GetValue(row["column_name"]));
-
ordinalPositionBuilder.Append((int)(long)row["ordinal_position"]);
- remarksBuilder.Append("");
+ foreach (BigQueryRow row in result)
+ {
+ columnNameBuilder.Append(GetValue(row["column_name"]));
+
ordinalPositionBuilder.Append((int)(long)row["ordinal_position"]);
+ remarksBuilder.Append("");
- string dataType = ToTypeName(GetValue(row["data_type"]),
out string suffix);
+ string dataType =
ToTypeName(GetValue(row["data_type"]), out string suffix);
- if ((dataType.StartsWith("NUMERIC") ||
- dataType.StartsWith("DECIMAL") ||
- dataType.StartsWith("BIGNUMERIC") ||
- dataType.StartsWith("BIGDECIMAL"))
- && !string.IsNullOrEmpty(suffix))
- {
- ParsedDecimalValues values =
ParsePrecisionAndScale(suffix);
- xdbcColumnSizeBuilder.Append(values.Precision);
-
xdbcDecimalDigitsBuilder.Append(Convert.ToInt16(values.Scale));
- }
- else
- {
- xdbcColumnSizeBuilder.AppendNull();
- xdbcDecimalDigitsBuilder.AppendNull();
- }
+ if ((dataType.StartsWith("NUMERIC") ||
+ dataType.StartsWith("DECIMAL") ||
+ dataType.StartsWith("BIGNUMERIC") ||
+ dataType.StartsWith("BIGDECIMAL"))
+ && !string.IsNullOrEmpty(suffix))
+ {
+ ParsedDecimalValues values =
ParsePrecisionAndScale(suffix);
+ xdbcColumnSizeBuilder.Append(values.Precision);
+
xdbcDecimalDigitsBuilder.Append(Convert.ToInt16(values.Scale));
+ }
+ else
+ {
+ xdbcColumnSizeBuilder.AppendNull();
+ xdbcDecimalDigitsBuilder.AppendNull();
+ }
- xdbcDataTypeBuilder.AppendNull();
- xdbcTypeNameBuilder.Append(dataType);
- xdbcNumPrecRadixBuilder.AppendNull();
- xdbcNullableBuilder.AppendNull();
- xdbcColumnDefBuilder.AppendNull();
-
xdbcSqlDataTypeBuilder.Append((short)ToXdbcDataType(dataType));
- xdbcDatetimeSubBuilder.AppendNull();
- xdbcCharOctetLengthBuilder.AppendNull();
-
xdbcIsNullableBuilder.Append(row["is_nullable"].ToString());
- xdbcScopeCatalogBuilder.AppendNull();
- xdbcScopeSchemaBuilder.AppendNull();
- xdbcScopeTableBuilder.AppendNull();
- xdbcIsAutoincrementBuilder.AppendNull();
-
xdbcIsGeneratedcolumnBuilder.Append(GetValue(row["is_generated"]).ToUpper() ==
"YES");
- nullBitmapBuffer.Append(true);
- length++;
+ xdbcDataTypeBuilder.AppendNull();
+ xdbcTypeNameBuilder.Append(dataType);
+ xdbcNumPrecRadixBuilder.AppendNull();
+ xdbcNullableBuilder.AppendNull();
+ xdbcColumnDefBuilder.AppendNull();
+
xdbcSqlDataTypeBuilder.Append((short)ToXdbcDataType(dataType));
+ xdbcDatetimeSubBuilder.AppendNull();
+ xdbcCharOctetLengthBuilder.AppendNull();
+
xdbcIsNullableBuilder.Append(row["is_nullable"].ToString());
+ xdbcScopeCatalogBuilder.AppendNull();
+ xdbcScopeSchemaBuilder.AppendNull();
+ xdbcScopeTableBuilder.AppendNull();
+ xdbcIsAutoincrementBuilder.AppendNull();
+
xdbcIsGeneratedcolumnBuilder.Append(GetValue(row["is_generated"]).ToUpper() ==
"YES");
+ nullBitmapBuffer.Append(true);
+ length++;
+ }
}
- }
- IArrowArray[] dataArrays = new IArrowArray[]
- {
- columnNameBuilder.Build(),
- ordinalPositionBuilder.Build(),
- remarksBuilder.Build(),
- xdbcDataTypeBuilder.Build(),
- xdbcTypeNameBuilder.Build(),
- xdbcColumnSizeBuilder.Build(),
- xdbcDecimalDigitsBuilder.Build(),
- xdbcNumPrecRadixBuilder.Build(),
- xdbcNullableBuilder.Build(),
- xdbcColumnDefBuilder.Build(),
- xdbcSqlDataTypeBuilder.Build(),
- xdbcDatetimeSubBuilder.Build(),
- xdbcCharOctetLengthBuilder.Build(),
- xdbcIsNullableBuilder.Build(),
- xdbcScopeCatalogBuilder.Build(),
- xdbcScopeSchemaBuilder.Build(),
- xdbcScopeTableBuilder.Build(),
- xdbcIsAutoincrementBuilder.Build(),
- xdbcIsGeneratedcolumnBuilder.Build()
- };
- StandardSchemas.ColumnSchema.Validate(dataArrays);
+ IArrowArray[] dataArrays = new IArrowArray[]
+ {
+ columnNameBuilder.Build(),
+ ordinalPositionBuilder.Build(),
+ remarksBuilder.Build(),
+ xdbcDataTypeBuilder.Build(),
+ xdbcTypeNameBuilder.Build(),
+ xdbcColumnSizeBuilder.Build(),
+ xdbcDecimalDigitsBuilder.Build(),
+ xdbcNumPrecRadixBuilder.Build(),
+ xdbcNullableBuilder.Build(),
+ xdbcColumnDefBuilder.Build(),
+ xdbcSqlDataTypeBuilder.Build(),
+ xdbcDatetimeSubBuilder.Build(),
+ xdbcCharOctetLengthBuilder.Build(),
+ xdbcIsNullableBuilder.Build(),
+ xdbcScopeCatalogBuilder.Build(),
+ xdbcScopeSchemaBuilder.Build(),
+ xdbcScopeTableBuilder.Build(),
+ xdbcIsAutoincrementBuilder.Build(),
+ xdbcIsGeneratedcolumnBuilder.Build()
+ };
+ StandardSchemas.ColumnSchema.Validate(dataArrays);
- return new StructArray(
- new StructType(StandardSchemas.ColumnSchema),
- length,
- dataArrays,
- nullBitmapBuffer.Build());
+ return new StructArray(
+ new StructType(StandardSchemas.ColumnSchema),
+ length,
+ dataArrays,
+ nullBitmapBuffer.Build());
+ });
}
private StructArray GetConstraintSchema(
@@ -752,66 +825,69 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
string table,
string? columnNamePattern)
{
- StringArray.Builder constraintNameBuilder = new
StringArray.Builder();
- StringArray.Builder constraintTypeBuilder = new
StringArray.Builder();
- List<IArrowArray?> constraintColumnNamesValues = new
List<IArrowArray?>();
- List<IArrowArray?> constraintColumnUsageValues = new
List<IArrowArray?>();
- ArrowBuffer.BitmapBuilder nullBitmapBuffer = new
ArrowBuffer.BitmapBuilder();
- int length = 0;
+ return this.TraceActivity(activity =>
+ {
+ StringArray.Builder constraintNameBuilder = new
StringArray.Builder();
+ StringArray.Builder constraintTypeBuilder = new
StringArray.Builder();
+ List<IArrowArray?> constraintColumnNamesValues = new
List<IArrowArray?>();
+ List<IArrowArray?> constraintColumnUsageValues = new
List<IArrowArray?>();
+ ArrowBuffer.BitmapBuilder nullBitmapBuffer = new
ArrowBuffer.BitmapBuilder();
+ int length = 0;
- string query = string.Format("SELECT * FROM
`{0}`.`{1}`.INFORMATION_SCHEMA.TABLE_CONSTRAINTS WHERE table_name = '{2}'",
- Sanitize(catalog), Sanitize(dbSchema), Sanitize(table));
+ string query = string.Format("SELECT * FROM
`{0}`.`{1}`.INFORMATION_SCHEMA.TABLE_CONSTRAINTS WHERE table_name = '{2}'",
+ Sanitize(catalog), Sanitize(dbSchema), Sanitize(table));
- BigQueryResults? result = ExecuteQuery(query, parameters: null);
+ BigQueryResults? result = ExecuteQuery(query, parameters:
null);
- if (result != null)
- {
- foreach (BigQueryRow row in result)
+ if (result != null)
{
- string constraintName = GetValue(row["constraint_name"]);
- constraintNameBuilder.Append(constraintName);
- string constraintType = GetValue(row["constraint_type"]);
- constraintTypeBuilder.Append(constraintType);
- nullBitmapBuffer.Append(true);
- length++;
-
- if (depth == GetObjectsDepth.All || depth ==
GetObjectsDepth.Tables)
+ foreach (BigQueryRow row in result)
{
-
constraintColumnNamesValues.Add(GetConstraintColumnNames(
- catalog, dbSchema, table, constraintName));
- if (constraintType.ToUpper() == "FOREIGN KEY")
+ string constraintName =
GetValue(row["constraint_name"]);
+ constraintNameBuilder.Append(constraintName);
+ string constraintType =
GetValue(row["constraint_type"]);
+ constraintTypeBuilder.Append(constraintType);
+ nullBitmapBuffer.Append(true);
+ length++;
+
+ if (depth == GetObjectsDepth.All || depth ==
GetObjectsDepth.Tables)
{
-
constraintColumnUsageValues.Add(GetConstraintsUsage(
+
constraintColumnNamesValues.Add(GetConstraintColumnNames(
catalog, dbSchema, table, constraintName));
+ if (constraintType.ToUpper() == "FOREIGN KEY")
+ {
+
constraintColumnUsageValues.Add(GetConstraintsUsage(
+ catalog, dbSchema, table, constraintName));
+ }
+ else
+ {
+ constraintColumnUsageValues.Add(null);
+ }
}
else
{
+ constraintColumnNamesValues.Add(null);
constraintColumnUsageValues.Add(null);
}
}
- else
- {
- constraintColumnNamesValues.Add(null);
- constraintColumnUsageValues.Add(null);
- }
}
- }
- IArrowArray[] dataArrays = new IArrowArray[]
- {
- constraintNameBuilder.Build(),
- constraintTypeBuilder.Build(),
-
constraintColumnNamesValues.BuildListArrayForType(StringType.Default),
- constraintColumnUsageValues.BuildListArrayForType(new
StructType(StandardSchemas.UsageSchema))
- };
+ IArrowArray[] dataArrays = new IArrowArray[]
+ {
+ constraintNameBuilder.Build(),
+ constraintTypeBuilder.Build(),
+
constraintColumnNamesValues.BuildListArrayForType(StringType.Default),
+ constraintColumnUsageValues.BuildListArrayForType(new
StructType(StandardSchemas.UsageSchema))
+ };
- StandardSchemas.ConstraintSchema.Validate(dataArrays);
+ StandardSchemas.ConstraintSchema.Validate(dataArrays);
- return new StructArray(
- new StructType(StandardSchemas.ConstraintSchema),
- length,
- dataArrays,
- nullBitmapBuffer.Build());
+ return new StructArray(
+ new StructType(StandardSchemas.ConstraintSchema),
+ length,
+ dataArrays,
+ nullBitmapBuffer.Build());
+ });
}
private StringArray GetConstraintColumnNames(
@@ -820,23 +896,26 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
string table,
string constraintName)
{
- string query = string.Format("SELECT * FROM
`{0}`.`{1}`.INFORMATION_SCHEMA.KEY_COLUMN_USAGE WHERE table_name = '{2}' AND
constraint_name = '{3}' ORDER BY ordinal_position",
+ return this.TraceActivity(activity =>
+ {
+ string query = string.Format("SELECT * FROM
`{0}`.`{1}`.INFORMATION_SCHEMA.KEY_COLUMN_USAGE WHERE table_name = '{2}' AND
constraint_name = '{3}' ORDER BY ordinal_position",
Sanitize(catalog), Sanitize(dbSchema), Sanitize(table),
Sanitize(constraintName));
- StringArray.Builder constraintColumnNamesBuilder = new
StringArray.Builder();
+ StringArray.Builder constraintColumnNamesBuilder = new
StringArray.Builder();
- BigQueryResults? result = ExecuteQuery(query, parameters: null);
+ BigQueryResults? result = ExecuteQuery(query, parameters:
null);
- if (result != null)
- {
- foreach (BigQueryRow row in result)
+ if (result != null)
{
- string column = GetValue(row["column_name"]);
- constraintColumnNamesBuilder.Append(column);
+ foreach (BigQueryRow row in result)
+ {
+ string column = GetValue(row["column_name"]);
+ constraintColumnNamesBuilder.Append(column);
+ }
}
- }
- return constraintColumnNamesBuilder.Build();
+ return constraintColumnNamesBuilder.Build();
+ });
}
private StructArray GetConstraintsUsage(
@@ -845,51 +924,54 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
string table,
string constraintName)
{
- StringArray.Builder constraintFkCatalogBuilder = new
StringArray.Builder();
- StringArray.Builder constraintFkDbSchemaBuilder = new
StringArray.Builder();
- StringArray.Builder constraintFkTableBuilder = new
StringArray.Builder();
- StringArray.Builder constraintFkColumnNameBuilder = new
StringArray.Builder();
- ArrowBuffer.BitmapBuilder nullBitmapBuffer = new
ArrowBuffer.BitmapBuilder();
- int length = 0;
+ return this.TraceActivity(activity =>
+ {
+ StringArray.Builder constraintFkCatalogBuilder = new
StringArray.Builder();
+ StringArray.Builder constraintFkDbSchemaBuilder = new
StringArray.Builder();
+ StringArray.Builder constraintFkTableBuilder = new
StringArray.Builder();
+ StringArray.Builder constraintFkColumnNameBuilder = new
StringArray.Builder();
+ ArrowBuffer.BitmapBuilder nullBitmapBuffer = new
ArrowBuffer.BitmapBuilder();
+ int length = 0;
- string query = string.Format("SELECT * FROM
`{0}`.`{1}`.INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE WHERE constraint_name =
'{2}'",
- Sanitize(catalog), Sanitize(dbSchema),
Sanitize(constraintName));
+ string query = string.Format("SELECT * FROM
`{0}`.`{1}`.INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE WHERE constraint_name =
'{2}'",
+ Sanitize(catalog), Sanitize(dbSchema),
Sanitize(constraintName));
- BigQueryResults? result = ExecuteQuery(query, parameters: null);
+ BigQueryResults? result = ExecuteQuery(query, parameters:
null);
- if (result != null)
- {
- foreach (BigQueryRow row in result)
+ if (result != null)
{
- string constraint_catalog =
GetValue(row["constraint_catalog"]);
- string constraint_schema =
GetValue(row["constraint_schema"]);
- string table_name = GetValue(row["table_name"]);
- string column_name = GetValue(row["column_name"]);
-
- constraintFkCatalogBuilder.Append(constraint_catalog);
- constraintFkDbSchemaBuilder.Append(constraint_schema);
- constraintFkTableBuilder.Append(table_name);
- constraintFkColumnNameBuilder.Append(column_name);
-
- nullBitmapBuffer.Append(true);
- length++;
+ foreach (BigQueryRow row in result)
+ {
+ string constraint_catalog =
GetValue(row["constraint_catalog"]);
+ string constraint_schema =
GetValue(row["constraint_schema"]);
+ string table_name = GetValue(row["table_name"]);
+ string column_name = GetValue(row["column_name"]);
+
+ constraintFkCatalogBuilder.Append(constraint_catalog);
+ constraintFkDbSchemaBuilder.Append(constraint_schema);
+ constraintFkTableBuilder.Append(table_name);
+ constraintFkColumnNameBuilder.Append(column_name);
+
+ nullBitmapBuffer.Append(true);
+ length++;
+ }
}
- }
- IArrowArray[] dataArrays = new IArrowArray[]
- {
- constraintFkCatalogBuilder.Build(),
- constraintFkDbSchemaBuilder.Build(),
- constraintFkTableBuilder.Build(),
- constraintFkColumnNameBuilder.Build()
- };
- StandardSchemas.UsageSchema.Validate(dataArrays);
+ IArrowArray[] dataArrays = new IArrowArray[]
+ {
+ constraintFkCatalogBuilder.Build(),
+ constraintFkDbSchemaBuilder.Build(),
+ constraintFkTableBuilder.Build(),
+ constraintFkColumnNameBuilder.Build()
+ };
+ StandardSchemas.UsageSchema.Validate(dataArrays);
- return new StructArray(
- new StructType(StandardSchemas.UsageSchema),
- length,
- dataArrays,
- nullBitmapBuffer.Build());
+ return new StructArray(
+ new StructType(StandardSchemas.UsageSchema),
+ length,
+ dataArrays,
+ nullBitmapBuffer.Build());
+ });
}
private string PatternToRegEx(string? pattern)
@@ -989,22 +1071,25 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
public override Schema GetTableSchema(string? catalog, string?
dbSchema, string tableName)
{
- string query = string.Format("SELECT * FROM
`{0}`.`{1}`.INFORMATION_SCHEMA.COLUMNS WHERE table_name = '{2}'",
+ return this.TraceActivity(activity =>
+ {
+ string query = string.Format("SELECT * FROM
`{0}`.`{1}`.INFORMATION_SCHEMA.COLUMNS WHERE table_name = '{2}'",
Sanitize(catalog), Sanitize(dbSchema), Sanitize(tableName));
- BigQueryResults? result = ExecuteQuery(query, parameters: null);
+ BigQueryResults? result = ExecuteQuery(query, parameters:
null);
- List<Field> fields = new List<Field>();
+ List<Field> fields = new List<Field>();
- if (result != null)
- {
- foreach (BigQueryRow row in result)
+ if (result != null)
{
- fields.Add(DescToField(row));
+ foreach (BigQueryRow row in result)
+ {
+ fields.Add(DescToField(row));
+ }
}
- }
- return new Schema(fields, null);
+ return new Schema(fields, null);
+ });
}
private Field DescToField(BigQueryRow row)
@@ -1014,7 +1099,7 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
metaData.Add("ORDINAL_POSITION",
GetValue(row["ordinal_position"]));
metaData.Add("DATA_TYPE", GetValue(row["data_type"]));
- Field.Builder fieldBuilder =
SchemaFieldGenerator(GetValue(row["column_name"]).ToLower(),
GetValue(row["data_type"]));
+ Field.Builder fieldBuilder =
SchemaFieldGenerator(GetValue(row["column_name"]), GetValue(row["data_type"]));
fieldBuilder.Metadata(metaData);
if (!GetValue(row["is_nullable"]).Equals("YES",
StringComparison.OrdinalIgnoreCase))
@@ -1022,7 +1107,7 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
fieldBuilder.Nullable(false);
}
- fieldBuilder.Name(GetValue(row["column_name"]).ToLower());
+ fieldBuilder.Name(GetValue(row["column_name"]));
return fieldBuilder.Build();
}
@@ -1163,6 +1248,7 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
BigQueryParameters.AllowLargeResults,
BigQueryParameters.UseLegacySQL,
BigQueryParameters.LargeDecimalsAsString,
+ BigQueryParameters.LargeResultsDataset,
BigQueryParameters.LargeResultsDestinationTable,
BigQueryParameters.GetQueryResultsOptionsTimeout,
BigQueryParameters.MaxFetchConcurrency,
diff --git a/csharp/src/Drivers/BigQuery/BigQueryParameters.cs
b/csharp/src/Drivers/BigQuery/BigQueryParameters.cs
index 45125856b..f0002c609 100644
--- a/csharp/src/Drivers/BigQuery/BigQueryParameters.cs
+++ b/csharp/src/Drivers/BigQuery/BigQueryParameters.cs
@@ -15,6 +15,9 @@
* limitations under the License.
*/
+using System;
+using System.Collections.Generic;
+
namespace Apache.Arrow.Adbc.Drivers.BigQuery
{
/// <summary>
@@ -23,29 +26,48 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
internal class BigQueryParameters
{
public const string AccessToken = "adbc.bigquery.access_token";
+ public const string AllowLargeResults =
"adbc.bigquery.allow_large_results";
public const string AudienceUri = "adbc.bigquery.audience_uri";
- public const string ProjectId = "adbc.bigquery.project_id";
+ public const string AuthenticationType = "adbc.bigquery.auth_type";
public const string BillingProjectId =
"adbc.bigquery.billing_project_id";
public const string ClientId = "adbc.bigquery.client_id";
public const string ClientSecret = "adbc.bigquery.client_secret";
- public const string RefreshToken = "adbc.bigquery.refresh_token";
- public const string AuthenticationType = "adbc.bigquery.auth_type";
+ public const string ClientTimeout = "adbc.bigquery.client.timeout";
+ public const string EvaluationKind =
"adbc.bigquery.multiple_statement.evaluation_kind";
+ public const string GetQueryResultsOptionsTimeout =
"adbc.bigquery.get_query_results_options.timeout";
+ public const string IncludeConstraintsWithGetObjects =
"adbc.bigquery.include_constraints_getobjects";
+ public const string IncludePublicProjectId =
"adbc.bigquery.include_public_project_id";
public const string JsonCredential =
"adbc.bigquery.auth_json_credential";
- public const string AllowLargeResults =
"adbc.bigquery.allow_large_results";
- public const string LargeResultsDestinationTable =
"adbc.bigquery.large_results_destination_table";
- public const string UseLegacySQL = "adbc.bigquery.use_legacy_sql";
public const string LargeDecimalsAsString =
"adbc.bigquery.large_decimals_as_string";
- public const string Scopes = "adbc.bigquery.scopes";
- public const string IncludeConstraintsWithGetObjects =
"adbc.bigquery.include_constraints_getobjects";
- public const string ClientTimeout = "adbc.bigquery.client.timeout";
+ public const string LargeResultsDataset =
"adbc.bigquery.large_results_dataset";
+ public const string LargeResultsDestinationTable =
"adbc.bigquery.large_results_destination_table";
+ public const string MaxFetchConcurrency =
"adbc.bigquery.max_fetch_concurrency";
public const string MaximumRetryAttempts =
"adbc.bigquery.maximum_retries";
+ public const string ProjectId = "adbc.bigquery.project_id";
+ public const string RefreshToken = "adbc.bigquery.refresh_token";
public const string RetryDelayMs = "adbc.bigquery.retry_delay_ms";
- public const string GetQueryResultsOptionsTimeout =
"adbc.bigquery.get_query_results_options.timeout";
- public const string MaxFetchConcurrency =
"adbc.bigquery.max_fetch_concurrency";
- public const string IncludePublicProjectId =
"adbc.bigquery.include_public_project_id";
- public const string StatementType =
"adbc.bigquery.multiple_statement.statement_type";
+ public const string Scopes = "adbc.bigquery.scopes";
public const string StatementIndex =
"adbc.bigquery.multiple_statement.statement_index";
- public const string EvaluationKind =
"adbc.bigquery.multiple_statement.evaluation_kind";
+ public const string StatementType =
"adbc.bigquery.multiple_statement.statement_type";
+ public const string UseLegacySQL = "adbc.bigquery.use_legacy_sql";
+
+ // these values are safe to log any time
+ private static HashSet<string> safeToLog = new
HashSet<string>(StringComparer.OrdinalIgnoreCase)
+ {
+ AllowLargeResults, AuthenticationType, BillingProjectId, ClientId,
ClientTimeout,
+ EvaluationKind, GetQueryResultsOptionsTimeout,
IncludeConstraintsWithGetObjects,
+ IncludePublicProjectId, LargeDecimalsAsString,
LargeResultsDataset, LargeResultsDestinationTable,
+ MaxFetchConcurrency, MaximumRetryAttempts, ProjectId,
RetryDelayMs, StatementIndex,
+ StatementType, UseLegacySQL
+ };
+
+ public static bool IsSafeToLog(string name)
+ {
+ if (safeToLog.Contains(name))
+ return true;
+
+ return false;
+ }
}
/// <summary>
@@ -68,5 +90,11 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
// default value per
https://pkg.go.dev/cloud.google.com/go/bigquery#section-readme
public const string DetectProjectId = "*detect-project-id*";
+
+ // Reuse what the ODBC driver already has in place, in case a caller
+ // has permission issues trying to create a new dataset
+ public const string DefaultLargeDatasetId = "_bqodbc_temp_tables";
+
+ public const string PublicProjectId = "bigquery-public-data";
}
}
diff --git a/csharp/src/Drivers/BigQuery/BigQueryStatement.cs
b/csharp/src/Drivers/BigQuery/BigQueryStatement.cs
index 356260c97..a148e0f22 100644
--- a/csharp/src/Drivers/BigQuery/BigQueryStatement.cs
+++ b/csharp/src/Drivers/BigQuery/BigQueryStatement.cs
@@ -17,13 +17,16 @@
using System;
using System.Collections.Generic;
+using System.Data;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
+using Apache.Arrow.Adbc.Tracing;
using Apache.Arrow.Ipc;
using Apache.Arrow.Types;
+using Google;
using Google.Api.Gax;
using Google.Apis.Auth.OAuth2;
using Google.Apis.Bigquery.v2.Data;
@@ -37,11 +40,11 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
/// <summary>
/// BigQuery-specific implementation of <see cref="AdbcStatement"/>
/// </summary>
- class BigQueryStatement : AdbcStatement, ITokenProtectedResource,
IDisposable
+ class BigQueryStatement : TracingStatement, ITokenProtectedResource,
IDisposable
{
readonly BigQueryConnection bigQueryConnection;
- public BigQueryStatement(BigQueryConnection bigQueryConnection)
+ public BigQueryStatement(BigQueryConnection bigQueryConnection) :
base(bigQueryConnection)
{
if (bigQueryConnection == null) { throw new
AdbcException($"{nameof(bigQueryConnection)} cannot be null",
AdbcStatusCode.InvalidArgument); }
@@ -63,6 +66,10 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
private int RetryDelayMs => this.bigQueryConnection.RetryDelayMs;
+ public override string AssemblyVersion =>
BigQueryUtils.BigQueryAssemblyVersion;
+
+ public override string AssemblyName =>
BigQueryUtils.BigQueryAssemblyName;
+
public override void SetOption(string key, string value)
{
if (Options == null)
@@ -80,122 +87,129 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
private async Task<QueryResult> ExecuteQueryInternalAsync()
{
- QueryOptions queryOptions = ValidateOptions();
- BigQueryJob job = await Client.CreateQueryJobAsync(SqlQuery, null,
queryOptions);
-
- JobReference jobReference = job.Reference;
- GetQueryResultsOptions getQueryResultsOptions = new
GetQueryResultsOptions();
-
- if
(Options?.TryGetValue(BigQueryParameters.GetQueryResultsOptionsTimeout, out
string? timeoutSeconds) == true &&
- int.TryParse(timeoutSeconds, out int seconds) &&
- seconds >= 0)
+ return await this.TraceActivity(async activity =>
{
- getQueryResultsOptions.Timeout = TimeSpan.FromSeconds(seconds);
- }
+ QueryOptions queryOptions = ValidateOptions(activity);
- // We can't checkJobStatus, Otherwise, the timeout in
QueryResultsOptions is meaningless.
- // When encountering a long-running job, it should be controlled
by the timeout in the Google SDK instead of blocking in a while loop.
- Func<Task<BigQueryResults>> getJobResults = async () =>
- {
- // if the authentication token was reset, then we need a new
job with the latest token
- BigQueryJob completedJob = await
Client.GetJobAsync(jobReference);
- return await
completedJob.GetQueryResultsAsync(getQueryResultsOptions);
- };
+ activity?.AddConditionalTag(SemanticConventions.Db.Query.Text,
SqlQuery, BigQueryUtils.IsSafeToTrace());
- BigQueryResults results = await
ExecuteWithRetriesAsync(getJobResults);
+ BigQueryJob job = await Client.CreateQueryJobAsync(SqlQuery,
null, queryOptions);
- TokenProtectedReadClientManger clientMgr = new
TokenProtectedReadClientManger(Credential);
- clientMgr.UpdateToken = () => Task.Run(() =>
- {
- this.bigQueryConnection.SetCredential();
- clientMgr.UpdateCredential(Credential);
- });
+ JobReference jobReference = job.Reference;
+ GetQueryResultsOptions getQueryResultsOptions = new
GetQueryResultsOptions();
- // For multi-statement queries, StatementType == "SCRIPT"
- if (results.TableReference == null ||
job.Statistics.Query.StatementType.Equals("SCRIPT",
StringComparison.OrdinalIgnoreCase))
- {
- string statementType = string.Empty;
- if (Options?.TryGetValue(BigQueryParameters.StatementType, out
string? statementTypeString) == true)
+ if
(Options?.TryGetValue(BigQueryParameters.GetQueryResultsOptionsTimeout, out
string? timeoutSeconds) == true &&
+ int.TryParse(timeoutSeconds, out int seconds) &&
+ seconds >= 0)
{
- statementType = statementTypeString;
+ getQueryResultsOptions.Timeout =
TimeSpan.FromSeconds(seconds);
+
activity?.AddBigQueryParameterTag(BigQueryParameters.GetQueryResultsOptionsTimeout,
seconds);
}
- int statementIndex = 1;
- if (Options?.TryGetValue(BigQueryParameters.StatementIndex,
out string? statementIndexString) == true &&
- int.TryParse(statementIndexString, out int
statementIndexInt) &&
- statementIndexInt > 0)
+
+ // We can't poll the job status here; otherwise, the timeout in
QueryResultsOptions would be meaningless.
+ // When encountering a long-running job, it should be
controlled by the timeout in the Google SDK instead of blocking in a while loop.
+ Func<Task<BigQueryResults>> getJobResults = async () =>
{
- statementIndex = statementIndexInt;
- }
- string evaluationKind = string.Empty;
- if (Options?.TryGetValue(BigQueryParameters.EvaluationKind,
out string? evaluationKindString) == true)
+ // if the authentication token was reset, then we need a
new job with the latest token
+ BigQueryJob completedJob = await
Client.GetJobAsync(jobReference);
+ return await
completedJob.GetQueryResultsAsync(getQueryResultsOptions);
+ };
+
+ BigQueryResults results = await
ExecuteWithRetriesAsync(getJobResults, activity);
+
+ TokenProtectedReadClientManger clientMgr = new
TokenProtectedReadClientManger(Credential);
+ clientMgr.UpdateToken = () => Task.Run(() =>
{
- evaluationKind = evaluationKindString;
- }
+ this.bigQueryConnection.SetCredential();
+ clientMgr.UpdateCredential(Credential);
+ });
- Func<Task<BigQueryResults>> getMultiJobResults = async () =>
+ // For multi-statement queries, StatementType == "SCRIPT"
+ if (results.TableReference == null ||
job.Statistics.Query.StatementType.Equals("SCRIPT",
StringComparison.OrdinalIgnoreCase))
{
- // To get the results of all statements in a
multi-statement query, enumerate the child jobs. Related public docs:
https://cloud.google.com/bigquery/docs/multi-statement-queries#get_all_executed_statements.
- // Can filter by StatementType and EvaluationKind. Related
public docs:
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobstatistics2,
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#evaluationkind
- ListJobsOptions listJobsOptions = new ListJobsOptions();
- listJobsOptions.ParentJobId = results.JobReference.JobId;
- var joblist = Client.ListJobs(listJobsOptions)
- .Select(job => Client.GetJob(job.Reference))
- .Where(job => string.IsNullOrEmpty(evaluationKind) ||
job.Statistics.ScriptStatistics.EvaluationKind.Equals(evaluationKind,
StringComparison.OrdinalIgnoreCase))
- .Where(job => string.IsNullOrEmpty(statementType) ||
job.Statistics.Query.StatementType.Equals(statementType,
StringComparison.OrdinalIgnoreCase))
- .OrderBy(job => job.Resource.Statistics.CreationTime)
- .ToList();
-
- if (joblist.Count > 0)
+ string statementType = string.Empty;
+ if (Options?.TryGetValue(BigQueryParameters.StatementType,
out string? statementTypeString) == true)
+ {
+ statementType = statementTypeString;
+ }
+ int statementIndex = 1;
+ if
(Options?.TryGetValue(BigQueryParameters.StatementIndex, out string?
statementIndexString) == true &&
+ int.TryParse(statementIndexString, out int
statementIndexInt) &&
+ statementIndexInt > 0)
+ {
+ statementIndex = statementIndexInt;
+ }
+ string evaluationKind = string.Empty;
+ if
(Options?.TryGetValue(BigQueryParameters.EvaluationKind, out string?
evaluationKindString) == true)
{
- if (statementIndex < 1 || statementIndex >
joblist.Count)
+ evaluationKind = evaluationKindString;
+ }
+
+ Func<Task<BigQueryResults>> getMultiJobResults = async ()
=>
+ {
+ // To get the results of all statements in a
multi-statement query, enumerate the child jobs. Related public docs:
https://cloud.google.com/bigquery/docs/multi-statement-queries#get_all_executed_statements.
+ // Can filter by StatementType and EvaluationKind.
Related public docs:
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobstatistics2,
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#evaluationkind
+ ListJobsOptions listJobsOptions = new
ListJobsOptions();
+ listJobsOptions.ParentJobId =
results.JobReference.JobId;
+ var joblist = Client.ListJobs(listJobsOptions)
+ .Select(job => Client.GetJob(job.Reference))
+ .Where(job => string.IsNullOrEmpty(evaluationKind)
|| job.Statistics.ScriptStatistics.EvaluationKind.Equals(evaluationKind,
StringComparison.OrdinalIgnoreCase))
+ .Where(job => string.IsNullOrEmpty(statementType)
|| job.Statistics.Query.StatementType.Equals(statementType,
StringComparison.OrdinalIgnoreCase))
+ .OrderBy(job =>
job.Resource.Statistics.CreationTime)
+ .ToList();
+
+ if (joblist.Count > 0)
{
- throw new ArgumentOutOfRangeException($"The
specified index {statementIndex} is out of range. There are {joblist.Count}
jobs available.");
+ if (statementIndex < 1 || statementIndex >
joblist.Count)
+ {
+ throw new ArgumentOutOfRangeException($"The
specified index {statementIndex} is out of range. There are {joblist.Count}
jobs available.");
+ }
+ return await joblist[statementIndex -
1].GetQueryResultsAsync(getQueryResultsOptions);
}
- return await joblist[statementIndex -
1].GetQueryResultsAsync(getQueryResultsOptions);
- }
- throw new AdbcException($"Unable to obtain result from
statement [{statementIndex}]", AdbcStatusCode.InvalidData);
- };
+ throw new AdbcException($"Unable to obtain result from
statement [{statementIndex}]", AdbcStatusCode.InvalidData);
+ };
- results = await ExecuteWithRetriesAsync(getMultiJobResults);
- }
+ results = await
ExecuteWithRetriesAsync(getMultiJobResults, activity);
+ }
- if (results?.TableReference == null)
- {
- throw new AdbcException("There is no query statement");
- }
+ if (results?.TableReference == null)
+ {
+ throw new AdbcException("There is no query statement");
+ }
- string table =
$"projects/{results.TableReference.ProjectId}/datasets/{results.TableReference.DatasetId}/tables/{results.TableReference.TableId}";
+ string table =
$"projects/{results.TableReference.ProjectId}/datasets/{results.TableReference.DatasetId}/tables/{results.TableReference.TableId}";
- int maxStreamCount = 1;
+ int maxStreamCount = 1;
- if (Options?.TryGetValue(BigQueryParameters.MaxFetchConcurrency,
out string? maxStreamCountString) == true)
- {
- if (int.TryParse(maxStreamCountString, out int count))
+ if
(Options?.TryGetValue(BigQueryParameters.MaxFetchConcurrency, out string?
maxStreamCountString) == true)
{
- if (count >= 0)
+ if (int.TryParse(maxStreamCountString, out int count))
{
- maxStreamCount = count;
+ if (count >= 0)
+ {
+ maxStreamCount = count;
+ }
}
}
- }
-
- ReadSession rs = new ReadSession { Table = table, DataFormat =
DataFormat.Arrow };
- Func<Task<ReadSession>> createReadSession = () =>
clientMgr.ReadClient.CreateReadSessionAsync("projects/" +
results.TableReference.ProjectId, rs, maxStreamCount);
+ ReadSession rs = new ReadSession { Table = table, DataFormat =
DataFormat.Arrow };
- ReadSession rrs = await
ExecuteWithRetriesAsync<ReadSession>(createReadSession);
+ Func<Task<ReadSession>> createReadSession = () =>
clientMgr.ReadClient.CreateReadSessionAsync("projects/" +
results.TableReference.ProjectId, rs, maxStreamCount);
- long totalRows = results.TotalRows == null ? -1L :
(long)results.TotalRows.Value;
+ ReadSession rrs = await
ExecuteWithRetriesAsync<ReadSession>(createReadSession, activity);
- var readers = rrs.Streams
- .Select(s => ReadChunkWithRetries(clientMgr,
s.Name))
- .Where(chunk => chunk != null)
- .Cast<IArrowReader>();
+ long totalRows = results.TotalRows == null ? -1L :
(long)results.TotalRows.Value;
- IArrowArrayStream stream = new
MultiArrowReader(TranslateSchema(results.Schema), readers);
+ var readers = rrs.Streams
+ .Select(s => ReadChunkWithRetries(clientMgr,
s.Name, activity))
+ .Where(chunk => chunk != null)
+ .Cast<IArrowReader>();
- return new QueryResult(totalRows, stream);
+ IArrowArrayStream stream = new MultiArrowReader(this,
TranslateSchema(results.Schema), readers);
+ activity?.AddTag(SemanticConventions.Db.Response.ReturnedRows,
totalRows);
+ return new QueryResult(totalRows, stream);
+ });
}
public override UpdateResult ExecuteUpdate()
@@ -205,21 +219,28 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
private async Task<UpdateResult> ExecuteUpdateInternalAsync()
{
- GetQueryResultsOptions getQueryResultsOptions = new
GetQueryResultsOptions();
-
- if
(Options?.TryGetValue(BigQueryParameters.GetQueryResultsOptionsTimeout, out
string? timeoutSeconds) == true &&
- int.TryParse(timeoutSeconds, out int seconds) &&
- seconds >= 0)
+ return await this.TraceActivity(async activity =>
{
- getQueryResultsOptions.Timeout = TimeSpan.FromSeconds(seconds);
- }
+ GetQueryResultsOptions getQueryResultsOptions = new
GetQueryResultsOptions();
- // Cannot set destination table in jobs with DDL statements,
otherwise an error will be prompted
- Func<Task<BigQueryResults?>> func = () =>
Client.ExecuteQueryAsync(SqlQuery, null, null, getQueryResultsOptions);
- BigQueryResults? result = await
ExecuteWithRetriesAsync<BigQueryResults?>(func);
- long updatedRows = result?.NumDmlAffectedRows.HasValue == true ?
result.NumDmlAffectedRows.Value : -1L;
+ if
(Options?.TryGetValue(BigQueryParameters.GetQueryResultsOptionsTimeout, out
string? timeoutSeconds) == true &&
+ int.TryParse(timeoutSeconds, out int seconds) &&
+ seconds >= 0)
+ {
+ getQueryResultsOptions.Timeout =
TimeSpan.FromSeconds(seconds);
+
activity?.AddBigQueryParameterTag(BigQueryParameters.GetQueryResultsOptionsTimeout,
seconds);
+ }
+
+ activity?.AddConditionalTag(SemanticConventions.Db.Query.Text,
SqlQuery, BigQueryUtils.IsSafeToTrace());
+
+ // Cannot set destination table in jobs with DDL statements,
otherwise an error will be prompted
+ Func<Task<BigQueryResults?>> func = () =>
Client.ExecuteQueryAsync(SqlQuery, null, null, getQueryResultsOptions);
+ BigQueryResults? result = await
ExecuteWithRetriesAsync<BigQueryResults?>(func, activity);
+ long updatedRows = result?.NumDmlAffectedRows.HasValue == true
? result.NumDmlAffectedRows.Value : -1L;
- return new UpdateResult(updatedRows);
+ activity?.AddTag(SemanticConventions.Db.Response.ReturnedRows,
updatedRows);
+ return new UpdateResult(updatedRows);
+ });
}
private Schema TranslateSchema(TableSchema schema)
@@ -229,7 +250,13 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
private Field TranslateField(TableFieldSchema field)
{
- return new Field(field.Name, TranslateType(field), field.Mode ==
"NULLABLE");
+ List<KeyValuePair<string, string>> metadata = new
List<KeyValuePair<string, string>>()
+ {
+ new KeyValuePair<string, string>("BIGQUERY_TYPE", field.Type),
+ new KeyValuePair<string, string>("BIGQUERY_MODE", field.Mode)
+ };
+
+ return new Field(field.Name, TranslateType(field), field.Mode ==
"NULLABLE", metadata);
}
private IArrowType TranslateType(TableFieldSchema field)
@@ -302,25 +329,27 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
return type;
}
- private IArrowReader?
ReadChunkWithRetries(TokenProtectedReadClientManger clientMgr, string
streamName)
+ private IArrowReader?
ReadChunkWithRetries(TokenProtectedReadClientManger clientMgr, string
streamName, Activity? activity)
{
- Func<Task<IArrowReader?>> func = () =>
Task.FromResult<IArrowReader?>(ReadChunk(clientMgr, streamName));
- return
RetryManager.ExecuteWithRetriesAsync<IArrowReader?>(clientMgr, func,
MaxRetryAttempts, RetryDelayMs).GetAwaiter().GetResult();
+ Func<Task<IArrowReader?>> func = () =>
Task.FromResult<IArrowReader?>(ReadChunk(clientMgr, streamName, activity));
+ return
RetryManager.ExecuteWithRetriesAsync<IArrowReader?>(clientMgr, func, activity,
MaxRetryAttempts, RetryDelayMs).GetAwaiter().GetResult();
}
- private static IArrowReader? ReadChunk(TokenProtectedReadClientManger
clientMgr, string streamName)
+ private static IArrowReader? ReadChunk(TokenProtectedReadClientManger
clientMgr, string streamName, Activity? activity)
{
- return ReadChunk(clientMgr.ReadClient, streamName);
+ return ReadChunk(clientMgr.ReadClient, streamName, activity);
}
- private static IArrowReader? ReadChunk(BigQueryReadClient client,
string streamName)
+ private static IArrowReader? ReadChunk(BigQueryReadClient client,
string streamName, Activity? activity)
{
// Ideally we wouldn't need to indirect through a stream, but the
necessary APIs in Arrow
// are internal. (TODO: consider changing Arrow).
+ activity?.AddConditionalBigQueryTag("read_stream", streamName,
BigQueryUtils.IsSafeToTrace());
BigQueryReadClient.ReadRowsStream readRowsStream =
client.ReadRows(new ReadRowsRequest { ReadStream = streamName });
IAsyncEnumerator<ReadRowsResponse> enumerator =
readRowsStream.GetResponseStream().GetAsyncEnumerator();
ReadRowsStream stream = new ReadRowsStream(enumerator);
+ activity?.AddBigQueryTag("read_stream.has_rows", stream.HasRows);
if (stream.HasRows)
{
@@ -332,12 +361,14 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
}
}
- private QueryOptions ValidateOptions()
+ private QueryOptions ValidateOptions(Activity? activity)
{
QueryOptions options = new QueryOptions();
if (Client.ProjectId == BigQueryConstants.DetectProjectId)
{
+ activity?.AddBigQueryTag("client_project_id",
BigQueryConstants.DetectProjectId);
+
// An error occurs when calling CreateQueryJob without the ID
set,
// so use the first one that is found. This does not prevent
from calling
// to other 'project IDs' (catalogs) with a query.
@@ -346,7 +377,7 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
return Client?.ListProjects();
});
- PagedEnumerable<ProjectList, CloudProject>? projects =
ExecuteWithRetriesAsync<PagedEnumerable<ProjectList,
CloudProject>?>(func).GetAwaiter().GetResult();
+ PagedEnumerable<ProjectList, CloudProject>? projects =
ExecuteWithRetriesAsync<PagedEnumerable<ProjectList, CloudProject>?>(func,
activity).GetAwaiter().GetResult();
if (projects != null)
{
@@ -355,6 +386,9 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
if (firstProjectId != null)
{
options.ProjectId = firstProjectId;
+ activity?.AddBigQueryTag("detected_client_project_id",
firstProjectId);
+ // need to reopen the Client with the projectId
specified
+ this.bigQueryConnection.Open(firstProjectId);
}
}
}
@@ -362,105 +396,205 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
if (Options == null || Options.Count == 0)
return options;
+ string largeResultDatasetId =
BigQueryConstants.DefaultLargeDatasetId;
+
foreach (KeyValuePair<string, string> keyValuePair in Options)
{
- if (keyValuePair.Key == BigQueryParameters.AllowLargeResults)
+ switch (keyValuePair.Key)
{
- options.AllowLargeResults = true ?
keyValuePair.Value.ToLower().Equals("true") : false;
- }
- if (keyValuePair.Key ==
BigQueryParameters.LargeResultsDestinationTable)
- {
- string destinationTable = keyValuePair.Value;
+ case BigQueryParameters.AllowLargeResults:
+ options.AllowLargeResults = true ?
keyValuePair.Value.Equals("true", StringComparison.OrdinalIgnoreCase) : false;
+
activity?.AddBigQueryParameterTag(BigQueryParameters.AllowLargeResults,
options.AllowLargeResults);
+ break;
+ case BigQueryParameters.LargeResultsDataset:
+ largeResultDatasetId = keyValuePair.Value;
+
activity?.AddBigQueryParameterTag(BigQueryParameters.LargeResultsDataset,
largeResultDatasetId);
+ break;
+ case BigQueryParameters.LargeResultsDestinationTable:
+ string destinationTable = keyValuePair.Value;
- if (!destinationTable.Contains("."))
- throw new
InvalidOperationException($"{BigQueryParameters.LargeResultsDestinationTable}
is invalid");
+ if (!destinationTable.Contains("."))
+ throw new
InvalidOperationException($"{BigQueryParameters.LargeResultsDestinationTable}
is invalid");
- string projectId = string.Empty;
- string datasetId = string.Empty;
- string tableId = string.Empty;
+ string projectId = string.Empty;
+ string datasetId = string.Empty;
+ string tableId = string.Empty;
- string[] segments = destinationTable.Split('.');
+ string[] segments = destinationTable.Split('.');
- if (segments.Length != 3)
- throw new
InvalidOperationException($"{BigQueryParameters.LargeResultsDestinationTable}
cannot be parsed");
+ if (segments.Length != 3)
+ throw new
InvalidOperationException($"{BigQueryParameters.LargeResultsDestinationTable}
cannot be parsed");
- projectId = segments[0];
- datasetId = segments[1];
- tableId = segments[2];
+ projectId = segments[0];
+ datasetId = segments[1];
+ tableId = segments[2];
- if (string.IsNullOrEmpty(projectId.Trim()) ||
string.IsNullOrEmpty(datasetId.Trim()) || string.IsNullOrEmpty(tableId.Trim()))
- throw new
InvalidOperationException($"{BigQueryParameters.LargeResultsDestinationTable}
contains invalid values");
+ if (string.IsNullOrEmpty(projectId.Trim()) ||
string.IsNullOrEmpty(datasetId.Trim()) || string.IsNullOrEmpty(tableId.Trim()))
+ throw new
InvalidOperationException($"{BigQueryParameters.LargeResultsDestinationTable}
contains invalid values");
- options.DestinationTable = new TableReference()
+ options.DestinationTable = new TableReference()
+ {
+ ProjectId = projectId,
+ DatasetId = datasetId,
+ TableId = tableId
+ };
+
activity?.AddBigQueryParameterTag(BigQueryParameters.LargeResultsDestinationTable,
destinationTable);
+ break;
+ case BigQueryParameters.UseLegacySQL:
+ options.UseLegacySql = true ?
keyValuePair.Value.Equals("true", StringComparison.OrdinalIgnoreCase) : false;
+
activity?.AddBigQueryParameterTag(BigQueryParameters.UseLegacySQL,
options.UseLegacySql);
+ break;
+ }
+ }
+
+ if (options.AllowLargeResults == true && options.DestinationTable
== null)
+ {
+ options.DestinationTable =
TryGetLargeDestinationTableReference(largeResultDatasetId, activity);
+ }
+
+ return options;
+ }
+
+ /// <summary>
+ /// Attempts to retrieve or create the specified dataset.
+ /// </summary>
+ /// <param name="datasetId">The name of the dataset.</param>
+ /// <returns>A <see cref="TableReference"/> to a randomly generated
table name in the specified dataset.</returns>
+ private TableReference TryGetLargeDestinationTableReference(string
datasetId, Activity? activity)
+ {
+ BigQueryDataset? dataset = null;
+
+ try
+ {
+ activity?.AddBigQueryTag("large_results.dataset.try_find",
datasetId);
+ dataset = this.Client.GetDataset(datasetId);
+ activity?.AddBigQueryTag("large_results.dataset.found",
datasetId);
+ }
+ catch (GoogleApiException gaEx)
+ {
+ if (gaEx.HttpStatusCode != System.Net.HttpStatusCode.NotFound)
+ {
+ activity?.AddException(gaEx);
+ throw new AdbcException($"Failure trying to retrieve
dataset {datasetId}", gaEx);
+ }
+ }
+
+ if (dataset == null)
+ {
+ try
+ {
+
activity?.AddBigQueryTag("large_results.dataset.try_create", datasetId);
+ DatasetReference reference =
this.Client.GetDatasetReference(datasetId);
+ BigQueryDataset bigQueryDataset = new
BigQueryDataset(this.Client, new Dataset()
{
- ProjectId = projectId,
- DatasetId = datasetId,
- TableId = tableId
- };
+ DatasetReference = reference,
+ DefaultTableExpirationMs =
(long)TimeSpan.FromDays(1).TotalMilliseconds,
+ Labels = new Dictionary<string, string>()
+ {
+ // lower case, no spaces or periods per
https://cloud.google.com/bigquery/docs/labels-intro
+ { "created_by",
this.bigQueryConnection.DriverName.ToLowerInvariant().Replace(" ","_") + "_v_"
+ AssemblyVersion.Replace(".","_") }
+ }
+ });
+
+ dataset = this.Client.CreateDataset(datasetId,
bigQueryDataset.Resource);
+ activity?.AddBigQueryTag("large_results.dataset.created",
datasetId);
}
- if (keyValuePair.Key == BigQueryParameters.UseLegacySQL)
+ catch (Exception ex)
{
- options.UseLegacySql = true ?
keyValuePair.Value.ToLower().Equals("true") : false;
+ activity?.AddException(ex);
+ throw new AdbcException($"Could not create dataset
{datasetId}", ex);
}
}
- return options;
+
+ if (dataset == null)
+ {
+ throw new AdbcException($"Could not find dataset {datasetId}",
AdbcStatusCode.NotFound);
+ }
+ else
+ {
+ TableReference reference = new TableReference()
+ {
+ ProjectId = this.Client.ProjectId,
+ DatasetId = datasetId,
+ TableId = "lg_" + Guid.NewGuid().ToString().Replace("-",
"")
+ };
+
+ activity?.AddBigQueryTag("large_results.table_reference",
reference.ToString());
+
+ return reference;
+ }
}
public bool TokenRequiresUpdate(Exception ex) =>
BigQueryUtils.TokenRequiresUpdate(ex);
- private async Task<T> ExecuteWithRetriesAsync<T>(Func<Task<T>> action)
=> await RetryManager.ExecuteWithRetriesAsync<T>(this, action,
MaxRetryAttempts, RetryDelayMs);
+ private async Task<T> ExecuteWithRetriesAsync<T>(Func<Task<T>> action,
Activity? activity) => await RetryManager.ExecuteWithRetriesAsync<T>(this,
action, activity, MaxRetryAttempts, RetryDelayMs);
- private class MultiArrowReader : IArrowArrayStream
+ private class MultiArrowReader : TracingReader
{
+ private static readonly string s_assemblyName =
BigQueryUtils.GetAssemblyName(typeof(BigQueryStatement));
+ private static readonly string s_assemblyVersion =
BigQueryUtils.GetAssemblyVersion(typeof(BigQueryStatement));
+
readonly Schema schema;
IEnumerator<IArrowReader>? readers;
IArrowReader? reader;
- public MultiArrowReader(Schema schema, IEnumerable<IArrowReader>
readers)
+ public MultiArrowReader(BigQueryStatement statement, Schema
schema, IEnumerable<IArrowReader> readers) : base(statement)
{
this.schema = schema;
this.readers = readers.GetEnumerator();
}
- public Schema Schema { get { return this.schema; } }
+ public override Schema Schema { get { return this.schema; } }
+
+ public override string AssemblyVersion => s_assemblyVersion;
- public async ValueTask<RecordBatch?>
ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
+ public override string AssemblyName => s_assemblyName;
+
+ public override async ValueTask<RecordBatch?>
ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
{
- if (this.readers == null)
+ return await this.TraceActivityAsync(async activity =>
{
- return null;
- }
+ if (this.readers == null)
+ {
+ return null;
+ }
- while (true)
- {
- if (this.reader == null)
+ while (true)
{
- if (!this.readers.MoveNext())
+ if (this.reader == null)
{
- Dispose(); // TODO: Remove this line
- return null;
+ if (!this.readers.MoveNext())
+ {
+ Dispose(); // TODO: Remove this line
+ return null;
+ }
+ this.reader = this.readers.Current;
}
- this.reader = this.readers.Current;
- }
- RecordBatch result = await
this.reader.ReadNextRecordBatchAsync(cancellationToken);
+ RecordBatch result = await
this.reader.ReadNextRecordBatchAsync(cancellationToken);
- if (result != null)
- {
- return result;
- }
+ if (result != null)
+ {
+ return result;
+ }
- this.reader = null;
- }
+ this.reader = null;
+ }
+ });
}
- public void Dispose()
+ protected override void Dispose(bool disposing)
{
- if (this.readers != null)
+ if (disposing)
{
- this.readers.Dispose();
- this.readers = null;
+ if (this.readers != null)
+ {
+ this.readers.Dispose();
+ this.readers = null;
+ }
}
+
+ base.Dispose(disposing);
}
}
diff --git a/csharp/src/Drivers/BigQuery/BigQueryUtils.cs
b/csharp/src/Drivers/BigQuery/BigQueryUtils.cs
index 3b3167b4e..b8fca804f 100644
--- a/csharp/src/Drivers/BigQuery/BigQueryUtils.cs
+++ b/csharp/src/Drivers/BigQuery/BigQueryUtils.cs
@@ -16,6 +16,7 @@
*/
using System;
+using System.Diagnostics;
using Google;
namespace Apache.Arrow.Adbc.Drivers.BigQuery
@@ -33,5 +34,26 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
return result;
}
+
+ internal static string BigQueryAssemblyName =
GetAssemblyName(typeof(BigQueryConnection));
+
+ internal static string BigQueryAssemblyVersion =
GetAssemblyVersion(typeof(BigQueryConnection));
+
+ internal static string GetAssemblyName(Type type) =>
type.Assembly.GetName().Name!;
+
+ internal static string GetAssemblyVersion(Type type) =>
FileVersionInfo.GetVersionInfo(type.Assembly.Location).ProductVersion ??
string.Empty;
+
+ /// <summary>
+ /// Determines whether it is safe to include potentially sensitive values (such as query text) in traces.
+ /// </summary>
+ /// <remarks>
+ /// It is safe to write to some output types (e.g., files) but not
others (e.g., a shared resource).
+ /// </remarks>
+ /// <returns></returns>
+ internal static bool IsSafeToTrace()
+ {
+ // TODO: Add logic to determine if a file writer is listening
+ return false;
+ }
}
}
diff --git a/csharp/src/Drivers/BigQuery/RetryManager.cs
b/csharp/src/Drivers/BigQuery/RetryManager.cs
index a205e62ef..6848fbcc1 100644
--- a/csharp/src/Drivers/BigQuery/RetryManager.cs
+++ b/csharp/src/Drivers/BigQuery/RetryManager.cs
@@ -17,6 +17,7 @@
*/
using System;
+using System.Diagnostics;
using System.Threading.Tasks;
namespace Apache.Arrow.Adbc.Drivers.BigQuery
@@ -29,6 +30,7 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
public static async Task<T> ExecuteWithRetriesAsync<T>(
ITokenProtectedResource tokenProtectedResource,
Func<Task<T>> action,
+ Activity? activity,
int maxRetries = 5,
int initialDelayMilliseconds = 200)
{
@@ -49,6 +51,9 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
}
catch (Exception ex)
{
+ activity?.AddBigQueryTag("retry_attempt", retryCount);
+ activity?.AddException(ex);
+
retryCount++;
if (retryCount >= maxRetries)
{
@@ -56,6 +61,7 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
{
if
(tokenProtectedResource?.TokenRequiresUpdate(ex) == true)
{
+
activity?.AddBigQueryTag("update_token.status", "Expired");
throw new AdbcException($"Cannot update access
token after {maxRetries} tries", AdbcStatusCode.Unauthenticated, ex);
}
}
@@ -67,7 +73,9 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
{
if (tokenProtectedResource.TokenRequiresUpdate(ex) ==
true)
{
+ activity?.AddBigQueryTag("update_token.status",
"Required");
await tokenProtectedResource.UpdateToken();
+ activity?.AddBigQueryTag("update_token.status",
"Completed");
}
}
diff --git a/csharp/src/Drivers/BigQuery/readme.md
b/csharp/src/Drivers/BigQuery/readme.md
index 1ff14d816..d49018afa 100644
--- a/csharp/src/Drivers/BigQuery/readme.md
+++ b/csharp/src/Drivers/BigQuery/readme.md
@@ -85,6 +85,9 @@ The following parameters can be used to configure the driver
behavior. The param
**adbc.bigquery.include_public_project_id**<br>
Include the `bigquery-public-data` project ID with the
list of project IDs.
+**adbc.bigquery.large_results_dataset**<br>
+ Optional. Sets the dataset ID to use for large
results. The dataset needs to be in the same region as the data being queried.
If no value is specified, the driver will attempt to use or create
`_bqodbc_temp_tables`. A randomly generated table name will be used for the
DestinationTable.
+
**adbc.bigquery.large_results_destination_table**<br>
Optional. Sets the
[DestinationTable](https://cloud.google.com/dotnet/docs/reference/Google.Cloud.BigQuery.V2/latest/Google.Cloud.BigQuery.V2.QueryOptions#Google_Cloud_BigQuery_V2_QueryOptions_DestinationTable)
value of the QueryOptions if configured. Expects the format to be
`{projectId}.{datasetId}.{tableId}` to set the corresponding values in the
[TableReference](https://github.com/googleapis/google-api-dotnet-client/blob/6c415c73788b848711e47c6dd33c2f93c76faf9
[...]
@@ -145,3 +148,52 @@ connection.UpdateToken = () => Task.Run(() =>
```
In the sample above, when a new token is needed, the delegate is invoked and
updates the `adbc.bigquery.access_token` parameter on the connection object.
+
+## Default Project ID
+
+If `adbc.bigquery.project_id` is not specified, or if it equals
`bigquery-public-data`, the driver will query for the first project ID that is
associated with the provided credentials and use that project ID to perform
queries.
+
+## Large Results
+
+If a result set will contain large results, the
`adbc.bigquery.allow_large_results` parameter should be set to `"true"`. When
this value is set, the results are written to a destination table.
+The caller can either explicitly specify the fully qualified name of the
destination table using the `adbc.bigquery.large_results_destination_table`
parameter, or they can specify
+a dataset using the `adbc.bigquery.large_results_dataset` parameter. If neither is
specified, the driver falls back to a default dataset as described below.
+
+Behavior:
+- If a destination table is explicitly set, the driver will use that value.
+- If only a dataset value is set, the driver will attempt to retrieve the
dataset. If the dataset does not exist, the driver will attempt to
+ create it. The default table expiration will be set to 1 day, and a
`created_by` label will be included with the name and version of the driver that
created the dataset, for example `created_by :
adbc_bigquery_driver_v_0_19_0_0`. A randomly generated name will be used for
the table name.
+- If neither a destination table nor a dataset is specified, the driver will
attempt to use or create the `_bqodbc_temp_tables` dataset with the same
defaults and label described above. A randomly generated name will be used for
the table name.
+
+## Permissions
+
+The ADBC driver uses the BigQuery Client APIs to communicate with BigQuery.
The following actions are performed by the driver and require the calling user
to have the specified permissions. For more details on the permissions, or on which
roles may already grant them, see the Additional
References section below.
+
+|Action|Permissions Required|
+|:----------|:-------------|
+|Create Dataset<sup>*+</sup>|bigquery.datasets.create|
+|Create Query Job|bigquery.jobs.create|
+|Create Read Session|bigquery.readsessions.create<br> bigquery.tables.getData|
+|Execute Query|bigquery.jobs.create<br> bigquery.jobs.get<br>
bigquery.jobs.list|
+|Get Dataset<sup>*</sup>|bigquery.datasets.get|
+|Get Job|bigquery.jobs.get|
+|Get Query Results|bigquery.jobs.get|
+|List Jobs|bigquery.jobs.list|
+|Read Rows|bigquery.readsessions.getData|
+
+<sup>
+*Only for large result sets<br>
++If a specified dataset does not already exist.
+</sup>
+<br>
+<br>
+
+Some environments may also require:
+- [VPC Service
Controls](https://cloud.google.com/vpc-service-controls/docs/troubleshooting)
+- [Service Usage
Consumer](https://cloud.google.com/service-usage/docs/access-control#serviceusage.serviceUsageConsumer)
permissions
+
+**Additional References**:
+- [BigQuery IAM roles and permissions | Google
Cloud](https://cloud.google.com/bigquery/docs/access-control)
+- [Running jobs programmatically | BigQuery | Google
Cloud](https://cloud.google.com/bigquery/docs/running-jobs)
+- [Create datasets | BigQuery | Google
Cloud](https://cloud.google.com/bigquery/docs/datasets#required_permissions)
+- [Use the BigQuery Storage Read API to read table data | Google
Cloud](https://cloud.google.com/bigquery/docs/reference/storage/#permissions)
diff --git a/csharp/test/Drivers/BigQuery/BigQueryTestConfiguration.cs
b/csharp/test/Drivers/BigQuery/BigQueryTestConfiguration.cs
index 66a8dcd69..17071465b 100644
--- a/csharp/test/Drivers/BigQuery/BigQueryTestConfiguration.cs
+++ b/csharp/test/Drivers/BigQuery/BigQueryTestConfiguration.cs
@@ -71,6 +71,9 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.BigQuery
[JsonPropertyName("allowLargeResults")]
public bool AllowLargeResults { get; set; }
+ [JsonPropertyName("largeResultsDataset")]
+ public string LargeResultsDataset { get; set; } = string.Empty;
+
[JsonPropertyName("largeResultsDestinationTable")]
public string LargeResultsDestinationTable { get; set; } =
string.Empty;
diff --git a/csharp/test/Drivers/BigQuery/BigQueryTestingUtils.cs
b/csharp/test/Drivers/BigQuery/BigQueryTestingUtils.cs
index bc0afdcea..e3590212e 100644
--- a/csharp/test/Drivers/BigQuery/BigQueryTestingUtils.cs
+++ b/csharp/test/Drivers/BigQuery/BigQueryTestingUtils.cs
@@ -145,6 +145,11 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.BigQuery
parameters.Add(BigQueryParameters.IncludePublicProjectId,
testEnvironment.IncludePublicProjectId.ToString());
+ if (!string.IsNullOrEmpty(testEnvironment.LargeResultsDataset))
+ {
+ parameters.Add(BigQueryParameters.LargeResultsDataset,
testEnvironment.LargeResultsDataset);
+ }
+
if
(!string.IsNullOrEmpty(testEnvironment.LargeResultsDestinationTable))
{
parameters.Add(BigQueryParameters.LargeResultsDestinationTable,
testEnvironment.LargeResultsDestinationTable);