This is an automated email from the ASF dual-hosted git repository.
CurtHagenlocher pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-dotnet.git
The following commit(s) were added to refs/heads/main by this push:
new 4f60cc6 Refactors VariantValueWriter (#329)
4f60cc6 is described below
commit 4f60cc69fc7cc2d178e5e666d9799ac178417a28
Author: Curt Hagenlocher <[email protected]>
AuthorDate: Sun Apr 26 07:45:02 2026 -0700
Refactors VariantValueWriter (#329)
## What's Changed
Refactors `VariantValueWriter` to use a new `Buffer<>` type to manage
the output buffers instead of using `MemoryStream`.
---
.../VariantJson/VariantJsonReader.cs | 2 +-
src/Apache.Arrow.Scalars/Variant/Buffer.cs | 229 ++++++++++
src/Apache.Arrow.Scalars/Variant/VariantBuilder.cs | 2 +-
.../Variant/VariantValueWriter.cs | 493 +++++++++++----------
src/Apache.Arrow/Arrays/GuidArray.cs | 10 +-
.../VariantBuilderTests.cs | 15 +
6 files changed, 502 insertions(+), 249 deletions(-)
diff --git a/src/Apache.Arrow.Operations/VariantJson/VariantJsonReader.cs
b/src/Apache.Arrow.Operations/VariantJson/VariantJsonReader.cs
index e6487f0..0c51c5d 100644
--- a/src/Apache.Arrow.Operations/VariantJson/VariantJsonReader.cs
+++ b/src/Apache.Arrow.Operations/VariantJson/VariantJsonReader.cs
@@ -55,7 +55,7 @@ namespace Apache.Arrow.Operations.VariantJson
// Pass 2: stream values into a VariantValueWriter using the
sorted field IDs.
Utf8JsonReader emitter = new Utf8JsonReader(utf8Json);
emitter.Read();
- VariantValueWriter writer = new
VariantValueWriter(metadataBuilder, idRemap);
+ using VariantValueWriter writer = new
VariantValueWriter(metadataBuilder, idRemap);
WriteValue(ref emitter, writer);
return (metadata, writer.ToArray());
}
diff --git a/src/Apache.Arrow.Scalars/Variant/Buffer.cs
b/src/Apache.Arrow.Scalars/Variant/Buffer.cs
new file mode 100644
index 0000000..3cbfb62
--- /dev/null
+++ b/src/Apache.Arrow.Scalars/Variant/Buffer.cs
@@ -0,0 +1,229 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Buffers;
+using System.Buffers.Binary;
+using System.Collections.Generic;
+
+namespace Apache.Arrow.Scalars.Variant
+{
+ /// <summary>
+ /// Mutable, non-thread-safe growable buffer designed for single-owner use as
+ /// a field on a class (or as a local owned by the same method that created
+ /// it). The backing array is rented from a caller-supplied
+ /// <see cref="Stack{T}"/> so that capacity is "sticky" across reuse cycles
+ /// within one owner — unlike <see cref="ArrayPool{T}.Shared"/>, which
+ /// buckets by size class and may hand back a different array than the one
+ /// last returned.
+ /// </summary>
+ /// <remarks>
+ /// <b>Warning — mutable value type.</b> A <c>Buffer&lt;T&gt;</c> must never
+ /// be copied by value. All mutation has to go through the original storage
+ /// location, or a stale copy will silently corrupt output by writing past
+ /// its own view of the length into a shared backing array:
+ /// <list type="bullet">
+ /// <item>Pass by <c>ref</c>, not by value. Helper methods that take a
+ /// <c>Buffer&lt;T&gt;</c> parameter must declare it
+ /// <c>ref Buffer&lt;T&gt;</c>.</item>
+ /// <item>Assign from a getter only through a ref local
+ /// (<c>ref Buffer&lt;T&gt; b = ref GetBuffer();</c>) — a plain
+ /// <c>var</c> local is a copy.</item>
+ /// <item>Byte-specific helpers live on <see cref="ByteBufferExtensions"/>
+ /// as <c>ref this</c> extension methods precisely so they can't be
+ /// invoked on a by-value receiver. Do not add methods that would force
+ /// a by-value receiver.</item>
+ /// </list>
+ /// <para>
+ /// <b>Explicit lifetime.</b> Pair every <see cref="Acquire"/> with
+ /// <see cref="Release"/> (typically at the begin/end of a scope).
+ /// <see cref="Acquire"/> must be called before any write; writes on a
+ /// default-initialized buffer throw <see cref="NullReferenceException"/>.
+ /// Failing to <see cref="Release"/> leaks the backing array to the GC.
+ /// For the same reason, the local pool must also be cleaned up when done.
+ /// This can be done with <see cref="DrainPool"/>.
+ ///
+ /// </para>
+ /// </remarks>
+ internal struct Buffer<T>
+ {
+ // Minimum length requested from ArrayPool<T>.Shared when the per-owner pool is empty.
+ private const int InitialCapacity = 64;
+
+ // Backing array; null on a default-initialized buffer and again after Release.
+ private T[] _buf;
+ // Number of committed items, exposed through Length.
+ private int _length;
+
+ /// <summary>Number of items currently written.</summary>
+ public int Length => _length;
+
+ /// <summary>
+ /// The backing array. Slots beyond <see cref="Length"/> are
unspecified;
+ /// callers must respect the length when reading.
+ /// The reference is replaced whenever a write grows the buffer, so
+ /// re-read this property after any call that may grow it.
+ /// </summary>
+ public T[] RawBuffer => _buf;
+
+ /// <summary>
+ /// Rents a backing array from <paramref name="pool"/> (falling back to
+ /// <see cref="ArrayPool{T}.Shared"/> on pool miss) and resets length
to
+ /// zero. Must be called before any write.
+ /// Any array this buffer already holds is overwritten without being
+ /// returned to a pool, so call <see cref="Release"/> before re-acquiring.
+ /// </summary>
+ public void Acquire(Stack<T[]> pool)
+ {
+ _buf = pool.Count > 0 ? pool.Pop() :
ArrayPool<T>.Shared.Rent(InitialCapacity);
+ _length = 0;
+ }
+
+ /// <summary>
+ /// Returns the backing array to <paramref name="pool"/> and clears
+ /// state. Safe to call on a default-initialized buffer (no-op).
+ /// The array's contents are not cleared before it is pushed.
+ /// </summary>
+ public void Release(Stack<T[]> pool)
+ {
+ if (_buf != null)
+ {
+ pool.Push(_buf);
+ _buf = null;
+ _length = 0;
+ }
+ }
+
+ /// <summary>
+ /// Returns every array stashed in <paramref name="pool"/> to
+ /// <see cref="ArrayPool{T}.Shared"/>. Use at end-of-life of the owner
+ /// to release the per-owner cache built up by <see cref="Release"/>.
+ /// </summary>
+ public static void DrainPool(Stack<T[]> pool)
+ {
+ while (pool.Count > 0)
+ {
+ ArrayPool<T>.Shared.Return(pool.Pop());
+ }
+ }
+
+ /// <summary>
+ /// Returns a span covering at least the next <paramref name="sizeHint"/>
writable
+ /// items, growing the backing array if necessary. Call
+ /// <see cref="Advance"/> after writing to commit.
+ /// The span runs to the end of the backing array, so it may be longer
+ /// than requested.
+ /// </summary>
+ public Span<T> GetSpan(int sizeHint)
+ {
+ EnsureCapacity(_length + sizeHint);
+ return _buf.AsSpan(_length);
+ }
+
+ /// <summary>Advances the written length by <paramref
name="count"/>.</summary>
+ /// <remarks>
+ /// No validation is performed here: the count is added to the length
+ /// as-is, so it must not exceed what <see cref="GetSpan"/> handed out.
+ /// </remarks>
+ public void Advance(int count) => _length += count;
+
+ /// <summary>Appends a single item.</summary>
+ public void Append(T value)
+ {
+ EnsureCapacity(_length + 1);
+ _buf[_length++] = value;
+ }
+
+ /// <summary>Appends a span of items.</summary>
+ public void Append(ReadOnlySpan<T> src)
+ {
+ EnsureCapacity(_length + src.Length);
+ src.CopyTo(_buf.AsSpan(_length));
+ _length += src.Length;
+ }
+
+ /// <summary>Appends a range from an array.</summary>
+ public void Append(T[] src, int start, int count)
+ {
+ EnsureCapacity(_length + count);
+ Array.Copy(src, start, _buf, _length, count);
+ _length += count;
+ }
+
+ /// <summary>Copies the written items into a freshly allocated array
of exact length.</summary>
+ public T[] ToArray()
+ {
+ T[] result = new T[_length];
+ Array.Copy(_buf, 0, result, 0, _length);
+ return result;
+ }
+
+ // Grows the backing array by repeated doubling until it can hold
+ // `required` items. The replacement is rented from ArrayPool<T>.Shared,
+ // the committed items are copied across, and the old array is returned
+ // to ArrayPool<T>.Shared rather than to the per-owner stack.
+ // NOTE(review): newSize is an int, so in an unchecked context the
+ // doubling can wrap once _buf.Length reaches 2^30; starting from exactly
+ // 2^30 it would go negative, then 0, and the loop would not terminate.
+ // Consider doing the arithmetic in long and clamping — TODO confirm.
+ // NOTE(review): callers compute `required` as _length + n; if that sum
+ // overflows to a negative value no growth happens here — verify callers.
+ private void EnsureCapacity(int required)
+ {
+ if (required > _buf.Length)
+ {
+ int newSize = _buf.Length;
+ do
+ {
+ newSize *= 2;
+ } while (newSize < required);
+ T[] grown = ArrayPool<T>.Shared.Rent(newSize);
+ Array.Copy(_buf, 0, grown, 0, _length);
+ ArrayPool<T>.Shared.Return(_buf);
+ _buf = grown;
+ }
+ }
+ }
+
+ /// <summary>
+ /// Byte-specific writers for <see cref="Buffer{T}"/> of <see
cref="byte"/>.
+ /// Declared as <c>ref this</c> extension methods so invocation through a
+ /// by-value receiver is a compile error, not a silent copy-and-desync.
+ /// </summary>
+ internal static class ByteBufferExtensions
+ {
+ /// <summary>Appends <paramref name="value"/> as 2 little-endian bytes.</summary>
+ public static void WriteInt16LE(this ref Buffer<byte> buf, short value)
+ {
+ BinaryPrimitives.WriteInt16LittleEndian(buf.GetSpan(2), value);
+ buf.Advance(2);
+ }
+
+ /// <summary>Appends <paramref name="value"/> as 4 little-endian bytes.</summary>
+ public static void WriteInt32LE(this ref Buffer<byte> buf, int value)
+ {
+ BinaryPrimitives.WriteInt32LittleEndian(buf.GetSpan(4), value);
+ buf.Advance(4);
+ }
+
+ /// <summary>Appends <paramref name="value"/> as 8 little-endian bytes.</summary>
+ public static void WriteInt64LE(this ref Buffer<byte> buf, long value)
+ {
+ BinaryPrimitives.WriteInt64LittleEndian(buf.GetSpan(8), value);
+ buf.Advance(8);
+ }
+
+ /// <summary>Appends a 32-bit float as 4 little-endian bytes.</summary>
+ public static void WriteFloatLE(this ref Buffer<byte> buf, float value)
+ {
+#if NET8_0_OR_GREATER
+ BinaryPrimitives.WriteSingleLittleEndian(buf.GetSpan(4), value);
+ buf.Advance(4);
+#else
+ // Other targets: reinterpret the float's bits as an int and reuse the Int32 writer.
+ int bits = System.Runtime.CompilerServices.Unsafe.As<float,
int>(ref value);
+ buf.WriteInt32LE(bits);
+#endif
+ }
+
+ /// <summary>Appends a 64-bit double as 8 little-endian bytes.</summary>
+ public static void WriteDoubleLE(this ref Buffer<byte> buf, double
value)
+ {
+#if NET8_0_OR_GREATER
+ BinaryPrimitives.WriteDoubleLittleEndian(buf.GetSpan(8), value);
+ buf.Advance(8);
+#else
+ // Other targets: take the double's bit pattern as a long and reuse the Int64 writer.
+ long bits = BitConverter.DoubleToInt64Bits(value);
+ buf.WriteInt64LE(bits);
+#endif
+ }
+
+ /// <summary>
+ /// Reserves <paramref name="byteWidth"/> bytes, passes them to
+ /// <c>VariantEncodingHelper.WriteLittleEndianInt</c> together with
+ /// <paramref name="value"/>, and commits them.
+ /// </summary>
+ // NOTE(review): presumably the helper stores the low byteWidth bytes of
+ // value in little-endian order — confirm against VariantEncodingHelper.
+ public static void WriteSmallInt(this ref Buffer<byte> buf, int value,
int byteWidth)
+ {
+ VariantEncodingHelper.WriteLittleEndianInt(buf.GetSpan(byteWidth),
value, byteWidth);
+ buf.Advance(byteWidth);
+ }
+ }
+}
diff --git a/src/Apache.Arrow.Scalars/Variant/VariantBuilder.cs
b/src/Apache.Arrow.Scalars/Variant/VariantBuilder.cs
index d5b8337..7b6dbc4 100644
--- a/src/Apache.Arrow.Scalars/Variant/VariantBuilder.cs
+++ b/src/Apache.Arrow.Scalars/Variant/VariantBuilder.cs
@@ -34,7 +34,7 @@ namespace Apache.Arrow.Scalars.Variant
CollectFieldNames(variant, metadataBuilder);
byte[] metadata = metadataBuilder.Build(out int[] idRemap);
- VariantValueWriter writer = new
VariantValueWriter(metadataBuilder, idRemap);
+ using VariantValueWriter writer = new
VariantValueWriter(metadataBuilder, idRemap);
WriteValue(writer, variant);
return (metadata, writer.ToArray());
}
diff --git a/src/Apache.Arrow.Scalars/Variant/VariantValueWriter.cs
b/src/Apache.Arrow.Scalars/Variant/VariantValueWriter.cs
index dc54aa5..0daa47d 100644
--- a/src/Apache.Arrow.Scalars/Variant/VariantValueWriter.cs
+++ b/src/Apache.Arrow.Scalars/Variant/VariantValueWriter.cs
@@ -15,10 +15,8 @@
using System;
using System.Buffers;
-using System.Buffers.Binary;
using System.Collections.Generic;
using System.Data.SqlTypes;
-using System.IO;
using System.Text;
namespace Apache.Arrow.Scalars.Variant
@@ -34,22 +32,27 @@ namespace Apache.Arrow.Scalars.Variant
/// <item>Create a <see cref="VariantMetadataBuilder"/> and <see
cref="VariantMetadataBuilder.Add(string)"/> every field name that will
appear.</item>
/// <item>Call <see cref="VariantMetadataBuilder.Build(out int[])"/> to
produce the metadata bytes and the ID remap.</item>
/// <item>Create a <see cref="VariantValueWriter"/> with the metadata
builder and remap, emit the value via the <c>Write*</c> / <c>Begin*</c> /
<c>End*</c> methods, then call <see cref="ToArray"/>.</item>
+ /// <item><see cref="Dispose"/> the writer to return its cached backing
arrays to <see cref="ArrayPool{T}.Shared"/>. Skipping <c>Dispose</c> leaks
those arrays to the GC.</item>
/// </list>
/// </remarks>
- public sealed class VariantValueWriter
+ public sealed class VariantValueWriter : IDisposable
{
private const int StackAllocThreshold = 256;
private readonly VariantMetadataBuilder _metadata;
private readonly int[] _idRemap;
- private readonly MemoryStream _root = new MemoryStream();
+
+ // Per-writer stacks of cached backing arrays, separate from
+ // ArrayPool<T>.Shared so that capacity grown on one frame's buffer
+ // carries over to the next frame through the same writer without
+ // being redistributed by size class.
+ private readonly Stack<byte[]> _bytePool = new Stack<byte[]>();
+ private readonly Stack<int[]> _intPool = new Stack<int[]>();
+
+ private Buffer<byte> _root;
private readonly Stack<Frame> _frameStack = new Stack<Frame>();
- private readonly Stack<MemoryStream> _streamPool = new
Stack<MemoryStream>();
private Frame _frame;
-
-#if !NET8_0_OR_GREATER
- private readonly byte[] _scratch = new byte[16];
-#endif
+ private bool _disposed;
/// <summary>
/// Creates a writer that produces value bytes referencing the given
metadata.
@@ -67,6 +70,8 @@ namespace Apache.Arrow.Scalars.Variant
"the idRemap array length must match the metadata builder
count used to create it.",
nameof(idRemap));
}
+
+ _root.Acquire(_bytePool);
}
/// <summary>
@@ -74,6 +79,7 @@ namespace Apache.Arrow.Scalars.Variant
/// </summary>
public byte[] ToArray()
{
+ if (_disposed) throw new
ObjectDisposedException(nameof(VariantValueWriter));
if (_frame != null)
{
throw new InvalidOperationException("Unclosed object or array
at the top of the writer.");
@@ -81,6 +87,40 @@ namespace Apache.Arrow.Scalars.Variant
return _root.ToArray();
}
+ /// <summary>
+ /// Returns all cached backing arrays (the root buffer, any still-open
+ /// frame buffers, and the per-writer array pools) to
+ /// <see cref="ArrayPool{T}.Shared"/>. The writer must not be used
after
+ /// <see cref="Dispose"/>; calls to <see cref="ToArray"/> or any
+ /// <c>Write*</c> / <c>Begin*</c> method will throw
+ /// <see cref="ObjectDisposedException"/>. Idempotent.
+ /// </summary>
+ public void Dispose()
+ {
+ if (_disposed) return;
+ _disposed = true;
+
+ // Release still-owned frame arrays into the per-writer pools.
+ if (_frame != null) ReleaseFrameArrays(_frame);
+ while (_frameStack.Count > 0)
+ {
+ Frame f = _frameStack.Pop();
+ if (f != null) ReleaseFrameArrays(f);
+ }
+ _root.Release(_bytePool);
+
+ // Drain the per-writer pools back to the process-wide shared pool.
+ Buffer<byte>.DrainPool(_bytePool);
+ Buffer<int>.DrainPool(_intPool);
+ }
+
+ private void ReleaseFrameArrays(Frame f)
+ {
+ f.Buffer.Release(_bytePool);
+ f.ValueStarts.Release(_intPool);
+ if (f is ObjectFrame obj) obj.FieldIds.Release(_intPool);
+ }
+
// ---------------------------------------------------------------
// Object / array scope
// ---------------------------------------------------------------
@@ -89,10 +129,26 @@ namespace Apache.Arrow.Scalars.Variant
public void BeginObject()
{
BeforeWriteValue();
- _frameStack.Push(_frame);
- ObjectFrame frame = new ObjectFrame { Buffer = RentStream() };
- frame.FieldIds = ArrayPool<int>.Shared.Rent(InitialFrameCapacity);
- frame.ValueStarts =
ArrayPool<int>.Shared.Rent(InitialFrameCapacity);
+ ObjectFrame frame = new ObjectFrame();
+ // Keep _frame / _frameStack untouched until every Acquire + the Push
+ // have succeeded. If any step throws, the catch releases whatever
+ // was acquired so far (Release is a no-op on un-Acquired buffers),
+ // so _frame / _frameStack look as if BeginObject was never called and
+ // Dispose sees no orphaned arrays. Note that BeforeWriteValue() above
+ // has already updated the parent frame's bookkeeping (value start,
+ // PendingValue) by then; that part is not rolled back.
+ try
+ {
+ frame.Buffer.Acquire(_bytePool);
+ frame.ValueStarts.Acquire(_intPool);
+ frame.FieldIds.Acquire(_intPool);
+ _frameStack.Push(_frame);
+ }
+ catch
+ {
+ frame.FieldIds.Release(_intPool);
+ frame.ValueStarts.Release(_intPool);
+ frame.Buffer.Release(_bytePool);
+ throw;
+ }
_frame = frame;
}
@@ -103,6 +159,7 @@ namespace Apache.Arrow.Scalars.Variant
/// </summary>
public void WriteFieldName(string name)
{
+ if (_disposed) throw new
ObjectDisposedException(nameof(VariantValueWriter));
if (!(_frame is ObjectFrame objFrame))
{
throw new InvalidOperationException("WriteFieldName may only
be called inside an object scope.");
@@ -112,13 +169,14 @@ namespace Apache.Arrow.Scalars.Variant
throw new InvalidOperationException("A value must be written
for the previous field before writing the next field name.");
}
int fieldId = _idRemap[_metadata.GetId(name)];
- AppendInt(ref objFrame.FieldIds, ref objFrame.FieldIdCount,
fieldId);
+ objFrame.FieldIds.Append(fieldId);
objFrame.PendingValue = true;
}
/// <summary>Ends the current object scope.</summary>
public void EndObject()
{
+ if (_disposed) throw new
ObjectDisposedException(nameof(VariantValueWriter));
if (!(_frame is ObjectFrame objFrame))
{
throw new InvalidOperationException("EndObject called without
matching BeginObject.");
@@ -129,36 +187,78 @@ namespace Apache.Arrow.Scalars.Variant
}
_frame = _frameStack.Pop();
- MemoryStream output = _frame != null ? _frame.Buffer : _root;
- WriteObjectBody(output, objFrame);
- ArrayPool<int>.Shared.Return(objFrame.FieldIds);
- ArrayPool<int>.Shared.Return(objFrame.ValueStarts);
- ReturnStream(objFrame.Buffer);
+ // Once objFrame is popped it's no longer visible to Dispose, so
+ // WriteObjectBody must not leave its buffers unreleased on throw.
+ try
+ {
+ if (_frame != null)
+ {
+ WriteObjectBody(ref _frame.Buffer, objFrame);
+ }
+ else
+ {
+ WriteObjectBody(ref _root, objFrame);
+ }
+ }
+ finally
+ {
+ objFrame.FieldIds.Release(_intPool);
+ objFrame.ValueStarts.Release(_intPool);
+ objFrame.Buffer.Release(_bytePool);
+ }
}
/// <summary>Begins writing an array. Pair with <see
cref="EndArray"/>.</summary>
public void BeginArray()
{
BeforeWriteValue();
- _frameStack.Push(_frame);
- ArrayFrame frame = new ArrayFrame { Buffer = RentStream() };
- frame.ValueStarts =
ArrayPool<int>.Shared.Rent(InitialFrameCapacity);
+ ArrayFrame frame = new ArrayFrame();
+ // See BeginObject: defer any visible state change until all the
+ // rent-and-push steps have succeeded; on throw, release whatever
+ // was acquired so nothing escapes Dispose's reach.
+ try
+ {
+ frame.Buffer.Acquire(_bytePool);
+ frame.ValueStarts.Acquire(_intPool);
+ _frameStack.Push(_frame);
+ }
+ catch
+ {
+ frame.ValueStarts.Release(_intPool);
+ frame.Buffer.Release(_bytePool);
+ throw;
+ }
_frame = frame;
}
/// <summary>Ends the current array scope.</summary>
public void EndArray()
{
+ if (_disposed) throw new
ObjectDisposedException(nameof(VariantValueWriter));
if (!(_frame is ArrayFrame arrFrame))
{
throw new InvalidOperationException("EndArray called without
matching BeginArray.");
}
_frame = _frameStack.Pop();
- MemoryStream output = _frame != null ? _frame.Buffer : _root;
- WriteArrayBody(output, arrFrame);
- ArrayPool<int>.Shared.Return(arrFrame.ValueStarts);
- ReturnStream(arrFrame.Buffer);
+ // Popped frame is no longer visible to Dispose; the finally makes
+ // sure its buffers are released even if WriteArrayBody throws.
+ try
+ {
+ if (_frame != null)
+ {
+ WriteArrayBody(ref _frame.Buffer, arrFrame);
+ }
+ else
+ {
+ WriteArrayBody(ref _root, arrFrame);
+ }
+ }
+ finally
+ {
+ arrFrame.ValueStarts.Release(_intPool);
+ arrFrame.Buffer.Release(_bytePool);
+ }
}
// ---------------------------------------------------------------
@@ -168,48 +268,48 @@ namespace Apache.Arrow.Scalars.Variant
/// <summary>Writes a null value.</summary>
public void WriteNull()
{
- MemoryStream ms = BeforeWriteValue();
-
ms.WriteByte(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.NullType));
+ ref Buffer<byte> buf = ref BeforeWriteValue();
+
buf.Append(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.NullType));
}
/// <summary>Writes a boolean value.</summary>
public void WriteBoolean(bool value)
{
- MemoryStream ms = BeforeWriteValue();
- ms.WriteByte(VariantEncodingHelper.MakePrimitiveHeader(
+ ref Buffer<byte> buf = ref BeforeWriteValue();
+ buf.Append(VariantEncodingHelper.MakePrimitiveHeader(
value ? VariantPrimitiveType.BooleanTrue :
VariantPrimitiveType.BooleanFalse));
}
/// <summary>Writes an 8-bit signed integer.</summary>
public void WriteInt8(sbyte value)
{
- MemoryStream ms = BeforeWriteValue();
-
ms.WriteByte(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Int8));
- ms.WriteByte((byte)value);
+ ref Buffer<byte> buf = ref BeforeWriteValue();
+
buf.Append(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Int8));
+ buf.Append((byte)value);
}
/// <summary>Writes a 16-bit signed integer.</summary>
public void WriteInt16(short value)
{
- MemoryStream ms = BeforeWriteValue();
-
ms.WriteByte(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Int16));
- WriteInt16LE(ms, value);
+ ref Buffer<byte> buf = ref BeforeWriteValue();
+
buf.Append(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Int16));
+ buf.WriteInt16LE(value);
}
/// <summary>Writes a 32-bit signed integer.</summary>
public void WriteInt32(int value)
{
- MemoryStream ms = BeforeWriteValue();
-
ms.WriteByte(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Int32));
- WriteInt32LE(ms, value);
+ ref Buffer<byte> buf = ref BeforeWriteValue();
+
buf.Append(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Int32));
+ buf.WriteInt32LE(value);
}
/// <summary>Writes a 64-bit signed integer.</summary>
public void WriteInt64(long value)
{
- MemoryStream ms = BeforeWriteValue();
-
ms.WriteByte(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Int64));
- WriteInt64LE(ms, value);
+ ref Buffer<byte> buf = ref BeforeWriteValue();
+
buf.Append(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Int64));
+ buf.WriteInt64LE(value);
}
/// <summary>
@@ -239,24 +339,24 @@ namespace Apache.Arrow.Scalars.Variant
/// <summary>Writes a 32-bit IEEE 754 float.</summary>
public void WriteFloat(float value)
{
- MemoryStream ms = BeforeWriteValue();
-
ms.WriteByte(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Float));
- WriteFloatLE(ms, value);
+ ref Buffer<byte> buf = ref BeforeWriteValue();
+
buf.Append(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Float));
+ buf.WriteFloatLE(value);
}
/// <summary>Writes a 64-bit IEEE 754 double.</summary>
public void WriteDouble(double value)
{
- MemoryStream ms = BeforeWriteValue();
-
ms.WriteByte(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Double));
- WriteDoubleLE(ms, value);
+ ref Buffer<byte> buf = ref BeforeWriteValue();
+
buf.Append(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Double));
+ buf.WriteDoubleLE(value);
}
/// <summary>Writes a Decimal4 (precision ≤ 9) value.</summary>
public void WriteDecimal4(decimal value)
{
- MemoryStream ms = BeforeWriteValue();
-
ms.WriteByte(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Decimal4));
+ ref Buffer<byte> buf = ref BeforeWriteValue();
+
buf.Append(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Decimal4));
#if NET8_0_OR_GREATER
Span<int> bits = stackalloc int[4];
decimal.GetBits(value, bits);
@@ -267,15 +367,15 @@ namespace Apache.Arrow.Scalars.Variant
bool negative = (bits[3] & unchecked((int)0x80000000)) != 0;
int unscaled = bits[0];
if (negative) unscaled = -unscaled;
- ms.WriteByte(scale);
- WriteInt32LE(ms, unscaled);
+ buf.Append(scale);
+ buf.WriteInt32LE(unscaled);
}
/// <summary>Writes a Decimal8 (precision ≤ 18) value.</summary>
public void WriteDecimal8(decimal value)
{
- MemoryStream ms = BeforeWriteValue();
-
ms.WriteByte(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Decimal8));
+ ref Buffer<byte> buf = ref BeforeWriteValue();
+
buf.Append(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Decimal8));
#if NET8_0_OR_GREATER
Span<int> bits = stackalloc int[4];
decimal.GetBits(value, bits);
@@ -286,15 +386,15 @@ namespace Apache.Arrow.Scalars.Variant
bool negative = (bits[3] & unchecked((int)0x80000000)) != 0;
long unscaled = ((long)bits[1] << 32) | (uint)bits[0];
if (negative) unscaled = -unscaled;
- ms.WriteByte(scale);
- WriteInt64LE(ms, unscaled);
+ buf.Append(scale);
+ buf.WriteInt64LE(unscaled);
}
/// <summary>Writes a Decimal16 (precision ≤ 38) value stored as <see
cref="SqlDecimal"/>.</summary>
public void WriteDecimal16(SqlDecimal value)
{
- MemoryStream ms = BeforeWriteValue();
-
ms.WriteByte(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Decimal16));
+ ref Buffer<byte> buf = ref BeforeWriteValue();
+
buf.Append(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Decimal16));
bool positive = value.IsPositive;
byte scale = (byte)value.Scale;
@@ -314,160 +414,155 @@ namespace Apache.Arrow.Scalars.Variant
lo = (long)uLo;
}
- ms.WriteByte(scale);
- WriteInt64LE(ms, lo);
- WriteInt64LE(ms, hi);
+ buf.Append(scale);
+ buf.WriteInt64LE(lo);
+ buf.WriteInt64LE(hi);
}
/// <summary>Writes a string. Uses the short-string encoding when the
UTF-8 byte length is ≤ 63.</summary>
public void WriteString(string value)
{
if (value == null) throw new ArgumentNullException(nameof(value));
- MemoryStream ms = BeforeWriteValue();
+ ref Buffer<byte> buf = ref BeforeWriteValue();
int byteCount = Encoding.UTF8.GetByteCount(value);
if (byteCount <= 63)
{
-
ms.WriteByte(VariantEncodingHelper.MakeShortStringHeader(byteCount));
+
buf.Append(VariantEncodingHelper.MakeShortStringHeader(byteCount));
}
else
{
-
ms.WriteByte(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.String));
- WriteInt32LE(ms, byteCount);
+
buf.Append(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.String));
+ buf.WriteInt32LE(byteCount);
}
- // Encode UTF-8 directly into the MemoryStream's buffer.
- int dataPos = (int)ms.Position;
- int needed = dataPos + byteCount;
- if (needed > ms.Length)
- {
- ms.SetLength(needed);
- }
- Encoding.UTF8.GetBytes(value, 0, value.Length, ms.GetBuffer(),
dataPos);
- ms.Position = needed;
+ // Encode UTF-8 directly into the buffer.
+ Span<byte> dest = buf.GetSpan(byteCount);
+#if NET8_0_OR_GREATER
+ Encoding.UTF8.GetBytes(value, dest);
+#else
+ Encoding.UTF8.GetBytes(value, 0, value.Length, buf.RawBuffer,
buf.Length);
+#endif
+ buf.Advance(byteCount);
}
/// <summary>Writes a binary blob.</summary>
- public void WriteBinary(byte[] data)
+ public void WriteBinary(ReadOnlySpan<byte> data)
{
- if (data == null) throw new ArgumentNullException(nameof(data));
- MemoryStream ms = BeforeWriteValue();
-
ms.WriteByte(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Binary));
- WriteInt32LE(ms, data.Length);
- ms.Write(data, 0, data.Length);
+ ref Buffer<byte> buf = ref BeforeWriteValue();
+
buf.Append(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Binary));
+ buf.WriteInt32LE(data.Length);
+ buf.Append(data);
}
/// <summary>Writes a UUID.</summary>
- public void WriteUuid(Guid value)
+ public unsafe void WriteUuid(Guid value)
{
- MemoryStream ms = BeforeWriteValue();
-
ms.WriteByte(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Uuid));
+ ref Buffer<byte> buf = ref BeforeWriteValue();
+
buf.Append(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Uuid));
+ Span<byte> raw = stackalloc byte[16];
+
#if NET8_0_OR_GREATER
- Span<byte> buf = stackalloc byte[16];
- value.TryWriteBytes(buf, bigEndian: true, out int _);
- ms.Write(buf);
+ value.TryWriteBytes(raw, bigEndian: true, out _);
#else
- byte[] native = value.ToByteArray();
// Convert from .NET mixed-endian to big-endian (RFC 4122).
- _scratch[0] = native[3]; _scratch[1] = native[2]; _scratch[2] =
native[1]; _scratch[3] = native[0];
- _scratch[4] = native[5]; _scratch[5] = native[4];
- _scratch[6] = native[7]; _scratch[7] = native[6];
- Buffer.BlockCopy(native, 8, _scratch, 8, 8);
- ms.Write(_scratch, 0, 16);
+ byte* guidPtr = (byte*)&value;
+ fixed (byte* bytePtr = raw)
+ {
+ bytePtr[0] = guidPtr[3];
+ bytePtr[1] = guidPtr[2];
+ bytePtr[2] = guidPtr[1];
+ bytePtr[3] = guidPtr[0];
+ bytePtr[4] = guidPtr[5];
+ bytePtr[5] = guidPtr[4];
+ bytePtr[6] = guidPtr[7];
+ bytePtr[7] = guidPtr[6];
+ ((long*)bytePtr)[1] = ((long*)guidPtr)[1];
+ }
#endif
+ buf.Append(raw);
}
/// <summary>Writes a date as days since the Unix epoch.</summary>
public void WriteDateDays(int days)
{
- MemoryStream ms = BeforeWriteValue();
-
ms.WriteByte(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Date));
- WriteInt32LE(ms, days);
+ ref Buffer<byte> buf = ref BeforeWriteValue();
+
buf.Append(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Date));
+ buf.WriteInt32LE(days);
}
/// <summary>Writes a timestamp (tz-adjusted microseconds since the
Unix epoch).</summary>
public void WriteTimestampMicros(long micros)
{
- MemoryStream ms = BeforeWriteValue();
-
ms.WriteByte(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Timestamp));
- WriteInt64LE(ms, micros);
+ ref Buffer<byte> buf = ref BeforeWriteValue();
+
buf.Append(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.Timestamp));
+ buf.WriteInt64LE(micros);
}
/// <summary>Writes a timestamp-without-timezone (microseconds since
the Unix epoch).</summary>
public void WriteTimestampNtzMicros(long micros)
{
- MemoryStream ms = BeforeWriteValue();
-
ms.WriteByte(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.TimestampNtz));
- WriteInt64LE(ms, micros);
+ ref Buffer<byte> buf = ref BeforeWriteValue();
+
buf.Append(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.TimestampNtz));
+ buf.WriteInt64LE(micros);
}
/// <summary>Writes a time-without-timezone value (microseconds since
midnight).</summary>
public void WriteTimeNtzMicros(long micros)
{
- MemoryStream ms = BeforeWriteValue();
-
ms.WriteByte(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.TimeNtz));
- WriteInt64LE(ms, micros);
+ ref Buffer<byte> buf = ref BeforeWriteValue();
+
buf.Append(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.TimeNtz));
+ buf.WriteInt64LE(micros);
}
/// <summary>Writes a timestamp with timezone (nanoseconds since the
Unix epoch).</summary>
public void WriteTimestampTzNanos(long nanos)
{
- MemoryStream ms = BeforeWriteValue();
-
ms.WriteByte(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.TimestampTzNanos));
- WriteInt64LE(ms, nanos);
+ ref Buffer<byte> buf = ref BeforeWriteValue();
+
buf.Append(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.TimestampTzNanos));
+ buf.WriteInt64LE(nanos);
}
/// <summary>Writes a timestamp without timezone (nanoseconds since
the Unix epoch).</summary>
public void WriteTimestampNtzNanos(long nanos)
{
- MemoryStream ms = BeforeWriteValue();
-
ms.WriteByte(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.TimestampNtzNanos));
- WriteInt64LE(ms, nanos);
+ ref Buffer<byte> buf = ref BeforeWriteValue();
+
buf.Append(VariantEncodingHelper.MakePrimitiveHeader(VariantPrimitiveType.TimestampNtzNanos));
+ buf.WriteInt64LE(nanos);
}
// ---------------------------------------------------------------
// Internal bookkeeping
// ---------------------------------------------------------------
- private MemoryStream BeforeWriteValue()
+ private ref Buffer<byte> BeforeWriteValue()
{
+ if (_disposed) throw new
ObjectDisposedException(nameof(VariantValueWriter));
if (_frame is ObjectFrame objFrame)
{
if (!objFrame.PendingValue)
{
throw new InvalidOperationException("A field name is
required before writing an object field value. Call WriteFieldName first.");
}
- AppendInt(ref objFrame.ValueStarts, ref
objFrame.ValueStartCount, (int)objFrame.Buffer.Position);
+ objFrame.ValueStarts.Append(objFrame.Buffer.Length);
objFrame.PendingValue = false;
- return objFrame.Buffer;
+ return ref objFrame.Buffer;
}
if (_frame is ArrayFrame arrFrame)
{
- AppendInt(ref arrFrame.ValueStarts, ref
arrFrame.ValueStartCount, (int)arrFrame.Buffer.Position);
- return arrFrame.Buffer;
+ arrFrame.ValueStarts.Append(arrFrame.Buffer.Length);
+ return ref arrFrame.Buffer;
}
- return _root;
+ return ref _root;
}
- private static void AppendInt(ref int[] array, ref int count, int
value)
+ private static void WriteObjectBody(ref Buffer<byte> output,
ObjectFrame frame)
{
- if (count == array.Length)
- {
- int[] grown = ArrayPool<int>.Shared.Rent(array.Length * 2);
- Array.Copy(array, 0, grown, 0, count);
- ArrayPool<int>.Shared.Return(array);
- array = grown;
- }
- array[count++] = value;
- }
-
- private void WriteObjectBody(MemoryStream output, ObjectFrame frame)
- {
- int fieldCount = frame.FieldIdCount;
+ int fieldCount = frame.FieldIds.Length;
// Sentinel marks the end of the last value in the frame buffer.
- AppendInt(ref frame.ValueStarts, ref frame.ValueStartCount,
(int)frame.Buffer.Position);
+ frame.ValueStarts.Append(frame.Buffer.Length);
- int[] fieldIds = frame.FieldIds;
- int[] valueStarts = frame.ValueStarts;
+ int[] fieldIds = frame.FieldIds.RawBuffer;
+ int[] valueStarts = frame.ValueStarts.RawBuffer;
// Sort indices so fields are emitted in sorted-field-id order.
#if NET8_0_OR_GREATER
@@ -506,169 +601,75 @@ namespace Apache.Arrow.Scalars.Variant
int offsetSize =
VariantEncodingHelper.ByteWidthForValue(Math.Max(1, offsets[fieldCount]));
bool isLarge = fieldCount > 255;
-
output.WriteByte(VariantEncodingHelper.MakeObjectHeader(fieldIdSize,
offsetSize, isLarge));
+ output.Append(VariantEncodingHelper.MakeObjectHeader(fieldIdSize,
offsetSize, isLarge));
if (isLarge)
{
- WriteInt32LE(output, fieldCount);
+ output.WriteInt32LE(fieldCount);
}
else
{
- output.WriteByte((byte)fieldCount);
+ output.Append((byte)fieldCount);
}
for (int i = 0; i < fieldCount; i++)
{
- WriteSmallInt(output, fieldIds[sortOrder[i]], fieldIdSize);
+ output.WriteSmallInt(fieldIds[sortOrder[i]], fieldIdSize);
}
for (int i = 0; i <= fieldCount; i++)
{
- WriteSmallInt(output, offsets[i], offsetSize);
+ output.WriteSmallInt(offsets[i], offsetSize);
}
- byte[] valueBuffer = frame.Buffer.GetBuffer();
+ byte[] valueBuffer = frame.Buffer.RawBuffer;
for (int i = 0; i < fieldCount; i++)
{
int idx = sortOrder[i];
int start = valueStarts[idx];
int length = valueStarts[idx + 1] - start;
- output.Write(valueBuffer, start, length);
+ output.Append(valueBuffer, start, length);
}
}
- private void WriteArrayBody(MemoryStream output, ArrayFrame frame)
+ private static void WriteArrayBody(ref Buffer<byte> output, ArrayFrame
frame)
{
- int elementCount = frame.ValueStartCount;
+ int elementCount = frame.ValueStarts.Length;
// Sentinel marks the end of the last element.
- AppendInt(ref frame.ValueStarts, ref frame.ValueStartCount,
(int)frame.Buffer.Position);
- int[] valueStarts = frame.ValueStarts;
+ frame.ValueStarts.Append(frame.Buffer.Length);
+ int[] valueStarts = frame.ValueStarts.RawBuffer;
int offsetSize =
VariantEncodingHelper.ByteWidthForValue(Math.Max(1, valueStarts[elementCount]));
bool isLarge = elementCount > 255;
- output.WriteByte(VariantEncodingHelper.MakeArrayHeader(offsetSize,
isLarge));
+ output.Append(VariantEncodingHelper.MakeArrayHeader(offsetSize,
isLarge));
if (isLarge)
{
- WriteInt32LE(output, elementCount);
+ output.WriteInt32LE(elementCount);
}
else
{
- output.WriteByte((byte)elementCount);
+ output.Append((byte)elementCount);
}
for (int i = 0; i <= elementCount; i++)
{
- WriteSmallInt(output, valueStarts[i], offsetSize);
+ output.WriteSmallInt(valueStarts[i], offsetSize);
}
- output.Write(frame.Buffer.GetBuffer(), 0,
(int)frame.Buffer.Position);
+ output.Append(frame.Buffer.RawBuffer, 0, frame.Buffer.Length);
}
- private MemoryStream RentStream()
- {
- if (_streamPool.Count > 0)
- {
- MemoryStream ms = _streamPool.Pop();
- ms.SetLength(0);
- return ms;
- }
- return new MemoryStream();
- }
-
- private void ReturnStream(MemoryStream ms)
- {
- _streamPool.Push(ms);
- }
-
- private void WriteSmallInt(MemoryStream ms, int value, int byteWidth)
- {
-#if NET8_0_OR_GREATER
- Span<byte> buf = stackalloc byte[4];
- VariantEncodingHelper.WriteLittleEndianInt(buf, value, byteWidth);
- ms.Write(buf.Slice(0, byteWidth));
-#else
- VariantEncodingHelper.WriteLittleEndianInt(_scratch, value,
byteWidth);
- ms.Write(_scratch, 0, byteWidth);
-#endif
- }
-
- private void WriteInt16LE(MemoryStream ms, short value)
- {
-#if NET8_0_OR_GREATER
- Span<byte> buf = stackalloc byte[2];
- BinaryPrimitives.WriteInt16LittleEndian(buf, value);
- ms.Write(buf);
-#else
- BinaryPrimitives.WriteInt16LittleEndian(_scratch, value);
- ms.Write(_scratch, 0, 2);
-#endif
- }
-
- private void WriteInt32LE(MemoryStream ms, int value)
- {
-#if NET8_0_OR_GREATER
- Span<byte> buf = stackalloc byte[4];
- BinaryPrimitives.WriteInt32LittleEndian(buf, value);
- ms.Write(buf);
-#else
- BinaryPrimitives.WriteInt32LittleEndian(_scratch, value);
- ms.Write(_scratch, 0, 4);
-#endif
- }
-
- private void WriteInt64LE(MemoryStream ms, long value)
- {
-#if NET8_0_OR_GREATER
- Span<byte> buf = stackalloc byte[8];
- BinaryPrimitives.WriteInt64LittleEndian(buf, value);
- ms.Write(buf);
-#else
- BinaryPrimitives.WriteInt64LittleEndian(_scratch, value);
- ms.Write(_scratch, 0, 8);
-#endif
- }
-
- private void WriteFloatLE(MemoryStream ms, float value)
- {
-#if NET8_0_OR_GREATER
- Span<byte> buf = stackalloc byte[4];
- BinaryPrimitives.WriteSingleLittleEndian(buf, value);
- ms.Write(buf);
-#else
- int bits = System.Runtime.CompilerServices.Unsafe.As<float,
int>(ref value);
- BinaryPrimitives.WriteInt32LittleEndian(_scratch, bits);
- ms.Write(_scratch, 0, 4);
-#endif
- }
-
- private void WriteDoubleLE(MemoryStream ms, double value)
- {
-#if NET8_0_OR_GREATER
- Span<byte> buf = stackalloc byte[8];
- BinaryPrimitives.WriteDoubleLittleEndian(buf, value);
- ms.Write(buf);
-#else
- long bits = BitConverter.DoubleToInt64Bits(value);
- BinaryPrimitives.WriteInt64LittleEndian(_scratch, bits);
- ms.Write(_scratch, 0, 8);
-#endif
- }
-
- private const int InitialFrameCapacity = 16;
-
private abstract class Frame
{
- public MemoryStream Buffer;
- public int[] ValueStarts;
- public int ValueStartCount;
+ public Buffer<byte> Buffer;
+ public Buffer<int> ValueStarts;
}
private sealed class ObjectFrame : Frame
{
- public int[] FieldIds;
- public int FieldIdCount;
+ public Buffer<int> FieldIds;
public bool PendingValue;
}
diff --git a/src/Apache.Arrow/Arrays/GuidArray.cs
b/src/Apache.Arrow/Arrays/GuidArray.cs
index 0f4c3e1..cad9cfa 100644
--- a/src/Apache.Arrow/Arrays/GuidArray.cs
+++ b/src/Apache.Arrow/Arrays/GuidArray.cs
@@ -140,6 +140,9 @@ namespace Apache.Arrow
if (bytes.Length != GuidType.ByteWidth)
throw new ArgumentException("Byte span must be exactly 16
bytes long.", nameof(bytes));
+#if NET8_0_OR_GREATER
+ guid.TryWriteBytes(bytes, bigEndian: true, out _);
+#else
byte* guidPtr = (byte*)&guid;
fixed (byte* bytePtr = bytes)
{
@@ -153,6 +156,7 @@ namespace Apache.Arrow
bytePtr[7] = guidPtr[6];
((long*)bytePtr)[1] = ((long*)guidPtr)[1];
}
+#endif
}
/// <summary>
@@ -164,6 +168,9 @@ namespace Apache.Arrow
if (bytes.Length != GuidType.ByteWidth)
throw new ArgumentException("Byte span must be exactly 16
bytes long.", nameof(bytes));
+#if NET8_0_OR_GREATER
+ Guid result = new Guid(bytes, bigEndian: true);
+#else
Guid result = new Guid();
byte* guidPtr = (byte*)&result;
fixed (byte* bytePtr = bytes)
@@ -177,8 +184,9 @@ namespace Apache.Arrow
guidPtr[6] = bytePtr[7];
guidPtr[7] = bytePtr[6];
((long*)guidPtr)[1] = ((long*)bytePtr)[1];
- return result;
}
+#endif
+ return result;
}
public Guid? GetGuid(int index)
diff --git a/test/Apache.Arrow.Scalars.Tests/VariantBuilderTests.cs
b/test/Apache.Arrow.Scalars.Tests/VariantBuilderTests.cs
index 2282859..8fe4c01 100644
--- a/test/Apache.Arrow.Scalars.Tests/VariantBuilderTests.cs
+++ b/test/Apache.Arrow.Scalars.Tests/VariantBuilderTests.cs
@@ -152,6 +152,21 @@ namespace Apache.Arrow.Scalars.Tests
Assert.Equal(guid, reader.GetUuid());
}
+ [Fact]
+ public void Encode_Uuid_ProducesRfc4122BigEndianValueBytes()
+ {
+ // Round-tripping alone can't catch an endian bug in WriteUuid if
+ // VariantReader has the mirror bug. Pin the wire format directly:
+ // TestVectors.PrimitiveUuid is the canonical encoding of
+ // "550e8400-e29b-41d4-a716-446655440000" — 0x50 header + 16 bytes
+ // in RFC 4122 big-endian order (first three fields big-endian,
+ // last 8 bytes as-is).
+ Guid guid = new Guid("550e8400-e29b-41d4-a716-446655440000");
+ (byte[] _, byte[] value) =
_builder.Encode(VariantValue.FromUuid(guid));
+
+ Assert.Equal(TestVectors.PrimitiveUuid.ToArray(), value);
+ }
+
[Fact]
public void Encode_Decimal4()
{