This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 0624550c fix(go/adbc/driver/snowflake): Fix integration tests by
fixing timestamp handling (#889)
0624550c is described below
commit 0624550c142033c1ed725a6918b741611922240c
Author: Matt Topol <[email protected]>
AuthorDate: Mon Jul 10 20:35:51 2023 -0400
fix(go/adbc/driver/snowflake): Fix integration tests by fixing timestamp
handling (#889)
This change depends on https://github.com/apache/arrow/pull/36569, which
must be merged first.
---
go/adbc/driver/snowflake/driver_test.go | 36 +++++++++++++++++
go/adbc/driver/snowflake/record_reader.go | 67 +++++++++++++++++++------------
go/adbc/driver/snowflake/statement.go | 4 +-
go/adbc/go.mod | 2 +-
go/adbc/go.sum | 2 +
5 files changed, 82 insertions(+), 29 deletions(-)
diff --git a/go/adbc/driver/snowflake/driver_test.go
b/go/adbc/driver/snowflake/driver_test.go
index bdbf3791..42ee0971 100644
--- a/go/adbc/driver/snowflake/driver_test.go
+++ b/go/adbc/driver/snowflake/driver_test.go
@@ -316,6 +316,42 @@ func (suite *SnowflakeTests) TearDownSuite() {
suite.db = nil
}
+func (suite *SnowflakeTests) TestSqlIngestTimestamp() {
+ suite.Require().NoError(suite.Quirks.DropTable(suite.cnxn,
"bulk_ingest"))
+
+ sc := arrow.NewSchema([]arrow.Field{{
+ Name: "col", Type: arrow.FixedWidthTypes.Timestamp_us,
+ Nullable: true,
+ }}, nil)
+
+ bldr := array.NewRecordBuilder(memory.DefaultAllocator, sc)
+ defer bldr.Release()
+
+ tbldr := bldr.Field(0).(*array.TimestampBuilder)
+ tbldr.AppendValues([]arrow.Timestamp{0, 0, 42}, []bool{false, true,
true})
+ rec := bldr.NewRecord()
+ defer rec.Release()
+
+ suite.Require().NoError(suite.stmt.Bind(suite.ctx, rec))
+
suite.Require().NoError(suite.stmt.SetOption(adbc.OptionKeyIngestTargetTable,
"bulk_ingest"))
+ n, err := suite.stmt.ExecuteUpdate(suite.ctx)
+ suite.Require().NoError(err)
+ suite.EqualValues(3, n)
+
+ suite.Require().NoError(suite.stmt.SetSqlQuery("SELECT * FROM
bulk_ingest ORDER BY \"col\" ASC NULLS FIRST"))
+ rdr, n, err := suite.stmt.ExecuteQuery(suite.ctx)
+ suite.Require().NoError(err)
+ defer rdr.Release()
+
+ suite.EqualValues(3, n)
+ suite.True(rdr.Next())
+ result := rdr.Record()
+ suite.Truef(array.RecordEqual(rec, result), "expected: %s\ngot: %s",
rec, result)
+ suite.False(rdr.Next())
+
+ suite.Require().NoError(rdr.Err())
+}
+
func (suite *SnowflakeTests) TestStatementEmptyResultSet() {
// Regression test for https://github.com/apache/arrow-adbc/issues/863
suite.NoError(suite.stmt.SetSqlQuery("SHOW WAREHOUSES"))
diff --git a/go/adbc/driver/snowflake/record_reader.go
b/go/adbc/driver/snowflake/record_reader.go
index 0434674b..63cfb0e7 100644
--- a/go/adbc/driver/snowflake/record_reader.go
+++ b/go/adbc/driver/snowflake/record_reader.go
@@ -113,40 +113,37 @@ func getTransformer(sc *arrow.Schema, ld
gosnowflake.ArrowStreamLoader) (*arrow.
return compute.CastArray(ctx, a,
compute.SafeCastOptions(f.Type))
}
case "TIMESTAMP_NTZ":
- dt := &arrow.TimestampType{Unit: arrow.Nanosecond}
+ dt := &arrow.TimestampType{Unit:
arrow.TimeUnit(srcMeta.Scale / 3)}
f.Type = dt
transformers[i] = func(ctx context.Context, a
arrow.Array) (arrow.Array, error) {
+
+ if a.DataType().ID() != arrow.STRUCT {
+ return compute.CastArray(ctx, a,
compute.SafeCastOptions(dt))
+ }
+
pool := compute.GetAllocator(ctx)
tb := array.NewTimestampBuilder(pool, dt)
defer tb.Release()
- if a.DataType().ID() == arrow.STRUCT {
- structData := a.(*array.Struct)
- epoch :=
structData.Field(0).(*array.Int64).Int64Values()
- fraction :=
structData.Field(1).(*array.Int32).Int32Values()
- for i := 0; i < a.Len(); i++ {
- if a.IsNull(i) {
- tb.AppendNull()
- continue
- }
-
-
tb.Append(arrow.Timestamp(time.Unix(epoch[i], int64(fraction[i])).UnixNano()))
+ structData := a.(*array.Struct)
+ epoch :=
structData.Field(0).(*array.Int64).Int64Values()
+ fraction :=
structData.Field(1).(*array.Int32).Int32Values()
+ for i := 0; i < a.Len(); i++ {
+ if a.IsNull(i) {
+ tb.AppendNull()
+ continue
}
- } else {
- for i, t := range
a.(*array.Int64).Int64Values() {
- if a.IsNull(i) {
- tb.AppendNull()
- continue
- }
- val := time.Unix(0,
int64(t)*int64(math.Pow10(9-int(srcMeta.Scale)))).UTC()
-
tb.Append(arrow.Timestamp(val.UnixNano()))
+ v, err :=
arrow.TimestampFromTime(time.Unix(epoch[i], int64(fraction[i])), dt.TimeUnit())
+ if err != nil {
+ return nil, err
}
+ tb.Append(v)
}
return tb.NewArray(), nil
}
case "TIMESTAMP_LTZ":
- dt := &arrow.TimestampType{Unit: arrow.Nanosecond,
TimeZone: loc.String()}
+ dt := &arrow.TimestampType{Unit:
arrow.TimeUnit(srcMeta.Scale) / 3, TimeZone: loc.String()}
f.Type = dt
transformers[i] = func(ctx context.Context, a
arrow.Array) (arrow.Array, error) {
pool := compute.GetAllocator(ctx)
@@ -163,7 +160,11 @@ func getTransformer(sc *arrow.Schema, ld
gosnowflake.ArrowStreamLoader) (*arrow.
continue
}
-
tb.Append(arrow.Timestamp(time.Unix(epoch[i], int64(fraction[i])).UnixNano()))
+ v, err :=
arrow.TimestampFromTime(time.Unix(epoch[i], int64(fraction[i])), dt.TimeUnit())
+ if err != nil {
+ return nil, err
+ }
+ tb.Append(v)
}
} else {
for i, t := range
a.(*array.Int64).Int64Values() {
@@ -174,13 +175,19 @@ func getTransformer(sc *arrow.Schema, ld
gosnowflake.ArrowStreamLoader) (*arrow.
q := int64(t) /
int64(math.Pow10(int(srcMeta.Scale)))
r := int64(t) %
int64(math.Pow10(int(srcMeta.Scale)))
-
tb.Append(arrow.Timestamp(time.Unix(q, r).UnixNano()))
+ v, err :=
arrow.TimestampFromTime(time.Unix(q, r), dt.Unit)
+ if err != nil {
+ return nil, err
+ }
+ tb.Append(v)
}
}
return tb.NewArray(), nil
}
case "TIMESTAMP_TZ":
- dt := &arrow.TimestampType{Unit: arrow.Nanosecond}
+ // we convert each value to UTC since we have timezone
information
+ // with the data that lets us do so.
+ dt := &arrow.TimestampType{TimeZone: "UTC", Unit:
arrow.TimeUnit(srcMeta.Scale / 3)}
f.Type = dt
transformers[i] = func(ctx context.Context, a
arrow.Array) (arrow.Array, error) {
pool := compute.GetAllocator(ctx)
@@ -198,7 +205,11 @@ func getTransformer(sc *arrow.Schema, ld
gosnowflake.ArrowStreamLoader) (*arrow.
}
loc :=
gosnowflake.Location(int(tzoffset[i]) - 1440)
-
tb.Append(arrow.Timestamp(time.Unix(epoch[i], 0).In(loc).UnixNano()))
+ v, err :=
arrow.TimestampFromTime(time.Unix(epoch[i], 0).In(loc), dt.Unit)
+ if err != nil {
+ return nil, err
+ }
+ tb.Append(v)
}
} else {
epoch :=
structData.Field(0).(*array.Int64).Int64Values()
@@ -211,7 +222,11 @@ func getTransformer(sc *arrow.Schema, ld
gosnowflake.ArrowStreamLoader) (*arrow.
}
loc :=
gosnowflake.Location(int(tzoffset[i]) - 1440)
-
tb.Append(arrow.Timestamp(time.Unix(epoch[i],
int64(fraction[i])).In(loc).UnixNano()))
+ v, err :=
arrow.TimestampFromTime(time.Unix(epoch[i], int64(fraction[i])).In(loc),
dt.Unit)
+ if err != nil {
+ return nil, err
+ }
+ tb.Append(v)
}
}
return tb.NewArray(), nil
diff --git a/go/adbc/driver/snowflake/statement.go
b/go/adbc/driver/snowflake/statement.go
index 481e7f7c..be554780 100644
--- a/go/adbc/driver/snowflake/statement.go
+++ b/go/adbc/driver/snowflake/statement.go
@@ -153,9 +153,9 @@ func toSnowflakeType(dt arrow.DataType) string {
ts := dt.(*arrow.TimestampType)
prec := int(ts.Unit) * 3
if ts.TimeZone == "" {
- return fmt.Sprintf("timestamp_tz(%d)", prec)
+ return fmt.Sprintf("timestamp_ntz(%d)", prec)
}
- return fmt.Sprintf("timestamp_ltz(%d)", prec)
+ return fmt.Sprintf("timestamp_tz(%d)", prec)
case arrow.DENSE_UNION, arrow.SPARSE_UNION:
return "variant"
case arrow.LIST, arrow.LARGE_LIST, arrow.FIXED_SIZE_LIST:
diff --git a/go/adbc/go.mod b/go/adbc/go.mod
index 21bfad3e..e4496c24 100644
--- a/go/adbc/go.mod
+++ b/go/adbc/go.mod
@@ -20,7 +20,7 @@ module github.com/apache/arrow-adbc/go/adbc
go 1.18
require (
- github.com/apache/arrow/go/v13 v13.0.0-20230620164925-94af6c3c9646
+ github.com/apache/arrow/go/v13 v13.0.0-20230710202504-70f447636553
github.com/bluele/gcache v0.0.2
github.com/google/uuid v1.3.0
github.com/snowflakedb/gosnowflake v1.6.21
diff --git a/go/adbc/go.sum b/go/adbc/go.sum
index 97634848..c8b128ec 100644
--- a/go/adbc/go.sum
+++ b/go/adbc/go.sum
@@ -18,6 +18,8 @@ github.com/apache/arrow/go/v12 v12.0.0
h1:xtZE63VWl7qLdB0JObIXvvhGjoVNrQ9ciIHG2O
github.com/apache/arrow/go/v12 v12.0.0/go.mod
h1:d+tV/eHZZ7Dz7RPrFKtPK02tpr+c9/PEd/zm8mDS9Vg=
github.com/apache/arrow/go/v13 v13.0.0-20230620164925-94af6c3c9646
h1:hLcsUn9hiiD7jDfJDKOe1tBfOL5v0wgrya5S8XXqzLw=
github.com/apache/arrow/go/v13 v13.0.0-20230620164925-94af6c3c9646/go.mod
h1:W69eByFNO0ZR30q1/7Sr9d83zcVZmF2MiP3fFYAWJOc=
+github.com/apache/arrow/go/v13 v13.0.0-20230710202504-70f447636553
h1:LV3nIWJ2254APRpYAcMxWbxoQwt66gnrkZ5NaDs1IPI=
+github.com/apache/arrow/go/v13 v13.0.0-20230710202504-70f447636553/go.mod
h1:W69eByFNO0ZR30q1/7Sr9d83zcVZmF2MiP3fFYAWJOc=
github.com/apache/thrift v0.16.0
h1:qEy6UW60iVOlUy+b9ZR0d5WzUWYGOo4HfopoyBaNmoY=
github.com/apache/thrift v0.16.0/go.mod
h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU=
github.com/aws/aws-sdk-go-v2 v1.18.0
h1:882kkTpSFhdgYRKVZ/VCgf7sd0ru57p2JCxz4/oN5RY=