This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 6517a0dd feat(c/driver/postgresql): INTERVAL COPY Writer (#1184)
6517a0dd is described below
commit 6517a0dd6f17366a7c103acb50d82496204f4a45
Author: William Ayd <[email protected]>
AuthorDate: Wed Oct 11 16:38:06 2023 -0400
feat(c/driver/postgresql): INTERVAL COPY Writer (#1184)
---
c/driver/postgresql/postgres_copy_reader.h | 92 +++++++++++++
c/driver/postgresql/postgres_copy_reader_test.cc | 158 ++++++++++++++++++++++-
2 files changed, 249 insertions(+), 1 deletion(-)
diff --git a/c/driver/postgresql/postgres_copy_reader.h
b/c/driver/postgresql/postgres_copy_reader.h
index 3a26442c..0c0f003b 100644
--- a/c/driver/postgresql/postgres_copy_reader.h
+++ b/c/driver/postgresql/postgres_copy_reader.h
@@ -1199,6 +1199,77 @@ class PostgresCopyDoubleFieldWriter : public
PostgresCopyFieldWriter {
}
};
+// Writes an Arrow INTERVAL_MONTH_DAY_NANO value as a Postgres binary INTERVAL:
+// a 16-byte field holding an int64 time component in microseconds, followed by
+// int32 days and int32 months.
+class PostgresCopyIntervalFieldWriter : public PostgresCopyFieldWriter {
+ public:
+  ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) override {
+    constexpr int32_t field_size_bytes = 16;
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int32_t>(buffer, field_size_bytes, error));
+
+    struct ArrowInterval interval;
+    ArrowIntervalInit(&interval, NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO);
+    ArrowArrayViewGetIntervalUnsafe(array_view_, index, &interval);
+
+    // Postgres keeps the time part of an INTERVAL in microseconds; integer
+    // division truncates any sub-microsecond remainder toward zero.
+    const int64_t micros = interval.ns / 1000;
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int64_t>(buffer, micros, error));
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int32_t>(buffer, interval.days, error));
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int32_t>(buffer, interval.months, error));
+
+    return ADBC_STATUS_OK;
+  }
+};
+
+// Writes an Arrow DURATION value (int64 ticks of unit TU) as a Postgres binary
+// INTERVAL: int64 microseconds followed by int32 days and int32 months. A
+// duration has no calendar component, so days and months are always written as 0.
+template <enum ArrowTimeUnit TU>
+class PostgresCopyDurationFieldWriter : public PostgresCopyFieldWriter {
+ public:
+  ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) override {
+    constexpr int32_t field_size_bytes = 16;
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int32_t>(buffer, field_size_bytes, error));
+
+    const int64_t raw_value = ArrowArrayViewGetIntUnsafe(array_view_, index);
+
+    // Rescale to microseconds. TU is a template parameter, so the branch is
+    // selected at compile time and every instantiation assigns `micros`.
+    // Seconds and milliseconds are range-checked first because scaling them up
+    // can overflow int64_t; nanoseconds are truncated toward zero.
+    int64_t micros = 0;
+    bool overflow_safe = true;
+    if constexpr (TU == NANOARROW_TIME_UNIT_SECOND) {
+      overflow_safe =
+          raw_value <= kMaxSafeSecondsToMicros && raw_value >= kMinSafeSecondsToMicros;
+      if (overflow_safe) micros = raw_value * 1000000;
+    } else if constexpr (TU == NANOARROW_TIME_UNIT_MILLI) {
+      overflow_safe =
+          raw_value <= kMaxSafeMillisToMicros && raw_value >= kMinSafeMillisToMicros;
+      if (overflow_safe) micros = raw_value * 1000;
+    } else if constexpr (TU == NANOARROW_TIME_UNIT_MICRO) {
+      micros = raw_value;
+    } else {
+      static_assert(TU == NANOARROW_TIME_UNIT_NANO, "unhandled ArrowTimeUnit");
+      micros = raw_value / 1000;
+    }
+
+    if (!overflow_safe) {
+      ArrowErrorSet(error,
+                    "Row %" PRId64 " duration value %" PRId64
+                    " with unit %d would overflow",
+                    index, raw_value, static_cast<int>(TU));
+      return ADBC_STATUS_INVALID_ARGUMENT;
+    }
+
+    // Durations carry no day or month component
+    constexpr int32_t days = 0;
+    constexpr int32_t months = 0;
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int64_t>(buffer, micros, error));
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int32_t>(buffer, days, error));
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int32_t>(buffer, months, error));
+
+    return ADBC_STATUS_OK;
+  }
+};
+
class PostgresCopyBinaryFieldWriter : public PostgresCopyFieldWriter {
public:
ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error)
override {
@@ -1312,6 +1383,27 @@ static inline ArrowErrorCode MakeCopyFieldWriter(
}
return NANOARROW_OK;
}
+    case NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO:
+      *out = new PostgresCopyIntervalFieldWriter();
+      return NANOARROW_OK;
+    case NANOARROW_TYPE_DURATION: {
+      // The writer is templated on the time unit so the rescaling to
+      // microseconds is resolved at compile time.
+      switch (schema_view.time_unit) {
+        case NANOARROW_TIME_UNIT_SECOND:
+          *out = new PostgresCopyDurationFieldWriter<NANOARROW_TIME_UNIT_SECOND>();
+          break;
+        case NANOARROW_TIME_UNIT_MILLI:
+          *out = new PostgresCopyDurationFieldWriter<NANOARROW_TIME_UNIT_MILLI>();
+          break;
+        case NANOARROW_TIME_UNIT_MICRO:
+          *out = new PostgresCopyDurationFieldWriter<NANOARROW_TIME_UNIT_MICRO>();
+          break;
+        case NANOARROW_TIME_UNIT_NANO:
+          *out = new PostgresCopyDurationFieldWriter<NANOARROW_TIME_UNIT_NANO>();
+          break;
+        default:
+          // Unknown unit: report it as unsupported instead of returning success
+          // without having assigned *out.
+          return EINVAL;
+      }
+      return NANOARROW_OK;
+    }
default:
return EINVAL;
}
diff --git a/c/driver/postgresql/postgres_copy_reader_test.cc
b/c/driver/postgresql/postgres_copy_reader_test.cc
index e4bbb9ea..6404b21e 100644
--- a/c/driver/postgresql/postgres_copy_reader_test.cc
+++ b/c/driver/postgresql/postgres_copy_reader_test.cc
@@ -680,7 +680,6 @@ TEST(PostgresCopyUtilsTest, PostgresCopyReadNumeric) {
EXPECT_EQ(std::string(item.data, item.size_bytes), "inf");
}
-
// COPY (SELECT CAST(col AS TIMESTAMP) FROM ( VALUES ('1900-01-01 12:34:56'),
// ('2100-01-01 12:34:56'), (NULL)) AS drvd("col")) TO STDOUT WITH (FORMAT
BINARY);
static uint8_t kTestPgCopyTimestamp[] = {
@@ -800,6 +799,163 @@ INSTANTIATE_TEST_SUITE_P(PostgresCopyWriteTimestamp,
PostgresCopyWriteTimestampTest,
testing::ValuesIn(ts_values));
+// COPY (SELECT CAST(col AS INTERVAL) FROM ( VALUES ('-1 months -2 days -4 seconds'),
+// ('1 months 2 days 4 seconds'), (NULL)) AS drvd("col")) TO STDOUT WITH (FORMAT BINARY);
+//
+// Each non-null INTERVAL field is 16 big-endian bytes: int64 time in
+// microseconds, then int32 days, then int32 months.
+static uint8_t kTestPgCopyInterval[] = {
+    // Header: "PGCOPY\n\377\r\n\0", int32 flags, int32 header extension length
+    0x50, 0x47, 0x43, 0x4f, 0x50, 0x59, 0x0a, 0xff, 0x0d, 0x0a, 0x00,
+    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+    // Row 1: 1 field of 16 bytes; time -4000000 us, days -2, months -1
+    0x00, 0x01, 0x00, 0x00, 0x00, 0x10,
+    0xff, 0xff, 0xff, 0xff, 0xff, 0xc2, 0xf7, 0x00,
+    0xff, 0xff, 0xff, 0xfe,
+    0xff, 0xff, 0xff, 0xff,
+    // Row 2: 1 field of 16 bytes; time 4000000 us, days 2, months 1
+    0x00, 0x01, 0x00, 0x00, 0x00, 0x10,
+    0x00, 0x00, 0x00, 0x00, 0x00, 0x3d, 0x09, 0x00,
+    0x00, 0x00, 0x00, 0x02,
+    0x00, 0x00, 0x00, 0x01,
+    // Row 3: 1 field with length -1 (NULL)
+    0x00, 0x01, 0xff, 0xff, 0xff, 0xff,
+    // Trailer: int16 -1
+    0xff, 0xff};
+
+TEST(PostgresCopyUtilsTest, PostgresCopyReadInterval) {
+  ArrowBufferView data;
+  data.data.as_uint8 = kTestPgCopyInterval;
+  data.size_bytes = sizeof(kTestPgCopyInterval);
+
+  // The stream is a record with one INTERVAL column named "col"
+  PostgresType interval_type(PostgresTypeId::kInterval);
+  PostgresType record_type(PostgresTypeId::kRecord);
+  record_type.AppendChild("col", interval_type);
+
+  // Reading must consume the whole buffer and then report end of data
+  PostgresCopyStreamTester tester;
+  ASSERT_EQ(tester.Init(record_type), NANOARROW_OK);
+  ASSERT_EQ(tester.ReadAll(&data), ENODATA);
+  ASSERT_EQ(data.data.as_uint8 - kTestPgCopyInterval, sizeof(kTestPgCopyInterval));
+  ASSERT_EQ(data.size_bytes, 0);
+
+  nanoarrow::UniqueArray array;
+  ASSERT_EQ(tester.GetArray(array.get()), NANOARROW_OK);
+  ASSERT_EQ(array->length, 3);
+  ASSERT_EQ(array->n_children, 1);
+
+  nanoarrow::UniqueSchema schema;
+  tester.GetSchema(schema.get());
+
+  nanoarrow::UniqueArrayView array_view;
+  ASSERT_EQ(ArrowArrayViewInitFromSchema(array_view.get(), schema.get(), nullptr),
+            NANOARROW_OK);
+  struct ArrowArrayView* col_view = array_view->children[0];
+  ASSERT_EQ(col_view->storage_type, NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO);
+  ASSERT_EQ(ArrowArrayViewSetArray(array_view.get(), array.get(), nullptr), NANOARROW_OK);
+
+  // Rows 0 and 1 hold values; row 2 is NULL
+  const uint8_t* validity = col_view->buffer_views[0].data.as_uint8;
+  ASSERT_TRUE(ArrowBitGet(validity, 0));
+  ASSERT_TRUE(ArrowBitGet(validity, 1));
+  ASSERT_FALSE(ArrowBitGet(validity, 2));
+
+  struct ExpectedInterval {
+    int32_t months;
+    int32_t days;
+    int64_t ns;
+  };
+  const ExpectedInterval expected[] = {{-1, -2, -4000000000}, {1, 2, 4000000000}};
+
+  struct ArrowInterval interval;
+  ArrowIntervalInit(&interval, NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO);
+  for (int64_t row = 0; row < 2; row++) {
+    ArrowArrayViewGetIntervalUnsafe(col_view, row, &interval);
+    ASSERT_EQ(interval.months, expected[row].months);
+    ASSERT_EQ(interval.days, expected[row].days);
+    ASSERT_EQ(interval.ns, expected[row].ns);
+  }
+}
+
+TEST(PostgresCopyUtilsTest, PostgresCopyWriteInterval) {
+  adbc_validation::Handle<struct ArrowSchema> schema;
+  adbc_validation::Handle<struct ArrowArray> array;
+  struct ArrowError na_error;
+  const enum ArrowType type = NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO;
+
+  // ArrowInterval values are set as months, days and nanoseconds; they mirror
+  // the two non-null rows encoded in kTestPgCopyInterval.
+  struct ArrowInterval neg_interval;
+  struct ArrowInterval pos_interval;
+
+  ArrowIntervalInit(&neg_interval, type);
+  ArrowIntervalInit(&pos_interval, type);
+
+  neg_interval.months = -1;
+  neg_interval.days = -2;
+  neg_interval.ns = -4000000000;
+
+  pos_interval.months = 1;
+  pos_interval.days = 2;
+  pos_interval.ns = 4000000000;
+
+  const std::vector<std::optional<ArrowInterval*>> values = {&neg_interval, &pos_interval,
+                                                             std::nullopt};
+
+  ASSERT_EQ(adbc_validation::MakeSchema(&schema.value, {{"col", type}}), ADBC_STATUS_OK);
+  ASSERT_EQ(adbc_validation::MakeBatch<ArrowInterval*>(&schema.value, &array.value,
+                                                       &na_error, values),
+            ADBC_STATUS_OK);
+
+  PostgresCopyStreamWriteTester tester;
+  ASSERT_EQ(tester.Init(&schema.value, &array.value), NANOARROW_OK);
+  ASSERT_EQ(tester.WriteAll(nullptr), ENODATA);
+
+  const struct ArrowBuffer buf = tester.WriteBuffer();
+  // The writer does not emit the trailing 2-byte end-of-data marker (int16 -1),
+  // so compare everything that precedes it
+  constexpr size_t buf_size = sizeof(kTestPgCopyInterval) - 2;
+  ASSERT_EQ(buf.size_bytes, buf_size);
+  for (size_t i = 0; i < buf_size; i++) {
+    ASSERT_EQ(buf.data[i], kTestPgCopyInterval[i]) << "mismatch at byte " << i;
+  }
+}
+
+// Writing an Arrow DURATION via nanoarrow produces a Postgres INTERVAL whose day
+// and month components are zero; the expected bytes match the output of:
+// COPY (SELECT CAST(col AS INTERVAL) FROM ( VALUES ('-4 seconds'),
+// ('4 seconds'), (NULL)) AS drvd("col")) TO STDOUT WITH (FORMAT BINARY);
+static uint8_t kTestPgCopyDuration[] = {
+    // Header: "PGCOPY\n\377\r\n\0", int32 flags, int32 header extension length
+    0x50, 0x47, 0x43, 0x4f, 0x50, 0x59, 0x0a, 0xff, 0x0d, 0x0a, 0x00,
+    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+    // Row 1: 1 field of 16 bytes; time -4000000 us, days 0, months 0
+    0x00, 0x01, 0x00, 0x00, 0x00, 0x10,
+    0xff, 0xff, 0xff, 0xff, 0xff, 0xc2, 0xf7, 0x00,
+    0x00, 0x00, 0x00, 0x00,
+    0x00, 0x00, 0x00, 0x00,
+    // Row 2: 1 field of 16 bytes; time 4000000 us, days 0, months 0
+    0x00, 0x01, 0x00, 0x00, 0x00, 0x10,
+    0x00, 0x00, 0x00, 0x00, 0x00, 0x3d, 0x09, 0x00,
+    0x00, 0x00, 0x00, 0x00,
+    0x00, 0x00, 0x00, 0x00,
+    // Row 3: 1 field with length -1 (NULL)
+    0x00, 0x01, 0xff, 0xff, 0xff, 0xff,
+    // Trailer: int16 -1
+    0xff, 0xff};
+// Test parameter: the time unit of the DURATION column, paired with the values
+// (NULL encoded as std::nullopt) to write in that unit.
+using DurationTestParamType =
+    std::tuple<ArrowTimeUnit, std::vector<std::optional<int64_t>>>;
+
+class PostgresCopyWriteDurationTest
+    : public testing::TestWithParam<DurationTestParamType> {};
+
+TEST_P(PostgresCopyWriteDurationTest, WritesProperBufferValues) {
+  adbc_validation::Handle<struct ArrowSchema> schema;
+  adbc_validation::Handle<struct ArrowArray> array;
+  struct ArrowError na_error;
+  const enum ArrowType type = NANOARROW_TYPE_DURATION;
+
+  // Bind by reference: GetParam() outlives the test body, so no copy is needed
+  const auto& [unit, values] = GetParam();
+
+  // Build a struct schema with a single DURATION child in the parameterized unit,
+  // failing immediately if any nanoarrow setup call does not succeed
+  ArrowSchemaInit(&schema.value);
+  ASSERT_EQ(ArrowSchemaSetTypeStruct(&schema.value, 1), NANOARROW_OK);
+  ASSERT_EQ(ArrowSchemaSetTypeDateTime(schema->children[0], type, unit, nullptr),
+            NANOARROW_OK);
+  ASSERT_EQ(ArrowSchemaSetName(schema->children[0], "col"), NANOARROW_OK);
+  ASSERT_EQ(adbc_validation::MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
+                                                values),
+            ADBC_STATUS_OK);
+
+  PostgresCopyStreamWriteTester tester;
+  ASSERT_EQ(tester.Init(&schema.value, &array.value), NANOARROW_OK);
+  ASSERT_EQ(tester.WriteAll(nullptr), ENODATA);
+
+  const struct ArrowBuffer buf = tester.WriteBuffer();
+  // The writer does not emit the trailing 2-byte end-of-data marker (int16 -1),
+  // so compare everything that precedes it
+  constexpr size_t buf_size = sizeof(kTestPgCopyDuration) - 2;
+  ASSERT_EQ(buf.size_bytes, buf_size);
+  for (size_t i = 0; i < buf_size; i++) {
+    ASSERT_EQ(buf.data[i], kTestPgCopyDuration[i]) << "mismatch at byte " << i;
+  }
+}
+
+// Every case encodes -4 seconds, +4 seconds and NULL in a different unit, so
+// each one must produce the bytes in kTestPgCopyDuration.
+static const std::vector<DurationTestParamType> duration_params = {
+    {NANOARROW_TIME_UNIT_SECOND, {-4, 4, std::nullopt}},
+    {NANOARROW_TIME_UNIT_MILLI, {-4000, 4000, std::nullopt}},
+    {NANOARROW_TIME_UNIT_MICRO, {-4000000, 4000000, std::nullopt}},
+    {NANOARROW_TIME_UNIT_NANO, {-4000000000, 4000000000, std::nullopt}}};
+
+INSTANTIATE_TEST_SUITE_P(PostgresCopyWriteDuration, PostgresCopyWriteDurationTest,
+                         testing::ValuesIn(duration_params));
+
// COPY (SELECT CAST("col" AS TEXT) AS "col" FROM ( VALUES ('abc'), ('1234'),
// (NULL::text)) AS drvd("col")) TO STDOUT WITH (FORMAT binary);
static uint8_t kTestPgCopyText[] = {