CurtHagenlocher commented on code in PR #44783: URL: https://github.com/apache/arrow/pull/44783#discussion_r1947995089
########## csharp/src/Apache.Arrow.Flight.Sql/TableRef.cs: ########## @@ -0,0 +1,23 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +namespace Apache.Arrow.Flight.Sql; + +public class TableRef +{ + public string? Catalog { get; set; } + public string DbSchema { get; set; } = null!; Review Comment: The `null!` seems dodgy. As these are required to be set, consider using a pair of constructors `TableRef(string, string)` and `TableRef(string, string, string)` instead. ########## csharp/src/Apache.Arrow.Flight.Sql/SchemaExtensions.cs: ########## @@ -0,0 +1,52 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.IO; +using Apache.Arrow.Ipc; + +namespace Apache.Arrow.Flight.Sql; + +public static class SchemaExtensions +{ + /// <summary> + /// Deserializes a schema from a byte array. + /// </summary> + /// <param name="serializedSchema">The byte array representing the serialized schema.</param> + /// <returns>The deserialized Schema object.</returns> + public static Schema DeserializeSchema(byte[] serializedSchema) + { + if (serializedSchema == null || serializedSchema.Length == 0) + { + throw new ArgumentException("Invalid serialized schema"); + } + + using var stream = new MemoryStream(serializedSchema); + var reader = new ArrowStreamReader(stream); Review Comment: There's a ctor for `ArrowStreamReader` which takes a `ReadOnlyMemory<byte>` as an argument, so you can just pass in the bytes. This is probably a bit more efficient. ########## csharp/src/Apache.Arrow.Flight.Sql/DoPutResult.cs: ########## @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System.Threading.Tasks; +using Apache.Arrow.Flight.Client; +using Grpc.Core; + +namespace Apache.Arrow.Flight.Sql; + +public class DoPutResult +{ + public FlightClientRecordBatchStreamWriter Writer { get; } + public IAsyncStreamReader<FlightPutResult> Reader { get; } + + public DoPutResult(FlightClientRecordBatchStreamWriter writer, IAsyncStreamReader<FlightPutResult> reader) + { + Writer = writer; + Reader = reader; + } + + /// <summary> + /// Reads the metadata asynchronously from the reader. + /// </summary> + /// <returns>A ByteString containing the metadata read from the reader.</returns> + public async Task<Google.Protobuf.ByteString> ReadMetadataAsync() Review Comment: In general, none of the `async` methods in this change take a `CancellationToken`. Shouldn't they? ########## csharp/test/Apache.Arrow.Flight.Sql.Tests/FlightSqlClientTests.cs: ########## @@ -0,0 +1,855 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Apache.Arrow.Flight.Client; +using Apache.Arrow.Flight.Sql.Client; +using Apache.Arrow.Flight.TestWeb; +using Apache.Arrow.Types; +using Arrow.Flight.Protocol.Sql; +using Google.Protobuf; +using Grpc.Core.Utils; +using Xunit; + +namespace Apache.Arrow.Flight.Sql.Tests; + +public class FlightSqlClientTests : IDisposable +{ + readonly TestFlightSqlWebFactory _testWebFactory; + readonly FlightStore _flightStore; + private readonly FlightSqlClient _flightSqlClient; + private readonly FlightSqlTestUtils _testUtils; + + public FlightSqlClientTests() + { + _flightStore = new FlightStore(); + _testWebFactory = new TestFlightSqlWebFactory(_flightStore); + FlightClient flightClient = new(_testWebFactory.GetChannel()); + _flightSqlClient = new FlightSqlClient(flightClient); + + _testUtils = new FlightSqlTestUtils(_testWebFactory, _flightStore); + } + + #region Transactions + + [Fact] + public async Task CommitTransactionAsync() + { + // Arrange + string transactionId = "sample-transaction-id"; + var transaction = new Transaction(transactionId); + + // Act + var streamCall = _flightSqlClient.CommitAsync(transaction); + var result = await streamCall.ResponseStream.ToListAsync(); + + // Assert + Assert.NotNull(result); + Assert.Equal(transaction.TransactionId, result.FirstOrDefault()?.Body); + } + + [Fact] + public async Task BeginTransactionAsync() + { + // Arrange + string expectedTransactionId = "sample-transaction-id"; + + // Act + var transaction = 
await _flightSqlClient.BeginTransactionAsync(); + + // Assert + Assert.NotNull(transaction); + Assert.Equal(ByteString.CopyFromUtf8(expectedTransactionId), transaction.TransactionId); + } + + [Fact] + public async Task RollbackTransactionAsync() + { + // Arrange + string transactionId = "sample-transaction-id"; + var transaction = new Transaction(transactionId); + + // Act + var streamCall = _flightSqlClient.RollbackAsync(transaction); + var result = await streamCall.ResponseStream.ToListAsync(); + + // Assert + Assert.NotNull(transaction); + Assert.Equal(result.FirstOrDefault()?.Body, transaction.TransactionId); + } + + #endregion + + #region PreparedStatement + + [Fact] + public async Task PreparedAsync() + { + // Arrange + string query = "INSERT INTO users (id, name) VALUES (1, 'John Doe')"; + var transaction = new Transaction("sample-transaction-id"); + var flightDescriptor = FlightDescriptor.CreateCommandDescriptor("test"); + + // Create a sample schema for the dataset and parameters + var schema = new Schema.Builder() + .Field(f => f.Name("id").DataType(Int32Type.Default)) + .Field(f => f.Name("name").DataType(StringType.Default)) + .Build(); + + var recordBatch = new RecordBatch(schema, new Array[] + { + new Int32Array.Builder().Append(1).Build(), + new StringArray.Builder().Append("John Doe").Build() + }, 1); + + var flightHolder = new FlightHolder(flightDescriptor, schema, _testWebFactory.GetAddress()); + flightHolder.AddBatch(new RecordBatchWithMetadata(recordBatch)); + _flightStore.Flights.Add(flightDescriptor, flightHolder); + + var datasetSchemaBytes = SchemaExtensions.SerializeSchema(schema); + var parameterSchemaBytes = SchemaExtensions.SerializeSchema(schema); + + var preparedStatementResponse = new ActionCreatePreparedStatementResult + { + PreparedStatementHandle = ByteString.CopyFromUtf8("prepared-handle"), + DatasetSchema = ByteString.CopyFrom(datasetSchemaBytes), + ParameterSchema = ByteString.CopyFrom(parameterSchemaBytes) + }; + + // Act + var 
preparedStatement = await _flightSqlClient.PrepareAsync(query, transaction); + var deserializedDatasetSchema = SchemaExtensions.DeserializeSchema(preparedStatementResponse.DatasetSchema.ToByteArray()); + var deserializedParameterSchema = SchemaExtensions.DeserializeSchema(preparedStatementResponse.ParameterSchema.ToByteArray()); + + // Assert + Assert.NotNull(preparedStatement); + Assert.NotNull(deserializedDatasetSchema); + Assert.NotNull(deserializedParameterSchema); + CompareSchemas(schema, deserializedDatasetSchema); + CompareSchemas(schema, deserializedParameterSchema); + } + + #endregion + + [Fact] + public async Task ExecuteUpdateAsync() + { + // Arrange + string query = "UPDATE test_table SET column1 = 'value' WHERE column2 = 'condition'"; + var transaction = new Transaction("sample-transaction-id"); + var flightDescriptor = FlightDescriptor.CreateCommandDescriptor("test"); + + var schema = new Schema.Builder() + .Field(f => f.Name("id").DataType(Int32Type.Default)) + .Field(f => f.Name("name").DataType(StringType.Default)) + .Build(); + + var recordBatch = new RecordBatch(schema, new Array[] + { + new Int32Array.Builder().Append(1).Build(), + new StringArray.Builder().Append("John Doe").Build() + }, 1); + + + var flightHolder = new FlightHolder(flightDescriptor, recordBatch.Schema, _testWebFactory.GetAddress()); + flightHolder.AddBatch(new RecordBatchWithMetadata(recordBatch)); + _flightStore.Flights.Add(flightDescriptor, flightHolder); + + // Act + long affectedRows = await _flightSqlClient.ExecuteUpdateAsync(query, transaction); + + // Assert + Assert.Equal(1, affectedRows); + } + + [Fact] + public async Task ExecuteAsync() + { + // Arrange + string query = "SELECT * FROM test_table"; + var transaction = new Transaction("sample-transaction-id"); + var flightDescriptor = FlightDescriptor.CreateCommandDescriptor("test"); + var recordBatch = _testUtils.CreateTestBatch(0, 100); + + var flightHolder = new FlightHolder(flightDescriptor, recordBatch.Schema, 
_testWebFactory.GetAddress()); + flightHolder.AddBatch(new RecordBatchWithMetadata(recordBatch)); + + _flightStore.Flights.Add(flightDescriptor, flightHolder); + + // Act + var flightInfo = await _flightSqlClient.ExecuteAsync(query, transaction); + + // Assert + Assert.NotNull(flightInfo); + Assert.Single(flightInfo.Endpoints); + } + + [Fact] + public async Task ExecuteAsync_ShouldReturnFlightInfo_WhenValidInputsAreProvided() + { + // Arrange + string query = "SELECT * FROM test_table"; + var transaction = new Transaction("sample-transaction-id"); + var flightDescriptor = FlightDescriptor.CreateCommandDescriptor("test"); + var recordBatch = _testUtils.CreateTestBatch(0, 100); + var flightHolder = new FlightHolder(flightDescriptor, recordBatch.Schema, + _testWebFactory.GetAddress()); + _flightStore.Flights.Add(flightDescriptor, flightHolder); + + // Act + var flightInfo = await _flightSqlClient.ExecuteAsync(query, transaction); + + // Assert + Assert.NotNull(flightInfo); + Assert.IsType<FlightInfo>(flightInfo); + } + + [Fact] + public async Task ExecuteAsync_ShouldThrowArgumentException_WhenQueryIsEmpty() + { + // Arrange + string emptyQuery = string.Empty; + var transaction = new Transaction("sample-transaction-id"); + + // Act & Assert + await Assert.ThrowsAsync<ArgumentException>(async () => + await _flightSqlClient.ExecuteAsync(emptyQuery, transaction)); + } + + [Fact] + public async Task ExecuteAsync_ShouldReturnFlightInfo_WhenTransactionIsNoTransaction() + { + // Arrange + string query = "SELECT * FROM test_table"; + var transaction = Transaction.NoTransaction; + var flightDescriptor = FlightDescriptor.CreateCommandDescriptor("test"); + var recordBatch = _testUtils.CreateTestBatch(0, 100); + var flightHolder = new FlightHolder(flightDescriptor, recordBatch.Schema, + _testWebFactory.GetAddress()); + _flightStore.Flights.Add(flightDescriptor, flightHolder); + + // Act + var flightInfo = await _flightSqlClient.ExecuteAsync(query, transaction); + + // Assert + 
Assert.NotNull(flightInfo); + Assert.IsType<FlightInfo>(flightInfo); + } + + + [Fact] + public async Task GetFlightInfoAsync() + { + // Arrange + var flightDescriptor = FlightDescriptor.CreateCommandDescriptor("test"); + var recordBatch = _testUtils.CreateTestBatch(0, 100); + var flightHolder = new FlightHolder(flightDescriptor, recordBatch.Schema, + _testWebFactory.GetAddress()); + _flightStore.Flights.Add(flightDescriptor, flightHolder); + // Act + var flightInfo = await _flightSqlClient.GetFlightInfoAsync(flightDescriptor); + + // Assert + Assert.NotNull(flightInfo); + } + + [Fact] + public async Task GetExecuteSchemaAsync() + { + // Arrange + string query = "SELECT * FROM test_table"; + var flightDescriptor = FlightDescriptor.CreateCommandDescriptor("test"); + var recordBatch = _testUtils.CreateTestBatch(0, 100); + var flightHolder = new FlightHolder(flightDescriptor, recordBatch.Schema, + _testWebFactory.GetAddress()); + _flightStore.Flights.Add(flightDescriptor, flightHolder); + + // Act + Schema resultSchema = + await _flightSqlClient.GetExecuteSchemaAsync(query, new Transaction("sample-transaction-id")); + + // Assert + Assert.NotNull(resultSchema); + Assert.Equal(recordBatch.Schema.FieldsList.Count, resultSchema.FieldsList.Count); + CompareSchemas(resultSchema, recordBatch.Schema); + } + + [Fact] + public async Task GetCatalogsAsync() + { + // Arrange + var options = new FlightCallOptions(); + var flightDescriptor = FlightDescriptor.CreateCommandDescriptor("test"); + var recordBatch = _testUtils.CreateTestBatch(0, 100); + var flightHolder = new FlightHolder(flightDescriptor, recordBatch.Schema, + _testWebFactory.GetAddress()); + _flightStore.Flights.Add(flightDescriptor, flightHolder); + + // Act + var result = await _flightSqlClient.GetCatalogsAsync(options); + + // Assert + Assert.NotNull(result); + Assert.Equal(flightHolder.GetFlightInfo().Endpoints.Count, result.Endpoints.Count); + Assert.Equal(flightDescriptor, result.Descriptor); + } + + [Fact] + 
public async Task GetSchemaAsync() + { + // Arrange + var flightDescriptor = FlightDescriptor.CreateCommandDescriptor("test"); + var recordBatch = _testUtils.CreateTestBatch(0, 100); + var flightHolder = new FlightHolder(flightDescriptor, recordBatch.Schema, + _testWebFactory.GetAddress()); + _flightStore.Flights.Add(flightDescriptor, flightHolder); + + // Act + var result = await _flightSqlClient.GetSchemaAsync(flightDescriptor); + + // Assert + Assert.NotNull(result); + Assert.Equal(recordBatch.Schema.FieldsList.Count, result.FieldsList.Count); + CompareSchemas(result, recordBatch.Schema); + } + + [Fact] + public async Task GetDbSchemasAsync() + { + // Arrange + var flightDescriptor = FlightDescriptor.CreateCommandDescriptor("test"); + var recordBatch = _testUtils.CreateTestBatch(0, 100); + var flightHolder = new FlightHolder(flightDescriptor, recordBatch.Schema, + _testWebFactory.GetAddress()); + _flightStore.Flights.Add(flightDescriptor, flightHolder); + string catalog = "test-catalog"; + string dbSchemaFilterPattern = "test-schema-pattern"; + + // Act + var result = await _flightSqlClient.GetDbSchemasAsync(catalog, dbSchemaFilterPattern); + + // Assert + Assert.NotNull(result); + var expectedFlightInfo = flightHolder.GetFlightInfo(); + Assert.Equal(recordBatch.Schema.FieldsList.Count, result.Schema.FieldsList.Count); + Assert.Equal(expectedFlightInfo.Descriptor.Command, result.Descriptor.Command); + Assert.Equal(expectedFlightInfo.Descriptor.Type, result.Descriptor.Type); + Assert.Equal(expectedFlightInfo.Schema.FieldsList.Count, result.Schema.FieldsList.Count); + Assert.Equal(expectedFlightInfo.Endpoints.Count, result.Endpoints.Count); + + for (int i = 0; i < expectedFlightInfo.Schema.FieldsList.Count; i++) + { + var expectedField = expectedFlightInfo.Schema.FieldsList[i]; + var actualField = result.Schema.FieldsList[i]; + + Assert.Equal(expectedField.Name, actualField.Name); + Assert.Equal(expectedField.DataType, actualField.DataType); + 
Assert.Equal(expectedField.IsNullable, actualField.IsNullable); + Assert.Equal(expectedField.Metadata?.Count ?? 0, actualField.Metadata?.Count ?? 0); + } + + for (int i = 0; i < expectedFlightInfo.Endpoints.Count; i++) + { + var expectedEndpoint = expectedFlightInfo.Endpoints[i]; + var actualEndpoint = result.Endpoints[i]; + + Assert.Equal(expectedEndpoint.Ticket, actualEndpoint.Ticket); + Assert.Equal(expectedEndpoint.Locations.Count(), actualEndpoint.Locations.Count()); + } + } + + [Fact] + public async Task GetPrimaryKeysAsync() + { + // Arrange + var flightDescriptor = FlightDescriptor.CreateCommandDescriptor("test"); + var recordBatch = _testUtils.CreateTestBatch(0, 100); + var tableRef = new TableRef { Catalog = "test-catalog", Table = "test-table", DbSchema = "test-schema" }; + var flightHolder = new FlightHolder(flightDescriptor, recordBatch.Schema, + _testWebFactory.GetAddress()); + _flightStore.Flights.Add(flightDescriptor, flightHolder); + + // Act + var result = await _flightSqlClient.GetPrimaryKeysAsync(tableRef); + + // Assert + Assert.NotNull(result); + var expectedFlightInfo = flightHolder.GetFlightInfo(); + Assert.Equal(expectedFlightInfo.Descriptor.Command, result.Descriptor.Command); + Assert.Equal(expectedFlightInfo.Descriptor.Type, result.Descriptor.Type); + Assert.Equal(expectedFlightInfo.Schema.FieldsList.Count, result.Schema.FieldsList.Count); + + for (int i = 0; i < expectedFlightInfo.Schema.FieldsList.Count; i++) + { + var expectedField = expectedFlightInfo.Schema.FieldsList[i]; + var actualField = result.Schema.FieldsList[i]; + Assert.Equal(expectedField.Name, actualField.Name); + Assert.Equal(expectedField.DataType, actualField.DataType); + Assert.Equal(expectedField.IsNullable, actualField.IsNullable); + Assert.Equal(expectedField.Metadata?.Count ?? 0, actualField.Metadata?.Count ?? 
0); + } + + Assert.Equal(expectedFlightInfo.Endpoints.Count, result.Endpoints.Count); + + for (int i = 0; i < expectedFlightInfo.Endpoints.Count; i++) + { + var expectedEndpoint = expectedFlightInfo.Endpoints[i]; + var actualEndpoint = result.Endpoints[i]; + + Assert.Equal(expectedEndpoint.Ticket, actualEndpoint.Ticket); + Assert.Equal(expectedEndpoint.Locations.Count(), actualEndpoint.Locations.Count()); + } + } + + [Fact] + public async Task GetTablesAsync() + { + // Arrange + var flightDescriptor = FlightDescriptor.CreateCommandDescriptor("test"); + var recordBatch = _testUtils.CreateTestBatch(0, 100); + var flightHolder = new FlightHolder(flightDescriptor, recordBatch.Schema, + _testWebFactory.GetAddress()); + _flightStore.Flights.Add(flightDescriptor, flightHolder); + + string catalog = "sample_catalog"; + string dbSchemaFilterPattern = "sample_schema"; + string tableFilterPattern = "sample_table"; + bool includeSchema = true; + var tableTypes = new List<string> { "BASE TABLE" }; + + // Act + var result = await _flightSqlClient.GetTablesAsync(catalog, dbSchemaFilterPattern, tableFilterPattern, + includeSchema, tableTypes); + + // Assert + Assert.NotNull(result); + Assert.Single(result); + + var expectedFlightInfo = flightHolder.GetFlightInfo(); + var flightInfo = result.First(); + Assert.Equal(expectedFlightInfo.Descriptor.Command, flightInfo.Descriptor.Command); + Assert.Equal(expectedFlightInfo.Descriptor.Type, flightInfo.Descriptor.Type); + Assert.Equal(expectedFlightInfo.Schema.FieldsList.Count, flightInfo.Schema.FieldsList.Count); + for (int i = 0; i < expectedFlightInfo.Schema.FieldsList.Count; i++) + { + var expectedField = expectedFlightInfo.Schema.FieldsList[i]; + var actualField = flightInfo.Schema.FieldsList[i]; + Assert.Equal(expectedField.Name, actualField.Name); + Assert.Equal(expectedField.DataType, actualField.DataType); + Assert.Equal(expectedField.IsNullable, actualField.IsNullable); + } + + Assert.Equal(expectedFlightInfo.Endpoints.Count, 
flightInfo.Endpoints.Count); + } + + Review Comment: nit: extra blank line ########## csharp/src/Apache.Arrow.Flight.Sql/Transaction.cs: ########## @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Google.Protobuf; + +namespace Apache.Arrow.Flight.Sql; + +public class Transaction Review Comment: Should this be a value type? ########## csharp/examples/Examples.sln: ########## @@ -7,6 +7,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FluentBuilderExample", "Flu EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Apache.Arrow", "..\src\Apache.Arrow\Apache.Arrow.csproj", "{1FE1DE95-FF6E-4895-82E7-909713C53524}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "FlightClientExample", "FlightClientExample\FlightClientExample.csproj", "{CBB46C39-530D-465A-9367-4E771595209A}" +EndProject Review Comment: FlightClientExample was added to Examples.sln but its files are not part of this pull request. ########## csharp/src/Apache.Arrow.Flight/FlightInfoCancelRequest.cs: ########## @@ -0,0 +1,38 @@ +using System; Review Comment: Added files must have the Apache header. It's missing from this file as well as FlightInfoCancelResult.cs. 
########## csharp/src/Apache.Arrow.Flight.Sql/Transaction.cs: ########## @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Google.Protobuf; + +namespace Apache.Arrow.Flight.Sql; + +public class Transaction +{ + private static readonly ByteString TransactionIdDefaultValue = ByteString.Empty; + private ByteString? _transactionId; + + public ByteString TransactionId + { + get => _transactionId ?? TransactionIdDefaultValue; + set => _transactionId = ProtoPreconditions.CheckNotNull(value, nameof(value)); + } + + public static readonly Transaction NoTransaction = new(TransactionIdDefaultValue); + + public Transaction(ByteString transactionId) + { + TransactionId = transactionId; + } + + public Transaction(string transactionId) + { + _transactionId = ByteString.CopyFromUtf8(transactionId); + } + + public bool IsValid() => TransactionId.Length > 0; + public void ResetTransaction() Review Comment: `ResetTransaction` doesn't appear to be used. ########## csharp/src/Apache.Arrow.Flight.Sql/Transaction.cs: ########## @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. 
See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Google.Protobuf; + +namespace Apache.Arrow.Flight.Sql; + +public class Transaction +{ + private static readonly ByteString TransactionIdDefaultValue = ByteString.Empty; + private ByteString? _transactionId; + + public ByteString TransactionId + { + get => _transactionId ?? TransactionIdDefaultValue; + set => _transactionId = ProtoPreconditions.CheckNotNull(value, nameof(value)); + } + + public static readonly Transaction NoTransaction = new(TransactionIdDefaultValue); + + public Transaction(ByteString transactionId) + { + TransactionId = transactionId; + } + + public Transaction(string transactionId) + { + _transactionId = ByteString.CopyFromUtf8(transactionId); + } + + public bool IsValid() => TransactionId.Length > 0; Review Comment: Should this be a property? ########## csharp/src/Apache.Arrow.Flight.Sql/RecordBatchExtensions.cs: ########## @@ -0,0 +1,138 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; +using Apache.Arrow.Ipc; +using Google.Protobuf; +using Grpc.Core; + +namespace Apache.Arrow.Flight.Sql; + +public static class RecordBatchExtensions +{ + /// <summary> + /// Converts a RecordBatch into an asynchronous stream of FlightData. + /// </summary> + /// <param name="recordBatch">The RecordBatch to convert.</param> + /// <param name="flightDescriptor">The FlightDescriptor describing the Flight data.</param> + /// <returns>An asynchronous stream of FlightData objects.</returns> + public static async IAsyncEnumerable<FlightData> ToFlightDataStreamAsync(this RecordBatch recordBatch, + FlightDescriptor flightDescriptor) + { + if (recordBatch == null) + { + throw new ArgumentNullException(nameof(recordBatch)); + } + + // Use a memory stream to write the Arrow RecordBatch into FlightData format + using var memoryStream = new MemoryStream(); + var writer = new ArrowStreamWriter(memoryStream, recordBatch.Schema); + + // Write the RecordBatch to the stream + await writer.WriteRecordBatchAsync(recordBatch).ConfigureAwait(false); + await writer.WriteEndAsync().ConfigureAwait(false); + + // Reset the memory stream position + memoryStream.Position = 0; + + // Read back the data to create FlightData + var flightData = new FlightData(flightDescriptor, ByteString.CopyFrom(memoryStream.ToArray()), Review Comment: `ByteString.CopyFrom(memoryStream.ToArray())` makes an extra copy of 
the data. This can be avoided by doing `ByteString.CopyFrom(memoryStream.GetBuffer(), 0, (int)memoryStream.Length)`. ########## csharp/src/Apache.Arrow.Flight.Sql/PreparedStatement.cs: ########## @@ -0,0 +1,383 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Apache.Arrow.Flight.Sql.Client; +using Arrow.Flight.Protocol.Sql; +using Google.Protobuf; +using Grpc.Core; + +namespace Apache.Arrow.Flight.Sql; + +public class PreparedStatement : IDisposable +{ + private readonly FlightSqlClient _client; + private readonly string _handle; + private Schema _datasetSchema; + private Schema _parameterSchema; + private RecordBatch? _recordsBatch; + private bool _isClosed; + public bool IsClosed => _isClosed; + public string Handle => _handle; + public RecordBatch? ParametersBatch => _recordsBatch; + + /// <summary> + /// Initializes a new instance of the <see cref="PreparedStatement"/> class. 
+ /// </summary> + /// <param name="client">The Flight SQL client used for executing SQL operations.</param> + /// <param name="handle">The handle representing the prepared statement.</param> + /// <param name="datasetSchema">The schema of the result dataset.</param> + /// <param name="parameterSchema">The schema of the parameters for this prepared statement.</param> + public PreparedStatement(FlightSqlClient client, string handle, Schema datasetSchema, Schema parameterSchema) + { + _client = client ?? throw new ArgumentNullException(nameof(client)); + _handle = handle ?? throw new ArgumentNullException(nameof(handle)); + _datasetSchema = datasetSchema ?? throw new ArgumentNullException(nameof(datasetSchema)); + _parameterSchema = parameterSchema ?? throw new ArgumentNullException(nameof(parameterSchema)); + _isClosed = false; + } + + /// <summary> + /// Retrieves the schema associated with the prepared statement asynchronously. + /// </summary> + /// <param name="options">The options used to configure the Flight call.</param> + /// <returns>A task representing the asynchronous operation, which returns the schema of the result set.</returns> + /// <exception cref="InvalidOperationException">Thrown when the schema is empty or invalid.</exception> + public async Task<Schema> GetSchemaAsync(FlightCallOptions? 
options = default) + { + EnsureStatementIsNotClosed(); + + try + { + var command = new CommandPreparedStatementQuery + { + PreparedStatementHandle = ByteString.CopyFrom(_handle, Encoding.UTF8) + }; + var descriptor = FlightDescriptor.CreateCommandDescriptor(command.PackAndSerialize()); + var schema = await _client.GetSchemaAsync(descriptor, options).ConfigureAwait(false); + if (schema == null || !schema.FieldsList.Any()) + { + throw new InvalidOperationException("Schema is empty or invalid."); + } + return schema; + } + catch (RpcException ex) + { + throw new InvalidOperationException("Failed to retrieve the schema for the prepared statement", ex); + } + } + + Review Comment: nit: extra blank line ########## csharp/src/Apache.Arrow.Flight.Sql/RecordBatchExtensions.cs: ########## @@ -0,0 +1,138 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; +using Apache.Arrow.Ipc; +using Google.Protobuf; +using Grpc.Core; + +namespace Apache.Arrow.Flight.Sql; + +public static class RecordBatchExtensions +{ + /// <summary> + /// Converts a RecordBatch into an asynchronous stream of FlightData. + /// </summary> + /// <param name="recordBatch">The RecordBatch to convert.</param> + /// <param name="flightDescriptor">The FlightDescriptor describing the Flight data.</param> + /// <returns>An asynchronous stream of FlightData objects.</returns> + public static async IAsyncEnumerable<FlightData> ToFlightDataStreamAsync(this RecordBatch recordBatch, + FlightDescriptor flightDescriptor) + { + if (recordBatch == null) + { + throw new ArgumentNullException(nameof(recordBatch)); + } + + // Use a memory stream to write the Arrow RecordBatch into FlightData format + using var memoryStream = new MemoryStream(); + var writer = new ArrowStreamWriter(memoryStream, recordBatch.Schema); + + // Write the RecordBatch to the stream + await writer.WriteRecordBatchAsync(recordBatch).ConfigureAwait(false); + await writer.WriteEndAsync().ConfigureAwait(false); + + // Reset the memory stream position + memoryStream.Position = 0; + + // Read back the data to create FlightData + var flightData = new FlightData(flightDescriptor, ByteString.CopyFrom(memoryStream.ToArray()), + ByteString.CopyFrom(memoryStream.ToArray())); Review Comment: Is it correct for the `dataBody` and `dataHeader` arguments to have the same value? The sync version passes `ByteString.Empty` as the `dataHeader` parameter. ########## csharp/src/Apache.Arrow.Flight.Sql/FlightCallOptions.cs: ########## @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. 
See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Buffers; +using System.Threading; +using Grpc.Core; + +namespace Apache.Arrow.Flight.Sql; + +public class FlightCallOptions +{ + public FlightCallOptions() + { + Timeout = TimeSpan.FromSeconds(-1); + } + // Implement any necessary options for RPC calls + public Metadata Headers { get; set; } = new(); + + /// <summary> + /// Gets or sets a token to enable interactive user cancellation of long-running requests. + /// </summary> + public CancellationToken StopToken { get; set; } + + /// <summary> + /// Gets or sets the optional timeout for this call. + /// Negative durations mean an implementation-defined default behavior will be used instead. + /// </summary> + public TimeSpan Timeout { get; set; } + + /// <summary> + /// Gets or sets an optional memory manager to control where to allocate incoming data. + /// </summary> + public MemoryManager<ArrowBuffer>? MemoryManager { get; set; } Review Comment: Nothing references `MemoryManager`. To the extent that these are aspirational, it would be better to remove them now and add them back in a separate PR once implemented instead of putting them in unused and giving the impression that they might do something. 
########## csharp/src/Apache.Arrow.Flight.Sql/PreparedStatement.cs: ########## @@ -0,0 +1,383 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Apache.Arrow.Flight.Sql.Client; +using Arrow.Flight.Protocol.Sql; +using Google.Protobuf; +using Grpc.Core; + +namespace Apache.Arrow.Flight.Sql; + +public class PreparedStatement : IDisposable +{ + private readonly FlightSqlClient _client; + private readonly string _handle; + private Schema _datasetSchema; + private Schema _parameterSchema; Review Comment: Neither of these schemas appears to be used. ########## csharp/src/Apache.Arrow.Flight.Sql/RecordBatchExtensions.cs: ########## @@ -0,0 +1,138 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; +using Apache.Arrow.Ipc; +using Google.Protobuf; +using Grpc.Core; + +namespace Apache.Arrow.Flight.Sql; + +public static class RecordBatchExtensions +{ + /// <summary> + /// Converts a RecordBatch into an asynchronous stream of FlightData. + /// </summary> + /// <param name="recordBatch">The RecordBatch to convert.</param> + /// <param name="flightDescriptor">The FlightDescriptor describing the Flight data.</param> + /// <returns>An asynchronous stream of FlightData objects.</returns> + public static async IAsyncEnumerable<FlightData> ToFlightDataStreamAsync(this RecordBatch recordBatch, + FlightDescriptor flightDescriptor) + { + if (recordBatch == null) + { + throw new ArgumentNullException(nameof(recordBatch)); + } + + // Use a memory stream to write the Arrow RecordBatch into FlightData format + using var memoryStream = new MemoryStream(); + var writer = new ArrowStreamWriter(memoryStream, recordBatch.Schema); + + // Write the RecordBatch to the stream + await writer.WriteRecordBatchAsync(recordBatch).ConfigureAwait(false); + await writer.WriteEndAsync().ConfigureAwait(false); + + // Reset the memory stream position + memoryStream.Position = 0; + + // Read back the data to create FlightData + var flightData = new FlightData(flightDescriptor, ByteString.CopyFrom(memoryStream.ToArray()), + ByteString.CopyFrom(memoryStream.ToArray())); + yield return flightData; + } + + /// 
<summary> + /// Converts a RecordBatch into an IAsyncStreamReader<FlightData>. + /// </summary> + /// <param name="recordBatch">The RecordBatch to convert.</param> + /// <param name="flightDescriptor">The FlightDescriptor describing the Flight data.</param> + /// <returns>An IAsyncStreamReader of FlightData.</returns> + public static IAsyncStreamReader<FlightData> ToFlightDataStream(this RecordBatch recordBatch, FlightDescriptor flightDescriptor) + { + if (recordBatch == null) throw new ArgumentNullException(nameof(recordBatch)); + if (flightDescriptor == null) throw new ArgumentNullException(nameof(flightDescriptor)); + + var channel = Channel.CreateUnbounded<FlightData>(); + + try + { + if (recordBatch.Schema == null || !recordBatch.Schema.FieldsList.Any()) + { + throw new InvalidOperationException("The record batch has an invalid or empty schema."); + } + + using var memoryStream = new MemoryStream(); + using var writer = new ArrowStreamWriter(memoryStream, recordBatch.Schema); + writer.WriteRecordBatch(recordBatch); + writer.WriteEnd(); + memoryStream.Position = 0; + var flightData = new FlightData(flightDescriptor, ByteString.CopyFrom(memoryStream.ToArray()), ByteString.Empty, ByteString.Empty); + if (flightData.DataBody.IsEmpty) + { + throw new InvalidOperationException( + "The generated FlightData is empty. Check the RecordBatch content."); + } + + channel.Writer.TryWrite(flightData); Review Comment: What would it mean if this returned `false`? ########## csharp/src/Apache.Arrow.Flight.Sql/RecordBatchExtensions.cs: ########## @@ -0,0 +1,138 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; +using Apache.Arrow.Ipc; +using Google.Protobuf; +using Grpc.Core; + +namespace Apache.Arrow.Flight.Sql; + +public static class RecordBatchExtensions +{ + /// <summary> + /// Converts a RecordBatch into an asynchronous stream of FlightData. + /// </summary> + /// <param name="recordBatch">The RecordBatch to convert.</param> + /// <param name="flightDescriptor">The FlightDescriptor describing the Flight data.</param> + /// <returns>An asynchronous stream of FlightData objects.</returns> + public static async IAsyncEnumerable<FlightData> ToFlightDataStreamAsync(this RecordBatch recordBatch, + FlightDescriptor flightDescriptor) + { + if (recordBatch == null) + { + throw new ArgumentNullException(nameof(recordBatch)); + } + + // Use a memory stream to write the Arrow RecordBatch into FlightData format + using var memoryStream = new MemoryStream(); + var writer = new ArrowStreamWriter(memoryStream, recordBatch.Schema); + + // Write the RecordBatch to the stream + await writer.WriteRecordBatchAsync(recordBatch).ConfigureAwait(false); + await writer.WriteEndAsync().ConfigureAwait(false); + + // Reset the memory stream position + memoryStream.Position = 0; + + // Read back the data to create FlightData + var flightData = new FlightData(flightDescriptor, ByteString.CopyFrom(memoryStream.ToArray()), + ByteString.CopyFrom(memoryStream.ToArray())); + yield return flightData; + } + + /// 
<summary> + /// Converts a RecordBatch into an IAsyncStreamReader<FlightData>. + /// </summary> + /// <param name="recordBatch">The RecordBatch to convert.</param> + /// <param name="flightDescriptor">The FlightDescriptor describing the Flight data.</param> + /// <returns>An IAsyncStreamReader of FlightData.</returns> + public static IAsyncStreamReader<FlightData> ToFlightDataStream(this RecordBatch recordBatch, FlightDescriptor flightDescriptor) + { + if (recordBatch == null) throw new ArgumentNullException(nameof(recordBatch)); + if (flightDescriptor == null) throw new ArgumentNullException(nameof(flightDescriptor)); + + var channel = Channel.CreateUnbounded<FlightData>(); + + try + { + if (recordBatch.Schema == null || !recordBatch.Schema.FieldsList.Any()) + { + throw new InvalidOperationException("The record batch has an invalid or empty schema."); + } + + using var memoryStream = new MemoryStream(); + using var writer = new ArrowStreamWriter(memoryStream, recordBatch.Schema); + writer.WriteRecordBatch(recordBatch); + writer.WriteEnd(); + memoryStream.Position = 0; + var flightData = new FlightData(flightDescriptor, ByteString.CopyFrom(memoryStream.ToArray()), ByteString.Empty, ByteString.Empty); + if (flightData.DataBody.IsEmpty) Review Comment: In addition to the same comment about `ByteString.CopyFrom`, it should be possible to check for an empty body before creating the `FlightData` by looking at `memoryStream.Length`. Does this length check belong in the async version too? ########## csharp/src/Apache.Arrow.Flight.Sql/FlightExtensions.cs: ########## @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Collections.Generic; +using Google.Protobuf; +using Google.Protobuf.WellKnownTypes; + +namespace Apache.Arrow.Flight.Sql; + +internal static class FlightExtensions +{ + public static byte[] PackAndSerialize(this IMessage command) => Any.Pack(command).ToByteArray(); + public static T ParseAndUnpack<T>(this ByteString source) where T : IMessage<T>, new() => Any.Parser.ParseFrom(source).Unpack<T>(); + + public static IEnumerable<long> ExtractRowCount(this RecordBatch batch) + { + foreach (var array in batch.Arrays) + { + var values = ExtractValues(array); + foreach (var value in values) + { + yield return value switch + { + long l => l, + int i => i != 0 ? i : 0, Review Comment: Isn't this just `int i => i`? ########## csharp/src/Apache.Arrow.Flight.Sql/FlightCallOptions.cs: ########## @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Buffers; +using System.Threading; +using Grpc.Core; + +namespace Apache.Arrow.Flight.Sql; + +public class FlightCallOptions +{ + public FlightCallOptions() + { + Timeout = TimeSpan.FromSeconds(-1); + } + // Implement any necessary options for RPC calls + public Metadata Headers { get; set; } = new(); + + /// <summary> + /// Gets or sets a token to enable interactive user cancellation of long-running requests. + /// </summary> + public CancellationToken StopToken { get; set; } Review Comment: Nothing references `StopToken` ########## csharp/src/Apache.Arrow.Flight.Sql/FlightCallOptions.cs: ########## @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +using System; +using System.Buffers; +using System.Threading; +using Grpc.Core; + +namespace Apache.Arrow.Flight.Sql; + +public class FlightCallOptions +{ + public FlightCallOptions() + { + Timeout = TimeSpan.FromSeconds(-1); + } Review Comment: nit: insert a blank line after the closing brace ########## csharp/src/Apache.Arrow.Flight.Sql/FlightExtensions.cs: ########## @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Collections.Generic; +using Google.Protobuf; +using Google.Protobuf.WellKnownTypes; + +namespace Apache.Arrow.Flight.Sql; + +internal static class FlightExtensions +{ + public static byte[] PackAndSerialize(this IMessage command) => Any.Pack(command).ToByteArray(); + public static T ParseAndUnpack<T>(this ByteString source) where T : IMessage<T>, new() => Any.Parser.ParseFrom(source).Unpack<T>(); + + public static IEnumerable<long> ExtractRowCount(this RecordBatch batch) Review Comment: I have to admit, I can't tell what the semantics of this are supposed to be :(. I did a quick search to see what the result of `ExecuteUpdate` was supposed to look like but that didn't help. Putting the explanation in a comment might be worthwhile.
########## csharp/test/Apache.Arrow.Flight.Sql.Tests/FlightSqlPreparedStatementTests.cs: ########## @@ -0,0 +1,227 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Apache.Arrow.Flight.Client; +using Apache.Arrow.Flight.Sql.Client; +using Apache.Arrow.Flight.TestWeb; +using Apache.Arrow.Types; +using Arrow.Flight.Protocol.Sql; +using Google.Protobuf; +using Grpc.Core; +using Xunit; + +namespace Apache.Arrow.Flight.Sql.Tests +{ + public class FlightSqlPreparedStatementTests + { + readonly TestFlightSqlWebFactory _testWebFactory; + readonly FlightStore _flightStore; + readonly FlightSqlClient _flightSqlClient; + private readonly PreparedStatement _preparedStatement; + private readonly Schema _schema; + private readonly FlightDescriptor _flightDescriptor; + private readonly RecordBatch _parameterBatch; + + public FlightSqlPreparedStatementTests() + { + _flightStore = new FlightStore(); + _testWebFactory = new TestFlightSqlWebFactory(_flightStore); + _flightSqlClient = new FlightSqlClient(new FlightClient(_testWebFactory.GetChannel())); + + _flightDescriptor = 
FlightDescriptor.CreateCommandDescriptor("test-query"); + _schema = CreateSchema(); + _parameterBatch = CreateParameterBatch(); + _preparedStatement = new PreparedStatement(_flightSqlClient, "test-handle-guid", _schema, _schema); + } + + private static Schema CreateSchema() + { + return new Schema.Builder() + .Field(f => f.Name("DATA_TYPE_ID").DataType(Int32Type.Default).Nullable(false)) + .Build(); + } + + private RecordBatch CreateParameterBatch() + { + return new RecordBatch(_schema, + new IArrowArray[] + { + new Int32Array.Builder().AppendRange(new[] { 1, 2, 3 }).Build(), + new StringArray.Builder().AppendRange(new[] { "INTEGER", "VARCHAR", "BOOLEAN" }).Build(), + new Int32Array.Builder().AppendRange(new[] { 32, 255, 1 }).Build() + }, 3); + } + + Review Comment: nit: extra blank line ########## csharp/test/Apache.Arrow.Flight.Sql.Tests/FlightSqlTestUtils.cs: ########## @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +using System.Linq; +using Apache.Arrow.Flight.Tests; +using Apache.Arrow.Flight.TestWeb; + +namespace Apache.Arrow.Flight.Sql.Tests; + +public class FlightSqlTestUtils +{ + private readonly TestFlightSqlWebFactory _testWebFactory; + private readonly FlightStore _flightStore; + + public FlightSqlTestUtils(TestFlightSqlWebFactory testWebFactory, FlightStore flightStore) + { + _testWebFactory = testWebFactory; + _flightStore = flightStore; + } + + public RecordBatch CreateTestBatch(int startValue, int length) + { + var batchBuilder = new RecordBatch.Builder(); + Int32Array.Builder builder = new(); + for (int i = 0; i < length; i++) + { + builder.Append(startValue + i); + } + + batchBuilder.Append("test", true, builder.Build()); + return batchBuilder.Build(); + } + + Review Comment: nit: extra blank line ########## csharp/src/Apache.Arrow.Flight.Sql/FlightExtensions.cs: ########## @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +using System; +using System.Collections.Generic; +using Google.Protobuf; +using Google.Protobuf.WellKnownTypes; + +namespace Apache.Arrow.Flight.Sql; + +internal static class FlightExtensions +{ + public static byte[] PackAndSerialize(this IMessage command) => Any.Pack(command).ToByteArray(); + public static T ParseAndUnpack<T>(this ByteString source) where T : IMessage<T>, new() => Any.Parser.ParseFrom(source).Unpack<T>(); + + public static IEnumerable<long> ExtractRowCount(this RecordBatch batch) + { + foreach (var array in batch.Arrays) + { + var values = ExtractValues(array); + foreach (var value in values) + { + yield return value switch + { + long l => l, + int i => i != 0 ? i : 0, + _ => 0L + }; + } + } + } + + private static IEnumerable<object?> ExtractValues(IArrowArray array) + { + return array switch + { + Int32Array int32Array => ExtractPrimitiveValues(int32Array), + Int64Array int64Array => ExtractPrimitiveValues(int64Array), + FloatArray floatArray => ExtractPrimitiveValues(floatArray), + BooleanArray booleanArray => ExtractBooleanValues(booleanArray), + StringArray stringArray => ExtractStringValues(stringArray), + _ => throw new NotSupportedException($"Array type {array.GetType().Name} is not supported.") + }; + } + + private static IEnumerable<object?> ExtractPrimitiveValues<T>(PrimitiveArray<T> array) where T : struct, IEquatable<T> + { Review Comment: The underlying arrays in all three cases should already be `IEnumerable<>`. Can they not be used directly? ########## csharp/test/Apache.Arrow.Flight.Sql.Tests/FlightSqlClientTests.cs: ########## @@ -0,0 +1,855 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Apache.Arrow.Flight.Client; +using Apache.Arrow.Flight.Sql.Client; +using Apache.Arrow.Flight.TestWeb; +using Apache.Arrow.Types; +using Arrow.Flight.Protocol.Sql; +using Google.Protobuf; +using Grpc.Core.Utils; +using Xunit; + +namespace Apache.Arrow.Flight.Sql.Tests; + +public class FlightSqlClientTests : IDisposable +{ + readonly TestFlightSqlWebFactory _testWebFactory; + readonly FlightStore _flightStore; + private readonly FlightSqlClient _flightSqlClient; + private readonly FlightSqlTestUtils _testUtils; + + public FlightSqlClientTests() + { + _flightStore = new FlightStore(); + _testWebFactory = new TestFlightSqlWebFactory(_flightStore); + FlightClient flightClient = new(_testWebFactory.GetChannel()); + _flightSqlClient = new FlightSqlClient(flightClient); + + _testUtils = new FlightSqlTestUtils(_testWebFactory, _flightStore); + } + + #region Transactions + + [Fact] + public async Task CommitTransactionAsync() + { + // Arrange + string transactionId = "sample-transaction-id"; + var transaction = new Transaction(transactionId); + + // Act + var streamCall = _flightSqlClient.CommitAsync(transaction); + var result = await streamCall.ResponseStream.ToListAsync(); + + // Assert + Assert.NotNull(result); + Assert.Equal(transaction.TransactionId, result.FirstOrDefault()?.Body); + } + + [Fact] + public async Task BeginTransactionAsync() + { + // Arrange + string expectedTransactionId = "sample-transaction-id"; + + // Act + var transaction = 
await _flightSqlClient.BeginTransactionAsync(); + + // Assert + Assert.NotNull(transaction); + Assert.Equal(ByteString.CopyFromUtf8(expectedTransactionId), transaction.TransactionId); + } + + [Fact] + public async Task RollbackTransactionAsync() + { + // Arrange + string transactionId = "sample-transaction-id"; + var transaction = new Transaction(transactionId); + + // Act + var streamCall = _flightSqlClient.RollbackAsync(transaction); + var result = await streamCall.ResponseStream.ToListAsync(); + + // Assert + Assert.NotNull(transaction); + Assert.Equal(result.FirstOrDefault()?.Body, transaction.TransactionId); + } + + #endregion + + #region PreparedStatement + + [Fact] + public async Task PreparedAsync() + { + // Arrange + string query = "INSERT INTO users (id, name) VALUES (1, 'John Doe')"; + var transaction = new Transaction("sample-transaction-id"); + var flightDescriptor = FlightDescriptor.CreateCommandDescriptor("test"); + + // Create a sample schema for the dataset and parameters + var schema = new Schema.Builder() + .Field(f => f.Name("id").DataType(Int32Type.Default)) + .Field(f => f.Name("name").DataType(StringType.Default)) + .Build(); + + var recordBatch = new RecordBatch(schema, new Array[] + { + new Int32Array.Builder().Append(1).Build(), + new StringArray.Builder().Append("John Doe").Build() + }, 1); + + var flightHolder = new FlightHolder(flightDescriptor, schema, _testWebFactory.GetAddress()); + flightHolder.AddBatch(new RecordBatchWithMetadata(recordBatch)); + _flightStore.Flights.Add(flightDescriptor, flightHolder); + + var datasetSchemaBytes = SchemaExtensions.SerializeSchema(schema); + var parameterSchemaBytes = SchemaExtensions.SerializeSchema(schema); + + var preparedStatementResponse = new ActionCreatePreparedStatementResult + { + PreparedStatementHandle = ByteString.CopyFromUtf8("prepared-handle"), + DatasetSchema = ByteString.CopyFrom(datasetSchemaBytes), + ParameterSchema = ByteString.CopyFrom(parameterSchemaBytes) + }; + + // Act + var 
preparedStatement = await _flightSqlClient.PrepareAsync(query, transaction); + var deserializedDatasetSchema = SchemaExtensions.DeserializeSchema(preparedStatementResponse.DatasetSchema.ToByteArray()); + var deserializedParameterSchema = SchemaExtensions.DeserializeSchema(preparedStatementResponse.ParameterSchema.ToByteArray()); + + // Assert + Assert.NotNull(preparedStatement); + Assert.NotNull(deserializedDatasetSchema); + Assert.NotNull(deserializedParameterSchema); + CompareSchemas(schema, deserializedDatasetSchema); + CompareSchemas(schema, deserializedParameterSchema); + } + + #endregion + + [Fact] + public async Task ExecuteUpdateAsync() + { + // Arrange + string query = "UPDATE test_table SET column1 = 'value' WHERE column2 = 'condition'"; + var transaction = new Transaction("sample-transaction-id"); + var flightDescriptor = FlightDescriptor.CreateCommandDescriptor("test"); + + var schema = new Schema.Builder() + .Field(f => f.Name("id").DataType(Int32Type.Default)) + .Field(f => f.Name("name").DataType(StringType.Default)) + .Build(); + + var recordBatch = new RecordBatch(schema, new Array[] + { + new Int32Array.Builder().Append(1).Build(), + new StringArray.Builder().Append("John Doe").Build() + }, 1); + + + var flightHolder = new FlightHolder(flightDescriptor, recordBatch.Schema, _testWebFactory.GetAddress()); + flightHolder.AddBatch(new RecordBatchWithMetadata(recordBatch)); + _flightStore.Flights.Add(flightDescriptor, flightHolder); + + // Act + long affectedRows = await _flightSqlClient.ExecuteUpdateAsync(query, transaction); + + // Assert + Assert.Equal(1, affectedRows); + } + + [Fact] + public async Task ExecuteAsync() + { + // Arrange + string query = "SELECT * FROM test_table"; + var transaction = new Transaction("sample-transaction-id"); + var flightDescriptor = FlightDescriptor.CreateCommandDescriptor("test"); + var recordBatch = _testUtils.CreateTestBatch(0, 100); + + var flightHolder = new FlightHolder(flightDescriptor, recordBatch.Schema, 
_testWebFactory.GetAddress()); + flightHolder.AddBatch(new RecordBatchWithMetadata(recordBatch)); + + _flightStore.Flights.Add(flightDescriptor, flightHolder); + + // Act + var flightInfo = await _flightSqlClient.ExecuteAsync(query, transaction); + + // Assert + Assert.NotNull(flightInfo); + Assert.Single(flightInfo.Endpoints); + } + + [Fact] + public async Task ExecuteAsync_ShouldReturnFlightInfo_WhenValidInputsAreProvided() + { + // Arrange + string query = "SELECT * FROM test_table"; + var transaction = new Transaction("sample-transaction-id"); + var flightDescriptor = FlightDescriptor.CreateCommandDescriptor("test"); + var recordBatch = _testUtils.CreateTestBatch(0, 100); + var flightHolder = new FlightHolder(flightDescriptor, recordBatch.Schema, + _testWebFactory.GetAddress()); + _flightStore.Flights.Add(flightDescriptor, flightHolder); + + // Act + var flightInfo = await _flightSqlClient.ExecuteAsync(query, transaction); + + // Assert + Assert.NotNull(flightInfo); + Assert.IsType<FlightInfo>(flightInfo); + } + + [Fact] + public async Task ExecuteAsync_ShouldThrowArgumentException_WhenQueryIsEmpty() + { + // Arrange + string emptyQuery = string.Empty; + var transaction = new Transaction("sample-transaction-id"); + + // Act & Assert + await Assert.ThrowsAsync<ArgumentException>(async () => + await _flightSqlClient.ExecuteAsync(emptyQuery, transaction)); + } + + [Fact] + public async Task ExecuteAsync_ShouldReturnFlightInfo_WhenTransactionIsNoTransaction() + { + // Arrange + string query = "SELECT * FROM test_table"; + var transaction = Transaction.NoTransaction; + var flightDescriptor = FlightDescriptor.CreateCommandDescriptor("test"); + var recordBatch = _testUtils.CreateTestBatch(0, 100); + var flightHolder = new FlightHolder(flightDescriptor, recordBatch.Schema, + _testWebFactory.GetAddress()); + _flightStore.Flights.Add(flightDescriptor, flightHolder); + + // Act + var flightInfo = await _flightSqlClient.ExecuteAsync(query, transaction); + + // Assert + 
Assert.NotNull(flightInfo); + Assert.IsType<FlightInfo>(flightInfo); + } + + Review Comment: nit: extra blank line ########## csharp/test/Apache.Arrow.Flight.Sql.Tests/FlightSqlClientTests.cs: ########## @@ -0,0 +1,855 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Apache.Arrow.Flight.Client; +using Apache.Arrow.Flight.Sql.Client; +using Apache.Arrow.Flight.TestWeb; +using Apache.Arrow.Types; +using Arrow.Flight.Protocol.Sql; +using Google.Protobuf; +using Grpc.Core.Utils; +using Xunit; + +namespace Apache.Arrow.Flight.Sql.Tests; + +public class FlightSqlClientTests : IDisposable +{ + readonly TestFlightSqlWebFactory _testWebFactory; + readonly FlightStore _flightStore; + private readonly FlightSqlClient _flightSqlClient; + private readonly FlightSqlTestUtils _testUtils; + + public FlightSqlClientTests() + { + _flightStore = new FlightStore(); + _testWebFactory = new TestFlightSqlWebFactory(_flightStore); + FlightClient flightClient = new(_testWebFactory.GetChannel()); + _flightSqlClient = new FlightSqlClient(flightClient); + + _testUtils = new FlightSqlTestUtils(_testWebFactory, _flightStore); + } + + #region Transactions + + [Fact] + public async Task CommitTransactionAsync() + { + // Arrange + string transactionId = "sample-transaction-id"; + var transaction = new Transaction(transactionId); + + // Act + var streamCall = _flightSqlClient.CommitAsync(transaction); + var result = await streamCall.ResponseStream.ToListAsync(); + + // Assert + Assert.NotNull(result); + Assert.Equal(transaction.TransactionId, result.FirstOrDefault()?.Body); + } + + [Fact] + public async Task BeginTransactionAsync() + { + // Arrange + string expectedTransactionId = "sample-transaction-id"; + + // Act + var transaction = await _flightSqlClient.BeginTransactionAsync(); + + // Assert + Assert.NotNull(transaction); + Assert.Equal(ByteString.CopyFromUtf8(expectedTransactionId), transaction.TransactionId); + } + + [Fact] + public async Task RollbackTransactionAsync() + { + // Arrange + string transactionId = "sample-transaction-id"; + var transaction = new Transaction(transactionId); + + // Act + var streamCall = 
_flightSqlClient.RollbackAsync(transaction); + var result = await streamCall.ResponseStream.ToListAsync(); + + // Assert + Assert.NotNull(transaction); + Assert.Equal(result.FirstOrDefault()?.Body, transaction.TransactionId); + } + + #endregion + + #region PreparedStatement + + [Fact] + public async Task PreparedAsync() + { + // Arrange + string query = "INSERT INTO users (id, name) VALUES (1, 'John Doe')"; + var transaction = new Transaction("sample-transaction-id"); + var flightDescriptor = FlightDescriptor.CreateCommandDescriptor("test"); + + // Create a sample schema for the dataset and parameters + var schema = new Schema.Builder() + .Field(f => f.Name("id").DataType(Int32Type.Default)) + .Field(f => f.Name("name").DataType(StringType.Default)) + .Build(); + + var recordBatch = new RecordBatch(schema, new Array[] + { + new Int32Array.Builder().Append(1).Build(), + new StringArray.Builder().Append("John Doe").Build() + }, 1); + + var flightHolder = new FlightHolder(flightDescriptor, schema, _testWebFactory.GetAddress()); + flightHolder.AddBatch(new RecordBatchWithMetadata(recordBatch)); + _flightStore.Flights.Add(flightDescriptor, flightHolder); + + var datasetSchemaBytes = SchemaExtensions.SerializeSchema(schema); + var parameterSchemaBytes = SchemaExtensions.SerializeSchema(schema); + + var preparedStatementResponse = new ActionCreatePreparedStatementResult + { + PreparedStatementHandle = ByteString.CopyFromUtf8("prepared-handle"), + DatasetSchema = ByteString.CopyFrom(datasetSchemaBytes), + ParameterSchema = ByteString.CopyFrom(parameterSchemaBytes) + }; + + // Act + var preparedStatement = await _flightSqlClient.PrepareAsync(query, transaction); + var deserializedDatasetSchema = SchemaExtensions.DeserializeSchema(preparedStatementResponse.DatasetSchema.ToByteArray()); + var deserializedParameterSchema = SchemaExtensions.DeserializeSchema(preparedStatementResponse.ParameterSchema.ToByteArray()); + + // Assert + Assert.NotNull(preparedStatement); + 
Assert.NotNull(deserializedDatasetSchema); + Assert.NotNull(deserializedParameterSchema); + CompareSchemas(schema, deserializedDatasetSchema); + CompareSchemas(schema, deserializedParameterSchema); + } + + #endregion + + [Fact] + public async Task ExecuteUpdateAsync() + { + // Arrange + string query = "UPDATE test_table SET column1 = 'value' WHERE column2 = 'condition'"; + var transaction = new Transaction("sample-transaction-id"); + var flightDescriptor = FlightDescriptor.CreateCommandDescriptor("test"); + + var schema = new Schema.Builder() + .Field(f => f.Name("id").DataType(Int32Type.Default)) + .Field(f => f.Name("name").DataType(StringType.Default)) + .Build(); + + var recordBatch = new RecordBatch(schema, new Array[] + { + new Int32Array.Builder().Append(1).Build(), + new StringArray.Builder().Append("John Doe").Build() + }, 1); + + Review Comment: nit: extra blank line -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org
