eramitmittal commented on code in PR #43551:
URL: https://github.com/apache/arrow/pull/43551#discussion_r1703554444
##########
java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlClient.java:
##########
@@ -206,6 +216,178 @@ public SchemaResult getExecuteSubstraitSchema(
return getExecuteSubstraitSchema(substraitPlan, /*transaction*/ null,
options);
}
+ /**
+ * Execute a bulk ingest on the server.
+ *
+ * @param data data to be ingested
+ * @param ingestOptions options for the ingest request.
+ * @param options RPC-layer hints for this call.
+ * @return the number of rows affected.
+ */
+ public long executeIngest(
+ final VectorSchemaRoot data,
+ final ExecuteIngestOptions ingestOptions,
+ final CallOption... options) {
+ return executeIngest(data, ingestOptions, /*transaction*/ null, options);
+ }
+
+ /**
+ * Execute a bulk ingest on the server.
+ *
+ * @param dataReader data stream to be ingested
+ * @param ingestOptions options for the ingest request.
+ * @param options RPC-layer hints for this call.
+ * @return the number of rows affected.
+ */
+ public long executeIngest(
+ final ArrowStreamReader dataReader,
+ final ExecuteIngestOptions ingestOptions,
+ final CallOption... options) {
+ return executeIngest(dataReader, ingestOptions, /*transaction*/ null,
options);
+ }
+
+ /**
+ * Execute a bulk ingest on the server.
+ *
+ * @param data data to be ingested
+ * @param ingestOptions options for the ingest request.
+ * @param transaction The transaction that this ingest request is part of.
+ * @param options RPC-layer hints for this call.
+ * @return the number of rows affected.
+ */
+ public long executeIngest(
+ final VectorSchemaRoot data,
+ final ExecuteIngestOptions ingestOptions,
+ Transaction transaction,
+ final CallOption... options) {
+ try {
+ return executeIngest(
+ data, ingestOptions, transaction,
FlightClient.ClientStreamListener::putNext, options);
+ } catch (FlightRuntimeException e) {
+ throw e;
+ } catch (Exception e) {
+ throw CallStatus.UNKNOWN.withCause(e).toRuntimeException();
+ }
+ }
+
+ /**
+ * Execute a bulk ingest on the server.
+ *
+ * @param dataReader data stream to be ingested
+ * @param ingestOptions options for the ingest request.
+ * @param transaction The transaction that this ingest request is part of.
+ * @param options RPC-layer hints for this call.
+ * @return the number of rows affected.
+ */
+ public long executeIngest(
+ final ArrowStreamReader dataReader,
+ final ExecuteIngestOptions ingestOptions,
+ Transaction transaction,
+ final CallOption... options) {
+
+ try {
+ return executeIngest(
+ dataReader.getVectorSchemaRoot(),
+ ingestOptions,
+ transaction,
+ listener -> {
+ while (true) {
+ try {
+ if (!dataReader.loadNextBatch()) {
+ break;
+ }
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ listener.putNext();
+ }
+ },
+ options);
+ } catch (FlightRuntimeException e) {
+ throw e;
+ } catch (Exception e) {
+ throw CallStatus.UNKNOWN.withCause(e).toRuntimeException();
+ }
+ }
+
+ private long executeIngest(
+ final VectorSchemaRoot data,
+ final ExecuteIngestOptions ingestOptions,
+ final Transaction transaction,
+ final Consumer<FlightClient.ClientStreamListener> dataPutter,
+ final CallOption... options)
+ throws Exception {
Review Comment:
The private method has throws Exception declaration but the public methods
using this private method don't have it. They are already catching and
wrapping. e.g.
` public long executeIngest(
final ArrowStreamReader dataReader,
final ExecuteIngestOptions ingestOptions,
Transaction transaction,
final CallOption... options) {
`
Anyway, with the code to check bulk ingestion support removed, there will be
no need to put a throws declaration. This will be removed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]