Rainie,

A restart after a failure can cause data loss if you aren't using
checkpointing, or if a transaction times out.

A manual restart can also lead to data loss, depending on how you manage
the offsets, transactions, and other state during the restart. What
happened in this case?
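To make the offset point concrete, here is a toy sketch in plain Java (not
the Flink or Kafka API; the class and method names are made up for
illustration) of why the order of "commit the offset" versus "process the
record" matters when a crash and restart land between the two:

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not Flink code) of offset handling across a crash and restart. */
public class OffsetRestartDemo {

    /**
     * Process records from offset 0 and simulate a crash at record {crashAt}:
     * with commitFirst, the crash hits after the commit but before processing;
     * otherwise it hits after processing but before the commit.
     * Returns everything that reached the sink across both runs.
     */
    static List<String> runWithRestart(List<String> records, boolean commitFirst, int crashAt) {
        List<String> sink = new ArrayList<>();
        int committed = 0;

        // First run: processes records until the simulated crash.
        for (int i = 0; i < crashAt; i++) {
            sink.add(records.get(i));
            committed = i + 1;
        }
        if (commitFirst) {
            committed = crashAt + 1;        // offset committed...
            // ...crash before the record is processed: it is lost.
        } else {
            sink.add(records.get(crashAt)); // record processed...
            // ...crash before the offset is committed: it will be replayed.
        }

        // Restart: resume from the last committed offset.
        for (int i = committed; i < records.size(); i++) {
            sink.add(records.get(i));
        }
        return sink;
    }

    public static void main(String[] args) {
        List<String> records = List.of("a", "b", "c", "d");

        // Commit-before-process: "c" never reaches the sink (data loss).
        System.out.println(runWithRestart(records, true, 2));   // [a, b, d]

        // Process-before-commit: "c" is delivered twice (at-least-once).
        System.out.println(runWithRestart(records, false, 2));  // [a, b, c, c, d]
    }
}
```

Checkpointing exists precisely to make this bookkeeping consistent: offsets
and operator state are snapshotted together, so a restart replays from a
consistent point instead of losing or silently skipping records.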

David

On Mon, Mar 8, 2021 at 7:53 PM Rainie Li <raini...@pinterest.com> wrote:

> Thanks Yun and David.
> There were some tasks that got restarted. We configured the restart policy
> and the job didn't fail.
> Will a task restart cause data loss?
>
> Thanks
> Rainie
>
>
> On Mon, Mar 8, 2021 at 10:42 AM David Anderson <dander...@apache.org>
> wrote:
>
>> Rainie,
>>
>> Were there any failures/restarts, or is this discrepancy observed without
>> any disruption to the processing?
>>
>> Regards,
>> David
>>
>> On Mon, Mar 8, 2021 at 10:14 AM Rainie Li <raini...@pinterest.com> wrote:
>>
>>> Thanks for the quick response, Smile.
>>> I don't use window operators or flatmap.
>>> Here is the core logic of my filter; it only iterates over the filters list.
>>> Will *rebalance()* cause it?
>>>
>>> Thanks again.
>>> Best regards
>>> Rainie
>>>
>>> SingleOutputStreamOperator<SplitterIntermediateRecord<T>> matchedRecordsStream =
>>>     eventStream
>>>         .rebalance()
>>>         .process(new ProcessFunction<T, SplitterIntermediateRecord<T>>() {
>>>           @Override
>>>           public void processElement(
>>>               T element,
>>>               ProcessFunction<T, SplitterIntermediateRecord<T>>.Context context,
>>>               Collector<SplitterIntermediateRecord<T>> collector) {
>>>             for (StreamFilter filter : filters) {
>>>               if (filter.match(element)) {
>>>                 SubstreamConfig substreamConfig = filter.getSubstreamConfig();
>>>                 SplitterIntermediateRecord<T> result = new SplitterIntermediateRecord<>(
>>>                     substreamConfig.getKafkaCluster(),
>>>                     substreamConfig.getKafkaTopic(),
>>>                     substreamConfig.getCutoverKafkaTopic(),
>>>                     substreamConfig.getCutoverTimestampInMs(),
>>>                     element);
>>>                 collector.collect(result);
>>>               }
>>>             }
>>>           }
>>>         })
>>>         .name("Process-" + eventClass.getSimpleName());
>>>
>>>
>>> On Mon, Mar 8, 2021 at 1:03 AM Smile <letters_sm...@163.com> wrote:
>>>
>>>> Hi Rainie,
>>>>
>>>> Could you please provide more information about your processing logic?
>>>> Do you use window operators?
>>>> If there's no time-based operator in your logic, late-arriving data won't
>>>> be dropped by default, and there might be something wrong with your
>>>> flatmap or filter operator. Otherwise, you can use sideOutputLateData()
>>>> to get the late data of the window and have a look at it. See [1] for
>>>> more information about sideOutputLateData().
>>>>
>>>> [1].
>>>>
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>>>>
>>>> Regards,
>>>> Smile
>>>>
>>>>
>>>>
>>>> --
>>>> Sent from:
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>>
>>>
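As a plain-Java illustration of the side-output idea mentioned above (a toy
model, not the Flink windowing API; the `Event` type and `split` helper are
made up for this sketch): events that trail the watermark by more than the
allowed lateness are diverted to a "late" collection instead of being
silently dropped, which is what sideOutputLateData() lets you observe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Toy model of a window's late-data side output (not the Flink API). */
public class LateDataDemo {
    record Event(String key, long timestamp) {}

    /**
     * Route each event: if its timestamp plus the allowed lateness is still
     * behind the current watermark, it is "late"; otherwise it is on time.
     */
    static Map<String, List<Event>> split(List<Event> events, long watermark, long allowedLateness) {
        List<Event> onTime = new ArrayList<>();
        List<Event> late = new ArrayList<>();
        for (Event e : events) {
            if (e.timestamp() + allowedLateness < watermark) {
                late.add(e); // would be silently dropped without a side output
            } else {
                onTime.add(e);
            }
        }
        return Map.of("onTime", onTime, "late", late);
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event("a", 100), new Event("b", 40), new Event("c", 95));

        // Watermark at 100, allowed lateness 10: only "b" (40 + 10 < 100) is late.
        Map<String, List<Event>> out = split(events, 100, 10);
        System.out.println(out.get("late"));
    }
}
```

If the late collection is non-empty in a setup like this, the discrepancy is
explained by event-time lateness rather than by the filter logic itself.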
