This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 963b57bb feat(c/driver/postgresql): TIMESTAMP COPY Writer (#1185)
963b57bb is described below
commit 963b57bbf254f30aec090104d06e99defb60240d
Author: William Ayd <[email protected]>
AuthorDate: Wed Oct 11 14:16:59 2023 -0400
feat(c/driver/postgresql): TIMESTAMP COPY Writer (#1185)
---
c/driver/postgresql/postgres_copy_reader.h | 80 +++++++++++++--
c/driver/postgresql/postgres_copy_reader_test.cc | 122 +++++++++++++++++++++++
2 files changed, 196 insertions(+), 6 deletions(-)
diff --git a/c/driver/postgresql/postgres_copy_reader.h
b/c/driver/postgresql/postgres_copy_reader.h
index 21b08d31..3a26442c 100644
--- a/c/driver/postgresql/postgres_copy_reader.h
+++ b/c/driver/postgresql/postgres_copy_reader.h
@@ -28,6 +28,7 @@
#include <nanoarrow/nanoarrow.hpp>
+#include "common/utils.h"
#include "postgres_type.h"
#include "postgres_util.h"
@@ -1211,10 +1212,61 @@ class PostgresCopyBinaryFieldWriter : public
PostgresCopyFieldWriter {
}
};
-static inline ArrowErrorCode MakeCopyFieldWriter(const enum ArrowType
arrow_type,
- PostgresCopyFieldWriter** out,
- ArrowError* error) {
- switch (arrow_type) {
+/// \brief Field writer that serializes an Arrow timestamp column as a PostgreSQL
+/// binary COPY field.
+///
+/// Each value is rescaled from the Arrow unit TU to microseconds, rebased by
+/// subtracting kPostgresTimestampEpoch (2000-01-01 expressed in microseconds) and
+/// written as an int32 length prefix of 8 followed by the int64 result.
+///
+/// \tparam TU Time unit of the values stored in the source array
+template <enum ArrowTimeUnit TU>
+class PostgresCopyTimestampFieldWriter : public PostgresCopyFieldWriter {
+ public:
+  /// \brief Append the element at index of the attached array view to buffer.
+  ///
+  /// Nothing is appended when the value cannot be represented: the conversion is
+  /// fully validated before the length prefix is emitted.
+  ///
+  /// \param buffer Destination for the encoded field
+  /// \param index Row to encode; read without a validity check
+  /// \param error Populated with a message when the value would overflow
+  /// \return ADBC_STATUS_OK on success, ADBC_STATUS_INVALID_ARGUMENT when the value
+  ///   would overflow int64 during conversion, or the code reported by WriteChecked
+  ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) override {
+    // 2000-01-01 00:00:00.000000 in microseconds
+    constexpr int64_t kPostgresTimestampEpoch = 946684800000000;
+
+    const int64_t raw_value = ArrowArrayViewGetIntUnsafe(array_view_, index);
+    int64_t value = 0;
+    bool overflow_safe = true;
+
+    switch (TU) {
+      case NANOARROW_TIME_UNIT_SECOND:
+        overflow_safe = raw_value <= kMaxSafeSecondsToMicros &&
+                        raw_value >= kMinSafeSecondsToMicros;
+        if (overflow_safe) {
+          value = raw_value * 1000000;
+        }
+        break;
+      case NANOARROW_TIME_UNIT_MILLI:
+        overflow_safe = raw_value <= kMaxSafeMillisToMicros &&
+                        raw_value >= kMinSafeMillisToMicros;
+        if (overflow_safe) {
+          value = raw_value * 1000;
+        }
+        break;
+      case NANOARROW_TIME_UNIT_MICRO:
+        value = raw_value;
+        break;
+      case NANOARROW_TIME_UNIT_NANO:
+        // Integer division: sub-microsecond digits are truncated toward zero.
+        value = raw_value / 1000;
+        break;
+    }
+
+    // The rebase below subtracts a positive offset, so a value closer than that
+    // offset to INT64_MIN would wrap around (signed overflow) as well.
+    if (overflow_safe && value < INT64_MIN + kPostgresTimestampEpoch) {
+      overflow_safe = false;
+    }
+
+    if (!overflow_safe) {
+      ArrowErrorSet(error,
+                    "Row %" PRId64 " timestamp value %" PRId64
+                    " with unit %d would overflow",
+                    index, raw_value, TU);
+      return ADBC_STATUS_INVALID_ARGUMENT;
+    }
+
+    const int64_t scaled = value - kPostgresTimestampEpoch;
+
+    // Emit the length prefix only once the payload is known to be valid so that a
+    // rejected row never leaves a dangling prefix in the buffer.
+    constexpr int32_t field_size_bytes = sizeof(int64_t);
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int32_t>(buffer, field_size_bytes, error));
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int64_t>(buffer, scaled, error));
+
+    return ADBC_STATUS_OK;
+  }
+};
+
+static inline ArrowErrorCode MakeCopyFieldWriter(
+ const struct ArrowSchemaView& schema_view, PostgresCopyFieldWriter** out,
+ ArrowError* error) {
+ switch (schema_view.type) {
case NANOARROW_TYPE_BOOL:
*out = new PostgresCopyBooleanFieldWriter();
return NANOARROW_OK;
@@ -1243,6 +1295,23 @@ static inline ArrowErrorCode MakeCopyFieldWriter(const
enum ArrowType arrow_type
case NANOARROW_TYPE_LARGE_STRING:
*out = new PostgresCopyBinaryFieldWriter();
return NANOARROW_OK;
+ case NANOARROW_TYPE_TIMESTAMP: {
+ switch (schema_view.time_unit) {
+ case NANOARROW_TIME_UNIT_NANO:
+ *out = new
PostgresCopyTimestampFieldWriter<NANOARROW_TIME_UNIT_NANO>();
+ break;
+ case NANOARROW_TIME_UNIT_MILLI:
+ *out = new
PostgresCopyTimestampFieldWriter<NANOARROW_TIME_UNIT_MILLI>();
+ break;
+ case NANOARROW_TIME_UNIT_MICRO:
+ *out = new
PostgresCopyTimestampFieldWriter<NANOARROW_TIME_UNIT_MICRO>();
+ break;
+ case NANOARROW_TIME_UNIT_SECOND:
+ *out = new
PostgresCopyTimestampFieldWriter<NANOARROW_TIME_UNIT_SECOND>();
+ break;
+ }
+ return NANOARROW_OK;
+ }
default:
return EINVAL;
}
@@ -1293,9 +1362,8 @@ class PostgresCopyStreamWriter {
NANOARROW_OK) {
return ADBC_STATUS_INTERNAL;
}
- const ArrowType arrow_type = schema_view.type;
PostgresCopyFieldWriter* child_writer;
- NANOARROW_RETURN_NOT_OK(MakeCopyFieldWriter(arrow_type, &child_writer,
error));
+ NANOARROW_RETURN_NOT_OK(MakeCopyFieldWriter(schema_view, &child_writer,
error));
root_writer_.AppendChild(std::unique_ptr<PostgresCopyFieldWriter>(child_writer));
}
diff --git a/c/driver/postgresql/postgres_copy_reader_test.cc
b/c/driver/postgresql/postgres_copy_reader_test.cc
index 1fd80124..e4bbb9ea 100644
--- a/c/driver/postgresql/postgres_copy_reader_test.cc
+++ b/c/driver/postgresql/postgres_copy_reader_test.cc
@@ -16,7 +16,9 @@
// under the License.
#include <optional>
+#include <tuple>
+#include <gtest/gtest-param-test.h>
#include <gtest/gtest.h>
#include <nanoarrow/nanoarrow.hpp>
@@ -678,6 +680,126 @@ TEST(PostgresCopyUtilsTest, PostgresCopyReadNumeric) {
EXPECT_EQ(std::string(item.data, item.size_bytes), "inf");
}
+
+// COPY (SELECT CAST(col AS TIMESTAMP) FROM ( VALUES ('1900-01-01 12:34:56'),
+// ('2100-01-01 12:34:56'), (NULL)) AS drvd("col")) TO STDOUT WITH (FORMAT BINARY);
+//
+// The bytes are grouped below following the binary COPY layout.
+static uint8_t kTestPgCopyTimestamp[] = {
+    // Signature: "PGCOPY\n\377\r\n\0"
+    0x50, 0x47, 0x43, 0x4f, 0x50, 0x59, 0x0a, 0xff, 0x0d, 0x0a, 0x00,
+    // Flags field and header extension length (both zero)
+    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+    // Row 1: field count 1, field length 8, then the 8-byte value
+    0x00, 0x01, 0x00, 0x00, 0x00, 0x08,
+    0xff, 0xf4, 0xc9, 0xf9, 0x07, 0xe5, 0x9c, 0x00,
+    // Row 2: field count 1, field length 8, then the 8-byte value
+    0x00, 0x01, 0x00, 0x00, 0x00, 0x08,
+    0x00, 0x0b, 0x36, 0x30, 0x2d, 0xa5, 0xfc, 0x00,
+    // Row 3: field count 1, field length -1 (NULL, no payload)
+    0x00, 0x01, 0xff, 0xff, 0xff, 0xff,
+    // Trailer: 16-bit -1
+    0xff, 0xff};
+
+// Decodes kTestPgCopyTimestamp through the COPY reader and checks the validity
+// bitmap and the decoded int64 values of the single output column.
+TEST(PostgresCopyUtilsTest, PostgresCopyReadTimestamp) {
+  ArrowBufferView data;
+  data.data.as_uint8 = kTestPgCopyTimestamp;
+  data.size_bytes = sizeof(kTestPgCopyTimestamp);
+
+  auto col_type = PostgresType(PostgresTypeId::kTimestamp);
+  PostgresType input_type(PostgresTypeId::kRecord);
+  input_type.AppendChild("col", col_type);
+
+  PostgresCopyStreamTester tester;
+  ASSERT_EQ(tester.Init(input_type), NANOARROW_OK);
+  ASSERT_EQ(tester.ReadAll(&data), ENODATA);
+  // The view must have been advanced past every input byte.
+  ASSERT_EQ(data.data.as_uint8 - kTestPgCopyTimestamp, sizeof(kTestPgCopyTimestamp));
+  ASSERT_EQ(data.size_bytes, 0);
+
+  nanoarrow::UniqueArray array;
+  ASSERT_EQ(tester.GetArray(array.get()), NANOARROW_OK);
+  ASSERT_EQ(array->length, 3);
+  ASSERT_EQ(array->n_children, 1);
+
+  auto validity = reinterpret_cast<const uint8_t*>(array->children[0]->buffers[0]);
+  auto data_buffer = reinterpret_cast<const int64_t*>(array->children[0]->buffers[1]);
+  ASSERT_NE(validity, nullptr);
+  ASSERT_NE(data_buffer, nullptr);
+
+  ASSERT_TRUE(ArrowBitGet(validity, 0));
+  ASSERT_TRUE(ArrowBitGet(validity, 1));
+  // The array holds three rows, so the NULL row is bit 2; bit 3 lies beyond the
+  // array length and would not exercise the null at all.
+  ASSERT_FALSE(ArrowBitGet(validity, 2));
+
+  // 1900-01-01 12:34:56 and 2100-01-01 12:34:56 as microseconds since 1970-01-01
+  ASSERT_EQ(data_buffer[0], -2208943504000000);
+  ASSERT_EQ(data_buffer[1], 4102490096000000);
+}
+
+// One parameterized case: (time unit, timezone string or nullptr, column values).
+using TimestampTestParamType =
+    std::tuple<enum ArrowTimeUnit, const char*, std::vector<std::optional<int64_t>>>;
+
+// Fixture for the timestamp COPY writer tests; it adds no state of its own.
+class PostgresCopyWriteTimestampTest
+    : public testing::TestWithParam<TimestampTestParamType> {};
+
+// Builds a one-column timestamp batch from the test parameter, runs it through the
+// COPY writer and compares the produced bytes against kTestPgCopyTimestamp.
+TEST_P(PostgresCopyWriteTimestampTest, WritesProperBufferValues) {
+  adbc_validation::Handle<struct ArrowSchema> schema;
+  adbc_validation::Handle<struct ArrowArray> array;
+  struct ArrowError na_error;
+
+  // Bind by reference: the parameter owns a vector that need not be copied.
+  const auto& [unit, timezone, values] = GetParam();
+
+  // A failure while building the schema must fail the test here instead of
+  // surfacing later as a confusing byte mismatch.
+  ArrowSchemaInit(&schema.value);
+  ASSERT_EQ(ArrowSchemaSetTypeStruct(&schema.value, 1), NANOARROW_OK);
+  ASSERT_EQ(ArrowSchemaSetTypeDateTime(schema->children[0], NANOARROW_TYPE_TIMESTAMP,
+                                       unit, timezone),
+            NANOARROW_OK);
+  ASSERT_EQ(ArrowSchemaSetName(schema->children[0], "col"), NANOARROW_OK);
+  ASSERT_EQ(adbc_validation::MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
+                                                values),
+            ADBC_STATUS_OK);
+
+  PostgresCopyStreamWriteTester tester;
+  ASSERT_EQ(tester.Init(&schema.value, &array.value), NANOARROW_OK);
+  ASSERT_EQ(tester.WriteAll(nullptr), ENODATA);
+
+  const struct ArrowBuffer buf = tester.WriteBuffer();
+  // The last 2 bytes of a message can be transmitted via PQputCopyData
+  // so no need to test those bytes from the Writer
+  constexpr size_t buf_size = sizeof(kTestPgCopyTimestamp) - 2;
+  ASSERT_EQ(buf.size_bytes, buf_size);
+  for (size_t i = 0; i < buf_size; i++) {
+    ASSERT_EQ(buf.data[i], kTestPgCopyTimestamp[i]) << "mismatch at byte " << i;
+  }
+}
+
+// Every unit/timezone combination holds the same three rows (1900-01-01 12:34:56,
+// 2100-01-01 12:34:56, NULL) expressed in that unit.
+static const std::vector<TimestampTestParamType> ts_values{
+    // No timezone
+    {NANOARROW_TIME_UNIT_SECOND, nullptr, {-2208943504, 4102490096, std::nullopt}},
+    {NANOARROW_TIME_UNIT_MILLI, nullptr, {-2208943504000, 4102490096000, std::nullopt}},
+    {NANOARROW_TIME_UNIT_MICRO,
+     nullptr,
+     {-2208943504000000, 4102490096000000, std::nullopt}},
+    {NANOARROW_TIME_UNIT_NANO,
+     nullptr,
+     {-2208943504000000000, 4102490096000000000, std::nullopt}},
+    // UTC
+    {NANOARROW_TIME_UNIT_SECOND, "UTC", {-2208943504, 4102490096, std::nullopt}},
+    {NANOARROW_TIME_UNIT_MILLI, "UTC", {-2208943504000, 4102490096000, std::nullopt}},
+    {NANOARROW_TIME_UNIT_MICRO,
+     "UTC",
+     {-2208943504000000, 4102490096000000, std::nullopt}},
+    {NANOARROW_TIME_UNIT_NANO,
+     "UTC",
+     {-2208943504000000000, 4102490096000000000, std::nullopt}},
+    // America/New_York
+    {NANOARROW_TIME_UNIT_SECOND,
+     "America/New_York",
+     {-2208943504, 4102490096, std::nullopt}},
+    {NANOARROW_TIME_UNIT_MILLI,
+     "America/New_York",
+     {-2208943504000, 4102490096000, std::nullopt}},
+    {NANOARROW_TIME_UNIT_MICRO,
+     "America/New_York",
+     {-2208943504000000, 4102490096000000, std::nullopt}},
+    {NANOARROW_TIME_UNIT_NANO,
+     "America/New_York",
+     {-2208943504000000000, 4102490096000000000, std::nullopt}},
+};
+
+// Run the writer test once per entry of ts_values.
+INSTANTIATE_TEST_SUITE_P(PostgresCopyWriteTimestamp, PostgresCopyWriteTimestampTest,
+                         testing::ValuesIn(ts_values));
+
// COPY (SELECT CAST("col" AS TEXT) AS "col" FROM ( VALUES ('abc'), ('1234'),
// (NULL::text)) AS drvd("col")) TO STDOUT WITH (FORMAT binary);
static uint8_t kTestPgCopyText[] = {