Thanks for raising this issue.

I'd love to use this opportunity to share more context on why the preCombine
field exists.

   - As you probably inferred already, we needed to eliminate duplicates,
   while dealing with out-of-order data (e.g. database change records arriving
   in different orders from two Kafka clusters in two zones). So it was
   necessary to preCombine by an "event" field, rather than just the arrival
   time (which is what _hoodie_commit_time is).
   - This comes from stream processing concepts like
   https://www.oreilly.com/radar/the-world-beyond-batch-streaming-101/ ,
   which grew out of the inadequacies of traditional database systems in
   dealing with things like this. At the end of the day, we are solving a
   "processing" problem IMO with Hudi - Hudi replaces existing batch/streaming
   pipelines, not OLTP databases. That's at least the lens we approached it
   from.
   - For this to work end-to-end, it is not sufficient to just preCombine
   within a batch of incoming writes; we also need to consistently apply the
   same rule against data in storage. In CoW, we implicitly merge against
   storage, so it's simpler. But in MoR, we simply append records to log
   files, so we needed to make this a table property - such that
   queries/compaction can later do the right preCombine (a small sketch
   follows this list). Hope that clarifies the CoW vs MoR differences.
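
To make the above concrete, here is a minimal sketch of what that looks like
with the Spark datasource (spark-shell with the Hudi bundle on the classpath).
The table/column names - uuid, event_ts, region, value - are purely
illustrative, not from any specific pipeline:

    import org.apache.spark.sql.SaveMode
    import spark.implicits._

    // two versions of the same key arriving out of order
    // (e.g. from two Kafka clusters in different zones)
    val df = Seq(
      ("k1", 1700000005L, "us", "value-from-cluster-a"),
      ("k1", 1700000001L, "us", "value-from-cluster-b") // older event, later arrival
    ).toDF("uuid", "event_ts", "region", "value")

    df.write.format("hudi").
      option("hoodie.table.name", "change_events").
      option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
      option("hoodie.datasource.write.recordkey.field", "uuid").
      option("hoodie.datasource.write.partitionpath.field", "region").
      // preCombine on the event-time column rather than arrival order: of the
      // two records for key k1, the one with the larger event_ts survives, and
      // since the field is a table property, queries/compaction apply the same
      // rule to records sitting in MoR log files.
      option("hoodie.datasource.write.precombine.field", "event_ts").
      option("hoodie.datasource.write.operation", "upsert").
      mode(SaveMode.Append).
      save("/tmp/hudi/change_events")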

On the issues and proposals raised here:

   1. I think we need some dedicated effort across the different writer
   paths to make this easier. There is probably some lower-hanging fruit
   here. Some of it results from different authors contributing to different
   code paths in an OSS project.
   2. On picking a sane default preCombine field: _hoodie_commit_time is a
   good candidate, as you point out. Within a batch, we would just pick one
   of many records with the same key arbitrarily in that scenario. On
   storage/across commits, we would pick the value with the latest
   commit time (last writer wins) - which would also make queries repeatedly
   return the same consistent values. Needs more thought.
   3. If the user desires to customize this behavior, they could supply a
   different preCombine field. This would be similar to the semantics of
   event-time vs arrival-order processing in streaming systems (see the
   sketch after this list). Personally, I need to spend a bit more time
   digging to come up with an elegant solution here.
   4. On the proposals for how Hudi could de-duplicate after the fact, once
   inserts have introduced duplicates - I think the current behavior is a bit
   more lenient than I'd like, tbh. It updates both records, IIRC. I think
   Hudi should ensure record key uniqueness across the different paths and
   fail the write if it's violated - if we look at this through an RDBMS
   lens, that's what would happen, correct?
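
Purely as a sketch of what 2/3 could look like from the user's side (reusing
df and the path from the sketch above; the first option set reflects the
*proposed* default of item 2 and is not something the Spark upsert path
accepts today; as before, the field names are illustrative):

    // Arrival-order semantics (the proposed default in item 2): no
    // user-supplied preCombine field; ties within a batch are resolved
    // arbitrarily, merges against storage fall back to latest commit
    // time / last writer wins.
    val arrivalOrderOpts = Map(
      "hoodie.table.name" -> "change_events",
      "hoodie.datasource.write.recordkey.field" -> "uuid",
      "hoodie.datasource.write.operation" -> "upsert"
    )

    // Event-time semantics (item 3): supply an explicit ordering field, plus
    // a payload that, as I understand it, also compares that field against
    // the record already in storage.
    val eventTimeOpts = Map(
      "hoodie.table.name" -> "change_events",
      "hoodie.datasource.write.recordkey.field" -> "uuid",
      "hoodie.datasource.write.partitionpath.field" -> "region",
      "hoodie.datasource.write.precombine.field" -> "event_ts",
      "hoodie.datasource.write.payload.class" ->
        "org.apache.hudi.common.model.DefaultHoodieRecordPayload",
      "hoodie.datasource.write.operation" -> "upsert"
    )

    df.write.format("hudi").options(eventTimeOpts).mode("append").
      save("/tmp/hudi/change_events")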


I'd love to hear your thoughts. If we can file a JIRA (or compile the JIRAs
with issues around this), we could discuss short- and long-term plans?

Thanks
Vinoth

On Sat, Apr 1, 2023 at 3:13 PM Ken Krugler <kkrugler_li...@transpac.com>
wrote:

> Hi Daniel,
>
> Thanks for the detailed write-up.
>
> I can’t add much to the discussion, other than noting we also recently ran
> into the related oddity that we don’t need to define a precombine when
> writing data to a COW table (using Flink), but then trying to use Spark to
> drop partitions failed because there’s a default precombine field name (set
> to “ts”), and if that field doesn’t exist then the Spark job fails.
>
> — Ken
>
>
> > On Mar 31, 2023, at 1:20 PM, Daniel Kaźmirski <d.kazmir...@gmail.com>
> wrote:
> >
> > Hi all,
> >
> > I would like to bring up the topic of how the precombine field is used
> > and what its purpose is. I would also like to know what the plans for it
> > are in the future.
> >
> > At first glance, the precombine field looks like it's only used to
> > deduplicate records in the incoming batch.
> > But when digging deeper, it looks like it is/can also be used to:
> > 1. combine records not before, but on write, to decide whether to update
> > an existing record (e.g. with DefaultHoodieRecordPayload)
> > 2. combine records on read for a MoR table, to merge log and base files
> > correctly.
> > 3. the precombine field is required for Spark SQL UPDATE, even though the
> > user can't introduce duplicates with this SQL statement anyway.
> >
> > Regarding [3], there's an inconsistency, as the precombine field is not
> > required in MERGE INTO UPDATE. Underneath, UPSERT is switched to INSERT
> > in upsert mode to update existing records.
> >
> > I know that Hudi does a lot of work to ensure PK uniqueness across/within
> > partitions, and there is a need to deduplicate records before writing or
> > to deduplicate existing data if duplicates were introduced, e.g. when
> > using non-strict insert mode.
> >
> > What should then happen in a situation where the user does not want to,
> > or cannot, provide a precombine field? Then it's on the user not to
> > introduce duplicates, but it makes Hudi more generic and easier to use
> > for "SQL" people.
> >
> > Having no precombine field is already possible for CoW, but then UPSERT
> > and SQL UPDATE are not supported (though users can update records using
> > INSERT in non-strict mode or MERGE INTO UPDATE).
> > There's also a difference between CoW and MoR: for MoR the precombine
> > field is a hard requirement, while it is optional for CoW.
> > (UPDATEs with no precombine field are also possible in Flink for both CoW
> > and MoR, but not in Spark.)
> >
> > Would it make sense to take inspiration from some DBMS systems then (e.g.
> > Synapse) and allow updates and upserts when no precombine field is
> > specified?
> > Scenario:
> > Say that duplicates were introduced with INSERT in non-strict mode and no
> > precombine field is specified; then we have two options:
> > option 1) on UPDATE/UPSERT, Hudi should deduplicate the existing records;
> > as there's no precombine field, it's expected that we don't know which
> > records will be removed and which will be effectively updated and
> > preserved in the table. (This can also be achieved by always providing
> > the same value in the precombine field for all records.)
> > option 2) on UPDATE/UPSERT, Hudi should deduplicate the existing records;
> > as there's no precombine field, the record with the latest
> > _hoodie_commit_time is preserved and updated, and other records with the
> > same PK are removed.
> >
> > In both cases, deduplication on UPDATE/UPSERT becomes a hard rule,
> > whether we use a precombine field or not.
> >
> > Then, regarding MoR and merging records on read (I found this in the Hudi
> > format spec): can it be done using only _hoodie_commit_time, in the
> > absence of a precombine field?
> > If so, could the precombine field become completely optional for both MoR
> > and CoW?
> >
> > I'm of course looking at it more from the user perspective; it would be
> > nice to know what is and what is not possible from the design and
> > developer perspective.
> >
> > Best Regards,
> > Daniel Kaźmirski
>
> --------------------------
> Ken Krugler
> http://www.scaleunlimited.com
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
>
>
>
>
