Hi,

Does your implementation read out offset ranges from Kafka partitions,
meaning we can create multiple Spark input partitions per Kafka
partition? If so, +1 for the overall goals here.
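
To make sure I'm reading the proposal right, here is a minimal sketch (my
own illustration, not Hudi code; OffsetRangeSplitter is a hypothetical
name) of splitting a single Kafka partition's offset range into several
Spark input partitions using Spark's kafka010 OffsetRange API:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.spark.streaming.kafka010.OffsetRange;

    class OffsetRangeSplitter {
      // Split one partition's offset range into chunks of at most
      // maxEventsPerBatch events; each chunk becomes its own Spark
      // input partition and can be consumed by a separate task.
      static List<OffsetRange> split(OffsetRange range, long maxEventsPerBatch) {
        List<OffsetRange> chunks = new ArrayList<>();
        long from = range.fromOffset();
        while (from < range.untilOffset()) {
          long until = Math.min(from + maxEventsPerBatch, range.untilOffset());
          chunks.add(OffsetRange.create(range.topic(), range.partition(), from, until));
          from = until;
        }
        return chunks;
      }
    }

Chunks of the same partition would then be read in parallel, which is
exactly what raises the ordering question below.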

How does this affect ordering? Can you think about how/whether Hudi write
operations can handle potentially out-of-order events being read out?
It feels like we can file a JIRA for this anyway.



On Thu, Mar 30, 2023 at 10:02 PM 孔维 <18701146...@163.com> wrote:

> Hi team, for the Kafka source, when pulling data from Kafka, the default
> parallelism is the number of Kafka partitions. There are cases where this
> falls short:
>
> 1. Pulling a large amount of data from Kafka (e.g. maxEvents=100000000)
> when the number of Kafka partitions is not enough: the pull takes too
> much time, or worse, causes executor OOM.
> 2. Huge data skew between Kafka partitions: the pull is blocked by the
> slowest partition.
>
> To solve those cases, I want to add a parameter,
> hoodie.deltastreamer.kafka.per.batch.maxEvents, to control the maximum
> number of events in one Kafka batch. The default of Long.MAX_VALUE means
> the feature is turned off. This configuration takes effect after the
> hoodie.deltastreamer.kafka.source.maxEvents config is applied.
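>
> For illustration (my own arithmetic, assuming each batch maps to one
> Spark input partition), the two configs would combine like this:
>
>     # cap for the whole pull (existing config)
>     hoodie.deltastreamer.kafka.source.maxEvents=50000000
>     # cap per Kafka batch (proposed config)
>     hoodie.deltastreamer.kafka.per.batch.maxEvents=200000
>
> With these values, one pull would be split into roughly
> 50000000 / 200000 = 250 Spark input partitions, regardless of the
> number of Kafka partitions.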
>
>
> Here is my POC of the improvement (max executor cores: 128).
>
> Feature off (hoodie.deltastreamer.kafka.source.maxEvents=50000000):
> [screenshot]
>
> Feature on (hoodie.deltastreamer.kafka.per.batch.maxEvents=200000):
> [screenshot]
>
> After turning the feature on, the Tagging time dropped from 4.4 minutes
> to 1.1 minutes, and it can be even faster given more cores.
>
> What do you think? Can I file a JIRA issue for this?
