The current thread, "[DISCUSS] split source of kafka partition by count", is another example of a practical need for the precombine field.
On Tue, Apr 4, 2023 at 7:31 AM Vinoth Chandar <vin...@apache.org> wrote:
> Thanks for raising this issue.
>
> Love to use this opp to share more context on why the preCombine field exists.
>
> - As you probably inferred already, we needed to eliminate duplicates while dealing with out-of-order data (e.g. database change records arriving in different orders from two Kafka clusters in two zones). So it was necessary to preCombine by an "event" field, rather than just by the arrival time (which is what _hoodie_commit_time is).
> - This comes from stream processing concepts like https://www.oreilly.com/radar/the-world-beyond-batch-streaming-101/ , which build upon the inadequacies of traditional database systems in dealing with things like this. At the end of the day, IMO we are solving a "processing" problem with Hudi: Hudi replaces existing batch/streaming pipelines, not OLTP databases. That's at least the lens we approached it from.
> - For this to work end-to-end, it is not sufficient to just preCombine within a batch of incoming writes; we also need to consistently apply the same logic against data in storage. In CoW, we implicitly merge against storage, so it's simpler. But for MoR, we simply append records to log files, so we needed to make this a table property, such that queries/compaction can later do the right preCombine. Hope that clarifies the CoW vs MoR differences.
>
> On the issues raised/proposals here:
>
> 1. I think we need some dedicated effort across the different writer paths to make this easier; there's probably some low-hanging fruit here. Some of it results simply from different authors contributing to different code paths in an OSS project.
> 2. On picking a sane default precombine field: _hoodie_commit_time is a good candidate for the preCombine field. As you point out, in that scenario we would just pick one of the many records with the same key arbitrarily.
> On storage/across commits, we would pick the value with the latest commit_time (last writer wins), which would also make queries consistently return the same values. Needs more thought.
> 3. If the user desires to customize this behavior, they could supply a different preCombine field. This would be similar to the semantics of event time vs arrival order processing in streaming systems. Personally, I need to spend a bit more time digging to come up with an elegant solution here.
> 4. On the proposals for how Hudi could de-duplicate after inserts have introduced duplicates: I think the current behavior is a bit more lenient than I'd like, tbh. It updates both records, IIRC. I think Hudi should ensure record key uniqueness across the different paths and fail the write if it's violated. If we look at this through an RDBMS lens, that's what would happen, correct?
>
> Love to hear your thoughts. If we can file a JIRA, or compile JIRAs with the issues around this, we could discuss our short- and long-term plans?
>
> Thanks
> Vinoth
>
> On Sat, Apr 1, 2023 at 3:13 PM Ken Krugler <kkrugler_li...@transpac.com> wrote:
>
>> Hi Daniel,
>>
>> Thanks for the detailed write-up.
>>
>> I can’t add much to the discussion, other than noting that we also recently ran into a related oddity: we don’t need to define a precombine field when writing data to a COW table (using Flink), but then trying to use Spark to drop partitions failed, because there’s a default precombine field name (set to “ts”), and if that field doesn’t exist, the Spark job fails.
>>
>> -- Ken
>>
>> > On Mar 31, 2023, at 1:20 PM, Daniel Kaźmirski <d.kazmir...@gmail.com> wrote:
>> >
>> > Hi all,
>> >
>> > I would like to bring up the topic of how the precombine field is used and what its purpose is. I would also like to know what the plans for it are in the future.
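Vinoth's first point, and Daniel's question about what the field is for, can be illustrated with a minimal in-memory sketch. It assumes a hypothetical `ChangeRecord` type whose `event_ts` column plays the role of the precombine field; `precombine` is an illustrative helper, not Hudi's API. Per key it keeps the record with the largest ordering value, so event-time ordering keeps the newer change even when it arrives first, while a constant ordering value (as when every record in one batch carries the same commit time) degrades to "last arrival wins".

```python
from dataclasses import dataclass
from typing import Callable, Dict, Iterable


@dataclass(frozen=True)
class ChangeRecord:
    key: str       # record key
    event_ts: int  # hypothetical event-time column used as the precombine field
    value: str


def precombine(
    batch: Iterable[ChangeRecord],
    ordering: Callable[[ChangeRecord], int],
) -> Dict[str, ChangeRecord]:
    """Deduplicate a batch: per key, keep the record with the largest ordering value.

    The batch is iterated in arrival order; on a tie the later arrival wins
    (a choice made for this sketch).
    """
    winners: Dict[str, ChangeRecord] = {}
    for rec in batch:
        current = winners.get(rec.key)
        if current is None or ordering(rec) >= ordering(current):
            winners[rec.key] = rec
    return winners


# Two changes to k1 arrive out of order (e.g. via two Kafka clusters):
# the newer event (event_ts=2) arrives before the older one (event_ts=1).
batch = [
    ChangeRecord("k1", 2, "new"),
    ChangeRecord("k1", 1, "old"),
    ChangeRecord("k2", 5, "x"),
]

by_event_time = precombine(batch, ordering=lambda r: r.event_ts)
by_arrival = precombine(batch, ordering=lambda r: 0)  # constant, like a shared commit time

print(by_event_time["k1"].value)  # new
print(by_arrival["k1"].value)     # old
```

With a constant ordering value, which record survives depends only on arrival order, which is the arbitrary pick Vinoth describes for `_hoodie_commit_time` within a single batch.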
>> >
>> > At first glance, the precombine field looks like it's only used to deduplicate records in the incoming batch.
>> > But when digging deeper, it looks like it can be (and is) also used to:
>> > 1. combine records not before but during the write, to decide whether to update an existing record (e.g. with DefaultHoodieRecordPayload);
>> > 2. combine records on read for MoR tables, to merge log and base files correctly;
>> > 3. the precombine field is required for Spark SQL UPDATE, even though the user can't introduce duplicates with this SQL statement anyway.
>> >
>> > Regarding [3], there's an inconsistency, as the precombine field is not required for MERGE INTO UPDATE. Underneath, UPSERT is switched to INSERT in upsert mode to update existing records.
>> >
>> > I know that Hudi does a lot of work to ensure PK uniqueness across/within partitions, and that there is a need to deduplicate records before the write, or to deduplicate existing data if duplicates were introduced, e.g. when using non-strict insert mode.
>> >
>> > What should happen, then, in a situation where the user does not want to, or cannot, provide a precombine field? It's then on the user not to introduce duplicates, but it makes Hudi more generic and easier to use for "SQL" people.
>> >
>> > Writing without a precombine field is already possible for CoW, but then UPSERT and SQL UPDATE are not supported (users can still update records using INSERT in non-strict mode or MERGE INTO UPDATE).
>> > There's also a difference between CoW and MoR: for MoR the precombine field is a hard requirement, while it is optional for CoW.
>> > (UPDATEs without a precombine field are also possible in Flink for both CoW and MoR, but not in Spark.)
>> >
>> > Would it make sense, then, to take inspiration from some DBMSs (e.g. Synapse) and allow updates and upserts when no precombine field is specified?
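The combine-on-write and combine-on-read uses listed above, together with Vinoth's CoW vs MoR explanation, can be sketched as one merge rule applied at different times. This is a simplified model, not Hudi code: rows are hypothetical `(ts, payload)` tuples where `ts` is the precombine value, and `combine_with_stored` keeps the stored row only when its ordering value is strictly larger, roughly the behavior attributed to DefaultHoodieRecordPayload (tie handling here is an assumption).

```python
from typing import Dict, List, Tuple

# (ts, payload): ts is the value of a hypothetical precombine column
Row = Tuple[int, str]


def combine_with_stored(stored: Row, incoming: Row) -> Row:
    """Keep the stored row if its ordering value is strictly larger, else take the incoming one."""
    return stored if stored[0] > incoming[0] else incoming


def merge_on_read(base: Dict[str, Row], log_blocks: List[Dict[str, Row]]) -> Dict[str, Row]:
    """Apply appended log blocks, oldest first, on top of a base-file snapshot."""
    merged = dict(base)  # copy so the base snapshot is left untouched
    for block in log_blocks:
        for key, row in block.items():
            merged[key] = combine_with_stored(merged[key], row) if key in merged else row
    return merged


base = {"k1": (5, "v5"), "k2": (1, "v1")}
log_blocks = [
    {"k1": (3, "v3")},  # an older event for k1 that arrived late
    {"k2": (2, "v2"), "k3": (1, "v1-new-key")},
]

snapshot = merge_on_read(base, log_blocks)
print(snapshot)  # {'k1': (5, 'v5'), 'k2': (2, 'v2'), 'k3': (1, 'v1-new-key')}
```

In CoW the same `combine_with_stored` would run once, at write time, when the file is rewritten; in MoR the log blocks are stored as-is and the rule has to run again at query or compaction time, which is why the ordering field needs to be a table property there. Without the ordering check, the late-arriving `k1` update would overwrite the newer value.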
>> >
>> > Scenario:
>> > Say that duplicates were introduced with INSERT in non-strict mode and no precombine field is specified. Then we have two options:
>> > option 1) on UPDATE/UPSERT, Hudi should deduplicate the existing records; as there's no precombine field, it's expected that we don't know which records will be removed and which will effectively be updated and preserved in the table. (This can also be achieved by always providing the same value in the precombine field for all records.)
>> > option 2) on UPDATE/UPSERT, Hudi should deduplicate the existing records; as there's no precombine field, the record with the latest _hoodie_commit_time is preserved and updated, and other records with the same PK are removed.
>> >
>> > In both cases, deduplication on UPDATE/UPSERT becomes a hard rule, whether we use a precombine field or not.
>> >
>> > Then, regarding MoR and merging records on read (I found this in the Hudi format spec): can it be done using only _hoodie_commit_time in the absence of a precombine field?
>> > If so, could the precombine field become completely optional for both MoR and CoW?
>> >
>> > I'm of course looking at this more from the user perspective; it would be nice to know what is and isn't possible from the design and developer perspective.
>> >
>> > Best Regards,
>> > Daniel Kaźmirski
>>
>> --------------------------
>> Ken Krugler
>> http://www.scaleunlimited.com
>> Custom big data solutions
>> Flink, Pinot, Solr, Elasticsearch
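Daniel's option 2 and Vinoth's stricter alternative (point 4, failing the write) can be contrasted in a short sketch. Assumptions: rows are hypothetical `(record_key, commit_time, value)` tuples, commit times are fixed-width timestamp strings so that string comparison matches time order, and `DuplicateKeyError` is a made-up exception, not a Hudi class. When two duplicates share the same commit time (e.g. both came from the same non-strict INSERT), `dedupe_latest_commit` keeps the first one it sees, which is effectively option 1's arbitrary choice.

```python
from typing import Dict, List, Set, Tuple

# (record_key, commit_time, value); commit_time stands in for _hoodie_commit_time
Row = Tuple[str, str, str]


class DuplicateKeyError(ValueError):
    """Hypothetical error raised when strict mode finds a repeated record key."""


def dedupe_latest_commit(rows: List[Row]) -> Dict[str, Row]:
    """Option 2: per key, keep the row with the latest commit time.

    Ties keep the first row seen, i.e. an arbitrary pick as in option 1.
    """
    kept: Dict[str, Row] = {}
    for row in rows:
        key, commit_time, _ = row
        if key not in kept or commit_time > kept[key][1]:
            kept[key] = row
    return kept


def check_unique(rows: List[Row]) -> None:
    """Strict alternative: fail instead of silently deduplicating."""
    seen: Set[str] = set()
    for key, _, _ in rows:
        if key in seen:
            raise DuplicateKeyError(f"duplicate record key: {key}")
        seen.add(key)


rows = [
    ("k1", "20230331120000", "a"),
    ("k1", "20230401090000", "b"),  # same key, later commit
    ("k2", "20230331120000", "c"),
]

latest = dedupe_latest_commit(rows)
print(latest["k1"])  # ('k1', '20230401090000', 'b')

try:
    check_unique(rows)
except DuplicateKeyError as err:
    print(err)  # duplicate record key: k1
```

The first function silently resolves duplicates; the second surfaces them, which matches the RDBMS-style behavior Vinoth argues for.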