birschick-bq commented on code in PR #1948:
URL: https://github.com/apache/arrow-adbc/pull/1948#discussion_r1684702031
##########
csharp/src/Drivers/Apache/Spark/SparkConnection.cs:
##########
@@ -278,23 +283,53 @@ protected override async ValueTask<TProtocol> CreateProtocolAsync()
             else
                 token = properties[SparkParameters.Password];
-            HttpClient httpClient = new HttpClient();
-            httpClient.BaseAddress = new UriBuilder(Uri.UriSchemeHttps, hostName, -1, path).Uri;
-            httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", token);
-            httpClient.DefaultRequestHeaders.UserAgent.ParseAdd(UserAgent);
-            httpClient.DefaultRequestHeaders.AcceptEncoding.Clear();
-            httpClient.DefaultRequestHeaders.AcceptEncoding.Add(new StringWithQualityHeaderValue("identity"));
-            httpClient.DefaultRequestHeaders.ExpectContinue = false;
-
             TConfiguration config = new TConfiguration();
-
-            ThriftHttpTransport transport = new ThriftHttpTransport(httpClient, config);
+            _httpClient.BaseAddress = new UriBuilder(Uri.UriSchemeHttps, hostName, -1, path).Uri;
+            _httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", token);
+            _httpClient.DefaultRequestHeaders.UserAgent.ParseAdd("SimbaSparkJDBCDriver/2.06.15 Python/PyHive");
Review Comment:
I'd update the `UserAgent` value and leave a comment explaining why it needs to be this particular constant. Ideally, the ADBC driver's agent name could also be added to your whitelist.
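For illustration, a sketch of what I mean, assuming the server matches on a known agent prefix ("AdbcSparkDriver/1.0" is a hypothetical ADBC product token, not something in this PR):

```csharp
// Assumption: the endpoint only admits known agent strings, so present the
// required constant first and append our own product token after it.
// "AdbcSparkDriver/1.0" is a hypothetical name for the ADBC driver agent.
_httpClient.DefaultRequestHeaders.UserAgent.ParseAdd(
    "SimbaSparkJDBCDriver/2.06.15 Python/PyHive AdbcSparkDriver/1.0");
```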
##########
csharp/src/Drivers/Apache/Spark/SparkConnection.cs:
##########
@@ -278,23 +283,53 @@ protected override async ValueTask<TProtocol> CreateProtocolAsync()
             else
                 token = properties[SparkParameters.Password];
-            HttpClient httpClient = new HttpClient();
-            httpClient.BaseAddress = new UriBuilder(Uri.UriSchemeHttps, hostName, -1, path).Uri;
-            httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", token);
-            httpClient.DefaultRequestHeaders.UserAgent.ParseAdd(UserAgent);
-            httpClient.DefaultRequestHeaders.AcceptEncoding.Clear();
-            httpClient.DefaultRequestHeaders.AcceptEncoding.Add(new StringWithQualityHeaderValue("identity"));
-            httpClient.DefaultRequestHeaders.ExpectContinue = false;
-
             TConfiguration config = new TConfiguration();
-
-            ThriftHttpTransport transport = new ThriftHttpTransport(httpClient, config);
+            _httpClient.BaseAddress = new UriBuilder(Uri.UriSchemeHttps, hostName, -1, path).Uri;
+            _httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", token);
+            _httpClient.DefaultRequestHeaders.UserAgent.ParseAdd("SimbaSparkJDBCDriver/2.06.15 Python/PyHive");
+            _httpClient.DefaultRequestHeaders.AcceptEncoding.Clear();
+            _httpClient.DefaultRequestHeaders.AcceptEncoding.Add(new StringWithQualityHeaderValue("identity"));
+            _httpClient.DefaultRequestHeaders.ExpectContinue = false;
+            _httpClient.DefaultRequestHeaders.Add("traceparent", Generate());
+            ThriftHttpTransport transport = new ThriftHttpTransport(_httpClient, config);
             // can switch to the one below if want to use the experimental one with IPeekableTransport
             // ThriftHttpTransport transport = new ThriftHttpTransport(httpClient, config);
             await transport.OpenAsync(CancellationToken.None);
             return new TBinaryProtocol(transport);
         }
+        private static readonly char[] Characters = "0123456789abcdef".ToCharArray();
+        private static readonly Random Random = new Random();
+
+        public string Generate()
Review Comment:
Needs a more descriptive name.
```suggestion
        public string GenerateTraceId()
```
##########
csharp/test/Drivers/Apache/Spark/DriverTests.cs:
##########
@@ -505,14 +510,71 @@ public async Task CanGetTableTypes()
         [SkippableFact, Order(10)]
         public void CanExecuteQuery()
         {
-            using AdbcConnection adbcConnection = NewConnection();
+            long count = 0;
+            long target = 80;
+            Queue<String> log = new Queue<string>();
+            string FileName = $@"c:\temp\hmsmeta\{DateTime.Now.ToString("yyyyMMdd_HHmm")}__{target}.log";
Review Comment:
I would add a `logFolder` property to the configuration file properties; a hard-coded `c:\temp` path is neither portable nor desirable.
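Something along these lines, as a hedged sketch (`LogFolder` is a hypothetical configuration property, and this assumes `using System.IO;`):

```csharp
// Hypothetical: read the log location from the test configuration and fall
// back to the system temp directory instead of hard-coding c:\temp.
string logFolder = TestConfiguration.LogFolder ?? Path.GetTempPath();
string fileName = Path.Combine(logFolder, $"{DateTime.Now:yyyyMMdd_HHmm}__{target}.log");
```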
##########
csharp/src/Drivers/Apache/Spark/SparkStatement.cs:
##########
@@ -32,6 +32,17 @@ internal SparkStatement(SparkConnection connection)
             : base(connection)
         {
         }
+        public override QueryResult ExecuteQuery()
Review Comment:
nit: add an empty line before the method
```suggestion

        public override QueryResult ExecuteQuery()
```
##########
csharp/test/Drivers/Apache/Spark/DriverTests.cs:
##########
@@ -505,14 +510,71 @@ public async Task CanGetTableTypes()
         [SkippableFact, Order(10)]
         public void CanExecuteQuery()
         {
-            using AdbcConnection adbcConnection = NewConnection();
+            long count = 0;
+            long target = 80;
+            Queue<String> log = new Queue<string>();
+            string FileName = $@"c:\temp\hmsmeta\{DateTime.Now.ToString("yyyyMMdd_HHmm")}__{target}.log";
-            using AdbcStatement statement = adbcConnection.CreateStatement();
-            statement.SqlQuery = TestConfiguration.Query;
-
-            QueryResult queryResult = statement.ExecuteQuery();
+            for (int i = 0; i < target; i++)
+            {
+                ThreadPool.QueueUserWorkItem((_) =>
+                {
+                    using AdbcConnection adbcConnection = NewConnection();
+                    var sparkConn = adbcConnection as SparkConnection;
+                    if(sparkConn == null)
+                    {
+                        throw new InvalidCastException();
+                    }
+
+                    for (int i = 0; i < 10000; i++)
+                    {
+                        var sw = new Stopwatch();
+                        sw.Start();
+                        bool succeed = false;
+                        try
+                        {
+                            using AdbcStatement statement = adbcConnection.CreateStatement();
+                            statement.SqlQuery = TestConfiguration.Query;
+
+                            QueryResult queryResult = statement.ExecuteQuery();
+                            Tests.DriverTests.CanExecuteQuery(queryResult, TestConfiguration.ExpectedResultsCount);
Review Comment:
This could be the lambda fed into the performance logger.
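Roughly what I have in mind, as a hedged sketch (the `Action` extraction is illustrative; the harness it would feed is sketched under the later comment on this test):

```csharp
// Hypothetical extraction: the statement execution becomes the "kernel"
// handed to a performance-logging wrapper.
Action kernel = () =>
{
    using AdbcStatement statement = adbcConnection.CreateStatement();
    statement.SqlQuery = TestConfiguration.Query;
    QueryResult queryResult = statement.ExecuteQuery();
    Tests.DriverTests.CanExecuteQuery(queryResult, TestConfiguration.ExpectedResultsCount);
};
```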
##########
csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs:
##########
@@ -106,10 +130,11 @@ public override void SetOption(string key, string value)
             }
         }
-        protected async Task ExecuteStatementAsync()
+        protected async Task<TExecuteStatementResp> ExecuteStatementAsync()
         {
             TExecuteStatementReq executeRequest = new TExecuteStatementReq(this.connection.sessionHandle, this.SqlQuery);
             SetStatementProperties(executeRequest);
+            executeRequest.GetDirectResults = new TSparkGetDirectResults(BatchSizeDefault);
Review Comment:
Set it to the "current" value of BatchSize, in case it has been updated through configuration or programmatically.
```suggestion
            executeRequest.GetDirectResults = new TSparkGetDirectResults(BatchSize);
```
##########
csharp/src/Drivers/Apache/Spark/SparkConnection.cs:
##########
@@ -278,23 +283,53 @@ protected override async ValueTask<TProtocol> CreateProtocolAsync()
             else
                 token = properties[SparkParameters.Password];
-            HttpClient httpClient = new HttpClient();
-            httpClient.BaseAddress = new UriBuilder(Uri.UriSchemeHttps, hostName, -1, path).Uri;
-            httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", token);
-            httpClient.DefaultRequestHeaders.UserAgent.ParseAdd(UserAgent);
-            httpClient.DefaultRequestHeaders.AcceptEncoding.Clear();
-            httpClient.DefaultRequestHeaders.AcceptEncoding.Add(new StringWithQualityHeaderValue("identity"));
-            httpClient.DefaultRequestHeaders.ExpectContinue = false;
-
             TConfiguration config = new TConfiguration();
-
-            ThriftHttpTransport transport = new ThriftHttpTransport(httpClient, config);
+            _httpClient.BaseAddress = new UriBuilder(Uri.UriSchemeHttps, hostName, -1, path).Uri;
+            _httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", token);
+            _httpClient.DefaultRequestHeaders.UserAgent.ParseAdd("SimbaSparkJDBCDriver/2.06.15 Python/PyHive");
+            _httpClient.DefaultRequestHeaders.AcceptEncoding.Clear();
+            _httpClient.DefaultRequestHeaders.AcceptEncoding.Add(new StringWithQualityHeaderValue("identity"));
+            _httpClient.DefaultRequestHeaders.ExpectContinue = false;
+            _httpClient.DefaultRequestHeaders.Add("traceparent", Generate());
+            ThriftHttpTransport transport = new ThriftHttpTransport(_httpClient, config);
             // can switch to the one below if want to use the experimental one with IPeekableTransport
             // ThriftHttpTransport transport = new ThriftHttpTransport(httpClient, config);
             await transport.OpenAsync(CancellationToken.None);
             return new TBinaryProtocol(transport);
         }
+        private static readonly char[] Characters = "0123456789abcdef".ToCharArray();
+        private static readonly Random Random = new Random();
+
+        public string Generate()
+        {
+            StringBuilder result = new StringBuilder("00-");
+
+            for (int i = 0; i < 32; i++)
+            {
+                result.Append(Characters[Random.Next(Characters.Length)]);
+            }
Review Comment:
This could be extracted into a function that takes the length as a parameter and then reused for lines 315-318.
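A hedged sketch of the extraction (`GenerateHexString` is a hypothetical name):

```csharp
// Append `length` random lowercase hex characters; reusable for both the
// 32-character and shorter segments of the traceparent value.
private static string GenerateHexString(int length)
{
    StringBuilder result = new StringBuilder(length);
    for (int i = 0; i < length; i++)
    {
        result.Append(Characters[Random.Next(Characters.Length)]);
    }
    return result.ToString();
}
```

The `traceparent` value could then be assembled as `$"00-{GenerateHexString(32)}-{GenerateHexString(16)}-01"`, assuming the later lines follow the W3C traceparent layout.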
##########
csharp/src/Drivers/Apache/Spark/SparkStatement.cs:
##########
@@ -32,6 +32,17 @@ internal SparkStatement(SparkConnection connection)
             : base(connection)
         {
         }
+        public override QueryResult ExecuteQuery()
Review Comment:
Should this override also be applied to `ExecuteQueryAsync`?
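If so, a hedged sketch of the async counterpart (assuming the base method returns `ValueTask<QueryResult>`; adjust to the actual base signature):

```csharp
// Hypothetical async mirror of the ExecuteQuery override: reset the trace id
// before delegating to the base implementation.
public override async ValueTask<QueryResult> ExecuteQueryAsync()
{
    if (connection is SparkConnection sparkConnection)
        sparkConnection.ResetTraceId();
    return await base.ExecuteQueryAsync();
}
```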
##########
csharp/test/Drivers/Apache/Spark/DriverTests.cs:
##########
@@ -505,14 +510,71 @@ public async Task CanGetTableTypes()
         [SkippableFact, Order(10)]
         public void CanExecuteQuery()
         {
-            using AdbcConnection adbcConnection = NewConnection();
+            long count = 0;
+            long target = 80;
+            Queue<String> log = new Queue<string>();
+            string FileName = $@"c:\temp\hmsmeta\{DateTime.Now.ToString("yyyyMMdd_HHmm")}__{target}.log";
-            using AdbcStatement statement = adbcConnection.CreateStatement();
-            statement.SqlQuery = TestConfiguration.Query;
-
-            QueryResult queryResult = statement.ExecuteQuery();
+            for (int i = 0; i < target; i++)
Review Comment:
This test could be refactored to accept any test "kernel" (a lambda) and wrap it so that we can capture performance information in a log file. What do you think?
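A hedged sketch of such a harness (`RunWithPerformanceLog` and its shape are hypothetical; assumes `using System.Collections.Concurrent;`, `System.Diagnostics`, and `System.IO`):

```csharp
// Hypothetical reusable harness: times each invocation of the kernel and
// appends one line per run to the given log file.
private static void RunWithPerformanceLog(string logFile, int iterations, Action kernel)
{
    var log = new ConcurrentQueue<string>();
    for (int i = 0; i < iterations; i++)
    {
        var sw = Stopwatch.StartNew();
        bool succeeded = false;
        try
        {
            kernel();
            succeeded = true;
        }
        finally
        {
            sw.Stop();
            log.Enqueue($"{DateTime.Now:O}\t{succeeded}\t{sw.ElapsedMilliseconds}ms");
        }
    }
    File.AppendAllLines(logFile, log);
}
```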
##########
csharp/src/Drivers/Apache/Spark/SparkStatement.cs:
##########
@@ -32,6 +32,17 @@ internal SparkStatement(SparkConnection connection)
             : base(connection)
         {
         }
+        public override QueryResult ExecuteQuery()
+        {
+            var conn = (connection as SparkConnection);
+
+            if(conn != null)
+            {
+                conn.ResetTraceId();
+            }
Review Comment:
More concise?
```suggestion
            if (connection is SparkConnection sparkConnection)
                sparkConnection.ResetTraceId();
```
##########
csharp/src/Drivers/Apache/Spark/SparkConnection.cs:
##########
@@ -278,23 +283,53 @@ protected override async ValueTask<TProtocol> CreateProtocolAsync()
             else
                 token = properties[SparkParameters.Password];
-            HttpClient httpClient = new HttpClient();
-            httpClient.BaseAddress = new UriBuilder(Uri.UriSchemeHttps, hostName, -1, path).Uri;
-            httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", token);
-            httpClient.DefaultRequestHeaders.UserAgent.ParseAdd(UserAgent);
-            httpClient.DefaultRequestHeaders.AcceptEncoding.Clear();
-            httpClient.DefaultRequestHeaders.AcceptEncoding.Add(new StringWithQualityHeaderValue("identity"));
-            httpClient.DefaultRequestHeaders.ExpectContinue = false;
-
             TConfiguration config = new TConfiguration();
-
-            ThriftHttpTransport transport = new ThriftHttpTransport(httpClient, config);
+            _httpClient.BaseAddress = new UriBuilder(Uri.UriSchemeHttps, hostName, -1, path).Uri;
+            _httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", token);
+            _httpClient.DefaultRequestHeaders.UserAgent.ParseAdd("SimbaSparkJDBCDriver/2.06.15 Python/PyHive");
+            _httpClient.DefaultRequestHeaders.AcceptEncoding.Clear();
+            _httpClient.DefaultRequestHeaders.AcceptEncoding.Add(new StringWithQualityHeaderValue("identity"));
+            _httpClient.DefaultRequestHeaders.ExpectContinue = false;
+            _httpClient.DefaultRequestHeaders.Add("traceparent", Generate());
+            ThriftHttpTransport transport = new ThriftHttpTransport(_httpClient, config);
             // can switch to the one below if want to use the experimental one with IPeekableTransport
             // ThriftHttpTransport transport = new ThriftHttpTransport(httpClient, config);
             await transport.OpenAsync(CancellationToken.None);
             return new TBinaryProtocol(transport);
         }
+        private static readonly char[] Characters = "0123456789abcdef".ToCharArray();
Review Comment:
```suggestion
        private static readonly char[] HexCharacters = "0123456789abcdef".ToCharArray();
```