eramitmittal commented on code in PR #43551:
URL: https://github.com/apache/arrow/pull/43551#discussion_r1703599571


##########
java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlClient.java:
##########
@@ -206,6 +216,178 @@ public SchemaResult getExecuteSubstraitSchema(
     return getExecuteSubstraitSchema(substraitPlan, /*transaction*/ null, 
options);
   }
 
+  /**
+   * Execute a bulk ingest on the server.
+   *
+   * @param data data to be ingested
+   * @param ingestOptions options for the ingest request.
+   * @param options RPC-layer hints for this call.
+   * @return the number of rows affected.
+   */
+  public long executeIngest(
+      final VectorSchemaRoot data,
+      final ExecuteIngestOptions ingestOptions,
+      final CallOption... options) {
+    return executeIngest(data, ingestOptions, /*transaction*/ null, options);
+  }
+
+  /**
+   * Execute a bulk ingest on the server.
+   *
+   * @param dataReader data stream to be ingested
+   * @param ingestOptions options for the ingest request.
+   * @param options RPC-layer hints for this call.
+   * @return the number of rows affected.
+   */
+  public long executeIngest(
+      final ArrowStreamReader dataReader,
+      final ExecuteIngestOptions ingestOptions,
+      final CallOption... options) {
+    return executeIngest(dataReader, ingestOptions, /*transaction*/ null, 
options);
+  }
+
+  /**
+   * Execute a bulk ingest on the server.
+   *
+   * @param data data to be ingested
+   * @param ingestOptions options for the ingest request.
+   * @param transaction The transaction that this ingest request is part of.
+   * @param options RPC-layer hints for this call.
+   * @return the number of rows affected.
+   */
+  public long executeIngest(
+      final VectorSchemaRoot data,
+      final ExecuteIngestOptions ingestOptions,
+      Transaction transaction,
+      final CallOption... options) {
+    try {
+      return executeIngest(
+          data, ingestOptions, transaction, 
FlightClient.ClientStreamListener::putNext, options);
+    } catch (FlightRuntimeException e) {
+      throw e;
+    } catch (Exception e) {
+      throw CallStatus.UNKNOWN.withCause(e).toRuntimeException();
+    }
+  }
+
+  /**
+   * Execute a bulk ingest on the server.
+   *
+   * @param dataReader data stream to be ingested
+   * @param ingestOptions options for the ingest request.
+   * @param transaction The transaction that this ingest request is part of.
+   * @param options RPC-layer hints for this call.
+   * @return the number of rows affected.
+   */
+  public long executeIngest(
+      final ArrowStreamReader dataReader,
+      final ExecuteIngestOptions ingestOptions,
+      Transaction transaction,
+      final CallOption... options) {
+
+    try {
+      return executeIngest(
+          dataReader.getVectorSchemaRoot(),
+          ingestOptions,
+          transaction,
+          listener -> {
+            while (true) {
+              try {
+                if (!dataReader.loadNextBatch()) {
+                  break;
+                }
+              } catch (IOException e) {
+                throw new UncheckedIOException(e);
+              }
+              listener.putNext();
+            }
+          },
+          options);
+    } catch (FlightRuntimeException e) {
+      throw e;
+    } catch (Exception e) {
+      throw CallStatus.UNKNOWN.withCause(e).toRuntimeException();
+    }
+  }
+
+  private long executeIngest(
+      final VectorSchemaRoot data,
+      final ExecuteIngestOptions ingestOptions,
+      final Transaction transaction,
+      final Consumer<FlightClient.ClientStreamListener> dataPutter,
+      final CallOption... options)
+      throws Exception {
+    try (FlightStream sqlInfoStream =
+        getStream(
+            getSqlInfo(
+                    SqlInfo.FLIGHT_SQL_SERVER_BULK_INGESTION,
+                    SqlInfo.FLIGHT_SQL_SERVER_INGEST_TRANSACTIONS_SUPPORTED)
+                .getEndpoints()
+                .get(0)
+                .getTicket())) {
+      boolean bulkIngestionSupported = false;
+      boolean transactionSupported = false;
+
+      while (sqlInfoStream.next()) {
+        VectorSchemaRoot sqlInfoRoot = sqlInfoStream.getRoot();
+
+        UInt4Vector infoName = (UInt4Vector) sqlInfoRoot.getVector(0);
+        DenseUnionVector value = (DenseUnionVector) sqlInfoRoot.getVector(1);
+
+        for (int i = 0; i < sqlInfoRoot.getRowCount(); i++) {
+          final int code = infoName.get(i);
+          if (code == SqlInfo.FLIGHT_SQL_SERVER_BULK_INGESTION_VALUE) {
+            bulkIngestionSupported =
+                Boolean.TRUE.equals(
+                    
value.getBitVector(value.getTypeId(i)).getObject(value.getOffset(i)));
+          } else if (code == 
SqlInfo.FLIGHT_SQL_SERVER_INGEST_TRANSACTIONS_SUPPORTED_VALUE) {
+            transactionSupported =
+                Boolean.TRUE.equals(
+                    
value.getBitVector(value.getTypeId(i)).getObject(value.getOffset(i)));
+          }
+        }
+      }
+
+      if (!bulkIngestionSupported) {
+        throw new UnsupportedOperationException("Executing bulk ingestion is 
not supported.");
+      }
+
+      if (!isNull(transaction) && !transactionSupported) {
+        throw new UnsupportedOperationException(
+            "Executing bulk ingestion with-in a transaction is not 
supported.");
+      }
+
+      final CommandStatementIngest.Builder builder = 
CommandStatementIngest.newBuilder();
+      if (transaction != null) {
+        
builder.setTransactionId(ByteString.copyFrom(transaction.getTransactionId()));
+      }
+      ingestOptions.updateCommandBuilder(builder);
+
+      final FlightDescriptor descriptor =
+          FlightDescriptor.command(Any.pack(builder.build()).toByteArray());
+      try (final SyncPutListener putListener = new SyncPutListener()) {
+
+        final FlightClient.ClientStreamListener listener =
+            client.startPut(descriptor, data, putListener, options);
+        dataPutter.accept(listener);
+        listener.completed();
+        listener.getResult();
+
+        try (final PutResult result = putListener.read()) {
+          final DoPutUpdateResult doPutUpdateResult =
+              
DoPutUpdateResult.parseFrom(result.getApplicationMetadata().nioBuffer());
+          return doPutUpdateResult.getRecordCount();
+        }
+      }
+    } catch (final UnsupportedOperationException e) {

Review Comment:
   Done. Included a new commit with this change.



##########
java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlClient.java:
##########
@@ -206,6 +216,178 @@ public SchemaResult getExecuteSubstraitSchema(
     return getExecuteSubstraitSchema(substraitPlan, /*transaction*/ null, 
options);
   }
 
+  /**
+   * Execute a bulk ingest on the server.
+   *
+   * @param data data to be ingested
+   * @param ingestOptions options for the ingest request.
+   * @param options RPC-layer hints for this call.
+   * @return the number of rows affected.
+   */
+  public long executeIngest(
+      final VectorSchemaRoot data,
+      final ExecuteIngestOptions ingestOptions,
+      final CallOption... options) {
+    return executeIngest(data, ingestOptions, /*transaction*/ null, options);
+  }
+
+  /**
+   * Execute a bulk ingest on the server.
+   *
+   * @param dataReader data stream to be ingested
+   * @param ingestOptions options for the ingest request.
+   * @param options RPC-layer hints for this call.
+   * @return the number of rows affected.
+   */
+  public long executeIngest(
+      final ArrowStreamReader dataReader,
+      final ExecuteIngestOptions ingestOptions,
+      final CallOption... options) {
+    return executeIngest(dataReader, ingestOptions, /*transaction*/ null, 
options);
+  }
+
+  /**
+   * Execute a bulk ingest on the server.
+   *
+   * @param data data to be ingested
+   * @param ingestOptions options for the ingest request.
+   * @param transaction The transaction that this ingest request is part of.
+   * @param options RPC-layer hints for this call.
+   * @return the number of rows affected.
+   */
+  public long executeIngest(
+      final VectorSchemaRoot data,
+      final ExecuteIngestOptions ingestOptions,
+      Transaction transaction,
+      final CallOption... options) {
+    try {
+      return executeIngest(
+          data, ingestOptions, transaction, 
FlightClient.ClientStreamListener::putNext, options);
+    } catch (FlightRuntimeException e) {
+      throw e;
+    } catch (Exception e) {
+      throw CallStatus.UNKNOWN.withCause(e).toRuntimeException();
+    }
+  }
+
+  /**
+   * Execute a bulk ingest on the server.
+   *
+   * @param dataReader data stream to be ingested
+   * @param ingestOptions options for the ingest request.
+   * @param transaction The transaction that this ingest request is part of.
+   * @param options RPC-layer hints for this call.
+   * @return the number of rows affected.
+   */
+  public long executeIngest(
+      final ArrowStreamReader dataReader,
+      final ExecuteIngestOptions ingestOptions,
+      Transaction transaction,
+      final CallOption... options) {
+
+    try {
+      return executeIngest(
+          dataReader.getVectorSchemaRoot(),
+          ingestOptions,
+          transaction,
+          listener -> {
+            while (true) {
+              try {
+                if (!dataReader.loadNextBatch()) {
+                  break;
+                }
+              } catch (IOException e) {
+                throw new UncheckedIOException(e);
+              }
+              listener.putNext();
+            }
+          },
+          options);
+    } catch (FlightRuntimeException e) {
+      throw e;
+    } catch (Exception e) {
+      throw CallStatus.UNKNOWN.withCause(e).toRuntimeException();
+    }
+  }
+
+  private long executeIngest(
+      final VectorSchemaRoot data,
+      final ExecuteIngestOptions ingestOptions,
+      final Transaction transaction,
+      final Consumer<FlightClient.ClientStreamListener> dataPutter,
+      final CallOption... options)
+      throws Exception {
+    try (FlightStream sqlInfoStream =
+        getStream(
+            getSqlInfo(
+                    SqlInfo.FLIGHT_SQL_SERVER_BULK_INGESTION,
+                    SqlInfo.FLIGHT_SQL_SERVER_INGEST_TRANSACTIONS_SUPPORTED)
+                .getEndpoints()
+                .get(0)
+                .getTicket())) {
+      boolean bulkIngestionSupported = false;
+      boolean transactionSupported = false;
+
+      while (sqlInfoStream.next()) {
+        VectorSchemaRoot sqlInfoRoot = sqlInfoStream.getRoot();
+
+        UInt4Vector infoName = (UInt4Vector) sqlInfoRoot.getVector(0);
+        DenseUnionVector value = (DenseUnionVector) sqlInfoRoot.getVector(1);
+
+        for (int i = 0; i < sqlInfoRoot.getRowCount(); i++) {
+          final int code = infoName.get(i);
+          if (code == SqlInfo.FLIGHT_SQL_SERVER_BULK_INGESTION_VALUE) {
+            bulkIngestionSupported =
+                Boolean.TRUE.equals(
+                    
value.getBitVector(value.getTypeId(i)).getObject(value.getOffset(i)));
+          } else if (code == 
SqlInfo.FLIGHT_SQL_SERVER_INGEST_TRANSACTIONS_SUPPORTED_VALUE) {
+            transactionSupported =
+                Boolean.TRUE.equals(
+                    
value.getBitVector(value.getTypeId(i)).getObject(value.getOffset(i)));
+          }
+        }
+      }
+
+      if (!bulkIngestionSupported) {
+        throw new UnsupportedOperationException("Executing bulk ingestion is 
not supported.");
+      }
+
+      if (!isNull(transaction) && !transactionSupported) {
+        throw new UnsupportedOperationException(
+            "Executing bulk ingestion with-in a transaction is not 
supported.");
+      }
+
+      final CommandStatementIngest.Builder builder = 
CommandStatementIngest.newBuilder();
+      if (transaction != null) {
+        
builder.setTransactionId(ByteString.copyFrom(transaction.getTransactionId()));
+      }
+      ingestOptions.updateCommandBuilder(builder);
+
+      final FlightDescriptor descriptor =
+          FlightDescriptor.command(Any.pack(builder.build()).toByteArray());
+      try (final SyncPutListener putListener = new SyncPutListener()) {
+
+        final FlightClient.ClientStreamListener listener =
+            client.startPut(descriptor, data, putListener, options);
+        dataPutter.accept(listener);
+        listener.completed();
+        listener.getResult();
+
+        try (final PutResult result = putListener.read()) {
+          final DoPutUpdateResult doPutUpdateResult =
+              
DoPutUpdateResult.parseFrom(result.getApplicationMetadata().nioBuffer());
+          return doPutUpdateResult.getRecordCount();
+        }
+      }
+    } catch (final UnsupportedOperationException e) {
+      throw CallStatus.UNIMPLEMENTED.withCause(e).toRuntimeException();
+    } catch (final InterruptedException | ExecutionException e) {

Review Comment:
   Done. Included a new commit with this change.



##########
java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlClient.java:
##########
@@ -1003,6 +1185,82 @@ public void close() throws Exception {
     AutoCloseables.close(client);
   }
 
+  /** Class to encapsulate Flight SQL bulk ingest request options. */

Review Comment:
   Done. Included a new commit with this change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to