Thanks for the quick response, Smile. I don't use window operators or flatmap. Here is the core logic of my filter, it only iterates on filters list. Will *rebalance() *cause it?
Thanks again. Best regards Rainie SingleOutputStreamOperator<SplitterIntermediateRecord<T>> matchedRecordsStream = eventStream .rebalance() .process(new ProcessFunction<T, SplitterIntermediateRecord<T>>() { public void processElement( T element, ProcessFunction<T, SplitterIntermediateRecord<T>>.Context context, Collector<SplitterIntermediateRecord<T>> collector) { for (StreamFilter filter : filters) { if (filter.match(element)) { SubstreamConfig substreamConfig = filter.getSubstreamConfig(); SplitterIntermediateRecord<T> result = new SplitterIntermediateRecord<>( substreamConfig.getKafkaCluster(), substreamConfig.getKafkaTopic(), substreamConfig.getCutoverKafkaTopic(), substreamConfig.getCutoverTimestampInMs(), element); collector.collect(result); } } } }) .name("Process-" + eventClass.getSimpleName()); On Mon, Mar 8, 2021 at 1:03 AM Smile <letters_sm...@163.com> wrote: > Hi Rainie, > > Could you please provide more information about your processing logic? > Do you use window operators? > If there's no time-based operator in your logic, late arrival data won't be > dropped by default and there might be something wrong with your flat map or > filter operator. Otherwise, you can use sideOutputLateData() to get the > late > data of the window and have a look at them. See [1] for more information > about sideOutputLateData(). > > [1]. > > https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output > > Regards, > Smile > > > > -- > Sent from: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ >