This is an automated email from the ASF dual-hosted git repository.

curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-dotnet.git


The following commit(s) were added to refs/heads/main by this push:
     new 3380d96  feat: Allow empty streams to return an empty schema asynchronously. (#81)
3380d96 is described below

commit 3380d96512346bb2852869cb5e1a691cfd9828ad
Author: Curt Hagenlocher <[email protected]>
AuthorDate: Tue Sep 23 12:16:22 2025 -0700

    feat: Allow empty streams to return an empty schema asynchronously. (#81)
    
    ## What's Changed
    
    When `ArrowStreamReader` reads from an empty stream, `GetSchema` now
    completes asynchronously and returns `null` (the stream contains no
    schema message) without using a synchronous code path.
    
    Closes #80
---
 .../Internal/RecordBatchReaderImplementation.cs    |  3 ++-
 .../Ipc/ArrowFileReaderImplementation.cs           |  6 +++--
 .../Ipc/ArrowMemoryReaderImplementation.cs         |  4 ++--
 src/Apache.Arrow/Ipc/ArrowReaderImplementation.cs  |  2 +-
 src/Apache.Arrow/Ipc/ArrowStreamReader.cs          |  6 ++---
 .../Ipc/ArrowStreamReaderImplementation.cs         |  7 +++---
 test/Apache.Arrow.Tests/ArrowStreamReaderTests.cs  | 26 ++++++++++++++++++++++
 7 files changed, 42 insertions(+), 12 deletions(-)

diff --git a/src/Apache.Arrow.Flight/Internal/RecordBatchReaderImplementation.cs b/src/Apache.Arrow.Flight/Internal/RecordBatchReaderImplementation.cs
index e3353e8..cb64867 100644
--- a/src/Apache.Arrow.Flight/Internal/RecordBatchReaderImplementation.cs
+++ b/src/Apache.Arrow.Flight/Internal/RecordBatchReaderImplementation.cs
@@ -67,7 +67,7 @@ namespace Apache.Arrow.Flight.Internal
             ReadSchemaAsync(CancellationToken.None).AsTask().Wait();
         }
 
-        public override async ValueTask ReadSchemaAsync(CancellationToken 
cancellationToken)
+        public override async ValueTask<Schema> 
ReadSchemaAsync(CancellationToken cancellationToken)
         {
             while (!HasReadSchema)
             {
@@ -107,6 +107,7 @@ namespace Apache.Arrow.Flight.Internal
                         throw new Exception($"Expected schema as the first 
message, but got: {message.HeaderType.ToString()}");
                 }
             }
+            return _schema;
         }
 
         public override async ValueTask<RecordBatch> 
ReadNextRecordBatchAsync(CancellationToken cancellationToken)
diff --git a/src/Apache.Arrow/Ipc/ArrowFileReaderImplementation.cs b/src/Apache.Arrow/Ipc/ArrowFileReaderImplementation.cs
index 8cedea6..7486a51 100644
--- a/src/Apache.Arrow/Ipc/ArrowFileReaderImplementation.cs
+++ b/src/Apache.Arrow/Ipc/ArrowFileReaderImplementation.cs
@@ -52,11 +52,11 @@ namespace Apache.Arrow.Ipc
             return _footer.RecordBatchCount;
         }
 
-        public override async ValueTask ReadSchemaAsync(CancellationToken 
cancellationToken = default)
+        public override async ValueTask<Schema> 
ReadSchemaAsync(CancellationToken cancellationToken = default)
         {
             if (HasReadSchema)
             {
-                return;
+                return _schema;
             }
 
             await ValidateFileAsync(cancellationToken).ConfigureAwait(false);
@@ -82,6 +82,8 @@ namespace Apache.Arrow.Ipc
                 EnsureFullRead(buffer, bytesRead);
 
                 ReadSchema(buffer);
+
+                return _schema;
             }
         }
 
diff --git a/src/Apache.Arrow/Ipc/ArrowMemoryReaderImplementation.cs b/src/Apache.Arrow/Ipc/ArrowMemoryReaderImplementation.cs
index 1a799b0..763a5d3 100644
--- a/src/Apache.Arrow/Ipc/ArrowMemoryReaderImplementation.cs
+++ b/src/Apache.Arrow/Ipc/ArrowMemoryReaderImplementation.cs
@@ -33,11 +33,11 @@ namespace Apache.Arrow.Ipc
             _buffer = buffer;
         }
 
-        public override ValueTask ReadSchemaAsync(CancellationToken 
cancellationToken)
+        public override ValueTask<Schema> ReadSchemaAsync(CancellationToken 
cancellationToken)
         {
             cancellationToken.ThrowIfCancellationRequested();
             ReadSchema();
-            return default;
+            return new ValueTask<Schema>(_schema);
         }
 
         public override ValueTask<RecordBatch> 
ReadNextRecordBatchAsync(CancellationToken cancellationToken)
diff --git a/src/Apache.Arrow/Ipc/ArrowReaderImplementation.cs b/src/Apache.Arrow/Ipc/ArrowReaderImplementation.cs
index d71478a..d5d3758 100644
--- a/src/Apache.Arrow/Ipc/ArrowReaderImplementation.cs
+++ b/src/Apache.Arrow/Ipc/ArrowReaderImplementation.cs
@@ -69,7 +69,7 @@ namespace Apache.Arrow.Ipc
         {
         }
 
-        public abstract ValueTask ReadSchemaAsync(CancellationToken 
cancellationToken);
+        public abstract ValueTask<Schema> ReadSchemaAsync(CancellationToken 
cancellationToken);
         public abstract void ReadSchema();
 
         public abstract ValueTask<RecordBatch> 
ReadNextRecordBatchAsync(CancellationToken cancellationToken);
diff --git a/src/Apache.Arrow/Ipc/ArrowStreamReader.cs b/src/Apache.Arrow/Ipc/ArrowStreamReader.cs
index dc342b2..540fb19 100644
--- a/src/Apache.Arrow/Ipc/ArrowStreamReader.cs
+++ b/src/Apache.Arrow/Ipc/ArrowStreamReader.cs
@@ -102,11 +102,11 @@ namespace Apache.Arrow.Ipc
 
         public async ValueTask<Schema> GetSchema(CancellationToken 
cancellationToken = default)
         {
-            if (!_implementation.HasReadSchema)
+            if (_implementation.HasReadSchema)
             {
-                await _implementation.ReadSchemaAsync(cancellationToken);
+                return _implementation.Schema;
             }
-            return _implementation.Schema;
+            return await _implementation.ReadSchemaAsync(cancellationToken);
         }
 
         public ValueTask<RecordBatch> 
ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
diff --git a/src/Apache.Arrow/Ipc/ArrowStreamReaderImplementation.cs b/src/Apache.Arrow/Ipc/ArrowStreamReaderImplementation.cs
index e2bfe4a..c1daae9 100644
--- a/src/Apache.Arrow/Ipc/ArrowStreamReaderImplementation.cs
+++ b/src/Apache.Arrow/Ipc/ArrowStreamReaderImplementation.cs
@@ -151,11 +151,11 @@ namespace Apache.Arrow.Ipc
             return new ReadResult(messageLength, result);
         }
 
-        public override async ValueTask ReadSchemaAsync(CancellationToken 
cancellationToken = default)
+        public override async ValueTask<Schema> 
ReadSchemaAsync(CancellationToken cancellationToken = default)
         {
             if (HasReadSchema)
             {
-                return;
+                return _schema;
             }
 
             // Figure out length of schema
@@ -163,7 +163,7 @@ namespace Apache.Arrow.Ipc
                 .ConfigureAwait(false);
             if (schemaMessageLength == 0)
             {
-                return;
+                return null;
             }
 
             using (ArrayPool<byte>.Shared.RentReturn(schemaMessageLength, out 
Memory<byte> buff))
@@ -174,6 +174,7 @@ namespace Apache.Arrow.Ipc
 
                 Google.FlatBuffers.ByteBuffer schemabb = 
CreateByteBuffer(buff);
                 _schema = 
MessageSerializer.GetSchema(ReadMessage<Flatbuf.Schema>(schemabb), ref 
_dictionaryMemo);
+                return _schema;
             }
         }
 
diff --git a/test/Apache.Arrow.Tests/ArrowStreamReaderTests.cs b/test/Apache.Arrow.Tests/ArrowStreamReaderTests.cs
index cd8bb62..22ed233 100644
--- a/test/Apache.Arrow.Tests/ArrowStreamReaderTests.cs
+++ b/test/Apache.Arrow.Tests/ArrowStreamReaderTests.cs
@@ -315,5 +315,31 @@ namespace Apache.Arrow.Tests
                 }
             });
         }
+
+        [Fact]
+        public async Task EmptyStreamNoSyncRead()
+        {
+            using (var stream = new EmptyAsyncOnlyStream())
+            {
+                var reader = new ArrowStreamReader(stream);
+                var schema = await reader.GetSchema();
+                Assert.Null(schema);
+            }
+        }
+
+        private class EmptyAsyncOnlyStream : Stream
+        {
+            public override bool CanRead => true;
+            public override bool CanSeek => false;
+            public override bool CanWrite => false;
+            public override long Length => 0;
+            public override long Position { get => 0; set => throw new 
NotSupportedException(); }
+            public override void Flush() { }
+            public override int Read(byte[] buffer, int offset, int count) => 
throw new NotSupportedException();
+            public override long Seek(long offset, SeekOrigin origin) => throw 
new NotSupportedException();
+            public override void SetLength(long value) => throw new 
NotSupportedException();
+            public override void Write(byte[] buffer, int offset, int count) 
=> throw new NotSupportedException();
+            public override Task<int> ReadAsync(byte[] buffer, int offset, int 
count, CancellationToken cancellationToken) => Task.FromResult(0);
+        }
     }
 }

Reply via email to