The Kafka source is configured as AT_LEAST_ONCE and the JDBC sink handles
duplicates with a unique key constraint, logging any duplicates to a separate
SQL table. Effectively that gives us EXACTLY_ONCE semantics at the destination.

That's not a problem, it works great!
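
For reference, the sink's dedup logic is roughly the sketch below. This is
illustrative only: the table and column names are made up, and the real sink
batches its writes.

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.sql.SQLIntegrityConstraintViolationException;

    // Idempotent write: "events" has a UNIQUE constraint on event_id, and a
    // constraint violation is recorded in "duplicate_events" instead of
    // failing the write. Names are illustrative, not our actual schema.
    void writeRecord(Connection conn, String eventId, String payload) throws SQLException {
        try (PreparedStatement insert = conn.prepareStatement(
                "INSERT INTO events (event_id, payload) VALUES (?, ?)")) {
            insert.setString(1, eventId);
            insert.setString(2, payload);
            insert.executeUpdate();
        } catch (SQLIntegrityConstraintViolationException dup) {
            // Unique key already present: log the duplicate in a side table.
            try (PreparedStatement log = conn.prepareStatement(
                    "INSERT INTO duplicate_events (event_id, payload) VALUES (?, ?)")) {
                log.setString(1, eventId);
                log.setString(2, payload);
                log.executeUpdate();
            }
        }
    }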

1- I was curious whether that specific Kafka log message was the cause of the
duplicates, but if I understand Becket correctly it is not, and I wanted to
confirm that.
2- I started monitoring checkpoints: on average they take about 100ms, but
during peak we started seeing checkpoints take 20s-40s+... My checkpointing is
configured as follows:
     - env.enableCheckpointing(60000);
     - env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
     - env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
     - env.getCheckpointConfig().setCheckpointTimeout(60000);
     - env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
3- Based on the above, it's possible that the sink sometimes takes longer than
60 seconds...
    - Looking at adjusting the checkpoint timeouts (see the sketch below).
    - Looking at reducing the load on the sink so checkpoints complete faster
in general.
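
Roughly what I'm considering for the checkpoint settings. This is only a
sketch: the interval/timeout values are guesses I still need to validate
against peak load, and I'm assuming setTolerableCheckpointFailureNumber
behaves the way I read it in the docs (ride over an occasional timeout
instead of failing the job).

    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.enableCheckpointing(60000); // keep the 60s interval
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
    env.getCheckpointConfig().enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    // Give slow checkpoints during peak more headroom than the current 60s
    env.getCheckpointConfig().setCheckpointTimeout(180000);
    // Leave breathing room so checkpoints don't pile up behind a slow sink
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
    // Tolerate the occasional checkpoint failure instead of failing the job
    env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);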

On Tue, 3 Nov 2020 at 10:49, Robert Metzger <rmetz...@apache.org> wrote:

> How did you configure the Kafka source as at least once? Afaik the source
> is always exactly-once (as long as there aren't any restarts).
>
> Are you seeing the duplicates in the context of restarts of the Flink job?
>
> On Tue, Nov 3, 2020 at 1:54 AM John Smith <java.dev....@gmail.com> wrote:
>
>> Sorry, got confused with your reply... Does the message "Error sending
>> fetch request" cause retries/duplicates downstream, or doesn't it?
>>
>> I'm guessing it happens even before the source can send anything
>> downstream...
>>
>>
>> On Sat, 31 Oct 2020 at 09:10, John Smith <java.dev....@gmail.com> wrote:
>>
>>> Hi my flow is Kafka Source -> Transform -> JDBC Sink
>>>
>>> Kafka Source is configured as at least once, and the JDBC sink prevents
>>> duplicates with a unique key constraint; duplicates are logged in a separate
>>> table. So the destination data is exactly once.
>>>
>>> The duplicates happen every so often. Looking at the checkpoint history,
>>> there were some checkpoints that failed, but the history doesn't go back far
>>> enough to investigate. I'm guessing I will have to adjust the checkpointing
>>> times a bit...
>>>
>>> On Thu., Oct. 29, 2020, 10:26 a.m. Becket Qin, <becket....@gmail.com>
>>> wrote:
>>>
>>>> Hi John,
>>>>
>>>> The log message you saw from Kafka consumer simply means the consumer
>>>> was disconnected from the broker that FetchRequest was supposed to be sent
>>>> to. The disconnection can happen in many cases, such as broker down,
>>>> network glitches, etc. The KafkaConsumer will just reconnect and retry
>>>> sending that FetchRequest again. This won't cause duplicate messages in
>>>> KafkaConsumer or Flink. Have you enabled exactly-once semantics for your
>>>> Kafka sink? If not, the downstream might see duplicates in case of Flink
>>>> failover or occasional retry in the KafkaProducer of the Kafka sink.
>>>>
>>>> Thanks,
>>>>
>>>> Jiangjie (Becket) Qin
>>>>
>>>> On Thu, Oct 22, 2020 at 11:38 PM John Smith <java.dev....@gmail.com>
>>>> wrote:
>>>>
>>>>> Any thoughts? This doesn't seem to create duplicates all the time, or
>>>>> maybe it's unrelated, since we are still seeing the message and there are
>>>>> no duplicates...
>>>>>
>>>>> On Wed., Oct. 21, 2020, 12:09 p.m. John Smith, <java.dev....@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> And yes my downstream is handling the duplicates in an idempotent way
>>>>>> so we are good on that point. But just curious what the behaviour is on 
>>>>>> the
>>>>>> source consumer when that error happens.
>>>>>>
>>>>>> On Wed, 21 Oct 2020 at 12:04, John Smith <java.dev....@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi, running Flink 1.10.0 we see these logs once in a while...
>>>>>>>
>>>>>>> 2020-10-21 15:48:57,625 INFO org.apache.kafka.clients.FetchSessionHandler
>>>>>>> - [Consumer clientId=consumer-2, groupId=xxxxxx-import] Error sending
>>>>>>> fetch request (sessionId=806089934, epoch=INITIAL) to node 0:
>>>>>>> org.apache.kafka.common.errors.DisconnectException.
>>>>>>>
>>>>>>> Obviously it looks like the consumer is getting disconnected, and from
>>>>>>> what I can tell it's either a Kafka bug in the way it handles the EPOCH
>>>>>>> or possibly a version mismatch between client and brokers. That's fine,
>>>>>>> I can look at upgrading the client and/or Kafka. But I'm trying to
>>>>>>> understand what happens in terms of the source and the sink. It looks
>>>>>>> like we get duplicates on the sink, and I'm guessing it's because the
>>>>>>> consumer is failing and at that point Flink stays on that checkpoint
>>>>>>> until it can reconnect and process that offset, hence the duplicates
>>>>>>> downstream?
>>>>>>
