HackPoint commented on code in PR #44783:
URL: https://github.com/apache/arrow/pull/44783#discussion_r1968066965


##########
csharp/src/Apache.Arrow.Flight.Sql/RecordBatchExtensions.cs:
##########
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using System.Threading;
+using System.Threading.Channels;
+using System.Threading.Tasks;
+using Apache.Arrow.Ipc;
+using Google.Protobuf;
+using Grpc.Core;
+
+namespace Apache.Arrow.Flight.Sql;
+
+public static class RecordBatchExtensions
+{
+    /// <summary>
+    /// Converts a RecordBatch into an asynchronous stream of FlightData.
+    /// </summary>
+    /// <param name="recordBatch">The RecordBatch to convert.</param>
+    /// <param name="flightDescriptor">The FlightDescriptor describing the 
Flight data.</param>
+    /// <returns>An asynchronous stream of FlightData objects.</returns>
+    public static async IAsyncEnumerable<FlightData> 
ToFlightDataStreamAsync(this RecordBatch recordBatch,
+        FlightDescriptor flightDescriptor)
+    {
+        if (recordBatch == null)
+        {
+            throw new ArgumentNullException(nameof(recordBatch));
+        }
+
+        // Use a memory stream to write the Arrow RecordBatch into FlightData 
format
+        using var memoryStream = new MemoryStream();
+        var writer = new ArrowStreamWriter(memoryStream, recordBatch.Schema);
+
+        // Write the RecordBatch to the stream
+        await writer.WriteRecordBatchAsync(recordBatch).ConfigureAwait(false);
+        await writer.WriteEndAsync().ConfigureAwait(false);
+
+        // Reset the memory stream position
+        memoryStream.Position = 0;
+
+        // Read back the data to create FlightData
+        var flightData = new FlightData(flightDescriptor, 
ByteString.CopyFrom(memoryStream.ToArray()),
+            ByteString.CopyFrom(memoryStream.ToArray()));
+        yield return flightData;
+    }
+
+    /// <summary>
+    /// Converts a RecordBatch into an IAsyncStreamReader<FlightData>.
+    /// </summary>
+    /// <param name="recordBatch">The RecordBatch to convert.</param>
+    /// <param name="flightDescriptor">The FlightDescriptor describing the 
Flight data.</param>
+    /// <returns>An IAsyncStreamReader of FlightData.</returns>
+    public static IAsyncStreamReader<FlightData> ToFlightDataStream(this 
RecordBatch recordBatch, FlightDescriptor flightDescriptor)
+    {
+        if (recordBatch == null) throw new 
ArgumentNullException(nameof(recordBatch));
+        if (flightDescriptor == null) throw new 
ArgumentNullException(nameof(flightDescriptor));
+
+        var channel = Channel.CreateUnbounded<FlightData>();
+
+        try
+        {
+            if (recordBatch.Schema == null || 
!recordBatch.Schema.FieldsList.Any())
+            {
+                throw new InvalidOperationException("The record batch has an 
invalid or empty schema.");
+            }
+
+            using var memoryStream = new MemoryStream();
+            using var writer = new ArrowStreamWriter(memoryStream, 
recordBatch.Schema);
+            writer.WriteRecordBatch(recordBatch);
+            writer.WriteEnd();
+            memoryStream.Position = 0;
+            var flightData = new FlightData(flightDescriptor, 
ByteString.CopyFrom(memoryStream.ToArray()), ByteString.Empty, 
ByteString.Empty);
+            if (flightData.DataBody.IsEmpty)
+            {
+                throw new InvalidOperationException(
+                    "The generated FlightData is empty. Check the RecordBatch 
content.");
+            }
+
+            channel.Writer.TryWrite(flightData);

Review Comment:
   The implementation has been deleted.



##########
csharp/src/Apache.Arrow.Flight.Sql/RecordBatchExtensions.cs:
##########
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using System.Threading;
+using System.Threading.Channels;
+using System.Threading.Tasks;
+using Apache.Arrow.Ipc;
+using Google.Protobuf;
+using Grpc.Core;
+
+namespace Apache.Arrow.Flight.Sql;
+
+public static class RecordBatchExtensions
+{
+    /// <summary>
+    /// Converts a RecordBatch into an asynchronous stream of FlightData.
+    /// </summary>
+    /// <param name="recordBatch">The RecordBatch to convert.</param>
+    /// <param name="flightDescriptor">The FlightDescriptor describing the 
Flight data.</param>
+    /// <returns>An asynchronous stream of FlightData objects.</returns>
+    public static async IAsyncEnumerable<FlightData> 
ToFlightDataStreamAsync(this RecordBatch recordBatch,
+        FlightDescriptor flightDescriptor)
+    {
+        if (recordBatch == null)
+        {
+            throw new ArgumentNullException(nameof(recordBatch));
+        }
+
+        // Use a memory stream to write the Arrow RecordBatch into FlightData 
format
+        using var memoryStream = new MemoryStream();
+        var writer = new ArrowStreamWriter(memoryStream, recordBatch.Schema);
+
+        // Write the RecordBatch to the stream
+        await writer.WriteRecordBatchAsync(recordBatch).ConfigureAwait(false);
+        await writer.WriteEndAsync().ConfigureAwait(false);
+
+        // Reset the memory stream position
+        memoryStream.Position = 0;
+
+        // Read back the data to create FlightData
+        var flightData = new FlightData(flightDescriptor, 
ByteString.CopyFrom(memoryStream.ToArray()),
+            ByteString.CopyFrom(memoryStream.ToArray()));
+        yield return flightData;
+    }
+
+    /// <summary>
+    /// Converts a RecordBatch into an IAsyncStreamReader<FlightData>.
+    /// </summary>
+    /// <param name="recordBatch">The RecordBatch to convert.</param>
+    /// <param name="flightDescriptor">The FlightDescriptor describing the 
Flight data.</param>
+    /// <returns>An IAsyncStreamReader of FlightData.</returns>
+    public static IAsyncStreamReader<FlightData> ToFlightDataStream(this 
RecordBatch recordBatch, FlightDescriptor flightDescriptor)
+    {
+        if (recordBatch == null) throw new 
ArgumentNullException(nameof(recordBatch));
+        if (flightDescriptor == null) throw new 
ArgumentNullException(nameof(flightDescriptor));
+
+        var channel = Channel.CreateUnbounded<FlightData>();
+
+        try
+        {
+            if (recordBatch.Schema == null || 
!recordBatch.Schema.FieldsList.Any())
+            {
+                throw new InvalidOperationException("The record batch has an 
invalid or empty schema.");
+            }
+
+            using var memoryStream = new MemoryStream();
+            using var writer = new ArrowStreamWriter(memoryStream, 
recordBatch.Schema);
+            writer.WriteRecordBatch(recordBatch);
+            writer.WriteEnd();
+            memoryStream.Position = 0;
+            var flightData = new FlightData(flightDescriptor, 
ByteString.CopyFrom(memoryStream.ToArray()), ByteString.Empty, 
ByteString.Empty);
+            if (flightData.DataBody.IsEmpty)

Review Comment:
   The implementation has been deleted.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to