Look forward to this! could really help backfill/rebootstrap scenarios.

On Tue, Apr 4, 2023 at 9:18 AM Vinoth Chandar <vin...@apache.org> wrote:

> Thinking out loud.
>
> 1. For insert operations, it should not matter anyway.
> 2. For upsert etc, the preCombine would handle the ordering problems.
>
> Is that what you are saying? I feel we don't want to leak any Kafka
> specific logic or force use of special payloads etc. thoughts?
>
> I assigned the jira to you and also made you a contributor. So in future,
> you can self-assign.
>
> On Mon, Apr 3, 2023 at 7:08 PM 孔维 <18701146...@163.com> wrote:
>
>> Hi,
>>
>>
>> Yea, we can create multiple spark input partitions per Kafka partition.
>>
>>
>> I think the write operations can handle the potentially out-of-order
>> events, because before writing we need to preCombine the incoming events
>> using source-ordering-field and we also need to combineAndGetUpdateValue
>> with records on storage. From a business perspective, we use the combine
>> logic to keep our data correct. And hudi does not require any guarantees
>> about the ordering of kafka events.
>>
>>
>> I already filed one JIRA[https://issues.apache.org/jira/browse/HUDI-6019],
>> could you help assign the JIRA to me?
>>
>>
>>
>>
>>
>>
>>
>> At 2023-04-03 23:27:13, "Vinoth Chandar" <vin...@apache.org> wrote:
>> >Hi,
>> >
>> >Does your implementation read out offset ranges from Kafka partitions?
>> >which means - we can create multiple spark input partitions per Kafka
>> >partitions?
>> >if so, +1 for overall goals here.
>> >
>> >How does this affect ordering? Can you think about how/if Hudi write
>> >operations can handle potentially out-of-order events being read out?
>> >It feels like we can add a JIRA for this anyway.
>> >
>> >
>> >
>> >On Thu, Mar 30, 2023 at 10:02 PM 孔维 <18701146...@163.com> wrote:
>> >
>> >> Hi team, for the kafka source, when pulling data from kafka, the
>> default
>> >> parallelism is the number of kafka partitions.
>> >> There are cases:
>> >>
>> >> Pulling large amount of data from kafka (eg. maxEvents=100000000), but
>> the
>> >> # of kafka partition is not enough, the procedure of the pulling will
>> cost
>> >> too much of time, even worse cause the executor OOM
>> >> There is huge data skew between kafka partitions, the procedure of the
>> >> pulling will be blocked by the slowest partition
>> >>
>> >> to solve those cases, I want to add a parameter
>> >> hoodie.deltastreamer.kafka.per.batch.maxEvents to control the
>> maxEvents in
>> >> one kafka batch, default Long.MAX_VALUE means not trun this feature on.
>> >> hoodie.deltastreamer.kafka.per.batch.maxEvents  this confiuration will
>> >> take effect after the hoodie.deltastreamer.kafka.source.maxEvents
>> config.
>> >>
>> >>
>> >> Here is my POC of the imporvement:
>> >> max executor core is 128.
>> >> not turn the feature on
>> >> (hoodie.deltastreamer.kafka.source.maxEvents=50000000)
>> >>
>> >>
>> >> turn on the feature
>> (hoodie.deltastreamer.kafka.per.batch.maxEvents=200000)
>> >>
>> >>
>> >> after turn on the feature, the timing of Tagging reduce from 4.4 mins
>> to
>> >> 1.1 mins, can be more faster if given more cores.
>> >>
>> >> How do you think? can I file a jira issue for this?
>>
>

Reply via email to