Hi, my flow is Kafka Source -> Transform -> JDBC Sink.

The Kafka source is configured for at-least-once, and the JDBC sink prevents
duplicates with a unique key constraint; any duplicate is logged in a separate
table. So the destination data is effectively exactly-once.
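
Roughly speaking, the sink-side dedup looks like the sketch below (a minimal
illustration only; the table names, the PostgreSQL-style ON CONFLICT clause,
and the plain-JDBC wiring are assumptions, not our actual code):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class IdempotentJdbcWrite {
    public static void write(Connection conn, String eventId, String payload) throws SQLException {
        // Try to insert; the unique key constraint on event_id rejects duplicates.
        String insert =
            "INSERT INTO events (event_id, payload) VALUES (?, ?) ON CONFLICT (event_id) DO NOTHING";
        try (PreparedStatement ps = conn.prepareStatement(insert)) {
            ps.setString(1, eventId);
            ps.setString(2, payload);
            int rows = ps.executeUpdate();
            if (rows == 0) {
                // Row already existed: record the duplicate in a separate table instead.
                try (PreparedStatement dup = conn.prepareStatement(
                        "INSERT INTO duplicate_events (event_id, payload, seen_at) VALUES (?, ?, now())")) {
                    dup.setString(1, eventId);
                    dup.setString(2, payload);
                    dup.executeUpdate();
                }
            }
        }
    }
}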

The duplicates happen every so often. Looking at the checkpoint history, there
were some checkpoints that failed, but the history isn't long enough to go
back and look. I'm guessing I will have to adjust the checkpointing times a
bit...
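
For reference, the knobs I'm thinking of adjusting look roughly like this (a
minimal sketch against the Flink 1.10 DataStream API; the interval, timeout,
and tolerance values are just placeholders to experiment with, not what we
currently run):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTuning {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 60s with at-least-once mode (the source side of this pipeline).
        env.enableCheckpointing(60_000L, CheckpointingMode.AT_LEAST_ONCE);

        // Give slow checkpoints more room before they are declared failed.
        env.getCheckpointConfig().setCheckpointTimeout(120_000L);

        // Leave some breathing room between checkpoints so the job can make progress.
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000L);

        // Don't fail the whole job on an occasional checkpoint failure.
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
    }
}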

On Thu., Oct. 29, 2020, 10:26 a.m. Becket Qin, <becket....@gmail.com> wrote:

> Hi John,
>
> The log message you saw from the Kafka consumer simply means the consumer was
> disconnected from the broker that the FetchRequest was supposed to be sent to.
> The disconnection can happen in many cases, such as a broker going down, network
> glitches, etc. The KafkaConsumer will just reconnect and retry sending that
> FetchRequest again. This won't cause duplicate messages in the KafkaConsumer or
> Flink. Have you enabled exactly-once semantics for your Kafka sink? If not,
> the downstream might see duplicates in case of a Flink failover or an occasional
> retry in the KafkaProducer of the Kafka sink.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Thu, Oct 22, 2020 at 11:38 PM John Smith <java.dev....@gmail.com>
> wrote:
>
>> Any thoughts? This doesn't seem to create duplicates all the time, or maybe
>> it's unrelated, since we are still seeing the message and there are no
>> duplicates...
>>
>> On Wed., Oct. 21, 2020, 12:09 p.m. John Smith, <java.dev....@gmail.com>
>> wrote:
>>
>>> And yes, my downstream is handling the duplicates in an idempotent way, so
>>> we are good on that point. But I'm just curious what the behaviour is on the
>>> source consumer when that error happens.
>>>
>>> On Wed, 21 Oct 2020 at 12:04, John Smith <java.dev....@gmail.com> wrote:
>>>
>>>> Hi, running Flink 1.10.0 we see these logs once in a while...
>>>>
>>>> 2020-10-21 15:48:57,625 INFO org.apache.kafka.clients.FetchSessionHandler -
>>>> [Consumer clientId=consumer-2, groupId=xxxxxx-import] Error sending fetch
>>>> request (sessionId=806089934, epoch=INITIAL) to node 0:
>>>> org.apache.kafka.common.errors.DisconnectException.
>>>>
>>>> Obviously it looks like the consumer is getting disconnected, and from
>>>> what I can tell it's either a Kafka bug in the way it handles the EPOCH or
>>>> possibly a version mismatch between the client and the brokers. That's fine,
>>>> I can look at upgrading the client and/or Kafka. But I'm trying to understand
>>>> what happens in terms of the source and the sink. It looks like we get
>>>> duplicates on the sink, and I'm guessing it's because the consumer is failing,
>>>> and at that point Flink stays on that checkpoint until it can reconnect and
>>>> process that offset, hence the duplicates downstream?
>>>>
>>>
