This current thread is another example of a practical need for the
precombine field:
 "[DISCUSS] split source of kafka partition by count"


On Tue, Apr 4, 2023 at 7:31 AM Vinoth Chandar <vin...@apache.org> wrote:

> Thanks for raising this issue.
>
> Love to use this opp to share more context on why the preCombine field
> exists.
>
>    - As you probably inferred already, we needed to eliminate duplicates
>    while dealing with out-of-order data (e.g. database change records
>    arriving in different orders from two Kafka clusters in two zones). So it
>    was necessary to preCombine by an "event" field, rather than just the
>    arrival time (which is what _hoodie_commit_time is).
>    - This comes from stream processing concepts like
>    https://www.oreilly.com/radar/the-world-beyond-batch-streaming-101/ ,
>    which address the inadequacies of traditional database systems in dealing
>    with things like this. At the end of the day, we are solving a
>    "processing" problem IMO with Hudi - Hudi replaces existing
>    batch/streaming pipelines, not OLTP databases. That's at least the lens
>    we approached it from.
>    - For this to work end-to-end, it is not sufficient to just precombine
>    within a batch of incoming writes; we also need to consistently apply the
>    same logic against data in storage. In CoW, we implicitly merge against
>    storage, so it's simpler. But for MoR, we simply append records to log
>    files, so we needed to make this a table property - such that
>    queries/compaction can later do the right preCombine. Hope that clarifies
>    the CoW vs MoR differences. (See the rough sketch below.)
>
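> For illustration, here's roughly how that looks on the Spark datasource
> write path (just a sketch - the table, column names and DataFrame are made
> up; the options are the usual hoodie.datasource.write.* keys):
>
>   import org.apache.spark.sql.SaveMode
>
>   // incoming CDC batch: may hold multiple versions of the same key,
>   // arriving out of order from the two Kafka clusters
>   inputDf.write.format("hudi").
>     option("hoodie.table.name", "orders").
>     option("hoodie.datasource.write.recordkey.field", "order_id").
>     // preCombine by an event-time field, not arrival time
>     option("hoodie.datasource.write.precombine.field", "event_ts").
>     option("hoodie.datasource.write.operation", "upsert").
>     mode(SaveMode.Append).
>     save("/tmp/hudi/orders")
>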
> On the issues/proposals raised here:
>
>    1. I think we need some dedicated effort across the different writer
>    paths to make this easier. Probably some low-hanging fruit here. Some of
>    it results from different authors contributing to different code paths
>    in an OSS project.
>    2. On picking a sane default precombine field: _hoodie_commit_time is a
>    good candidate for the preCombine field; as you point out, within a batch
>    we would just pick one of many records with the same key arbitrarily in
>    that scenario. On storage/across commits, we would pick the value with
>    the latest commit_time, i.e. last writer wins - which would also make
>    queries repeatedly return the same consistent values. Needs more thought.
>    (See the sketch after this list.)
>    3. If the user desires to customize this behavior, they could supply a
>    different preCombine field. This would be similar to the semantics of
>    event-time vs arrival-order processing in streaming systems. Personally,
>    I need to spend a bit more time digging to come up with an elegant
>    solution here.
>    4. For the proposals on how Hudi could de-duplicate after inserts have
>    introduced duplicates - I think the current behavior is a bit more
>    lenient than I'd like, tbh; it updates both records, IIRC. I think Hudi
>    should ensure record key uniqueness across the different paths and fail
>    the write if it's violated. If we look at this through an RDBMS lens,
>    that's what would happen, correct?
>
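> To make (2) and (3) concrete, a purely illustrative sketch of the two merge
> rules (not actual Hudi classes or APIs):
>
>   // illustrative only - not actual Hudi code
>   case class Rec(key: String, commitTime: String, eventTs: Long, value: String)
>
>   // default via _hoodie_commit_time: last writer (latest commit) wins
>   def mergeByArrival(existing: Rec, incoming: Rec): Rec =
>     if (incoming.commitTime >= existing.commitTime) incoming else existing
>
>   // user-supplied preCombine field: latest event time wins,
>   // regardless of arrival order
>   def mergeByEventTime(existing: Rec, incoming: Rec): Rec =
>     if (incoming.eventTs >= existing.eventTs) incoming else existing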
>
> Love to hear your thoughts. If we can file a JIRA or compile JIRAs with the
> issues around this, we could discuss our short- and long-term plans?
>
> Thanks
> Vinoth
>
> On Sat, Apr 1, 2023 at 3:13 PM Ken Krugler <kkrugler_li...@transpac.com>
> wrote:
>
>> Hi Daniel,
>>
>> Thanks for the detailed write-up.
>>
>> I can’t add much to the discussion, other than noting that we also recently
>> ran into a related oddity: we don’t need to define a precombine field when
>> writing data to a COW table (using Flink), but then trying to use Spark to
>> drop partitions fails, because there’s a default precombine field name (set
>> to “ts”), and if that field doesn’t exist the Spark job fails.
>>
>> — Ken
>>
>>
>> > On Mar 31, 2023, at 1:20 PM, Daniel Kaźmirski <d.kazmir...@gmail.com>
>> wrote:
>> >
>> > Hi all,
>> >
>> > I would like to bring up the topic of how the precombine field is used
>> > and what its purpose is. I would also like to know what the plans for it
>> > are going forward.
>> >
>> > At first glance the precombine field looks like it's only used to
>> > deduplicate records in the incoming batch.
>> > But when digging deeper it looks like it can also be / is also used to:
>> > 1. combine records not before, but on write, to decide whether to update
>> > an existing record (e.g. with DefaultHoodieRecordPayload - example below)
>> > 2. combine records on read for a MoR table, to merge log and base files
>> > correctly.
>> > 3. the precombine field is required for Spark SQL UPDATE, even though the
>> > user can't introduce duplicates with this SQL statement anyway.
>> >
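>> > For [1], a minimal sketch of what I mean on the Spark datasource path
>> > (table/column names are made up; DefaultHoodieRecordPayload compares the
>> > precombine field of the incoming record against the record already in
>> > storage):
>> >
>> >   df.write.format("hudi").
>> >     option("hoodie.table.name", "my_table").
>> >     option("hoodie.datasource.write.recordkey.field", "id").
>> >     option("hoodie.datasource.write.precombine.field", "ts").
>> >     // combine against storage on write, not just within the batch
>> >     option("hoodie.datasource.write.payload.class",
>> >       "org.apache.hudi.common.model.DefaultHoodieRecordPayload").
>> >     mode("append").
>> >     save("/tmp/hudi/my_table")
>> >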
>> > Regarding [3], there's an inconsistency, as the precombine field is not
>> > required in MERGE INTO UPDATE. Underneath, UPSERT is switched to INSERT
>> > in upsert mode to update existing records. (See the example below.)
>> >
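>> > Roughly, with made-up table/column names, the inconsistency looks like
>> > this (a sketch, not verified against every Hudi version):
>> >
>> >   // requires the table to have a precombine field configured
>> >   spark.sql("UPDATE hudi_tbl SET amount = 100 WHERE id = 1")
>> >
>> >   // works without a precombine field
>> >   spark.sql("""
>> >     MERGE INTO hudi_tbl t
>> >     USING (SELECT 1 AS id, 100 AS amount) s
>> >     ON t.id = s.id
>> >     WHEN MATCHED THEN UPDATE SET amount = s.amount
>> >   """)
>> >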
>> > I know that Hudi does a lot of work to ensure PK uniqueness
>> > across/within partitions, and that there is a need to deduplicate records
>> > before write, or to deduplicate existing data if duplicates were
>> > introduced, e.g. when using non-strict insert mode.
>> >
>> > What should then happen in a situation where the user does not want to,
>> > or cannot, provide a precombine field? Then it's on the user not to
>> > introduce duplicates, but it makes Hudi more generic and easier to use
>> > for "SQL" people.
>> >
>> > Having no precombine field is already possible for CoW, but then UPSERT
>> > and SQL UPDATE are not supported (users can still update records using
>> > INSERT in non-strict mode or MERGE INTO UPDATE).
>> > There's also a difference between CoW and MoR: for MoR the precombine
>> > field is a hard requirement, while it is optional for CoW.
>> > (UPDATEs with no precombine field are also possible in Flink, for both
>> > CoW and MoR, but not in Spark.)
>> >
>> > Would it then make sense to take inspiration from some DBMS systems
>> > (e.g. Synapse) and allow updates and upserts when no precombine field is
>> > specified?
>> > Scenario:
>> > Say that duplicates were introduced with INSERT in non-strict mode and no
>> > precombine field is specified; then we have two options:
>> > option 1) on UPDATE/UPSERT Hudi should deduplicate the existing records;
>> > as there's no precombine field, it's expected that we don't know which
>> > records will be removed and which will be effectively updated and
>> > preserved in the table. (This can also be achieved by always providing
>> > the same value in the precombine field for all records.)
>> > option 2) on UPDATE/UPSERT Hudi should deduplicate the existing records;
>> > as there's no precombine field, the record with the latest
>> > _hoodie_commit_time is preserved and updated, and the other records with
>> > the same PK are removed.
>> >
>> > In both cases, deduplication on UPDATE/UPSERT becomes a hard rule,
>> > whether we use a precombine field or not. (A small sketch of the scenario
>> > follows below.)
>> >
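>> > For concreteness, the kind of sequence I have in mind (a sketch with
>> > made-up table/column names, assuming the Spark SQL non-strict insert
>> > mode setting):
>> >
>> >   spark.sql("set hoodie.sql.insert.mode = non-strict")
>> >   spark.sql("INSERT INTO hudi_tbl VALUES (1, 'a', 10)")
>> >   // same record key inserted again -> table now holds duplicates
>> >   spark.sql("INSERT INTO hudi_tbl VALUES (1, 'a', 20)")
>> >
>> >   // proposal: a subsequent UPDATE/UPSERT of key 1 would first
>> >   // deduplicate (option 2: keep only the row with the latest
>> >   // _hoodie_commit_time) and then apply the update
>> >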
>> > Then, regarding MoR and merging records on read (I found this in the Hudi
>> > format spec): can it be done by using only _hoodie_commit_time, in the
>> > absence of a precombine field?
>> > If so, could the precombine field become completely optional for both MoR
>> > and CoW?
>> >
>> > I'm of course looking at it more from the user perspective; it would be
>> > nice to know what is and what is not possible from the design and
>> > developer perspective.
>> >
>> > Best Regards,
>> > Daniel Kaźmirski
>>
>> --------------------------
>> Ken Krugler
>> http://www.scaleunlimited.com
>> Custom big data solutions
>> Flink, Pinot, Solr, Elasticsearch
>>
>>
>>
>>
