I've been working with IPC files lately so I took another look at this and
it was much easier than I expected.

> This suggests the addition of a new data structure within the
> WriteDictionaries loop, likely as an attribute of IpcFormatWriter
> like `last_dictionaries_` that contains a set of values output for
> each dictionary ID to enable fast lookups? (i.e. avoid O(N^2)).

> Does this seem like a plausible implementation? If not, is there
> something I have missed/a better suggestion? If so, would
> anyone be willing to help with this?

I'm a little confused by this comment.  last_dictionaries_ is *already* a
structure that contains the set of values output for each dictionary ID.
The rest of your reasoning / explanation matches my understanding as well.
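
Conceptually it is just a map from dictionary id to the last dictionary
array emitted for that id, so lookups by id are already cheap.  In Python
terms (a rough model only, not the actual C++ API):

```
last_dictionaries = {}  # dictionary id -> last Array emitted for that id

def already_emitted(dict_id, dictionary):
    # True when a dictionary for this id was emitted and is unchanged.
    last = last_dictionaries.get(dict_id)
    return last is not None and last.equals(dictionary)
```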

To fix the file writer to output delta dictionaries, all we need to do is
move the check around a little[1].  This now matches the spec:

> Further more, it is invalid to have more than one non-delta dictionary
> batch per dictionary ID

For dictionary reading we, again, already have a snazzy DictionaryMemo that
knows how to append delta dictionaries, so all we need to do is change the
check around a little[2].

The following simple Python example works for me[3].
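
In outline it does something like this (a condensed sketch along the lines
of [3], assuming the patches in [1] and [2] are applied; without them the
second write raises ArrowInvalid):

```
import pyarrow as pa

def dict_batch(values, dictionary):
    # Build a dictionary-encoded column with an explicit dictionary, so
    # that batch 2's dictionary extends batch 1's as a prefix.
    indices = pa.array([dictionary.index(v) for v in values],
                       type=pa.int16())
    arr = pa.DictionaryArray.from_arrays(indices, pa.array(dictionary))
    return pa.RecordBatch.from_arrays([arr], names=["foo"])

b1 = dict_batch(["a", "b"], ["a", "b"])
b2 = dict_batch(["c", "a"], ["a", "b", "c"])  # prefix-extended dictionary

options = pa.ipc.IpcWriteOptions(emit_dictionary_deltas=True)
with pa.ipc.new_file("/tmp/delta_test.arrow", b1.schema,
                     options=options) as writer:
    writer.write(b1)
    writer.write(b2)  # written as a dictionary delta, not a replacement

reader = pa.ipc.open_file("/tmp/delta_test.arrow")
print(reader.read_all().column("foo"))
```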

It's probably too late for the RC0 cutoff but I'll add some test cases on
Monday and will try and get it included should we need to build another RC.

[1]
https://github.com/apache/arrow/pull/12160/files#diff-1b1d9dca9fdea7624e22f017b8762c4919edf57c2cf43c15d59b8a5e8e1b38a5R1092
[2]
https://github.com/apache/arrow/pull/12160/files#diff-e992169684aea9845ac776ada4cbb2b5dc711b49e5a3fbc6046c92299e1aefceR1057
[3] https://gist.github.com/westonpace/31b4e99f94ba03491644174a0c92a620

On Fri, Jan 7, 2022 at 1:18 AM Sam Davis <[email protected]> wrote:

> Following up on this, I've tried to get up to speed on the IPC File Format
> writing of dictionaries and I think the following would work to make the
> code conformant to the IPC File Format specification:
>
> In the WriteDictionaries [1] inner loop, if a dictionary has already been
> output and emit_dictionary_deltas is True then it should work out which
> values in the new Array - containing the current dictionary values - have
> already been output and only emit those that haven't (by creating a new
> Array containing them to pass to the payload creation and writing?).
>
> This suggests the addition of a new data structure within the
> WriteDictionaries loop, likely as an attribute of IpcFormatWriter like
> `last_dictionaries_` that contains a set of values output for each
> dictionary ID to enable fast lookups? (i.e. avoid O(N^2)).
>
> Does this seem like a plausible implementation? If not, is there something I
> have missed/a better suggestion? If so, would anyone be willing to help
> with this?
>
> [1]
> https://github.com/apache/arrow/blob/91e3ac53e2e21736ce6291d73fc37da6fa21259d/cpp/src/arrow/ipc/writer.cc#L1060
>
> ---
>
> If it helps others, my reasoning to get to this point is as follows:
>
> The IpcFormatWriter handles writing record batches to the IPC format
> (non-streaming).
>
>
> https://github.com/apache/arrow/blob/91e3ac53e2e21736ce6291d73fc37da6fa21259d/cpp/src/arrow/ipc/writer.cc#L982
>
> ```
> class ARROW_EXPORT IpcFormatWriter : public RecordBatchWriter {
>  public:
>   // A RecordBatchWriter implementation that writes to a IpcPayloadWriter.
> ```
>
> Writing out a table is - ignoring unified dictionaries - a matter of
> iterating over RecordBatches and writing them out using `WriteRecordBatch`.
>
>
> https://github.com/apache/arrow/blob/91e3ac53e2e21736ce6291d73fc37da6fa21259d/cpp/src/arrow/ipc/writer.cc#L1002
>
> ```
>   Status WriteRecordBatch(const RecordBatch& batch) override {
> ```
>
> One of the first things this does is to write out the dictionaries
> contained within the batch:
>
>
> https://github.com/apache/arrow/blob/91e3ac53e2e21736ce6291d73fc37da6fa21259d/cpp/src/arrow/ipc/writer.cc#L1009
>
> ```
>     RETURN_NOT_OK(WriteDictionaries(batch));
> ```
>
> This is what we will need to edit to ensure that dictionary writing is
> handled correctly.
>
> Recall that a RecordBatch is a "collection of equal-length arrays matching a
> particular Schema. A record batch is a table-like data structure that is
> semantically a sequence of fields, each a contiguous Arrow array".
>
> Each of the fields can be of dictionary type (which can also be nested), so
> there may be more than one dictionary to write for a batch.
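>
> For example (illustrative only), a batch with two dictionary-typed fields
> carries two dictionaries, each collected under its own id:
>
> ```
> import pyarrow as pa
>
> batch = pa.RecordBatch.from_arrays(
>     [pa.array(["x", "y", "x"]).dictionary_encode(),
>      pa.array(["p", "p", "q"]).dictionary_encode()],
>     names=["a", "b"])
>
> # Two dictionary fields => two (id, values) pairs to collect and write.
> print(batch.column(0).dictionary)  # ["x", "y"]
> print(batch.column(1).dictionary)  # ["p", "q"]
> ```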
>
> WriteDictionaries is defined further down:
>
>
> https://github.com/apache/arrow/blob/91e3ac53e2e21736ce6291d73fc37da6fa21259d/cpp/src/arrow/ipc/writer.cc#L1056
>
> ```
>   Status WriteDictionaries(const RecordBatch& batch) {
> ```
>
> The first thing it does is call CollectDictionaries:
>
> ```
>     ARROW_ASSIGN_OR_RAISE(const auto dictionaries,
>                           CollectDictionaries(batch, mapper_));
> ```
>
> `mapper_` is a DictionaryFieldMapper (recall: it maps FieldPaths to
> dictionary IDs).
>
> CollectDictionaries is declared in the header here:
>
>
> https://github.com/apache/arrow/blob/6b0248d5b5502a1cf76d5853d15be9844b84522c/cpp/src/arrow/ipc/dictionary.h#L150
>
> ```
> // For writing: collect dictionary entries to write to the IPC stream,
> // in order (i.e. inner dictionaries before dependent outer dictionaries).
> ARROW_EXPORT
> Result<DictionaryVector> CollectDictionaries(const RecordBatch& batch,
>                                              const DictionaryFieldMapper& mapper);
> ```
>
> and defined here:
>
>
> https://github.com/apache/arrow/blob/6b0248d5b5502a1cf76d5853d15be9844b84522c/cpp/src/arrow/ipc/dictionary.cc#L383
>
> ```
> Result<DictionaryVector> CollectDictionaries(const RecordBatch& batch,
>                                              const DictionaryFieldMapper& mapper) {
>   DictionaryCollector collector{mapper, {}};
>   RETURN_NOT_OK(collector.Collect(batch));
>   return std::move(collector.dictionaries_);
> }
> ```
>
> It creates a DictionaryCollector, passing in the mapper, then calls
> Collect and returns the result.
>
> The DictionaryCollector is defined here:
>
>
> https://github.com/apache/arrow/blob/6b0248d5b5502a1cf76d5853d15be9844b84522c/cpp/src/arrow/ipc/dictionary.cc#L297
>
> ```
> struct DictionaryCollector {
>   const DictionaryFieldMapper& mapper_;
>   DictionaryVector dictionaries_;
> ```
>
> with `Collect` defined here:
>
>
> https://github.com/apache/arrow/blob/6b0248d5b5502a1cf76d5853d15be9844b84522c/cpp/src/arrow/ipc/dictionary.cc#L336
>
> ```
>   Status Collect(const RecordBatch& batch) {
>     FieldPosition position;
>     const Schema& schema = *batch.schema();
>     dictionaries_.reserve(mapper_.num_fields());
>
>     for (int i = 0; i < schema.num_fields(); ++i) {
>       RETURN_NOT_OK(Visit(position.child(i), schema.field(i),
>                           batch.column(i).get()));
>     }
>     return Status::OK();
>   }
> ```
>
> FieldPosition is, it seems, a reverse linked list (each node stores a
> pointer to its parent) where the nodes record their index (like FieldPath)
> and depth. It has a method to walk the list and construct a vector
> containing the indices.
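>
> A toy Python model of that structure (purely illustrative):
>
> ```
> class FieldPosition:
>     # Each node stores its index and a pointer to its parent; walking
>     # the parents and reversing gives the path of indices from the root.
>     def __init__(self, index=-1, parent=None):
>         self.index = index
>         self.parent = parent
>
>     def child(self, i):
>         return FieldPosition(i, self)
>
>     def path(self):
>         indices = []
>         node = self
>         while node.parent is not None:
>             indices.append(node.index)
>             node = node.parent
>         return list(reversed(indices))
>
> # FieldPosition().child(2).child(0).path() == [2, 0]
> ```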
>
> It then grabs a reference to the schema and reserves enough space in
> dictionaries - recall: a vector of tuples (int, Array) - for all the fields
> (optimisation to reduce allocations?).
>
> Then, for each field in the schema, it calls `Visit` with a new
> FieldPosition (via the call to `position.child(i)`), the field at
> position i in the schema, and the RecordBatch data.
>
> Visit is defined here:
>
>
> https://github.com/apache/arrow/blob/6b0248d5b5502a1cf76d5853d15be9844b84522c/cpp/src/arrow/ipc/dictionary.cc#L310
>
> ```
>   Status Visit(const FieldPosition& position,
>                const std::shared_ptr<Field>& field, const Array* array) {
> ```
>
> The result of this is that `dictionaries_` contains all the tuples of (id,
> Array) where array contains the dictionary values (e.g. strings).
>
> Going right back up the stack to `WriteDictionaries`, we now iterate over
> each of these dictionaries:
>
>
> https://github.com/apache/arrow/blob/91e3ac53e2e21736ce6291d73fc37da6fa21259d/cpp/src/arrow/ipc/writer.cc#L1060
>
> ```
>     for (const auto& pair : dictionaries) {
>       int64_t dictionary_id = pair.first;
>       const auto& dictionary = pair.second;
> ```
>
> If a dictionary with this ID has already been output, then the code checks
> whether the new one is the same, to avoid writing it twice.
>
> ```
>       // If a dictionary with this id was already emitted, check if it
>       // was the same.
>       auto* last_dictionary = &last_dictionaries_[dictionary_id];
>       const bool dictionary_exists = (*last_dictionary != nullptr);
>       int64_t delta_start = 0;
>       if (dictionary_exists) {
>         if ((*last_dictionary)->data() == dictionary->data()) {
>           // Fast shortcut for a common case.
>           // Same dictionary data by pointer => no need to emit it again
>           continue;
>         }
>         const int64_t last_length = (*last_dictionary)->length();
>         const int64_t new_length = dictionary->length();
>         if (new_length == last_length &&
>             ((*last_dictionary)->Equals(dictionary, equal_options))) {
>           // Same dictionary by value => no need to emit it again
>           // (while this can have a CPU cost, this code path is required
>           //  for the IPC file format)
>           continue;
>         }
> ```
>
> If either of these equality conditions triggers, then we continue on to
> check the next dictionary.
>
> If a dictionary with this ID has been output before but the contents
> differ, then we need to write either a replacement or a delta.
>
> Currently the code throws an error in both of these cases if we're writing
> to the IPC file format - this goes against the spec.
>
> If the current dictionary is "just" an extension of the previous dictionary
> and deltas are enabled, then it records where the delta starts.
>
> Given this, it creates a payload - delta or not - writes it, and updates
> the stats data structure.
>
> ```
>       IpcPayload payload;
>       if (delta_start) {
>         RETURN_NOT_OK(GetDictionaryPayload(dictionary_id, /*is_delta=*/true,
>                                            dictionary->Slice(delta_start),
>                                            options_, &payload));
>       } else {
>         RETURN_NOT_OK(GetDictionaryPayload(dictionary_id, dictionary,
>                                            options_, &payload));
>       }
>       RETURN_NOT_OK(WritePayload(payload));
>       ++stats_.num_dictionary_batches;
>       if (dictionary_exists) {
>         if (delta_start) {
>           ++stats_.num_dictionary_deltas;
>         } else {
>           ++stats_.num_replaced_dictionaries;
>         }
>       }
> ```
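>
> In Python terms, the delta detection amounts to something like this (a
> sketch, not the actual code):
>
> ```
> def compute_delta_start(last, new):
>     # Returns 0 when there is nothing to extend (first emission) or when
>     # `new` is not a pure extension of `last` (i.e. a replacement);
>     # otherwise returns the offset at which the delta begins.
>     if last is None or len(new) <= len(last):
>         return 0
>     if new.slice(0, len(last)).equals(last):
>         return len(last)
>     return 0
> ```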
>
> ------------------------------
> *From:* Weston Pace <[email protected]>
> *Sent:* 27 July 2021 19:52
> *To:* [email protected] <[email protected]>
> *Subject:* Re: [PyArrow] DictionaryArray isDelta Support
>
> I'm able to verify what Sam is seeing. It appears Arrow does not
> support dictionary deltas in the file format. However, from my
> reading of the spec, it does indeed seem that deltas should be
> allowed. A small patch[1] allowed me to write delta dictionaries in
> the file format, but then reading fails at [2], which is more
> explicit that dictionary deltas of any kind were not originally
> supported with the file format. I think the fix for reading will
> probably be a bit more involved and require some good testing. I've
> opened ARROW-13467[3] for further discussion.
>
> [1]
> https://github.com/apache/arrow/commit/54df8581a9d825664785fc406950204e345a5b3b
> [2]
> https://github.com/apache/arrow/blob/81ff679c47754692224f655dab32cc0936bb5f55/cpp/src/arrow/ipc/reader.cc#L1062
> [3] https://issues.apache.org/jira/browse/ARROW-13467
>
> On Sun, Jul 25, 2021 at 10:12 PM Sam Davis <[email protected]>
> wrote:
> >
> > Hi Wes,
> >
> > Yes, that is exactly it. For the file format, the spec dictates that it
> should be possible to output deltas but currently this is not possible. An
> `ArrowInvalid` error is thrown.
> >
> > Example code:
> >
> > ```
> > import pandas as pd
> > import pyarrow as pa
> >
> > print(pa.__version__)
> >
> > schema = pa.schema([
> >     ("foo", pa.dictionary(pa.int16(), pa.string()))
> > ])
> >
> > pd1 = pd.DataFrame({"foo": pd.Categorical(["a"], categories=["a", "b"])})
> > b1 = pa.RecordBatch.from_pandas(pd1, schema=schema)
> >
> > pd2 = pd.DataFrame({"foo": pd.Categorical(["a", "bbbb"],
> >                                           categories=["a", "b", "bbbb"])})
> > b2 = pa.RecordBatch.from_pandas(pd2, schema=schema)
> >
> > options = pa.ipc.IpcWriteOptions(emit_dictionary_deltas=True)
> >
> > with pa.ipc.new_file("/tmp/sdavis_tmp.arrow", schema=schema,
> >                      options=options) as writer:
> >     writer.write(b1)
> >     writer.write(b2)
> > ```
> >
> > Best,
> >
> > Sam
> > ________________________________
> > From: Wes McKinney <[email protected]>
> > Sent: 24 July 2021 01:43
> > To: [email protected] <[email protected]>
> > Subject: Re: [PyArrow] DictionaryArray isDelta Support
> >
> > If I'm interpreting you correctly, the issue is that every dictionary
> > must be a prefix of a common dictionary for the delta logic to work.
> > So if the first batch has
> >
> > "a", "b"
> >
> > then in the next batch
> >
> > "a", "b", "c" is OK and will emit a delta
> > "b", "a", "c" is not and will trigger this error
> >
> > If we wanted to allow for deltas coming from unordered dictionaries as
> > an option, that could be implemented in theory, but it is not super
> > trivial.
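> >
> > To make that concrete in pandas terms (illustrative):
> >
> > ```
> > import pandas as pd
> >
> > # Delta-friendly: the new categories extend the old ones as a prefix.
> > ok = pd.Categorical(["c"], categories=["a", "b", "c"])
> >
> > # Not delta-friendly: same values in a different order, which would
> > # require a replacement - and the file format rejects replacements.
> > bad = pd.Categorical(["c"], categories=["b", "a", "c"])
> > ```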
> >
> > On Fri, Jul 23, 2021 at 9:25 AM Sam Davis <[email protected]>
> wrote:
> > >
> > > For reference, I think this check in the C++ code triggers regardless
> of whether the delta option is turned on:
> > >
> > >
> https://github.com/apache/arrow/blob/e0401123736c85283e527797a113a3c38c0915f2/cpp/src/arrow/ipc/writer.cc#L1066
> > > ________________________________
> > > From: Sam Davis <[email protected]>
> > > Sent: 23 July 2021 14:43
> > > To: [email protected] <[email protected]>
> > > Subject: Re: [PyArrow] DictionaryArray isDelta Support
> > >
> > > Yes, I know this - it's quoted in the spec. What I am wondering is: for
> > > the file format, how can I write deltas out using PyArrow?
> > >
> > > The previous example was a trivial version of reality.
> > >
> > > More concretely, say I want to write 100e6 rows out in multiple
> RecordBatches to a non-streaming file format using PyArrow. I do not want
> to do a complete pass ahead of time to compute the full set of strings for
> the relevant columns and would therefore like to dump out deltas when new
> strings appear in a given column. Is this possible?
> > >
> > > In the example code ideally this would "just" add on the delta
> containing the dictionary difference of it and the previous batches. I'm
> happy as a user to maintain the full set of categories seen thus far and
> tell PyArrow what the delta is if necessary.
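> > >
> > > Something along these lines is what I have in mind (a sketch):
> > >
> > > ```
> > > import pandas as pd
> > >
> > > seen = []  # categories emitted so far, in order
> > >
> > > def as_prefix_extended(values):
> > >     # Append any new strings to the running category list so that
> > >     # each batch's dictionary is a prefix-extension of the last one.
> > >     for v in values:
> > >         if v not in seen:
> > >             seen.append(v)
> > >     return pd.Categorical(values, categories=list(seen))
> > > ```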
> > > ________________________________
> > > From: Wes McKinney <[email protected]>
> > > Sent: 23 July 2021 14:36
> > > To: [email protected] <[email protected]>
> > > Subject: Re: [PyArrow] DictionaryArray isDelta Support
> > >
> > > Dictionary replacements aren't supported in the file format, only
> > > deltas. Your use case is a replacement, not a delta. You could use the
> > > stream format instead.
> > >
> > > On Fri, Jul 23, 2021 at 8:32 AM Sam Davis <[email protected]>
> wrote:
> > > >
> > > > Hey Wes,
> > > >
> > > > Thanks, I had not spotted this before! It doesn't seem to change the
> behaviour with `pa.ipc.new_file` however. Maybe I'm using it incorrectly?
> > > >
> > > > ```
> > > > import pandas as pd
> > > > import pyarrow as pa
> > > >
> > > > print(pa.__version__)
> > > >
> > > > schema = pa.schema([
> > > >     ("foo", pa.dictionary(pa.int16(), pa.string()))
> > > > ])
> > > >
> > > > pd1 = pd.DataFrame({"foo": pd.Categorical(["aaaa"],
> > > >                                           categories=["a"*i for i in range(64)])})
> > > > b1 = pa.RecordBatch.from_pandas(pd1, schema=schema)
> > > >
> > > > pd2 = pd.DataFrame({"foo": pd.Categorical(["aaaa"],
> > > >                                           categories=["b"*i for i in range(64)])})
> > > > b2 = pa.RecordBatch.from_pandas(pd2, schema=schema)
> > > >
> > > > options = pa.ipc.IpcWriteOptions(emit_dictionary_deltas=True)
> > > >
> > > > with pa.ipc.new_file("/tmp/sdavis_tmp.arrow", schema=b1.schema,
> > > >                      options=options) as writer:
> > > >     writer.write(b1)
> > > >     writer.write(b2)
> > > > ```
> > > >
> > > > Version printed: 4.0.1
> > > >
> > > > Sam
> > > > ________________________________
> > > > From: Wes McKinney <[email protected]>
> > > > Sent: 23 July 2021 14:24
> > > > To: [email protected] <[email protected]>
> > > > Subject: Re: [PyArrow] DictionaryArray isDelta Support
> > > >
> > > > hi Sam
> > > >
> > > > On Fri, Jul 23, 2021 at 8:15 AM Sam Davis <
> [email protected]> wrote:
> > > > >
> > > > > Hi,
> > > > >
> > > > > We want to write out RecordBatches of data, where one or more
> columns in a batch has a `pa.string()` column encoded as a
> `pa.dictionary(pa.intX(), pa.string())` as the column only contains a
> handful of unique values.
> > > > >
> > > > > However, PyArrow seems to lack support for writing these batches
> out to either the streaming or (non-streaming) file format.
> > > > >
> > > > > When attempting to write two distinct batches the following error
> message is triggered:
> > > > >
> > > > > > ArrowInvalid: Dictionary replacement detected when writing IPC
> file format. Arrow IPC files only support a single dictionary for a given
> field across all batches.
> > > > >
> > > > > I believe this message is false and that support is possible based
> on reading the spec:
> > > > >
> > > > > > Dictionaries are written in the stream and file formats as a
> sequence of record batches...
> > > > > > ...
> > > > > > The dictionary isDelta flag allows existing dictionaries to be
> expanded for future record batch materializations. A dictionary batch with
> isDelta set indicates that its vector should be concatenated with those of
> any previous batches with the same id. In a stream which encodes one
> column, the list of strings ["A", "B", "C", "B", "D", "C", "E", "A"], with
> a delta dictionary batch could take the form:
> > > > >
> > > > > ```
> > > > > <SCHEMA>
> > > > > <DICTIONARY 0>
> > > > > (0) "A"
> > > > > (1) "B"
> > > > > (2) "C"
> > > > >
> > > > > <RECORD BATCH 0>
> > > > > 0
> > > > > 1
> > > > > 2
> > > > > 1
> > > > >
> > > > > <DICTIONARY 0 DELTA>
> > > > > (3) "D"
> > > > > (4) "E"
> > > > >
> > > > > <RECORD BATCH 1>
> > > > > 3
> > > > > 2
> > > > > 4
> > > > > 0
> > > > > EOS
> > > > > ```
> > > > >
> > > > > > Alternatively, if isDelta is set to false, then the dictionary
> replaces the existing dictionary for the same ID. Using the same example as
> above, an alternate encoding could be:
> > > > >
> > > > > ```
> > > > > <SCHEMA>
> > > > > <DICTIONARY 0>
> > > > > (0) "A"
> > > > > (1) "B"
> > > > > (2) "C"
> > > > >
> > > > > <RECORD BATCH 0>
> > > > > 0
> > > > > 1
> > > > > 2
> > > > > 1
> > > > >
> > > > > <DICTIONARY 0>
> > > > > (0) "A"
> > > > > (1) "C"
> > > > > (2) "D"
> > > > > (3) "E"
> > > > >
> > > > > <RECORD BATCH 1>
> > > > > 2
> > > > > 1
> > > > > 3
> > > > > 0
> > > > > EOS
> > > > > ```
> > > > >
> > > > > It also specifies in the IPC File Format (non-streaming) section:
> > > > >
> > > > > > In the file format, there is no requirement that dictionary keys
> should be defined in a DictionaryBatch before they are used in a
> RecordBatch, as long as the keys are defined somewhere in the file. Further
> more, it is invalid to have more than one non-delta dictionary batch per
> dictionary ID (i.e. dictionary replacement is not supported). Delta
> dictionaries are applied in the order they appear in the file footer.
> > > > >
> > > > > So for the non-streaming format multiple non-delta dictionaries
> are not supported but one non-delta followed by delta dictionaries should
> be.
> > > > >
> > > > > Is it possible to do this in PyArrow? If so, how? If not, how easy
> would it be to add? Is it currently possible via C++ and therefore can I
> write a Cython or similar extension that will let me do this now without
> waiting for a release?
> > > > >
> > > >
> > > > In pyarrow (3.0.0 or later), you need to opt into emitting dictionary
> > > > deltas using pyarrow.ipc.IpcWriteOptions. Can you show your code?
> > > >
> > > >
> https://github.com/apache/arrow/commit/8d76312dd397ebe07b71531f6d23b8caa76703dc
> > > >
> > > > > Best,
> > > > >
> > > > > Sam
>
