raulcd commented on issue #45055: URL: https://github.com/apache/arrow/issues/45055#issuecomment-3070043076
I've been playing around with this a little, and I am not sure I understand the relation between the client trick mentioned in `TransportMessageWriter` and the existing issue. I've tried reusing the same logic that we use in `IpcFormatWriter::WriteDictionaries`: https://github.com/apache/arrow/blob/2bd2e35a97c28cbe32e542d4ba3b8f64e6592e42/cpp/src/arrow/ipc/writer.cc#L1262-L1301 With that, I've been able to make the original Python example work by computing the delta dictionaries and tracking the last dictionary sent for each dictionary ID, etc. (as we do in the IPC writer). Am I missing something more? ``` $ python client.py numbers letters 0 10 a 1 11 b 2 12 c 3 13 d 4 14 e ``` This is the diff I've been playing with. It is not ready yet and is just an initial investigation, but I wanted to get some input from @lidavidm, as I might be missing part of the problem. ```diff diff --git a/cpp/src/arrow/flight/server.cc b/cpp/src/arrow/flight/server.cc index adbdfb85f2..00f0ac7ae1 100644 --- a/cpp/src/arrow/flight/server.cc +++ b/cpp/src/arrow/flight/server.cc @@ -32,6 +32,7 @@ #include <thread> #include <utility> +#include "arrow/array.h" #include "arrow/device.h" #include "arrow/flight/transport.h" #include "arrow/flight/transport/grpc/grpc_server.h" @@ -50,6 +51,18 @@ namespace { # error "atomic ints and atomic pointers not always lock-free!" 
#endif +bool HasNestedDict(const ArrayData& data) { + if (data.type->id() == Type::DICTIONARY) { + return true; + } + for (const auto& child : data.child_data) { + if (HasNestedDict(*child)) { + return true; + } + } + return false; +} + using ::arrow::internal::SelfPipe; using ::arrow::internal::SetSignalHandler; using ::arrow::internal::SignalHandler; @@ -324,6 +337,20 @@ class RecordBatchStream::RecordBatchStreamImpl { payload->ipc_message.metadata = nullptr; return Status::OK(); } else { + std::vector<std::pair<int64_t, std::shared_ptr<Array>>> new_dictionaries; + ARROW_ASSIGN_OR_RAISE(new_dictionaries, + ipc::CollectDictionaries(*current_batch_, mapper_)); + + // Apply the same logic as IPC writer + std::vector<std::pair<int64_t, std::shared_ptr<Array>>> delta_dictionaries; + RETURN_NOT_OK(ComputeDeltaDictionaries(new_dictionaries, &delta_dictionaries)); + + if (!delta_dictionaries.empty()) { + dictionaries_ = std::move(delta_dictionaries); + dictionary_index_ = 0; + stage_ = Stage::DICTIONARY; + return GetNextDictionary(payload); + } return ipc::GetRecordBatchPayload(*current_batch_, ipc_options_, &payload->ipc_message); } @@ -332,6 +359,57 @@ class RecordBatchStream::RecordBatchStreamImpl { Status Close() { return reader_->Close(); } private: +Status ComputeDeltaDictionaries( + const std::vector<std::pair<int64_t, std::shared_ptr<Array>>>& new_dictionaries, + std::vector<std::pair<int64_t, std::shared_ptr<Array>>>* delta_dictionaries) { + delta_dictionaries->clear(); + + for (const auto& pair : new_dictionaries) { + int64_t dictionary_id = pair.first; + const auto& dictionary = pair.second; + + // If a dictionary with this id was already emitted, check if it was the same. + auto* last_dictionary = &last_dictionaries_[dictionary_id]; + const bool dictionary_exists = (*last_dictionary != nullptr); + int64_t delta_start = 0; + if (dictionary_exists) { + if ((*last_dictionary)->data() == dictionary->data()) { + // Fast shortcut for a common case. 
+ // Same dictionary data by pointer => no need to emit it again + continue; + } + const int64_t last_length = (*last_dictionary)->length(); + const int64_t new_length = dictionary->length(); + if (new_length == last_length && + ((*last_dictionary)->Equals(dictionary, equal_options_))) { + // Same dictionary by value => no need to emit it again + // (while this can have a CPU cost, this code path is required + // for the IPC file format) + continue; + } + + // (the read path doesn't support outer dictionary deltas, don't emit them) + if (new_length > last_length && + !HasNestedDict(*dictionary->data()) && + ((*last_dictionary) + ->RangeEquals(dictionary, 0, last_length, 0, equal_options_))) { + // New dictionary starts with the current dictionary + delta_start = last_length; + } + } + + if (delta_start) { + auto delta_dict = dictionary->Slice(delta_start); + delta_dictionaries->emplace_back(dictionary_id, delta_dict); + } else { + delta_dictionaries->emplace_back(dictionary_id, dictionary); + } + + // Remember dictionary for next batches + *last_dictionary = dictionary; + } + return Status::OK(); +} Status GetNextDictionary(FlightPayload* payload) { const auto& it = dictionaries_[dictionary_index_++]; return ipc::GetDictionaryPayload(it.first, it.second, ipc_options_, @@ -344,6 +422,8 @@ class RecordBatchStream::RecordBatchStreamImpl { ipc::IpcWriteOptions ipc_options_; std::shared_ptr<RecordBatch> current_batch_; std::vector<std::pair<int64_t, std::shared_ptr<Array>>> dictionaries_; + std::unordered_map<int64_t, std::shared_ptr<Array>> last_dictionaries_; + const EqualOptions equal_options_ = EqualOptions().nans_equal(true); // Index of next dictionary to send int dictionary_index_ = 0; ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org