Looking forward to this! It could really help backfill/rebootstrap scenarios.

On Tue, Apr 4, 2023 at 9:18 AM Vinoth Chandar <[email protected]> wrote:
> Thinking out loud.
>
> 1. For insert operations, it should not matter anyway.
> 2. For upsert etc., the preCombine would handle the ordering problems.
>
> Is that what you are saying? I feel we don't want to leak any
> Kafka-specific logic or force the use of special payloads, etc. Thoughts?
>
> I assigned the JIRA to you and also made you a contributor, so in the
> future you can self-assign.
>
> On Mon, Apr 3, 2023 at 7:08 PM 孔维 <[email protected]> wrote:
>
>> Hi,
>>
>> Yes, we can create multiple Spark input partitions per Kafka partition.
>>
>> I think the write operations can handle potentially out-of-order events,
>> because before writing we preCombine the incoming events using the
>> source-ordering-field, and we also combineAndGetUpdateValue with the
>> records on storage. From a business perspective, we use the combine logic
>> to keep our data correct, and Hudi does not require any guarantees about
>> the ordering of Kafka events.
>>
>> I already filed a JIRA [https://issues.apache.org/jira/browse/HUDI-6019];
>> could you help assign it to me?
>>
>> At 2023-04-03 23:27:13, "Vinoth Chandar" <[email protected]> wrote:
>> >Hi,
>> >
>> >Does your implementation read out offset ranges from Kafka partitions,
>> >which means we can create multiple Spark input partitions per Kafka
>> >partition? If so, +1 for the overall goals here.
>> >
>> >How does this affect ordering? Can you think about how/if Hudi write
>> >operations can handle potentially out-of-order events being read out?
>> >It feels like we can add a JIRA for this anyway.
>> >
>> >On Thu, Mar 30, 2023 at 10:02 PM 孔维 <[email protected]> wrote:
>> >
>> >> Hi team, for the Kafka source, when pulling data from Kafka, the
>> >> default parallelism is the number of Kafka partitions.
>> >> There are two problematic cases:
>> >>
>> >> 1. Pulling a large amount of data from Kafka (e.g. maxEvents=100000000)
>> >> when the number of Kafka partitions is not enough: the pull takes too
>> >> much time, or worse, causes executor OOMs.
>> >> 2. Huge data skew between Kafka partitions: the pull is blocked by the
>> >> slowest partition.
>> >>
>> >> To solve these cases, I want to add a parameter,
>> >> hoodie.deltastreamer.kafka.per.batch.maxEvents, to control the maximum
>> >> events in one Kafka batch; the default of Long.MAX_VALUE means the
>> >> feature is off. This configuration takes effect after the
>> >> hoodie.deltastreamer.kafka.source.maxEvents config.
>> >>
>> >> Here is my POC of the improvement (max executor cores is 128):
>> >>
>> >> Feature off (hoodie.deltastreamer.kafka.source.maxEvents=50000000)
>> >>
>> >> Feature on (hoodie.deltastreamer.kafka.per.batch.maxEvents=200000)
>> >>
>> >> After turning on the feature, the Tagging time dropped from 4.4 mins to
>> >> 1.1 mins, and it can be even faster if given more cores.
>> >>
>> >> What do you think? Can I file a JIRA issue for this?
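
To make the proposal concrete, below is a minimal, self-contained Java sketch of the idea discussed in this thread. It is not Hudi's actual KafkaOffsetGen code: the Range record, the split helper, and the greedy way the overall cap is applied are simplifications for illustration. It only shows how hoodie.deltastreamer.kafka.source.maxEvents could first cap the total batch size, and how the proposed hoodie.deltastreamer.kafka.per.batch.maxEvents could then chunk each Kafka partition's offset range into several smaller Spark input partitions.

import java.util.ArrayList;
import java.util.List;

public class OffsetRangeSplitSketch {

  // Minimal stand-in for an offset range: Kafka partition plus [from, until).
  record Range(int partition, long from, long until) { }

  // First cap the total events pulled in this batch (source.maxEvents, applied
  // greedily here for simplicity), then split each remaining range into chunks
  // of at most perBatchMaxEvents, so a single large or skewed Kafka partition
  // becomes several smaller Spark read tasks.
  static List<Range> split(List<Range> kafkaRanges, long sourceMaxEvents, long perBatchMaxEvents) {
    List<Range> sparkPartitions = new ArrayList<>();
    long remaining = sourceMaxEvents;
    for (Range r : kafkaRanges) {
      long until = Math.min(r.until(), r.from() + Math.max(0, remaining));
      remaining -= Math.max(0, until - r.from());
      for (long from = r.from(); from < until; from += perBatchMaxEvents) {
        sparkPartitions.add(new Range(r.partition(), from, Math.min(from + perBatchMaxEvents, until)));
      }
    }
    return sparkPartitions;
  }

  public static void main(String[] args) {
    // One heavily skewed Kafka partition and one small one.
    List<Range> ranges = List.of(new Range(0, 0, 1_000_000), new Range(1, 0, 50_000));
    // With source.maxEvents=2_000_000 (not limiting here) and
    // per.batch.maxEvents=200_000, the two Kafka partitions become six read
    // tasks instead of two, spreading the skewed partition over more cores.
    split(ranges, 2_000_000, 200_000).forEach(System.out::println);
  }
}

A real implementation would presumably distribute the source.maxEvents budget across partitions rather than applying it greedily; the chunking step at the end is the new behavior this thread proposes.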
