Hi all. 

The point I want to highlight is that minibatch join could potentially yield 
an incomplete changelog, which existing jobs are not expected to produce. For 
example, a job that joins two CDC sources after de-duplicating them, and whose 
output is used for audit analysis, cannot accept an incomplete changelog. 
Minibatch processing by itself, however, would not introduce any problem. 
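
To make the concern concrete, here is a minimal sketch of how folding inside 
one minibatch can hide records from the downstream. It is a toy model, not 
Flink code: the class MiniBatchFoldingSketch, the Change type and the fold 
method are hypothetical names, and the rule shown (a -D or -U cancels an 
earlier +I or +U of the same row in the same batch) is a simplification. 

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of folding changelog records inside one minibatch (not Flink code). */
final class MiniBatchFoldingSketch {

    /** A changelog record; kind is one of "+I", "-U", "+U", "-D". */
    static final class Change {
        final String kind;
        final String row;

        Change(String kind, String row) {
            this.kind = kind;
            this.row = row;
        }

        boolean isRetraction() {
            return kind.equals("-D") || kind.equals("-U");
        }

        @Override
        public String toString() {
            return kind + "(" + row + ")";
        }
    }

    /** Cancels each retraction against an earlier buffered accumulation of the same row. */
    static List<Change> fold(List<Change> batch) {
        List<Change> buffer = new ArrayList<>();
        for (Change change : batch) {
            int match = -1;
            if (change.isRetraction()) {
                // Search backwards for a pending +I/+U carrying the same row.
                for (int i = buffer.size() - 1; i >= 0; i--) {
                    Change pending = buffer.get(i);
                    if (!pending.isRetraction() && pending.row.equals(change.row)) {
                        match = i;
                        break;
                    }
                }
            }
            if (match >= 0) {
                buffer.remove(match); // the pair cancels out; downstream never sees it
            } else {
                buffer.add(change);
            }
        }
        return buffer;
    }

    public static void main(String[] args) {
        List<Change> batch = Arrays.asList(
                new Change("+I", "order-1,CREATED"),
                new Change("-D", "order-1,CREATED"),
                new Change("+I", "order-2,CREATED"));
        // Only the record for order-2 survives the fold.
        System.out.println(fold(batch));
    }
}
```

The final table content is the same with or without folding, but an audit 
consumer would never learn that order-1 existed, which is what I mean by an 
incomplete changelog. 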

The internal behavior of minibatch processing is not well defined at present. 
I don't think reusing the minibatch option for minibatch join is problematic, 
but precise control is necessary to mitigate the risk of generating an 
incomplete changelog within a minibatch. 

Control over how the changelog is handled within a minibatch should be a 
global option. Therefore, I propose introducing a new option, 
'table.exec.mini-batch.compact-changes-enabled', to precisely control changelog 
compaction within a minibatch, and deprecating the option 
'table.exec.deduplicate.mini-batch.compact-changes-enabled'. The deduplicate 
operator would fall back to the newly introduced option, and the minibatch 
join would follow it as well. 
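
A rough sketch of how the two keys could be resolved is below. It uses a 
plain Map rather than Flink's configuration classes, and several things in it 
are assumptions for illustration only: the class and method names are 
hypothetical, the precedence (the new key wins, the deprecated key is only 
consulted when the new one is unset) is one possible reading of the proposal, 
and the default value is a placeholder. 

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of resolving the proposed global option with the deprecated key as fallback. */
final class CompactChangesOptionSketch {

    // Proposed in this thread; not an existing Flink option.
    static final String GLOBAL_KEY = "table.exec.mini-batch.compact-changes-enabled";

    // Existing deduplicate-specific option that would be deprecated.
    static final String DEPRECATED_KEY =
            "table.exec.deduplicate.mini-batch.compact-changes-enabled";

    // Placeholder default, assumed for illustration only.
    static final boolean ASSUMED_DEFAULT = false;

    /** Returns the effective setting that deduplicate and minibatch join would both read. */
    static boolean compactChangesEnabled(Map<String, String> conf) {
        if (conf.containsKey(GLOBAL_KEY)) {
            return Boolean.parseBoolean(conf.get(GLOBAL_KEY));
        }
        if (conf.containsKey(DEPRECATED_KEY)) {
            // Old jobs that only set the deprecated key keep their behavior.
            return Boolean.parseBoolean(conf.get(DEPRECATED_KEY));
        }
        return ASSUMED_DEFAULT;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put(DEPRECATED_KEY, "true");
        System.out.println(compactChangesEnabled(conf));

        conf.put(GLOBAL_KEY, "false");
        System.out.println(compactChangesEnabled(conf));
    }
}
```

With this shape, a job that needs the complete changelog sets the global key 
once, and no operator-specific option is needed for future minibatch 
optimizations. 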


> On Jan 12, 2024, at 16:30, Jane Chan <qingyue....@gmail.com> wrote:
> 
> Hi shuai,
> 
> Thanks for the update! Regarding the newly introduced configuration, I hold
> the same concern with Benchao and Xuyang.
> 
> First of all, in most cases, the fact that users choose to enable
> mini-batch configuration indicates they are aware of the trade-off between
> throughput and completeness of the changelog.
> And if we finally adopt this configuration solely to avoid state
> incompatibility, does it mean that we will need to introduce a new
> configuration for every future operator's mini-batch optimization, similar
> to what we did today?
> 
> Best,
> Jane
> 
> On Fri, Jan 12, 2024 at 1:45 PM Xuyang <xyzhong...@163.com> wrote:
> 
>> Hi, Xu Shuai. Thanks for driving this FLIP.
>> 
>> 
>> The CDC message amplification of cascade join has always been a problem
>> for users. Judging from the
>> nexmark results, this optimization is very meaningful. I just have the
>> same doubts as Benchao, why can't we
>> use minibatch join as the default behavior when the user turns on
>> minibatch?
>> 
>> 
>>> Although the semantic of changelog emitted by the Join operator is
>> eventual consistency, the change might
>> not be supposed for the downstream of the job which requires details of
>> changelog.
>> 
>> 
>> I think if the user adds the minibatch options to his job to enable
>> minibatch, he should know that flink will reduce
>> the amount of data sent to downstream by folding CDC messages as much as
>> possible. In scenarios where all
>> details of CDC records need to be retained, such as just synchronizing
>> data with jobs from one db to another db,
>> users have no reason to enable minibatch.
>> 
>> 
>> The only scenario I can think of that requires adding this independent
>> minibatch join option is to ensure that the state
>> is compatible between multiple versions, but we have not promised users
>> state compatibility during cross-version upgrades.
>> 
>> 
>> Maybe we need to figure out why the
>> 'table.exec.deduplicate.mini-batch.compact-changes-enabled' option needed to
>> be added to the deduplicate operator? I think it is the same reason as
>> adding a separate parameter to join to control CDC message folding.
>> 
>> 
>> 
>> 
>> --
>> 
>>    Best!
>>    Xuyang
>> 
>> 
>> 
>> 
>> 
>> At 2024-01-11 16:19:30, "Benchao Li" <libenc...@apache.org> wrote:
>>>> the change might not be supposed for the downstream of the job which
>> requires details of changelog
>>> 
>>> Could you elaborate on this a bit? I've never met such kinds of
>>> requirements before, I'm curious what is the scenario that requires
>>> this.
>>> 
>>> shuai xu <xushuai...@gmail.com> wrote on Thu, Jan 11, 2024 at 13:08:
>>>> 
>>>> Thanks for your response, Benchao.
>>>> 
>>>> Here is my thought on the newly added option.
>>>> Users' current jobs are running on a version without minibatch join. If
>> the existing option to enable minibatch join is utilized, then when users'
>> jobs are migrated to the new version, the internal behavior of the join
>> operation within the jobs will change. Although the semantic of changelog
>> emitted by the Join operator is eventual consistency, the change might not
>> be supposed for the downstream of the job which requires details of
>> changelog. This newly added option also refers to
>> 'table.exec.deduplicate.mini-batch.compact-changes-enabled'.
>>>> 
>>>> As for the implementation, the new operator shares the state of the
>> original operator and merely has an additional minibatch buffer for storing
>> records to do some optimization. The storage remains consistent, and there
>> is only a minor modification to the computational logic.
>>>> 
>>>> Best,
>>>> Xu Shuai
>>>> 
>>>>> On Jan 10, 2024, at 22:56, Benchao Li <libenc...@apache.org> wrote:
>>>>> 
>>>>> Thanks shuai for driving this, mini-batch Join is a very useful
>>>>> optimization, +1 for the general idea.
>>>>> 
>>>>> Regarding the configuration
>>>>> "table.exec.stream.join.mini-batch-enabled", I'm not sure it's really
>>>>> necessary. The semantic of changelog emitted by the Join operator is
>>>>> eventual consistency, so there is not much difference between original
>>>>> Join and mini-batch Join from this aspect. Besides, introducing more
>>>>> options would make it more complex for users, harder to understand and
>>>>> maintain, which we should be careful about.
>>>>> 
>>>>> One thing about the implementation, could you make the new operator
>>>>> share the same state definition with the original one?
>>>>> 
>>>>> shuai xu <xushuai...@gmail.com> wrote on Wed, Jan 10, 2024 at 21:23:
>>>>>> 
>>>>>> Hi devs,
>>>>>> 
>>>>>> I’d like to start a discussion on FLIP-415: Introduce a new join
>> operator to support minibatch[1].
>>>>>> 
>>>>>> Currently, when performing cascading connections in Flink, there is
>> a pain point of record amplification. Every record the join operator receives
>> triggers a join process. However, if +I and -D records match, they
>> could be folded to save two join processes. Besides, -U and +U records
>> might output 4 records, two of which are redundant, when
>> encountering an outer join.
>>>>>> 
>>>>>> To address this issue, this FLIP introduces a new
>> MiniBatchStreamingJoinOperator to achieve batch processing which could
>> reduce number of outputting redundant messages and avoid unnecessary join
>> processes.
>>>>>> A new option is added to control the operator to avoid influencing
>> existing jobs.
>>>>>> 
>>>>>> Please find more details in the FLIP wiki document [1]. Looking
>>>>>> forward to your feedback.
>>>>>> 
>>>>>> [1]
>>>>>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-415%3A+Introduce+a+new+join+operator+to+support+minibatch
>>>>>> 
>>>>>> Best,
>>>>>> Xu Shuai
>>>>> 
>>>>> 
>>>>> 
>>>>> --
>>>>> 
>>>>> Best,
>>>>> Benchao Li
>>>> 
>>> 
>>> 
>>> --
>>> 
>>> Best,
>>> Benchao Li
>> 

Best,
Xu Shuai
