pitrou commented on code in PR #36489:
URL: https://github.com/apache/arrow/pull/36489#discussion_r1273657296


##########
cpp/src/arrow/c/bridge.cc:
##########
@@ -663,6 +671,118 @@ Status ExportRecordBatch(const RecordBatch& batch, struct 
ArrowArray* out,
   return Status::OK();
 }
 
+//////////////////////////////////////////////////////////////////////////
+// C device arrays
+
+Status ValidateDeviceInfo(const ArrayData& data,
+                          std::optional<DeviceAllocationType>* device_type,
+                          int64_t* device_id) {
+  for (const auto& buf : data.buffers) {
+    if (!buf) {
+      continue;
+    }
+
+    if (*device_type == std::nullopt) {
+      *device_type = buf->device_type();
+      *device_id = buf->device()->device_id();
+      continue;
+    }
+
+    if (buf->device_type() != *device_type) {
+      return Status::Invalid(
+          "Exporting device array with buffers on more than one device.");
+    }
+
+    if (buf->device()->device_id() != *device_id) {
+      return Status::Invalid(
+          "Exporting device array with buffers on multiple device ids.");
+    }
+  }
+
+  for (const auto& child : data.child_data) {
+    RETURN_NOT_OK(ValidateDeviceInfo(*child, device_type, device_id));
+  }
+

Review Comment:
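   Should also validate `data.dictionary`, if any, so that the dictionary's buffers are checked against the same device type and device id as the rest of the array.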



##########
cpp/src/arrow/buffer.h:
##########
@@ -294,6 +310,7 @@ class ARROW_EXPORT Buffer {
   const uint8_t* data_;
   int64_t size_;
   int64_t capacity_;
+  std::optional<DeviceAllocationType> device_type_;

Review Comment:
   Why is this optional? Are there any cases where the device type is not available?



##########
cpp/src/arrow/c/bridge.cc:
##########
@@ -663,6 +671,118 @@ Status ExportRecordBatch(const RecordBatch& batch, struct 
ArrowArray* out,
   return Status::OK();
 }
 
+//////////////////////////////////////////////////////////////////////////
+// C device arrays
+
+Status ValidateDeviceInfo(const ArrayData& data,

Review Comment:
   Can you put these helper functions inside the anonymous namespace, so that they get internal linkage?



##########
cpp/src/arrow/c/bridge.cc:
##########
@@ -663,6 +671,118 @@ Status ExportRecordBatch(const RecordBatch& batch, struct 
ArrowArray* out,
   return Status::OK();
 }
 
+//////////////////////////////////////////////////////////////////////////
+// C device arrays
+
+Status ValidateDeviceInfo(const ArrayData& data,
+                          std::optional<DeviceAllocationType>* device_type,
+                          int64_t* device_id) {
+  for (const auto& buf : data.buffers) {
+    if (!buf) {
+      continue;
+    }
+
+    if (*device_type == std::nullopt) {
+      *device_type = buf->device_type();
+      *device_id = buf->device()->device_id();
+      continue;
+    }
+
+    if (buf->device_type() != *device_type) {
+      return Status::Invalid(
+          "Exporting device array with buffers on more than one device.");
+    }
+
+    if (buf->device()->device_id() != *device_id) {
+      return Status::Invalid(
+          "Exporting device array with buffers on multiple device ids.");
+    }
+  }
+
+  for (const auto& child : data.child_data) {
+    RETURN_NOT_OK(ValidateDeviceInfo(*child, device_type, device_id));
+  }
+
+  return Status::OK();
+}
+
+Result<std::pair<std::optional<DeviceAllocationType>, int64_t>> 
ValidateDeviceInfo(
+    const ArrayData& data) {
+  std::optional<DeviceAllocationType> device_type;
+  int64_t device_id = -1;
+  RETURN_NOT_OK(ValidateDeviceInfo(data, &device_type, &device_id));
+  return std::make_pair(device_type, device_id);
+}
+
+Status ExportDeviceArray(const Array& array, RawSyncEvent sync_event,
+                         struct ArrowDeviceArray* out, struct ArrowSchema* 
out_schema) {
+  if (sync_event.sync_event != nullptr && sync_event.release_func) {

Review Comment:
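   Hmm, did you test this? It seems you're missing a logical negation here, 
i.e. `!sync_event.release_func`. As written, the check rejects a non-null event that *does* come with a release function, and it accepts one that doesn't. (`ExportDeviceRecordBatch` below gets this right with `sync_event.release_func == nullptr`.)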



##########
cpp/src/arrow/gpu/cuda_memory.h:
##########
@@ -258,5 +260,9 @@ Result<uintptr_t> GetDeviceAddress(const uint8_t* cpu_data,
 ARROW_EXPORT
 Result<uint8_t*> GetHostAddress(uintptr_t device_ptr);
 
+ARROW_EXPORT
+Result<std::shared_ptr<MemoryManager>> DefaultMemoryMapper(ArrowDeviceType 
device_type,

Review Comment:
   It seems weird to have this in `arrow/gpu`; why not put it under `arrow/c` instead?



##########
cpp/src/arrow/c/bridge_test.cc:
##########
@@ -1112,6 +1130,392 @@ TEST_F(TestArrayExport, ExportRecordBatch) {
   }
 }
 
+////////////////////////////////////////////////////////////////////////////
+// Device Array Export Tests
+
+static const char kMyDeviceTypeName[] = "arrowtest::MyDevice";
+static const ArrowDeviceType kMyDeviceType = ARROW_DEVICE_EXT_DEV;
+
+class MyBuffer final : public MutableBuffer {
+ public:
+  using MutableBuffer::MutableBuffer;
+
+  ~MyBuffer() { default_memory_pool()->Free(const_cast<uint8_t*>(data_), 
size_); }
+};
+
+class MyMemoryManager : public CPUMemoryManager {
+ public:
+  explicit MyMemoryManager(const std::shared_ptr<Device>& device)
+      : CPUMemoryManager(device, default_memory_pool()) {}
+
+  Result<std::unique_ptr<Buffer>> AllocateBuffer(int64_t size) override {
+    uint8_t* data;
+    RETURN_NOT_OK(pool_->Allocate(size, &data));
+    return std::make_unique<MyBuffer>(data, size, shared_from_this());
+  }
+
+ protected:
+  Result<std::shared_ptr<Buffer>> CopyBufferFrom(
+      const std::shared_ptr<Buffer>& buf,
+      const std::shared_ptr<MemoryManager>& from) override {
+    return CopyNonOwnedFrom(*buf, from);
+  }
+  Result<std::unique_ptr<Buffer>> CopyNonOwnedFrom(
+      const Buffer& buf, const std::shared_ptr<MemoryManager>& from) override {
+    if (!from->is_cpu()) {
+      return nullptr;
+    }
+
+    ARROW_ASSIGN_OR_RAISE(auto dest, AllocateBuffer(buf.size()));
+    if (buf.size() > 0) {
+      memcpy(dest->mutable_data(), buf.data(), 
static_cast<size_t>(buf.size()));
+    }
+    return std::move(dest);
+  }
+};
+
+class MyDevice : public Device {
+ public:
+  explicit MyDevice(int value) : Device(true), value_(value) {}

Review Comment:
   It would be nice to make the meaning of the boolean argument explicit, e.g. `Device(/*is_cpu=*/true)`.



##########
cpp/src/arrow/c/bridge.cc:
##########
@@ -1275,7 +1401,20 @@ class ImportedBuffer : public Buffer {
 
 struct ArrayImporter {
   explicit ArrayImporter(const std::shared_ptr<DataType>& type)
-      : type_(type), zero_size_buffer_(std::make_shared<Buffer>(kZeroSizeArea, 
0)) {}
+      : type_(type),
+        zero_size_buffer_(std::make_shared<Buffer>(kZeroSizeArea, 0)),
+        device_type_(DeviceAllocationType::kCPU) {}
+
+  Status Import(struct ArrowDeviceArray* src, const DeviceMemoryMapper& 
mapper) {
+    ARROW_ASSIGN_OR_RAISE(memory_mgr_, mapper(src->device_type, 
src->device_id));
+    device_type_ = static_cast<DeviceAllocationType>(src->device_type);
+    RETURN_NOT_OK(Import(&src->array));
+    import_->sync_event_ = src->sync_event;

Review Comment:
   Do we plan to do something with this member? For now it seems entirely 
unused.



##########
cpp/src/arrow/c/bridge.cc:
##########
@@ -663,6 +671,118 @@ Status ExportRecordBatch(const RecordBatch& batch, struct 
ArrowArray* out,
   return Status::OK();
 }
 
+//////////////////////////////////////////////////////////////////////////
+// C device arrays
+
+Status ValidateDeviceInfo(const ArrayData& data,
+                          std::optional<DeviceAllocationType>* device_type,
+                          int64_t* device_id) {
+  for (const auto& buf : data.buffers) {
+    if (!buf) {
+      continue;
+    }
+
+    if (*device_type == std::nullopt) {
+      *device_type = buf->device_type();
+      *device_id = buf->device()->device_id();
+      continue;
+    }
+
+    if (buf->device_type() != *device_type) {
+      return Status::Invalid(
+          "Exporting device array with buffers on more than one device.");

Review Comment:
   "with more than one device allocation type" perhaps?



##########
cpp/src/arrow/c/bridge_test.cc:
##########
@@ -1112,6 +1130,392 @@ TEST_F(TestArrayExport, ExportRecordBatch) {
   }
 }
 
+////////////////////////////////////////////////////////////////////////////
+// Device Array Export Tests
+
+static const char kMyDeviceTypeName[] = "arrowtest::MyDevice";
+static const ArrowDeviceType kMyDeviceType = ARROW_DEVICE_EXT_DEV;
+
+class MyBuffer final : public MutableBuffer {
+ public:
+  using MutableBuffer::MutableBuffer;
+
+  ~MyBuffer() { default_memory_pool()->Free(const_cast<uint8_t*>(data_), 
size_); }
+};
+
+class MyMemoryManager : public CPUMemoryManager {
+ public:
+  explicit MyMemoryManager(const std::shared_ptr<Device>& device)
+      : CPUMemoryManager(device, default_memory_pool()) {}
+
+  Result<std::unique_ptr<Buffer>> AllocateBuffer(int64_t size) override {
+    uint8_t* data;
+    RETURN_NOT_OK(pool_->Allocate(size, &data));
+    return std::make_unique<MyBuffer>(data, size, shared_from_this());
+  }
+
+ protected:
+  Result<std::shared_ptr<Buffer>> CopyBufferFrom(
+      const std::shared_ptr<Buffer>& buf,
+      const std::shared_ptr<MemoryManager>& from) override {
+    return CopyNonOwnedFrom(*buf, from);
+  }
+  Result<std::unique_ptr<Buffer>> CopyNonOwnedFrom(
+      const Buffer& buf, const std::shared_ptr<MemoryManager>& from) override {
+    if (!from->is_cpu()) {
+      return nullptr;
+    }
+
+    ARROW_ASSIGN_OR_RAISE(auto dest, AllocateBuffer(buf.size()));
+    if (buf.size() > 0) {
+      memcpy(dest->mutable_data(), buf.data(), 
static_cast<size_t>(buf.size()));
+    }
+    return std::move(dest);
+  }
+};
+
+class MyDevice : public Device {
+ public:
+  explicit MyDevice(int value) : Device(true), value_(value) {}
+  const char* type_name() const override { return kMyDeviceTypeName; }
+  std::string ToString() const override { return kMyDeviceTypeName; }
+  bool Equals(const Device& other) const override {
+    if (other.type_name() != kMyDeviceTypeName || other.device_type() != 
device_type()) {
+      return false;
+    }
+    return checked_cast<const MyDevice&>(other).value_ == value_;
+  }
+  DeviceAllocationType device_type() const override {
+    return static_cast<DeviceAllocationType>(kMyDeviceType);
+  }
+  int64_t device_id() const override { return value_; }
+  std::shared_ptr<MemoryManager> default_memory_manager() override {
+    return std::make_shared<MyMemoryManager>(shared_from_this());
+  }
+
+ protected:
+  int value_;
+};
+
+class TestDeviceArrayExport : public ::testing::Test {
+ public:
+  void SetUp() override { pool_ = default_memory_pool(); }
+
+  static Result<std::shared_ptr<ArrayData>> ToDeviceData(
+      const std::shared_ptr<MemoryManager>& mm, const ArrayData& data) {
+    arrow::BufferVector buffers;
+    for (const auto& buf : data.buffers) {
+      if (buf) {
+        ARROW_ASSIGN_OR_RAISE(auto dest, mm->CopyBuffer(buf, mm));
+        buffers.push_back(dest);
+      } else {
+        buffers.push_back(nullptr);
+      }
+    }
+
+    arrow::ArrayDataVector children;
+    for (const auto& child : data.child_data) {
+      ARROW_ASSIGN_OR_RAISE(auto dest, ToDeviceData(mm, *child));
+      children.push_back(dest);
+    }

Review Comment:
   Should take care of the dictionary array (`data.dictionary`) as well, copying it to the device recursively just like the children.



##########
cpp/src/arrow/device.h:
##########
@@ -71,6 +95,9 @@ class ARROW_EXPORT Device : public 
std::enable_shared_from_this<Device>,
   /// MemoryManager instances with non-default parameters.
   virtual std::shared_ptr<MemoryManager> default_memory_manager() = 0;
 
+  /// \brief Return the DeviceAllocationType of this device
+  virtual DeviceAllocationType device_type() const = 0;

Review Comment:
   I think this should be a property of `MemoryManager` instead, since there is no one-to-one mapping between device types and device allocation types.



##########
cpp/src/arrow/c/bridge.cc:
##########
@@ -663,6 +671,118 @@ Status ExportRecordBatch(const RecordBatch& batch, struct 
ArrowArray* out,
   return Status::OK();
 }
 
+//////////////////////////////////////////////////////////////////////////
+// C device arrays
+
+Status ValidateDeviceInfo(const ArrayData& data,
+                          std::optional<DeviceAllocationType>* device_type,
+                          int64_t* device_id) {
+  for (const auto& buf : data.buffers) {
+    if (!buf) {
+      continue;
+    }
+
+    if (*device_type == std::nullopt) {
+      *device_type = buf->device_type();
+      *device_id = buf->device()->device_id();
+      continue;
+    }
+
+    if (buf->device_type() != *device_type) {
+      return Status::Invalid(
+          "Exporting device array with buffers on more than one device.");
+    }
+
+    if (buf->device()->device_id() != *device_id) {
+      return Status::Invalid(
+          "Exporting device array with buffers on multiple device ids.");
+    }
+  }
+
+  for (const auto& child : data.child_data) {
+    RETURN_NOT_OK(ValidateDeviceInfo(*child, device_type, device_id));
+  }
+
+  return Status::OK();
+}
+
+Result<std::pair<std::optional<DeviceAllocationType>, int64_t>> 
ValidateDeviceInfo(
+    const ArrayData& data) {
+  std::optional<DeviceAllocationType> device_type;
+  int64_t device_id = -1;
+  RETURN_NOT_OK(ValidateDeviceInfo(data, &device_type, &device_id));
+  return std::make_pair(device_type, device_id);
+}
+
+Status ExportDeviceArray(const Array& array, RawSyncEvent sync_event,
+                         struct ArrowDeviceArray* out, struct ArrowSchema* 
out_schema) {
+  if (sync_event.sync_event != nullptr && sync_event.release_func) {
+    return Status::Invalid(
+        "Must provide a release event function if providing a non-null event");
+  }
+
+  SchemaExportGuard guard(out_schema);
+  if (out_schema != nullptr) {
+    RETURN_NOT_OK(ExportType(*array.type(), out_schema));
+  }
+
+  ARROW_ASSIGN_OR_RAISE(auto device_info, ValidateDeviceInfo(*array.data()));
+  if (!device_info.first) {
+    out->device_type = ARROW_DEVICE_CPU;
+  } else {
+    out->device_type = static_cast<ArrowDeviceType>(*device_info.first);
+  }
+  out->device_id = device_info.second;
+
+  ArrayExporter exporter;
+  RETURN_NOT_OK(exporter.Export(array.data()));
+  exporter.Finish(&out->array);
+
+  auto* pdata = 
reinterpret_cast<ExportedArrayPrivateData*>(out->array.private_data);
+  pdata->sync_event_ = sync_event;
+  out->sync_event = sync_event.sync_event;

Review Comment:
   Nit: add `std::move`?



##########
cpp/src/arrow/c/bridge.cc:
##########
@@ -663,6 +671,118 @@ Status ExportRecordBatch(const RecordBatch& batch, struct 
ArrowArray* out,
   return Status::OK();
 }
 
+//////////////////////////////////////////////////////////////////////////
+// C device arrays
+
+Status ValidateDeviceInfo(const ArrayData& data,
+                          std::optional<DeviceAllocationType>* device_type,
+                          int64_t* device_id) {
+  for (const auto& buf : data.buffers) {
+    if (!buf) {
+      continue;
+    }
+
+    if (*device_type == std::nullopt) {
+      *device_type = buf->device_type();
+      *device_id = buf->device()->device_id();
+      continue;
+    }
+
+    if (buf->device_type() != *device_type) {
+      return Status::Invalid(
+          "Exporting device array with buffers on more than one device.");
+    }
+
+    if (buf->device()->device_id() != *device_id) {
+      return Status::Invalid(
+          "Exporting device array with buffers on multiple device ids.");
+    }
+  }
+
+  for (const auto& child : data.child_data) {
+    RETURN_NOT_OK(ValidateDeviceInfo(*child, device_type, device_id));
+  }
+
+  return Status::OK();
+}
+
+Result<std::pair<std::optional<DeviceAllocationType>, int64_t>> 
ValidateDeviceInfo(
+    const ArrayData& data) {
+  std::optional<DeviceAllocationType> device_type;
+  int64_t device_id = -1;
+  RETURN_NOT_OK(ValidateDeviceInfo(data, &device_type, &device_id));
+  return std::make_pair(device_type, device_id);
+}
+
+Status ExportDeviceArray(const Array& array, RawSyncEvent sync_event,
+                         struct ArrowDeviceArray* out, struct ArrowSchema* 
out_schema) {
+  if (sync_event.sync_event != nullptr && sync_event.release_func) {
+    return Status::Invalid(
+        "Must provide a release event function if providing a non-null event");
+  }
+
+  SchemaExportGuard guard(out_schema);
+  if (out_schema != nullptr) {
+    RETURN_NOT_OK(ExportType(*array.type(), out_schema));
+  }
+
+  ARROW_ASSIGN_OR_RAISE(auto device_info, ValidateDeviceInfo(*array.data()));
+  if (!device_info.first) {
+    out->device_type = ARROW_DEVICE_CPU;
+  } else {
+    out->device_type = static_cast<ArrowDeviceType>(*device_info.first);
+  }
+  out->device_id = device_info.second;
+
+  ArrayExporter exporter;
+  RETURN_NOT_OK(exporter.Export(array.data()));
+  exporter.Finish(&out->array);
+
+  auto* pdata = 
reinterpret_cast<ExportedArrayPrivateData*>(out->array.private_data);
+  pdata->sync_event_ = sync_event;
+  out->sync_event = sync_event.sync_event;
+
+  guard.Detach();
+  return Status::OK();
+}
+
+Status ExportDeviceRecordBatch(const RecordBatch& batch, RawSyncEvent 
sync_event,
+                               struct ArrowDeviceArray* out,
+                               struct ArrowSchema* out_schema) {
+  if (sync_event.sync_event != nullptr && sync_event.release_func == nullptr) {
+    return Status::Invalid(
+        "Must provide a release event function if providing a non-null event");
+  }
+
+  // XXX perhaps bypass ToStructArray for speed?
+  ARROW_ASSIGN_OR_RAISE(auto array, batch.ToStructArray());
+
+  SchemaExportGuard guard(out_schema);
+  if (out_schema != nullptr) {
+    // Export the schema, not the struct type, so as not to lose top-level 
metadata
+    RETURN_NOT_OK(ExportSchema(*batch.schema(), out_schema));
+  }
+
+  ARROW_ASSIGN_OR_RAISE(auto device_info, ValidateDeviceInfo(*array->data()));

Review Comment:
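   There's a lot of code here copy-pasted from `ExportDeviceArray` (the sync-event check and the device-info validation and export). IMHO it could be factored into a shared helper.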



##########
cpp/src/arrow/c/bridge_test.cc:
##########
@@ -1112,6 +1130,392 @@ TEST_F(TestArrayExport, ExportRecordBatch) {
   }
 }
 
+////////////////////////////////////////////////////////////////////////////
+// Device Array Export Tests
+
+static const char kMyDeviceTypeName[] = "arrowtest::MyDevice";
+static const ArrowDeviceType kMyDeviceType = ARROW_DEVICE_EXT_DEV;
+
+class MyBuffer final : public MutableBuffer {
+ public:
+  using MutableBuffer::MutableBuffer;
+
+  ~MyBuffer() { default_memory_pool()->Free(const_cast<uint8_t*>(data_), 
size_); }
+};
+
+class MyMemoryManager : public CPUMemoryManager {
+ public:
+  explicit MyMemoryManager(const std::shared_ptr<Device>& device)
+      : CPUMemoryManager(device, default_memory_pool()) {}
+
+  Result<std::unique_ptr<Buffer>> AllocateBuffer(int64_t size) override {
+    uint8_t* data;
+    RETURN_NOT_OK(pool_->Allocate(size, &data));
+    return std::make_unique<MyBuffer>(data, size, shared_from_this());
+  }
+
+ protected:
+  Result<std::shared_ptr<Buffer>> CopyBufferFrom(
+      const std::shared_ptr<Buffer>& buf,
+      const std::shared_ptr<MemoryManager>& from) override {
+    return CopyNonOwnedFrom(*buf, from);
+  }
+  Result<std::unique_ptr<Buffer>> CopyNonOwnedFrom(
+      const Buffer& buf, const std::shared_ptr<MemoryManager>& from) override {
+    if (!from->is_cpu()) {
+      return nullptr;
+    }
+
+    ARROW_ASSIGN_OR_RAISE(auto dest, AllocateBuffer(buf.size()));
+    if (buf.size() > 0) {
+      memcpy(dest->mutable_data(), buf.data(), 
static_cast<size_t>(buf.size()));
+    }
+    return std::move(dest);
+  }
+};
+
+class MyDevice : public Device {
+ public:
+  explicit MyDevice(int value) : Device(true), value_(value) {}
+  const char* type_name() const override { return kMyDeviceTypeName; }
+  std::string ToString() const override { return kMyDeviceTypeName; }
+  bool Equals(const Device& other) const override {
+    if (other.type_name() != kMyDeviceTypeName || other.device_type() != 
device_type()) {
+      return false;
+    }
+    return checked_cast<const MyDevice&>(other).value_ == value_;
+  }
+  DeviceAllocationType device_type() const override {
+    return static_cast<DeviceAllocationType>(kMyDeviceType);
+  }
+  int64_t device_id() const override { return value_; }
+  std::shared_ptr<MemoryManager> default_memory_manager() override {
+    return std::make_shared<MyMemoryManager>(shared_from_this());
+  }
+
+ protected:
+  int value_;
+};
+
+class TestDeviceArrayExport : public ::testing::Test {
+ public:
+  void SetUp() override { pool_ = default_memory_pool(); }
+
+  static Result<std::shared_ptr<ArrayData>> ToDeviceData(
+      const std::shared_ptr<MemoryManager>& mm, const ArrayData& data) {
+    arrow::BufferVector buffers;
+    for (const auto& buf : data.buffers) {
+      if (buf) {
+        ARROW_ASSIGN_OR_RAISE(auto dest, mm->CopyBuffer(buf, mm));
+        buffers.push_back(dest);
+      } else {
+        buffers.push_back(nullptr);
+      }
+    }
+
+    arrow::ArrayDataVector children;
+    for (const auto& child : data.child_data) {
+      ARROW_ASSIGN_OR_RAISE(auto dest, ToDeviceData(mm, *child));
+      children.push_back(dest);
+    }
+
+    return ArrayData::Make(data.type, data.length, buffers, children, 
data.null_count,
+                           data.offset);
+  }
+
+  static Result<std::shared_ptr<Array>> ToDevice(const 
std::shared_ptr<MemoryManager>& mm,
+                                                 const ArrayData& data) {
+    ARROW_ASSIGN_OR_RAISE(auto result, ToDeviceData(mm, data));
+    return MakeArray(result);
+  }
+
+  template <typename ArrayFactory>
+  static std::function<Result<std::shared_ptr<Array>>()> ToDeviceFactory(
+      const std::shared_ptr<MemoryManager>& mm, ArrayFactory&& factory) {
+    return [&]() { return ToDevice(mm, *factory()->data()); };
+  }
+
+  static std::function<Result<std::shared_ptr<Array>>()> JSONArrayFactory(
+      const std::shared_ptr<MemoryManager>& mm, std::shared_ptr<DataType> type,
+      const char* json) {
+    return [=]() { return ToDevice(mm, *ArrayFromJSON(type, json)->data()); };
+  }
+
+  template <typename ArrayFactory, typename ExportCheckFunc>
+  void TestWithArrayFactory(ArrayFactory&& factory, ExportCheckFunc&& 
check_func) {
+    auto orig_bytes = pool_->bytes_allocated();
+
+    std::shared_ptr<Array> arr;
+    ASSERT_OK_AND_ASSIGN(arr, ToResult(factory()));
+    ARROW_SCOPED_TRACE("type = ", arr->type()->ToString(),
+                       ", array data = ", arr->ToString());
+    const ArrayData& data = *arr->data();  // non-owning reference
+    struct ArrowDeviceArray c_export;
+    ASSERT_OK(ExportDeviceArray(*arr, {nullptr, nullptr}, &c_export));
+
+    ArrayExportGuard guard(&c_export.array);
+    auto new_bytes = pool_->bytes_allocated();
+    ASSERT_GT(new_bytes, orig_bytes);
+
+    // Release the shared_ptr<Array>, underlying data should be held alive
+    arr.reset();
+    ASSERT_EQ(pool_->bytes_allocated(), new_bytes);
+    check_func(&c_export, data, kMyDeviceType, 1, nullptr);
+
+    // Release the ArrowArray, underlying data should be destroyed
+    guard.Release();
+    ASSERT_EQ(pool_->bytes_allocated(), orig_bytes);
+  }
+
+  template <typename ArrayFactory>
+  void TestNested(ArrayFactory&& factory) {
+    ArrayExportChecker checker;
+    TestWithArrayFactory(std::forward<ArrayFactory>(factory), checker);
+  }
+
+  void TestNested(const std::shared_ptr<MemoryManager>& mm,
+                  const std::shared_ptr<DataType>& type, const char* json) {
+    TestNested(JSONArrayFactory(mm, type, json));
+  }
+
+  template <typename ArrayFactory>
+  void TestPrimitive(ArrayFactory&& factory) {
+    TestNested(std::forward<ArrayFactory>(factory));
+  }
+
+  void TestPrimitive(const std::shared_ptr<MemoryManager>& mm,
+                     const std::shared_ptr<DataType>& type, const char* json) {
+    TestNested(mm, type, json);
+  }
+
+ protected:
+  MemoryPool* pool_;
+};
+
+TEST_F(TestDeviceArrayExport, Primitive) {
+  std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
+  auto mm = device->default_memory_manager();
+
+  TestPrimitive(mm, int8(), "[1, 2, null, -3]");
+  TestPrimitive(mm, int16(), "[1, 2, -3]");
+  TestPrimitive(mm, int32(), "[1, 2, null, -3]");
+  TestPrimitive(mm, int64(), "[1, 2, -3]");
+  TestPrimitive(mm, uint8(), "[1, 2, 3]");
+  TestPrimitive(mm, uint16(), "[1, 2, null, 3]");
+  TestPrimitive(mm, uint32(), "[1, 2, 3]");
+  TestPrimitive(mm, uint64(), "[1, 2, null, 3]");
+
+  TestPrimitive(mm, boolean(), "[true, false, null]");
+
+  TestPrimitive(mm, float32(), "[1.5, null]");
+  TestPrimitive(mm, float64(), "[1.5, null]");
+
+  TestPrimitive(mm, fixed_size_binary(3), R"(["foo", "bar", null])");
+  TestPrimitive(mm, binary(), R"(["foo", "bar", null])");
+  TestPrimitive(mm, large_binary(), R"(["foo", "bar", null])");
+  TestPrimitive(mm, utf8(), R"(["foo", "bar", null])");
+  TestPrimitive(mm, large_utf8(), R"(["foo", "bar", null])");
+
+  TestPrimitive(mm, decimal(16, 4), R"(["1234.5670", null])");
+  TestPrimitive(mm, decimal256(16, 4), R"(["1234.5670", null])");
+
+  TestPrimitive(mm, month_day_nano_interval(), R"([[-1, 5, 20], null])");
+}
+
+TEST_F(TestDeviceArrayExport, PrimitiveSliced) {
+  std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
+  auto mm = device->default_memory_manager();
+
+  auto factory = [=]() {
+    return (*ToDevice(mm, *ArrayFromJSON(int16(), "[1, 2, null, -3]")->data()))
+        ->Slice(1, 2);
+  };
+  TestPrimitive(factory);
+}
+
+TEST_F(TestDeviceArrayExport, Temporal) {
+  std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
+  auto mm = device->default_memory_manager();
+
+  const char* json = "[1, 2, null, 42]";
+  TestPrimitive(mm, date32(), json);
+  TestPrimitive(mm, date64(), json);
+  TestPrimitive(mm, time32(TimeUnit::SECOND), json);
+  TestPrimitive(mm, time32(TimeUnit::MILLI), json);
+  TestPrimitive(mm, time64(TimeUnit::MICRO), json);
+  TestPrimitive(mm, time64(TimeUnit::NANO), json);
+  TestPrimitive(mm, duration(TimeUnit::SECOND), json);
+  TestPrimitive(mm, duration(TimeUnit::MILLI), json);
+  TestPrimitive(mm, duration(TimeUnit::MICRO), json);
+  TestPrimitive(mm, duration(TimeUnit::NANO), json);
+  TestPrimitive(mm, month_interval(), json);
+
+  TestPrimitive(mm, day_time_interval(), "[[7, 600], null]");
+
+  json = R"(["1970-01-01","2000-02-29","1900-02-28"])";
+  TestPrimitive(mm, timestamp(TimeUnit::SECOND), json);
+  TestPrimitive(mm, timestamp(TimeUnit::SECOND, "Europe/Paris"), json);
+  TestPrimitive(mm, timestamp(TimeUnit::MILLI), json);
+  TestPrimitive(mm, timestamp(TimeUnit::MILLI, "Europe/Paris"), json);
+  TestPrimitive(mm, timestamp(TimeUnit::MICRO), json);
+  TestPrimitive(mm, timestamp(TimeUnit::MICRO, "Europe/Paris"), json);
+  TestPrimitive(mm, timestamp(TimeUnit::NANO), json);
+  TestPrimitive(mm, timestamp(TimeUnit::NANO, "Europe/Paris"), json);
+}
+
+TEST_F(TestDeviceArrayExport, List) {
+  std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
+  auto mm = device->default_memory_manager();
+
+  TestNested(mm, list(int8()), "[[1, 2], [3, null], null]");
+  TestNested(mm, large_list(uint16()), "[[1, 2], [3, null], null]");
+  TestNested(mm, fixed_size_list(int64(), 2), "[[1, 2], [3, null], null]");
+
+  TestNested(mm, list(large_list(int32())), "[[[1, 2], [3], null], null]");
+}
+
+TEST_F(TestDeviceArrayExport, ListSliced) {
+  std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
+  auto mm = device->default_memory_manager();
+
+  {
+    auto factory = [=]() {
+      return (*ToDevice(
+                  mm, *ArrayFromJSON(list(int8()), "[[1, 2], [3, null], [4, 5, 
6], null]")
+                           ->data()))
+          ->Slice(1, 2);
+    };
+    TestNested(factory);
+  }
+  {
+    auto factory = [=]() {
+      auto values =
+          (*ToDevice(mm,
+                     *ArrayFromJSON(int16(), "[1, 2, 3, 4, null, 5, 6, 7, 
8]")->data()))
+              ->Slice(1, 6);
+      auto offsets = (*ToDevice(mm, *ArrayFromJSON(int32(), "[0, 2, 3, 5, 
6]")->data()))
+                         ->Slice(2, 4);
+      return ListArray::FromArrays(*offsets, *values);
+    };
+    TestNested(factory);
+  }
+}
+
+TEST_F(TestDeviceArrayExport, Struct) {
+  std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
+  auto mm = device->default_memory_manager();
+
+  const char* data = R"([[1, "foo"], [2, null]])";
+  auto type = struct_({field("a", int8()), field("b", utf8())});
+  TestNested(mm, type, data);
+}
+
+TEST_F(TestDeviceArrayExport, Map) {
+  std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
+  auto mm = device->default_memory_manager();
+
+  const char* json = R"([[[1, "foo"], [2, null]], [[3, "bar"]]])";
+  TestNested(mm, map(int8(), utf8()), json);
+  TestNested(mm, map(int8(), utf8(), /*keys_sorted=*/true), json);
+}
+
+TEST_F(TestDeviceArrayExport, Union) {
+  std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
+  auto mm = device->default_memory_manager();
+
+  const char* data = "[null, [42, 1], [43, true], [42, null], [42, 2]]";
+  // Dense
+  auto field_a = field("a", int8());
+  auto field_b = field("b", boolean(), /*nullable=*/false);
+  auto type = dense_union({field_a, field_b}, {42, 43});
+  TestNested(mm, type, data);
+  // Sparse
+  field_a = field("a", int8(), /*nullable=*/false);
+  field_b = field("b", boolean());
+  type = sparse_union({field_a, field_b}, {42, 43});
+  TestNested(mm, type, data);
+}
+
+TEST_F(TestDeviceArrayExport, Extension) {
+  std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
+  auto mm = device->default_memory_manager();
+
+  TestPrimitive(ToDeviceFactory(mm, ExampleUuid));
+  TestPrimitive(ToDeviceFactory(mm, ExampleSmallint));
+  TestPrimitive(ToDeviceFactory(mm, ExampleComplex128));
+}
+
+TEST_F(TestDeviceArrayExport, ExportArrayAndType) {
+  std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
+  auto mm = device->default_memory_manager();
+
+  struct ArrowSchema c_schema {};
+  struct ArrowDeviceArray c_array {};
+  SchemaExportGuard schema_guard(&c_schema);
+  ArrayExportGuard array_guard(&c_array.array);
+
+  auto array = ToDevice(mm, *ArrayFromJSON(int8(), "[1, 2, 
3]")->data()).ValueOrDie();
+  ASSERT_OK(ExportDeviceArray(*array, {nullptr, nullptr}, &c_array, 
&c_schema));
+  const ArrayData& data = *array->data();
+  array.reset();
+  ASSERT_FALSE(ArrowSchemaIsReleased(&c_schema));
+  ASSERT_FALSE(ArrowArrayIsReleased(&c_array.array));
+  ASSERT_EQ(c_schema.format, std::string("c"));
+  ASSERT_EQ(c_schema.n_children, 0);
+  ArrayExportChecker checker{};
+  checker(&c_array, data, kMyDeviceType, 1, nullptr);
+}
+
+TEST_F(TestDeviceArrayExport, ExportRecordBatch) {
+  std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
+  auto mm = device->default_memory_manager();
+
+  struct ArrowSchema c_schema {};
+  struct ArrowDeviceArray c_array {};
+
+  auto schema = ::arrow::schema(
+      {field("ints", int16()), field("bools", boolean(), /*nullable=*/false)});
+  schema = schema->WithMetadata(key_value_metadata(kMetadataKeys2, 
kMetadataValues2));
+  auto arr0 = ToDevice(mm, *ArrayFromJSON(int16(), "[1, 2, 
null]")->data()).ValueOrDie();
+  auto arr1 = ToDevice(mm, *ArrayFromJSON(boolean(), "[false, true, 
false]")->data())
+                  .ValueOrDie();
+
+  auto batch_factory = [&]() { return RecordBatch::Make(schema, 3, {arr0, 
arr1}); };
+
+  {
+    auto batch = batch_factory();
+
+    ASSERT_OK(ExportDeviceRecordBatch(*batch, {nullptr, nullptr}, &c_array, 
&c_schema));
+    SchemaExportGuard schema_guard(&c_schema);
+    ArrayExportGuard array_guard(&c_array.array);
+    RecordBatchExportChecker checker{};
+    checker(&c_array, *batch, kMyDeviceType, 1, nullptr);
+
+    // create batch anew, with the same buffer pointers
+    batch = batch_factory();
+    checker(&c_array, *batch, kMyDeviceType, 1, nullptr);
+  }
+  {
+    // Check one can export both schema and record batch at once
+    auto batch = batch_factory();
+
+    ASSERT_OK(ExportDeviceRecordBatch(*batch, {nullptr, nullptr}, &c_array, 
&c_schema));
+    SchemaExportGuard schema_guard(&c_schema);
+    ArrayExportGuard array_guard(&c_array.array);
+    ASSERT_EQ(c_schema.format, std::string("+s"));
+    ASSERT_EQ(c_schema.n_children, 2);
+    ASSERT_NE(c_schema.metadata, nullptr);
+    ASSERT_EQ(kEncodedMetadata2,
+              std::string(c_schema.metadata, kEncodedMetadata2.size()));
+    RecordBatchExportChecker checker{};
+    checker(&c_array, *batch, kMyDeviceType, 1, nullptr);
+
+    // Create batch anew, with the same buffer pointers
+    batch = batch_factory();
+    checker(&c_array, *batch, kMyDeviceType, 1, nullptr);
+  }
+}
+

Review Comment:
   It would be nice to have tests for importing device arrays.
   It would also be good to test that the sync event and its release function are taken into account.



##########
cpp/src/arrow/c/bridge_test.cc:
##########
@@ -1112,6 +1130,392 @@ TEST_F(TestArrayExport, ExportRecordBatch) {
   }
 }
 
+////////////////////////////////////////////////////////////////////////////
+// Device Array Export Tests
+
+// Type name and C Device Data Interface device type reported by MyDevice.
+static constexpr char kMyDeviceTypeName[] = "arrowtest::MyDevice";
+static constexpr ArrowDeviceType kMyDeviceType = ARROW_DEVICE_EXT_DEV;
+
+// Test buffer whose bytes were allocated from the default memory pool; the
+// destructor hands that allocation back to the same pool.
+class MyBuffer final : public MutableBuffer {
+ public:
+  using MutableBuffer::MutableBuffer;
+
+  ~MyBuffer() override {
+    auto* owned = const_cast<uint8_t*>(data_);
+    default_memory_pool()->Free(owned, size_);
+  }
+};
+
+// CPU-backed memory manager used by the test device: allocations come from the
+// default memory pool and are wrapped in MyBuffer, tied to this memory manager.
+class MyMemoryManager : public CPUMemoryManager {
+ public:
+  explicit MyMemoryManager(const std::shared_ptr<Device>& device)
+      : CPUMemoryManager(device, default_memory_pool()) {}
+
+  Result<std::unique_ptr<Buffer>> AllocateBuffer(int64_t size) override {
+    uint8_t* raw = nullptr;
+    RETURN_NOT_OK(pool_->Allocate(size, &raw));
+    return std::make_unique<MyBuffer>(raw, size, shared_from_this());
+  }
+
+ protected:
+  // Owned and non-owned copies share a single implementation.
+  Result<std::shared_ptr<Buffer>> CopyBufferFrom(
+      const std::shared_ptr<Buffer>& buf,
+      const std::shared_ptr<MemoryManager>& from) override {
+    return CopyNonOwnedFrom(*buf, from);
+  }
+
+  // Copies CPU-resident bytes into a freshly allocated MyBuffer; yields a null
+  // buffer when the source memory manager is not CPU-based.
+  Result<std::unique_ptr<Buffer>> CopyNonOwnedFrom(
+      const Buffer& buf, const std::shared_ptr<MemoryManager>& from) override {
+    if (!from->is_cpu()) {
+      return nullptr;
+    }
+    const int64_t nbytes = buf.size();
+    ARROW_ASSIGN_OR_RAISE(auto copy, AllocateBuffer(nbytes));
+    if (nbytes > 0) {
+      memcpy(copy->mutable_data(), buf.data(), static_cast<size_t>(nbytes));
+    }
+    return std::move(copy);
+  }
+};
+
+// Test device reporting device type kMyDeviceType; `value` doubles as its
+// device id, and two MyDevice instances are equal when their values match.
+class MyDevice : public Device {
+ public:
+  explicit MyDevice(int value) : Device(true), value_(value) {}
+
+  const char* type_name() const override { return kMyDeviceTypeName; }
+  std::string ToString() const override { return kMyDeviceTypeName; }
+
+  bool Equals(const Device& other) const override {
+    // The type name is compared by pointer identity against kMyDeviceTypeName;
+    // the downcast only happens once the other device is known to be a MyDevice.
+    const bool same_kind =
+        other.type_name() == kMyDeviceTypeName && other.device_type() == device_type();
+    return same_kind && checked_cast<const MyDevice&>(other).value_ == value_;
+  }
+
+  DeviceAllocationType device_type() const override {
+    return static_cast<DeviceAllocationType>(kMyDeviceType);
+  }
+  int64_t device_id() const override { return value_; }
+
+  // Every call creates a new memory manager bound to this device.
+  std::shared_ptr<MemoryManager> default_memory_manager() override {
+    return std::make_shared<MyMemoryManager>(shared_from_this());
+  }
+
+ protected:
+  int value_;
+};
+
+// Fixture that copies CPU arrays onto a device memory manager, exports them
+// with ExportDeviceArray and checks both the exported structure and the
+// lifetime of the underlying allocations.
+class TestDeviceArrayExport : public ::testing::Test {
+ public:
+  void SetUp() override { pool_ = default_memory_pool(); }
+
+  // Recursively copies `data` onto `mm`: buffers, child arrays and, for
+  // dictionary-encoded arrays, the dictionary. Type, length, null count and
+  // offset are kept as-is.
+  static Result<std::shared_ptr<ArrayData>> ToDeviceData(
+      const std::shared_ptr<MemoryManager>& mm, const ArrayData& data) {
+    arrow::BufferVector buffers;
+    buffers.reserve(data.buffers.size());
+    for (const auto& buf : data.buffers) {
+      if (buf) {
+        ARROW_ASSIGN_OR_RAISE(auto dest, mm->CopyBuffer(buf, mm));
+        buffers.push_back(std::move(dest));
+      } else {
+        // Absent buffers (e.g. a missing validity bitmap) stay absent.
+        buffers.push_back(nullptr);
+      }
+    }
+
+    arrow::ArrayDataVector children;
+    children.reserve(data.child_data.size());
+    for (const auto& child : data.child_data) {
+      ARROW_ASSIGN_OR_RAISE(auto dest, ToDeviceData(mm, *child));
+      children.push_back(std::move(dest));
+    }
+
+    auto result = ArrayData::Make(data.type, data.length, std::move(buffers),
+                                  std::move(children), data.null_count, data.offset);
+    // The dictionary is not part of buffers/child_data. Without copying it, a
+    // dictionary array would come back with no dictionary at all.
+    if (data.dictionary) {
+      ARROW_ASSIGN_OR_RAISE(result->dictionary, ToDeviceData(mm, *data.dictionary));
+    }
+    return result;
+  }
+
+  // Same as ToDeviceData, but wraps the copied data in an Array.
+  static Result<std::shared_ptr<Array>> ToDevice(const std::shared_ptr<MemoryManager>& mm,
+                                                 const ArrayData& data) {
+    ARROW_ASSIGN_OR_RAISE(auto result, ToDeviceData(mm, data));
+    return MakeArray(result);
+  }
+
+  // Wraps `factory` (a callable producing a CPU array) so that each call of the
+  // returned function yields a device copy of a freshly built array.
+  template <typename ArrayFactory>
+  static std::function<Result<std::shared_ptr<Array>>()> ToDeviceFactory(
+      const std::shared_ptr<MemoryManager>& mm, ArrayFactory&& factory) {
+    // Capture by value: the returned std::function outlives this call, so
+    // capturing the parameters by reference would dangle when `factory` is a
+    // temporary. `mutable` keeps non-const factories callable.
+    return [mm, factory = std::forward<ArrayFactory>(factory)]() mutable {
+      return ToDevice(mm, *factory()->data());
+    };
+  }
+
+  // Returns a factory that parses `json` as an array of `type` and copies it to `mm`.
+  static std::function<Result<std::shared_ptr<Array>>()> JSONArrayFactory(
+      const std::shared_ptr<MemoryManager>& mm, std::shared_ptr<DataType> type,
+      const char* json) {
+    return [=]() { return ToDevice(mm, *ArrayFromJSON(type, json)->data()); };
+  }
+
+  // Exports the array produced by `factory` and verifies that:
+  // - the export keeps the array's memory alive after the Array is released;
+  // - `check_func` accepts the exported structure;
+  // - releasing the export returns the pool to its original allocation level.
+  template <typename ArrayFactory, typename ExportCheckFunc>
+  void TestWithArrayFactory(ArrayFactory&& factory, ExportCheckFunc&& check_func) {
+    auto orig_bytes = pool_->bytes_allocated();
+
+    std::shared_ptr<Array> arr;
+    ASSERT_OK_AND_ASSIGN(arr, ToResult(factory()));
+    ARROW_SCOPED_TRACE("type = ", arr->type()->ToString(),
+                       ", array data = ", arr->ToString());
+    const ArrayData& data = *arr->data();  // non-owning reference
+    struct ArrowDeviceArray c_export;
+    ASSERT_OK(ExportDeviceArray(*arr, {nullptr, nullptr}, &c_export));
+
+    ArrayExportGuard guard(&c_export.array);
+    auto new_bytes = pool_->bytes_allocated();
+    ASSERT_GT(new_bytes, orig_bytes);
+
+    // Release the shared_ptr<Array>, underlying data should be held alive
+    arr.reset();
+    ASSERT_EQ(pool_->bytes_allocated(), new_bytes);
+    check_func(&c_export, data, kMyDeviceType, 1, nullptr);
+
+    // Release the ArrowArray, underlying data should be destroyed
+    guard.Release();
+    ASSERT_EQ(pool_->bytes_allocated(), orig_bytes);
+  }
+
+  template <typename ArrayFactory>
+  void TestNested(ArrayFactory&& factory) {
+    ArrayExportChecker checker;
+    TestWithArrayFactory(std::forward<ArrayFactory>(factory), checker);
+  }
+
+  void TestNested(const std::shared_ptr<MemoryManager>& mm,
+                  const std::shared_ptr<DataType>& type, const char* json) {
+    TestNested(JSONArrayFactory(mm, type, json));
+  }
+
+  template <typename ArrayFactory>
+  void TestPrimitive(ArrayFactory&& factory) {
+    TestNested(std::forward<ArrayFactory>(factory));
+  }
+
+  void TestPrimitive(const std::shared_ptr<MemoryManager>& mm,
+                     const std::shared_ptr<DataType>& type, const char* json) {
+    TestNested(mm, type, json);
+  }
+
+ protected:
+  MemoryPool* pool_;
+};
+
+TEST_F(TestDeviceArrayExport, Primitive) {
+  std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
+  auto mm = device->default_memory_manager();
+
+  TestPrimitive(mm, int8(), "[1, 2, null, -3]");
+  TestPrimitive(mm, int16(), "[1, 2, -3]");
+  TestPrimitive(mm, int32(), "[1, 2, null, -3]");
+  TestPrimitive(mm, int64(), "[1, 2, -3]");
+  TestPrimitive(mm, uint8(), "[1, 2, 3]");
+  TestPrimitive(mm, uint16(), "[1, 2, null, 3]");
+  TestPrimitive(mm, uint32(), "[1, 2, 3]");
+  TestPrimitive(mm, uint64(), "[1, 2, null, 3]");
+
+  TestPrimitive(mm, boolean(), "[true, false, null]");
+
+  TestPrimitive(mm, float32(), "[1.5, null]");
+  TestPrimitive(mm, float64(), "[1.5, null]");
+
+  TestPrimitive(mm, fixed_size_binary(3), R"(["foo", "bar", null])");
+  TestPrimitive(mm, binary(), R"(["foo", "bar", null])");
+  TestPrimitive(mm, large_binary(), R"(["foo", "bar", null])");
+  TestPrimitive(mm, utf8(), R"(["foo", "bar", null])");
+  TestPrimitive(mm, large_utf8(), R"(["foo", "bar", null])");
+
+  TestPrimitive(mm, decimal(16, 4), R"(["1234.5670", null])");
+  TestPrimitive(mm, decimal256(16, 4), R"(["1234.5670", null])");
+
+  TestPrimitive(mm, month_day_nano_interval(), R"([[-1, 5, 20], null])");

Review Comment:
   TBH, I'm not sure it's useful to copy/paste all those tests, and the duplicated code is a readability burden.
   
   We just want to check that primitive, nested, extension and dictionary arrays are properly handled in general (the detailed export behaviour is already covered by the existing export tests).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to