I've been working with IPC files lately, so I took another look at this, and it was much easier than I expected.

> This suggests the addition of a new data structure within the
> WriteDictionaries loop, likely as an attribute of IpcFormatWriter
> like `last_dictionaries_` that contains a set of values output for
> each dictionary ID to enable fast lookups? (i.e. avoid O(N^2)).
> Does this seem like a plausible implementation? If not, is there
> something I have missed/a better suggestion? If so, would
> anyone be willing to help with this?

I'm a little confused by this comment. last_dictionaries_ is *already* a structure that contains the set of values output for each dictionary ID. The rest of your reasoning/explanation matches my understanding as well.

To fix the file writer to output delta dictionaries, all we need to do is move the check around a little[1]. This now matches the spec:

> Furthermore, it is invalid to have more than one non-delta dictionary batch per dictionary ID

For dictionary reading we, again, already have a snazzy DictionaryMemo which knows how to append delta dictionaries, so all we need to do is change the check around a little[2].

The following simple Python example works for me[3]. It's probably too late for the RC0 cutoff, but I'll add some test cases on Monday and will try to get it included should we need to build another RC.

[1] https://github.com/apache/arrow/pull/12160/files#diff-1b1d9dca9fdea7624e22f017b8762c4919edf57c2cf43c15d59b8a5e8e1b38a5R1092
[2] https://github.com/apache/arrow/pull/12160/files#diff-e992169684aea9845ac776ada4cbb2b5dc711b49e5a3fbc6046c92299e1aefceR1057
[3] https://gist.github.com/westonpace/31b4e99f94ba03491644174a0c92a620
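In case the gist link ever rots, the example is roughly the following (a from-memory sketch of the same idea; the file path and values are arbitrary):

```
import pyarrow as pa

schema = pa.schema([("foo", pa.dictionary(pa.int16(), pa.string()))])

def batch(indices, dictionary):
    arr = pa.DictionaryArray.from_arrays(
        pa.array(indices, type=pa.int16()), pa.array(dictionary))
    return pa.RecordBatch.from_arrays([arr], schema=schema)

# The second dictionary extends the first, so with deltas enabled only
# the tail ["c"] is written, as a delta dictionary batch.
b1 = batch([0, 1], ["a", "b"])
b2 = batch([2, 0], ["a", "b", "c"])

options = pa.ipc.IpcWriteOptions(emit_dictionary_deltas=True)
with pa.ipc.new_file("/tmp/deltas.arrow", schema, options=options) as writer:
    writer.write(b1)
    writer.write(b2)

# With the reader change, this round-trips instead of erroring.
reader = pa.ipc.open_file("/tmp/deltas.arrow")
print(reader.read_all().column("foo"))
```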
On Fri, Jan 7, 2022 at 1:18 AM Sam Davis <[email protected]> wrote:

> Following up on this, I've tried to get up to speed on the IPC File Format writing of dictionaries, and I think the following would work to make the code conformant to the IPC File Format specification:
>
> In the WriteDictionaries [1] inner loop, if a dictionary has already been output and emit_dictionary_deltas is True, then it should work out which values in the new Array - containing the current dictionary values - have already been output and only emit those that haven't (by creating a new Array containing them to pass to the payload creation and writing?).
>
> This suggests the addition of a new data structure within the WriteDictionaries loop, likely as an attribute of IpcFormatWriter like `last_dictionaries_`, that contains a set of values output for each dictionary ID to enable fast lookups (i.e. avoid O(N^2)).
>
> Does this seem like a plausible implementation? If not, is there something I have missed/a better suggestion? If so, would anyone be willing to help with this?
>
> [1] https://github.com/apache/arrow/blob/91e3ac53e2e21736ce6291d73fc37da6fa21259d/cpp/src/arrow/ipc/writer.cc#L1060
>
> ---
>
> If it helps others, my reasoning to get to this point is as follows:
>
> The IpcFormatWriter handles writing record batches to the IPC format (non-streaming):
>
> https://github.com/apache/arrow/blob/91e3ac53e2e21736ce6291d73fc37da6fa21259d/cpp/src/arrow/ipc/writer.cc#L982
>
> ```
> class ARROW_EXPORT IpcFormatWriter : public RecordBatchWriter {
>  public:
>   // A RecordBatchWriter implementation that writes to a IpcPayloadWriter.
> ```
>
> Writing out a table is - ignoring unified dictionaries - iterating over RecordBatches and writing them out using `WriteRecordBatch`.
>
> https://github.com/apache/arrow/blob/91e3ac53e2e21736ce6291d73fc37da6fa21259d/cpp/src/arrow/ipc/writer.cc#L1002
>
> ```
> Status WriteRecordBatch(const RecordBatch& batch) override {
> ```
>
> One of the first things this does is write out the dictionaries contained within the batch:
>
> https://github.com/apache/arrow/blob/91e3ac53e2e21736ce6291d73fc37da6fa21259d/cpp/src/arrow/ipc/writer.cc#L1009
>
> ```
> RETURN_NOT_OK(WriteDictionaries(batch));
> ```
>
> This is what we will need to edit to ensure that dictionary writing is handled correctly.
>
> Recall that a RecordBatch is a "collection of equal-length arrays matching a particular Schema. A record batch is a table-like data structure that is semantically a sequence of fields, each a contiguous Arrow array".
>
> Each of the fields can be of dictionary type (which can also be nested), so there may be more than one dictionary to write for a batch.
>
> WriteDictionaries is defined further down:
>
> https://github.com/apache/arrow/blob/91e3ac53e2e21736ce6291d73fc37da6fa21259d/cpp/src/arrow/ipc/writer.cc#L1056
>
> ```
> Status WriteDictionaries(const RecordBatch& batch) {
> ```
>
> The first thing it does is call CollectDictionaries:
>
> ```
> ARROW_ASSIGN_OR_RAISE(const auto dictionaries,
>                       CollectDictionaries(batch, mapper_));
> ```
>
> `mapper_` is a DictionaryFieldMapper (recall: maps FieldPaths to dictionary IDs).
>
> CollectDictionaries is declared in the header here:
>
> https://github.com/apache/arrow/blob/6b0248d5b5502a1cf76d5853d15be9844b84522c/cpp/src/arrow/ipc/dictionary.h#L150
>
> ```
> // For writing: collect dictionary entries to write to the IPC stream, in order
> // (i.e. inner dictionaries before dependent outer dictionaries).
> ARROW_EXPORT
> Result<DictionaryVector> CollectDictionaries(const RecordBatch& batch,
>                                              const DictionaryFieldMapper& mapper);
> ```
>
> and defined here:
>
> https://github.com/apache/arrow/blob/6b0248d5b5502a1cf76d5853d15be9844b84522c/cpp/src/arrow/ipc/dictionary.cc#L383
>
> ```
> Result<DictionaryVector> CollectDictionaries(const RecordBatch& batch,
>                                              const DictionaryFieldMapper& mapper) {
>   DictionaryCollector collector{mapper, {}};
>   RETURN_NOT_OK(collector.Collect(batch));
>   return std::move(collector.dictionaries_);
> }
> ```
>
> It creates a DictionaryCollector, passing in the mapper, then calls Collect and returns the result.
>
> The DictionaryCollector is defined here:
>
> https://github.com/apache/arrow/blob/6b0248d5b5502a1cf76d5853d15be9844b84522c/cpp/src/arrow/ipc/dictionary.cc#L297
>
> ```
> struct DictionaryCollector {
>   const DictionaryFieldMapper& mapper_;
>   DictionaryVector dictionaries_;
> ```
>
> with `Collect` defined here:
>
> https://github.com/apache/arrow/blob/6b0248d5b5502a1cf76d5853d15be9844b84522c/cpp/src/arrow/ipc/dictionary.cc#L336
>
> ```
> Status Collect(const RecordBatch& batch) {
>   FieldPosition position;
>   const Schema& schema = *batch.schema();
>   dictionaries_.reserve(mapper_.num_fields());
>
>   for (int i = 0; i < schema.num_fields(); ++i) {
>     RETURN_NOT_OK(Visit(position.child(i), schema.field(i), batch.column(i).get()));
>   }
>   return Status::OK();
> }
> ```
>
> FieldPosition is, it seems, a reverse linked list (each node stores a pointer to its parent) where the nodes record their index (like FieldPath) and depth. It has a method to walk the list and construct a vector containing the indices.
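> In Python terms, the shape of it is roughly this (my own illustrative sketch, not the actual C++):
>
> ```
> class FieldPosition:
>     def __init__(self, parent=None, index=0):
>         self.parent = parent  # link towards the root; None at the top
>         self.index = index    # this node's position within its parent
>
>     def child(self, i):
>         return FieldPosition(parent=self, index=i)
>
>     def path(self):
>         # Walk the parent links, then reverse to get the root-to-leaf
>         # indices (the FieldPath-style vector mentioned above).
>         indices = []
>         node = self
>         while node.parent is not None:
>             indices.append(node.index)
>             node = node.parent
>         return indices[::-1]
> ```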
> It then grabs a reference to the schema and reserves enough space in dictionaries_ - recall: a vector of tuples (int, Array) - for all the fields (an optimisation to reduce allocations?).
>
> Then, for each field in the schema, it calls `Visit` with a new FieldPosition (via the call to `position.child(i)`), the field at position i in the schema, and the RecordBatch data.
>
> Visit is defined here:
>
> https://github.com/apache/arrow/blob/6b0248d5b5502a1cf76d5853d15be9844b84522c/cpp/src/arrow/ipc/dictionary.cc#L310
>
> ```
> Status Visit(const FieldPosition& position, const std::shared_ptr<Field>& field,
>              const Array* array) {
> ```
>
> The result of this is that `dictionaries_` contains all the tuples of (id, Array), where each Array contains the dictionary values (e.g. strings).
>
> Going right back up the stack to `WriteDictionaries`, we now iterate over each of these dictionaries:
>
> https://github.com/apache/arrow/blob/91e3ac53e2e21736ce6291d73fc37da6fa21259d/cpp/src/arrow/ipc/writer.cc#L1060
>
> ```
> for (const auto& pair : dictionaries) {
>   int64_t dictionary_id = pair.first;
>   const auto& dictionary = pair.second;
> ```
>
> If a dictionary with this ID has already been output, then the code checks whether it is the same, to avoid writing it twice:
>
> ```
> // If a dictionary with this id was already emitted, check if it was the same.
> auto* last_dictionary = &last_dictionaries_[dictionary_id];
> const bool dictionary_exists = (*last_dictionary != nullptr);
> int64_t delta_start = 0;
> if (dictionary_exists) {
>   if ((*last_dictionary)->data() == dictionary->data()) {
>     // Fast shortcut for a common case.
>     // Same dictionary data by pointer => no need to emit it again
>     continue;
>   }
>   const int64_t last_length = (*last_dictionary)->length();
>   const int64_t new_length = dictionary->length();
>   if (new_length == last_length &&
>       ((*last_dictionary)->Equals(dictionary, equal_options))) {
>     // Same dictionary by value => no need to emit it again
>     // (while this can have a CPU cost, this code path is required
>     // for the IPC file format)
>     continue;
>   }
> ```
>
> If either of these equality conditions triggers, then we continue on to check the next dictionary.
>
> If the dictionary has been output before but now differs, then we might either write a replacement or a delta. Currently the code throws an error in both of these cases if we're writing to the IPC file format - for the delta case, this goes against the spec (replacement is indeed invalid in the file format).
>
> If the current dictionary is "just" an extension of the previous dictionary and deltas are enabled, then it marks where the new starting position is.
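> Schematically, that delta detection amounts to (a Python-flavoured sketch of the logic, not the real code):
>
> ```
> delta_start = 0
> if dictionary_exists and emit_dictionary_deltas:
>     last_len = len(last_dictionary)
>     if (len(dictionary) > last_len
>             and dictionary.slice(0, last_len).equals(last_dictionary)):
>         # The previously emitted dictionary is a prefix of the new one,
>         # so only the tail needs to be written, flagged with isDelta.
>         delta_start = last_len
> ```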
> Given this, it creates a payload - delta or not - and writes it, ensuring to update the stats data structure:
>
> ```
> IpcPayload payload;
> if (delta_start) {
>   RETURN_NOT_OK(GetDictionaryPayload(dictionary_id, /*is_delta=*/true,
>                                      dictionary->Slice(delta_start), options_,
>                                      &payload));
> } else {
>   RETURN_NOT_OK(
>       GetDictionaryPayload(dictionary_id, dictionary, options_, &payload));
> }
> RETURN_NOT_OK(WritePayload(payload));
> ++stats_.num_dictionary_batches;
> if (dictionary_exists) {
>   if (delta_start) {
>     ++stats_.num_dictionary_deltas;
>   } else {
>     ++stats_.num_replaced_dictionaries;
>   }
> }
> ```
>
> ________________________________
> From: Weston Pace <[email protected]>
> Sent: 27 July 2021 19:52
> To: [email protected] <[email protected]>
> Subject: Re: [PyArrow] DictionaryArray isDelta Support
>
> I'm able to verify what Sam is seeing. It appears Arrow does not support dictionary deltas in the file format. However, from my reading of the spec, it does indeed seem proper that deltas should be allowed. A small patch[1] allowed me to write delta dictionaries in the file format, but then reading fails at [2], which seems more explicit that dictionary deltas of any kind were not originally supported with the file format. I think the fix for read will probably be a bit more involved and require some good testing. I've opened ARROW-13467[3] for further discussion.
>
> [1] https://github.com/apache/arrow/commit/54df8581a9d825664785fc406950204e345a5b3b
> [2] https://github.com/apache/arrow/blob/81ff679c47754692224f655dab32cc0936bb5f55/cpp/src/arrow/ipc/reader.cc#L1062
> [3] https://issues.apache.org/jira/browse/ARROW-13467
>
> On Sun, Jul 25, 2021 at 10:12 PM Sam Davis <[email protected]> wrote:
> >
> > Hi Wes,
> >
> > Yes, that is exactly it. For the file format, the spec dictates that it should be possible to output deltas, but currently this is not possible. An `ArrowInvalid` error is thrown.
> >
> > Example code:
> >
> > ```
> > import pandas as pd
> > import pyarrow as pa
> >
> > print(pa.__version__)
> >
> > schema = pa.schema([
> >     ("foo", pa.dictionary(pa.int16(), pa.string()))
> > ])
> >
> > pd1 = pd.DataFrame({"foo": pd.Categorical(["a"], categories=["a", "b"])})
> > b1 = pa.RecordBatch.from_pandas(pd1, schema=schema)
> >
> > pd2 = pd.DataFrame({"foo": pd.Categorical(["a", "bbbb"], categories=["a", "b", "bbbb"])})
> > b2 = pa.RecordBatch.from_pandas(pd2, schema=schema)
> >
> > options = pa.ipc.IpcWriteOptions(emit_dictionary_deltas=True)
> >
> > with pa.ipc.new_file("/tmp/sdavis_tmp.arrow", schema=schema, options=options) as writer:
> >     writer.write(b1)
> >     writer.write(b2)
> > ```
> >
> > Best,
> >
> > Sam
> > ________________________________
> > From: Wes McKinney <[email protected]>
> > Sent: 24 July 2021 01:43
> > To: [email protected] <[email protected]>
> > Subject: Re: [PyArrow] DictionaryArray isDelta Support
> >
> > If I'm interpreting you correctly, the issue is that every dictionary must be a prefix of a common dictionary for the delta logic to work. So if the first batch has
> >
> > "a", "b"
> >
> > then in the next batch
> >
> > "a", "b", "c" is OK and will emit a delta
> > "b", "a", "c" is not and will trigger this error
> >
> > If we wanted to allow for deltas coming from unordered dictionaries as an option, that could be implemented in theory, but it is not super trivial.
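> > In pyarrow terms, the prefix condition amounts to this (illustrative sketch):
> >
> > ```
> > import pyarrow as pa
> >
> > last = pa.array(["a", "b"])
> >
> > new_ok = pa.array(["a", "b", "c"])
> > # `last` is a prefix of `new_ok`, so only the tail ["c"] needs to go
> > # out, as a delta dictionary batch.
> > assert new_ok.slice(0, len(last)).equals(last)
> >
> > new_bad = pa.array(["b", "a", "c"])
> > # `last` is not a prefix of `new_bad`; only a full replacement would
> > # work, which the file format forbids - hence the error.
> > assert not new_bad.slice(0, len(last)).equals(last)
> > ```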
> > On Fri, Jul 23, 2021 at 9:25 AM Sam Davis <[email protected]> wrote:
> > >
> > > For reference, I think this check in the C++ code triggers regardless of whether the delta option is turned on:
> > >
> > > https://github.com/apache/arrow/blob/e0401123736c85283e527797a113a3c38c0915f2/cpp/src/arrow/ipc/writer.cc#L1066
> > > ________________________________
> > > From: Sam Davis <[email protected]>
> > > Sent: 23 July 2021 14:43
> > > To: [email protected] <[email protected]>
> > > Subject: Re: [PyArrow] DictionaryArray isDelta Support
> > >
> > > Yes, I know this, as quoted in the spec. What I am wondering is: for the file format, how can I write deltas out using PyArrow?
> > >
> > > The previous example was a trivial version of reality.
> > >
> > > More concretely, say I want to write 100e6 rows out in multiple RecordBatches to a non-streaming file format using PyArrow. I do not want to do a complete pass ahead of time to compute the full set of strings for the relevant columns, and would therefore like to dump out deltas when new strings appear in a given column. Is this possible?
> > >
> > > In the example code, ideally this would "just" add on a delta containing the difference between its dictionary and those of the previous batches. I'm happy as a user to maintain the full set of categories seen thus far and tell PyArrow what the delta is, if necessary.
> > > ________________________________
> > > From: Wes McKinney <[email protected]>
> > > Sent: 23 July 2021 14:36
> > > To: [email protected] <[email protected]>
> > > Subject: Re: [PyArrow] DictionaryArray isDelta Support
> > >
> > > Dictionary replacements aren't supported in the file format, only deltas. Your use case is a replacement, not a delta. You could use the stream format instead.
> > >
> > > On Fri, Jul 23, 2021 at 8:32 AM Sam Davis <[email protected]> wrote:
> > > >
> > > > Hey Wes,
> > > >
> > > > Thanks, I had not spotted this before! It doesn't seem to change the behaviour with `pa.ipc.new_file`, however. Maybe I'm using it incorrectly?
> > > > > > > > ``` > > > > import pandas as pd > > > > import pyarrow as pa > > > > > > > > print(pa.__version__) > > > > > > > > schema = pa.schema([ > > > > ("foo", pa.dictionary(pa.int16(), pa.string())) > > > > ]) > > > > > > > > pd1 = pd.DataFrame({"foo": pd.Categorical(["aaaa"], > categories=["a"*i for i in range(64)])}) > > > > b1 = pa.RecordBatch.from_pandas(pd1, schema=schema) > > > > > > > > pd2 = pd.DataFrame({"foo": pd.Categorical(["aaaa"], > categories=["b"*i for i in range(64)])}) > > > > b2 = pa.RecordBatch.from_pandas(pd2, schema=schema) > > > > > > > > options = pa.ipc.IpcWriteOptions(emit_dictionary_deltas=True) > > > > > > > > with pa.ipc.new_file("/tmp/sdavis_tmp.arrow", schema=b1.schema, > options=options) as writer: > > > > writer.write(b1) > > > > writer.write(b2) > > > > ``` > > > > > > > > Version printed: 4.0.1 > > > > > > > > Sam > > > > ________________________________ > > > > From: Wes McKinney <[email protected]> > > > > Sent: 23 July 2021 14:24 > > > > To: [email protected] <[email protected]> > > > > Subject: Re: [PyArrow] DictionaryArray isDelta Support > > > > > > > > hi Sam > > > > > > > > On Fri, Jul 23, 2021 at 8:15 AM Sam Davis < > [email protected]> wrote: > > > > > > > > > > Hi, > > > > > > > > > > We want to write out RecordBatches of data, where one or more > columns in a batch has a `pa.string()` column encoded as a > `pa.dictionary(pa.intX(), pa.string()` as the column only contains a > handful of unique values. > > > > > > > > > > However, PyArrow seems to lack support for writing these batches > out to either the streaming or (non-streaming) file format. > > > > > > > > > > When attempting to write two distinct batches the following error > message is triggered: > > > > > > > > > > > ArrowInvalid: Dictionary replacement detected when writing IPC > file format. Arrow IPC files only support a single dictionary for a given > field across all batches. > > > > > > > > > > I believe this message is false and that support is possible based > on reading the spec: > > > > > > > > > > > Dictionaries are written in the stream and file formats as a > sequence of record batches... > > > > > > ... > > > > > > The dictionary isDelta flag allows existing dictionaries to be > expanded for future record batch materializations. A dictionary batch with > isDelta set indicates that its vector should be concatenated with those of > any previous batches with the same id. In a stream which encodes one > column, the list of strings ["A", "B", "C", "B", "D", "C", "E", "A"], with > a delta dictionary batch could take the form: > > > > > > > > > > ``` > > > > > <SCHEMA> > > > > > <DICTIONARY 0> > > > > > (0) "A" > > > > > (1) "B" > > > > > (2) "C" > > > > > > > > > > <RECORD BATCH 0> > > > > > 0 > > > > > 1 > > > > > 2 > > > > > 1 > > > > > > > > > > <DICTIONARY 0 DELTA> > > > > > (3) "D" > > > > > (4) "E" > > > > > > > > > > <RECORD BATCH 1> > > > > > 3 > > > > > 2 > > > > > 4 > > > > > 0 > > > > > EOS > > > > > ``` > > > > > > > > > > > Alternatively, if isDelta is set to false, then the dictionary > replaces the existing dictionary for the same ID. 
> > > > > > Using the same example as above, an alternate encoding could be:
> > > > >
> > > > > ```
> > > > > <SCHEMA>
> > > > > <DICTIONARY 0>
> > > > > (0) "A"
> > > > > (1) "B"
> > > > > (2) "C"
> > > > >
> > > > > <RECORD BATCH 0>
> > > > > 0
> > > > > 1
> > > > > 2
> > > > > 1
> > > > >
> > > > > <DICTIONARY 0>
> > > > > (0) "A"
> > > > > (1) "C"
> > > > > (2) "D"
> > > > > (3) "E"
> > > > >
> > > > > <RECORD BATCH 1>
> > > > > 2
> > > > > 1
> > > > > 3
> > > > > 0
> > > > > EOS
> > > > > ```
> > > > >
> > > > > It also specifies in the IPC File Format (non-streaming) section:
> > > > >
> > > > > > In the file format, there is no requirement that dictionary keys should be defined in a DictionaryBatch before they are used in a RecordBatch, as long as the keys are defined somewhere in the file. Furthermore, it is invalid to have more than one non-delta dictionary batch per dictionary ID (i.e. dictionary replacement is not supported). Delta dictionaries are applied in the order they appear in the file footer.
> > > > >
> > > > > So for the non-streaming format, multiple non-delta dictionaries are not supported, but one non-delta dictionary followed by delta dictionaries should be.
> > > > >
> > > > > Is it possible to do this in PyArrow? If so, how? If not, how easy would it be to add? Is it currently possible via C++, and can I therefore write a Cython or similar extension that will let me do this now without waiting for a release?
> > > >
> > > > In pyarrow (3.0.0 or later), you need to opt into emitting dictionary deltas using pyarrow.ipc.IpcWriteOptions. Can you show your code?
> > > >
> > > > https://github.com/apache/arrow/commit/8d76312dd397ebe07b71531f6d23b8caa76703dc
> > > >
> > > > > Best,
> > > > >
> > > > > Sam
