lidavidm commented on code in PR #38385:
URL: https://github.com/apache/arrow/pull/38385#discussion_r1368901696


##########
go/arrow/flight/flightsql/client.go:
##########
@@ -218,6 +218,48 @@ func (c *Client) ExecuteSubstraitUpdate(ctx 
context.Context, plan SubstraitPlan,
        return updateResult.GetRecordCount(), nil
 }
 
+// ExecuteIngest is for executing a bulk ingestion and only returns the number 
of affected rows.
+func (c *Client) ExecuteIngest(ctx context.Context, rdr array.RecordReader, 
reqOptions *ExecuteIngestOpts, opts ...grpc.CallOption) (n int64, err error) {
+       var (
+               desc         *flight.FlightDescriptor
+               stream       pb.FlightService_DoPutClient
+               wr           *flight.Writer
+               res          *pb.PutResult
+               updateResult pb.DoPutUpdateResult
+       )
+
+       cmd := (*pb.CommandStatementIngest)(reqOptions)
+       if desc, err = descForCommand(cmd); err != nil {
+               return
+       }
+
+       if stream, err = c.Client.DoPut(ctx, opts...); err != nil {
+               return
+       }
+
+       wr = flight.NewRecordWriter(stream, ipc.WithAllocator(c.Alloc), 
ipc.WithSchema(rdr.Schema()))
+       wr.SetFlightDescriptor(desc)
+
+       for rdr.Next() {
+               rec := rdr.Record()
+               wr.Write(rec)
+       }
+
+       if err = stream.CloseSend(); err != nil {
+               return
+       }
+
+       if res, err = stream.Recv(); err != nil {
+               return
+       }
+
+       if err = proto.Unmarshal(res.GetAppMetadata(), &updateResult); err != 
nil {
+               return
+       }
+

Review Comment:
   I believe you have to drain the read-side of the stream here, even if 
technically the server should only have sent one message



##########
format/FlightSql.proto:
##########
@@ -817,6 +817,17 @@ enum SqlInfo {
    * - true: if invoking user-defined or vendor functions using the stored 
procedure escape syntax is supported.
    */
   SQL_STORED_FUNCTIONS_USING_CALL_SYNTAX_SUPPORTED = 576;
+
+  /*
+   * Retrieves a boolean value indicating whether transactions are supported 
for bulk ingestion. If not, invoking
+   * the method commit in the context of a bulk ingestion is a noop, and the 
isolation level is
+   * `arrow.flight.protocol.sql.SqlTransactionIsolationLevel.TRANSACTION_NONE`.
+   *
+   * Returns:
+   * - false: if bulk ingestion transactions are unsupported;
+   * - true: if bulk ingestion transactions are supported.
+   */
+   INGEST_TRANSACTIONS_SUPPORTED = 577;

Review Comment:
   I think this should go in the 0-500 range



##########
format/FlightSql.proto:
##########
@@ -1778,6 +1789,47 @@ message CommandPreparedStatementUpdate {
   bytes prepared_statement_handle = 1;
 }
 
+  /*
+   * Represents a bulk ingestion request. Used in the command member of 
FlightDescriptor
+   * for the the RPC call DoPut to cause the server load the contents of the 
stream's
+   * FlightData into the target destination.
+   */

Review Comment:
   nit: why the extra indent?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to