Thinking out loud: 1. For insert operations, ordering should not matter anyway. 2. For upsert etc., preCombine would handle the ordering problems.
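For concreteness, a minimal sketch of that preCombine idea, assuming a simple record with a key and a source-ordering-field value; the class and field names below are illustrative, not Hudi's actual HoodieRecordPayload API. The version with the larger ordering value wins, regardless of arrival order.

// A minimal, self-contained sketch of the preCombine idea above: when two
// versions of the same record key arrive (possibly out of order), the one
// with the larger source-ordering-field value wins. Names are illustrative,
// not Hudi's actual payload API.
import java.util.HashMap;
import java.util.Map;

public class PreCombineSketch {

  static final class Event {
    final String key;          // record key
    final long orderingValue;  // source-ordering-field, e.g. an update timestamp
    final String payload;      // rest of the record, stubbed as a string

    Event(String key, long orderingValue, String payload) {
      this.key = key;
      this.orderingValue = orderingValue;
      this.payload = payload;
    }
  }

  // Keep only the latest version per key, regardless of arrival order.
  static Map<String, Event> preCombine(Iterable<Event> incoming) {
    Map<String, Event> latest = new HashMap<>();
    for (Event e : incoming) {
      latest.merge(e.key, e,
          (oldE, newE) -> newE.orderingValue >= oldE.orderingValue ? newE : oldE);
    }
    return latest;
  }

  public static void main(String[] args) {
    // Out-of-order arrival: the ts=2 update is read before the ts=1 version,
    // but preCombine still keeps the ts=2 version.
    Map<String, Event> result = preCombine(java.util.List.of(
        new Event("user-1", 2L, "email=new@example.com"),
        new Event("user-1", 1L, "email=old@example.com")));
    System.out.println(result.get("user-1").payload); // email=new@example.com
  }
}

So even if events for the same key are read out of order across split offset ranges, the upsert path would still converge on the latest version.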
Is that what you are saying? I feel we don't want to leak any Kafka-specific logic or force the use of special payloads etc. Thoughts?

I assigned the JIRA to you and also made you a contributor, so in future you can self-assign.

On Mon, Apr 3, 2023 at 7:08 PM 孔维 <18701146...@163.com> wrote:

> Hi,
>
> Yes, we can create multiple Spark input partitions per Kafka partition.
>
> I think the write operations can handle the potentially out-of-order
> events, because before writing we need to preCombine the incoming events
> using the source-ordering-field, and we also need to combineAndGetUpdateValue
> with the records on storage. From a business perspective, we use the combine
> logic to keep our data correct, and Hudi does not require any guarantees
> about the ordering of Kafka events.
>
> I already filed one JIRA [https://issues.apache.org/jira/browse/HUDI-6019];
> could you help assign it to me?
>
> At 2023-04-03 23:27:13, "Vinoth Chandar" <vin...@apache.org> wrote:
> >Hi,
> >
> >Does your implementation read out offset ranges from Kafka partitions?
> >Which means - we can create multiple Spark input partitions per Kafka
> >partition? If so, +1 for the overall goals here.
> >
> >How does this affect ordering? Can you think about how/if Hudi write
> >operations can handle potentially out-of-order events being read out?
> >It feels like we can add a JIRA for this anyway.
> >
> >On Thu, Mar 30, 2023 at 10:02 PM 孔维 <18701146...@163.com> wrote:
> >
> >> Hi team, for the Kafka source, when pulling data from Kafka, the default
> >> parallelism is the number of Kafka partitions. There are cases where this
> >> is a problem:
> >>
> >> 1. When pulling a large amount of data from Kafka (e.g.
> >> maxEvents=100000000) and the number of Kafka partitions is not large
> >> enough, the pull takes too long, or even causes executor OOMs.
> >> 2. When there is heavy data skew between Kafka partitions, the pull is
> >> blocked by the slowest partition.
> >>
> >> To solve those cases, I want to add a parameter
> >> hoodie.deltastreamer.kafka.per.batch.maxEvents to control the maxEvents
> >> in one Kafka batch; the default Long.MAX_VALUE means the feature is not
> >> turned on. This configuration takes effect after the
> >> hoodie.deltastreamer.kafka.source.maxEvents config (see the sketch at the
> >> end of this thread).
> >>
> >> Here is my POC of the improvement, with a max of 128 executor cores:
> >>
> >> Feature off (hoodie.deltastreamer.kafka.source.maxEvents=50000000)
> >>
> >> Feature on (hoodie.deltastreamer.kafka.per.batch.maxEvents=200000)
> >>
> >> After turning the feature on, the Tagging time drops from 4.4 mins to
> >> 1.1 mins, and could be even faster given more cores.
> >>
> >> What do you think? Can I file a JIRA issue for this?
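For reference, a rough sketch of how the proposed hoodie.deltastreamer.kafka.per.batch.maxEvents splitting could work. The OffsetRange class here is a simplified stand-in for Spark's org.apache.spark.streaming.kafka010.OffsetRange, and split() is a hypothetical helper, not the actual POC code.

// A rough, self-contained sketch of the proposed splitting: each Kafka
// partition's offset range is chopped into chunks of at most
// perBatchMaxEvents, so a single Kafka partition can back several Spark
// input partitions. OffsetRange is a simplified stand-in for Spark's class.
import java.util.ArrayList;
import java.util.List;

public class SplitOffsetRangesSketch {

  static final class OffsetRange {
    final String topic;
    final int partition;
    final long fromOffset;   // inclusive
    final long untilOffset;  // exclusive

    OffsetRange(String topic, int partition, long fromOffset, long untilOffset) {
      this.topic = topic;
      this.partition = partition;
      this.fromOffset = fromOffset;
      this.untilOffset = untilOffset;
    }
  }

  // Split each range into chunks of at most perBatchMaxEvents events.
  static List<OffsetRange> split(List<OffsetRange> ranges, long perBatchMaxEvents) {
    List<OffsetRange> result = new ArrayList<>();
    for (OffsetRange r : ranges) {
      long from = r.fromOffset;
      while (from < r.untilOffset) {
        long until = Math.min(from + perBatchMaxEvents, r.untilOffset);
        result.add(new OffsetRange(r.topic, r.partition, from, until));
        from = until;
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // One skewed partition (10M events) and one small one (200k events),
    // with hoodie.deltastreamer.kafka.per.batch.maxEvents = 200000:
    List<OffsetRange> chunks = split(List.of(
        new OffsetRange("topic", 0, 0, 10_000_000),
        new OffsetRange("topic", 1, 0, 200_000)), 200_000L);
    System.out.println(chunks.size() + " Spark input partitions"); // 51
  }
}

Each chunk can then be read as its own Spark input partition, so a skewed or oversized Kafka partition no longer caps the read parallelism.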