raulcd commented on issue #45055:
URL: https://github.com/apache/arrow/issues/45055#issuecomment-3070043076

   I've been playing around with this a little, and I am not sure I understand 
the relation between the client trick mentioned on the `TransportMessageWriter` 
and the existing issue.
   I've tried reusing the same logic that we use in 
`IPCFormatWriter::WriteDictionaries`:
   
https://github.com/apache/arrow/blob/2bd2e35a97c28cbe32e542d4ba3b8f64e6592e42/cpp/src/arrow/ipc/writer.cc#L1262-L1301
   
   With that, I've been able to make the original Python example work by 
computing the delta dictionary, storing the last dictionary, etc. (as we do 
in IPC).
   
   Am I missing something more?
   
   ```
   $ python client.py 
      numbers letters
   0       10       a
   1       11       b
   2       12       c
   3       13       d
   4       14       e
   ```
   
   This is the diff I've been playing with. It is not ready yet, just an 
initial investigation, but I wanted to get some input from @lidavidm as I might 
be missing part of the problem.
   
   ```diff
   diff --git a/cpp/src/arrow/flight/server.cc b/cpp/src/arrow/flight/server.cc
   index adbdfb85f2..00f0ac7ae1 100644
   --- a/cpp/src/arrow/flight/server.cc
   +++ b/cpp/src/arrow/flight/server.cc
   @@ -32,6 +32,7 @@
    #include <thread>
    #include <utility>
    
   +#include "arrow/array.h"
    #include "arrow/device.h"
    #include "arrow/flight/transport.h"
    #include "arrow/flight/transport/grpc/grpc_server.h"
   @@ -50,6 +51,18 @@ namespace {
    #  error "atomic ints and atomic pointers not always lock-free!"
    #endif
    
   +bool HasNestedDict(const ArrayData& data) {
   +  if (data.type->id() == Type::DICTIONARY) {
   +    return true;
   +  }
   +  for (const auto& child : data.child_data) {
   +    if (HasNestedDict(*child)) {
   +      return true;
   +    }
   +  }
   +  return false;
   +}
   +
    using ::arrow::internal::SelfPipe;
    using ::arrow::internal::SetSignalHandler;
    using ::arrow::internal::SignalHandler;
   @@ -324,6 +337,20 @@ class RecordBatchStream::RecordBatchStreamImpl {
          payload->ipc_message.metadata = nullptr;
          return Status::OK();
        } else {
   +      std::vector<std::pair<int64_t, std::shared_ptr<Array>>> 
new_dictionaries;
   +      ARROW_ASSIGN_OR_RAISE(new_dictionaries,
   +                            ipc::CollectDictionaries(*current_batch_, 
mapper_));
   +
   +      // Apply the same logic as IPC writer
   +      std::vector<std::pair<int64_t, std::shared_ptr<Array>>> 
delta_dictionaries;
   +      RETURN_NOT_OK(ComputeDeltaDictionaries(new_dictionaries, 
&delta_dictionaries));
   +
   +      if (!delta_dictionaries.empty()) {
   +        dictionaries_ = std::move(delta_dictionaries);
   +        dictionary_index_ = 0;
   +        stage_ = Stage::DICTIONARY;
   +        return GetNextDictionary(payload);
   +      }
          return ipc::GetRecordBatchPayload(*current_batch_, ipc_options_,
                                            &payload->ipc_message);
        }
   @@ -332,6 +359,57 @@ class RecordBatchStream::RecordBatchStreamImpl {
      Status Close() { return reader_->Close(); }
    
     private:
   +Status ComputeDeltaDictionaries(
   +    const std::vector<std::pair<int64_t, std::shared_ptr<Array>>>& 
new_dictionaries,
   +    std::vector<std::pair<int64_t, std::shared_ptr<Array>>>* 
delta_dictionaries) {
   +  delta_dictionaries->clear();
   +
   +  for (const auto& pair : new_dictionaries) {
   +    int64_t dictionary_id = pair.first;
   +    const auto& dictionary = pair.second;
   +
   +    // If a dictionary with this id was already emitted, check if it was 
the same.
   +    auto* last_dictionary = &last_dictionaries_[dictionary_id];
   +    const bool dictionary_exists = (*last_dictionary != nullptr);
   +    int64_t delta_start = 0;
   +    if (dictionary_exists) {
   +      if ((*last_dictionary)->data() == dictionary->data()) {
   +        // Fast shortcut for a common case.
   +        // Same dictionary data by pointer => no need to emit it again
   +        continue;
   +      }
   +      const int64_t last_length = (*last_dictionary)->length();
   +      const int64_t new_length = dictionary->length();
   +      if (new_length == last_length &&
   +          ((*last_dictionary)->Equals(dictionary, equal_options_))) {
   +        // Same dictionary by value => no need to emit it again
   +        // (while this can have a CPU cost, this code path is required
   +        //  for the IPC file format)
   +        continue;
   +      }
   +
   +      // (the read path doesn't support outer dictionary deltas, don't emit 
them)
   +      if (new_length > last_length &&
   +          !HasNestedDict(*dictionary->data()) &&
   +          ((*last_dictionary)
   +               ->RangeEquals(dictionary, 0, last_length, 0, 
equal_options_))) {
   +        // New dictionary starts with the current dictionary
   +        delta_start = last_length;
   +      }
   +    }
   +
   +    if (delta_start) {
   +      auto delta_dict = dictionary->Slice(delta_start);
   +      delta_dictionaries->emplace_back(dictionary_id, delta_dict);
   +    } else {
   +      delta_dictionaries->emplace_back(dictionary_id, dictionary);
   +    }
   +
   +    // Remember dictionary for next batches
   +    *last_dictionary = dictionary;
   +  }
   +  return Status::OK();
   +}
      Status GetNextDictionary(FlightPayload* payload) {
        const auto& it = dictionaries_[dictionary_index_++];
        return ipc::GetDictionaryPayload(it.first, it.second, ipc_options_,
   @@ -344,6 +422,8 @@ class RecordBatchStream::RecordBatchStreamImpl {
      ipc::IpcWriteOptions ipc_options_;
      std::shared_ptr<RecordBatch> current_batch_;
      std::vector<std::pair<int64_t, std::shared_ptr<Array>>> dictionaries_;
   +  std::unordered_map<int64_t, std::shared_ptr<Array>> last_dictionaries_;
   +  const EqualOptions equal_options_ = EqualOptions().nans_equal(true);
    
      // Index of next dictionary to send
      int dictionary_index_ = 0;
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to