Hi team, when the Kafka source pulls data from Kafka, the default 
parallelism equals the number of Kafka partitions.
This becomes a problem in two cases:

1. Pulling a large amount of data from Kafka (e.g. maxEvents=100000000) when 
   there are too few Kafka partitions: the pull takes far too long and can 
   even cause executor OOM.
2. Heavy data skew across Kafka partitions: the pull is blocked by the 
   slowest partition.

To solve these cases, I want to add a parameter, 
hoodie.deltastreamer.kafka.per.batch.maxEvents, to cap the number of events in 
one Kafka batch. The default, Long.MAX_VALUE, leaves the feature turned off.
This configuration takes effect after hoodie.deltastreamer.kafka.source.maxEvents: 
the total is first capped by source.maxEvents, then split into batches of at 
most per.batch.maxEvents events each.
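The splitting step could look roughly like the sketch below. It is a minimal illustration under stated assumptions, not Hudi's code: the `Range` record is a hypothetical stand-in for Spark's `OffsetRange`, and `OffsetRangeSplitter.split` is an assumed helper name. It takes the offset ranges already capped by hoodie.deltastreamer.kafka.source.maxEvents and cuts each partition's range into consecutive chunks of at most per.batch.maxEvents events, so one large or skewed partition becomes several tasks.

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetRangeSplitter {

    // Hypothetical stand-in for Spark's OffsetRange: offsets [fromOffset, untilOffset) of one partition.
    public record Range(String topic, int partition, long fromOffset, long untilOffset) {
        public long count() {
            return untilOffset - fromOffset;
        }
    }

    // Splits every range into consecutive sub-ranges of at most maxEventsPerBatch events each.
    // With maxEventsPerBatch = Long.MAX_VALUE (the proposed default) each non-empty range is kept whole.
    public static List<Range> split(List<Range> ranges, long maxEventsPerBatch) {
        if (maxEventsPerBatch <= 0) {
            throw new IllegalArgumentException("maxEventsPerBatch must be positive");
        }
        List<Range> batches = new ArrayList<>();
        for (Range r : ranges) {
            long from = r.fromOffset();
            // Empty ranges produce no batches.
            while (from < r.untilOffset()) {
                long remaining = r.untilOffset() - from;
                // Compare the remaining count instead of computing from + maxEventsPerBatch,
                // which would overflow when maxEventsPerBatch is Long.MAX_VALUE.
                long until = remaining <= maxEventsPerBatch ? r.untilOffset() : from + maxEventsPerBatch;
                batches.add(new Range(r.topic(), r.partition(), from, until));
                from = until;
            }
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Range> ranges = List.of(
            new Range("events", 0, 0L, 1_000_000L), // large partition
            new Range("events", 1, 500L, 700L));    // small partition
        for (Range b : split(ranges, 200_000L)) {
            System.out.println(b);
        }
    }
}
```

With a cap of 200000, the partition holding 1,000,000 pending events becomes five tasks of 200,000 events each instead of one, while the partition with only 200 events stays a single task, so the slowest task no longer has to read a whole skewed partition alone.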


Here is my POC of the improvement, with at most 128 executor cores.
Feature off (hoodie.deltastreamer.kafka.source.maxEvents=50000000):


Feature on (hoodie.deltastreamer.kafka.per.batch.maxEvents=200000):


With the feature on, the Tagging stage time dropped from 4.4 min to 1.1 min, 
and it could be even faster with more cores.

What do you think? Can I file a JIRA issue for this?
