This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 0624550c fix(go/adbc/driver/snowflake): Fix integration tests by fixing timestamp handling (#889)
0624550c is described below

commit 0624550c142033c1ed725a6918b741611922240c
Author: Matt Topol <[email protected]>
AuthorDate: Mon Jul 10 20:35:51 2023 -0400

    fix(go/adbc/driver/snowflake): Fix integration tests by fixing timestamp handling (#889)
    
    This change depends on apache/arrow PR #36569
    (https://github.com/apache/arrow/pull/36569), which must be merged first.
---
 go/adbc/driver/snowflake/driver_test.go   | 36 +++++++++++++++++
 go/adbc/driver/snowflake/record_reader.go | 67 +++++++++++++++++++------------
 go/adbc/driver/snowflake/statement.go     |  4 +-
 go/adbc/go.mod                            |  2 +-
 go/adbc/go.sum                            |  2 +
 5 files changed, 82 insertions(+), 29 deletions(-)

diff --git a/go/adbc/driver/snowflake/driver_test.go 
b/go/adbc/driver/snowflake/driver_test.go
index bdbf3791..42ee0971 100644
--- a/go/adbc/driver/snowflake/driver_test.go
+++ b/go/adbc/driver/snowflake/driver_test.go
@@ -316,6 +316,42 @@ func (suite *SnowflakeTests) TearDownSuite() {
        suite.db = nil
 }
 
+func (suite *SnowflakeTests) TestSqlIngestTimestamp() {
+       suite.Require().NoError(suite.Quirks.DropTable(suite.cnxn, 
"bulk_ingest"))
+
+       sc := arrow.NewSchema([]arrow.Field{{
+               Name: "col", Type: arrow.FixedWidthTypes.Timestamp_us,
+               Nullable: true,
+       }}, nil)
+
+       bldr := array.NewRecordBuilder(memory.DefaultAllocator, sc)
+       defer bldr.Release()
+
+       tbldr := bldr.Field(0).(*array.TimestampBuilder)
+       tbldr.AppendValues([]arrow.Timestamp{0, 0, 42}, []bool{false, true, 
true})
+       rec := bldr.NewRecord()
+       defer rec.Release()
+
+       suite.Require().NoError(suite.stmt.Bind(suite.ctx, rec))
+       
suite.Require().NoError(suite.stmt.SetOption(adbc.OptionKeyIngestTargetTable, 
"bulk_ingest"))
+       n, err := suite.stmt.ExecuteUpdate(suite.ctx)
+       suite.Require().NoError(err)
+       suite.EqualValues(3, n)
+
+       suite.Require().NoError(suite.stmt.SetSqlQuery("SELECT * FROM 
bulk_ingest ORDER BY \"col\" ASC NULLS FIRST"))
+       rdr, n, err := suite.stmt.ExecuteQuery(suite.ctx)
+       suite.Require().NoError(err)
+       defer rdr.Release()
+
+       suite.EqualValues(3, n)
+       suite.True(rdr.Next())
+       result := rdr.Record()
+       suite.Truef(array.RecordEqual(rec, result), "expected: %s\ngot: %s", 
rec, result)
+       suite.False(rdr.Next())
+
+       suite.Require().NoError(rdr.Err())
+}
+
 func (suite *SnowflakeTests) TestStatementEmptyResultSet() {
        // Regression test for https://github.com/apache/arrow-adbc/issues/863
        suite.NoError(suite.stmt.SetSqlQuery("SHOW WAREHOUSES"))
diff --git a/go/adbc/driver/snowflake/record_reader.go 
b/go/adbc/driver/snowflake/record_reader.go
index 0434674b..63cfb0e7 100644
--- a/go/adbc/driver/snowflake/record_reader.go
+++ b/go/adbc/driver/snowflake/record_reader.go
@@ -113,40 +113,37 @@ func getTransformer(sc *arrow.Schema, ld 
gosnowflake.ArrowStreamLoader) (*arrow.
                                return compute.CastArray(ctx, a, 
compute.SafeCastOptions(f.Type))
                        }
                case "TIMESTAMP_NTZ":
-                       dt := &arrow.TimestampType{Unit: arrow.Nanosecond}
+                       dt := &arrow.TimestampType{Unit: 
arrow.TimeUnit(srcMeta.Scale / 3)}
                        f.Type = dt
                        transformers[i] = func(ctx context.Context, a 
arrow.Array) (arrow.Array, error) {
+
+                               if a.DataType().ID() != arrow.STRUCT {
+                                       return compute.CastArray(ctx, a, 
compute.SafeCastOptions(dt))
+                               }
+
                                pool := compute.GetAllocator(ctx)
                                tb := array.NewTimestampBuilder(pool, dt)
                                defer tb.Release()
 
-                               if a.DataType().ID() == arrow.STRUCT {
-                                       structData := a.(*array.Struct)
-                                       epoch := 
structData.Field(0).(*array.Int64).Int64Values()
-                                       fraction := 
structData.Field(1).(*array.Int32).Int32Values()
-                                       for i := 0; i < a.Len(); i++ {
-                                               if a.IsNull(i) {
-                                                       tb.AppendNull()
-                                                       continue
-                                               }
-
-                                               
tb.Append(arrow.Timestamp(time.Unix(epoch[i], int64(fraction[i])).UnixNano()))
+                               structData := a.(*array.Struct)
+                               epoch := 
structData.Field(0).(*array.Int64).Int64Values()
+                               fraction := 
structData.Field(1).(*array.Int32).Int32Values()
+                               for i := 0; i < a.Len(); i++ {
+                                       if a.IsNull(i) {
+                                               tb.AppendNull()
+                                               continue
                                        }
-                               } else {
-                                       for i, t := range 
a.(*array.Int64).Int64Values() {
-                                               if a.IsNull(i) {
-                                                       tb.AppendNull()
-                                                       continue
-                                               }
 
-                                               val := time.Unix(0, 
int64(t)*int64(math.Pow10(9-int(srcMeta.Scale)))).UTC()
-                                               
tb.Append(arrow.Timestamp(val.UnixNano()))
+                                       v, err := 
arrow.TimestampFromTime(time.Unix(epoch[i], int64(fraction[i])), dt.TimeUnit())
+                                       if err != nil {
+                                               return nil, err
                                        }
+                                       tb.Append(v)
                                }
                                return tb.NewArray(), nil
                        }
                case "TIMESTAMP_LTZ":
-                       dt := &arrow.TimestampType{Unit: arrow.Nanosecond, 
TimeZone: loc.String()}
+                       dt := &arrow.TimestampType{Unit: 
arrow.TimeUnit(srcMeta.Scale) / 3, TimeZone: loc.String()}
                        f.Type = dt
                        transformers[i] = func(ctx context.Context, a 
arrow.Array) (arrow.Array, error) {
                                pool := compute.GetAllocator(ctx)
@@ -163,7 +160,11 @@ func getTransformer(sc *arrow.Schema, ld 
gosnowflake.ArrowStreamLoader) (*arrow.
                                                        continue
                                                }
 
-                                               
tb.Append(arrow.Timestamp(time.Unix(epoch[i], int64(fraction[i])).UnixNano()))
+                                               v, err := 
arrow.TimestampFromTime(time.Unix(epoch[i], int64(fraction[i])), dt.TimeUnit())
+                                               if err != nil {
+                                                       return nil, err
+                                               }
+                                               tb.Append(v)
                                        }
                                } else {
                                        for i, t := range 
a.(*array.Int64).Int64Values() {
@@ -174,13 +175,19 @@ func getTransformer(sc *arrow.Schema, ld 
gosnowflake.ArrowStreamLoader) (*arrow.
 
                                                q := int64(t) / 
int64(math.Pow10(int(srcMeta.Scale)))
                                                r := int64(t) % 
int64(math.Pow10(int(srcMeta.Scale)))
-                                               
tb.Append(arrow.Timestamp(time.Unix(q, r).UnixNano()))
+                                               v, err := 
arrow.TimestampFromTime(time.Unix(q, r), dt.Unit)
+                                               if err != nil {
+                                                       return nil, err
+                                               }
+                                               tb.Append(v)
                                        }
                                }
                                return tb.NewArray(), nil
                        }
                case "TIMESTAMP_TZ":
-                       dt := &arrow.TimestampType{Unit: arrow.Nanosecond}
+                       // we convert each value to UTC since we have timezone 
information
+                       // with the data that lets us do so.
+                       dt := &arrow.TimestampType{TimeZone: "UTC", Unit: 
arrow.TimeUnit(srcMeta.Scale / 3)}
                        f.Type = dt
                        transformers[i] = func(ctx context.Context, a 
arrow.Array) (arrow.Array, error) {
                                pool := compute.GetAllocator(ctx)
@@ -198,7 +205,11 @@ func getTransformer(sc *arrow.Schema, ld 
gosnowflake.ArrowStreamLoader) (*arrow.
                                                }
 
                                                loc := 
gosnowflake.Location(int(tzoffset[i]) - 1440)
-                                               
tb.Append(arrow.Timestamp(time.Unix(epoch[i], 0).In(loc).UnixNano()))
+                                               v, err := 
arrow.TimestampFromTime(time.Unix(epoch[i], 0).In(loc), dt.Unit)
+                                               if err != nil {
+                                                       return nil, err
+                                               }
+                                               tb.Append(v)
                                        }
                                } else {
                                        epoch := 
structData.Field(0).(*array.Int64).Int64Values()
@@ -211,7 +222,11 @@ func getTransformer(sc *arrow.Schema, ld 
gosnowflake.ArrowStreamLoader) (*arrow.
                                                }
 
                                                loc := 
gosnowflake.Location(int(tzoffset[i]) - 1440)
-                                               
tb.Append(arrow.Timestamp(time.Unix(epoch[i], 
int64(fraction[i])).In(loc).UnixNano()))
+                                               v, err := 
arrow.TimestampFromTime(time.Unix(epoch[i], int64(fraction[i])).In(loc), 
dt.Unit)
+                                               if err != nil {
+                                                       return nil, err
+                                               }
+                                               tb.Append(v)
                                        }
                                }
                                return tb.NewArray(), nil
diff --git a/go/adbc/driver/snowflake/statement.go 
b/go/adbc/driver/snowflake/statement.go
index 481e7f7c..be554780 100644
--- a/go/adbc/driver/snowflake/statement.go
+++ b/go/adbc/driver/snowflake/statement.go
@@ -153,9 +153,9 @@ func toSnowflakeType(dt arrow.DataType) string {
                ts := dt.(*arrow.TimestampType)
                prec := int(ts.Unit) * 3
                if ts.TimeZone == "" {
-                       return fmt.Sprintf("timestamp_tz(%d)", prec)
+                       return fmt.Sprintf("timestamp_ntz(%d)", prec)
                }
-               return fmt.Sprintf("timestamp_ltz(%d)", prec)
+               return fmt.Sprintf("timestamp_tz(%d)", prec)
        case arrow.DENSE_UNION, arrow.SPARSE_UNION:
                return "variant"
        case arrow.LIST, arrow.LARGE_LIST, arrow.FIXED_SIZE_LIST:
diff --git a/go/adbc/go.mod b/go/adbc/go.mod
index 21bfad3e..e4496c24 100644
--- a/go/adbc/go.mod
+++ b/go/adbc/go.mod
@@ -20,7 +20,7 @@ module github.com/apache/arrow-adbc/go/adbc
 go 1.18
 
 require (
-       github.com/apache/arrow/go/v13 v13.0.0-20230620164925-94af6c3c9646
+       github.com/apache/arrow/go/v13 v13.0.0-20230710202504-70f447636553
        github.com/bluele/gcache v0.0.2
        github.com/google/uuid v1.3.0
        github.com/snowflakedb/gosnowflake v1.6.21
diff --git a/go/adbc/go.sum b/go/adbc/go.sum
index 97634848..c8b128ec 100644
--- a/go/adbc/go.sum
+++ b/go/adbc/go.sum
@@ -18,6 +18,8 @@ github.com/apache/arrow/go/v12 v12.0.0 
h1:xtZE63VWl7qLdB0JObIXvvhGjoVNrQ9ciIHG2O
 github.com/apache/arrow/go/v12 v12.0.0/go.mod 
h1:d+tV/eHZZ7Dz7RPrFKtPK02tpr+c9/PEd/zm8mDS9Vg=
 github.com/apache/arrow/go/v13 v13.0.0-20230620164925-94af6c3c9646 
h1:hLcsUn9hiiD7jDfJDKOe1tBfOL5v0wgrya5S8XXqzLw=
 github.com/apache/arrow/go/v13 v13.0.0-20230620164925-94af6c3c9646/go.mod 
h1:W69eByFNO0ZR30q1/7Sr9d83zcVZmF2MiP3fFYAWJOc=
+github.com/apache/arrow/go/v13 v13.0.0-20230710202504-70f447636553 
h1:LV3nIWJ2254APRpYAcMxWbxoQwt66gnrkZ5NaDs1IPI=
+github.com/apache/arrow/go/v13 v13.0.0-20230710202504-70f447636553/go.mod 
h1:W69eByFNO0ZR30q1/7Sr9d83zcVZmF2MiP3fFYAWJOc=
 github.com/apache/thrift v0.16.0 
h1:qEy6UW60iVOlUy+b9ZR0d5WzUWYGOo4HfopoyBaNmoY=
 github.com/apache/thrift v0.16.0/go.mod 
h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU=
 github.com/aws/aws-sdk-go-v2 v1.18.0 
h1:882kkTpSFhdgYRKVZ/VCgf7sd0ru57p2JCxz4/oN5RY=

Reply via email to