This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new b94397f9 feat(c/driver/sqlite): SQLite timestamp write support (#897)
b94397f9 is described below
commit b94397f994e00c84cc531555aad89e5415b489df
Author: William Ayd <[email protected]>
AuthorDate: Wed Jul 12 06:21:01 2023 -0700
feat(c/driver/sqlite): SQLite timestamp write support (#897)
Timestamps are currently written to SQLite as ISO 8601 strings. Here is what the
table looks like in the database at the end of the temporal test:
```bash
sqlite> select * FROM bulk_ingest;
1970-01-01T00:00:00.000000000
1970-01-01T00:00:00.000000042
```
---
c/driver/sqlite/CMakeLists.txt | 4 ++
c/driver/sqlite/sqlite_test.cc | 23 ++++++-
c/driver/sqlite/statement_reader.c | 121 +++++++++++++++++++++++++++++++++++++
c/validation/adbc_validation.cc | 25 ++++----
4 files changed, 157 insertions(+), 16 deletions(-)
diff --git a/c/driver/sqlite/CMakeLists.txt b/c/driver/sqlite/CMakeLists.txt
index eb5a8453..3914ee8e 100644
--- a/c/driver/sqlite/CMakeLists.txt
+++ b/c/driver/sqlite/CMakeLists.txt
@@ -56,6 +56,10 @@ foreach(LIB_TARGET ${ADBC_LIBRARIES})
target_compile_definitions(${LIB_TARGET} PRIVATE ADBC_EXPORTING)
endforeach()
+include(CheckTypeSize)
+check_type_size("time_t" SIZEOF_TIME_T)
+add_definitions(-DSIZEOF_TIME_T=${SIZEOF_TIME_T})
+
if(ADBC_TEST_LINKAGE STREQUAL "shared")
set(TEST_LINK_LIBS adbc_driver_sqlite_shared)
else()
diff --git a/c/driver/sqlite/sqlite_test.cc b/c/driver/sqlite/sqlite_test.cc
index 47df44e4..9ffd18df 100644
--- a/c/driver/sqlite/sqlite_test.cc
+++ b/c/driver/sqlite/sqlite_test.cc
@@ -43,6 +43,24 @@ class SqliteQuirks : public adbc_validation::DriverQuirks {
database, "uri", "file:Sqlite_Transactions?mode=memory&cache=shared",
error);
}
+ // Drops table `name` (if it exists) by running `DROP TABLE IF EXISTS <name>`
+ // through a temporary statement on `connection`. Returns the first non-OK
+ // status from creating, preparing, or executing the statement; the status of
+ // releasing the temporary statement is deliberately ignored.
+ AdbcStatusCode DropTable(struct AdbcConnection* connection, const std::string& name,
+                          struct AdbcError* error) const override {
+   struct AdbcStatement stmt;
+   std::memset(&stmt, 0, sizeof(stmt));
+
+   const AdbcStatusCode created = AdbcStatementNew(connection, &stmt, error);
+   if (created != ADBC_STATUS_OK) return created;
+
+   // Only execute when the SQL text was accepted; either way the statement is
+   // released exactly once before returning.
+   const std::string sql = "DROP TABLE IF EXISTS " + name;
+   AdbcStatusCode result = AdbcStatementSetSqlQuery(&stmt, sql.c_str(), error);
+   if (result == ADBC_STATUS_OK) {
+     result = AdbcStatementExecuteQuery(&stmt, nullptr, nullptr, error);
+   }
+   std::ignore = AdbcStatementRelease(&stmt, error);
+   return result;
+ }
+
std::string BindParameter(int index) const override { return "?"; }
ArrowType IngestSelectRoundTripType(ArrowType ingest_type) const override {
@@ -59,6 +77,8 @@ class SqliteQuirks : public adbc_validation::DriverQuirks {
case NANOARROW_TYPE_FLOAT:
case NANOARROW_TYPE_DOUBLE:
return NANOARROW_TYPE_DOUBLE;
+ case NANOARROW_TYPE_TIMESTAMP:
+ return NANOARROW_TYPE_STRING;
default:
return ingest_type;
}
@@ -171,9 +191,6 @@ class SqliteStatementTest : public ::testing::Test,
void TestSqlIngestUInt64() { GTEST_SKIP() << "Cannot ingest UINT64 (out of
range)"; }
void TestSqlIngestBinary() { GTEST_SKIP() << "Cannot ingest BINARY (not
implemented)"; }
- void TestSqlIngestTimestamp() {
- GTEST_SKIP() << "Cannot ingest TIMESTAMP (not implemented)";
- }
void TestSqlIngestTimestampTz() {
GTEST_SKIP() << "Cannot ingest TIMESTAMP WITH TIMEZONE (not implemented)";
}
diff --git a/c/driver/sqlite/statement_reader.c
b/c/driver/sqlite/statement_reader.c
index 504a4d80..10cb2f1c 100644
--- a/c/driver/sqlite/statement_reader.c
+++ b/c/driver/sqlite/statement_reader.c
@@ -17,9 +17,12 @@
#include "statement_reader.h"
+#include <assert.h>
#include <inttypes.h>
#include <math.h>
+#include <stdint.h>
#include <stdio.h>
+#include <time.h>
#include <adbc.h>
#include <nanoarrow/nanoarrow.h>
@@ -89,6 +92,107 @@ AdbcStatusCode AdbcSqliteBinderSetArrayStream(struct
AdbcSqliteBinder* binder,
memset(values, 0, sizeof(*values));
return AdbcSqliteBinderSet(binder, error);
}
+
+/*
+  Formats an Arrow timestamp as "YYYY-MM-DDTHH:MM:SS[.fraction]" in UTC.
+
+  value: count of `unit` ticks relative to the Unix epoch (may be negative).
+  unit:  resolution of `value`; selects 0, 3, 6 or 9 fractional digits.
+  buf:   on success receives a malloc'd, NUL-terminated string.
+
+  Allocates to buf on success. Caller is responsible for freeing.
+  On failure sets error and contents of buf are undefined.
+*/
+static AdbcStatusCode ArrowTimestampToIsoString(int64_t value, enum ArrowTimeUnit unit,
+                                                char** buf, struct AdbcError* error) {
+  /* Comfortably larger than the longest output the format below can produce,
+     even when the year needs more than four digits. */
+  enum { kTimestampBufSize = 48 };
+
+  int64_t scale = 1;
+  int frac_digits = 0;
+
+  switch (unit) {
+    case NANOARROW_TIME_UNIT_SECOND:
+      break;
+    case NANOARROW_TIME_UNIT_MILLI:
+      scale = 1000;
+      frac_digits = 3;
+      break;
+    case NANOARROW_TIME_UNIT_MICRO:
+      scale = 1000000;
+      frac_digits = 6;
+      break;
+    case NANOARROW_TIME_UNIT_NANO:
+      scale = 1000000000;
+      frac_digits = 9;
+      break;
+  }
+
+  /* C integer division truncates toward zero, so for negative values we must
+     borrow one second to keep the fractional part non-negative. Without this,
+     e.g. -42ns would be rendered as 1970-01-01T00:00:00.999999958 instead of
+     1969-12-31T23:59:59.999999958. */
+  int64_t seconds = value / scale;
+  int64_t rem = value % scale;
+  if (rem < 0) {
+    rem += scale;
+    seconds -= 1;
+  }
+  assert(rem >= 0 && rem < scale);
+
+#if SIZEOF_TIME_T < 8
+  if ((seconds > INT32_MAX) || (seconds < INT32_MIN)) {
+    SetError(error, "Timestamp %" PRId64 " with unit %d exceeds platform time_t bounds",
+             value, unit);
+
+    return ADBC_STATUS_INVALID_ARGUMENT;
+  }
+  const time_t time = (time_t)seconds;
+#else
+  const time_t time = seconds;
+#endif
+
+  struct tm broken_down_time;
+
+#if defined(_WIN32)
+  if (gmtime_s(&broken_down_time, &time) != 0) {
+    SetError(error,
+             "Could not convert timestamp %" PRId64 " with unit %d to broken down time",
+             value, unit);
+
+    return ADBC_STATUS_INVALID_ARGUMENT;
+  }
+#else
+  if (gmtime_r(&time, &broken_down_time) != &broken_down_time) {
+    SetError(error,
+             "Could not convert timestamp %" PRId64 " with unit %d to broken down time",
+             value, unit);
+
+    return ADBC_STATUS_INVALID_ARGUMENT;
+  }
+#endif
+
+  char* tsstr = malloc(kTimestampBufSize);
+  if (tsstr == NULL) {
+    return ADBC_STATUS_IO;
+  }
+
+  const size_t date_len =
+      strftime(tsstr, kTimestampBufSize, "%Y-%m-%dT%H:%M:%S", &broken_down_time);
+  if (date_len == 0) {
+    SetError(error, "Call to strftime for timestamp %" PRId64 " with unit %d failed",
+             value, unit);
+    free(tsstr);
+    return ADBC_STATUS_INVALID_ARGUMENT;
+  }
+
+  /* Append the fraction after whatever strftime actually produced rather than
+     at a fixed offset, so years that are not exactly four digits wide do not
+     corrupt the seconds field. */
+  if (frac_digits > 0) {
+    const size_t remaining = kTimestampBufSize - date_len;
+    const int written =
+        snprintf(tsstr + date_len, remaining, ".%0*d", frac_digits, (int)rem);
+    if (written < 0 || (size_t)written >= remaining) {
+      SetError(error,
+               "Could not format fractional seconds for timestamp %" PRId64
+               " with unit %d",
+               value, unit);
+      free(tsstr);
+      return ADBC_STATUS_INVALID_ARGUMENT;
+    }
+  }
+
+  *buf = tsstr;
+  return ADBC_STATUS_OK;
+}
+
AdbcStatusCode AdbcSqliteBinderBindNext(struct AdbcSqliteBinder* binder,
sqlite3* conn,
sqlite3_stmt* stmt, char* finished,
struct AdbcError* error) {
@@ -195,6 +299,23 @@ AdbcStatusCode AdbcSqliteBinderBindNext(struct
AdbcSqliteBinder* binder, sqlite3
SQLITE_STATIC);
break;
}
+ case NANOARROW_TYPE_TIMESTAMP: {
+ struct ArrowSchemaView bind_schema_view;
+ RAISE_ADBC(ArrowSchemaViewInit(&bind_schema_view,
binder->schema.children[col],
+ &arrow_error));
+ enum ArrowTimeUnit unit = bind_schema_view.time_unit;
+ int64_t value =
+ ArrowArrayViewGetIntUnsafe(binder->batch.children[col],
binder->next_row);
+
+ char* tsstr;
+ RAISE_ADBC(ArrowTimestampToIsoString(value, unit, &tsstr, error));
+
+ // SQLITE_TRANSIENT ensures the value is copied during bind
+ status =
+ sqlite3_bind_text(stmt, col + 1, tsstr, strlen(tsstr),
SQLITE_TRANSIENT);
+ free((char*)tsstr);
+ break;
+ }
default:
SetError(error, "Column %d has unsupported type %s", col,
ArrowTypeString(binder->types[col]));
diff --git a/c/validation/adbc_validation.cc b/c/validation/adbc_validation.cc
index 803f14c0..393d904e 100644
--- a/c/validation/adbc_validation.cc
+++ b/c/validation/adbc_validation.cc
@@ -1107,13 +1107,14 @@ void StatementTest::TestSqlIngestTemporalType(const
char* timezone) {
Handle<struct ArrowSchema> schema;
Handle<struct ArrowArray> array;
struct ArrowError na_error;
- const std::vector<std::optional<int64_t>> values = {std::nullopt, 0, 42};
+ const std::vector<std::optional<int64_t>> values = {std::nullopt, -42, 0,
42};
+ const ArrowType type = NANOARROW_TYPE_TIMESTAMP;
// much of this code is shared with TestSqlIngestType with minor
// changes to allow for various time units to be tested
ArrowSchemaInit(&schema.value);
ArrowSchemaSetTypeStruct(&schema.value, 1);
- ArrowSchemaSetTypeDateTime(schema->children[0], NANOARROW_TYPE_TIMESTAMP,
TU, timezone);
+ ArrowSchemaSetTypeDateTime(schema->children[0], type, TU, timezone);
ArrowSchemaSetName(schema->children[0], "col");
ASSERT_THAT(MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
values),
IsOkErrno());
@@ -1145,24 +1146,22 @@ void StatementTest::TestSqlIngestTemporalType(const
char* timezone) {
ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
- // postgres does not receive/store/send the timezone, just the UTC integer
- // value; we may still want to update CompareSchema to explicitly check
for UTC
- // with TIMESTAMP WITH TIMEZONE and naive for TIMESTAMP
- ASSERT_NO_FATAL_FAILURE(CompareSchema(&reader.schema.value,
- {{"col", NANOARROW_TYPE_TIMESTAMP,
NULLABLE}}));
+ ArrowType round_trip_type = quirks()->IngestSelectRoundTripType(type);
+ ASSERT_NO_FATAL_FAILURE(
+ CompareSchema(&reader.schema.value, {{"col", round_trip_type,
NULLABLE}}));
ASSERT_NO_FATAL_FAILURE(reader.Next());
ASSERT_NE(nullptr, reader.array->release);
ASSERT_EQ(values.size(), reader.array->length);
ASSERT_EQ(1, reader.array->n_children);
- if (TU == NANOARROW_TIME_UNIT_MICRO) {
- // Similar to the TestSqlIngestType implementation we are only now
- // testing values if the unit round trips
- ASSERT_NO_FATAL_FAILURE(
- CompareArray<int64_t>(reader.array_view->children[0], values));
+ if (round_trip_type == type) {
+ // XXX: for now we can't compare values; we would need casting
+ if (TU == NANOARROW_TIME_UNIT_MICRO) {
+ ASSERT_NO_FATAL_FAILURE(
+ CompareArray<int64_t>(reader.array_view->children[0], values));
+ }
}
-
ASSERT_NO_FATAL_FAILURE(reader.Next());
ASSERT_EQ(nullptr, reader.array->release);
}