Yes, The current implementation doesn't leverage transactions on publish
like it does for the source on acking and nacking the deliveries, you can
raise a ticket to support exactly once RMQSinks within the community or
implement the logic yourself.

my checkpoints size is increasing. .... can this lead to state build up. As
> checkpoints might need to keep the state so windows can't purge them??


No, In theory after the window is materialized that instance state is
purged, on checkpointing active window states are recorded, Checkpoints
here record a snapshot of the pipeline rather than the whole progress of an
operator however some other operators do have to accumulate state and flush
on checkpoints (similar to the RMQSource in this case).
Debugging why you have an inflating state might need a deep dive on your
data flow and job performance for bottlenecks and also experiment with
different configurations for rocksdb compaction.



Best Regards
Ahmed Hamdy


On Thu, 18 Jul 2024 at 13:52, banu priya <banuke...@gmail.com> wrote:

> Hi Ahmed,
>
> Thanks for the clarification. I see from flink documentation that Kafka
> sinks are transactional and de duplication happens for it..but it is not
> applicable for RMQ sink.
>
> But i have to use RMQ Sink only due to project requirements .
>
> I am facing one more issue i.e. my check points size is increasing. What I
> understand is after tumbling window state is cleared. I had tumbling window
> (that uses processing time and triggers every 2s) and check point interval
> of 10s, can this lead to state build up. As checkpoints might need to keep
> the state so windows can't purge them??
>
>
> Thanks
> Banu
>
>
>
>
> On Thu, 18 Jul, 2024, 5:55 pm Ahmed Hamdy, <hamdy10...@gmail.com> wrote:
>
>> Hi Banu,
>> yes, regarding the RMQSource, it only acknowledges during checkpoint
>> completion, all the messages after the checkpoint till the next checkpoint
>> completion are grouped to be acknowledged together whether that is during
>> the minimum pause or during the start of the next checkpoint. Failure
>> during this periods will have these unacked messages reprocessed again.
>> Best Regards
>> Ahmed Hamdy
>>
>>
>> On Thu, 18 Jul 2024 at 13:20, banu priya <banuke...@gmail.com> wrote:
>>
>>> Hi Ahmed,
>>>
>>> Thanks a lot for your reply.
>>>
>>> I am planning keep both window time and check point interval same ie 10s.
>>>
>>> Minimum pause between check point is 5s. What happens to the events that
>>> are received during this time??
>>>
>>> Will it be acknowledged at the end of next checkpoint?
>>>
>>> Thanks
>>> Banu
>>>
>>>
>>> On Thu, 18 Jul, 2024, 5:34 pm Ahmed Hamdy, <hamdy10...@gmail.com> wrote:
>>>
>>>> Hi Banu,
>>>> This behavior of source is expected, the guarantee of the RMQSource is
>>>> exactly once which is achieved by acknowledging envelopes on checkpoints
>>>> hence the source would never re-read a message after checkpoint even if it
>>>> was still inside the pipeline and not yet passed to sink, eager
>>>> acknowledgment causes risk of data loss on failure and restoring from a
>>>> previous checkpoint hence breaking all delivery guarantees.
>>>> In concept there is no guarantee that a Flink pipeline achieves end to
>>>> end exactly once without an exactly once sink as well (which is not the
>>>> case for RMQSink).
>>>> In your case, reprocessing is bound by the checkpoint interval which is
>>>> 5 minutes, you can make it tighter if it suits your case better.
>>>>
>>>> Best Regards
>>>> Ahmed Hamdy
>>>>
>>>>
>>>> On Thu, 18 Jul 2024 at 11:37, banu priya <banuke...@gmail.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> Gentle reminder about bow query.
>>>>>
>>>>> Thanks
>>>>> Banu
>>>>>
>>>>> On Tue, 9 Jul, 2024, 1:42 pm banu priya, <banuke...@gmail.com> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I have a Flink job with a RMQ source, tumbling windows (fires for
>>>>>> each 2s), an aggregator, then a RMQ sink. Incremental RocksDB 
>>>>>> checkpointing
>>>>>> is enabled with an interval of 5 minutes.
>>>>>>
>>>>>> I was trying to understand Flink failure recovery. My checkpoint X is
>>>>>> started, I have sent one event to my source. As windows are triggered 
>>>>>> every
>>>>>> 2s, my sink is updated with the aggregated result. But when I checked the
>>>>>> RabbitMQ console, my source queue still had unacked messages. (It is
>>>>>> expected and it is as per design
>>>>>> https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/rabbitmq/#rabbitmq-source
>>>>>> ).
>>>>>>
>>>>>> Now I restarted my task manager, as restart happens within the same
>>>>>> checkpoint interval and checkpoint X has not yet completed. The message 
>>>>>> is
>>>>>> not acknowledged and is sent again. Duplicate processing of events 
>>>>>> happens.
>>>>>>
>>>>>> How to avoid these duplicates?
>>>>>>
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> Banu
>>>>>>
>>>>>

Reply via email to