This is an automated email from the ASF dual-hosted git repository. wesm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push: new 15fca3d ARROW-5908: [C#] ArrowStreamWriter doesn't align buffers to 8 bytes 15fca3d is described below commit 15fca3d0169479fc4d21619b22515d95cd264fc5 Author: Eric Erhardt <eric.erha...@microsoft.com> AuthorDate: Thu Jul 11 17:05:53 2019 -0500 ARROW-5908: [C#] ArrowStreamWriter doesn't align buffers to 8 bytes Ensure 8-byte alignment on each buffer in a RecordBatch as specified in https://arrow.apache.org/docs/format/Layout.html#requirements-goals-and-non-goals > It is required to have all the contiguous memory buffers in an IPC payload aligned at 8-byte boundaries. In other words, each buffer must start at an aligned 8-byte offset. Additionally, each buffer should be padded to a multiple of 8 bytes. /cc @pgovind @stephentoub @imback82 @wesm - If possible, can we also include this patch in the next release (0.14.1 or 0.15.0)? We hit this issue trying to update .NET for Apache Spark to the latest Arrow release - https://github.com/dotnet/spark/pull/167. Author: Eric Erhardt <eric.erha...@microsoft.com> Closes #4851 from eerhardt/FixWriterPadding and squashes the following commits: 76807e938 <Eric Erhardt> PR feedback 7ecda78c6 <Eric Erhardt> Ensure 8-byte alignment on each buffer in a RecordBatch. 
--- csharp/src/Apache.Arrow/BitUtility.cs | 8 +++ csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs | 30 +++++++----- .../ArrowWriterBenchmark.cs | 57 ++++++++++++++++++++++ .../Apache.Arrow.Tests/ArrowStreamWriterTests.cs | 56 +++++++++++++++++++++ csharp/test/Apache.Arrow.Tests/TestData.cs | 50 +++++++++++-------- 5 files changed, 169 insertions(+), 32 deletions(-) diff --git a/csharp/src/Apache.Arrow/BitUtility.cs b/csharp/src/Apache.Arrow/BitUtility.cs index a5da46b..7d2cfbf 100644 --- a/csharp/src/Apache.Arrow/BitUtility.cs +++ b/csharp/src/Apache.Arrow/BitUtility.cs @@ -117,6 +117,14 @@ namespace Apache.Arrow RoundUpToMultiplePowerOfTwo(n, 64); /// <summary> + /// Rounds an integer to the nearest multiple of 8. + /// </summary> + /// <param name="n">Integer to round.</param> + /// <returns>Integer rounded to the nearest multiple of 8.</returns> + public static long RoundUpToMultipleOf8(long n) => + RoundUpToMultiplePowerOfTwo(n, 8); + + /// <summary> /// Rounds an integer up to the nearest multiple of factor, where /// factor must be a power of two. 
/// diff --git a/csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs b/csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs index 8488175..e1da448 100644 --- a/csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs +++ b/csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs @@ -45,17 +45,15 @@ namespace Apache.Arrow.Ipc IArrowArrayVisitor<StringArray>, IArrowArrayVisitor<BinaryArray> { - public struct Buffer + public readonly struct Buffer { public readonly ArrowBuffer DataBuffer; public readonly int Offset; - public readonly int Length; - public Buffer(ArrowBuffer buffer, int offset, int length) + public Buffer(ArrowBuffer buffer, int offset) { DataBuffer = buffer; Offset = offset; - Length = length; } } @@ -124,9 +122,10 @@ namespace Apache.Arrow.Ipc { var offset = TotalLength; - TotalLength += buffer.Length; + int paddedLength = checked((int)BitUtility.RoundUpToMultipleOf8(buffer.Length)); + TotalLength += paddedLength; - return new Buffer(buffer, offset, buffer.Length); + return new Buffer(buffer, offset); } public void Visit(IArrowArray array) @@ -215,7 +214,7 @@ namespace Apache.Arrow.Ipc for (var i = buffers.Count - 1; i >= 0; i--) { Flatbuf.Buffer.CreateBuffer(Builder, - buffers[i].Offset, buffers[i].Length); + buffers[i].Offset, buffers[i].DataBuffer.Length); } var buffersVectorOffset = Builder.EndVector(); @@ -238,11 +237,20 @@ namespace Apache.Arrow.Ipc for (var i = 0; i < buffers.Count; i++) { - if (buffers[i].DataBuffer.IsEmpty) + ArrowBuffer buffer = buffers[i].DataBuffer; + if (buffer.IsEmpty) continue; - await WriteBufferAsync(buffers[i].DataBuffer, cancellationToken).ConfigureAwait(false); - bodyLength += buffers[i].DataBuffer.Length; + await WriteBufferAsync(buffer, cancellationToken).ConfigureAwait(false); + + int paddedLength = checked((int)BitUtility.RoundUpToMultipleOf8(buffer.Length)); + int padding = paddedLength - buffer.Length; + if (padding > 0) + { + await WritePaddingAsync(padding).ConfigureAwait(false); + } + + bodyLength += paddedLength; } // Write 
padding so the record batch message body length is a multiple of 8 bytes @@ -332,7 +340,7 @@ namespace Apache.Arrow.Ipc where T: struct { var messageOffset = Flatbuf.Message.CreateMessage( - Builder, CurrentMetadataVersion, headerType, headerOffset.Value, + Builder, CurrentMetadataVersion, headerType, headerOffset.Value, bodyLength); Builder.Finish(messageOffset.Value); diff --git a/csharp/test/Apache.Arrow.Benchmarks/ArrowWriterBenchmark.cs b/csharp/test/Apache.Arrow.Benchmarks/ArrowWriterBenchmark.cs new file mode 100644 index 0000000..5b19486 --- /dev/null +++ b/csharp/test/Apache.Arrow.Benchmarks/ArrowWriterBenchmark.cs @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +using Apache.Arrow.Ipc; +using Apache.Arrow.Tests; +using BenchmarkDotNet.Attributes; +using System.IO; +using System.Threading.Tasks; + +namespace Apache.Arrow.Benchmarks +{ + //[EtwProfiler] - needs elevated privileges + [MemoryDiagnoser] + public class ArrowWriterBenchmark + { + [Params(10_000, 1_000_000)] + public int BatchLength{ get; set; } + + [Params(10, 25)] + public int ColumnSetCount { get; set; } + + private MemoryStream _memoryStream; + private RecordBatch _batch; + + [GlobalSetup] + public void GlobalSetup() + { + _batch = TestData.CreateSampleRecordBatch(BatchLength, ColumnSetCount); + _memoryStream = new MemoryStream(); + } + + [IterationSetup] + public void Setup() + { + _memoryStream.Position = 0; + } + + [Benchmark] + public async Task WriteBatch() + { + ArrowStreamWriter writer = new ArrowStreamWriter(_memoryStream, _batch.Schema); + await writer.WriteRecordBatchAsync(_batch); + } + } +} diff --git a/csharp/test/Apache.Arrow.Tests/ArrowStreamWriterTests.cs b/csharp/test/Apache.Arrow.Tests/ArrowStreamWriterTests.cs index 83a97f3..06be8bd 100644 --- a/csharp/test/Apache.Arrow.Tests/ArrowStreamWriterTests.cs +++ b/csharp/test/Apache.Arrow.Tests/ArrowStreamWriterTests.cs @@ -14,6 +14,7 @@ // limitations under the License. 
using Apache.Arrow.Ipc; +using Apache.Arrow.Types; using System; using System.IO; using System.Linq; @@ -139,5 +140,60 @@ namespace Apache.Arrow.Tests } } } + + [Fact] + public async Task WriteBatchWithCorrectPadding() + { + byte value1 = 0x04; + byte value2 = 0x14; + var batch = new RecordBatch( + new Schema.Builder() + .Field(f => f.Name("age").DataType(Int32Type.Default)) + .Field(f => f.Name("characterCount").DataType(Int32Type.Default)) + .Build(), + new IArrowArray[] + { + new Int32Array( + new ArrowBuffer(new byte[] { value1, value1, 0x00, 0x00 }), + ArrowBuffer.Empty, + length: 1, + nullCount: 0, + offset: 0), + new Int32Array( + new ArrowBuffer(new byte[] { value2, value2, 0x00, 0x00 }), + ArrowBuffer.Empty, + length: 1, + nullCount: 0, + offset: 0) + }, + length: 1); + + await TestRoundTripRecordBatch(batch); + + using (MemoryStream stream = new MemoryStream()) + { + using (var writer = new ArrowStreamWriter(stream, batch.Schema, leaveOpen: true)) + { + await writer.WriteRecordBatchAsync(batch); + } + + byte[] writtenBytes = stream.ToArray(); + + // ensure that the data buffers at the end are 8-byte aligned + Assert.Equal(value1, writtenBytes[writtenBytes.Length - 16]); + Assert.Equal(value1, writtenBytes[writtenBytes.Length - 15]); + for (int i = 14; i > 8; i--) + { + Assert.Equal(0, writtenBytes[writtenBytes.Length - i]); + } + + Assert.Equal(value2, writtenBytes[writtenBytes.Length - 8]); + Assert.Equal(value2, writtenBytes[writtenBytes.Length - 7]); + for (int i = 6; i > 0; i--) + { + Assert.Equal(0, writtenBytes[writtenBytes.Length - i]); + } + } + } } } diff --git a/csharp/test/Apache.Arrow.Tests/TestData.cs b/csharp/test/Apache.Arrow.Tests/TestData.cs index 1bc046d..15774a7 100644 --- a/csharp/test/Apache.Arrow.Tests/TestData.cs +++ b/csharp/test/Apache.Arrow.Tests/TestData.cs @@ -23,26 +23,34 @@ namespace Apache.Arrow.Tests { public static RecordBatch CreateSampleRecordBatch(int length) { + return CreateSampleRecordBatch(length, columnSetCount: 
1); + } + + public static RecordBatch CreateSampleRecordBatch(int length, int columnSetCount) + { Schema.Builder builder = new Schema.Builder(); - builder.Field(CreateField(BooleanType.Default)); - builder.Field(CreateField(UInt8Type.Default)); - builder.Field(CreateField(Int8Type.Default)); - builder.Field(CreateField(UInt16Type.Default)); - builder.Field(CreateField(Int16Type.Default)); - builder.Field(CreateField(UInt32Type.Default)); - builder.Field(CreateField(Int32Type.Default)); - builder.Field(CreateField(UInt64Type.Default)); - builder.Field(CreateField(Int64Type.Default)); - builder.Field(CreateField(FloatType.Default)); - builder.Field(CreateField(DoubleType.Default)); - //builder.Field(CreateField(new DecimalType(19, 2))); - //builder.Field(CreateField(HalfFloatType.Default)); - //builder.Field(CreateField(StringType.Default)); - //builder.Field(CreateField(Date32Type.Default)); - //builder.Field(CreateField(Date64Type.Default)); - //builder.Field(CreateField(Time32Type.Default)); - //builder.Field(CreateField(Time64Type.Default)); - //builder.Field(CreateField(TimestampType.Default)); + for (int i = 0; i < columnSetCount; i++) + { + builder.Field(CreateField(BooleanType.Default, i)); + builder.Field(CreateField(UInt8Type.Default, i)); + builder.Field(CreateField(Int8Type.Default, i)); + builder.Field(CreateField(UInt16Type.Default, i)); + builder.Field(CreateField(Int16Type.Default, i)); + builder.Field(CreateField(UInt32Type.Default, i)); + builder.Field(CreateField(Int32Type.Default, i)); + builder.Field(CreateField(UInt64Type.Default, i)); + builder.Field(CreateField(Int64Type.Default, i)); + builder.Field(CreateField(FloatType.Default, i)); + builder.Field(CreateField(DoubleType.Default, i)); + //builder.Field(CreateField(new DecimalType(19, 2))); + //builder.Field(CreateField(HalfFloatType.Default)); + //builder.Field(CreateField(StringType.Default)); + //builder.Field(CreateField(Date32Type.Default)); + 
//builder.Field(CreateField(Date64Type.Default)); + //builder.Field(CreateField(Time32Type.Default)); + //builder.Field(CreateField(Time64Type.Default)); + //builder.Field(CreateField(TimestampType.Default)); + } Schema schema = builder.Build(); @@ -51,9 +59,9 @@ namespace Apache.Arrow.Tests return new RecordBatch(schema, arrays, length); } - private static Field CreateField(ArrowType type) + private static Field CreateField(ArrowType type, int iteration) { - return new Field(type.Name, type, nullable: false); + return new Field(type.Name + iteration, type, nullable: false); } private static IEnumerable<IArrowArray> CreateArrays(Schema schema, int length)