lidavidm commented on code in PR #97: URL: https://github.com/apache/arrow-adbc/pull/97#discussion_r959584216
########## go/adbc/sqldriver/driver.go: ########## @@ -0,0 +1,695 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package sqldriver is a wrapper around the ADBC (Arrow Database +// Connectivity) interfaces to support the standard golang database/sql +// package. +// +// This allows any ADBC driver implementation to also be used as-is +// with the database/sql package of the standard library rather than +// having to implement drivers for both separately. +// +// Registering the driver can be done by importing this and then running +// +// sql.Register("drivername", sqldriver.Driver{adbcdriver}) +// +// EXPERIMENTAL. The ADBC interfaces are subject to change and as such +// this wrapper is also subject to change based on that. 
+package sqldriver + +import ( + "context" + "database/sql" + "database/sql/driver" + "errors" + "io" + "reflect" + "strconv" + "strings" + "unsafe" + + "github.com/apache/arrow-adbc/go/adbc" + "github.com/apache/arrow/go/v10/arrow" + "github.com/apache/arrow/go/v10/arrow/array" + "github.com/apache/arrow/go/v10/arrow/decimal128" + "github.com/apache/arrow/go/v10/arrow/decimal256" + "github.com/apache/arrow/go/v10/arrow/memory" +) + +func getIsolationlevel(lvl sql.IsolationLevel) adbc.OptionIsolationLevel { + switch lvl { + case sql.LevelDefault: + return adbc.LevelDefault + case sql.LevelReadUncommitted: + return adbc.LevelReadUncommitted + case sql.LevelReadCommitted: + return adbc.LevelReadCommitted + case sql.LevelRepeatableRead: + return adbc.LevelRepeatableRead + case sql.LevelSnapshot: + return adbc.LevelSnapshot + case sql.LevelSerializable: + return adbc.LevelSerializable + case sql.LevelLinearizable: + return adbc.LevelLinearizable + } + return "" +} + +func parseConnectStr(str string) (ret map[string]string, err error) { + ret = make(map[string]string) + for _, kv := range strings.Split(str, ";") { + parsed := strings.Split(kv, "=") + if len(parsed) != 2 { + return nil, &adbc.Error{ + Msg: "invalid format for connection string", + Code: adbc.StatusInvalidArgument, + } + } + + ret[strings.TrimSpace(parsed[0])] = strings.TrimSpace(parsed[1]) + } + return +} + +type connector struct { + driver adbc.Driver +} + +// Connect returns a connection to the database. Connect may +// return a cached connection (one previously closed), but doing +// so is unnecessary; the sql package maintains a pool of idle +// connections for efficient re-use. +// +// The provided context.Context is for dialing purposes only +// (see net.DialContext) and should not be stored or used for +// other purposes. A default timeout should still be used when +// dialing as a connection pool may call Connect asynchronously +// to any query. 
+// +// The returned connection is only used by one goroutine at a time. +func (c *connector) Connect(ctx context.Context) (driver.Conn, error) { + cnxn, err := c.driver.Open(ctx) + if err != nil { + return nil, err + } + + return &conn{Conn: cnxn, drv: c.driver}, nil +} + +// Driver returns the underlying Driver of the connector, +// mainly to maintain compatibility with the Driver method on sql.DB +func (c *connector) Driver() driver.Driver { return &Driver{c.driver} } + +type Driver struct { + Driver adbc.Driver +} + +// Open returns a new connection to the database. The name +// should be semi-colon separated key-value pairs of the form: +// key=value;key2=value2;..... +// +// Open may return a cached connection (one previously closed), +// but doing so is unnecessary; the sql package maintains a pool +// of idle connections for efficient re-use. +// +// The returned connection is only used by one goroutine at a time. +func (d *Driver) Open(name string) (driver.Conn, error) { + opts, err := parseConnectStr(name) + if err != nil { + return nil, err + } + d.Driver.SetOptions(opts) + + cnxn, err := d.Driver.Open(context.Background()) + if err != nil { + return nil, err + } + + return &conn{Conn: cnxn, drv: d.Driver}, nil +} + +// OpenConnector expects the same format as driver.Open +func (d *Driver) OpenConnector(name string) (driver.Connector, error) { + opts, err := parseConnectStr(name) + if err != nil { + return nil, err + } + d.Driver.SetOptions(opts) + + return &connector{d.Driver}, nil +} + +type ctxOptsKey struct{} + +func SetOptionsInCtx(ctx context.Context, opts map[string]string) context.Context { + return context.WithValue(ctx, ctxOptsKey{}, opts) +} + +func GetOptionsFromCtx(ctx context.Context) map[string]string { + v, ok := ctx.Value(ctxOptsKey{}).(map[string]string) + if !ok { + return nil + } + return v +} + +// conn is a connection to a database. It is not used concurrently by +// multiple goroutines. It is assumed to be stateful. 
+type conn struct { + Conn adbc.Connection + drv adbc.Driver +} + +// Close invalidates and potentially stops any current prepared +// statements and transactions, marking this connection as no longer +// in use. +// +// Because the sql package maintains a free pool of connections and +// only calls Close when there's a surplus of idle connections, +// it shouldn't be necessary for drivers to do their own connection +// caching. +// +// Drivers must ensure all network calls made by Close do not block +// indefinitely (e.g. apply a timeout) +func (c *conn) Close() error { + return c.Conn.Close() +} + +// Begin exists to fulfill the Conn interface, but will return an error. +// Instead, the ConnBeginTx interface is implemented instead. +// +// Deprecated +func (c *conn) Begin() (driver.Tx, error) { + return nil, &adbc.Error{Code: adbc.StatusNotImplemented} +} + +func (c *conn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) { + if postopt, ok := c.Conn.(adbc.PostInitOptions); ok { + if err := postopt.SetOption(adbc.OptionKeyAutoCommit, adbc.OptionValueDisabled); err != nil { + return nil, err + } + isolationLevel := getIsolationlevel(sql.IsolationLevel(opts.Isolation)) + if isolationLevel == "" { + return nil, &adbc.Error{Code: adbc.StatusNotImplemented} + } + + if err := postopt.SetOption(adbc.OptionKeyIsolationLevel, string(isolationLevel)); err != nil { + return nil, err + } + + if opts.ReadOnly { + if err := postopt.SetOption(adbc.OptionKeyTransactionReadOnly, adbc.OptionValueEnabled); err != nil { + return nil, err + } + } + return tx{ctx: ctx, conn: c.Conn}, nil + } + + return nil, &adbc.Error{Code: adbc.StatusNotImplemented} +} + +// Prepare returns a prepared statement, bound to this connection. +func (c *conn) Prepare(query string) (driver.Stmt, error) { + return c.PrepareContext(context.Background(), query) +} + +// PrepareContext returns a prepared statement, bound to this connection. 
+// Context is for the preparation of the statement. The statement must not +// store the context within the statement itself. +func (c *conn) PrepareContext(ctx context.Context, query string) (driver.Stmt, error) { + opts := GetOptionsFromCtx(ctx) + s, err := c.Conn.NewStatement(opts) + if err != nil { + return nil, err + } + + if err := s.SetSqlQuery(query); err != nil { + s.Close() + return nil, err + } + + paramSchema, err := s.GetParameterSchema() + var adbcErr adbc.Error + if errors.As(err, &adbcErr) { + if adbcErr.Code != adbc.StatusNotImplemented { + return nil, err + } + } + + return &stmt{stmt: s, paramSchema: paramSchema}, nil +} + +type tx struct { + ctx context.Context + conn adbc.Connection +} + +func (t tx) Commit() error { + if err := t.conn.Commit(t.ctx); err != nil { + return err + } + + return t.conn.(adbc.PostInitOptions).SetOption(adbc.OptionKeyAutoCommit, adbc.OptionValueEnabled) +} + +func (t tx) Rollback() error { + if err := t.conn.Rollback(t.ctx); err != nil { + return err + } + return t.conn.(adbc.PostInitOptions).SetOption(adbc.OptionKeyAutoCommit, adbc.OptionValueEnabled) +} + +type stmt struct { + stmt adbc.Statement + paramSchema *arrow.Schema +} + +func (s *stmt) Close() error { + return s.stmt.Close() +} + +func (s *stmt) NumInput() int { + if s.paramSchema == nil { + return -1 + } + + return len(s.paramSchema.Fields()) +} + +func (s *stmt) Exec(args []driver.Value) (driver.Result, error) { + return nil, driver.ErrSkip +} + +func (s *stmt) Query(args []driver.Value) (driver.Rows, error) { + return nil, driver.ErrSkip +} + +func checkType[T any](val any) bool { + switch val.(type) { + case T, *T: + default: + return false + } + return true +} + +func isCorrectParamType(typ arrow.Type, val driver.Value) bool { + switch typ { + case arrow.BINARY: + return checkType[[]byte](val) + case arrow.BOOL: + return checkType[bool](val) + case arrow.INT8: + return checkType[int8](val) + case arrow.UINT8: + return checkType[uint8](val) + case 
arrow.INT16: + return checkType[int16](val) + case arrow.UINT16: + return checkType[uint16](val) + case arrow.INT32: + return checkType[int32](val) + case arrow.UINT32: + return checkType[uint32](val) + case arrow.INT64: + return checkType[int64](val) + case arrow.UINT64: + return checkType[uint64](val) + case arrow.STRING: + return checkType[string](val) + case arrow.FLOAT32: + return checkType[float32](val) + case arrow.FLOAT64: + return checkType[float64](val) + case arrow.DATE32: + return checkType[arrow.Date32](val) + case arrow.DATE64: + return checkType[arrow.Date64](val) + case arrow.TIME32: + return checkType[arrow.Time32](val) + case arrow.TIME64: + return checkType[arrow.Time64](val) + case arrow.TIMESTAMP: + return checkType[arrow.Timestamp](val) + } + // TODO: add more types here + return true Review Comment: nit: wouldn't `false` be a more reasonable default? ########## adbc.h: ########## @@ -654,6 +654,71 @@ AdbcStatusCode AdbcConnectionReadPartition(struct AdbcConnection* connection, /// enabled. #define ADBC_CONNECTION_OPTION_AUTOCOMMIT "adbc.connection.autocommit" +/// \brief The name of the canonical option for whether the current +/// transaction should be restricted to being a read-only transaction. +#define ADBC_CONNECTION_OPTION_TRANSACTION_READ_ONLY \ + "adbc.connection.transaction.readonly" Review Comment: Interesting, so Golang has you explicitly manage transactions. DBAPI/JDBC/ODBC have "auto commit on" and you implicitly manage transactions (by disabling autocommit, then calling commit/rollback). In these APIs, readonly is a property of the connection, not the transaction. Given that the rest of the APIs are patterned after DBAPI/JDBC/ODBC, I think this would make more sense as a connection property as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org
