This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 6517a0dd feat(c/driver/postgresql): INTERVAL COPY Writer (#1184)
6517a0dd is described below

commit 6517a0dd6f17366a7c103acb50d82496204f4a45
Author: William Ayd <[email protected]>
AuthorDate: Wed Oct 11 16:38:06 2023 -0400

    feat(c/driver/postgresql): INTERVAL COPY Writer (#1184)
---
 c/driver/postgresql/postgres_copy_reader.h       |  92 +++++++++++++
 c/driver/postgresql/postgres_copy_reader_test.cc | 158 ++++++++++++++++++++++-
 2 files changed, 249 insertions(+), 1 deletion(-)

diff --git a/c/driver/postgresql/postgres_copy_reader.h b/c/driver/postgresql/postgres_copy_reader.h
index 3a26442c..0c0f003b 100644
--- a/c/driver/postgresql/postgres_copy_reader.h
+++ b/c/driver/postgresql/postgres_copy_reader.h
@@ -1199,6 +1199,77 @@ class PostgresCopyDoubleFieldWriter : public PostgresCopyFieldWriter {
   }
 };
 
+class PostgresCopyIntervalFieldWriter : public PostgresCopyFieldWriter {
+ public:
+  ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) 
override {
+    constexpr int32_t field_size_bytes = 16;
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int32_t>(buffer, field_size_bytes, 
error));
+
+    struct ArrowInterval interval;
+    ArrowIntervalInit(&interval, NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO);
+    ArrowArrayViewGetIntervalUnsafe(array_view_, index, &interval);
+    const int64_t ms = interval.ns / 1000;
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int64_t>(buffer, ms, error));
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int32_t>(buffer, interval.days, 
error));
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int32_t>(buffer, interval.months, 
error));
+
+    return ADBC_STATUS_OK;
+  }
+};
+
+template <enum ArrowTimeUnit TU>
+class PostgresCopyDurationFieldWriter : public PostgresCopyFieldWriter {
+ public:
+  ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) 
override {
+    constexpr int32_t field_size_bytes = 16;
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int32_t>(buffer, field_size_bytes, 
error));
+
+    int64_t raw_value = ArrowArrayViewGetIntUnsafe(array_view_, index);
+    int64_t value;
+
+    bool overflow_safe = true;
+    switch (TU) {
+      case NANOARROW_TIME_UNIT_SECOND:
+        if ((overflow_safe = raw_value <= kMaxSafeSecondsToMicros &&
+             raw_value >= kMinSafeSecondsToMicros)) {
+          value = raw_value * 1000000;
+        }
+        break;
+      case NANOARROW_TIME_UNIT_MILLI:
+        if ((overflow_safe = raw_value <= kMaxSafeMillisToMicros &&
+             raw_value >= kMinSafeMillisToMicros)) {
+          value = raw_value * 1000;
+        }
+        break;
+      case NANOARROW_TIME_UNIT_MICRO:
+        value = raw_value;
+        break;
+      case NANOARROW_TIME_UNIT_NANO:
+        value = raw_value / 1000;
+        break;
+    }
+
+    if (!overflow_safe) {
+      ArrowErrorSet(
+          error,
+          "Row %" PRId64 " duration value %" PRId64 " with unit %d would 
overflow",
+          index,
+          raw_value,
+          TU);
+      return ADBC_STATUS_INVALID_ARGUMENT;
+    }
+
+    // A duration has no calendar component, so days and months are written as zero
+    constexpr uint32_t days = 0;
+    constexpr uint32_t months = 0;
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int64_t>(buffer, value, error));
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int32_t>(buffer, days, error));
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int32_t>(buffer, months, error));
+
+    return ADBC_STATUS_OK;
+  }
+};
+
 class PostgresCopyBinaryFieldWriter : public PostgresCopyFieldWriter {
  public:
   ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) 
override {
@@ -1312,6 +1383,27 @@ static inline ArrowErrorCode MakeCopyFieldWriter(
         }
         return NANOARROW_OK;
     }
+    case NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO:
+      *out = new PostgresCopyIntervalFieldWriter();
+      return NANOARROW_OK;
+    case NANOARROW_TYPE_DURATION: {
+      switch (schema_view.time_unit) {
+        case NANOARROW_TIME_UNIT_SECOND:
+          *out = new 
PostgresCopyDurationFieldWriter<NANOARROW_TIME_UNIT_SECOND>();
+          break;
+        case NANOARROW_TIME_UNIT_MILLI:
+          *out = new 
PostgresCopyDurationFieldWriter<NANOARROW_TIME_UNIT_MILLI>();
+          break;
+        case NANOARROW_TIME_UNIT_MICRO:
+          *out = new 
PostgresCopyDurationFieldWriter<NANOARROW_TIME_UNIT_MICRO>();
+
+          break;
+        case NANOARROW_TIME_UNIT_NANO:
+          *out = new 
PostgresCopyDurationFieldWriter<NANOARROW_TIME_UNIT_NANO>();
+          break;
+      }
+      return NANOARROW_OK;
+    }
     default:
       return EINVAL;
   }
diff --git a/c/driver/postgresql/postgres_copy_reader_test.cc b/c/driver/postgresql/postgres_copy_reader_test.cc
index e4bbb9ea..6404b21e 100644
--- a/c/driver/postgresql/postgres_copy_reader_test.cc
+++ b/c/driver/postgresql/postgres_copy_reader_test.cc
@@ -680,7 +680,6 @@ TEST(PostgresCopyUtilsTest, PostgresCopyReadNumeric) {
   EXPECT_EQ(std::string(item.data, item.size_bytes), "inf");
 }
 
-
 // COPY (SELECT CAST(col AS TIMESTAMP) FROM (  VALUES ('1900-01-01 12:34:56'),
 // ('2100-01-01 12:34:56'), (NULL)) AS drvd("col")) TO STDOUT WITH (FORMAT BINARY);
 static uint8_t kTestPgCopyTimestamp[] = {
@@ -800,6 +799,163 @@ INSTANTIATE_TEST_SUITE_P(PostgresCopyWriteTimestamp,
                          PostgresCopyWriteTimestampTest,
                          testing::ValuesIn(ts_values));
 
+// COPY (SELECT CAST(col AS INTERVAL) FROM (  VALUES ('-1 months -2 days -4 seconds'),
+// ('1 months 2 days 4 seconds'), (NULL)) AS drvd("col")) TO STDOUT WITH (FORMAT BINARY);
+static uint8_t kTestPgCopyInterval[] = {
+    0x50, 0x47, 0x43, 0x4f, 0x50, 0x59, 0x0a, 0xff, 0x0d, 0x0a, 0x00, 0x00, 
0x00, 0x00,
+    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x10, 0xff,
+    0xff, 0xff, 0xff, 0xff, 0xc2, 0xf7, 0x00, 0xff, 0xff, 0xff, 0xfe, 0xff, 
0xff, 0xff,
+    0xff, 0x00, 0x01, 0x00, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 
0x3d, 0x09,
+    0x00, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x01, 0x00, 0x01, 0xff, 
0xff, 0xff,
+    0xff, 0xff, 0xff};
+
+TEST(PostgresCopyUtilsTest, PostgresCopyReadInterval) {
+  ArrowBufferView data;
+  data.data.as_uint8 = kTestPgCopyInterval;
+  data.size_bytes = sizeof(kTestPgCopyInterval);
+
+  auto col_type = PostgresType(PostgresTypeId::kInterval);
+  PostgresType input_type(PostgresTypeId::kRecord);
+  input_type.AppendChild("col", col_type);
+
+  PostgresCopyStreamTester tester;
+  ASSERT_EQ(tester.Init(input_type), NANOARROW_OK);
+  ASSERT_EQ(tester.ReadAll(&data), ENODATA);
+  ASSERT_EQ(data.data.as_uint8 - kTestPgCopyInterval, 
sizeof(kTestPgCopyInterval));
+  ASSERT_EQ(data.size_bytes, 0);
+
+  nanoarrow::UniqueArray array;
+  ASSERT_EQ(tester.GetArray(array.get()), NANOARROW_OK);
+  ASSERT_EQ(array->length, 3);
+  ASSERT_EQ(array->n_children, 1);
+
+  nanoarrow::UniqueSchema schema;
+  tester.GetSchema(schema.get());
+
+  nanoarrow::UniqueArrayView array_view;
+  ASSERT_EQ(ArrowArrayViewInitFromSchema(array_view.get(), schema.get(), 
nullptr),
+            NANOARROW_OK);
+  ASSERT_EQ(array_view->children[0]->storage_type,
+            NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO);
+  ASSERT_EQ(ArrowArrayViewSetArray(array_view.get(), array.get(), nullptr), 
NANOARROW_OK);
+
+  auto validity = array_view->children[0]->buffer_views[0].data.as_uint8;
+  ASSERT_TRUE(ArrowBitGet(validity, 0));
+  ASSERT_TRUE(ArrowBitGet(validity, 1));
+  ASSERT_FALSE(ArrowBitGet(validity, 2));
+
+  struct ArrowInterval interval;
+  ArrowIntervalInit(&interval, NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO);
+  ArrowArrayViewGetIntervalUnsafe(array_view->children[0], 0, &interval);
+  ASSERT_EQ(interval.months, -1);
+  ASSERT_EQ(interval.days, -2);
+  ASSERT_EQ(interval.ns, -4000000000);
+  ArrowArrayViewGetIntervalUnsafe(array_view->children[0], 1, &interval);
+  ASSERT_EQ(interval.months, 1);
+  ASSERT_EQ(interval.days, 2);
+  ASSERT_EQ(interval.ns, 4000000000);
+}
+
+TEST(PostgresCopyUtilsTest, PostgresCopyWriteInterval) {
+  adbc_validation::Handle<struct ArrowSchema> schema;
+  adbc_validation::Handle<struct ArrowArray> array;
+  struct ArrowError na_error;
+  const enum ArrowType type = NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO;
+  // values are days, months, ns
+  struct ArrowInterval neg_interval;
+  struct ArrowInterval pos_interval;
+
+  ArrowIntervalInit(&neg_interval, type);
+  ArrowIntervalInit(&pos_interval, type);
+
+  neg_interval.months = -1;
+  neg_interval.days = -2;
+  neg_interval.ns = -4000000000;
+
+  pos_interval.months = 1;
+  pos_interval.days = 2;
+  pos_interval.ns = 4000000000;
+
+  const std::vector<std::optional<ArrowInterval*>> values = {
+    &neg_interval, &pos_interval, std::nullopt};
+
+  ASSERT_EQ(adbc_validation::MakeSchema(&schema.value, {{"col", type}}), 
ADBC_STATUS_OK);
+
+  ASSERT_EQ(adbc_validation::MakeBatch<ArrowInterval*>(
+              &schema.value, &array.value, &na_error, values), ADBC_STATUS_OK);
+
+  PostgresCopyStreamWriteTester tester;
+  ASSERT_EQ(tester.Init(&schema.value, &array.value), NANOARROW_OK);
+  ASSERT_EQ(tester.WriteAll(nullptr), ENODATA);
+
+  const struct ArrowBuffer buf = tester.WriteBuffer();
+  // The last 2 bytes of a message can be transmitted via PQputCopyData
+  // so no need to test those bytes from the Writer
+  constexpr size_t buf_size = sizeof(kTestPgCopyInterval) - 2;
+  ASSERT_EQ(buf.size_bytes, buf_size);
+  for (size_t i = 0; i < buf_size; i++) {
+    ASSERT_EQ(buf.data[i], kTestPgCopyInterval[i]);
+  }
+}
+
+// Writing a DURATION from NANOARROW produces INTERVAL in postgres without day/month
+// COPY (SELECT CAST(col AS INTERVAL) FROM (  VALUES ('-4 seconds'),
+// ('4 seconds'), (NULL)) AS drvd("col")) TO STDOUT WITH (FORMAT BINARY);
+static uint8_t kTestPgCopyDuration[] = {
+    0x50, 0x47, 0x43, 0x4f, 0x50, 0x59, 0x0a, 0xff, 0x0d, 0x0a, 0x00, 0x00, 
0x00, 0x00,
+    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x10, 0xff,
+    0xff, 0xff, 0xff, 0xff, 0xc2, 0xf7, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 
0x00, 0x00,
+    0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 
0x3d, 0x09,
+    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0xff, 
0xff, 0xff,
+    0xff, 0xff, 0xff};
+using DurationTestParamType = std::tuple<enum ArrowTimeUnit,
+  std::vector<std::optional<int64_t>>>;
+
+class PostgresCopyWriteDurationTest : public testing::TestWithParam<
+  DurationTestParamType> {};
+
+TEST_P(PostgresCopyWriteDurationTest, WritesProperBufferValues) {
+  adbc_validation::Handle<struct ArrowSchema> schema;
+  adbc_validation::Handle<struct ArrowArray> array;
+  struct ArrowError na_error;
+  const enum ArrowType type = NANOARROW_TYPE_DURATION;
+
+  DurationTestParamType parameters = GetParam();
+  enum ArrowTimeUnit unit = std::get<0>(parameters);
+  const std::vector<std::optional<int64_t>> values = std::get<1>(parameters);
+
+  ArrowSchemaInit(&schema.value);
+  ArrowSchemaSetTypeStruct(&schema.value, 1);
+  ArrowSchemaSetTypeDateTime(schema->children[0], type, unit, nullptr);
+  ArrowSchemaSetName(schema->children[0], "col");
+  ASSERT_EQ(adbc_validation::MakeBatch<int64_t>(
+              &schema.value, &array.value, &na_error, values), ADBC_STATUS_OK);
+
+  PostgresCopyStreamWriteTester tester;
+  ASSERT_EQ(tester.Init(&schema.value, &array.value), NANOARROW_OK);
+  ASSERT_EQ(tester.WriteAll(nullptr), ENODATA);
+
+  const struct ArrowBuffer buf = tester.WriteBuffer();
+  // The last 2 bytes of a message can be transmitted via PQputCopyData
+  // so no need to test those bytes from the Writer
+  constexpr size_t buf_size = sizeof(kTestPgCopyDuration) - 2;
+  ASSERT_EQ(buf.size_bytes, buf_size);
+  for (size_t i = 0; i < buf_size; i++) {
+    ASSERT_EQ(buf.data[i], kTestPgCopyDuration[i]);
+  }
+}
+
+static const std::vector<DurationTestParamType> duration_params {
+  {NANOARROW_TIME_UNIT_SECOND, {-4, 4, std::nullopt}},
+  {NANOARROW_TIME_UNIT_MILLI, {-4000, 4000, std::nullopt}},
+  {NANOARROW_TIME_UNIT_MICRO, {-4000000, 4000000, std::nullopt}},
+  {NANOARROW_TIME_UNIT_NANO, {-4000000000, 4000000000, std::nullopt}},
+};
+
+INSTANTIATE_TEST_SUITE_P(PostgresCopyWriteDuration,
+                         PostgresCopyWriteDurationTest,
+                         testing::ValuesIn(duration_params));
+
 // COPY (SELECT CAST("col" AS TEXT) AS "col" FROM (  VALUES ('abc'), ('1234'),
 // (NULL::text)) AS drvd("col")) TO STDOUT WITH (FORMAT binary);
 static uint8_t kTestPgCopyText[] = {

Reply via email to