This is an automated email from the ASF dual-hosted git repository.
curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-dotnet.git
The following commit(s) were added to refs/heads/main by this push:
new 3380d96 feat: Allow empty streams to return an empty schema
asynchronously. (#81)
3380d96 is described below
commit 3380d96512346bb2852869cb5e1a691cfd9828ad
Author: Curt Hagenlocher <[email protected]>
AuthorDate: Tue Sep 23 12:16:22 2025 -0700
feat: Allow empty streams to return an empty schema asynchronously. (#81)
## What's Changed
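When `ArrowStreamReader` reads from an empty stream, `GetSchema` now
completes asynchronously and returns a null schema (there is no schema
message to read) instead of falling back to a synchronous code path. To
support this, each reader implementation's `ReadSchemaAsync` now returns
the schema it read as `ValueTask<Schema>`.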
Closes #80
---
.../Internal/RecordBatchReaderImplementation.cs | 3 ++-
.../Ipc/ArrowFileReaderImplementation.cs | 6 +++--
.../Ipc/ArrowMemoryReaderImplementation.cs | 4 ++--
src/Apache.Arrow/Ipc/ArrowReaderImplementation.cs | 2 +-
src/Apache.Arrow/Ipc/ArrowStreamReader.cs | 6 ++---
.../Ipc/ArrowStreamReaderImplementation.cs | 7 +++---
test/Apache.Arrow.Tests/ArrowStreamReaderTests.cs | 26 ++++++++++++++++++++++
7 files changed, 42 insertions(+), 12 deletions(-)
diff --git
a/src/Apache.Arrow.Flight/Internal/RecordBatchReaderImplementation.cs
b/src/Apache.Arrow.Flight/Internal/RecordBatchReaderImplementation.cs
index e3353e8..cb64867 100644
--- a/src/Apache.Arrow.Flight/Internal/RecordBatchReaderImplementation.cs
+++ b/src/Apache.Arrow.Flight/Internal/RecordBatchReaderImplementation.cs
@@ -67,7 +67,7 @@ namespace Apache.Arrow.Flight.Internal
ReadSchemaAsync(CancellationToken.None).AsTask().Wait();
}
- public override async ValueTask ReadSchemaAsync(CancellationToken
cancellationToken)
+ public override async ValueTask<Schema>
ReadSchemaAsync(CancellationToken cancellationToken)
{
while (!HasReadSchema)
{
@@ -107,6 +107,7 @@ namespace Apache.Arrow.Flight.Internal
throw new Exception($"Expected schema as the first
message, but got: {message.HeaderType.ToString()}");
}
}
+ return _schema;
}
public override async ValueTask<RecordBatch>
ReadNextRecordBatchAsync(CancellationToken cancellationToken)
diff --git a/src/Apache.Arrow/Ipc/ArrowFileReaderImplementation.cs
b/src/Apache.Arrow/Ipc/ArrowFileReaderImplementation.cs
index 8cedea6..7486a51 100644
--- a/src/Apache.Arrow/Ipc/ArrowFileReaderImplementation.cs
+++ b/src/Apache.Arrow/Ipc/ArrowFileReaderImplementation.cs
@@ -52,11 +52,11 @@ namespace Apache.Arrow.Ipc
return _footer.RecordBatchCount;
}
- public override async ValueTask ReadSchemaAsync(CancellationToken
cancellationToken = default)
+ public override async ValueTask<Schema>
ReadSchemaAsync(CancellationToken cancellationToken = default)
{
if (HasReadSchema)
{
- return;
+ return _schema;
}
await ValidateFileAsync(cancellationToken).ConfigureAwait(false);
@@ -82,6 +82,8 @@ namespace Apache.Arrow.Ipc
EnsureFullRead(buffer, bytesRead);
ReadSchema(buffer);
+
+ return _schema;
}
}
diff --git a/src/Apache.Arrow/Ipc/ArrowMemoryReaderImplementation.cs
b/src/Apache.Arrow/Ipc/ArrowMemoryReaderImplementation.cs
index 1a799b0..763a5d3 100644
--- a/src/Apache.Arrow/Ipc/ArrowMemoryReaderImplementation.cs
+++ b/src/Apache.Arrow/Ipc/ArrowMemoryReaderImplementation.cs
@@ -33,11 +33,11 @@ namespace Apache.Arrow.Ipc
_buffer = buffer;
}
- public override ValueTask ReadSchemaAsync(CancellationToken
cancellationToken)
+ public override ValueTask<Schema> ReadSchemaAsync(CancellationToken
cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
ReadSchema();
- return default;
+ return new ValueTask<Schema>(_schema);
}
public override ValueTask<RecordBatch>
ReadNextRecordBatchAsync(CancellationToken cancellationToken)
diff --git a/src/Apache.Arrow/Ipc/ArrowReaderImplementation.cs
b/src/Apache.Arrow/Ipc/ArrowReaderImplementation.cs
index d71478a..d5d3758 100644
--- a/src/Apache.Arrow/Ipc/ArrowReaderImplementation.cs
+++ b/src/Apache.Arrow/Ipc/ArrowReaderImplementation.cs
@@ -69,7 +69,7 @@ namespace Apache.Arrow.Ipc
{
}
- public abstract ValueTask ReadSchemaAsync(CancellationToken
cancellationToken);
+ public abstract ValueTask<Schema> ReadSchemaAsync(CancellationToken
cancellationToken);
public abstract void ReadSchema();
public abstract ValueTask<RecordBatch>
ReadNextRecordBatchAsync(CancellationToken cancellationToken);
diff --git a/src/Apache.Arrow/Ipc/ArrowStreamReader.cs
b/src/Apache.Arrow/Ipc/ArrowStreamReader.cs
index dc342b2..540fb19 100644
--- a/src/Apache.Arrow/Ipc/ArrowStreamReader.cs
+++ b/src/Apache.Arrow/Ipc/ArrowStreamReader.cs
@@ -102,11 +102,11 @@ namespace Apache.Arrow.Ipc
public async ValueTask<Schema> GetSchema(CancellationToken
cancellationToken = default)
{
- if (!_implementation.HasReadSchema)
+ if (_implementation.HasReadSchema)
{
- await _implementation.ReadSchemaAsync(cancellationToken);
+ return _implementation.Schema;
}
- return _implementation.Schema;
+ return await _implementation.ReadSchemaAsync(cancellationToken);
}
public ValueTask<RecordBatch>
ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
diff --git a/src/Apache.Arrow/Ipc/ArrowStreamReaderImplementation.cs
b/src/Apache.Arrow/Ipc/ArrowStreamReaderImplementation.cs
index e2bfe4a..c1daae9 100644
--- a/src/Apache.Arrow/Ipc/ArrowStreamReaderImplementation.cs
+++ b/src/Apache.Arrow/Ipc/ArrowStreamReaderImplementation.cs
@@ -151,11 +151,11 @@ namespace Apache.Arrow.Ipc
return new ReadResult(messageLength, result);
}
- public override async ValueTask ReadSchemaAsync(CancellationToken
cancellationToken = default)
+ public override async ValueTask<Schema>
ReadSchemaAsync(CancellationToken cancellationToken = default)
{
if (HasReadSchema)
{
- return;
+ return _schema;
}
// Figure out length of schema
@@ -163,7 +163,7 @@ namespace Apache.Arrow.Ipc
.ConfigureAwait(false);
if (schemaMessageLength == 0)
{
- return;
+ return null;
}
using (ArrayPool<byte>.Shared.RentReturn(schemaMessageLength, out
Memory<byte> buff))
@@ -174,6 +174,7 @@ namespace Apache.Arrow.Ipc
Google.FlatBuffers.ByteBuffer schemabb =
CreateByteBuffer(buff);
_schema =
MessageSerializer.GetSchema(ReadMessage<Flatbuf.Schema>(schemabb), ref
_dictionaryMemo);
+ return _schema;
}
}
diff --git a/test/Apache.Arrow.Tests/ArrowStreamReaderTests.cs
b/test/Apache.Arrow.Tests/ArrowStreamReaderTests.cs
index cd8bb62..22ed233 100644
--- a/test/Apache.Arrow.Tests/ArrowStreamReaderTests.cs
+++ b/test/Apache.Arrow.Tests/ArrowStreamReaderTests.cs
@@ -315,5 +315,31 @@ namespace Apache.Arrow.Tests
}
});
}
+
+ [Fact]
+ public async Task EmptyStreamNoSyncRead()
+ {
+ // EmptyAsyncOnlyStream throws from synchronous Read, so this fails if GetSchema blocks; an empty stream has no schema, so GetSchema yields null.
+ using (var stream = new EmptyAsyncOnlyStream())
+ using (var reader = new ArrowStreamReader(stream))
+ {
+ Assert.Null(await reader.GetSchema());
+ }
+ }
+
+ private class EmptyAsyncOnlyStream : Stream // Test stub: ReadAsync always reports 0 bytes read; synchronous Read throws.
+ {
+ public override bool CanRead => true;
+ public override bool CanSeek => false;
+ public override bool CanWrite => false;
+ public override long Length => 0; // NOTE(review): Stream's contract expects Length/Position to throw NotSupportedException when CanSeek is false; this stub returns 0 instead.
+ public override long Position { get => 0; set => throw new
NotSupportedException(); }
+ public override void Flush() { }
+ public override int Read(byte[] buffer, int offset, int count) =>
throw new NotSupportedException(); // any synchronous read surfaces as an exception, which is what the test relies on
+ public override long Seek(long offset, SeekOrigin origin) => throw
new NotSupportedException();
+ public override void SetLength(long value) => throw new
NotSupportedException();
+ public override void Write(byte[] buffer, int offset, int count)
=> throw new NotSupportedException();
+ public override Task<int> ReadAsync(byte[] buffer, int offset, int
count, CancellationToken cancellationToken) => Task.FromResult(0); // 0 bytes on every call, i.e. always at end of stream; presumably the Memory<byte> overload reaches this via Stream's default implementation — confirm
+ }
}
}