Hi, does your implementation read out offset ranges from Kafka partitions, meaning we could create multiple Spark input partitions per Kafka partition? If so, +1 on the overall goals here.
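Concretely, I'm imagining the split looking roughly like this; a hypothetical sketch against Spark's spark-streaming-kafka-0-10 OffsetRange API, not code from your patch:

```java
import org.apache.spark.streaming.kafka010.OffsetRange;

import java.util.ArrayList;
import java.util.List;

public class OffsetRangeSplitter {
  // Split one Kafka partition's offset range into several Spark input
  // partitions, each covering at most maxEventsPerBatch offsets.
  static List<OffsetRange> splitRange(OffsetRange range, long maxEventsPerBatch) {
    List<OffsetRange> splits = new ArrayList<>();
    long from = range.fromOffset();
    while (from < range.untilOffset()) {
      long until = Math.min(from + maxEventsPerBatch, range.untilOffset());
      splits.add(OffsetRange.create(range.topic(), range.partition(), from, until));
      from = until;
    }
    return splits;
  }
}
```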
How does this affect ordering? Can you think about how/if Hudi write operations can handle potentially out-of-order events being read out? It feels like we can add a JIRA for this anyway.

On Thu, Mar 30, 2023 at 10:02 PM 孔维 <18701146...@163.com> wrote:

> Hi team, for the Kafka source, the default parallelism when pulling data
> from Kafka is the number of Kafka partitions. That is a problem in these
> cases:
>
> - When pulling a large amount of data from Kafka (e.g.
>   maxEvents=100000000) while the number of Kafka partitions is too small,
>   the pull takes too long, or even worse, causes executor OOM.
> - When there is heavy data skew between Kafka partitions, the pull is
>   blocked by the slowest partition.
>
> To solve these cases, I want to add a parameter,
> hoodie.deltastreamer.kafka.per.batch.maxEvents, to control the maximum
> number of events in one Kafka batch. The default value of Long.MAX_VALUE
> means the feature is turned off. This configuration takes effect after the
> hoodie.deltastreamer.kafka.source.maxEvents config is applied.
>
> Here is my POC of the improvement, with max executor cores at 128:
>
> Feature off (hoodie.deltastreamer.kafka.source.maxEvents=50000000):
>
> Feature on (hoodie.deltastreamer.kafka.per.batch.maxEvents=200000):
>
> After turning the feature on, the time of the Tagging stage dropped from
> 4.4 mins to 1.1 mins, and it could be even faster given more cores.
>
> What do you think? Can I file a JIRA issue for this?
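To make the interplay of the two configs concrete, here is a rough sketch of how the proposed per-batch cap could compose with the existing total cap, reusing the hypothetical splitRange helper above; jsc, kafkaParams, and totalRanges are placeholders for illustration, not names from the patch:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class PerBatchCapSketch {
  // Apply the proposed per-batch cap to ranges already bounded by
  // hoodie.deltastreamer.kafka.source.maxEvents, then build the RDD.
  static JavaRDD<ConsumerRecord<Object, Object>> readBatch(
      JavaSparkContext jsc,
      Map<String, Object> kafkaParams,
      List<OffsetRange> totalRanges,   // already capped by source.maxEvents
      long perBatchMaxEvents) {        // proposed per.batch.maxEvents
    List<OffsetRange> splits = new ArrayList<>();
    for (OffsetRange r : totalRanges) {
      splits.addAll(OffsetRangeSplitter.splitRange(r, perBatchMaxEvents));
    }
    return KafkaUtils.createRDD(
        jsc, kafkaParams, splits.toArray(new OffsetRange[0]),
        LocationStrategies.PreferConsistent());
  }
}
```

Since KafkaUtils.createRDD schedules one Spark task per OffsetRange, splitting the ranges this way raises read parallelism beyond the Kafka partition count, which is what would address both the too-few-partitions and the data-skew cases.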