raulcd commented on code in PR #49339:
URL: https://github.com/apache/arrow/pull/49339#discussion_r2829330726


##########
cpp/src/arrow/flight/transport/grpc/async_grpc_server.h:
##########
@@ -0,0 +1,180 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Async gRPC-based. This is a PoC.
+
+#pragma once
+
+#include <grpcpp/generic/async_generic_service.h>
+
+#include "arrow/array.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/flight/protocol_internal.h"
+#include "arrow/flight/serialization_internal.h"
+#include "arrow/flight/server.h"
+#include "arrow/flight/transport/grpc/customize_grpc.h"
+#include "arrow/flight/transport/grpc/serialization_internal.h"
+#include "arrow/record_batch.h"
+
+namespace arrow::flight::transport::grpc {
+
+namespace pb = arrow::flight::protocol;
+
+// DoGet using gRPC's generic callback API with ServerGenericBidiReactor.
+class DoGetReactor : public ::grpc::ServerGenericBidiReactor {
+ public:
+  DoGetReactor() { StartRead(&request_buf_); }
+
+  void OnReadDone(bool ok) override {
+    // Request has been read.
+    if (!ok) {
+      Finish(::grpc::Status(::grpc::StatusCode::INTERNAL, "Failed to read 
request"));
+      return;
+    }
+
+    // DoGet request must contain the Ticket.
+    // TODO Parse ticket, we do not care about it in this PoC.
+    WriteNextPayload();
+  }
+
+  void OnWriteDone(bool ok) override {
+    // We have finished writing. We can write the next payload or finish the 
stream.
+    if (!ok) {
+      Finish(::grpc::Status(::grpc::StatusCode::INTERNAL, "Write failed"));
+      return;
+    }
+    WriteNextPayload();
+  }
+
+  void OnCancel() override {
+    // Client cancelled the RPC. We must implement this out of the PoC.
+  }
+
+  void OnDone() override { delete this; }
+
+ private:
+  void WriteNextPayload() {
+    FlightPayload payload;
+    if (data_stream_ == nullptr) {
+      auto schema = arrow::schema(
+          {arrow::field("a", arrow::int64()), arrow::field("b", 
arrow::int64())});
+      arrow::Int64Builder builder_a, builder_b;
+      (void)builder_a.AppendValues({1, 2, 3, 4, 5});
+      (void)builder_b.AppendValues({10, 20, 30, 40, 50});
+      auto arr_a = *builder_a.Finish();
+      auto arr_b = *builder_b.Finish();
+      auto batch = arrow::RecordBatch::Make(schema, 5, {arr_a, arr_b});
+      auto reader =
+          RecordBatchReader::Make({batch, batch, batch, batch, 
batch}).ValueOrDie();
+      data_stream_ = std::make_unique<RecordBatchStream>(std::move(reader));
+      payload = data_stream_->GetSchemaPayload().ValueOrDie();
+    } else {
+      payload = data_stream_->Next().ValueOrDie();
+    }
+
+    if (payload.ipc_message.metadata == nullptr) {
+      Finish(::grpc::Status::OK);
+      return;
+    }
+
+    auto buffers = payload.SerializeToBuffers().ValueOrDie();

Review Comment:
   Extract the buffers (`arrow::BufferVector`) directly from the 
`FlightPayload`; this is a new API.



##########
cpp/src/arrow/flight/serialization_internal.cc:
##########
@@ -612,6 +619,135 @@ Status ToProto(const CloseSessionResult& result, 
pb::CloseSessionResult* pb_resu
   return Status::OK();
 }
 
+namespace {
+
+using google::protobuf::internal::WireFormatLite;
+using google::protobuf::io::ArrayOutputStream;
+using google::protobuf::io::CodedOutputStream;
+
+static constexpr int64_t kInt32Max = std::numeric_limits<int32_t>::max();
+static const uint8_t kSerializePaddingBytes[8] = {0, 0, 0, 0, 0, 0, 0, 0};
+
+arrow::Status IpcMessageHeaderSize(const arrow::ipc::IpcPayload& ipc_msg, bool 
has_body,
+                                   size_t* header_size, int32_t* 
metadata_size) {

Review Comment:
   This and `SerializePayloadToBuffers` below are almost a copy-and-paste of 
what we had in `FlightDataSerialize` at 
`cpp/src/arrow/flight/transport/grpc/serialization_internal.cc`.
   The main difference is that they deal with `arrow::Buffer`s instead of 
`grpc::ByteBuffer`s.



##########
cpp/src/arrow/flight/transport/grpc/async_grpc_server.h:
##########
@@ -0,0 +1,180 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Async gRPC-based. This is a PoC.
+
+#pragma once
+
+#include <grpcpp/generic/async_generic_service.h>
+
+#include "arrow/array.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/flight/protocol_internal.h"
+#include "arrow/flight/serialization_internal.h"
+#include "arrow/flight/server.h"
+#include "arrow/flight/transport/grpc/customize_grpc.h"
+#include "arrow/flight/transport/grpc/serialization_internal.h"
+#include "arrow/record_batch.h"
+
+namespace arrow::flight::transport::grpc {
+
+namespace pb = arrow::flight::protocol;
+
+// DoGet using gRPC's generic callback API with ServerGenericBidiReactor.
+class DoGetReactor : public ::grpc::ServerGenericBidiReactor {
+ public:
+  DoGetReactor() { StartRead(&request_buf_); }
+
+  void OnReadDone(bool ok) override {
+    // Request has been read.
+    if (!ok) {
+      Finish(::grpc::Status(::grpc::StatusCode::INTERNAL, "Failed to read 
request"));
+      return;
+    }
+
+    // DoGet request must contain the Ticket.
+    // TODO Parse ticket, we do not care about it in this PoC.
+    WriteNextPayload();
+  }
+
+  void OnWriteDone(bool ok) override {
+    // We have finished writing. We can write the next payload or finish the 
stream.
+    if (!ok) {
+      Finish(::grpc::Status(::grpc::StatusCode::INTERNAL, "Write failed"));
+      return;
+    }
+    WriteNextPayload();
+  }
+
+  void OnCancel() override {
+    // Client cancelled the RPC. We must implement this out of the PoC.
+  }
+
+  void OnDone() override { delete this; }
+
+ private:
+  void WriteNextPayload() {
+    FlightPayload payload;
+    if (data_stream_ == nullptr) {
+      auto schema = arrow::schema(
+          {arrow::field("a", arrow::int64()), arrow::field("b", 
arrow::int64())});
+      arrow::Int64Builder builder_a, builder_b;
+      (void)builder_a.AppendValues({1, 2, 3, 4, 5});
+      (void)builder_b.AppendValues({10, 20, 30, 40, 50});
+      auto arr_a = *builder_a.Finish();
+      auto arr_b = *builder_b.Finish();
+      auto batch = arrow::RecordBatch::Make(schema, 5, {arr_a, arr_b});
+      auto reader =
+          RecordBatchReader::Make({batch, batch, batch, batch, 
batch}).ValueOrDie();
+      data_stream_ = std::make_unique<RecordBatchStream>(std::move(reader));
+      payload = data_stream_->GetSchemaPayload().ValueOrDie();
+    } else {
+      payload = data_stream_->Next().ValueOrDie();

Review Comment:
   As discussed, we should use `RecordBatchStream` to get the 
`FlightPayload`s, not generate the payloads manually.



##########
cpp/src/arrow/flight/transport/grpc/serialization_internal.cc:
##########
@@ -176,142 +172,31 @@ arrow::Result<::grpc::Slice> SliceFromBuffer(const 
std::shared_ptr<Buffer>& buf)
   return slice;
 }
 
-const uint8_t kPaddingBytes[8] = {0, 0, 0, 0, 0, 0, 0, 0};
-
-// Update the sizes of our Protobuf fields based on the given IPC payload.
-::grpc::Status IpcMessageHeaderSize(const arrow::ipc::IpcPayload& ipc_msg, 
bool has_body,
-                                    size_t* header_size, int32_t* 
metadata_size) {
-  DCHECK_LE(ipc_msg.metadata->size(), kInt32Max);
-  *metadata_size = static_cast<int32_t>(ipc_msg.metadata->size());
-
-  // 1 byte for metadata tag
-  *header_size += 1 + WireFormatLite::LengthDelimitedSize(*metadata_size);
-
-  // 2 bytes for body tag
-  if (has_body) {
-    // We write the body tag in the header but not the actual body data
-    *header_size += 2 + 
WireFormatLite::LengthDelimitedSize(ipc_msg.body_length) -
-                    ipc_msg.body_length;
-  }
-
-  return ::grpc::Status::OK;
-}
-
-}  // namespace
-
 ::grpc::Status FlightDataSerialize(const FlightPayload& msg, ByteBuffer* out,
                                    bool* own_buffer) {

Review Comment:
   All the manual serialization has been moved to a common 
`arrow::Result<arrow::BufferVector> SerializePayloadToBuffers(const 
arrow::flight::FlightPayload& msg)` in 
`cpp/src/arrow/flight/serialization_internal.cc`, which is called from 
`FlightPayload::SerializeToBuffers`.
   We just use the `arrow::Buffer`s here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to