lidavidm commented on code in PR #3150:
URL: https://github.com/apache/arrow-adbc/pull/3150#discussion_r2204036004


##########
go/adbc/adbc.go:
##########
@@ -818,3 +818,37 @@ type GetSetOptions interface {
        GetOptionInt(key string) (int64, error)
        GetOptionDouble(key string) (float64, error)
 }
+
+// IngestStream is a helper for executing a bulk ingestion. This is a wrapper 
around
+// the five-step boilerplate of NewStatement, SetOption, Bind,
+// Execute, and Close.
+//
+// This is not part of the ADBC API specification.

Review Comment:
   Can we move this to ext.go? 
https://github.com/apache/arrow-adbc/blob/main/go/adbc/ext.go



##########
go/adbc/drivermgr/wrapper_sqlite_test.go:
##########
@@ -628,3 +628,71 @@ func TestDriverMgrCustomInitFunc(t *testing.T) {
                assert.Contains(t, exp.Msg, "undefined symbol: 
ThisSymbolDoesNotExist")
        }
 }
+
+func (dm *DriverMgrSuite) TestIngestStream() {
+       // 1) Create the target table
+       st, err := dm.conn.NewStatement()
+       dm.Require().NoError(err)
+       defer validation.CheckedClose(dm.T(), st)
+
+       dm.NoError(st.SetSqlQuery(`
+        CREATE TABLE IF NOT EXISTS ingest_test (
+            col1 INTEGER,
+            col2 TEXT
+        )
+    `))
+       n, err := st.ExecuteUpdate(dm.ctx)
+       dm.NoError(err)
+       dm.Equal(int64(0), n, "CREATE TABLE should report 0 rows affected")
+
+       // 2) Build two Arrow batches of data
+       schema := arrow.NewSchema(
+               []arrow.Field{
+                       {Name: "col1", Type: arrow.PrimitiveTypes.Int64, 
Nullable: true},
+                       {Name: "col2", Type: arrow.BinaryTypes.String, 
Nullable: true},
+               }, nil,
+       )
+       b := array.NewRecordBuilder(memory.DefaultAllocator, schema)
+       defer b.Release()
+
+       // first batch: 3 rows
+       b.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil)

Review Comment:
   ArrayFromJSON might make this easier



##########
go/adbc/adbc.go:
##########
@@ -818,3 +818,37 @@ type GetSetOptions interface {
        GetOptionInt(key string) (int64, error)
        GetOptionDouble(key string) (float64, error)
 }
+
+// IngestStream is a helper for executing a bulk ingestion. This is a wrapper 
around
+// the five-step boilerplate of NewStatement, SetOption, Bind,
+// Execute, and Close.
+//
+// This is not part of the ADBC API specification.
+func IngestStream(ctx context.Context, cnxn Connection, reader 
array.RecordReader, opts map[string]string) (int64, error) {
+       // 1) Create a new statement
+       stmt, err := cnxn.NewStatement()
+       if err != nil {
+               return -1, fmt.Errorf("IngestStream: NewStatement: %w", err)
+       }
+       defer stmt.Close()

Review Comment:
   Close is fallible, so you have to handle the error. You need something like 
this (with a named error return value)
   
   ```go
   defer func() {
        err = errors.Join(err, stmt.Close())
   }()
   ```



##########
go/adbc/drivermgr/wrapper_sqlite_test.go:
##########
@@ -628,3 +628,71 @@ func TestDriverMgrCustomInitFunc(t *testing.T) {
                assert.Contains(t, exp.Msg, "undefined symbol: 
ThisSymbolDoesNotExist")
        }
 }
+
+func (dm *DriverMgrSuite) TestIngestStream() {
+       // 1) Create the target table
+       st, err := dm.conn.NewStatement()
+       dm.Require().NoError(err)
+       defer validation.CheckedClose(dm.T(), st)
+
+       dm.NoError(st.SetSqlQuery(`
+        CREATE TABLE IF NOT EXISTS ingest_test (
+            col1 INTEGER,
+            col2 TEXT
+        )
+    `))
+       n, err := st.ExecuteUpdate(dm.ctx)
+       dm.NoError(err)
+       dm.Equal(int64(0), n, "CREATE TABLE should report 0 rows affected")

Review Comment:
   Ingest can create the table itself, so this isn't necessarily necessary



##########
go/adbc/adbc.go:
##########
@@ -818,3 +818,37 @@ type GetSetOptions interface {
        GetOptionInt(key string) (int64, error)
        GetOptionDouble(key string) (float64, error)
 }
+
+// IngestStream is a helper for executing a bulk ingestion. This is a wrapper 
around
+// the five-step boilerplate of NewStatement, SetOption, Bind,
+// Execute, and Close.
+//
+// This is not part of the ADBC API specification.
+func IngestStream(ctx context.Context, cnxn Connection, reader 
array.RecordReader, opts map[string]string) (int64, error) {

Review Comment:
   Maybe we should accept things like target table directly as parameters? Or 
have an explicit parameters struct? (We can keep the map for any other options 
but I think the idea ought to be that we make the common parameters into formal 
parameters instead of requiring the option)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to