CurtHagenlocher commented on code in PR #3361:
URL: https://github.com/apache/arrow-adbc/pull/3361#discussion_r2319453940
##########
csharp/src/Drivers/BigQuery/BigQueryStatement.cs:
##########
@@ -193,25 +193,36 @@ private async Task<QueryResult>
ExecuteQueryInternalAsync()
}
}
- ReadSession rs = new ReadSession { Table = table, DataFormat =
DataFormat.Arrow };
-
- Func<Task<ReadSession>> createReadSession = () =>
clientMgr.ReadClient.CreateReadSessionAsync("projects/" +
results.TableReference.ProjectId, rs, maxStreamCount);
-
- ReadSession rrs = await
ExecuteWithRetriesAsync<ReadSession>(createReadSession, activity);
-
long totalRows = results.TotalRows == null ? -1L :
(long)results.TotalRows.Value;
- var readers = rrs.Streams
- .Select(s => ReadChunkWithRetries(clientMgr,
s.Name, activity))
- .Where(chunk => chunk != null)
- .Cast<IArrowReader>();
+ Func<Task<IEnumerable<IArrowReader>>> func = () =>
GetArrowReaders(clientMgr, table, results.TableReference.ProjectId,
maxStreamCount, activity);
+ IEnumerable<IArrowReader> readers = await
ExecuteWithRetriesAsync<IEnumerable<IArrowReader>>(func, activity);
IArrowArrayStream stream = new MultiArrowReader(this,
TranslateSchema(results.Schema), readers);
activity?.AddTag(SemanticConventions.Db.Response.ReturnedRows,
totalRows);
return new QueryResult(totalRows, stream);
});
}
+ private async Task<IEnumerable<IArrowReader>> GetArrowReaders(
+ TokenProtectedReadClientManger clientMgr,
+ string table,
+ string projectId,
+ int maxStreamCount,
+ Activity? activity)
+ {
+ ReadSession rs = new ReadSession { Table = table, DataFormat =
DataFormat.Arrow };
Review Comment:
I suspect that moving the session creation into the retriable part is
probably what will fix this problem.
##########
csharp/src/Drivers/BigQuery/BigQueryStatement.cs:
##########
@@ -604,12 +604,19 @@ sealed class ReadRowsStream : Stream
public ReadRowsStream(IAsyncEnumerator<ReadRowsResponse> response)
{
- if (response.MoveNextAsync().Result && response.Current !=
null)
+ try
{
- this.currentBuffer =
response.Current.ArrowSchema.SerializedSchema.Memory;
- this.hasRows = true;
+ if (response.MoveNextAsync().Result && response.Current !=
null)
+ {
+ this.currentBuffer =
response.Current.ArrowSchema.SerializedSchema.Memory;
+ this.hasRows = true;
+ }
+ else
+ {
+ this.hasRows = false;
+ }
}
- else
+ catch (InvalidOperationException)
Review Comment:
Given that we're swallowing this exception, should we at least write
something to the logs (e.g. stack trace) to ensure it's not entirely lost?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]