This is an automated email from the ASF dual-hosted git repository.
curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-dotnet.git
The following commit(s) were added to refs/heads/main by this push:
new 1185f99 Implement Arrow C device API (#305)
1185f99 is described below
commit 1185f99410565baa7b7f302371d9efc99efc152a
Author: Curt Hagenlocher <[email protected]>
AuthorDate: Wed Apr 1 23:05:51 2026 -0700
Implement Arrow C device API (#305)
## What's Changed
Implement Arrow C device API
Closes #299.
---
src/Apache.Arrow/Apache.Arrow.csproj | 3 +
src/Apache.Arrow/C/ArrowDeviceType.cs | 42 +++
src/Apache.Arrow/C/CArrowDeviceArray.cs | 69 ++++
src/Apache.Arrow/C/CArrowDeviceArrayExporter.cs | 86 +++++
src/Apache.Arrow/C/CArrowDeviceArrayImporter.cs | 75 +++++
src/Apache.Arrow/C/CArrowDeviceArrayStream.cs | 124 ++++++++
.../C/CArrowDeviceArrayStreamExporter.cs | 211 +++++++++++++
.../C/CArrowDeviceArrayStreamImporter.cs | 170 ++++++++++
.../Extensions/ExperimentalAttribute.Polyfill.cs | 36 +++
.../CDataInterfacePythonTests.cs | 62 +---
.../CDeviceDataInterfacePythonTests.cs | 280 ++++++++++++++++
.../CDeviceDataInterfaceTests.cs | 351 +++++++++++++++++++++
test/Apache.Arrow.Tests/PythonNetCollection.cs | 28 ++
test/Apache.Arrow.Tests/PythonNetFixture.cs | 98 ++++++
14 files changed, 1577 insertions(+), 58 deletions(-)
diff --git a/src/Apache.Arrow/Apache.Arrow.csproj
b/src/Apache.Arrow/Apache.Arrow.csproj
index 62725d0..f681a16 100644
--- a/src/Apache.Arrow/Apache.Arrow.csproj
+++ b/src/Apache.Arrow/Apache.Arrow.csproj
@@ -53,4 +53,7 @@
<!-- Code targeting .NET 5+ should use [UnmanagedCallersOnly]. -->
<Compile Remove="C\NativeDelegate.cs" />
</ItemGroup>
+ <ItemGroup
Condition="$([MSBuild]::IsTargetFrameworkCompatible($(TargetFramework),
'net8.0'))">
+ <Compile Remove="Extensions\ExperimentalAttribute.Polyfill.cs" />
+ </ItemGroup>
</Project>
diff --git a/src/Apache.Arrow/C/ArrowDeviceType.cs
b/src/Apache.Arrow/C/ArrowDeviceType.cs
new file mode 100644
index 0000000..55b71ab
--- /dev/null
+++ b/src/Apache.Arrow/C/ArrowDeviceType.cs
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+namespace Apache.Arrow.C
+{
+ /// <summary>
+ /// Device type constants for the Arrow C Device Data Interface.
+ /// </summary>
+ /// <remarks>
+ /// These values match the ArrowDeviceType enum defined in
+ /// https://arrow.apache.org/docs/format/CDeviceDataInterface.html.
+ /// The numbering is intentionally not contiguous: no members are declared for
+ /// 0, 5 or 6, so do not iterate over the raw integer range.
+ /// The per-member descriptions below paraphrase the ARROW_DEVICE_* constants
+ /// of the specification linked above.
+ /// </remarks>
+ public enum ArrowDeviceType : int
+ {
+ /// <summary>CPU memory (ARROW_DEVICE_CPU); equivalent to using a plain ArrowArray.</summary>
+ Cpu = 1,
+ /// <summary>CUDA GPU device memory (ARROW_DEVICE_CUDA).</summary>
+ Cuda = 2,
+ /// <summary>Pinned CUDA host memory (ARROW_DEVICE_CUDA_HOST).</summary>
+ CudaHost = 3,
+ /// <summary>OpenCL device memory (ARROW_DEVICE_OPENCL).</summary>
+ OpenCL = 4,
+ /// <summary>Vulkan buffer (ARROW_DEVICE_VULKAN).</summary>
+ Vulkan = 7,
+ /// <summary>Metal buffer for Apple GPUs (ARROW_DEVICE_METAL).</summary>
+ Metal = 8,
+ /// <summary>Verilog simulator buffer (ARROW_DEVICE_VPI).</summary>
+ Vpi = 9,
+ /// <summary>ROCm memory for AMD GPUs (ARROW_DEVICE_ROCM).</summary>
+ RocM = 10,
+ /// <summary>Pinned ROCm host memory (ARROW_DEVICE_ROCM_HOST).</summary>
+ RocMHost = 11,
+ /// <summary>Reserved for extension devices (ARROW_DEVICE_EXT_DEV).</summary>
+ ExtDev = 12,
+ /// <summary>CUDA managed/unified memory (ARROW_DEVICE_CUDA_MANAGED).</summary>
+ CudaManaged = 13,
+ /// <summary>Unified shared memory on a oneAPI device (ARROW_DEVICE_ONEAPI).</summary>
+ OneApi = 14,
+ /// <summary>WebGPU memory (ARROW_DEVICE_WEBGPU).</summary>
+ WebGpu = 15,
+ /// <summary>Qualcomm Hexagon DSP memory (ARROW_DEVICE_HEXAGON).</summary>
+ Hexagon = 16,
+ }
+}
diff --git a/src/Apache.Arrow/C/CArrowDeviceArray.cs
b/src/Apache.Arrow/C/CArrowDeviceArray.cs
new file mode 100644
index 0000000..3dbde1f
--- /dev/null
+++ b/src/Apache.Arrow/C/CArrowDeviceArray.cs
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Diagnostics.CodeAnalysis;
+using System.Runtime.InteropServices;
+
+namespace Apache.Arrow.C
+{
+    /// <summary>
+    /// An Arrow C Device Data Interface ArrowDeviceArray, which represents an exported array
+    /// along with the device on which the array data resides.
+    /// </summary>
+    /// <remarks>
+    /// This is used to export <see cref="IArrowArray"/> or <see cref="RecordBatch"/> with device
+    /// information to other languages. It matches the layout of the ArrowDeviceArray struct
+    /// described in https://arrow.apache.org/docs/format/CDeviceDataInterface.html.
+    /// The field order below is part of the ABI and must not be changed.
+    /// </remarks>
+    [StructLayout(LayoutKind.Sequential)]
+    [Experimental("ArrowDeviceDataApi")]
+    public unsafe struct CArrowDeviceArray
+    {
+        /// <summary>The embedded C Data Interface array; its release callback owns the data.</summary>
+        public CArrowArray array;
+
+        /// <summary>Identifier of the device holding the data. The exporters in this library write -1 for CPU.</summary>
+        public long device_id;
+
+        /// <summary>The kind of device on which the buffers of <see cref="array"/> reside.</summary>
+        public ArrowDeviceType device_type;
+
+        // 4 bytes implicit padding on 64-bit (matches C layout)
+
+        /// <summary>Optional device-specific synchronization event; null when no synchronization is needed.</summary>
+        public void* sync_event;
+
+        // Reserved for future ABI growth. Never read or written by managed code directly;
+        // it only exists so that sizeof(CArrowDeviceArray) matches the C struct.
+        private fixed long reserved[3];
+
+        /// <summary>
+        /// Allocate and zero-initialize an unmanaged pointer of this type.
+        /// </summary>
+        /// <remarks>
+        /// This pointer must later be freed by <see cref="Free"/>.
+        /// </remarks>
+        /// <returns>A pointer to a zeroed <see cref="CArrowDeviceArray"/> in unmanaged memory.</returns>
+        public static CArrowDeviceArray* Create()
+        {
+            var ptr = (CArrowDeviceArray*)Marshal.AllocHGlobal(sizeof(CArrowDeviceArray));
+
+            // AllocHGlobal does not clear memory; a zeroed struct reads as "released"
+            // (array.release == null) and has the reserved bytes cleared.
+            *ptr = default;
+
+            return ptr;
+        }
+
+        /// <summary>
+        /// Free a pointer that was allocated in <see cref="Create"/>.
+        /// </summary>
+        /// <remarks>
+        /// Do not call this on a pointer that was allocated elsewhere.
+        /// Passing a null pointer is a no-op.
+        /// </remarks>
+        /// <param name="deviceArray">The pointer to free, or null.</param>
+        public static void Free(CArrowDeviceArray* deviceArray)
+        {
+            // Mirror free(NULL) / Marshal.FreeHGlobal(IntPtr.Zero): nothing to release.
+            if (deviceArray == null)
+            {
+                return;
+            }
+
+            // Give the producer a chance to release the array before the memory
+            // holding the struct itself goes away.
+            CArrowArray.CallReleaseFunc(&deviceArray->array);
+            Marshal.FreeHGlobal((IntPtr)deviceArray);
+        }
+    }
+}
diff --git a/src/Apache.Arrow/C/CArrowDeviceArrayExporter.cs
b/src/Apache.Arrow/C/CArrowDeviceArrayExporter.cs
new file mode 100644
index 0000000..05dfabb
--- /dev/null
+++ b/src/Apache.Arrow/C/CArrowDeviceArrayExporter.cs
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Diagnostics.CodeAnalysis;
+
+namespace Apache.Arrow.C
+{
+    /// <summary>
+    /// Exports managed arrays and record batches as CPU-resident
+    /// <see cref="CArrowDeviceArray"/> structs.
+    /// </summary>
+    [Experimental("ArrowDeviceDataApi")]
+    public static class CArrowDeviceArrayExporter
+    {
+        /// <summary>
+        /// Export an <see cref="IArrowArray"/> to a <see cref="CArrowDeviceArray"/>. The exported array
+        /// shares the underlying buffers via reference counting, so the original array remains valid
+        /// after export.
+        /// </summary>
+        /// <param name="array">The array to export</param>
+        /// <param name="deviceArray">An allocated but uninitialized CArrowDeviceArray pointer.</param>
+        /// <exception cref="ArgumentNullException">
+        /// <paramref name="array"/> or <paramref name="deviceArray"/> is null.
+        /// </exception>
+        /// <example>
+        /// <code>
+        /// CArrowDeviceArray* exportPtr = CArrowDeviceArray.Create();
+        /// CArrowDeviceArrayExporter.ExportArray(array, exportPtr);
+        /// foreign_import_function(exportPtr);
+        /// </code>
+        /// </example>
+        public static unsafe void ExportArray(IArrowArray array, CArrowDeviceArray* deviceArray)
+        {
+            if (array == null)
+            {
+                throw new ArgumentNullException(nameof(array));
+            }
+            if (deviceArray == null)
+            {
+                throw new ArgumentNullException(nameof(deviceArray));
+            }
+
+            CArrowArrayExporter.ExportArray(array, &deviceArray->array);
+            InitializeCpuDeviceFields(deviceArray);
+        }
+
+        /// <summary>
+        /// Export a <see cref="RecordBatch"/> to a <see cref="CArrowDeviceArray"/>. The exported record
+        /// batch shares the underlying buffers via reference counting, so the original batch remains
+        /// valid after export.
+        /// </summary>
+        /// <param name="batch">The record batch to export</param>
+        /// <param name="deviceArray">An allocated but uninitialized CArrowDeviceArray pointer.</param>
+        /// <exception cref="ArgumentNullException">
+        /// <paramref name="batch"/> or <paramref name="deviceArray"/> is null.
+        /// </exception>
+        /// <example>
+        /// <code>
+        /// CArrowDeviceArray* exportPtr = CArrowDeviceArray.Create();
+        /// CArrowDeviceArrayExporter.ExportRecordBatch(batch, exportPtr);
+        /// foreign_import_function(exportPtr);
+        /// </code>
+        /// </example>
+        public static unsafe void ExportRecordBatch(RecordBatch batch, CArrowDeviceArray* deviceArray)
+        {
+            if (batch == null)
+            {
+                throw new ArgumentNullException(nameof(batch));
+            }
+            if (deviceArray == null)
+            {
+                throw new ArgumentNullException(nameof(deviceArray));
+            }
+
+            CArrowArrayExporter.ExportRecordBatch(batch, &deviceArray->array);
+            InitializeCpuDeviceFields(deviceArray);
+        }
+
+        /// <summary>
+        /// Fills in every field that follows the embedded array for a CPU export.
+        /// </summary>
+        /// <param name="deviceArray">A non-null struct whose <c>array</c> member has already been exported.</param>
+        private static unsafe void InitializeCpuDeviceFields(CArrowDeviceArray* deviceArray)
+        {
+            // The caller may hand us uninitialized memory, and the C Device Data Interface
+            // requires the producer to zero the trailing reserved bytes. The reserved field
+            // is private to the struct, so clear the whole tail (device_id through the end,
+            // padding included) before writing the meaningful values.
+            byte* tailStart = (byte*)&deviceArray->device_id;
+            byte* structEnd = (byte*)deviceArray + sizeof(CArrowDeviceArray);
+            new Span<byte>(tailStart, (int)(structEnd - tailStart)).Clear();
+
+            deviceArray->device_type = ArrowDeviceType.Cpu;
+            // -1 is the conventional id for devices without an intrinsic identifier, such as the CPU.
+            deviceArray->device_id = -1;
+            // CPU data needs no synchronization event.
+            deviceArray->sync_event = null;
+        }
+    }
+}
diff --git a/src/Apache.Arrow/C/CArrowDeviceArrayImporter.cs
b/src/Apache.Arrow/C/CArrowDeviceArrayImporter.cs
new file mode 100644
index 0000000..f875006
--- /dev/null
+++ b/src/Apache.Arrow/C/CArrowDeviceArrayImporter.cs
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Diagnostics.CodeAnalysis;
+using Apache.Arrow.Types;
+
+namespace Apache.Arrow.C
+{
+    /// <summary>
+    /// Imports CPU-resident <see cref="CArrowDeviceArray"/> structs as managed arrays
+    /// or record batches.
+    /// </summary>
+    [Experimental("ArrowDeviceDataApi")]
+    public static class CArrowDeviceArrayImporter
+    {
+        /// <summary>
+        /// Import C pointer as an <see cref="IArrowArray"/>.
+        /// </summary>
+        /// <remarks>
+        /// This will call the release callback once all of the buffers in the returned
+        /// IArrowArray are disposed. Only CPU device arrays are supported.
+        /// </remarks>
+        /// <param name="ptr">The pointer to the device array being imported</param>
+        /// <param name="type">The type of the array being imported</param>
+        /// <returns>The imported C# array</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="ptr"/> is null.</exception>
+        /// <exception cref="NotSupportedException">The array does not live on the CPU.</exception>
+        public static unsafe IArrowArray ImportArray(CArrowDeviceArray* ptr, IArrowType type)
+        {
+            if (ptr == null)
+            {
+                throw new ArgumentNullException(nameof(ptr));
+            }
+
+            RequireCpuDevice(ptr, "arrays");
+            return CArrowArrayImporter.ImportArray(&ptr->array, type);
+        }
+
+        /// <summary>
+        /// Import C pointer as a <see cref="RecordBatch"/>.
+        /// </summary>
+        /// <remarks>
+        /// This will call the release callback once all of the buffers in the returned
+        /// RecordBatch are disposed. Only CPU device arrays are supported.
+        /// </remarks>
+        /// <param name="ptr">The pointer to the device array being imported</param>
+        /// <param name="schema">The schema of the record batch being imported</param>
+        /// <returns>The imported C# record batch</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="ptr"/> is null.</exception>
+        /// <exception cref="NotSupportedException">The array does not live on the CPU.</exception>
+        public static unsafe RecordBatch ImportRecordBatch(CArrowDeviceArray* ptr, Schema schema)
+        {
+            if (ptr == null)
+            {
+                throw new ArgumentNullException(nameof(ptr));
+            }
+
+            RequireCpuDevice(ptr, "record batches");
+            return CArrowArrayImporter.ImportRecordBatch(&ptr->array, schema);
+        }
+
+        /// <summary>
+        /// Throws unless the device array reports <see cref="ArrowDeviceType.Cpu"/>.
+        /// </summary>
+        /// <param name="ptr">A non-null device array.</param>
+        /// <param name="imported">Plural noun naming what is being imported, used in the message.</param>
+        private static unsafe void RequireCpuDevice(CArrowDeviceArray* ptr, string imported)
+        {
+            ArrowDeviceType deviceType = ptr->device_type;
+            if (deviceType == ArrowDeviceType.Cpu)
+            {
+                return;
+            }
+
+            throw new NotSupportedException(
+                $"Importing {imported} from device type {deviceType} is not supported. Only CPU arrays can be imported.");
+        }
+    }
+}
diff --git a/src/Apache.Arrow/C/CArrowDeviceArrayStream.cs
b/src/Apache.Arrow/C/CArrowDeviceArrayStream.cs
new file mode 100644
index 0000000..45461aa
--- /dev/null
+++ b/src/Apache.Arrow/C/CArrowDeviceArrayStream.cs
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Diagnostics.CodeAnalysis;
+using System.Runtime.InteropServices;
+
+namespace Apache.Arrow.C
+{
+    /// <summary>
+    /// An Arrow C Device Data Interface ArrowDeviceArrayStream, which represents a stream
+    /// of record batches with device information.
+    /// </summary>
+    /// <remarks>
+    /// This is used to export <see cref="Ipc.IArrowArrayStream"/> with device information
+    /// to other languages. It matches the layout of the ArrowDeviceArrayStream struct
+    /// described in https://arrow.apache.org/docs/format/CDeviceDataInterface.html.
+    /// The field order below is part of the ABI and must not be changed.
+    /// </remarks>
+    [StructLayout(LayoutKind.Sequential)]
+    [Experimental("ArrowDeviceDataApi")]
+    public unsafe struct CArrowDeviceArrayStream
+    {
+        /// <summary>
+        /// The kind of device on which the arrays produced by this stream reside.
+        /// </summary>
+        public ArrowDeviceType device_type;
+
+        /// <summary>
+        /// Callback to get the stream schema. Will be the same for all arrays in the stream.
+        /// If successful, the ArrowSchema must be released independently from the stream.
+        ///
+        /// Return value: 0 if successful, an `errno`-compatible error code otherwise.
+        /// </summary>
+#if NET5_0_OR_GREATER
+        internal delegate* unmanaged<CArrowDeviceArrayStream*, CArrowSchema*, int> get_schema;
+#else
+        internal IntPtr get_schema;
+#endif
+
+        /// <summary>
+        /// Callback to get the next device array. If no error and the array is released,
+        /// the stream has ended. If successful, the ArrowDeviceArray must be released
+        /// independently from the stream.
+        ///
+        /// Return value: 0 if successful, an `errno`-compatible error code otherwise.
+        /// </summary>
+#if NET5_0_OR_GREATER
+        internal delegate* unmanaged<CArrowDeviceArrayStream*, CArrowDeviceArray*, int> get_next;
+#else
+        internal IntPtr get_next;
+#endif
+
+        /// <summary>
+        /// Callback to get optional detailed error information. This must only
+        /// be called if the last stream operation failed with a non-0 return code.
+        /// The returned pointer is only valid until the next operation on this stream
+        /// (including release).
+        ///
+        /// Return value: pointer to a null-terminated character array describing the last
+        /// error, or NULL if no description is available.
+        /// </summary>
+#if NET5_0_OR_GREATER
+        internal delegate* unmanaged<CArrowDeviceArrayStream*, byte*> get_last_error;
+#else
+        internal IntPtr get_last_error;
+#endif
+
+        /// <summary>
+        /// Release callback: release the stream's own resources. Note that arrays returned by
+        /// get_next must be individually released.
+        /// </summary>
+#if NET5_0_OR_GREATER
+        internal delegate* unmanaged<CArrowDeviceArrayStream*, void> release;
+#else
+        internal IntPtr release;
+#endif
+
+        /// <summary>
+        /// Opaque producer-owned state; consumers must not interpret it.
+        /// </summary>
+        public void* private_data;
+
+        /// <summary>
+        /// Allocate and zero-initialize an unmanaged pointer of this type.
+        /// </summary>
+        /// <remarks>
+        /// This pointer must later be freed by <see cref="Free"/>.
+        /// </remarks>
+        /// <returns>A pointer to a zeroed <see cref="CArrowDeviceArrayStream"/> in unmanaged memory.</returns>
+        public static CArrowDeviceArrayStream* Create()
+        {
+            var ptr = (CArrowDeviceArrayStream*)Marshal.AllocHGlobal(sizeof(CArrowDeviceArrayStream));
+
+            // AllocHGlobal does not clear memory; a zeroed struct reads as "released".
+            *ptr = default;
+
+            return ptr;
+        }
+
+        /// <summary>
+        /// Free a pointer that was allocated in <see cref="Create"/>.
+        /// </summary>
+        /// <remarks>
+        /// Do not call this on a pointer that was allocated elsewhere.
+        /// Passing a null pointer is a no-op.
+        /// </remarks>
+        /// <param name="stream">The pointer to free, or null.</param>
+        public static void Free(CArrowDeviceArrayStream* stream)
+        {
+            // Mirror free(NULL) / Marshal.FreeHGlobal(IntPtr.Zero): nothing to release.
+            if (stream == null)
+            {
+                return;
+            }
+
+            if (stream->release != default)
+            {
+                // Call release if not already called.
+#if NET5_0_OR_GREATER
+                stream->release(stream);
+#else
+                Marshal.GetDelegateForFunctionPointer<CArrowDeviceArrayStreamExporter.ReleaseDeviceArrayStream>(stream->release)(stream);
+#endif
+            }
+            Marshal.FreeHGlobal((IntPtr)stream);
+        }
+    }
+}
diff --git a/src/Apache.Arrow/C/CArrowDeviceArrayStreamExporter.cs
b/src/Apache.Arrow/C/CArrowDeviceArrayStreamExporter.cs
new file mode 100644
index 0000000..d695efb
--- /dev/null
+++ b/src/Apache.Arrow/C/CArrowDeviceArrayStreamExporter.cs
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Diagnostics.CodeAnalysis;
+using System.Runtime.InteropServices;
+using Apache.Arrow.Ipc;
+
+namespace Apache.Arrow.C
+{
+    /// <summary>
+    /// Exports a managed <see cref="IArrowArrayStream"/> as a CPU-resident
+    /// <see cref="CArrowDeviceArrayStream"/> whose callbacks are implemented by this class.
+    /// </summary>
+    [Experimental("ArrowDeviceDataApi")]
+    public static class CArrowDeviceArrayStreamExporter
+    {
+#if NET5_0_OR_GREATER
+        private static unsafe delegate* unmanaged<CArrowDeviceArrayStream*, CArrowSchema*, int> GetSchemaPtr => &GetSchema;
+        private static unsafe delegate* unmanaged<CArrowDeviceArrayStream*, CArrowDeviceArray*, int> GetNextPtr => &GetNext;
+        private static unsafe delegate* unmanaged<CArrowDeviceArrayStream*, byte*> GetLastErrorPtr => &GetLastError;
+        private static unsafe delegate* unmanaged<CArrowDeviceArrayStream*, void> ReleasePtr => &Release;
+#else
+        // Without [UnmanagedCallersOnly] the callbacks are exposed through delegates that are
+        // kept alive in static fields for the lifetime of the process.
+        internal unsafe delegate int GetSchemaDeviceArrayStream(CArrowDeviceArrayStream* cDeviceArrayStream, CArrowSchema* cSchema);
+        private static unsafe NativeDelegate<GetSchemaDeviceArrayStream> s_getSchema = new NativeDelegate<GetSchemaDeviceArrayStream>(GetSchema);
+        private static unsafe IntPtr GetSchemaPtr => s_getSchema.Pointer;
+        internal unsafe delegate int GetNextDeviceArrayStream(CArrowDeviceArrayStream* cDeviceArrayStream, CArrowDeviceArray* cDeviceArray);
+        private static unsafe NativeDelegate<GetNextDeviceArrayStream> s_getNext = new NativeDelegate<GetNextDeviceArrayStream>(GetNext);
+        private static unsafe IntPtr GetNextPtr => s_getNext.Pointer;
+        internal unsafe delegate byte* GetLastErrorDeviceArrayStream(CArrowDeviceArrayStream* cDeviceArrayStream);
+        private static unsafe NativeDelegate<GetLastErrorDeviceArrayStream> s_getLastError = new NativeDelegate<GetLastErrorDeviceArrayStream>(GetLastError);
+        private static unsafe IntPtr GetLastErrorPtr => s_getLastError.Pointer;
+        internal unsafe delegate void ReleaseDeviceArrayStream(CArrowDeviceArrayStream* cDeviceArrayStream);
+        private static unsafe NativeDelegate<ReleaseDeviceArrayStream> s_release = new NativeDelegate<ReleaseDeviceArrayStream>(Release);
+        private static unsafe IntPtr ReleasePtr => s_release.Pointer;
+#endif
+
+        /// <summary>
+        /// Export an <see cref="IArrowArrayStream"/> to a <see cref="CArrowDeviceArrayStream"/>.
+        /// </summary>
+        /// <remarks>
+        /// The exported struct takes ownership of <paramref name="arrayStream"/>: it is disposed
+        /// when the consumer invokes the struct's release callback.
+        /// </remarks>
+        /// <param name="arrayStream">The array stream to export</param>
+        /// <param name="deviceArrayStream">An allocated but uninitialized CArrowDeviceArrayStream pointer.</param>
+        /// <exception cref="ArgumentNullException">
+        /// <paramref name="arrayStream"/> or <paramref name="deviceArrayStream"/> is null.
+        /// </exception>
+        /// <example>
+        /// <code>
+        /// CArrowDeviceArrayStream* exportPtr = CArrowDeviceArrayStream.Create();
+        /// CArrowDeviceArrayStreamExporter.ExportArrayStream(arrayStream, exportPtr);
+        /// foreign_import_function(exportPtr);
+        /// </code>
+        /// </example>
+        public static unsafe void ExportArrayStream(IArrowArrayStream arrayStream, CArrowDeviceArrayStream* deviceArrayStream)
+        {
+            if (arrayStream == null)
+            {
+                throw new ArgumentNullException(nameof(arrayStream));
+            }
+            if (deviceArrayStream == null)
+            {
+                throw new ArgumentNullException(nameof(deviceArrayStream));
+            }
+
+            deviceArrayStream->device_type = ArrowDeviceType.Cpu;
+            deviceArrayStream->private_data = ExportedDeviceArrayStream.Export(arrayStream);
+            deviceArrayStream->get_schema = GetSchemaPtr;
+            deviceArrayStream->get_next = GetNextPtr;
+            deviceArrayStream->get_last_error = GetLastErrorPtr;
+            deviceArrayStream->release = ReleasePtr;
+        }
+
+        /// <summary>
+        /// get_schema callback: exports the stream's schema into <paramref name="cSchema"/>.
+        /// </summary>
+        /// <returns>0 on success, <see cref="ExportedDeviceArrayStream.EOTHER"/> on failure.</returns>
+#if NET5_0_OR_GREATER
+        [UnmanagedCallersOnly]
+#endif
+        private unsafe static int GetSchema(CArrowDeviceArrayStream* cDeviceArrayStream, CArrowSchema* cSchema)
+        {
+            ExportedDeviceArrayStream stream = null;
+            try
+            {
+                stream = ExportedDeviceArrayStream.FromPointer(cDeviceArrayStream->private_data);
+                CArrowSchemaExporter.ExportSchema(stream.ArrowArrayStream.Schema, cSchema);
+                return stream.ClearError();
+            }
+            catch (Exception ex)
+            {
+                // Exceptions must not cross the unmanaged boundary; report them as an error code.
+                return stream?.SetError(ex) ?? ExportedDeviceArrayStream.EOTHER;
+            }
+        }
+
+        /// <summary>
+        /// get_next callback: exports the next record batch into <paramref name="cDeviceArray"/>,
+        /// or leaves it in the released state when the stream has ended.
+        /// </summary>
+        /// <returns>0 on success, <see cref="ExportedDeviceArrayStream.EOTHER"/> on failure.</returns>
+#if NET5_0_OR_GREATER
+        [UnmanagedCallersOnly]
+#endif
+        private unsafe static int GetNext(CArrowDeviceArrayStream* cDeviceArrayStream, CArrowDeviceArray* cDeviceArray)
+        {
+            ExportedDeviceArrayStream stream = null;
+            try
+            {
+                // The consumer supplies uninitialized memory. Zero the whole struct so that
+                // array.release == null signals end-of-stream and the reserved bytes required
+                // to be zero by the specification are cleared.
+                *cDeviceArray = default;
+                stream = ExportedDeviceArrayStream.FromPointer(cDeviceArrayStream->private_data);
+
+                // This callback is synchronous, so the read has to be blocked on. Reading
+                // ValueTask.Result before completion is unsupported, so convert to a Task first.
+                RecordBatch recordBatch = stream.ArrowArrayStream.ReadNextRecordBatchAsync().AsTask().GetAwaiter().GetResult();
+                if (recordBatch != null)
+                {
+                    CArrowDeviceArrayExporter.ExportRecordBatch(recordBatch, cDeviceArray);
+                }
+                return stream.ClearError();
+            }
+            catch (Exception ex)
+            {
+                // Exceptions must not cross the unmanaged boundary; report them as an error code.
+                return stream?.SetError(ex) ?? ExportedDeviceArrayStream.EOTHER;
+            }
+        }
+
+        /// <summary>
+        /// get_last_error callback: returns the UTF-8 message recorded by the last failed
+        /// operation, or null when there is none.
+        /// </summary>
+#if NET5_0_OR_GREATER
+        [UnmanagedCallersOnly]
+#endif
+        private unsafe static byte* GetLastError(CArrowDeviceArrayStream* cDeviceArrayStream)
+        {
+            try
+            {
+                ExportedDeviceArrayStream stream = ExportedDeviceArrayStream.FromPointer(cDeviceArrayStream->private_data);
+                return stream.LastError;
+            }
+            catch (Exception)
+            {
+                // No way to report a failure here other than "no description available".
+                return null;
+            }
+        }
+
+        /// <summary>
+        /// release callback: disposes the exported stream and marks the struct as released.
+        /// </summary>
+#if NET5_0_OR_GREATER
+        [UnmanagedCallersOnly]
+#endif
+        private unsafe static void Release(CArrowDeviceArrayStream* cDeviceArrayStream)
+        {
+            if (cDeviceArrayStream == null)
+            {
+                return;
+            }
+
+            try
+            {
+                ExportedDeviceArrayStream.Free(&cDeviceArrayStream->private_data);
+            }
+            catch (Exception)
+            {
+                // The C ABI gives release no way to report failure, and an exception escaping
+                // an unmanaged callback would tear down the process. Free() has already
+                // released the GCHandle, so there is nothing further to clean up.
+            }
+
+            // A null release pointer is how consumers recognise a released stream.
+            cDeviceArrayStream->release = default;
+        }
+
+        /// <summary>
+        /// Managed state behind <c>private_data</c>: the wrapped stream plus the unmanaged
+        /// buffer holding the most recent error message.
+        /// </summary>
+        sealed unsafe class ExportedDeviceArrayStream : IDisposable
+        {
+            /// <summary>Generic errno-compatible error code returned by the callbacks when an exception occurs.</summary>
+            public const int EOTHER = 131;
+
+            ExportedDeviceArrayStream(IArrowArrayStream arrayStream)
+            {
+                ArrowArrayStream = arrayStream;
+                LastError = null;
+            }
+
+            /// <summary>The wrapped managed stream; null after <see cref="Dispose"/>.</summary>
+            public IArrowArrayStream ArrowArrayStream { get; private set; }
+
+            /// <summary>Unmanaged, null-terminated UTF-8 message of the last error, or null.</summary>
+            public byte* LastError { get; private set; }
+
+            /// <summary>
+            /// Wraps <paramref name="arrayStream"/> and returns a GCHandle-backed pointer
+            /// suitable for storing in <c>private_data</c>.
+            /// </summary>
+            public static void* Export(IArrowArrayStream arrayStream)
+            {
+                ExportedDeviceArrayStream result = new ExportedDeviceArrayStream(arrayStream);
+                GCHandle gch = GCHandle.Alloc(result);
+                return (void*)GCHandle.ToIntPtr(gch);
+            }
+
+            /// <summary>
+            /// Disposes the instance referenced by <paramref name="ptr"/>, frees its handle and
+            /// nulls the slot. Does nothing if the slot is already null.
+            /// </summary>
+            public static void Free(void** ptr)
+            {
+                // GCHandle.FromIntPtr throws for IntPtr.Zero, so an already-cleared slot
+                // has to be detected before the handle is reconstructed.
+                if (ptr == null || *ptr == null)
+                {
+                    return;
+                }
+
+                GCHandle gch = GCHandle.FromIntPtr((IntPtr)(*ptr));
+                if (!gch.IsAllocated)
+                {
+                    return;
+                }
+
+                try
+                {
+                    ((ExportedDeviceArrayStream)gch.Target).Dispose();
+                }
+                finally
+                {
+                    // Release the handle even if disposing the wrapped stream throws,
+                    // otherwise the instance would stay rooted forever.
+                    gch.Free();
+                    *ptr = null;
+                }
+            }
+
+            /// <summary>Recovers the instance stored in <c>private_data</c>.</summary>
+            public static ExportedDeviceArrayStream FromPointer(void* ptr)
+            {
+                GCHandle gch = GCHandle.FromIntPtr((IntPtr)ptr);
+                return (ExportedDeviceArrayStream)gch.Target;
+            }
+
+            /// <summary>Records the exception message as the last error and returns <see cref="EOTHER"/>.</summary>
+            public int SetError(Exception ex)
+            {
+                ReleaseLastError();
+                LastError = StringUtil.ToCStringUtf8(ex.Message);
+                return EOTHER;
+            }
+
+            /// <summary>Discards any recorded error and returns 0 (success).</summary>
+            public int ClearError()
+            {
+                ReleaseLastError();
+                return 0;
+            }
+
+            /// <summary>Frees the error buffer and disposes the wrapped stream.</summary>
+            public void Dispose()
+            {
+                ReleaseLastError();
+                ArrowArrayStream?.Dispose();
+                ArrowArrayStream = null;
+            }
+
+            void ReleaseLastError()
+            {
+                if (LastError != null)
+                {
+                    Marshal.FreeHGlobal((IntPtr)LastError);
+                    LastError = null;
+                }
+            }
+        }
+    }
+}
diff --git a/src/Apache.Arrow/C/CArrowDeviceArrayStreamImporter.cs
b/src/Apache.Arrow/C/CArrowDeviceArrayStreamImporter.cs
new file mode 100644
index 0000000..793ae43
--- /dev/null
+++ b/src/Apache.Arrow/C/CArrowDeviceArrayStreamImporter.cs
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Diagnostics.CodeAnalysis;
+using System.Runtime.InteropServices;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.Arrow.Ipc;
+
+namespace Apache.Arrow.C
+{
+    /// <summary>
+    /// Imports a CPU-resident <see cref="CArrowDeviceArrayStream"/> as a managed
+    /// <see cref="IArrowArrayStream"/>.
+    /// </summary>
+    [Experimental("ArrowDeviceDataApi")]
+    public static class CArrowDeviceArrayStreamImporter
+    {
+        /// <summary>
+        /// Import C pointer as an <see cref="IArrowArrayStream"/>.
+        /// </summary>
+        /// <remarks>
+        /// On success the stream is moved out of the passed struct (its release pointer is
+        /// cleared) and the release callback is called when the IArrowArrayStream is disposed.
+        /// If this method throws, the release callback has not been invoked and the passed
+        /// struct is left untouched, so the caller remains responsible for releasing it.
+        /// Only CPU device streams are supported.
+        /// </remarks>
+        /// <param name="ptr">The pointer to the device array stream being imported</param>
+        /// <returns>The imported C# array stream</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="ptr"/> is null.</exception>
+        /// <exception cref="NotSupportedException">The stream does not produce CPU arrays.</exception>
+        /// <exception cref="ArgumentException">The stream is already released or lacks a required callback.</exception>
+        public static unsafe IArrowArrayStream ImportDeviceArrayStream(CArrowDeviceArrayStream* ptr)
+        {
+            if (ptr == null)
+            {
+                throw new ArgumentNullException(nameof(ptr));
+            }
+            if (ptr->device_type != ArrowDeviceType.Cpu)
+            {
+                throw new NotSupportedException(
+                    $"Importing device array streams from device type {ptr->device_type} is not supported. Only CPU streams can be imported.");
+            }
+
+            return new ImportedArrowDeviceArrayStream(ptr);
+        }
+
+        /// <summary>
+        /// Managed wrapper that owns a private copy of the imported stream struct and
+        /// forwards reads to its callbacks.
+        /// </summary>
+        private sealed unsafe class ImportedArrowDeviceArrayStream : IArrowArrayStream
+        {
+            // Private copy of the producer's struct; the original is marked released on import.
+            private readonly CArrowDeviceArrayStream _cDeviceArrayStream;
+            private readonly Schema _schema;
+            private bool _disposed;
+
+            /// <summary>
+            /// Builds an error message for a failed stream operation, preferring the
+            /// producer's own description when get_last_error supplies one.
+            /// </summary>
+            internal static string GetLastError(CArrowDeviceArrayStream* stream, int errno)
+            {
+                byte* error = null;
+
+                if (stream->get_last_error != default)
+                {
+#if NET5_0_OR_GREATER
+                    error = stream->get_last_error(stream);
+#else
+                    error = Marshal.GetDelegateForFunctionPointer<CArrowDeviceArrayStreamExporter.GetLastErrorDeviceArrayStream>(stream->get_last_error)(stream);
+#endif
+                }
+
+                if (error == null)
+                {
+                    return $"Device array stream operation failed with no message. Error code: {errno}";
+                }
+
+                return StringUtil.PtrToStringUtf8(error);
+            }
+
+            /// <summary>
+            /// Validates the struct, reads its schema and then takes ownership of the stream.
+            /// </summary>
+            public ImportedArrowDeviceArrayStream(CArrowDeviceArrayStream* cDeviceArrayStream)
+            {
+                if (cDeviceArrayStream == null)
+                {
+                    throw new ArgumentNullException(nameof(cDeviceArrayStream));
+                }
+                if (cDeviceArrayStream->release == default)
+                {
+                    throw new ArgumentException("Tried to import a device array stream that has already been released.", nameof(cDeviceArrayStream));
+                }
+                if (cDeviceArrayStream->get_schema == default || cDeviceArrayStream->get_next == default)
+                {
+                    throw new ArgumentException("Tried to import a device array stream with a null function pointer.", nameof(cDeviceArrayStream));
+                }
+
+                CArrowSchema cSchema = new CArrowSchema();
+#if NET5_0_OR_GREATER
+                int errno = cDeviceArrayStream->get_schema(cDeviceArrayStream, &cSchema);
+#else
+                int errno = Marshal.GetDelegateForFunctionPointer<CArrowDeviceArrayStreamExporter.GetSchemaDeviceArrayStream>(cDeviceArrayStream->get_schema)(cDeviceArrayStream, &cSchema);
+#endif
+                if (errno != 0)
+                {
+                    throw new Exception(GetLastError(cDeviceArrayStream, errno));
+                }
+                _schema = CArrowSchemaImporter.ImportSchema(&cSchema);
+
+                // Ownership moves only after everything that can fail has succeeded:
+                // copy the struct, then mark the source as released.
+                _cDeviceArrayStream = *cDeviceArrayStream;
+                cDeviceArrayStream->release = default;
+            }
+
+            ~ImportedArrowDeviceArrayStream()
+            {
+                // Safe on a partially constructed instance: Dispose does nothing while the
+                // private copy still has a null release pointer.
+                Dispose();
+            }
+
+            /// <summary>The schema obtained from the producer at import time.</summary>
+            public Schema Schema => _schema;
+
+            /// <summary>
+            /// Synchronously calls get_next and imports the resulting array.
+            /// </summary>
+            /// <returns>The next record batch, or null at the end of the stream.</returns>
+            /// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
+            public ValueTask<RecordBatch> ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
+            {
+                if (_disposed)
+                {
+                    throw new ObjectDisposedException(typeof(ImportedArrowDeviceArrayStream).Name);
+                }
+
+                if (cancellationToken.IsCancellationRequested)
+                {
+                    return new(Task.FromCanceled<RecordBatch>(cancellationToken));
+                }
+
+                RecordBatch result = null;
+                CArrowDeviceArray cDeviceArray = new CArrowDeviceArray();
+                fixed (CArrowDeviceArrayStream* cDeviceArrayStream = &_cDeviceArrayStream)
+                {
+#if NET5_0_OR_GREATER
+                    int errno = cDeviceArrayStream->get_next(cDeviceArrayStream, &cDeviceArray);
+#else
+                    int errno = Marshal.GetDelegateForFunctionPointer<CArrowDeviceArrayStreamExporter.GetNextDeviceArrayStream>(cDeviceArrayStream->get_next)(cDeviceArrayStream, &cDeviceArray);
+#endif
+                    if (errno != 0)
+                    {
+                        return new(Task.FromException<RecordBatch>(new Exception(GetLastError(cDeviceArrayStream, errno))));
+                    }
+                    // A released array (null release pointer) means the stream has ended.
+                    if (cDeviceArray.array.release != default)
+                    {
+                        try
+                        {
+                            result = CArrowDeviceArrayImporter.ImportRecordBatch(&cDeviceArray, _schema);
+                        }
+                        catch
+                        {
+                            // The array was handed to us by get_next, so nobody else will release
+                            // it if the import is rejected (for example a non-CPU device type).
+                            // A still-set release pointer means ownership was never taken.
+                            // NOTE(review): relies on the array importer clearing `release`
+                            // once it takes ownership - confirm against CArrowArrayImporter.
+                            if (cDeviceArray.array.release != default)
+                            {
+                                CArrowArray.CallReleaseFunc(&cDeviceArray.array);
+                            }
+                            throw;
+                        }
+                    }
+                }
+
+                return new ValueTask<RecordBatch>(result);
+            }
+
+            /// <summary>
+            /// Invokes the producer's release callback once and suppresses finalization.
+            /// </summary>
+            public void Dispose()
+            {
+                if (!_disposed && _cDeviceArrayStream.release != default)
+                {
+                    _disposed = true;
+                    fixed (CArrowDeviceArrayStream* cDeviceArrayStream = &_cDeviceArrayStream)
+                    {
+#if NET5_0_OR_GREATER
+                        cDeviceArrayStream->release(cDeviceArrayStream);
+#else
+                        Marshal.GetDelegateForFunctionPointer<CArrowDeviceArrayStreamExporter.ReleaseDeviceArrayStream>(cDeviceArrayStream->release)(cDeviceArrayStream);
+#endif
+                    }
+                }
+                GC.SuppressFinalize(this);
+            }
+        }
+    }
+}
diff --git a/src/Apache.Arrow/Extensions/ExperimentalAttribute.Polyfill.cs
b/src/Apache.Arrow/Extensions/ExperimentalAttribute.Polyfill.cs
new file mode 100644
index 0000000..ce6d0d5
--- /dev/null
+++ b/src/Apache.Arrow/Extensions/ExperimentalAttribute.Polyfill.cs
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+namespace System.Diagnostics.CodeAnalysis
+{
+    /// <summary>
+    /// Stand-in for <c>System.Diagnostics.CodeAnalysis.ExperimentalAttribute</c>; the project file
+    /// removes this source from compilation for target frameworks compatible with net8.0.
+    /// </summary>
+    [AttributeUsage(
+        AttributeTargets.Assembly | AttributeTargets.Module |
+        AttributeTargets.Class | AttributeTargets.Struct | AttributeTargets.Enum |
+        AttributeTargets.Interface | AttributeTargets.Delegate |
+        AttributeTargets.Constructor | AttributeTargets.Method | AttributeTargets.Property |
+        AttributeTargets.Field | AttributeTargets.Event,
+        Inherited = false)]
+    public sealed class ExperimentalAttribute : Attribute
+    {
+        /// <summary>Creates the attribute with the diagnostic ID it reports.</summary>
+        /// <param name="diagnosticId">Stored unchanged in <see cref="DiagnosticId"/>.</param>
+        public ExperimentalAttribute(string diagnosticId) => DiagnosticId = diagnosticId;
+
+        /// <summary>Diagnostic ID supplied to the constructor.</summary>
+        public string DiagnosticId { get; }
+
+        /// <summary>Optional URL format string; unset unless assigned by the caller.</summary>
+        public string UrlFormat { get; set; }
+
+        /// <summary>Optional message; unset unless assigned by the caller.</summary>
+        public string Message { get; set; }
+    }
+}
diff --git a/test/Apache.Arrow.Tests/CDataInterfacePythonTests.cs
b/test/Apache.Arrow.Tests/CDataInterfacePythonTests.cs
index 427ce44..843be18 100644
--- a/test/Apache.Arrow.Tests/CDataInterfacePythonTests.cs
+++ b/test/Apache.Arrow.Tests/CDataInterfacePythonTests.cs
@@ -30,66 +30,12 @@ using Xunit;
namespace Apache.Arrow.Tests
{
- public class CDataSchemaPythonTest :
IClassFixture<CDataSchemaPythonTest.PythonNet>
+ [Collection("PythonNet")]
+ public class CDataSchemaPythonTest
{
- public class PythonNet : IDisposable
+ public CDataSchemaPythonTest(PythonNetFixture pythonNet)
{
- public bool Initialized { get; }
-
- public bool VersionMismatch { get; }
-
- public PythonNet()
- {
- bool pythonSet =
Environment.GetEnvironmentVariable("PYTHONNET_PYDLL") != null;
- if (!pythonSet)
- {
- Initialized = false;
- return;
- }
-
- try
- {
- PythonEngine.Initialize();
- }
- catch (NotSupportedException e) when
(e.Message.Contains("Python ABI ") && e.Message.Contains("not supported"))
- {
- // An unsupported version of Python is being used
- Initialized = false;
- VersionMismatch = true;
- return;
- }
-
- if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows) &&
- PythonEngine.PythonPath.IndexOf("dlls",
StringComparison.OrdinalIgnoreCase) < 0)
- {
- dynamic sys = Py.Import("sys");
-
sys.path.append(Path.Combine(Path.GetDirectoryName(Environment.GetEnvironmentVariable("PYTHONNET_PYDLL")),
"DLLs"));
- }
-
- Initialized = true;
- }
-
- public void Dispose()
- {
- PythonEngine.Shutdown();
- }
- }
-
- public CDataSchemaPythonTest(PythonNet pythonNet)
- {
- if (!pythonNet.Initialized)
- {
- var errorReason = pythonNet.VersionMismatch ? "Python version
is incompatible with PythonNet" : "PYTHONNET_PYDLL not set";
-
- bool inCIJob =
Environment.GetEnvironmentVariable("GITHUB_ACTIONS") == "true";
- bool inVerificationJob =
Environment.GetEnvironmentVariable("TEST_CSHARP") == "1";
-
- // Skip these tests if this is not in CI or is a verification
job and PythonNet couldn't be initialized
- Skip.If(inVerificationJob || !inCIJob, $"{errorReason};
skipping C Data Interface tests.");
-
- // Otherwise throw
- throw new Exception($"{errorReason}; cannot run C Data
Interface tests.");
- }
+ pythonNet.EnsureInitialized();
}
private static Schema GetTestSchema()
diff --git a/test/Apache.Arrow.Tests/CDeviceDataInterfacePythonTests.cs
b/test/Apache.Arrow.Tests/CDeviceDataInterfacePythonTests.cs
new file mode 100644
index 0000000..339d494
--- /dev/null
+++ b/test/Apache.Arrow.Tests/CDeviceDataInterfacePythonTests.cs
@@ -0,0 +1,280 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics.CodeAnalysis;
+using Apache.Arrow.C;
+using Apache.Arrow.Types;
+using Python.Runtime;
+using Xunit;
+
+namespace Apache.Arrow.Tests
+{
+ [Collection("PythonNet")]
+ [Experimental("ArrowDeviceDataApi")]
+ public class CDeviceDataInterfacePythonTest
+ {
+ /// <summary>
+ /// Asks the shared fixture to confirm Python.NET is usable before any test in this class runs.
+ /// </summary>
+ public CDeviceDataInterfacePythonTest(PythonNetFixture pythonNet)
+     => pythonNet.EnsureInitialized();
+
+ /// <summary>Builds a five-element string array whose third element is null.</summary>
+ private IArrowArray GetTestArray()
+ {
+     string[] contents = { "hello", "world", null, "foo", "bar" };
+
+     var stringBuilder = new StringArray.Builder();
+     foreach (string item in contents)
+     {
+         if (item == null)
+         {
+             stringBuilder.AppendNull();
+         }
+         else
+         {
+             stringBuilder.Append(item);
+         }
+     }
+
+     return stringBuilder.Build();
+ }
+
+ /// <summary>Creates a pyarrow array of five strings (third one null) while holding the GIL.</summary>
+ private dynamic GetPythonArray()
+ {
+     string[] contents = new[] { "hello", "world", null, "foo", "bar" };
+
+     using (Py.GIL())
+     {
+         dynamic pyarrow = Py.Import("pyarrow");
+         return pyarrow.array(contents);
+     }
+ }
+
+ /// <summary>Builds a three-column (col1, col2, col3) record batch with five rows.</summary>
+ private RecordBatch GetTestRecordBatch()
+ {
+     Field[] columns = new[]
+     {
+         new Field("col1", Int64Type.Default, true),
+         new Field("col2", StringType.Default, true),
+         new Field("col3", DoubleType.Default, true),
+     };
+     Schema schema = new Schema(columns, null);
+
+     IArrowArray longs = new Int64Array.Builder().AppendRange(new long[] { 1, 2, 3 }).AppendNull().Append(5).Build();
+     IArrowArray strings = GetTestArray();
+     IArrowArray doubles = new DoubleArray.Builder().AppendRange(new double[] { 0.0, 1.4, 2.5, 3.6, 4.7 }).Build();
+
+     return new RecordBatch(schema, new IArrowArray[] { longs, strings, doubles }, 5);
+ }
+
+ /// <summary>
+ /// Builds a pyarrow table with columns col1, col2 and col3 and returns the first batch
+ /// that <c>to_batches()</c> yields.
+ /// </summary>
+ private dynamic GetPythonRecordBatch()
+ {
+     using (Py.GIL())
+     {
+         dynamic pyarrow = Py.Import("pyarrow");
+
+         var columns = new PyList(new PyObject[]
+         {
+             pyarrow.array(new long?[] { 1, 2, 3, null, 5 }),
+             pyarrow.array(new[] { "hello", "world", null, "foo", "bar" }),
+             pyarrow.array(new[] { 0.0, 1.4, 2.5, 3.6, 4.7 })
+         });
+         dynamic pyTable = pyarrow.table(columns, new[] { "col1", "col2", "col3" });
+
+         return pyTable.to_batches()[0];
+     }
+ }
+
+ /// <summary>
+ /// Exports a managed array through the device interface, imports it in pyarrow and
+ /// compares it with an array built natively in Python.
+ /// </summary>
+ [SkippableFact]
+ public unsafe void ExportArrayToDeviceAndImportInPython()
+ {
+     IArrowArray managedArray = GetTestArray();
+     dynamic expectedPyArray = GetPythonArray();
+
+     CArrowDeviceArray* exportedArray = CArrowDeviceArray.Create();
+     CArrowDeviceArrayExporter.ExportArray(managedArray, exportedArray);
+
+     CArrowSchema* exportedType = CArrowSchema.Create();
+     CArrowSchemaExporter.ExportType(managedArray.Data.DataType, exportedType);
+
+     // The pyarrow import hook receives the raw struct addresses as integers.
+     long arrayAddress = ((IntPtr)exportedArray).ToInt64();
+     long typeAddress = ((IntPtr)exportedType).ToInt64();
+
+     using (Py.GIL())
+     {
+         dynamic pyarrow = Py.Import("pyarrow");
+         dynamic importedPyArray = pyarrow.Array._import_from_c_device(arrayAddress, typeAddress);
+         Assert.True(importedPyArray == expectedPyArray);
+     }
+
+     CArrowDeviceArray.Free(exportedArray);
+     CArrowSchema.Free(exportedType);
+ }
+
+ /// <summary>
+ /// Has pyarrow export a string array through the device interface, then checks the device
+ /// fields it wrote and the values seen after importing on the managed side.
+ /// </summary>
+ [SkippableFact]
+ public unsafe void ImportArrayFromPythonDevice()
+ {
+     CArrowDeviceArray* receivedArray = CArrowDeviceArray.Create();
+     CArrowSchema* receivedType = CArrowSchema.Create();
+
+     using (Py.GIL())
+     {
+         dynamic pyarrow = Py.Import("pyarrow");
+         dynamic sourcePyArray = pyarrow.array(new[] { "hello", "world", null, "foo", "bar" });
+
+         long arrayAddress = ((IntPtr)receivedArray).ToInt64();
+         long typeAddress = ((IntPtr)receivedType).ToInt64();
+         sourcePyArray._export_to_c_device(arrayAddress, typeAddress);
+     }
+
+     // Device metadata as filled in by PyArrow.
+     Assert.Equal(ArrowDeviceType.Cpu, receivedArray->device_type);
+     Assert.Equal(-1, receivedArray->device_id);
+     Assert.True(receivedArray->sync_event == null);
+
+     ArrowType arrowType = CArrowSchemaImporter.ImportType(receivedType);
+     using (IArrowArray managedArray = CArrowDeviceArrayImporter.ImportArray(receivedArray, arrowType))
+     {
+         StringArray strings = (StringArray)managedArray;
+
+         Assert.Equal(5, strings.Length);
+         Assert.Equal("hello", strings.GetString(0));
+         Assert.Equal("world", strings.GetString(1));
+         Assert.Null(strings.GetString(2));
+         Assert.Equal("foo", strings.GetString(3));
+         Assert.Equal("bar", strings.GetString(4));
+     }
+
+     CArrowDeviceArray.Free(receivedArray);
+     CArrowSchema.Free(receivedType);
+ }
+
+ /// <summary>
+ /// Exports a managed record batch through the device interface, imports it in pyarrow and
+ /// compares it with a batch built natively in Python.
+ /// </summary>
+ [SkippableFact]
+ public unsafe void ExportRecordBatchToDeviceAndImportInPython()
+ {
+     RecordBatch managedBatch = GetTestRecordBatch();
+     dynamic expectedPyBatch = GetPythonRecordBatch();
+
+     CArrowDeviceArray* exportedBatch = CArrowDeviceArray.Create();
+     CArrowDeviceArrayExporter.ExportRecordBatch(managedBatch, exportedBatch);
+
+     CArrowSchema* exportedSchema = CArrowSchema.Create();
+     CArrowSchemaExporter.ExportSchema(managedBatch.Schema, exportedSchema);
+
+     // The pyarrow import hook receives the raw struct addresses as integers.
+     long batchAddress = ((IntPtr)exportedBatch).ToInt64();
+     long schemaAddress = ((IntPtr)exportedSchema).ToInt64();
+
+     using (Py.GIL())
+     {
+         dynamic pyarrow = Py.Import("pyarrow");
+         dynamic importedPyBatch = pyarrow.RecordBatch._import_from_c_device(batchAddress, schemaAddress);
+         Assert.True(importedPyBatch == expectedPyBatch);
+     }
+
+     CArrowDeviceArray.Free(exportedBatch);
+     CArrowSchema.Free(exportedSchema);
+ }
+
+ /// <summary>
+ /// Has pyarrow export a three-column batch through the device interface, then verifies the
+ /// device fields and every column after importing the batch on the managed side.
+ /// </summary>
+ [SkippableFact]
+ public unsafe void ImportRecordBatchFromPythonDevice()
+ {
+     CArrowDeviceArray* receivedBatch = CArrowDeviceArray.Create();
+     CArrowSchema* receivedSchema = CArrowSchema.Create();
+
+     using (Py.GIL())
+     {
+         dynamic pyarrow = Py.Import("pyarrow");
+         var pyColumns = new PyList(new PyObject[]
+         {
+             pyarrow.array(new long?[] { 1, 2, 3, null, 5 }),
+             pyarrow.array(new[] { "hello", "world", null, "foo", "bar" }),
+             pyarrow.array(new[] { 0.0, 1.4, 2.5, 3.6, 4.7 })
+         });
+         dynamic pyTable = pyarrow.table(pyColumns, new[] { "col1", "col2", "col3" });
+
+         dynamic sourcePyBatch = pyTable.to_batches()[0];
+
+         long batchAddress = ((IntPtr)receivedBatch).ToInt64();
+         long schemaAddress = ((IntPtr)receivedSchema).ToInt64();
+         sourcePyBatch._export_to_c_device(batchAddress, schemaAddress);
+     }
+
+     Assert.Equal(ArrowDeviceType.Cpu, receivedBatch->device_type);
+     Assert.Equal(-1, receivedBatch->device_id);
+
+     Schema managedSchema = CArrowSchemaImporter.ImportSchema(receivedSchema);
+     RecordBatch managedBatch = CArrowDeviceArrayImporter.ImportRecordBatch(receivedBatch, managedSchema);
+
+     Assert.Equal(5, managedBatch.Length);
+
+     Int64Array longs = (Int64Array)managedBatch.Column("col1");
+     Assert.Equal(1, longs.GetValue(0));
+     Assert.Equal(2, longs.GetValue(1));
+     Assert.Equal(3, longs.GetValue(2));
+     Assert.Null(longs.GetValue(3));
+     Assert.Equal(5, longs.GetValue(4));
+
+     StringArray strings = (StringArray)managedBatch.Column("col2");
+     Assert.Equal("hello", strings.GetString(0));
+     Assert.Equal("world", strings.GetString(1));
+     Assert.Null(strings.GetString(2));
+     Assert.Equal("foo", strings.GetString(3));
+     Assert.Equal("bar", strings.GetString(4));
+
+     DoubleArray doubles = (DoubleArray)managedBatch.Column("col3");
+     Assert.Equal(new double[] { 0.0, 1.4, 2.5, 3.6, 4.7 }, doubles.Values.ToArray());
+
+     managedBatch.Dispose();
+     CArrowDeviceArray.Free(receivedBatch);
+     CArrowSchema.Free(receivedSchema);
+ }
+
+ /// <summary>
+ /// Sends a sample batch from C# to pyarrow and back again, both hops going through the
+ /// device interface, and checks the returned batch against a clone of the original.
+ /// </summary>
+ [SkippableFact]
+ public unsafe void RoundTripTestBatchViaDevice()
+ {
+     // C# -> Python (via device) -> C# round trip
+     HashSet<ArrowTypeId> unsupported = new HashSet<ArrowTypeId> { ArrowTypeId.ListView, ArrowTypeId.BinaryView, ArrowTypeId.StringView, ArrowTypeId.Decimal32, ArrowTypeId.Decimal64 };
+     RecordBatch batch1 = TestData.CreateSampleRecordBatch(4, excludedTypes: unsupported);
+     RecordBatch batch2 = batch1.Clone();
+
+     CArrowDeviceArray* cExportDeviceArray = CArrowDeviceArray.Create();
+     CArrowDeviceArrayExporter.ExportRecordBatch(batch1, cExportDeviceArray);
+
+     CArrowSchema* cExportSchema = CArrowSchema.Create();
+     CArrowSchemaExporter.ExportSchema(batch1.Schema, cExportSchema);
+
+     CArrowDeviceArray* cImportDeviceArray = CArrowDeviceArray.Create();
+     CArrowSchema* cImportSchema = CArrowSchema.Create();
+
+     long exportDeviceArrayPtr = ((IntPtr)cExportDeviceArray).ToInt64();
+     long exportSchemaPtr = ((IntPtr)cExportSchema).ToInt64();
+     long importDeviceArrayPtr = ((IntPtr)cImportDeviceArray).ToInt64();
+     long importSchemaPtr = ((IntPtr)cImportSchema).ToInt64();
+
+     using (Py.GIL())
+     {
+         dynamic pa = Py.Import("pyarrow");
+         dynamic exportedPyBatch = pa.RecordBatch._import_from_c_device(exportDeviceArrayPtr, exportSchemaPtr);
+         // Re-export back via device interface
+         exportedPyBatch._export_to_c_device(importDeviceArrayPtr, importSchemaPtr);
+     }
+
+     Assert.Equal(ArrowDeviceType.Cpu, cImportDeviceArray->device_type);
+
+     Schema schema = CArrowSchemaImporter.ImportSchema(cImportSchema);
+
+     // Dispose the imported batch once compared, as ImportRecordBatchFromPythonDevice does,
+     // instead of leaving the memory received from Python to be reclaimed by finalization.
+     using (RecordBatch importedBatch = CArrowDeviceArrayImporter.ImportRecordBatch(cImportDeviceArray, schema))
+     {
+         ArrowReaderVerifier.CompareBatches(batch2, importedBatch, strictCompare: false);
+     }
+
+     CArrowDeviceArray.Free(cExportDeviceArray);
+     CArrowSchema.Free(cExportSchema);
+     CArrowDeviceArray.Free(cImportDeviceArray);
+     CArrowSchema.Free(cImportSchema);
+ }
+ }
+}
diff --git a/test/Apache.Arrow.Tests/CDeviceDataInterfaceTests.cs
b/test/Apache.Arrow.Tests/CDeviceDataInterfaceTests.cs
new file mode 100644
index 0000000..7d416b7
--- /dev/null
+++ b/test/Apache.Arrow.Tests/CDeviceDataInterfaceTests.cs
@@ -0,0 +1,351 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics.CodeAnalysis;
+using System.Runtime.InteropServices;
+using System.Threading.Tasks;
+using Apache.Arrow.C;
+using Apache.Arrow.Ipc;
+using Apache.Arrow.Types;
+using Xunit;
+
+namespace Apache.Arrow.Tests
+{
+ [Experimental("ArrowDeviceDataApi")]
+ public class CDeviceDataInterfaceTests
+ {
+ /// <summary>Builds a five-element string array whose third element is null.</summary>
+ private IArrowArray GetTestArray()
+ {
+     var strings = new StringArray.Builder();
+
+     foreach (string text in new[] { "hello", "world", null, "foo", "bar" })
+     {
+         if (text != null)
+         {
+             strings.Append(text);
+         }
+         else
+         {
+             strings.AppendNull();
+         }
+     }
+
+     return strings.Build();
+ }
+
+ /// <summary>
+ /// Builds a two-column batch: "strings" (hello, world, null) followed by "ints" (1, 2, 3).
+ /// </summary>
+ private RecordBatch GetTestRecordBatch()
+ {
+     var batchBuilder = new RecordBatch.Builder();
+
+     batchBuilder = batchBuilder.Append("strings", false, column => column.String(values =>
+     {
+         values.Append("hello");
+         values.Append("world");
+         values.AppendNull();
+     }));
+
+     batchBuilder = batchBuilder.Append("ints", false, column => column.Int32(values =>
+     {
+         values.Append(1);
+         values.Append(2);
+         values.Append(3);
+     }));
+
+     return batchBuilder.Build();
+ }
+
+ /// <summary>Checks that a freshly created device array struct has every field zeroed.</summary>
+ [Fact]
+ public unsafe void InitializeDeviceArrayZeroed()
+ {
+     CArrowDeviceArray* deviceArray = CArrowDeviceArray.Create();
+
+     // Embedded ArrowArray part.
+     Assert.Equal(0, deviceArray->array.length);
+     Assert.Equal(0, deviceArray->array.null_count);
+     Assert.Equal(0, deviceArray->array.offset);
+     Assert.Equal(0, deviceArray->array.n_buffers);
+     Assert.Equal(0, deviceArray->array.n_children);
+     Assert.True(deviceArray->array.buffers == null);
+     Assert.True(deviceArray->array.children == null);
+     Assert.True(deviceArray->array.dictionary == null);
+     Assert.True(deviceArray->array.release == default);
+     Assert.True(deviceArray->array.private_data == null);
+
+     // Device-specific part.
+     Assert.Equal(0, deviceArray->device_id);
+     Assert.Equal((ArrowDeviceType)0, deviceArray->device_type);
+     Assert.True(deviceArray->sync_event == null);
+
+     CArrowDeviceArray.Free(deviceArray);
+ }
+
+ /// <summary>Checks that a freshly created device array stream struct has every field zeroed.</summary>
+ [Fact]
+ public unsafe void InitializeDeviceArrayStreamZeroed()
+ {
+     CArrowDeviceArrayStream* deviceStream = CArrowDeviceArrayStream.Create();
+
+     Assert.Equal((ArrowDeviceType)0, deviceStream->device_type);
+
+     // None of the callbacks may be populated yet.
+     Assert.True(deviceStream->get_schema == default);
+     Assert.True(deviceStream->get_next == default);
+     Assert.True(deviceStream->get_last_error == default);
+     Assert.True(deviceStream->release == default);
+     Assert.True(deviceStream->private_data == null);
+
+     CArrowDeviceArrayStream.Free(deviceStream);
+ }
+
+ /// <summary>
+ /// Exporting an array must report a CPU device with id -1, no sync event and a release callback.
+ /// </summary>
+ [Fact]
+ public unsafe void ExportArraySetsDeviceFields()
+ {
+     IArrowArray source = GetTestArray();
+     CArrowDeviceArray* exported = CArrowDeviceArray.Create();
+
+     CArrowDeviceArrayExporter.ExportArray(source, exported);
+
+     Assert.Equal(ArrowDeviceType.Cpu, exported->device_type);
+     Assert.Equal(-1, exported->device_id);
+     Assert.True(exported->sync_event == null);
+     Assert.False(exported->array.release == default);
+
+     CArrowDeviceArray.Free(exported);
+ }
+
+ /// <summary>
+ /// Exporting a record batch must report a CPU device with id -1, no sync event and a release callback.
+ /// </summary>
+ [Fact]
+ public unsafe void ExportRecordBatchSetsDeviceFields()
+ {
+     RecordBatch source = GetTestRecordBatch();
+     CArrowDeviceArray* exported = CArrowDeviceArray.Create();
+
+     CArrowDeviceArrayExporter.ExportRecordBatch(source, exported);
+
+     Assert.Equal(ArrowDeviceType.Cpu, exported->device_type);
+     Assert.Equal(-1, exported->device_id);
+     Assert.True(exported->sync_event == null);
+     Assert.False(exported->array.release == default);
+
+     CArrowDeviceArray.Free(exported);
+ }
+
+ /// <summary>Exports a string array, imports it back and checks that every element survived.</summary>
+ [Fact]
+ public unsafe void ExportImportArrayRoundTrip()
+ {
+     IArrowArray source = GetTestArray();
+     IArrowType sourceType = source.Data.DataType;
+     CArrowDeviceArray* exported = CArrowDeviceArray.Create();
+
+     CArrowDeviceArrayExporter.ExportArray(source, exported);
+     using IArrowArray roundTripped = CArrowDeviceArrayImporter.ImportArray(exported, sourceType);
+
+     StringArray strings = (StringArray)roundTripped;
+
+     Assert.Equal(5, strings.Length);
+     Assert.Equal("hello", strings.GetString(0));
+     Assert.Equal("world", strings.GetString(1));
+     Assert.True(strings.IsNull(2));
+     Assert.Equal("foo", strings.GetString(3));
+     Assert.Equal("bar", strings.GetString(4));
+
+     CArrowDeviceArray.Free(exported);
+ }
+
+ /// <summary>
+ /// Exports a batch and its schema, imports both back and checks row count, column count and values.
+ /// </summary>
+ [Fact]
+ public unsafe void ExportImportRecordBatchRoundTrip()
+ {
+     RecordBatch source = GetTestRecordBatch();
+     Schema sourceSchema = source.Schema;
+     CArrowDeviceArray* exportedBatch = CArrowDeviceArray.Create();
+     CArrowSchema* exportedSchema = CArrowSchema.Create();
+
+     CArrowSchemaExporter.ExportSchema(sourceSchema, exportedSchema);
+     CArrowDeviceArrayExporter.ExportRecordBatch(source, exportedBatch);
+
+     Schema roundTrippedSchema = CArrowSchemaImporter.ImportSchema(exportedSchema);
+     RecordBatch roundTripped = CArrowDeviceArrayImporter.ImportRecordBatch(exportedBatch, roundTrippedSchema);
+
+     Assert.Equal(3, roundTripped.Length);
+     Assert.Equal(2, roundTripped.ColumnCount);
+
+     StringArray strings = (StringArray)roundTripped.Column(0);
+     Assert.Equal("hello", strings.GetString(0));
+     Assert.Equal("world", strings.GetString(1));
+     Assert.True(strings.IsNull(2));
+
+     Int32Array ints = (Int32Array)roundTripped.Column(1);
+     Assert.Equal(1, ints.GetValue(0));
+     Assert.Equal(2, ints.GetValue(1));
+     Assert.Equal(3, ints.GetValue(2));
+
+     roundTripped.Dispose();
+     CArrowDeviceArray.Free(exportedBatch);
+     CArrowSchema.Free(exportedSchema);
+ }
+
+ /// <summary>Importing an array flagged as CUDA must be rejected with <see cref="NotSupportedException"/>.</summary>
+ [Fact]
+ public unsafe void ImportNonCpuDeviceArrayThrows()
+ {
+     IArrowArray source = GetTestArray();
+     CArrowDeviceArray* exported = CArrowDeviceArray.Create();
+
+     CArrowDeviceArrayExporter.ExportArray(source, exported);
+
+     // Pretend the exported data lives on a GPU.
+     exported->device_type = ArrowDeviceType.Cuda;
+
+     Assert.Throws<NotSupportedException>(() =>
+     {
+         CArrowDeviceArrayImporter.ImportArray(exported, source.Data.DataType);
+     });
+
+     CArrowDeviceArray.Free(exported);
+ }
+
+ /// <summary>Importing a batch flagged as CUDA must be rejected with <see cref="NotSupportedException"/>.</summary>
+ [Fact]
+ public unsafe void ImportNonCpuDeviceRecordBatchThrows()
+ {
+     RecordBatch source = GetTestRecordBatch();
+     CArrowDeviceArray* exported = CArrowDeviceArray.Create();
+
+     CArrowDeviceArrayExporter.ExportRecordBatch(source, exported);
+
+     // Pretend the exported data lives on a GPU.
+     exported->device_type = ArrowDeviceType.Cuda;
+
+     Assert.Throws<NotSupportedException>(() =>
+     {
+         CArrowDeviceArrayImporter.ImportRecordBatch(exported, source.Schema);
+     });
+
+     CArrowDeviceArray.Free(exported);
+ }
+
+ /// <summary>
+ /// After importing and disposing an exported array, the struct's release callback must be cleared.
+ /// </summary>
+ [Fact]
+ public unsafe void CallsReleaseForValidDeviceArray()
+ {
+     IArrowArray source = GetTestArray();
+     CArrowDeviceArray* exported = CArrowDeviceArray.Create();
+     CArrowDeviceArrayExporter.ExportArray(source, exported);
+     Assert.False(exported->array.release == default);
+
+     IArrowArray roundTripped = CArrowDeviceArrayImporter.ImportArray(exported, source.Data.DataType);
+     roundTripped.Dispose();
+
+     Assert.True(exported->array.release == default);
+     CArrowDeviceArray.Free(exported);
+ }
+
+ /// <summary>Exporting a stream must mark it as a CPU stream and install a release callback.</summary>
+ [Fact]
+ public unsafe void ExportStreamSetsDeviceType()
+ {
+     RecordBatch onlyBatch = GetTestRecordBatch();
+     IArrowArrayStream source = new TestArrayStream(onlyBatch.Schema, onlyBatch);
+     CArrowDeviceArrayStream* exported = CArrowDeviceArrayStream.Create();
+
+     CArrowDeviceArrayStreamExporter.ExportArrayStream(source, exported);
+
+     Assert.Equal(ArrowDeviceType.Cpu, exported->device_type);
+     Assert.False(exported->release == default);
+
+     CArrowDeviceArrayStream.Free(exported);
+ }
+
+ /// <summary>
+ /// Exports a two-batch stream, imports it back and reads it to the end: two batches matching
+ /// the originals in size, then <c>null</c>.
+ /// </summary>
+ [Fact]
+ public async Task ExportImportDeviceStreamRoundTrip()
+ {
+     RecordBatch first = GetTestRecordBatch();
+     RecordBatch second = GetTestRecordBatch();
+     IArrowArrayStream source = new TestArrayStream(first.Schema, first, second);
+
+     IArrowArrayStream roundTripped;
+     unsafe
+     {
+         CArrowDeviceArrayStream* exported = CArrowDeviceArrayStream.Create();
+         CArrowDeviceArrayStreamExporter.ExportArrayStream(source, exported);
+
+         roundTripped = CArrowDeviceArrayStreamImporter.ImportDeviceArrayStream(exported);
+
+         // Only the raw allocation is freed here; the stream itself now belongs to the imported object.
+         Marshal.FreeHGlobal((IntPtr)exported);
+     }
+
+     using (roundTripped)
+     {
+         Assert.Equal(first.Schema.FieldsList.Count, roundTripped.Schema.FieldsList.Count);
+
+         RecordBatch readFirst = await roundTripped.ReadNextRecordBatchAsync();
+         Assert.NotNull(readFirst);
+         Assert.Equal(first.Length, readFirst.Length);
+         Assert.Equal(first.ColumnCount, readFirst.ColumnCount);
+
+         RecordBatch readSecond = await roundTripped.ReadNextRecordBatchAsync();
+         Assert.NotNull(readSecond);
+         Assert.Equal(second.Length, readSecond.Length);
+
+         // A third read must signal end of stream.
+         RecordBatch readPastEnd = await roundTripped.ReadNextRecordBatchAsync();
+         Assert.Null(readPastEnd);
+
+         readFirst.Dispose();
+         readSecond.Dispose();
+     }
+ }
+
+ /// <summary>
+ /// Importing a stream flagged as CUDA must be rejected with <see cref="NotSupportedException"/>.
+ /// </summary>
+ [Fact]
+ public unsafe void ImportNonCpuDeviceStreamThrows()
+ {
+     CArrowDeviceArrayStream* ptr = CArrowDeviceArrayStream.Create();
+
+     // Simulate a non-CPU stream
+     ptr->device_type = ArrowDeviceType.Cuda;
+     // Set a dummy release to make it look valid
+#if NET5_0_OR_GREATER
+     ptr->release = &DummyRelease;
+#else
+     // A marshalled function pointer does not keep its delegate alive, so hold a
+     // reference here and extend its lifetime past the import with GC.KeepAlive below.
+     DummyReleaseDelegate releaseDelegate = new DummyReleaseDelegate(DummyReleaseManaged);
+     ptr->release = Marshal.GetFunctionPointerForDelegate(releaseDelegate);
+#endif
+
+     Assert.Throws<NotSupportedException>(() =>
+     {
+         CArrowDeviceArrayStreamImporter.ImportDeviceArrayStream(ptr);
+     });
+
+#if !NET5_0_OR_GREATER
+     GC.KeepAlive(releaseDelegate);
+#endif
+
+     Marshal.FreeHGlobal((IntPtr)ptr);
+ }
+
+#if NET5_0_OR_GREATER
+ /// <summary>Stand-in release callback: only clears the stream's own release pointer.</summary>
+ [UnmanagedCallersOnly]
+ private unsafe static void DummyRelease(CArrowDeviceArrayStream* stream)
+ {
+     (*stream).release = default;
+ }
+#else
+ /// <summary>Delegate shape used to marshal <see cref="DummyReleaseManaged"/> as a function pointer.</summary>
+ private unsafe delegate void DummyReleaseDelegate(CArrowDeviceArrayStream* stream);
+
+ /// <summary>Stand-in release callback: only clears the stream's own release pointer.</summary>
+ private unsafe static void DummyReleaseManaged(CArrowDeviceArrayStream* stream)
+ {
+     (*stream).release = default;
+ }
+#endif
+
+ /// <summary>
+ /// Minimal in-memory <see cref="IArrowArrayStream"/> for tests: hands out the batches given
+ /// to the constructor in order, then <c>null</c>.
+ /// </summary>
+ private class TestArrayStream : IArrowArrayStream
+ {
+     private readonly Queue<RecordBatch> _batches;
+
+     public TestArrayStream(Schema schema, params RecordBatch[] batches)
+     {
+         _batches = new Queue<RecordBatch>(batches);
+         Schema = schema;
+     }
+
+     public Schema Schema { get; }
+
+     /// <summary>Dequeues the next batch, or yields <c>null</c> once the queue is empty.</summary>
+     public ValueTask<RecordBatch> ReadNextRecordBatchAsync(System.Threading.CancellationToken cancellationToken = default)
+     {
+         RecordBatch upcoming = _batches.Count > 0 ? _batches.Dequeue() : null;
+         return new ValueTask<RecordBatch>(upcoming);
+     }
+
+     /// <summary>Nothing to release.</summary>
+     public void Dispose()
+     {
+     }
+ }
+ }
+}
diff --git a/test/Apache.Arrow.Tests/PythonNetCollection.cs
b/test/Apache.Arrow.Tests/PythonNetCollection.cs
new file mode 100644
index 0000000..2dccf4b
--- /dev/null
+++ b/test/Apache.Arrow.Tests/PythonNetCollection.cs
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using Xunit;
+
+namespace Apache.Arrow.Tests
+{
+    /// <summary>
+    /// Defines the xUnit collection named "PythonNet". Test classes tagged with
+    /// <c>[Collection("PythonNet")]</c> all receive the same <see cref="PythonNetFixture"/> instance.
+    /// </summary>
+    [CollectionDefinition("PythonNet")]
+    public class PythonNetCollection : ICollectionFixture<PythonNetFixture> { }
+}
diff --git a/test/Apache.Arrow.Tests/PythonNetFixture.cs
b/test/Apache.Arrow.Tests/PythonNetFixture.cs
new file mode 100644
index 0000000..1d28bbc
--- /dev/null
+++ b/test/Apache.Arrow.Tests/PythonNetFixture.cs
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.IO;
+using System.Runtime.InteropServices;
+using Python.Runtime;
+using Xunit;
+
+namespace Apache.Arrow.Tests
+{
+    /// <summary>
+    /// Fixture that initializes Python.NET once and is shared between test classes through
+    /// <see cref="PythonNetCollection"/>, because Python.NET can only be initialized once per process.
+    /// </summary>
+    public class PythonNetFixture : IDisposable
+    {
+        /// <summary>True once the Python engine has been started by the constructor.</summary>
+        public bool Initialized { get; }
+
+        /// <summary>True when startup failed because Python.NET rejected the Python ABI.</summary>
+        public bool VersionMismatch { get; }
+
+        /// <summary>
+        /// Starts the Python engine when <c>PYTHONNET_PYDLL</c> is set; otherwise leaves the
+        /// fixture uninitialized so that tests can be skipped.
+        /// </summary>
+        public PythonNetFixture()
+        {
+            if (Environment.GetEnvironmentVariable("PYTHONNET_PYDLL") == null)
+            {
+                // No Python configured: Initialized keeps its default value of false.
+                return;
+            }
+
+            try
+            {
+                PythonEngine.Initialize();
+            }
+            catch (NotSupportedException e) when (e.Message.Contains("Python ABI ") && e.Message.Contains("not supported"))
+            {
+                // Python.NET does not support the Python version that was found.
+                VersionMismatch = true;
+                return;
+            }
+
+            bool onWindows = RuntimeInformation.IsOSPlatform(OSPlatform.Windows);
+            if (onWindows && PythonEngine.PythonPath.IndexOf("dlls", StringComparison.OrdinalIgnoreCase) < 0)
+            {
+                // Append the "DLLs" folder next to the Python DLL to sys.path when it is missing.
+                using (Py.GIL())
+                {
+                    dynamic sys = Py.Import("sys");
+                    string pythonHome = Path.GetDirectoryName(Environment.GetEnvironmentVariable("PYTHONNET_PYDLL"));
+                    sys.path.append(Path.Combine(pythonHome, "DLLs"));
+                }
+            }
+
+            Initialized = true;
+        }
+
+        /// <summary>
+        /// Skips the calling test, or throws, when Python could not be initialized.
+        /// Intended to be called from each test class constructor.
+        /// </summary>
+        public void EnsureInitialized()
+        {
+            if (Initialized)
+            {
+                return;
+            }
+
+            string errorReason = VersionMismatch
+                ? "Python version is incompatible with PythonNet"
+                : "PYTHONNET_PYDLL not set";
+
+            bool inCIJob = Environment.GetEnvironmentVariable("GITHUB_ACTIONS") == "true";
+            bool inVerificationJob = Environment.GetEnvironmentVariable("TEST_CSHARP") == "1";
+
+            // Outside CI, and in verification jobs, a missing Python only skips the test.
+            Skip.If(inVerificationJob || !inCIJob, $"{errorReason}; skipping Python interop tests.");
+
+            // In a regular CI job it is a hard failure.
+            throw new Exception($"{errorReason}; cannot run Python interop tests.");
+        }
+
+        /// <summary>Shuts the Python engine down, but only if this fixture started it.</summary>
+        public void Dispose()
+        {
+            if (!Initialized)
+            {
+                return;
+            }
+
+            PythonEngine.Shutdown();
+        }
+    }
+}