CurtHagenlocher commented on code in PR #44783:
URL: https://github.com/apache/arrow/pull/44783#discussion_r1947995089


##########
csharp/src/Apache.Arrow.Flight.Sql/TableRef.cs:
##########
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+namespace Apache.Arrow.Flight.Sql;
+
+public class TableRef
+{
+    public string? Catalog { get; set; }
+    public string DbSchema { get; set; } = null!;

Review Comment:
   The `null!` initializer seems dodgy: it silences the nullable warning 
without guaranteeing the property is ever set. As these are required to be 
set, consider using a pair of constructors `TableRef(string, string)` and 
`TableRef(string, string, string)` instead.



##########
csharp/src/Apache.Arrow.Flight.Sql/SchemaExtensions.cs:
##########
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.IO;
+using Apache.Arrow.Ipc;
+
+namespace Apache.Arrow.Flight.Sql;
+
+public static class SchemaExtensions
+{
+    /// <summary>
+    /// Deserializes a schema from a byte array.
+    /// </summary>
+    /// <param name="serializedSchema">The byte array representing the 
serialized schema.</param>
+    /// <returns>The deserialized Schema object.</returns>
+    public static Schema DeserializeSchema(byte[] serializedSchema)
+    {
+        if (serializedSchema == null || serializedSchema.Length == 0)
+        {
+            throw new ArgumentException("Invalid serialized schema");
+        }
+
+        using var stream = new MemoryStream(serializedSchema);
+        var reader = new ArrowStreamReader(stream);

Review Comment:
   There's a ctor for `ArrowStreamReader` which takes a `ReadOnlyMemory<byte>` 
as an argument, so you can just pass in the bytes. This is probably a bit more 
efficient.



##########
csharp/src/Apache.Arrow.Flight.Sql/DoPutResult.cs:
##########
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System.Threading.Tasks;
+using Apache.Arrow.Flight.Client;
+using Grpc.Core;
+
+namespace Apache.Arrow.Flight.Sql;
+
+public class DoPutResult
+{
+    public FlightClientRecordBatchStreamWriter Writer { get; }
+    public IAsyncStreamReader<FlightPutResult> Reader { get; }
+
+    public DoPutResult(FlightClientRecordBatchStreamWriter writer, 
IAsyncStreamReader<FlightPutResult> reader)
+    {
+        Writer = writer;
+        Reader = reader;
+    }
+    
+    /// <summary>
+    /// Reads the metadata asynchronously from the reader.
+    /// </summary>
+    /// <returns>A ByteString containing the metadata read from the 
reader.</returns>
+    public async Task<Google.Protobuf.ByteString> ReadMetadataAsync()

Review Comment:
   In general, none of the `async` methods in this change take a 
`CancellationToken`. Shouldn't they?



##########
csharp/test/Apache.Arrow.Flight.Sql.Tests/FlightSqlClientTests.cs:
##########
@@ -0,0 +1,855 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading.Tasks;
+using Apache.Arrow.Flight.Client;
+using Apache.Arrow.Flight.Sql.Client;
+using Apache.Arrow.Flight.TestWeb;
+using Apache.Arrow.Types;
+using Arrow.Flight.Protocol.Sql;
+using Google.Protobuf;
+using Grpc.Core.Utils;
+using Xunit;
+
+namespace Apache.Arrow.Flight.Sql.Tests;
+
+public class FlightSqlClientTests : IDisposable
+{
+    readonly TestFlightSqlWebFactory _testWebFactory;
+    readonly FlightStore _flightStore;
+    private readonly FlightSqlClient _flightSqlClient;
+    private readonly FlightSqlTestUtils _testUtils;
+
+    public FlightSqlClientTests()
+    {
+        _flightStore = new FlightStore();
+        _testWebFactory = new TestFlightSqlWebFactory(_flightStore);
+        FlightClient flightClient = new(_testWebFactory.GetChannel());
+        _flightSqlClient = new FlightSqlClient(flightClient);
+
+        _testUtils = new FlightSqlTestUtils(_testWebFactory, _flightStore);
+    }
+
+    #region Transactions
+
+    [Fact]
+    public async Task CommitTransactionAsync()
+    {
+        // Arrange
+        string transactionId = "sample-transaction-id";
+        var transaction = new Transaction(transactionId);
+
+        // Act
+        var streamCall = _flightSqlClient.CommitAsync(transaction);
+        var result = await streamCall.ResponseStream.ToListAsync();
+
+        // Assert
+        Assert.NotNull(result);
+        Assert.Equal(transaction.TransactionId, result.FirstOrDefault()?.Body);
+    }
+
+    [Fact]
+    public async Task BeginTransactionAsync()
+    {
+        // Arrange
+        string expectedTransactionId = "sample-transaction-id";
+
+        // Act
+        var transaction = await _flightSqlClient.BeginTransactionAsync();
+
+        // Assert
+        Assert.NotNull(transaction);
+        Assert.Equal(ByteString.CopyFromUtf8(expectedTransactionId), 
transaction.TransactionId);
+    }
+
+    [Fact]
+    public async Task RollbackTransactionAsync()
+    {
+        // Arrange
+        string transactionId = "sample-transaction-id";
+        var transaction = new Transaction(transactionId);
+
+        // Act
+        var streamCall = _flightSqlClient.RollbackAsync(transaction);
+        var result = await streamCall.ResponseStream.ToListAsync();
+
+        // Assert
+        Assert.NotNull(transaction);
+        Assert.Equal(result.FirstOrDefault()?.Body, transaction.TransactionId);
+    }
+
+    #endregion
+
+    #region PreparedStatement
+
+    [Fact]
+    public async Task PreparedAsync()
+    {
+        // Arrange
+        string query = "INSERT INTO users (id, name) VALUES (1, 'John Doe')";
+        var transaction = new Transaction("sample-transaction-id");
+        var flightDescriptor = 
FlightDescriptor.CreateCommandDescriptor("test");
+
+        // Create a sample schema for the dataset and parameters
+        var schema = new Schema.Builder()
+            .Field(f => f.Name("id").DataType(Int32Type.Default))
+            .Field(f => f.Name("name").DataType(StringType.Default))
+            .Build();
+
+        var recordBatch = new RecordBatch(schema, new Array[]
+        {
+            new Int32Array.Builder().Append(1).Build(),
+            new StringArray.Builder().Append("John Doe").Build()
+        }, 1);
+        
+        var flightHolder = new FlightHolder(flightDescriptor, schema, 
_testWebFactory.GetAddress());
+        flightHolder.AddBatch(new RecordBatchWithMetadata(recordBatch));
+        _flightStore.Flights.Add(flightDescriptor, flightHolder);
+        
+        var datasetSchemaBytes = SchemaExtensions.SerializeSchema(schema);
+        var parameterSchemaBytes = SchemaExtensions.SerializeSchema(schema);
+        
+        var preparedStatementResponse = new ActionCreatePreparedStatementResult
+        {
+            PreparedStatementHandle = 
ByteString.CopyFromUtf8("prepared-handle"),
+            DatasetSchema = ByteString.CopyFrom(datasetSchemaBytes),
+            ParameterSchema = ByteString.CopyFrom(parameterSchemaBytes)
+        };
+        
+        // Act
+        var preparedStatement = await _flightSqlClient.PrepareAsync(query, 
transaction);
+        var deserializedDatasetSchema = 
SchemaExtensions.DeserializeSchema(preparedStatementResponse.DatasetSchema.ToByteArray());
+        var deserializedParameterSchema = 
SchemaExtensions.DeserializeSchema(preparedStatementResponse.ParameterSchema.ToByteArray());
+
+        // Assert
+        Assert.NotNull(preparedStatement);
+        Assert.NotNull(deserializedDatasetSchema);
+        Assert.NotNull(deserializedParameterSchema);
+        CompareSchemas(schema, deserializedDatasetSchema);
+        CompareSchemas(schema, deserializedParameterSchema);
+    }
+
+    #endregion
+
+    [Fact]
+    public async Task ExecuteUpdateAsync()
+    {
+        // Arrange
+        string query = "UPDATE test_table SET column1 = 'value' WHERE column2 
= 'condition'";
+        var transaction = new Transaction("sample-transaction-id");
+        var flightDescriptor = 
FlightDescriptor.CreateCommandDescriptor("test");
+        
+        var schema = new Schema.Builder()
+            .Field(f => f.Name("id").DataType(Int32Type.Default))
+            .Field(f => f.Name("name").DataType(StringType.Default))
+            .Build();
+
+        var recordBatch = new RecordBatch(schema, new Array[]
+        {
+            new Int32Array.Builder().Append(1).Build(),
+            new StringArray.Builder().Append("John Doe").Build()
+        }, 1);
+        
+
+        var flightHolder = new FlightHolder(flightDescriptor, 
recordBatch.Schema, _testWebFactory.GetAddress());
+        flightHolder.AddBatch(new RecordBatchWithMetadata(recordBatch));
+        _flightStore.Flights.Add(flightDescriptor, flightHolder);
+
+        // Act
+        long affectedRows = await _flightSqlClient.ExecuteUpdateAsync(query, 
transaction);
+
+        // Assert
+        Assert.Equal(1, affectedRows);
+    }
+
+    [Fact]
+    public async Task ExecuteAsync()
+    {
+        // Arrange
+        string query = "SELECT * FROM test_table";
+        var transaction = new Transaction("sample-transaction-id");
+        var flightDescriptor = 
FlightDescriptor.CreateCommandDescriptor("test");
+        var recordBatch = _testUtils.CreateTestBatch(0, 100);
+
+        var flightHolder = new FlightHolder(flightDescriptor, 
recordBatch.Schema, _testWebFactory.GetAddress());
+        flightHolder.AddBatch(new RecordBatchWithMetadata(recordBatch));
+
+        _flightStore.Flights.Add(flightDescriptor, flightHolder);
+
+        // Act
+        var flightInfo = await _flightSqlClient.ExecuteAsync(query, 
transaction);
+
+        // Assert
+        Assert.NotNull(flightInfo);
+        Assert.Single(flightInfo.Endpoints);
+    }
+
+    [Fact]
+    public async Task 
ExecuteAsync_ShouldReturnFlightInfo_WhenValidInputsAreProvided()
+    {
+        // Arrange
+        string query = "SELECT * FROM test_table";
+        var transaction = new Transaction("sample-transaction-id");
+        var flightDescriptor = 
FlightDescriptor.CreateCommandDescriptor("test");
+        var recordBatch = _testUtils.CreateTestBatch(0, 100);
+        var flightHolder = new FlightHolder(flightDescriptor, 
recordBatch.Schema,
+            _testWebFactory.GetAddress());
+        _flightStore.Flights.Add(flightDescriptor, flightHolder);
+        
+        // Act
+        var flightInfo = await _flightSqlClient.ExecuteAsync(query, 
transaction);
+
+        // Assert
+        Assert.NotNull(flightInfo);
+        Assert.IsType<FlightInfo>(flightInfo);
+    }
+    
+    [Fact]
+    public async Task 
ExecuteAsync_ShouldThrowArgumentException_WhenQueryIsEmpty()
+    {
+        // Arrange
+        string emptyQuery = string.Empty;
+        var transaction = new Transaction("sample-transaction-id");
+
+        // Act & Assert
+        await Assert.ThrowsAsync<ArgumentException>(async () =>
+            await _flightSqlClient.ExecuteAsync(emptyQuery, transaction));
+    }
+    
+    [Fact]
+    public async Task 
ExecuteAsync_ShouldReturnFlightInfo_WhenTransactionIsNoTransaction()
+    {
+        // Arrange
+        string query = "SELECT * FROM test_table";
+        var transaction = Transaction.NoTransaction;
+        var flightDescriptor = 
FlightDescriptor.CreateCommandDescriptor("test");
+        var recordBatch = _testUtils.CreateTestBatch(0, 100);
+        var flightHolder = new FlightHolder(flightDescriptor, 
recordBatch.Schema,
+            _testWebFactory.GetAddress());
+        _flightStore.Flights.Add(flightDescriptor, flightHolder);
+        
+        // Act
+        var flightInfo = await _flightSqlClient.ExecuteAsync(query, 
transaction);
+
+        // Assert
+        Assert.NotNull(flightInfo);
+        Assert.IsType<FlightInfo>(flightInfo);
+    }
+
+    
+    [Fact]
+    public async Task GetFlightInfoAsync()
+    {
+        // Arrange
+        var flightDescriptor = 
FlightDescriptor.CreateCommandDescriptor("test");
+        var recordBatch = _testUtils.CreateTestBatch(0, 100);
+        var flightHolder = new FlightHolder(flightDescriptor, 
recordBatch.Schema,
+            _testWebFactory.GetAddress());
+        _flightStore.Flights.Add(flightDescriptor, flightHolder);
+        // Act
+        var flightInfo = await 
_flightSqlClient.GetFlightInfoAsync(flightDescriptor);
+
+        // Assert
+        Assert.NotNull(flightInfo);
+    }
+
+    [Fact]
+    public async Task GetExecuteSchemaAsync()
+    {
+        // Arrange
+        string query = "SELECT * FROM test_table";
+        var flightDescriptor = 
FlightDescriptor.CreateCommandDescriptor("test");
+        var recordBatch = _testUtils.CreateTestBatch(0, 100);
+        var flightHolder = new FlightHolder(flightDescriptor, 
recordBatch.Schema,
+            _testWebFactory.GetAddress());
+        _flightStore.Flights.Add(flightDescriptor, flightHolder);
+
+        // Act
+        Schema resultSchema =
+            await _flightSqlClient.GetExecuteSchemaAsync(query, new 
Transaction("sample-transaction-id"));
+
+        // Assert
+        Assert.NotNull(resultSchema);
+        Assert.Equal(recordBatch.Schema.FieldsList.Count, 
resultSchema.FieldsList.Count);
+        CompareSchemas(resultSchema, recordBatch.Schema);
+    }
+
+    [Fact]
+    public async Task GetCatalogsAsync()
+    {
+        // Arrange
+        var options = new FlightCallOptions();
+        var flightDescriptor = 
FlightDescriptor.CreateCommandDescriptor("test");
+        var recordBatch = _testUtils.CreateTestBatch(0, 100);
+        var flightHolder = new FlightHolder(flightDescriptor, 
recordBatch.Schema,
+            _testWebFactory.GetAddress());
+        _flightStore.Flights.Add(flightDescriptor, flightHolder);
+
+        // Act
+        var result = await _flightSqlClient.GetCatalogsAsync(options);
+
+        // Assert
+        Assert.NotNull(result);
+        Assert.Equal(flightHolder.GetFlightInfo().Endpoints.Count, 
result.Endpoints.Count);
+        Assert.Equal(flightDescriptor, result.Descriptor);
+    }
+
+    [Fact]
+    public async Task GetSchemaAsync()
+    {
+        // Arrange
+        var flightDescriptor = 
FlightDescriptor.CreateCommandDescriptor("test");
+        var recordBatch = _testUtils.CreateTestBatch(0, 100);
+        var flightHolder = new FlightHolder(flightDescriptor, 
recordBatch.Schema,
+            _testWebFactory.GetAddress());
+        _flightStore.Flights.Add(flightDescriptor, flightHolder);
+
+        // Act
+        var result = await _flightSqlClient.GetSchemaAsync(flightDescriptor);
+
+        // Assert
+        Assert.NotNull(result);
+        Assert.Equal(recordBatch.Schema.FieldsList.Count, 
result.FieldsList.Count);
+        CompareSchemas(result, recordBatch.Schema);
+    }
+
+    [Fact]
+    public async Task GetDbSchemasAsync()
+    {
+        // Arrange
+        var flightDescriptor = 
FlightDescriptor.CreateCommandDescriptor("test");
+        var recordBatch = _testUtils.CreateTestBatch(0, 100);
+        var flightHolder = new FlightHolder(flightDescriptor, 
recordBatch.Schema,
+            _testWebFactory.GetAddress());
+        _flightStore.Flights.Add(flightDescriptor, flightHolder);
+        string catalog = "test-catalog";
+        string dbSchemaFilterPattern = "test-schema-pattern";
+
+        // Act
+        var result = await _flightSqlClient.GetDbSchemasAsync(catalog, 
dbSchemaFilterPattern);
+
+        // Assert
+        Assert.NotNull(result);
+        var expectedFlightInfo = flightHolder.GetFlightInfo();
+        Assert.Equal(recordBatch.Schema.FieldsList.Count, 
result.Schema.FieldsList.Count);
+        Assert.Equal(expectedFlightInfo.Descriptor.Command, 
result.Descriptor.Command);
+        Assert.Equal(expectedFlightInfo.Descriptor.Type, 
result.Descriptor.Type);
+        Assert.Equal(expectedFlightInfo.Schema.FieldsList.Count, 
result.Schema.FieldsList.Count);
+        Assert.Equal(expectedFlightInfo.Endpoints.Count, 
result.Endpoints.Count);
+
+        for (int i = 0; i < expectedFlightInfo.Schema.FieldsList.Count; i++)
+        {
+            var expectedField = expectedFlightInfo.Schema.FieldsList[i];
+            var actualField = result.Schema.FieldsList[i];
+
+            Assert.Equal(expectedField.Name, actualField.Name);
+            Assert.Equal(expectedField.DataType, actualField.DataType);
+            Assert.Equal(expectedField.IsNullable, actualField.IsNullable);
+            Assert.Equal(expectedField.Metadata?.Count ?? 0, 
actualField.Metadata?.Count ?? 0);
+        }
+
+        for (int i = 0; i < expectedFlightInfo.Endpoints.Count; i++)
+        {
+            var expectedEndpoint = expectedFlightInfo.Endpoints[i];
+            var actualEndpoint = result.Endpoints[i];
+
+            Assert.Equal(expectedEndpoint.Ticket, actualEndpoint.Ticket);
+            Assert.Equal(expectedEndpoint.Locations.Count(), 
actualEndpoint.Locations.Count());
+        }
+    }
+
+    [Fact]
+    public async Task GetPrimaryKeysAsync()
+    {
+        // Arrange
+        var flightDescriptor = 
FlightDescriptor.CreateCommandDescriptor("test");
+        var recordBatch = _testUtils.CreateTestBatch(0, 100);
+        var tableRef = new TableRef { Catalog = "test-catalog", Table = 
"test-table", DbSchema = "test-schema" };
+        var flightHolder = new FlightHolder(flightDescriptor, 
recordBatch.Schema,
+            _testWebFactory.GetAddress());
+        _flightStore.Flights.Add(flightDescriptor, flightHolder);
+
+        // Act
+        var result = await _flightSqlClient.GetPrimaryKeysAsync(tableRef);
+
+        // Assert
+        Assert.NotNull(result);
+        var expectedFlightInfo = flightHolder.GetFlightInfo();
+        Assert.Equal(expectedFlightInfo.Descriptor.Command, 
result.Descriptor.Command);
+        Assert.Equal(expectedFlightInfo.Descriptor.Type, 
result.Descriptor.Type);
+        Assert.Equal(expectedFlightInfo.Schema.FieldsList.Count, 
result.Schema.FieldsList.Count);
+
+        for (int i = 0; i < expectedFlightInfo.Schema.FieldsList.Count; i++)
+        {
+            var expectedField = expectedFlightInfo.Schema.FieldsList[i];
+            var actualField = result.Schema.FieldsList[i];
+            Assert.Equal(expectedField.Name, actualField.Name);
+            Assert.Equal(expectedField.DataType, actualField.DataType);
+            Assert.Equal(expectedField.IsNullable, actualField.IsNullable);
+            Assert.Equal(expectedField.Metadata?.Count ?? 0, 
actualField.Metadata?.Count ?? 0);
+        }
+
+        Assert.Equal(expectedFlightInfo.Endpoints.Count, 
result.Endpoints.Count);
+
+        for (int i = 0; i < expectedFlightInfo.Endpoints.Count; i++)
+        {
+            var expectedEndpoint = expectedFlightInfo.Endpoints[i];
+            var actualEndpoint = result.Endpoints[i];
+
+            Assert.Equal(expectedEndpoint.Ticket, actualEndpoint.Ticket);
+            Assert.Equal(expectedEndpoint.Locations.Count(), 
actualEndpoint.Locations.Count());
+        }
+    }
+
+    [Fact]
+    public async Task GetTablesAsync()
+    {
+        // Arrange
+        var flightDescriptor = 
FlightDescriptor.CreateCommandDescriptor("test");
+        var recordBatch = _testUtils.CreateTestBatch(0, 100);
+        var flightHolder = new FlightHolder(flightDescriptor, 
recordBatch.Schema,
+            _testWebFactory.GetAddress());
+        _flightStore.Flights.Add(flightDescriptor, flightHolder);
+
+        string catalog = "sample_catalog";
+        string dbSchemaFilterPattern = "sample_schema";
+        string tableFilterPattern = "sample_table";
+        bool includeSchema = true;
+        var tableTypes = new List<string> { "BASE TABLE" };
+
+        // Act
+        var result = await _flightSqlClient.GetTablesAsync(catalog, 
dbSchemaFilterPattern, tableFilterPattern,
+            includeSchema, tableTypes);
+
+        // Assert
+        Assert.NotNull(result);
+        Assert.Single(result);
+
+        var expectedFlightInfo = flightHolder.GetFlightInfo();
+        var flightInfo = result.First();
+        Assert.Equal(expectedFlightInfo.Descriptor.Command, 
flightInfo.Descriptor.Command);
+        Assert.Equal(expectedFlightInfo.Descriptor.Type, 
flightInfo.Descriptor.Type);
+        Assert.Equal(expectedFlightInfo.Schema.FieldsList.Count, 
flightInfo.Schema.FieldsList.Count);
+        for (int i = 0; i < expectedFlightInfo.Schema.FieldsList.Count; i++)
+        {
+            var expectedField = expectedFlightInfo.Schema.FieldsList[i];
+            var actualField = flightInfo.Schema.FieldsList[i];
+            Assert.Equal(expectedField.Name, actualField.Name);
+            Assert.Equal(expectedField.DataType, actualField.DataType);
+            Assert.Equal(expectedField.IsNullable, actualField.IsNullable);
+        }
+
+        Assert.Equal(expectedFlightInfo.Endpoints.Count, 
flightInfo.Endpoints.Count);
+    }
+
+

Review Comment:
   nit: extra blank line



##########
csharp/src/Apache.Arrow.Flight.Sql/Transaction.cs:
##########
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using Google.Protobuf;
+
+namespace Apache.Arrow.Flight.Sql;
+
+public class Transaction

Review Comment:
   Should this be a value type?



##########
csharp/examples/Examples.sln:
##########
@@ -7,6 +7,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = 
"FluentBuilderExample", "Flu
 EndProject
 Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Apache.Arrow", 
"..\src\Apache.Arrow\Apache.Arrow.csproj", 
"{1FE1DE95-FF6E-4895-82E7-909713C53524}"
 EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "FlightClientExample", 
"FlightClientExample\FlightClientExample.csproj", 
"{CBB46C39-530D-465A-9367-4E771595209A}"
+EndProject

Review Comment:
   FlightClientExample was added to Examples.sln but its files are not part of 
this pull request.



##########
csharp/src/Apache.Arrow.Flight/FlightInfoCancelRequest.cs:
##########
@@ -0,0 +1,38 @@
+using System;

Review Comment:
   Added files must have the Apache header. It's missing from this file as well 
as FlightInfoCancelResult.cs.



##########
csharp/src/Apache.Arrow.Flight.Sql/Transaction.cs:
##########
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using Google.Protobuf;
+
+namespace Apache.Arrow.Flight.Sql;
+
+public class Transaction
+{
+    private static readonly ByteString TransactionIdDefaultValue = 
ByteString.Empty;
+    private ByteString? _transactionId;
+
+    public ByteString TransactionId
+    {
+        get => _transactionId ?? TransactionIdDefaultValue;
+        set => _transactionId = ProtoPreconditions.CheckNotNull(value, 
nameof(value));
+    }
+
+    public static readonly Transaction NoTransaction = 
new(TransactionIdDefaultValue);
+
+    public Transaction(ByteString transactionId)
+    {
+        TransactionId = transactionId;
+    }
+
+    public Transaction(string transactionId)
+    {
+        _transactionId = ByteString.CopyFromUtf8(transactionId);
+    }
+
+    public bool IsValid() => TransactionId.Length > 0;
+    public void ResetTransaction()

Review Comment:
   `ResetTransaction` doesn't appear to be used.



##########
csharp/src/Apache.Arrow.Flight.Sql/Transaction.cs:
##########
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using Google.Protobuf;
+
+namespace Apache.Arrow.Flight.Sql;
+
+public class Transaction
+{
+    private static readonly ByteString TransactionIdDefaultValue = 
ByteString.Empty;
+    private ByteString? _transactionId;
+
+    public ByteString TransactionId
+    {
+        get => _transactionId ?? TransactionIdDefaultValue;
+        set => _transactionId = ProtoPreconditions.CheckNotNull(value, 
nameof(value));
+    }
+
+    public static readonly Transaction NoTransaction = 
new(TransactionIdDefaultValue);
+
+    public Transaction(ByteString transactionId)
+    {
+        TransactionId = transactionId;
+    }
+
+    public Transaction(string transactionId)
+    {
+        _transactionId = ByteString.CopyFromUtf8(transactionId);
+    }
+
+    public bool IsValid() => TransactionId.Length > 0;

Review Comment:
   Should this be a property?



##########
csharp/src/Apache.Arrow.Flight.Sql/RecordBatchExtensions.cs:
##########
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using System.Threading;
+using System.Threading.Channels;
+using System.Threading.Tasks;
+using Apache.Arrow.Ipc;
+using Google.Protobuf;
+using Grpc.Core;
+
+namespace Apache.Arrow.Flight.Sql;
+
+public static class RecordBatchExtensions
+{
+    /// <summary>
+    /// Converts a RecordBatch into an asynchronous stream of FlightData.
+    /// </summary>
+    /// <param name="recordBatch">The RecordBatch to convert.</param>
+    /// <param name="flightDescriptor">The FlightDescriptor describing the 
Flight data.</param>
+    /// <returns>An asynchronous stream of FlightData objects.</returns>
+    public static async IAsyncEnumerable<FlightData> 
ToFlightDataStreamAsync(this RecordBatch recordBatch,
+        FlightDescriptor flightDescriptor)
+    {
+        if (recordBatch == null)
+        {
+            throw new ArgumentNullException(nameof(recordBatch));
+        }
+
+        // Use a memory stream to write the Arrow RecordBatch into FlightData 
format
+        using var memoryStream = new MemoryStream();
+        var writer = new ArrowStreamWriter(memoryStream, recordBatch.Schema);
+
+        // Write the RecordBatch to the stream
+        await writer.WriteRecordBatchAsync(recordBatch).ConfigureAwait(false);
+        await writer.WriteEndAsync().ConfigureAwait(false);
+
+        // Reset the memory stream position
+        memoryStream.Position = 0;
+
+        // Read back the data to create FlightData
+        var flightData = new FlightData(flightDescriptor, 
ByteString.CopyFrom(memoryStream.ToArray()),

Review Comment:
   `ByteString.CopyFrom(memoryStream.ToArray())` makes an extra copy of the 
data. This can be avoided by doing 
`ByteString.CopyFrom(memoryStream.GetBuffer(), 0, (int)memoryStream.Length)`. 



##########
csharp/src/Apache.Arrow.Flight.Sql/PreparedStatement.cs:
##########
@@ -0,0 +1,383 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.Arrow.Flight.Sql.Client;
+using Arrow.Flight.Protocol.Sql;
+using Google.Protobuf;
+using Grpc.Core;
+
+namespace Apache.Arrow.Flight.Sql;
+
+public class PreparedStatement : IDisposable
+{
+    private readonly FlightSqlClient _client;
+    private readonly string _handle;
+    private Schema _datasetSchema;
+    private Schema _parameterSchema;
+    private RecordBatch? _recordsBatch;
+    private bool _isClosed;
+    public bool IsClosed => _isClosed;
+    public string Handle => _handle;
+    public RecordBatch? ParametersBatch => _recordsBatch;
+
+    /// <summary>
+    /// Initializes a new instance of the <see cref="PreparedStatement"/> 
class.
+    /// </summary>
+    /// <param name="client">The Flight SQL client used for executing SQL 
operations.</param>
+    /// <param name="handle">The handle representing the prepared 
statement.</param>
+    /// <param name="datasetSchema">The schema of the result dataset.</param>
+    /// <param name="parameterSchema">The schema of the parameters for this 
prepared statement.</param>
+    public PreparedStatement(FlightSqlClient client, string handle, Schema 
datasetSchema, Schema parameterSchema)
+    {
+        _client = client ?? throw new ArgumentNullException(nameof(client));
+        _handle = handle ?? throw new ArgumentNullException(nameof(handle));
+        _datasetSchema = datasetSchema ?? throw new 
ArgumentNullException(nameof(datasetSchema));
+        _parameterSchema = parameterSchema ?? throw new 
ArgumentNullException(nameof(parameterSchema));
+        _isClosed = false;
+    }
+
+    /// <summary>
+    /// Retrieves the schema associated with the prepared statement 
asynchronously.
+    /// </summary>
+    /// <param name="options">The options used to configure the Flight 
call.</param>
+    /// <returns>A task representing the asynchronous operation, which returns 
the schema of the result set.</returns>
+    /// <exception cref="InvalidOperationException">Thrown when the schema is 
empty or invalid.</exception>
+    public async Task<Schema> GetSchemaAsync(FlightCallOptions? options = 
default)
+    {
+        EnsureStatementIsNotClosed();
+
+        try
+        {
+            var command = new CommandPreparedStatementQuery
+            {
+                PreparedStatementHandle = ByteString.CopyFrom(_handle, 
Encoding.UTF8)
+            };
+            var descriptor = 
FlightDescriptor.CreateCommandDescriptor(command.PackAndSerialize());
+            var schema = await _client.GetSchemaAsync(descriptor, 
options).ConfigureAwait(false);
+            if (schema == null || !schema.FieldsList.Any())
+            {
+                throw new InvalidOperationException("Schema is empty or 
invalid.");
+            }
+            return schema;
+        }
+        catch (RpcException ex)
+        {
+            throw new InvalidOperationException("Failed to retrieve the schema 
for the prepared statement", ex);
+        }
+    }
+
+

Review Comment:
   nit: extra blank line



##########
csharp/src/Apache.Arrow.Flight.Sql/RecordBatchExtensions.cs:
##########
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using System.Threading;
+using System.Threading.Channels;
+using System.Threading.Tasks;
+using Apache.Arrow.Ipc;
+using Google.Protobuf;
+using Grpc.Core;
+
+namespace Apache.Arrow.Flight.Sql;
+
+public static class RecordBatchExtensions
+{
+    /// <summary>
+    /// Converts a RecordBatch into an asynchronous stream of FlightData.
+    /// </summary>
+    /// <param name="recordBatch">The RecordBatch to convert.</param>
+    /// <param name="flightDescriptor">The FlightDescriptor describing the 
Flight data.</param>
+    /// <returns>An asynchronous stream of FlightData objects.</returns>
+    public static async IAsyncEnumerable<FlightData> 
ToFlightDataStreamAsync(this RecordBatch recordBatch,
+        FlightDescriptor flightDescriptor)
+    {
+        if (recordBatch == null)
+        {
+            throw new ArgumentNullException(nameof(recordBatch));
+        }
+
+        // Use a memory stream to write the Arrow RecordBatch into FlightData 
format
+        using var memoryStream = new MemoryStream();
+        var writer = new ArrowStreamWriter(memoryStream, recordBatch.Schema);
+
+        // Write the RecordBatch to the stream
+        await writer.WriteRecordBatchAsync(recordBatch).ConfigureAwait(false);
+        await writer.WriteEndAsync().ConfigureAwait(false);
+
+        // Reset the memory stream position
+        memoryStream.Position = 0;
+
+        // Read back the data to create FlightData
+        var flightData = new FlightData(flightDescriptor, 
ByteString.CopyFrom(memoryStream.ToArray()),
+            ByteString.CopyFrom(memoryStream.ToArray()));

Review Comment:
   Is it correct for the `dataBody` and `dataHeader` arguments to have the same 
value? The sync version passes `ByteString.Empty` as the `dataHeader` parameter.



##########
csharp/src/Apache.Arrow.Flight.Sql/FlightCallOptions.cs:
##########
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Buffers;
+using System.Threading;
+using Grpc.Core;
+
+namespace Apache.Arrow.Flight.Sql;
+
+public class FlightCallOptions
+{
+    public FlightCallOptions()
+    {
+        Timeout = TimeSpan.FromSeconds(-1);
+    }
+    // Implement any necessary options for RPC calls
+    public Metadata Headers { get; set; } = new();
+
+    /// <summary>
+    /// Gets or sets a token to enable interactive user cancellation of 
long-running requests.
+    /// </summary>
+    public CancellationToken StopToken { get; set; }
+
+    /// <summary>
+    /// Gets or sets the optional timeout for this call.
+    /// Negative durations mean an implementation-defined default behavior 
will be used instead.
+    /// </summary>
+    public TimeSpan Timeout { get; set; }
+
+    /// <summary>
+    /// Gets or sets an optional memory manager to control where to allocate 
incoming data.
+    /// </summary>
+    public MemoryManager<ArrowBuffer>? MemoryManager { get; set; }

Review Comment:
   Nothing references `MemoryManager`. To the extent that options like this are 
aspirational, it would be better to remove them now and add them back in a 
separate PR once implemented, rather than leaving them in unused and giving 
the impression that they might do something.



##########
csharp/src/Apache.Arrow.Flight.Sql/PreparedStatement.cs:
##########
@@ -0,0 +1,383 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.Arrow.Flight.Sql.Client;
+using Arrow.Flight.Protocol.Sql;
+using Google.Protobuf;
+using Grpc.Core;
+
+namespace Apache.Arrow.Flight.Sql;
+
+public class PreparedStatement : IDisposable
+{
+    private readonly FlightSqlClient _client;
+    private readonly string _handle;
+    private Schema _datasetSchema;
+    private Schema _parameterSchema;

Review Comment:
   Neither of these schemas appears to be used.



##########
csharp/src/Apache.Arrow.Flight.Sql/RecordBatchExtensions.cs:
##########
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using System.Threading;
+using System.Threading.Channels;
+using System.Threading.Tasks;
+using Apache.Arrow.Ipc;
+using Google.Protobuf;
+using Grpc.Core;
+
+namespace Apache.Arrow.Flight.Sql;
+
+public static class RecordBatchExtensions
+{
+    /// <summary>
+    /// Converts a RecordBatch into an asynchronous stream of FlightData.
+    /// </summary>
+    /// <param name="recordBatch">The RecordBatch to convert.</param>
+    /// <param name="flightDescriptor">The FlightDescriptor describing the 
Flight data.</param>
+    /// <returns>An asynchronous stream of FlightData objects.</returns>
+    public static async IAsyncEnumerable<FlightData> 
ToFlightDataStreamAsync(this RecordBatch recordBatch,
+        FlightDescriptor flightDescriptor)
+    {
+        if (recordBatch == null)
+        {
+            throw new ArgumentNullException(nameof(recordBatch));
+        }
+
+        // Use a memory stream to write the Arrow RecordBatch into FlightData 
format
+        using var memoryStream = new MemoryStream();
+        var writer = new ArrowStreamWriter(memoryStream, recordBatch.Schema);
+
+        // Write the RecordBatch to the stream
+        await writer.WriteRecordBatchAsync(recordBatch).ConfigureAwait(false);
+        await writer.WriteEndAsync().ConfigureAwait(false);
+
+        // Reset the memory stream position
+        memoryStream.Position = 0;
+
+        // Read back the data to create FlightData
+        var flightData = new FlightData(flightDescriptor, 
ByteString.CopyFrom(memoryStream.ToArray()),
+            ByteString.CopyFrom(memoryStream.ToArray()));
+        yield return flightData;
+    }
+
+    /// <summary>
+    /// Converts a RecordBatch into an IAsyncStreamReader<FlightData>.
+    /// </summary>
+    /// <param name="recordBatch">The RecordBatch to convert.</param>
+    /// <param name="flightDescriptor">The FlightDescriptor describing the 
Flight data.</param>
+    /// <returns>An IAsyncStreamReader of FlightData.</returns>
+    public static IAsyncStreamReader<FlightData> ToFlightDataStream(this 
RecordBatch recordBatch, FlightDescriptor flightDescriptor)
+    {
+        if (recordBatch == null) throw new 
ArgumentNullException(nameof(recordBatch));
+        if (flightDescriptor == null) throw new 
ArgumentNullException(nameof(flightDescriptor));
+
+        var channel = Channel.CreateUnbounded<FlightData>();
+
+        try
+        {
+            if (recordBatch.Schema == null || 
!recordBatch.Schema.FieldsList.Any())
+            {
+                throw new InvalidOperationException("The record batch has an 
invalid or empty schema.");
+            }
+
+            using var memoryStream = new MemoryStream();
+            using var writer = new ArrowStreamWriter(memoryStream, 
recordBatch.Schema);
+            writer.WriteRecordBatch(recordBatch);
+            writer.WriteEnd();
+            memoryStream.Position = 0;
+            var flightData = new FlightData(flightDescriptor, 
ByteString.CopyFrom(memoryStream.ToArray()), ByteString.Empty, 
ByteString.Empty);
+            if (flightData.DataBody.IsEmpty)
+            {
+                throw new InvalidOperationException(
+                    "The generated FlightData is empty. Check the RecordBatch 
content.");
+            }
+
+            channel.Writer.TryWrite(flightData);

Review Comment:
   What would it mean if this returned `false`?



##########
csharp/src/Apache.Arrow.Flight.Sql/RecordBatchExtensions.cs:
##########
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using System.Threading;
+using System.Threading.Channels;
+using System.Threading.Tasks;
+using Apache.Arrow.Ipc;
+using Google.Protobuf;
+using Grpc.Core;
+
+namespace Apache.Arrow.Flight.Sql;
+
+public static class RecordBatchExtensions
+{
+    /// <summary>
+    /// Converts a RecordBatch into an asynchronous stream of FlightData.
+    /// </summary>
+    /// <param name="recordBatch">The RecordBatch to convert.</param>
+    /// <param name="flightDescriptor">The FlightDescriptor describing the 
Flight data.</param>
+    /// <returns>An asynchronous stream of FlightData objects.</returns>
+    public static async IAsyncEnumerable<FlightData> 
ToFlightDataStreamAsync(this RecordBatch recordBatch,
+        FlightDescriptor flightDescriptor)
+    {
+        if (recordBatch == null)
+        {
+            throw new ArgumentNullException(nameof(recordBatch));
+        }
+
+        // Use a memory stream to write the Arrow RecordBatch into FlightData 
format
+        using var memoryStream = new MemoryStream();
+        var writer = new ArrowStreamWriter(memoryStream, recordBatch.Schema);
+
+        // Write the RecordBatch to the stream
+        await writer.WriteRecordBatchAsync(recordBatch).ConfigureAwait(false);
+        await writer.WriteEndAsync().ConfigureAwait(false);
+
+        // Reset the memory stream position
+        memoryStream.Position = 0;
+
+        // Read back the data to create FlightData
+        var flightData = new FlightData(flightDescriptor, 
ByteString.CopyFrom(memoryStream.ToArray()),
+            ByteString.CopyFrom(memoryStream.ToArray()));
+        yield return flightData;
+    }
+
+    /// <summary>
+    /// Converts a RecordBatch into an IAsyncStreamReader<FlightData>.
+    /// </summary>
+    /// <param name="recordBatch">The RecordBatch to convert.</param>
+    /// <param name="flightDescriptor">The FlightDescriptor describing the 
Flight data.</param>
+    /// <returns>An IAsyncStreamReader of FlightData.</returns>
+    public static IAsyncStreamReader<FlightData> ToFlightDataStream(this 
RecordBatch recordBatch, FlightDescriptor flightDescriptor)
+    {
+        if (recordBatch == null) throw new 
ArgumentNullException(nameof(recordBatch));
+        if (flightDescriptor == null) throw new 
ArgumentNullException(nameof(flightDescriptor));
+
+        var channel = Channel.CreateUnbounded<FlightData>();
+
+        try
+        {
+            if (recordBatch.Schema == null || 
!recordBatch.Schema.FieldsList.Any())
+            {
+                throw new InvalidOperationException("The record batch has an 
invalid or empty schema.");
+            }
+
+            using var memoryStream = new MemoryStream();
+            using var writer = new ArrowStreamWriter(memoryStream, 
recordBatch.Schema);
+            writer.WriteRecordBatch(recordBatch);
+            writer.WriteEnd();
+            memoryStream.Position = 0;
+            var flightData = new FlightData(flightDescriptor, 
ByteString.CopyFrom(memoryStream.ToArray()), ByteString.Empty, 
ByteString.Empty);
+            if (flightData.DataBody.IsEmpty)

Review Comment:
   In addition to the same comment about `ByteString.CopyFrom`, it should be 
possible to check for an empty body before creating the `FlightData` by looking 
at `memoryStream.Length`. Does this length check belong in the async version 
too?



##########
csharp/src/Apache.Arrow.Flight.Sql/FlightExtensions.cs:
##########
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Collections.Generic;
+using Google.Protobuf;
+using Google.Protobuf.WellKnownTypes;
+
+namespace Apache.Arrow.Flight.Sql;
+
+internal static class FlightExtensions
+{
+    public static byte[] PackAndSerialize(this IMessage command) => 
Any.Pack(command).ToByteArray();
+    public static T ParseAndUnpack<T>(this ByteString source) where T : 
IMessage<T>, new() => Any.Parser.ParseFrom(source).Unpack<T>();
+
+    public static IEnumerable<long> ExtractRowCount(this RecordBatch batch)
+    {
+        foreach (var array in batch.Arrays)
+        {
+            var values = ExtractValues(array);
+            foreach (var value in values)
+            {
+                yield return value switch
+                {
+                    long l => l,
+                    int i => i != 0 ? i : 0,

Review Comment:
   Isn't this just `int i => i`?



##########
csharp/src/Apache.Arrow.Flight.Sql/FlightCallOptions.cs:
##########
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Buffers;
+using System.Threading;
+using Grpc.Core;
+
+namespace Apache.Arrow.Flight.Sql;
+
+public class FlightCallOptions
+{
+    public FlightCallOptions()
+    {
+        Timeout = TimeSpan.FromSeconds(-1);
+    }
+    // Implement any necessary options for RPC calls
+    public Metadata Headers { get; set; } = new();
+
+    /// <summary>
+    /// Gets or sets a token to enable interactive user cancellation of 
long-running requests.
+    /// </summary>
+    public CancellationToken StopToken { get; set; }

Review Comment:
   Nothing references `StopToken`.



##########
csharp/src/Apache.Arrow.Flight.Sql/FlightCallOptions.cs:
##########
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Buffers;
+using System.Threading;
+using Grpc.Core;
+
+namespace Apache.Arrow.Flight.Sql;
+
+public class FlightCallOptions
+{
+    public FlightCallOptions()
+    {
+        Timeout = TimeSpan.FromSeconds(-1);
+    }

Review Comment:
   nit: insert a blank line after the closing brace



##########
csharp/src/Apache.Arrow.Flight.Sql/FlightExtensions.cs:
##########
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Collections.Generic;
+using Google.Protobuf;
+using Google.Protobuf.WellKnownTypes;
+
+namespace Apache.Arrow.Flight.Sql;
+
+internal static class FlightExtensions
+{
+    public static byte[] PackAndSerialize(this IMessage command) => 
Any.Pack(command).ToByteArray();
+    public static T ParseAndUnpack<T>(this ByteString source) where T : 
IMessage<T>, new() => Any.Parser.ParseFrom(source).Unpack<T>();
+
+    public static IEnumerable<long> ExtractRowCount(this RecordBatch batch)

Review Comment:
   I have to admit, I can't tell what the semantics of this are supposed to be 
:( I did a quick search to see what the result of `ExecuteUpdate` was supposed 
to look like, but that didn't help. Putting the explanation in a comment might 
be worthwhile.



##########
csharp/test/Apache.Arrow.Flight.Sql.Tests/FlightSqlPreparedStatementTests.cs:
##########
@@ -0,0 +1,227 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.Arrow.Flight.Client;
+using Apache.Arrow.Flight.Sql.Client;
+using Apache.Arrow.Flight.TestWeb;
+using Apache.Arrow.Types;
+using Arrow.Flight.Protocol.Sql;
+using Google.Protobuf;
+using Grpc.Core;
+using Xunit;
+
+namespace Apache.Arrow.Flight.Sql.Tests
+{
+    public class FlightSqlPreparedStatementTests
+    {
+        readonly TestFlightSqlWebFactory _testWebFactory;
+        readonly FlightStore _flightStore;
+        readonly FlightSqlClient _flightSqlClient;
+        private readonly PreparedStatement _preparedStatement;
+        private readonly Schema _schema;
+        private readonly FlightDescriptor _flightDescriptor;
+        private readonly RecordBatch _parameterBatch;
+
+        public FlightSqlPreparedStatementTests()
+        {
+            _flightStore = new FlightStore();
+            _testWebFactory = new TestFlightSqlWebFactory(_flightStore);
+            _flightSqlClient = new FlightSqlClient(new 
FlightClient(_testWebFactory.GetChannel()));
+
+            _flightDescriptor = 
FlightDescriptor.CreateCommandDescriptor("test-query");
+            _schema = CreateSchema();
+            _parameterBatch = CreateParameterBatch();
+            _preparedStatement = new PreparedStatement(_flightSqlClient, 
"test-handle-guid", _schema, _schema);
+        }
+
+        private static Schema CreateSchema()
+        {
+            return new Schema.Builder()
+                .Field(f => 
f.Name("DATA_TYPE_ID").DataType(Int32Type.Default).Nullable(false))
+                .Build();
+        }
+
+        private RecordBatch CreateParameterBatch()
+        {
+            return new RecordBatch(_schema,
+                new IArrowArray[]
+                {
+                    new Int32Array.Builder().AppendRange(new[] { 1, 2, 3 
}).Build(),
+                    new StringArray.Builder().AppendRange(new[] { "INTEGER", 
"VARCHAR", "BOOLEAN" }).Build(),
+                    new Int32Array.Builder().AppendRange(new[] { 32, 255, 1 
}).Build()
+                }, 3);
+        }
+        
+        

Review Comment:
   nit: extra blank line



##########
csharp/test/Apache.Arrow.Flight.Sql.Tests/FlightSqlTestUtils.cs:
##########
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System.Linq;
+using Apache.Arrow.Flight.Tests;
+using Apache.Arrow.Flight.TestWeb;
+
+namespace Apache.Arrow.Flight.Sql.Tests;
+
+public class FlightSqlTestUtils
+{
+    private readonly TestFlightSqlWebFactory _testWebFactory;
+    private readonly FlightStore _flightStore;
+
+    public FlightSqlTestUtils(TestFlightSqlWebFactory testWebFactory, 
FlightStore flightStore)
+    {
+        _testWebFactory = testWebFactory;
+        _flightStore = flightStore;
+    }
+
+    public RecordBatch CreateTestBatch(int startValue, int length) 
+    {
+        var batchBuilder = new RecordBatch.Builder();
+        Int32Array.Builder builder = new();
+        for (int i = 0; i < length; i++)
+        {
+            builder.Append(startValue + i);
+        }
+
+        batchBuilder.Append("test", true, builder.Build());
+        return batchBuilder.Build();
+    }
+
+

Review Comment:
   nit: extra blank line



##########
csharp/src/Apache.Arrow.Flight.Sql/FlightExtensions.cs:
##########
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Collections.Generic;
+using Google.Protobuf;
+using Google.Protobuf.WellKnownTypes;
+
+namespace Apache.Arrow.Flight.Sql;
+
+internal static class FlightExtensions
+{
+    public static byte[] PackAndSerialize(this IMessage command) => 
Any.Pack(command).ToByteArray();
+    public static T ParseAndUnpack<T>(this ByteString source) where T : 
IMessage<T>, new() => Any.Parser.ParseFrom(source).Unpack<T>();
+
+    public static IEnumerable<long> ExtractRowCount(this RecordBatch batch)
+    {
+        foreach (var array in batch.Arrays)
+        {
+            var values = ExtractValues(array);
+            foreach (var value in values)
+            {
+                yield return value switch
+                {
+                    long l => l,
+                    int i => i != 0 ? i : 0,
+                    _ => 0L
+                };
+            }
+        }
+    }
+    
+    private static IEnumerable<object?> ExtractValues(IArrowArray array)
+    {
+        return array switch
+        {
+            Int32Array int32Array => ExtractPrimitiveValues(int32Array),
+            Int64Array int64Array => ExtractPrimitiveValues(int64Array),
+            FloatArray floatArray => ExtractPrimitiveValues(floatArray),
+            BooleanArray booleanArray => ExtractBooleanValues(booleanArray),
+            StringArray stringArray => ExtractStringValues(stringArray),
+            _ => throw new NotSupportedException($"Array type 
{array.GetType().Name} is not supported.")
+        };
+    }
+    
+    private static IEnumerable<object?> 
ExtractPrimitiveValues<T>(PrimitiveArray<T> array) where T : struct, 
IEquatable<T>
+    {

Review Comment:
   The underlying arrays in all three cases should already be `IEnumerable<>`. 
Can they not be used directly?



##########
csharp/test/Apache.Arrow.Flight.Sql.Tests/FlightSqlClientTests.cs:
##########
@@ -0,0 +1,855 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading.Tasks;
+using Apache.Arrow.Flight.Client;
+using Apache.Arrow.Flight.Sql.Client;
+using Apache.Arrow.Flight.TestWeb;
+using Apache.Arrow.Types;
+using Arrow.Flight.Protocol.Sql;
+using Google.Protobuf;
+using Grpc.Core.Utils;
+using Xunit;
+
+namespace Apache.Arrow.Flight.Sql.Tests;
+
+public class FlightSqlClientTests : IDisposable
+{
+    readonly TestFlightSqlWebFactory _testWebFactory;
+    readonly FlightStore _flightStore;
+    private readonly FlightSqlClient _flightSqlClient;
+    private readonly FlightSqlTestUtils _testUtils;
+
+    public FlightSqlClientTests()
+    {
+        _flightStore = new FlightStore();
+        _testWebFactory = new TestFlightSqlWebFactory(_flightStore);
+        FlightClient flightClient = new(_testWebFactory.GetChannel());
+        _flightSqlClient = new FlightSqlClient(flightClient);
+
+        _testUtils = new FlightSqlTestUtils(_testWebFactory, _flightStore);
+    }
+
+    #region Transactions
+
+    [Fact]
+    public async Task CommitTransactionAsync()
+    {
+        // Arrange
+        string transactionId = "sample-transaction-id";
+        var transaction = new Transaction(transactionId);
+
+        // Act
+        var streamCall = _flightSqlClient.CommitAsync(transaction);
+        var result = await streamCall.ResponseStream.ToListAsync();
+
+        // Assert
+        Assert.NotNull(result);
+        Assert.Equal(transaction.TransactionId, result.FirstOrDefault()?.Body);
+    }
+
+    [Fact]
+    public async Task BeginTransactionAsync()
+    {
+        // Arrange
+        string expectedTransactionId = "sample-transaction-id";
+
+        // Act
+        var transaction = await _flightSqlClient.BeginTransactionAsync();
+
+        // Assert
+        Assert.NotNull(transaction);
+        Assert.Equal(ByteString.CopyFromUtf8(expectedTransactionId), 
transaction.TransactionId);
+    }
+
+    [Fact]
+    public async Task RollbackTransactionAsync()
+    {
+        // Arrange
+        string transactionId = "sample-transaction-id";
+        var transaction = new Transaction(transactionId);
+
+        // Act
+        var streamCall = _flightSqlClient.RollbackAsync(transaction);
+        var result = await streamCall.ResponseStream.ToListAsync();
+
+        // Assert
+        Assert.NotNull(transaction);
+        Assert.Equal(result.FirstOrDefault()?.Body, transaction.TransactionId);
+    }
+
+    #endregion
+
+    #region PreparedStatement
+
+    [Fact]
+    public async Task PreparedAsync()
+    {
+        // Arrange
+        string query = "INSERT INTO users (id, name) VALUES (1, 'John Doe')";
+        var transaction = new Transaction("sample-transaction-id");
+        var flightDescriptor = 
FlightDescriptor.CreateCommandDescriptor("test");
+
+        // Create a sample schema for the dataset and parameters
+        var schema = new Schema.Builder()
+            .Field(f => f.Name("id").DataType(Int32Type.Default))
+            .Field(f => f.Name("name").DataType(StringType.Default))
+            .Build();
+
+        var recordBatch = new RecordBatch(schema, new Array[]
+        {
+            new Int32Array.Builder().Append(1).Build(),
+            new StringArray.Builder().Append("John Doe").Build()
+        }, 1);
+        
+        var flightHolder = new FlightHolder(flightDescriptor, schema, 
_testWebFactory.GetAddress());
+        flightHolder.AddBatch(new RecordBatchWithMetadata(recordBatch));
+        _flightStore.Flights.Add(flightDescriptor, flightHolder);
+        
+        var datasetSchemaBytes = SchemaExtensions.SerializeSchema(schema);
+        var parameterSchemaBytes = SchemaExtensions.SerializeSchema(schema);
+        
+        var preparedStatementResponse = new ActionCreatePreparedStatementResult
+        {
+            PreparedStatementHandle = 
ByteString.CopyFromUtf8("prepared-handle"),
+            DatasetSchema = ByteString.CopyFrom(datasetSchemaBytes),
+            ParameterSchema = ByteString.CopyFrom(parameterSchemaBytes)
+        };
+        
+        // Act
+        var preparedStatement = await _flightSqlClient.PrepareAsync(query, 
transaction);
+        var deserializedDatasetSchema = 
SchemaExtensions.DeserializeSchema(preparedStatementResponse.DatasetSchema.ToByteArray());
+        var deserializedParameterSchema = 
SchemaExtensions.DeserializeSchema(preparedStatementResponse.ParameterSchema.ToByteArray());
+
+        // Assert
+        Assert.NotNull(preparedStatement);
+        Assert.NotNull(deserializedDatasetSchema);
+        Assert.NotNull(deserializedParameterSchema);
+        CompareSchemas(schema, deserializedDatasetSchema);
+        CompareSchemas(schema, deserializedParameterSchema);
+    }
+
+    #endregion
+
+    [Fact]
+    public async Task ExecuteUpdateAsync()
+    {
+        // Arrange
+        string query = "UPDATE test_table SET column1 = 'value' WHERE column2 
= 'condition'";
+        var transaction = new Transaction("sample-transaction-id");
+        var flightDescriptor = 
FlightDescriptor.CreateCommandDescriptor("test");
+        
+        var schema = new Schema.Builder()
+            .Field(f => f.Name("id").DataType(Int32Type.Default))
+            .Field(f => f.Name("name").DataType(StringType.Default))
+            .Build();
+
+        var recordBatch = new RecordBatch(schema, new Array[]
+        {
+            new Int32Array.Builder().Append(1).Build(),
+            new StringArray.Builder().Append("John Doe").Build()
+        }, 1);
+        
+
+        var flightHolder = new FlightHolder(flightDescriptor, 
recordBatch.Schema, _testWebFactory.GetAddress());
+        flightHolder.AddBatch(new RecordBatchWithMetadata(recordBatch));
+        _flightStore.Flights.Add(flightDescriptor, flightHolder);
+
+        // Act
+        long affectedRows = await _flightSqlClient.ExecuteUpdateAsync(query, 
transaction);
+
+        // Assert
+        Assert.Equal(1, affectedRows);
+    }
+
+    [Fact]
+    public async Task ExecuteAsync()
+    {
+        // Arrange
+        string query = "SELECT * FROM test_table";
+        var transaction = new Transaction("sample-transaction-id");
+        var flightDescriptor = 
FlightDescriptor.CreateCommandDescriptor("test");
+        var recordBatch = _testUtils.CreateTestBatch(0, 100);
+
+        var flightHolder = new FlightHolder(flightDescriptor, 
recordBatch.Schema, _testWebFactory.GetAddress());
+        flightHolder.AddBatch(new RecordBatchWithMetadata(recordBatch));
+
+        _flightStore.Flights.Add(flightDescriptor, flightHolder);
+
+        // Act
+        var flightInfo = await _flightSqlClient.ExecuteAsync(query, 
transaction);
+
+        // Assert
+        Assert.NotNull(flightInfo);
+        Assert.Single(flightInfo.Endpoints);
+    }
+
+    [Fact]
+    public async Task 
ExecuteAsync_ShouldReturnFlightInfo_WhenValidInputsAreProvided()
+    {
+        // Arrange
+        string query = "SELECT * FROM test_table";
+        var transaction = new Transaction("sample-transaction-id");
+        var flightDescriptor = 
FlightDescriptor.CreateCommandDescriptor("test");
+        var recordBatch = _testUtils.CreateTestBatch(0, 100);
+        var flightHolder = new FlightHolder(flightDescriptor, 
recordBatch.Schema,
+            _testWebFactory.GetAddress());
+        _flightStore.Flights.Add(flightDescriptor, flightHolder);
+        
+        // Act
+        var flightInfo = await _flightSqlClient.ExecuteAsync(query, 
transaction);
+
+        // Assert
+        Assert.NotNull(flightInfo);
+        Assert.IsType<FlightInfo>(flightInfo);
+    }
+    
+    [Fact]
+    public async Task 
ExecuteAsync_ShouldThrowArgumentException_WhenQueryIsEmpty()
+    {
+        // Arrange
+        string emptyQuery = string.Empty;
+        var transaction = new Transaction("sample-transaction-id");
+
+        // Act & Assert
+        await Assert.ThrowsAsync<ArgumentException>(async () =>
+            await _flightSqlClient.ExecuteAsync(emptyQuery, transaction));
+    }
+    
+    [Fact]
+    public async Task 
ExecuteAsync_ShouldReturnFlightInfo_WhenTransactionIsNoTransaction()
+    {
+        // Arrange
+        string query = "SELECT * FROM test_table";
+        var transaction = Transaction.NoTransaction;
+        var flightDescriptor = 
FlightDescriptor.CreateCommandDescriptor("test");
+        var recordBatch = _testUtils.CreateTestBatch(0, 100);
+        var flightHolder = new FlightHolder(flightDescriptor, 
recordBatch.Schema,
+            _testWebFactory.GetAddress());
+        _flightStore.Flights.Add(flightDescriptor, flightHolder);
+        
+        // Act
+        var flightInfo = await _flightSqlClient.ExecuteAsync(query, 
transaction);
+
+        // Assert
+        Assert.NotNull(flightInfo);
+        Assert.IsType<FlightInfo>(flightInfo);
+    }
+
+    

Review Comment:
   nit: extra blank line here (two consecutive blank lines after the test
   method); please remove one.



##########
csharp/test/Apache.Arrow.Flight.Sql.Tests/FlightSqlClientTests.cs:
##########
@@ -0,0 +1,855 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading.Tasks;
+using Apache.Arrow.Flight.Client;
+using Apache.Arrow.Flight.Sql.Client;
+using Apache.Arrow.Flight.TestWeb;
+using Apache.Arrow.Types;
+using Arrow.Flight.Protocol.Sql;
+using Google.Protobuf;
+using Grpc.Core.Utils;
+using Xunit;
+
+namespace Apache.Arrow.Flight.Sql.Tests;
+
+public class FlightSqlClientTests : IDisposable
+{
+    readonly TestFlightSqlWebFactory _testWebFactory;
+    readonly FlightStore _flightStore;
+    private readonly FlightSqlClient _flightSqlClient;
+    private readonly FlightSqlTestUtils _testUtils;
+
+    public FlightSqlClientTests()
+    {
+        _flightStore = new FlightStore();
+        _testWebFactory = new TestFlightSqlWebFactory(_flightStore);
+        FlightClient flightClient = new(_testWebFactory.GetChannel());
+        _flightSqlClient = new FlightSqlClient(flightClient);
+
+        _testUtils = new FlightSqlTestUtils(_testWebFactory, _flightStore);
+    }
+
+    #region Transactions
+
+    [Fact]
+    public async Task CommitTransactionAsync()
+    {
+        // Arrange
+        string transactionId = "sample-transaction-id";
+        var transaction = new Transaction(transactionId);
+
+        // Act
+        var streamCall = _flightSqlClient.CommitAsync(transaction);
+        var result = await streamCall.ResponseStream.ToListAsync();
+
+        // Assert
+        Assert.NotNull(result);
+        Assert.Equal(transaction.TransactionId, result.FirstOrDefault()?.Body);
+    }
+
+    [Fact]
+    public async Task BeginTransactionAsync()
+    {
+        // Arrange
+        string expectedTransactionId = "sample-transaction-id";
+
+        // Act
+        var transaction = await _flightSqlClient.BeginTransactionAsync();
+
+        // Assert
+        Assert.NotNull(transaction);
+        Assert.Equal(ByteString.CopyFromUtf8(expectedTransactionId), 
transaction.TransactionId);
+    }
+
+    [Fact]
+    public async Task RollbackTransactionAsync()
+    {
+        // Arrange
+        string transactionId = "sample-transaction-id";
+        var transaction = new Transaction(transactionId);
+
+        // Act
+        var streamCall = _flightSqlClient.RollbackAsync(transaction);
+        var result = await streamCall.ResponseStream.ToListAsync();
+
+        // Assert
+        Assert.NotNull(transaction);
+        Assert.Equal(result.FirstOrDefault()?.Body, transaction.TransactionId);
+    }
+
+    #endregion
+
+    #region PreparedStatement
+
+    [Fact]
+    public async Task PreparedAsync()
+    {
+        // Arrange
+        string query = "INSERT INTO users (id, name) VALUES (1, 'John Doe')";
+        var transaction = new Transaction("sample-transaction-id");
+        var flightDescriptor = 
FlightDescriptor.CreateCommandDescriptor("test");
+
+        // Create a sample schema for the dataset and parameters
+        var schema = new Schema.Builder()
+            .Field(f => f.Name("id").DataType(Int32Type.Default))
+            .Field(f => f.Name("name").DataType(StringType.Default))
+            .Build();
+
+        var recordBatch = new RecordBatch(schema, new Array[]
+        {
+            new Int32Array.Builder().Append(1).Build(),
+            new StringArray.Builder().Append("John Doe").Build()
+        }, 1);
+        
+        var flightHolder = new FlightHolder(flightDescriptor, schema, 
_testWebFactory.GetAddress());
+        flightHolder.AddBatch(new RecordBatchWithMetadata(recordBatch));
+        _flightStore.Flights.Add(flightDescriptor, flightHolder);
+        
+        var datasetSchemaBytes = SchemaExtensions.SerializeSchema(schema);
+        var parameterSchemaBytes = SchemaExtensions.SerializeSchema(schema);
+        
+        var preparedStatementResponse = new ActionCreatePreparedStatementResult
+        {
+            PreparedStatementHandle = 
ByteString.CopyFromUtf8("prepared-handle"),
+            DatasetSchema = ByteString.CopyFrom(datasetSchemaBytes),
+            ParameterSchema = ByteString.CopyFrom(parameterSchemaBytes)
+        };
+        
+        // Act
+        var preparedStatement = await _flightSqlClient.PrepareAsync(query, 
transaction);
+        var deserializedDatasetSchema = 
SchemaExtensions.DeserializeSchema(preparedStatementResponse.DatasetSchema.ToByteArray());
+        var deserializedParameterSchema = 
SchemaExtensions.DeserializeSchema(preparedStatementResponse.ParameterSchema.ToByteArray());
+
+        // Assert
+        Assert.NotNull(preparedStatement);
+        Assert.NotNull(deserializedDatasetSchema);
+        Assert.NotNull(deserializedParameterSchema);
+        CompareSchemas(schema, deserializedDatasetSchema);
+        CompareSchemas(schema, deserializedParameterSchema);
+    }
+
+    #endregion
+
+    [Fact]
+    public async Task ExecuteUpdateAsync()
+    {
+        // Arrange
+        string query = "UPDATE test_table SET column1 = 'value' WHERE column2 
= 'condition'";
+        var transaction = new Transaction("sample-transaction-id");
+        var flightDescriptor = 
FlightDescriptor.CreateCommandDescriptor("test");
+        
+        var schema = new Schema.Builder()
+            .Field(f => f.Name("id").DataType(Int32Type.Default))
+            .Field(f => f.Name("name").DataType(StringType.Default))
+            .Build();
+
+        var recordBatch = new RecordBatch(schema, new Array[]
+        {
+            new Int32Array.Builder().Append(1).Build(),
+            new StringArray.Builder().Append("John Doe").Build()
+        }, 1);
+        
+

Review Comment:
   nit: extra blank line here (two consecutive blank lines after the record
   batch is created); please remove one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to