There's a few related log lines, but there isn't a full stacktrace as the
info originates from a logger statement[1] as opposed to thrown exception.
The related log lines are like so:

org.apache.kafka.clients.NetworkClient [Producer clientId=producer-109]
Disconnecting from node 10 due to socket connection setup timeout. The
timeout value is 11436 ms.    [2]

and

org.apache.kafka.clients.NetworkClient [Producer clientId=producer-109]
Node 10 disconnected.    [3]

[1]
https://github.com/apache/kafka/blob/f653cb7b5889fd619ab0e6a25216bd981a9d82bf/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L402
[2]
https://github.com/apache/kafka/blob/1135f22eaf404fdf76489302648199578876c4ac/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L820
[3]
https://github.com/apache/kafka/blob/1135f22eaf404fdf76489302648199578876c4ac/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L937

On Tue, Sep 13, 2022 at 12:17 PM Alexey Romanenko <aromanenko....@gmail.com>
wrote:

> Do you have by any chance the full stacktrace of this error?
>
> —
> Alexey
>
> On 13 Sep 2022, at 18:05, Evan Galpin <egal...@apache.org> wrote:
>
> Ya likewise, I'd expect this to be handled in the Kafka code without the
> need for special handling by Beam.  I'll reach out to Kafka mailing list as
> well and try to get a better understanding of the root issue.  Thanks for
> your time so far John! I'll ping this thread with any interesting findings
> or insights.
>
> Thanks,
> Evan
>
> On Tue, Sep 13, 2022 at 11:38 AM John Casey via user <user@beam.apache.org>
> wrote:
>
>> In principle yes, but I don't see any Beam level code to handle that. I'm
>> a bit surprised it isn't handled in the Kafka producer layer itself.
>>
>> On Tue, Sep 13, 2022 at 11:15 AM Evan Galpin <egal...@apache.org> wrote:
>>
>>> I'm not certain based on the logs where the disconnect is starting.  I
>>> have seen TimeoutExceptions like that mentioned in the SO issue you linked,
>>> so if we assume it's starting from the kafka cluster side, my concern is
>>> that the producers don't seem to be able to gracefully recover.  Given that
>>> restarting the pipeline (in this case, in Dataflow) makes the issue go
>>> away, I'm under the impression that producer clients in KafkaIO#write can
>>> get into a state that they're not able to recover from after experiencing a
>>> disconnect.  Is graceful recovery after cluster unavailability something
>>> that would be expected to be supported by KafkaIO today?
>>>
>>> Thanks,
>>> Evan
>>>
>>> On Tue, Sep 13, 2022 at 11:07 AM John Casey via user <
>>> user@beam.apache.org> wrote:
>>>
>>>> Googling that error message returned
>>>> https://stackoverflow.com/questions/71077394/kafka-producer-resetting-the-last-seen-epoch-of-partition-resulting-in-timeout
>>>> and
>>>> https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L402
>>>>
>>>> Which suggests that there is some sort of disconnect happening between
>>>> your pipeline and your kafka instance.
>>>>
>>>> Do you see any logs when this disconnect starts, on the Beam or Kafka
>>>> side of things?
>>>>
>>>> On Tue, Sep 13, 2022 at 10:38 AM Evan Galpin <egal...@apache.org>
>>>> wrote:
>>>>
>>>>> Thanks for the quick reply John!  I should also add that the root
>>>>> issue is not so much the logging, rather that these log messages seem to 
>>>>> be
>>>>> correlated with periods where producers are not able to publish data to
>>>>> kafka.  The issue of not being able to publish data does not seem to
>>>>> resolve until restarting or updating the pipeline.
>>>>>
>>>>> Here's my publisher config map:
>>>>>
>>>>>                 .withProducerConfigUpdates(
>>>>>                     Map.ofEntries(
>>>>>                         Map.entry(
>>>>>                             ProducerConfig.PARTITIONER_CLASS_CONFIG,
>>>>> DefaultPartitioner.class),
>>>>>                         Map.entry(
>>>>>                             ProducerConfig.COMPRESSION_TYPE_CONFIG,
>>>>> CompressionType.GZIP.name <http://compressiontype.gzip.name/>),
>>>>>                         Map.entry(
>>>>>
>>>>> CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
>>>>> SecurityProtocol.SASL_SSL.name
>>>>> <http://securityprotocol.sasl_ssl.name/>),
>>>>>                         Map.entry(
>>>>>                             SaslConfigs.SASL_MECHANISM,
>>>>> PlainSaslServer.PLAIN_MECHANISM),
>>>>>                         Map.entry(
>>>>>                             SaslConfigs.SASL_JAAS_CONFIG,
>>>>> "org.apache.kafka.common.security.plain.PlainLoginModule required
>>>>> username=\"<api_key>\" password=\"<api_secret>\";")))
>>>>>
>>>>> Thanks,
>>>>> Evan
>>>>>
>>>>> On Tue, Sep 13, 2022 at 10:30 AM John Casey <johnjca...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Evan,
>>>>>>
>>>>>> I haven't seen this before. Can you share your Kafka write
>>>>>> configuration, and any other stack traces that could be relevant?
>>>>>>
>>>>>> John
>>>>>>
>>>>>> On Tue, Sep 13, 2022 at 10:23 AM Evan Galpin <egal...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> I've recently started using the KafkaIO connector as a sink, and am
>>>>>>> new to Kafka in general.  My kafka clusters are hosted by Confluent 
>>>>>>> Cloud.
>>>>>>> I'm using Beam SDK 2.41.0.  At least daily, the producers in my Beam
>>>>>>> pipeline are getting stuck in a loop frantically logging this message:
>>>>>>>
>>>>>>> Node n disconnected.
>>>>>>>
>>>>>>> Resetting the last seen epoch of partition <topic-partition> to x
>>>>>>> since the associated topicId changed from null to <topic_id>
>>>>>>>
>>>>>>> Updating the running pipeline "resolves" the issue I believe as a
>>>>>>> result of recreating the Kafka producer clients, but it seems that as-is
>>>>>>> the KafkaIO producer clients are not resilient to node disconnects.  
>>>>>>> Might
>>>>>>> I be missing a configuration option, or are there any known issues like
>>>>>>> this?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Evan
>>>>>>>
>>>>>>
>

Reply via email to