Peter,

The Spark procedure is implemented by CreateChangelogViewProcedure.java
<https://github.com/apache/iceberg/blob/main/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java>.
This was already added by Yufei in Iceberg 1.2.0.
ChangelogIterator
<https://github.com/apache/iceberg/blob/main/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/ChangelogIterator.java>
is a base class that contains static methods such as the removeNetCarryovers
I mentioned; RemoveNetCarryoverIterator
<https://github.com/apache/iceberg/blob/main/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/RemoveNetCarryoverIterator.java>
is a subclass that computes the net changes.
These are Spark specific as they work with iterators of
org.apache.spark.sql.Row.
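
As a rough illustration of what "net changes" means for the example Peter
gave earlier in the thread, here is a minimal Python sketch. It is a toy
model, not the Iceberg code: the function name net_changes and the tuple
layout are made up for this example, and it simply cancels INSERTED/DELETED
pairs of identical rows across the whole range.

```python
from typing import Dict, Iterable, List, Tuple

Row = Tuple[str, ...]
Change = Tuple[Row, str]  # (row values, "INSERTED" or "DELETED")


def net_changes(changes: Iterable[Change]) -> List[Change]:
    """Toy model: cancel INSERTED/DELETED pairs of identical rows."""
    # row -> (number of inserts) - (number of deletes), kept in first-seen order
    balance: Dict[Row, int] = {}
    for row, change_type in changes:
        delta = 1 if change_type == "INSERTED" else -1
        balance[row] = balance.get(row, 0) + delta

    result: List[Change] = []
    for row, count in balance.items():
        kind = "INSERTED" if count > 0 else "DELETED"
        # A balance of zero means the row's changes cancelled out entirely.
        result.extend([(row, kind)] * abs(count))
    return result


# Complete history (b) for snapshots S1..S3 from Peter's example.
history = [
    (("PK1", "V1"), "INSERTED"),
    (("PK1", "V1"), "DELETED"),
    (("PK1", "V1b"), "INSERTED"),
    (("PK1", "V1b"), "DELETED"),
    (("PK1", "V1c"), "INSERTED"),
]

print(net_changes(history))  # [(('PK1', 'V1c'), 'INSERTED')]
```

Applied to the complete history (b), this yields the squashed result (a).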

BaseIncrementalChangelogScan
<https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java>
is a common building block that can be used by engines other than Spark; it
powers the Spark ChangelogRowReader. Each engine, whether Spark, Flink, or
some other, can choose (or let users choose) which records to show. However,
as a building block, I think it needs to generate all the changes for each
snapshot in the requested range. If you have ideas for expanding the API of
BaseIncrementalChangelogScan so that refinements of what records to emit can
be pushed down to it, I'd be interested in hearing them. (I think they will
be beyond the scope of my current PR.)
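
To make "all the changes for each snapshot" concrete, here is a small Python
sketch of the equality-delete scenario from my original question. It is only
a toy model under stated assumptions, not how BaseIncrementalChangelogScan is
written: the classes DataFile and EqDelete and the function
changelog_for_snapshot are hypothetical, each snapshot is identified with its
sequence number, and deletes match on PK only. It relies on the rule that an
equality delete applies to data files with a strictly lower sequence number.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class DataFile:
    name: str
    seq: int                           # data sequence number
    rows: Tuple[Tuple[str, str], ...]  # (pk, value) pairs


@dataclass(frozen=True)
class EqDelete:
    name: str
    seq: int
    pk: str


def applies(delete: EqDelete, data_file: DataFile) -> bool:
    # An equality delete only affects strictly older data files.
    return delete.seq > data_file.seq


def changelog_for_snapshot(seq: int, data_files: List[DataFile],
                           deletes: List[EqDelete]) -> List[Tuple[str, str, str]]:
    """Changes introduced by the snapshot whose sequence number is seq."""
    added = [d for d in deletes if d.seq == seq]
    existing = [d for d in deletes if d.seq < seq]
    changes = []
    for f in data_files:
        if f.seq > seq:
            continue  # file does not exist yet in this snapshot
        if f.seq == seq:
            changes += [(pk, v, "INSERTED") for pk, v in f.rows]
            continue
        for pk, v in f.rows:
            already_gone = any(d.pk == pk and applies(d, f) for d in existing)
            newly_deleted = any(d.pk == pk and applies(d, f) for d in added)
            if newly_deleted and not already_gone:
                changes.append((pk, v, "DELETED"))
    return changes


data_files = [
    DataFile("DF1", 1, (("PK1", "V1"), ("PK2", "V2"))),
    DataFile("DF2", 2, (("PK1", "V1b"),)),
    DataFile("DF3", 3, (("PK1", "V1c"),)),
]
deletes = [EqDelete("ED1", 2, "PK1"), EqDelete("ED2", 3, "PK1")]

for s in (1, 2, 3):
    print(s, changelog_for_snapshot(s, data_files, deletes))
```

For snapshot 3 this emits only PK1,V1b,DELETED and PK1,V1c,INSERTED, because
PK1,V1 was already removed by ED1 before that snapshot. Concatenating the
output for snapshots 1 to 3 gives the complete change history for the range.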

- Wing Yew


On Thu, Aug 22, 2024 at 11:51 AM Péter Váry <peter.vary.apa...@gmail.com>
wrote:

> That's good info. I didn't know that we already have the Spark procedure
> at hand.
> How does Spark calculate the `changelog_view`? Do we already have an
> implementation at hand somewhere? Could it be reused?
>
> Anyway, if we want to reuse the new changelog scan for the changelog_view
> as well, then I agree that we need to provide a solution for (b). I think
> that (a)/net_changes is also important, as streaming readers of the table
> are often not interested in the intermediate states, just in the final
> changes. And (a) could result in far fewer records, which means better
> performance and lower resource usage.
>
> Steve Zhang <hongyue_zh...@apple.com.invalid> wrote (on Thu, Aug 22, 2024,
> 19:47):
>
>> Yeah, agreed. I think that for the changelog scan to convert per-snapshot
>> scans to tasks, option (b) with the complete history is the right way, while
>> there should be an option to configure whether net/squashed changes are
>> desired.
>>
>> Also, in Spark's create_changelog_view, the net_changes and compute_updates
>> options cannot be used together.
>>
>> Thanks,
>> Steve Zhang
>>
>>
>>
>> On Aug 22, 2024, at 8:50 AM, Steven Wu <stevenz...@gmail.com> wrote:
>>
>> >  It should emit changes for each snapshot in the requested range.
>>
>> Wing Yew has a good point here. +1
>>
>> On Thu, Aug 22, 2024 at 8:46 AM Wing Yew Poon <wyp...@cloudera.com.invalid>
>> wrote:
>>
>>> First, thank you all for your responses to my question.
>>>
>>> For Peter's question, I believe that (b) is the correct behavior. It is
>>> also the current behavior when using copy-on-write (deletes and updates are
>>> still supported but not using delete files). A changelog scan is an
>>> incremental scan over multiple snapshots. It should emit changes for each
>>> snapshot in the requested range. Spark provides additional functionality on
>>> top of the changelog scan, to produce net changes for the requested range.
>>> See
>>> https://iceberg.apache.org/docs/latest/spark-procedures/#create_changelog_view.
>>> Basically, the create_changelog_view procedure uses a changelog scan (it
>>> reads the changelog table, i.e., <table>.changes) to get a DataFrame, which
>>> is saved as a temporary Spark view that can then be queried; if net_changes
>>> is true, only the net changes are produced for this temporary view. This
>>> functionality uses ChangelogIterator.removeNetCarryovers (which is in
>>> Spark).
>>>
>>>
>>> On Thu, Aug 22, 2024 at 7:51 AM Steven Wu <stevenz...@gmail.com> wrote:
>>>
>>>> Peter, good question. In this case, (b) is the complete change history.
>>>> (a) is the squashed version.
>>>>
>>>> I would probably check how other changelog systems deal with this
>>>> scenario.
>>>>
>>>> On Thu, Aug 22, 2024 at 3:49 AM Péter Váry <peter.vary.apa...@gmail.com>
>>>> wrote:
>>>>
>>>>> Technically different, but somewhat similar question:
>>>>>
>>>>> What is the expected behaviour when the `IncrementalScan` is created
>>>>> not for a single snapshot, but for multiple snapshots?
>>>>> S1 added PK1-V1
>>>>> S2 updated PK1-V1 to PK1-V1b (removed PK1-V1 and added PK1-V1b)
>>>>> S3 updated PK1-V1b to PK1-V1c (removed PK1-V1b and added PK1-V1c)
>>>>>
>>>>> Let's say we have
>>>>> *IncrementalScan.fromSnapshotInclusive(S1).toSnapshot(S3)*.
>>>>> Do we need to return:
>>>>> (a)
>>>>> - PK1,V1c,INSERTED
>>>>>
>>>>> Or is it OK to return:
>>>>> (b)
>>>>> - PK1,V1,INSERTED
>>>>> - PK1,V1,DELETED
>>>>> - PK1,V1b,INSERTED
>>>>> - PK1,V1b,DELETED
>>>>> - PK1,V1c,INSERTED
>>>>>
>>>>> I think (a) is the correct behaviour.
>>>>>
>>>>> Thanks,
>>>>> Peter
>>>>>
>>>>> Steven Wu <stevenz...@gmail.com> wrote (on Wed, Aug 21, 2024,
>>>>> 22:27):
>>>>>
>>>>>> Agree with everyone that option (a) is the correct behavior.
>>>>>>
>>>>>> On Wed, Aug 21, 2024 at 11:57 AM Steve Zhang
>>>>>> <hongyue_zh...@apple.com.invalid> wrote:
>>>>>>
>>>>>>> I agree that option (a) is what users expect for row-level changes.
>>>>>>>
>>>>>>> I feel the added deletes in a given snapshot provide the PK of the
>>>>>>> DELETED entry; existing deletes are read together with the data files to
>>>>>>> find the DELETED value (V1b) and the rest of the columns.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Steve Zhang
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Aug 20, 2024, at 6:06 PM, Wing Yew Poon
>>>>>>> <wyp...@cloudera.com.INVALID> wrote:
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I have a PR open to add changelog support for the case where delete
>>>>>>> files are present (https://github.com/apache/iceberg/pull/10935). I
>>>>>>> have a question about what the changelog should emit in the following
>>>>>>> scenario:
>>>>>>>
>>>>>>> The table has a schema with a primary key/identifier column PK and
>>>>>>> an additional column V.
>>>>>>> In snapshot 1, we write a data file DF1 with rows
>>>>>>> PK1, V1
>>>>>>> PK2, V2
>>>>>>> etc.
>>>>>>> In snapshot 2, we write an equality delete file ED1 with PK=PK1, and
>>>>>>> new data file DF2 with rows
>>>>>>> PK1, V1b
>>>>>>> (possibly other rows)
>>>>>>> In snapshot 3, we write an equality delete file ED2 with PK=PK1, and
>>>>>>> new data file DF3 with rows
>>>>>>> PK1, V1c
>>>>>>> (possibly other rows)
>>>>>>>
>>>>>>> Thus, in snapshot 2 and snapshot 3, we update the row identified by
>>>>>>> PK1 with new values by using an equality delete and writing new data for
>>>>>>> the row.
>>>>>>> These are the files present in snapshot 3:
>>>>>>> DF1 (sequence number 1)
>>>>>>> DF2 (sequence number 2)
>>>>>>> DF3 (sequence number 3)
>>>>>>> ED1 (sequence number 2)
>>>>>>> ED2 (sequence number 3)
>>>>>>>
>>>>>>> The question I have is what should the changelog emit for snapshot 3?
>>>>>>> For snapshot 1, the changelog should emit a row for each row in DF1
>>>>>>> as INSERTED.
>>>>>>> For snapshot 2, it should emit a row for PK1, V1 as DELETED; and a
>>>>>>> row for PK1, V1b as INSERTED.
>>>>>>> For snapshot 3, I see two possibilities:
>>>>>>> (a)
>>>>>>> PK1,V1b,DELETED
>>>>>>> PK1,V1c,INSERTED
>>>>>>>
>>>>>>> (b)
>>>>>>> PK1,V1,DELETED
>>>>>>> PK1,V1b,DELETED
>>>>>>> PK1,V1c,INSERTED
>>>>>>>
>>>>>>> The interpretation for (b) is that both ED1 and ED2 apply to DF1,
>>>>>>> with ED1 being an existing delete file and ED2 being an added delete
>>>>>>> file for it. We discount ED1 and apply ED2 and get a DELETED row for
>>>>>>> PK1,V1. ED2 also applies to DF2, from which we get a DELETED row for
>>>>>>> PK1,V1b.
>>>>>>>
>>>>>>> The interpretation for (a) is that ED1 is an existing delete file
>>>>>>> for DF1 and in snapshot 3, the row PK1,V1 already does not exist before
>>>>>>> the snapshot. Thus we do not emit a row for it. (We can think of it as
>>>>>>> ED1 already being applied to DF1, and we only consider any additional
>>>>>>> rows that get deleted when ED2 is applied.)
>>>>>>>
>>>>>>> I lean towards (a), as I think it is more reflective of net changes.
>>>>>>> I am interested to hear what folks think.
>>>>>>>
>>>>>>> Thank you,
>>>>>>> Wing Yew
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>
