This is an automated email from the ASF dual-hosted git repository.
bkietz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 41e0879586 GH-36407: [C++] Add
arrow::ipc::Listener::OnSchemaDecoded(schema, filtered_schema) (#36533)
41e0879586 is described below
commit 41e08795864b96f8fbbc9c42975d82e3bb0bb6d1
Author: Sutou Kouhei <[email protected]>
AuthorDate: Sat Jul 8 03:50:15 2023 +0900
GH-36407: [C++] Add arrow::ipc::Listener::OnSchemaDecoded(schema,
filtered_schema) (#36533)
### Rationale for this change
The current `arrow::ipc::Listener::OnSchemaDecoded()` receives only the
original schema. However, a filtered schema (one that contains only the fields
being read) is also available during the decode phase. The filtered schema is
useful in some cases, so we should pass it to the listener too.
### What changes are included in this PR?
This adds `arrow::ipc::Listener::OnSchemaDecoded(schema, filtered_schema)`.
Its default implementation just calls `OnSchemaDecoded(schema)` to preserve
backward compatibility.
This also adds `arrow::ipc::CollectListener::filtered_schema()`.
### Are these changes tested?
Yes.
### Are there any user-facing changes?
Yes.
* Closes: #36407
Authored-by: Sutou Kouhei <[email protected]>
Signed-off-by: Benjamin Kietzman <[email protected]>
---
cpp/src/arrow/ipc/reader.cc | 17 +++++++++++------
cpp/src/arrow/ipc/reader.h | 27 +++++++++++++++++++++++++--
2 files changed, 36 insertions(+), 8 deletions(-)
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index 6bd9d33023..eadba69f05 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -871,7 +871,7 @@ class StreamDecoderInternal : public MessageDecoderListener
{
num_read_initial_dictionaries_(0),
dictionary_memo_(),
schema_(nullptr),
- out_schema_(nullptr),
+ filtered_schema_(nullptr),
stats_(),
swap_endian_(false) {}
@@ -900,7 +900,7 @@ class StreamDecoderInternal : public MessageDecoderListener
{
Listener* raw_listener() const { return listener_.get(); }
- std::shared_ptr<Schema> schema() const { return out_schema_; }
+ std::shared_ptr<Schema> schema() const { return filtered_schema_; }
ReadStats stats() const { return stats_; }
@@ -915,14 +915,14 @@ class StreamDecoderInternal : public
MessageDecoderListener {
private:
Status OnSchemaMessageDecoded(std::unique_ptr<Message> message) {
RETURN_NOT_OK(UnpackSchemaMessage(*message, options_, &dictionary_memo_,
&schema_,
- &out_schema_, &field_inclusion_mask_,
+ &filtered_schema_,
&field_inclusion_mask_,
&swap_endian_));
num_required_initial_dictionaries_ = dictionary_memo_.fields().num_dicts();
num_read_initial_dictionaries_ = 0;
if (num_required_initial_dictionaries_ == 0) {
state_ = State::RECORD_BATCHES;
- RETURN_NOT_OK(listener_->OnSchemaDecoded(schema_));
+ RETURN_NOT_OK(listener_->OnSchemaDecoded(schema_, filtered_schema_));
} else {
state_ = State::INITIAL_DICTIONARIES;
}
@@ -939,7 +939,7 @@ class StreamDecoderInternal : public MessageDecoderListener
{
num_read_initial_dictionaries_++;
if (num_read_initial_dictionaries_ == num_required_initial_dictionaries_) {
state_ = State::RECORD_BATCHES;
- ARROW_RETURN_NOT_OK(listener_->OnSchemaDecoded(schema_));
+ ARROW_RETURN_NOT_OK(listener_->OnSchemaDecoded(schema_,
filtered_schema_));
}
return Status::OK();
}
@@ -987,7 +987,7 @@ class StreamDecoderInternal : public MessageDecoderListener
{
int num_read_initial_dictionaries_;
DictionaryMemo dictionary_memo_;
std::shared_ptr<Schema> schema_;
- std::shared_ptr<Schema> out_schema_;
+ std::shared_ptr<Schema> filtered_schema_;
ReadStats stats_;
bool swap_endian_;
};
@@ -1959,6 +1959,11 @@ Status Listener::OnEOS() { return Status::OK(); }
Status Listener::OnSchemaDecoded(std::shared_ptr<Schema> schema) { return
Status::OK(); }
+Status Listener::OnSchemaDecoded(std::shared_ptr<Schema> schema,
+ std::shared_ptr<Schema> filtered_schema) {
+ return OnSchemaDecoded(std::move(schema));
+}
+
Status Listener::OnRecordBatchDecoded(std::shared_ptr<RecordBatch>
record_batch) {
return Status::NotImplemented("OnRecordBatchDecoded() callback isn't
implemented");
}
diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h
index edc2560854..1698abd14b 100644
--- a/cpp/src/arrow/ipc/reader.h
+++ b/cpp/src/arrow/ipc/reader.h
@@ -273,6 +273,8 @@ class ARROW_EXPORT Listener {
/// \return Status
///
/// \see StreamDecoder
+ ///
+ /// \since 13.0.0
virtual Status OnRecordBatchWithMetadataDecoded(
RecordBatchWithMetadata record_batch_with_metadata);
@@ -285,6 +287,21 @@ class ARROW_EXPORT Listener {
///
/// \see StreamDecoder
virtual Status OnSchemaDecoded(std::shared_ptr<Schema> schema);
+
+ /// \brief Called when a schema is decoded.
+ ///
+ /// The default implementation just calls OnSchemaDecoded(schema)
+ /// (without filtered_schema) to keep backward compatibility.
+ ///
+ /// \param[in] schema a schema decoded
+ /// \param[in] filtered_schema a filtered schema that only has read fields
+ /// \return Status
+ ///
+ /// \see StreamDecoder
+ ///
+ /// \since 13.0.0
+ virtual Status OnSchemaDecoded(std::shared_ptr<Schema> schema,
+ std::shared_ptr<Schema> filtered_schema);
};
/// \brief Collect schema and record batches decoded by StreamDecoder.
@@ -294,11 +311,13 @@ class ARROW_EXPORT Listener {
/// \since 0.17.0
class ARROW_EXPORT CollectListener : public Listener {
public:
- CollectListener() : schema_(), record_batches_(), metadatas_() {}
+ CollectListener() : schema_(), filtered_schema_(), record_batches_(),
metadatas_() {}
virtual ~CollectListener() = default;
- Status OnSchemaDecoded(std::shared_ptr<Schema> schema) override {
+ Status OnSchemaDecoded(std::shared_ptr<Schema> schema,
+ std::shared_ptr<Schema> filtered_schema) override {
schema_ = std::move(schema);
+ filtered_schema_ = std::move(filtered_schema);
return Status::OK();
}
@@ -312,6 +331,9 @@ class ARROW_EXPORT CollectListener : public Listener {
/// \return the decoded schema
std::shared_ptr<Schema> schema() const { return schema_; }
+ /// \return the filtered schema
+ std::shared_ptr<Schema> filtered_schema() const { return filtered_schema_; }
+
/// \return the all decoded record batches
const std::vector<std::shared_ptr<RecordBatch>>& record_batches() const {
return record_batches_;
@@ -348,6 +370,7 @@ class ARROW_EXPORT CollectListener : public Listener {
private:
std::shared_ptr<Schema> schema_;
+ std::shared_ptr<Schema> filtered_schema_;
std::vector<std::shared_ptr<RecordBatch>> record_batches_;
std::vector<std::shared_ptr<KeyValueMetadata>> metadatas_;
};