Rainie,

Were there any failures/restarts, or is this discrepancy observed without
any disruption to the processing?

Regards,
David

On Mon, Mar 8, 2021 at 10:14 AM Rainie Li <raini...@pinterest.com> wrote:

> Thanks for the quick response, Smile.
> I don't use window operators or flatmap.
> Here is the core logic of my filter, it only iterates on filters list.
> Will *rebalance() *cause it?
>
> Thanks again.
> Best regards
> Rainie
>
> SingleOutputStreamOperator<SplitterIntermediateRecord<T>> 
> matchedRecordsStream =
>     eventStream
>         .rebalance()
>         .process(new ProcessFunction<T, SplitterIntermediateRecord<T>>() {
>           public void processElement(
>               T element,
>               ProcessFunction<T, SplitterIntermediateRecord<T>>.Context 
> context,
>               Collector<SplitterIntermediateRecord<T>> collector) {
>             for (StreamFilter filter : filters) {
>               if (filter.match(element)) {
>                 SubstreamConfig substreamConfig = filter.getSubstreamConfig();
>                 SplitterIntermediateRecord<T> result = new 
> SplitterIntermediateRecord<>(
>                     substreamConfig.getKafkaCluster(),
>                     substreamConfig.getKafkaTopic(),
>                     substreamConfig.getCutoverKafkaTopic(),
>                     substreamConfig.getCutoverTimestampInMs(),
>                     element);
>                 collector.collect(result);
>               }
>             }
>           }
>         })
>         .name("Process-" + eventClass.getSimpleName());
>
>
> On Mon, Mar 8, 2021 at 1:03 AM Smile <letters_sm...@163.com> wrote:
>
>> Hi Rainie,
>>
>> Could you please provide more information about your processing logic?
>> Do you use window operators?
>> If there's no time-based operator in your logic, late arrival data won't
>> be
>> dropped by default and there might be something wrong with your flat map
>> or
>> filter operator. Otherwise, you can use sideOutputLateData() to get the
>> late
>> data of the window and have a look at them. See [1] for more information
>> about sideOutputLateData().
>>
>> [1].
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>>
>> Regards,
>> Smile
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>

Reply via email to