Hi Vinoth,

I just want to make sure my issue was clear: it seems like Spark shouldn't require a precombine field (or check that one exists) when dropping partitions.
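For concreteness, here's roughly the shape of what we hit (just a sketch; the table and column names are made up):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// The table was written by Flink with no precombine field configured,
// but Hudi's Spark SQL layer still assumes the default precombine
// field name, "ts".
spark.sql("ALTER TABLE events DROP PARTITION (dt = '2023-04-01')")
// Fails because the schema has no "ts" column, even though dropping a
// whole partition shouldn't need to combine any records at all.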
Thanks,

— Ken

> On Apr 4, 2023, at 7:31 AM, Vinoth Chandar <vin...@apache.org> wrote:
>
> Thanks for raising this issue.
>
> I'd love to use this opportunity to share more context on why the preCombine field exists.
>
> - As you probably inferred already, we needed to eliminate duplicates while dealing with out-of-order data (e.g. database change records arriving in different orders from two Kafka clusters in two zones). So it was necessary to preCombine by an "event" field, rather than just the arrival time (which is what _hoodie_commit_time is).
> - This comes from stream processing concepts like https://www.oreilly.com/radar/the-world-beyond-batch-streaming-101/, which build on the inadequacies of traditional database systems in dealing with things like this. At the end of the day, we are solving a "processing" problem with Hudi, IMO: Hudi replaces existing batch/streaming pipelines, not OLTP databases. That's at least the lens we approached it from.
> - For this to work end-to-end, it is not sufficient to just precombine within a batch of incoming writes; we also need to consistently apply the same logic against data in storage. In CoW, we implicitly merge against storage, so it's simpler. But for MoR, we simply append records to log files, so we needed to make this a table property, such that queries/compaction can later do the right preCombine. Hope that clarifies the CoW vs. MoR differences.
>
> On the issues raised/proposals here:
>
> 1. I think we need some dedicated effort across the different writer paths to make this easier; there are probably some lower-hanging fruits here. Some of it results from different authors contributing to different code paths in an OSS project.
> 2. On picking a sane default precombine field: _hoodie_commit_time is a good candidate for the preCombine field. As you point out, we would just pick one of many records with the same key arbitrarily in that scenario. On storage/across commits, we would pick the value with the latest commit_time (last writer wins), which would make queries repeatedly provide the same consistent values as well. Needs more thought.
> 3. If the user desires to customize this behavior, they could supply a different preCombine field. This would be similar to the semantics of event-time vs. arrival-order processing in streaming systems. Personally, I need to spend a bit more time digging to come up with an elegant solution here.
> 4. For the proposals on how Hudi could de-duplicate after inserts have introduced duplicates: I think the current behavior is a bit more forgiving than I'd like, tbh. It updates both of the records, IIRC. I think Hudi should ensure record key uniqueness across the different paths and fail the write if it's violated. If we think of this through an RDBMS lens, that's what would happen, correct?
>
> I'd love to hear your thoughts. If we can file a JIRA or compile JIRAs with issues around this, we could discuss short- and long-term plans.
>
> Thanks,
> Vinoth
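To make the event-time vs. arrival-order distinction above concrete, here's a minimal sketch of the two "pick the winner" policies (this is not Hudi's actual merge code, and the record shape is invented):

// Two candidate records for the same key.
case class Rec(key: String, commitTime: String, eventTs: Long, value: String)

// Arrival order (what a _hoodie_commit_time default would give):
// the last writer wins.
def mergeByArrival(a: Rec, b: Rec): Rec =
  if (a.commitTime.compareTo(b.commitTime) >= 0) a else b

// Event time (what a user-supplied preCombine field gives):
// the latest event wins, regardless of when it arrived.
def mergeByEventTime(a: Rec, b: Rec): Rec =
  if (a.eventTs >= b.eventTs) a else b

// With out-of-order delivery the two policies disagree:
val staleButWrittenLast  = Rec("k1", "20230404120000", 100L, "stale")
val freshButWrittenFirst = Rec("k1", "20230404110000", 200L, "fresh")
// mergeByArrival keeps "stale"; mergeByEventTime keeps "fresh".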
> On Sat, Apr 1, 2023 at 3:13 PM Ken Krugler <kkrugler_li...@transpac.com> wrote:
>
>> Hi Daniel,
>>
>> Thanks for the detailed write-up.
>>
>> I can't add much to the discussion, other than noting that we also recently ran into a related oddity: we don't need to define a precombine field when writing data to a CoW table (using Flink), but then trying to use Spark to drop partitions failed, because there's a default precombine field name (set to "ts"), and if that field doesn't exist the Spark job fails.
>>
>> — Ken
>>
>>> On Mar 31, 2023, at 1:20 PM, Daniel Kaźmirski <d.kazmir...@gmail.com> wrote:
>>>
>>> Hi all,
>>>
>>> I would like to bring up the topic of how the precombine field is used and what its purpose is. I would also like to know what the plans for it are going forward.
>>>
>>> At first glance, the precombine field looks like it's only used to deduplicate records in the incoming batch. But digging deeper, it looks like it is (or can be) also used to:
>>> 1. combine records not before write but on write, to decide whether to update an existing record (e.g. with DefaultHoodieRecordPayload);
>>> 2. combine records on read for a MoR table, to merge log and base files correctly.
>>> 3. In addition, the precombine field is required for Spark SQL UPDATE, even though the user can't introduce duplicates with this SQL statement anyway.
>>>
>>> Regarding [3], there's an inconsistency, as the precombine field is not required in MERGE INTO UPDATE; underneath, the UPSERT is switched to an INSERT in upsert mode to update existing records.
>>>
>>> I know that Hudi does a lot of work to ensure PK uniqueness across/within partitions, and that there is a need to deduplicate records before write, or to deduplicate existing data if duplicates were introduced, e.g. when using non-strict insert mode.
>>>
>>> What should happen, then, in a situation where the user does not want to, or cannot, provide a precombine field? It's then on the user not to introduce duplicates, but it makes Hudi more generic and easier to use for "SQL" people.
>>>
>>> No precombine is already possible for CoW, but UPSERT and SQL UPDATE are not supported (though users can update records using INSERT in non-strict mode or MERGE INTO UPDATE). There's also a difference between CoW and MoR: for MoR the precombine field is a hard requirement, while for CoW it is optional. (UPDATEs with no precombine are also possible in Flink for both CoW and MoR, but not in Spark.)
>>>
>>> Would it make sense, then, to take inspiration from some DBMS systems (e.g. Synapse) and allow updates and upserts when no precombine field is specified?
>>> Scenario: say that duplicates were introduced with INSERT in non-strict mode, and no precombine field is specified. Then we have two options:
>>> option 1) on UPDATE/UPSERT, Hudi should deduplicate the existing records; as there's no precombine field, it's expected that we don't know which records will be removed and which will be effectively updated and preserved in the table. (This can also be achieved by always providing the same value in the precombine field for all records.)
>>> option 2) on UPDATE/UPSERT, Hudi should deduplicate the existing records; as there's no precombine field, the record with the latest _hoodie_commit_time is preserved and updated, and the other records with the same PK are removed.
>>>
>>> In both cases, deduplication on UPDATE/UPSERT becomes a hard rule, whether we use a precombine field or not.
>>>
>>> Then, regarding MoR and merging records on read (I found this in the Hudi format spec): can it be done using only _hoodie_commit_time, in the absence of a precombine field? If so, could the precombine field become completely optional for both MoR and CoW?
>>>
>>> I'm of course looking at this more from the user perspective; it would be nice to know what is and is not possible from the design and developer perspective.
>>>
>>> Best Regards,
>>> Daniel Kaźmirski
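To make the [3] inconsistency concrete, here's a sketch of the two statements behaving differently on a table created without a precombine field (table and column names are invented):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// A plain SQL UPDATE fails, because Spark SQL requires a precombine field...
spark.sql("UPDATE hudi_tbl SET price = 10 WHERE id = 1")

// ...while MERGE INTO can update the very same record, since underneath
// the UPSERT is switched to an INSERT in upsert mode.
spark.sql(
  """MERGE INTO hudi_tbl AS t
    |USING (SELECT 1 AS id, 10 AS price) AS s
    |ON t.id = s.id
    |WHEN MATCHED THEN UPDATE SET t.price = s.price
    |""".stripMargin)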
>>
>> --------------------------
>> Ken Krugler
>> http://www.scaleunlimited.com
>> Custom big data solutions
>> Flink, Pinot, Solr, Elasticsearch


--------------------------
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch