Hi Nick,

What Aljoscha was trying to say is that Flink is not trying to do any
magic. If `KafkaConsumer` - which is being used under the hood of
`FlinkKafkaConsumer` connector - throws an exception, this
exception bubbles up causing the job to failover. If the failure is handled
by the `KafkaConsumer` silently, that's what's happening. As we can in the
TM log that you attached, the latter seems to be happening - note that the
warning is logged by "org.apache.kafka.clients.NetworkClient" package, so
that's not the code we (Flink developers) control.

If you want to change this behaviour, unless someone here on this mailing
list just happens to know the answer, the better place to ask such a
question on the Kafka mailing list. Maybe there is some way to configure
this.

And sorry I don't know much about neither the KafkaConsumer nor the
KafkaBrokers configuration :(

Piotrek

wt., 4 sie 2020 o 22:04 Nick Bendtner <buggi...@gmail.com> napisał(a):

> Hi,
> I don't observe this behaviour though, we use flink 1.7.2 . I stopped
> kafka and zookeeper on all broker nodes. On the flink side, I see the
> messages in the log ( data is obfuscated) . There are no error logs. The
> kafka consumer properties are
>
> 1. "bootstrap.servers"
>
> 2. "zookeeper.connect
>
> 3. "auto.offset.reset"
>
> 4. "group.id"
>
> 5."security.protocol"
>
>
> The flink consumer starts consuming data as soon as the kafka comes back
> up. So I want to know in what scenario/kafka consumer config will the job
> go to failed state after a finite number of restart attempts from
> checkpoint.
>
>
> TM log.
> 2020-08-04 19:50:55,539 WARN  org.apache.kafka.clients.NetworkClient
>                  - [Consumer clientId=consumer-5,
> groupId=flink-AIP-QA-Audit-Consumer] Connection to node 1003 (
> yyyrspapd036.xxx.com/ss.mm.120.124:9093) could not be established. Broker
> may not be available.
> 2020-08-04 19:50:55,540 WARN  org.apache.kafka.clients.NetworkClient
>                  - [Consumer clientId=consumer-4,
> groupId=flink-AIP-QA-Audit-Consumer] Connection to node 1002 (
> yyyrspapd037.xxx.com/ss.mm.120.125:9093) could not be established. Broker
> may not be available.
> 2020-08-04 19:50:55,791 WARN  org.apache.kafka.clients.NetworkClient
>                  - [Consumer clientId=consumer-4,
> groupId=flink-AIP-QA-Audit-Consumer] Connection to node 1004 (
> yyyrspapd035.xxx.com/ss.mm.120.123:9093) could not be established. Broker
> may not be available.
> 2020-08-04 19:50:55,791 WARN  org.apache.kafka.clients.NetworkClient
>                  - [Consumer clientId=consumer-6,
> groupId=flink-AIP-QA-Audit-Consumer] Connection to node 1003 (
> yyyrspapd036.xxx.com/ss.mm.120.124:9093) could not be established. Broker
> may not be available.
>
> Best,
> Nick
>
> On Mon, Jul 20, 2020 at 10:27 AM Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Hi,
>>
>> Flink doesn't do any special failure-handling or retry logic, so it’s up
>> to how the KafkaConsumer is configured via properties. In general Flink
>> doesn’t try to be smart: when something fails an exception fill bubble
>> up that will fail this execution of the job. If checkpoints are enabled
>> this will trigger a restore, this is controlled by the restart strategy.
>> If that eventually gives up the job fill go to “FAILED” and stop.
>>
>> This is the relevant section of the docs:
>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/task_failure_recovery.html
>>
>> Best,
>> Aljoscha
>>
>> On 15.07.20 17:42, Nick Bendtner wrote:
>> > Hi guys,
>> > I want to know what is the default behavior of Kafka source when a kafka
>> > cluster goes down during streaming. Will the job status go to failing
>> or is
>> > the exception caught and there is a back off before the source tries to
>> > poll for more events ?
>> >
>> >
>> > Best,
>> > Nick.
>> >
>>
>>

Reply via email to