This is an automated email from the ASF dual-hosted git repository.

bkietz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 41e0879586 GH-36407: [C++] Add arrow::ipc::Listener::OnSchemaDecoded(schema, filtered_schema) (#36533)
41e0879586 is described below

commit 41e08795864b96f8fbbc9c42975d82e3bb0bb6d1
Author: Sutou Kouhei <[email protected]>
AuthorDate: Sat Jul 8 03:50:15 2023 +0900

    GH-36407: [C++] Add arrow::ipc::Listener::OnSchemaDecoded(schema, filtered_schema) (#36533)
    
    ### Rationale for this change
    
    The current `arrow::ipc::Listener::OnSchemaDecoded()` receives only the
    original schema. However, the decoder also builds a filtered schema during
    the decode phase, containing only the fields that are actually read. The
    filtered schema is useful in some cases, so we should pass it to the
    listener as well.
    
    ### What changes are included in this PR?
    
    This adds `arrow::ipc::Listener::OnSchemaDecoded(schema, filtered_schema)`.
    Its default implementation simply calls `OnSchemaDecoded(schema)`, so
    existing listeners that override only the one-argument overload keep
    working unchanged.
    
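    This also makes `arrow::ipc::CollectListener` override the two-argument
    overload so that it records both schemas, and adds
    `arrow::ipc::CollectListener::filtered_schema()` to retrieve the filtered one.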
    
    ### Are these changes tested?
    
    Yes.
    
    ### Are there any user-facing changes?
    
    Yes.
    * Closes: #36407
    
    Authored-by: Sutou Kouhei <[email protected]>
    Signed-off-by: Benjamin Kietzman <[email protected]>
---
 cpp/src/arrow/ipc/reader.cc | 17 +++++++++++------
 cpp/src/arrow/ipc/reader.h  | 27 +++++++++++++++++++++++++--
 2 files changed, 36 insertions(+), 8 deletions(-)

diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index 6bd9d33023..eadba69f05 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -871,7 +871,7 @@ class StreamDecoderInternal : public MessageDecoderListener 
{
         num_read_initial_dictionaries_(0),
         dictionary_memo_(),
         schema_(nullptr),
-        out_schema_(nullptr),
+        filtered_schema_(nullptr),
         stats_(),
         swap_endian_(false) {}
 
@@ -900,7 +900,7 @@ class StreamDecoderInternal : public MessageDecoderListener 
{
 
   Listener* raw_listener() const { return listener_.get(); }
 
-  std::shared_ptr<Schema> schema() const { return out_schema_; }
+  std::shared_ptr<Schema> schema() const { return filtered_schema_; }
 
   ReadStats stats() const { return stats_; }
 
@@ -915,14 +915,14 @@ class StreamDecoderInternal : public 
MessageDecoderListener {
  private:
   Status OnSchemaMessageDecoded(std::unique_ptr<Message> message) {
     RETURN_NOT_OK(UnpackSchemaMessage(*message, options_, &dictionary_memo_, 
&schema_,
-                                      &out_schema_, &field_inclusion_mask_,
+                                      &filtered_schema_, 
&field_inclusion_mask_,
                                       &swap_endian_));
 
     num_required_initial_dictionaries_ = dictionary_memo_.fields().num_dicts();
     num_read_initial_dictionaries_ = 0;
     if (num_required_initial_dictionaries_ == 0) {
       state_ = State::RECORD_BATCHES;
-      RETURN_NOT_OK(listener_->OnSchemaDecoded(schema_));
+      RETURN_NOT_OK(listener_->OnSchemaDecoded(schema_, filtered_schema_));
     } else {
       state_ = State::INITIAL_DICTIONARIES;
     }
@@ -939,7 +939,7 @@ class StreamDecoderInternal : public MessageDecoderListener 
{
     num_read_initial_dictionaries_++;
     if (num_read_initial_dictionaries_ == num_required_initial_dictionaries_) {
       state_ = State::RECORD_BATCHES;
-      ARROW_RETURN_NOT_OK(listener_->OnSchemaDecoded(schema_));
+      ARROW_RETURN_NOT_OK(listener_->OnSchemaDecoded(schema_, 
filtered_schema_));
     }
     return Status::OK();
   }
@@ -987,7 +987,7 @@ class StreamDecoderInternal : public MessageDecoderListener 
{
   int num_read_initial_dictionaries_;
   DictionaryMemo dictionary_memo_;
   std::shared_ptr<Schema> schema_;
-  std::shared_ptr<Schema> out_schema_;
+  std::shared_ptr<Schema> filtered_schema_;
   ReadStats stats_;
   bool swap_endian_;
 };
@@ -1959,6 +1959,11 @@ Status Listener::OnEOS() { return Status::OK(); }
 
 Status Listener::OnSchemaDecoded(std::shared_ptr<Schema> schema) { return 
Status::OK(); }
 
+Status Listener::OnSchemaDecoded(std::shared_ptr<Schema> schema,
+                                 std::shared_ptr<Schema> filtered_schema) {
+  return OnSchemaDecoded(std::move(schema));
+}
+
 Status Listener::OnRecordBatchDecoded(std::shared_ptr<RecordBatch> 
record_batch) {
   return Status::NotImplemented("OnRecordBatchDecoded() callback isn't 
implemented");
 }
diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h
index edc2560854..1698abd14b 100644
--- a/cpp/src/arrow/ipc/reader.h
+++ b/cpp/src/arrow/ipc/reader.h
@@ -273,6 +273,8 @@ class ARROW_EXPORT Listener {
   /// \return Status
   ///
   /// \see StreamDecoder
+  ///
+  /// \since 13.0.0
   virtual Status OnRecordBatchWithMetadataDecoded(
       RecordBatchWithMetadata record_batch_with_metadata);
 
@@ -285,6 +287,21 @@ class ARROW_EXPORT Listener {
   ///
   /// \see StreamDecoder
   virtual Status OnSchemaDecoded(std::shared_ptr<Schema> schema);
+
+  /// \brief Called when a schema is decoded.
+  ///
+  /// The default implementation just calls OnSchemaDecoded(schema)
+  /// (without filtered_schema) to keep backward compatibility.
+  ///
+  /// \param[in] schema a schema decoded
+  /// \param[in] filtered_schema a filtered schema that only has read fields
+  /// \return Status
+  ///
+  /// \see StreamDecoder
+  ///
+  /// \since 13.0.0
+  virtual Status OnSchemaDecoded(std::shared_ptr<Schema> schema,
+                                 std::shared_ptr<Schema> filtered_schema);
 };
 
 /// \brief Collect schema and record batches decoded by StreamDecoder.
@@ -294,11 +311,13 @@ class ARROW_EXPORT Listener {
 /// \since 0.17.0
 class ARROW_EXPORT CollectListener : public Listener {
  public:
-  CollectListener() : schema_(), record_batches_(), metadatas_() {}
+  CollectListener() : schema_(), filtered_schema_(), record_batches_(), 
metadatas_() {}
   virtual ~CollectListener() = default;
 
-  Status OnSchemaDecoded(std::shared_ptr<Schema> schema) override {
+  Status OnSchemaDecoded(std::shared_ptr<Schema> schema,
+                         std::shared_ptr<Schema> filtered_schema) override {
     schema_ = std::move(schema);
+    filtered_schema_ = std::move(filtered_schema);
     return Status::OK();
   }
 
@@ -312,6 +331,9 @@ class ARROW_EXPORT CollectListener : public Listener {
   /// \return the decoded schema
   std::shared_ptr<Schema> schema() const { return schema_; }
 
+  /// \return the filtered schema
+  std::shared_ptr<Schema> filtered_schema() const { return filtered_schema_; }
+
   /// \return the all decoded record batches
   const std::vector<std::shared_ptr<RecordBatch>>& record_batches() const {
     return record_batches_;
@@ -348,6 +370,7 @@ class ARROW_EXPORT CollectListener : public Listener {
 
  private:
   std::shared_ptr<Schema> schema_;
+  std::shared_ptr<Schema> filtered_schema_;
   std::vector<std::shared_ptr<RecordBatch>> record_batches_;
   std::vector<std::shared_ptr<KeyValueMetadata>> metadatas_;
 };

Reply via email to