This is an automated email from the ASF dual-hosted git repository.

kou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7ebc88c8fa GH-26153: [C++] Share common codes for 
RecordBatchStreamReader and StreamDecoder (#36344)
7ebc88c8fa is described below

commit 7ebc88c8fae62ed97bc30865c845c8061132af7e
Author: Sutou Kouhei <[email protected]>
AuthorDate: Tue Jul 4 09:35:34 2023 +0900

    GH-26153: [C++] Share common codes for RecordBatchStreamReader and 
StreamDecoder (#36344)
    
    ### Rationale for this change
    
    Because the pull-based reader (RecordBatchStreamReader) and the push-based decoder (StreamDecoder) must behave identically.
    
    ### What changes are included in this PR?
    
    This PR extracts the reusable code from StreamDecoderImpl into a new 
StreamDecoderInternal class. The external APIs of RecordBatchStreamReader and 
StreamDecoder are unchanged.
    
    This PR adds some external APIs to implement this:
    
    * arrow::Status::ToStringWithoutContextLines(): This is only for testing. 
It gives ASSERT_RAISES_WITH_MESSAGE() a stable result regardless of whether 
-DARROW_EXTRA_ERROR_CONTEXT=ON is used.
    
      We can extract this and the related changes into a separate PR if we want.
    
    * arrow::ipc::Listener::OnRecordBatchWithMetadataDecoded(): Added because 
RecordBatchStreamReader needs not only the RecordBatch but also its custom 
metadata. OnRecordBatchWithMetadataDecoded() receives a RecordBatchWithMetadata. 
OnRecordBatchDecoded() still exists: for backward compatibility, the default 
implementation of OnRecordBatchWithMetadataDecoded() calls it.
    
    * arrow::ipc::CollectListener::metadatas(), 
arrow::ipc::CollectListener::num_record_batches(), 
arrow::ipc::CollectListener::PopRecordBatch(), 
arrow::ipc::CollectListener::PopRecordBatchWithMetadata(): With these APIs, 
we can use CollectListener in RecordBatchStreamReader. If we don't want to 
extend CollectListener, we can instead create an internal listener used only 
by RecordBatchStreamReader.
    
    ### Are these changes tested?
    
    Yes.
    
    ### Are there any user-facing changes?
    
    Yes.
    
    **This PR includes breaking changes to public APIs.**
    
    `arrow::ipc::CollectListener::record_batches()` now returns `const 
std::vector<std::shared_ptr<RecordBatch>>&` (a reference) instead of 
`std::vector<std::shared_ptr<RecordBatch>>` (a copy).
    
    * Closes: #26153
    
    Lead-authored-by: Sutou Kouhei <[email protected]>
    Co-authored-by: Sutou Kouhei <[email protected]>
    Co-authored-by: Antoine Pitrou <[email protected]>
    Signed-off-by: Sutou Kouhei <[email protected]>
---
 cpp/src/arrow/ipc/read_write_test.cc |  25 ++-
 cpp/src/arrow/ipc/reader.cc          | 398 +++++++++++++++--------------------
 cpp/src/arrow/ipc/reader.h           |  56 ++++-
 cpp/src/arrow/status.cc              |  18 ++
 cpp/src/arrow/status.h               |   6 +
 cpp/src/arrow/testing/gtest_util.h   |   6 +-
 6 files changed, 262 insertions(+), 247 deletions(-)

diff --git a/cpp/src/arrow/ipc/read_write_test.cc 
b/cpp/src/arrow/ipc/read_write_test.cc
index 7de81eff7a..ae5fde39d8 100644
--- a/cpp/src/arrow/ipc/read_write_test.cc
+++ b/cpp/src/arrow/ipc/read_write_test.cc
@@ -2082,29 +2082,28 @@ TEST(TestRecordBatchStreamReader, 
NotEnoughDictionaries) {
   // error
   ASSERT_OK_AND_ASSIGN(auto buffer, out->Finish());
 
-  auto AssertFailsWith = [](std::shared_ptr<Buffer> stream, const std::string& 
ex_error) {
+  auto Read = [](std::shared_ptr<Buffer> stream) -> Status {
     io::BufferReader reader(stream);
-    ASSERT_OK_AND_ASSIGN(auto ipc_reader, 
RecordBatchStreamReader::Open(&reader));
+    ARROW_ASSIGN_OR_RAISE(auto ipc_reader, 
RecordBatchStreamReader::Open(&reader));
     std::shared_ptr<RecordBatch> batch;
-    Status s = ipc_reader->ReadNext(&batch);
-    ASSERT_TRUE(s.IsInvalid());
-    ASSERT_EQ(ex_error, s.message().substr(0, ex_error.size()));
+    return ipc_reader->ReadNext(&batch);
   };
 
   // Stream terminates before reading all dictionaries
   std::shared_ptr<Buffer> truncated_stream;
   SpliceMessages(buffer, {0, 1}, &truncated_stream);
-  std::string ex_message =
-      ("IPC stream ended without reading the expected number (3)"
-       " of dictionaries");
-  AssertFailsWith(truncated_stream, ex_message);
+  ASSERT_RAISES_WITH_MESSAGE(Invalid,
+                             "Invalid: IPC stream ended without "
+                             "reading the expected number (3) of dictionaries",
+                             Read(truncated_stream));
 
   // One of the dictionaries is missing, then we see a record batch
   SpliceMessages(buffer, {0, 1, 2, 4}, &truncated_stream);
-  ex_message =
-      ("IPC stream did not have the expected number (3) of dictionaries "
-       "at the start of the stream");
-  AssertFailsWith(truncated_stream, ex_message);
+  ASSERT_RAISES_WITH_MESSAGE(Invalid,
+                             "Invalid: IPC stream did not have "
+                             "the expected number (3) of dictionaries "
+                             "at the start of the stream",
+                             Read(truncated_stream));
 }
 
 TEST(TestRecordBatchStreamReader, MalformedInput) {
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index 694cc73225..6a5ab4598e 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -849,88 +849,113 @@ Status ReadDictionary(const Message& message, const 
IpcReadContext& context,
   return ReadDictionary(*message.metadata(), context, kind, reader.get());
 }
 
-// ----------------------------------------------------------------------
-// RecordBatchStreamReader implementation
-
-class RecordBatchStreamReaderImpl : public RecordBatchStreamReader {
+// Streaming format decoder
+class StreamDecoderInternal : public MessageDecoderListener {
  public:
-  Status Open(std::unique_ptr<MessageReader> message_reader,
-              const IpcReadOptions& options) {
-    message_reader_ = std::move(message_reader);
-    options_ = options;
+  enum State {
+    SCHEMA,
+    INITIAL_DICTIONARIES,
+    RECORD_BATCHES,
+    EOS,
+  };
 
-    // Read schema
-    ARROW_ASSIGN_OR_RAISE(std::unique_ptr<Message> message, ReadNextMessage());
-    if (!message) {
-      return Status::Invalid("Tried reading schema message, was null or length 
0");
-    }
+  explicit StreamDecoderInternal(std::shared_ptr<Listener> listener,
+                                 IpcReadOptions options)
+      : listener_(std::move(listener)),
+        options_(std::move(options)),
+        state_(State::SCHEMA),
+        field_inclusion_mask_(),
+        num_required_initial_dictionaries_(0),
+        num_read_initial_dictionaries_(0),
+        dictionary_memo_(),
+        schema_(nullptr),
+        out_schema_(nullptr),
+        stats_(),
+        swap_endian_(false) {}
 
-    RETURN_NOT_OK(UnpackSchemaMessage(*message, options, &dictionary_memo_, 
&schema_,
-                                      &out_schema_, &field_inclusion_mask_,
-                                      &swap_endian_));
+  Status OnMessageDecoded(std::unique_ptr<Message> message) override {
+    ++stats_.num_messages;
+    switch (state_) {
+      case State::SCHEMA:
+        ARROW_RETURN_NOT_OK(OnSchemaMessageDecoded(std::move(message)));
+        break;
+      case State::INITIAL_DICTIONARIES:
+        
ARROW_RETURN_NOT_OK(OnInitialDictionaryMessageDecoded(std::move(message)));
+        break;
+      case State::RECORD_BATCHES:
+        ARROW_RETURN_NOT_OK(OnRecordBatchMessageDecoded(std::move(message)));
+        break;
+      case State::EOS:
+        break;
+    }
     return Status::OK();
   }
 
-  Status ReadNext(std::shared_ptr<RecordBatch>* batch) override {
-    ARROW_ASSIGN_OR_RAISE(auto batch_with_metadata, ReadNext());
-    *batch = std::move(batch_with_metadata.batch);
-    return Status::OK();
+  Status OnEOS() override {
+    state_ = State::EOS;
+    return listener_->OnEOS();
   }
 
-  Result<RecordBatchWithMetadata> ReadNext() override {
-    if (!have_read_initial_dictionaries_) {
-      RETURN_NOT_OK(ReadInitialDictionaries());
-    }
-
-    RecordBatchWithMetadata batch_with_metadata;
-    if (empty_stream_) {
-      // ARROW-6006: Degenerate case where stream contains no data, we do not
-      // bother trying to read a RecordBatch message from the stream
-      return batch_with_metadata;
-    }
+  Listener* raw_listener() const { return listener_.get(); }
 
-    // Continue to read other dictionaries, if any
-    std::unique_ptr<Message> message;
-    ARROW_ASSIGN_OR_RAISE(message, ReadNextMessage());
+  std::shared_ptr<Schema> schema() const { return out_schema_; }
 
-    while (message != nullptr && message->type() == 
MessageType::DICTIONARY_BATCH) {
-      RETURN_NOT_OK(ReadDictionary(*message));
-      ARROW_ASSIGN_OR_RAISE(message, ReadNextMessage());
-    }
+  ReadStats stats() const { return stats_; }
 
-    if (message == nullptr) {
-      // End of stream
-      return batch_with_metadata;
-    }
+  State state() const { return state_; }
 
-    CHECK_HAS_BODY(*message);
-    ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body()));
-    IpcReadContext context(&dictionary_memo_, options_, swap_endian_);
-    return ReadRecordBatchInternal(*message->metadata(), schema_, 
field_inclusion_mask_,
-                                   context, reader.get());
+  int num_required_initial_dictionaries() const {
+    return num_required_initial_dictionaries_;
   }
 
-  std::shared_ptr<Schema> schema() const override { return out_schema_; }
-
-  ReadStats stats() const override { return stats_; }
+  int num_read_initial_dictionaries() const { return 
num_read_initial_dictionaries_; }
 
  private:
-  Result<std::unique_ptr<Message>> ReadNextMessage() {
-    ARROW_ASSIGN_OR_RAISE(auto message, message_reader_->ReadNextMessage());
-    if (message) {
-      ++stats_.num_messages;
-      switch (message->type()) {
-        case MessageType::RECORD_BATCH:
-          ++stats_.num_record_batches;
-          break;
-        case MessageType::DICTIONARY_BATCH:
-          ++stats_.num_dictionary_batches;
-          break;
-        default:
-          break;
-      }
+  Status OnSchemaMessageDecoded(std::unique_ptr<Message> message) {
+    RETURN_NOT_OK(UnpackSchemaMessage(*message, options_, &dictionary_memo_, 
&schema_,
+                                      &out_schema_, &field_inclusion_mask_,
+                                      &swap_endian_));
+
+    num_required_initial_dictionaries_ = dictionary_memo_.fields().num_dicts();
+    num_read_initial_dictionaries_ = 0;
+    if (num_required_initial_dictionaries_ == 0) {
+      state_ = State::RECORD_BATCHES;
+      RETURN_NOT_OK(listener_->OnSchemaDecoded(schema_));
+    } else {
+      state_ = State::INITIAL_DICTIONARIES;
+    }
+    return Status::OK();
+  }
+
+  Status OnInitialDictionaryMessageDecoded(std::unique_ptr<Message> message) {
+    if (message->type() != MessageType::DICTIONARY_BATCH) {
+      return Status::Invalid("IPC stream did not have the expected number (",
+                             num_required_initial_dictionaries_,
+                             ") of dictionaries at the start of the stream");
+    }
+    RETURN_NOT_OK(ReadDictionary(*message));
+    num_read_initial_dictionaries_++;
+    if (num_read_initial_dictionaries_ == num_required_initial_dictionaries_) {
+      state_ = State::RECORD_BATCHES;
+      ARROW_RETURN_NOT_OK(listener_->OnSchemaDecoded(schema_));
+    }
+    return Status::OK();
+  }
+
+  Status OnRecordBatchMessageDecoded(std::unique_ptr<Message> message) {
+    if (message->type() == MessageType::DICTIONARY_BATCH) {
+      return ReadDictionary(*message);
+    } else {
+      CHECK_HAS_BODY(*message);
+      ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body()));
+      IpcReadContext context(&dictionary_memo_, options_, swap_endian_);
+      ARROW_ASSIGN_OR_RAISE(
+          auto batch_with_metadata,
+          ReadRecordBatchInternal(*message->metadata(), schema_, 
field_inclusion_mask_,
+                                  context, reader.get()));
+      ++stats_.num_record_batches;
+      return listener_->OnRecordBatchWithMetadataDecoded(batch_with_metadata);
     }
-    return std::move(message);
   }
 
   // Read dictionary from dictionary batch
@@ -938,6 +963,7 @@ class RecordBatchStreamReaderImpl : public 
RecordBatchStreamReader {
     DictionaryKind kind;
     IpcReadContext context(&dictionary_memo_, options_, swap_endian_);
     RETURN_NOT_OK(::arrow::ipc::ReadDictionary(message, context, &kind));
+    ++stats_.num_dictionary_batches;
     switch (kind) {
       case DictionaryKind::New:
         break;
@@ -951,60 +977,86 @@ class RecordBatchStreamReaderImpl : public 
RecordBatchStreamReader {
     return Status::OK();
   }
 
-  Status ReadInitialDictionaries() {
-    // We must receive all dictionaries before reconstructing the
-    // first record batch. Subsequent dictionary deltas modify the memo
-    std::unique_ptr<Message> message;
-
-    // TODO(wesm): In future, we may want to reconcile the ids in the stream 
with
-    // those found in the schema
-    const auto num_dicts = dictionary_memo_.fields().num_dicts();
-    for (int i = 0; i < num_dicts; ++i) {
-      ARROW_ASSIGN_OR_RAISE(message, ReadNextMessage());
-      if (!message) {
-        if (i == 0) {
-          /// ARROW-6006: If we fail to find any dictionaries in the stream, 
then
-          /// it may be that the stream has a schema but no actual data. In 
such
-          /// case we communicate that we were unable to find the dictionaries
-          /// (but there was no failure otherwise), so the caller can decide 
what
-          /// to do
-          empty_stream_ = true;
-          break;
-        } else {
-          // ARROW-6126, the stream terminated before receiving the expected
-          // number of dictionaries
-          return Status::Invalid("IPC stream ended without reading the 
expected number (",
-                                 num_dicts, ") of dictionaries");
-        }
-      }
+  std::shared_ptr<Listener> listener_;
+  const IpcReadOptions options_;
+  State state_;
+  std::vector<bool> field_inclusion_mask_;
+  int num_required_initial_dictionaries_;
+  int num_read_initial_dictionaries_;
+  DictionaryMemo dictionary_memo_;
+  std::shared_ptr<Schema> schema_;
+  std::shared_ptr<Schema> out_schema_;
+  ReadStats stats_;
+  bool swap_endian_;
+};
 
-      if (message->type() != MessageType::DICTIONARY_BATCH) {
-        return Status::Invalid("IPC stream did not have the expected number 
(", num_dicts,
-                               ") of dictionaries at the start of the stream");
-      }
-      RETURN_NOT_OK(ReadDictionary(*message));
+// ----------------------------------------------------------------------
+// RecordBatchStreamReader implementation
+
+class RecordBatchStreamReaderImpl : public RecordBatchStreamReader,
+                                    public StreamDecoderInternal {
+ public:
+  RecordBatchStreamReaderImpl(std::unique_ptr<MessageReader> message_reader,
+                              const IpcReadOptions& options)
+      : RecordBatchStreamReader(),
+        StreamDecoderInternal(std::make_shared<CollectListener>(), options),
+        message_reader_(std::move(message_reader)) {}
+
+  Status Init() {
+    // Read schema
+    ARROW_ASSIGN_OR_RAISE(auto message, message_reader_->ReadNextMessage());
+    if (!message) {
+      return Status::Invalid("Tried reading schema message, was null or length 
0");
     }
+    return OnMessageDecoded(std::move(message));
+  }
 
-    have_read_initial_dictionaries_ = true;
+  Status ReadNext(std::shared_ptr<RecordBatch>* batch) override {
+    ARROW_ASSIGN_OR_RAISE(auto batch_with_metadata, ReadNext());
+    *batch = std::move(batch_with_metadata.batch);
     return Status::OK();
   }
 
-  std::unique_ptr<MessageReader> message_reader_;
-  IpcReadOptions options_;
-  std::vector<bool> field_inclusion_mask_;
-
-  bool have_read_initial_dictionaries_ = false;
-
-  // Flag to set in case where we fail to observe all dictionaries in a stream,
-  // and so the reader should not attempt to parse any messages
-  bool empty_stream_ = false;
+  Result<RecordBatchWithMetadata> ReadNext() override {
+    auto collect_listener = checked_cast<CollectListener*>(raw_listener());
+    while (collect_listener->num_record_batches() == 0 &&
+           state() != StreamDecoderInternal::State::EOS) {
+      ARROW_ASSIGN_OR_RAISE(auto message, message_reader_->ReadNextMessage());
+      if (!message) {  // End of stream
+        if (state() == StreamDecoderInternal::State::INITIAL_DICTIONARIES) {
+          if (num_read_initial_dictionaries() == 0) {
+            // ARROW-6006: If we fail to find any dictionaries in the
+            // stream, then it may be that the stream has a schema
+            // but no actual data. In such case we communicate that
+            // we were unable to find the dictionaries (but there was
+            // no failure otherwise), so the caller can decide what
+            // to do
+            return RecordBatchWithMetadata{nullptr, nullptr};
+          } else {
+            // ARROW-6126, the stream terminated before receiving the
+            // expected number of dictionaries
+            return Status::Invalid(
+                "IPC stream ended without reading the "
+                "expected number (",
+                num_required_initial_dictionaries(), ") of dictionaries");
+          }
+        } else {
+          return RecordBatchWithMetadata{nullptr, nullptr};
+        }
+      }
+      ARROW_RETURN_NOT_OK(OnMessageDecoded(std::move(message)));
+    }
+    return collect_listener->PopRecordBatchWithMetadata();
+  }
 
-  ReadStats stats_;
+  std::shared_ptr<Schema> schema() const override {
+    return StreamDecoderInternal::schema();
+  }
 
-  DictionaryMemo dictionary_memo_;
-  std::shared_ptr<Schema> schema_, out_schema_;
+  ReadStats stats() const override { return StreamDecoderInternal::stats(); }
 
-  bool swap_endian_;
+ private:
+  std::unique_ptr<MessageReader> message_reader_;
 };
 
 // ----------------------------------------------------------------------
@@ -1013,8 +1065,9 @@ class RecordBatchStreamReaderImpl : public 
RecordBatchStreamReader {
 Result<std::shared_ptr<RecordBatchStreamReader>> RecordBatchStreamReader::Open(
     std::unique_ptr<MessageReader> message_reader, const IpcReadOptions& 
options) {
   // Private ctor
-  auto result = std::make_shared<RecordBatchStreamReaderImpl>();
-  RETURN_NOT_OK(result->Open(std::move(message_reader), options));
+  auto result =
+      std::make_shared<RecordBatchStreamReaderImpl>(std::move(message_reader), 
options);
+  RETURN_NOT_OK(result->Init());
   return result;
 }
 
@@ -1907,46 +1960,17 @@ Status 
Listener::OnRecordBatchDecoded(std::shared_ptr<RecordBatch> record_batch)
   return Status::NotImplemented("OnRecordBatchDecoded() callback isn't 
implemented");
 }
 
-class StreamDecoder::StreamDecoderImpl : public MessageDecoderListener {
- private:
-  enum State {
-    SCHEMA,
-    INITIAL_DICTIONARIES,
-    RECORD_BATCHES,
-    EOS,
-  };
+Status Listener::OnRecordBatchWithMetadataDecoded(
+    RecordBatchWithMetadata record_batch_with_metadata) {
+  return OnRecordBatchDecoded(std::move(record_batch_with_metadata.batch));
+}
 
+class StreamDecoder::StreamDecoderImpl : public StreamDecoderInternal {
  public:
   explicit StreamDecoderImpl(std::shared_ptr<Listener> listener, 
IpcReadOptions options)
-      : listener_(std::move(listener)),
-        options_(std::move(options)),
-        state_(State::SCHEMA),
+      : StreamDecoderInternal(std::move(listener), options),
         message_decoder_(std::shared_ptr<StreamDecoderImpl>(this, [](void*) 
{}),
-                         options_.memory_pool),
-        n_required_dictionaries_(0) {}
-
-  Status OnMessageDecoded(std::unique_ptr<Message> message) override {
-    ++stats_.num_messages;
-    switch (state_) {
-      case State::SCHEMA:
-        ARROW_RETURN_NOT_OK(OnSchemaMessageDecoded(std::move(message)));
-        break;
-      case State::INITIAL_DICTIONARIES:
-        
ARROW_RETURN_NOT_OK(OnInitialDictionaryMessageDecoded(std::move(message)));
-        break;
-      case State::RECORD_BATCHES:
-        ARROW_RETURN_NOT_OK(OnRecordBatchMessageDecoded(std::move(message)));
-        break;
-      case State::EOS:
-        break;
-    }
-    return Status::OK();
-  }
-
-  Status OnEOS() override {
-    state_ = State::EOS;
-    return listener_->OnEOS();
-  }
+                         options.memory_pool) {}
 
   Status Consume(const uint8_t* data, int64_t size) {
     return message_decoder_.Consume(data, size);
@@ -1956,88 +1980,10 @@ class StreamDecoder::StreamDecoderImpl : public 
MessageDecoderListener {
     return message_decoder_.Consume(std::move(buffer));
   }
 
-  std::shared_ptr<Schema> schema() const { return out_schema_; }
-
   int64_t next_required_size() const { return 
message_decoder_.next_required_size(); }
 
-  ReadStats stats() const { return stats_; }
-
  private:
-  Status OnSchemaMessageDecoded(std::unique_ptr<Message> message) {
-    RETURN_NOT_OK(UnpackSchemaMessage(*message, options_, &dictionary_memo_, 
&schema_,
-                                      &out_schema_, &field_inclusion_mask_,
-                                      &swap_endian_));
-
-    n_required_dictionaries_ = dictionary_memo_.fields().num_fields();
-    if (n_required_dictionaries_ == 0) {
-      state_ = State::RECORD_BATCHES;
-      RETURN_NOT_OK(listener_->OnSchemaDecoded(schema_));
-    } else {
-      state_ = State::INITIAL_DICTIONARIES;
-    }
-    return Status::OK();
-  }
-
-  Status OnInitialDictionaryMessageDecoded(std::unique_ptr<Message> message) {
-    if (message->type() != MessageType::DICTIONARY_BATCH) {
-      return Status::Invalid("IPC stream did not have the expected number (",
-                             dictionary_memo_.fields().num_fields(),
-                             ") of dictionaries at the start of the stream");
-    }
-    RETURN_NOT_OK(ReadDictionary(*message));
-    n_required_dictionaries_--;
-    if (n_required_dictionaries_ == 0) {
-      state_ = State::RECORD_BATCHES;
-      ARROW_RETURN_NOT_OK(listener_->OnSchemaDecoded(schema_));
-    }
-    return Status::OK();
-  }
-
-  Status OnRecordBatchMessageDecoded(std::unique_ptr<Message> message) {
-    if (message->type() == MessageType::DICTIONARY_BATCH) {
-      return ReadDictionary(*message);
-    } else {
-      CHECK_HAS_BODY(*message);
-      ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body()));
-      IpcReadContext context(&dictionary_memo_, options_, swap_endian_);
-      ARROW_ASSIGN_OR_RAISE(
-          auto batch_with_metadata,
-          ReadRecordBatchInternal(*message->metadata(), schema_, 
field_inclusion_mask_,
-                                  context, reader.get()));
-      ++stats_.num_record_batches;
-      return 
listener_->OnRecordBatchDecoded(std::move(batch_with_metadata.batch));
-    }
-  }
-
-  // Read dictionary from dictionary batch
-  Status ReadDictionary(const Message& message) {
-    DictionaryKind kind;
-    IpcReadContext context(&dictionary_memo_, options_, swap_endian_);
-    RETURN_NOT_OK(::arrow::ipc::ReadDictionary(message, context, &kind));
-    ++stats_.num_dictionary_batches;
-    switch (kind) {
-      case DictionaryKind::New:
-        break;
-      case DictionaryKind::Delta:
-        ++stats_.num_dictionary_deltas;
-        break;
-      case DictionaryKind::Replacement:
-        ++stats_.num_replaced_dictionaries;
-        break;
-    }
-    return Status::OK();
-  }
-
-  std::shared_ptr<Listener> listener_;
-  const IpcReadOptions options_;
-  State state_;
   MessageDecoder message_decoder_;
-  std::vector<bool> field_inclusion_mask_;
-  int n_required_dictionaries_;
-  DictionaryMemo dictionary_memo_;
-  std::shared_ptr<Schema> schema_, out_schema_;
-  ReadStats stats_;
-  bool swap_endian_;
 };
 
 StreamDecoder::StreamDecoder(std::shared_ptr<Listener> listener, 
IpcReadOptions options) {
diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h
index ad7969b31c..edc2560854 100644
--- a/cpp/src/arrow/ipc/reader.h
+++ b/cpp/src/arrow/ipc/reader.h
@@ -251,7 +251,8 @@ class ARROW_EXPORT Listener {
   /// \see StreamDecoder
   virtual Status OnEOS();
 
-  /// \brief Called when a record batch is decoded.
+  /// \brief Called when a record batch is decoded and
+  /// OnRecordBatchWithMetadataDecoded() isn't overrided.
   ///
   /// The default implementation just returns
   /// arrow::Status::NotImplemented().
@@ -262,6 +263,19 @@ class ARROW_EXPORT Listener {
   /// \see StreamDecoder
   virtual Status OnRecordBatchDecoded(std::shared_ptr<RecordBatch> 
record_batch);
 
+  /// \brief Called when a record batch with custom metadata is decoded.
+  ///
+  /// The default implementation just calls OnRecordBatchDecoded()
+  /// without custom metadata.
+  ///
+  /// \param[in] record_batch_with_metadata a record batch with custom
+  /// metadata decoded
+  /// \return Status
+  ///
+  /// \see StreamDecoder
+  virtual Status OnRecordBatchWithMetadataDecoded(
+      RecordBatchWithMetadata record_batch_with_metadata);
+
   /// \brief Called when a schema is decoded.
   ///
   /// The default implementation just returns arrow::Status::OK().
@@ -280,7 +294,7 @@ class ARROW_EXPORT Listener {
 /// \since 0.17.0
 class ARROW_EXPORT CollectListener : public Listener {
  public:
-  CollectListener() : schema_(), record_batches_() {}
+  CollectListener() : schema_(), record_batches_(), metadatas_() {}
   virtual ~CollectListener() = default;
 
   Status OnSchemaDecoded(std::shared_ptr<Schema> schema) override {
@@ -288,8 +302,10 @@ class ARROW_EXPORT CollectListener : public Listener {
     return Status::OK();
   }
 
-  Status OnRecordBatchDecoded(std::shared_ptr<RecordBatch> record_batch) 
override {
-    record_batches_.push_back(std::move(record_batch));
+  Status OnRecordBatchWithMetadataDecoded(
+      RecordBatchWithMetadata record_batch_with_metadata) override {
+    record_batches_.push_back(std::move(record_batch_with_metadata.batch));
+    
metadatas_.push_back(std::move(record_batch_with_metadata.custom_metadata));
     return Status::OK();
   }
 
@@ -297,13 +313,43 @@ class ARROW_EXPORT CollectListener : public Listener {
   std::shared_ptr<Schema> schema() const { return schema_; }
 
   /// \return the all decoded record batches
-  std::vector<std::shared_ptr<RecordBatch>> record_batches() const {
+  const std::vector<std::shared_ptr<RecordBatch>>& record_batches() const {
     return record_batches_;
   }
 
+  /// \return the all decoded metadatas
+  const std::vector<std::shared_ptr<KeyValueMetadata>>& metadatas() const {
+    return metadatas_;
+  }
+
+  /// \return the number of collected record batches
+  int64_t num_record_batches() const { return record_batches_.size(); }
+
+  /// \return the last decoded record batch and remove it from
+  /// record_batches
+  std::shared_ptr<RecordBatch> PopRecordBatch() {
+    auto record_batch_with_metadata = PopRecordBatchWithMetadata();
+    return std::move(record_batch_with_metadata.batch);
+  }
+
+  /// \return the last decoded record batch with custom metadata and
+  /// remove it from record_batches
+  RecordBatchWithMetadata PopRecordBatchWithMetadata() {
+    RecordBatchWithMetadata record_batch_with_metadata;
+    if (record_batches_.empty()) {
+      return record_batch_with_metadata;
+    }
+    record_batch_with_metadata.batch = std::move(record_batches_.back());
+    record_batch_with_metadata.custom_metadata = std::move(metadatas_.back());
+    record_batches_.pop_back();
+    metadatas_.pop_back();
+    return record_batch_with_metadata;
+  }
+
  private:
   std::shared_ptr<Schema> schema_;
   std::vector<std::shared_ptr<RecordBatch>> record_batches_;
+  std::vector<std::shared_ptr<KeyValueMetadata>> metadatas_;
 };
 
 /// \brief Push style stream decoder that receives data from user.
diff --git a/cpp/src/arrow/status.cc b/cpp/src/arrow/status.cc
index 168b05df33..368e03cac0 100644
--- a/cpp/src/arrow/status.cc
+++ b/cpp/src/arrow/status.cc
@@ -120,6 +120,24 @@ std::string Status::ToString() const {
   return result;
 }
 
+std::string Status::ToStringWithoutContextLines() const {
+  auto message = ToString();
+#ifdef ARROW_EXTRA_ERROR_CONTEXT
+  while (true) {
+    auto last_new_line_position = message.rfind("\n");
+    if (last_new_line_position == std::string::npos) {
+      break;
+    }
+    // TODO: We may want to check /:\d+ /
+    if (message.find(":", last_new_line_position) == std::string::npos) {
+      break;
+    }
+    message = message.substr(0, last_new_line_position);
+  }
+#endif
+  return message;
+}
+
 void Status::Abort() const { Abort(std::string()); }
 
 void Status::Abort(const std::string& message) const {
diff --git a/cpp/src/arrow/status.h b/cpp/src/arrow/status.h
index 1b9ba28637..ac384fc389 100644
--- a/cpp/src/arrow/status.h
+++ b/cpp/src/arrow/status.h
@@ -314,6 +314,12 @@ class ARROW_EXPORT [[nodiscard]] Status : public 
util::EqualityComparable<Status
   /// The string "OK" is returned for success.
   std::string ToString() const;
 
+  /// \brief Return a string representation of this status without
+  /// context lines suitable for printing.
+  ///
+  /// The string "OK" is returned for success.
+  std::string ToStringWithoutContextLines() const;
+
   /// \brief Return a string representation of the status code, without the 
message
   /// text or POSIX code information.
   std::string CodeAsString() const;
diff --git a/cpp/src/arrow/testing/gtest_util.h 
b/cpp/src/arrow/testing/gtest_util.h
index 13fc0b3e81..e3838bfe4b 100644
--- a/cpp/src/arrow/testing/gtest_util.h
+++ b/cpp/src/arrow/testing/gtest_util.h
@@ -64,7 +64,7 @@
                     ENUM) ", but got "                                         
       \
              << _st.ToString();                                                
       \
     }                                                                          
       \
-    ASSERT_EQ((message), _st.ToString());                                      
       \
+    ASSERT_EQ((message), _st.ToStringWithoutContextLines());                   
       \
   } while (false)
 
 #define EXPECT_RAISES_WITH_MESSAGE_THAT(ENUM, matcher, expr)                   
          \
@@ -73,7 +73,7 @@
     ::arrow::Status _st = ::arrow::internal::GenericToStatus(_res);            
          \
     EXPECT_TRUE(_st.Is##ENUM()) << "Expected '" ARROW_STRINGIFY(expr) "' to 
fail with "  \
                                 << ARROW_STRINGIFY(ENUM) ", but got " << 
_st.ToString(); \
-    EXPECT_THAT(_st.ToString(), (matcher));                                    
          \
+    EXPECT_THAT(_st.ToStringWithoutContextLines(), (matcher));                 
          \
   } while (false)
 
 #define EXPECT_RAISES_WITH_CODE_AND_MESSAGE_THAT(code, matcher, expr) \
@@ -81,7 +81,7 @@
     auto _res = (expr);                                               \
     ::arrow::Status _st = ::arrow::internal::GenericToStatus(_res);   \
     EXPECT_EQ(_st.CodeAsString(), Status::CodeAsString(code));        \
-    EXPECT_THAT(_st.ToString(), (matcher));                           \
+    EXPECT_THAT(_st.ToStringWithoutContextLines(), (matcher));        \
   } while (false)
 
 #define ASSERT_OK(expr)                                                        
      \

Reply via email to