Yeah, likewise; I'd expect this to be handled in the Kafka code without the
need for special handling by Beam.  I'll reach out to the Kafka mailing list
as well and try to get a better understanding of the root issue.  Thanks for
your time so far, John! I'll ping this thread with any interesting findings
or insights.

Thanks,
Evan

On Tue, Sep 13, 2022 at 11:38 AM John Casey via user <user@beam.apache.org>
wrote:

> In principle yes, but I don't see any Beam level code to handle that. I'm
> a bit surprised it isn't handled in the Kafka producer layer itself.
>
> On Tue, Sep 13, 2022 at 11:15 AM Evan Galpin <egal...@apache.org> wrote:
>
>> I'm not certain based on the logs where the disconnect is starting.  I
>> have seen TimeoutExceptions like the one mentioned in the SO issue you
>> linked, so if we assume it's starting from the Kafka cluster side, my
>> concern is that the producers don't seem to be able to recover gracefully.
>> Given that restarting the pipeline (in this case, on Dataflow) makes the
>> issue go away, I'm under the impression that producer clients in
>> KafkaIO#write can get into a state that they're not able to recover from
>> after experiencing a disconnect.  Is graceful recovery after cluster
>> unavailability something that would be expected of KafkaIO today?
>>
>> Thanks,
>> Evan
>>
>> On Tue, Sep 13, 2022 at 11:07 AM John Casey via user <
>> user@beam.apache.org> wrote:
>>
>>> Googling that error message returned
>>> https://stackoverflow.com/questions/71077394/kafka-producer-resetting-the-last-seen-epoch-of-partition-resulting-in-timeout
>>> and
>>> https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L402
>>>
>>> Which suggests that there is some sort of disconnect happening between
>>> your pipeline and your kafka instance.
>>>
>>> Do you see any logs when this disconnect starts, on the Beam or Kafka
>>> side of things?
>>>
>>> On Tue, Sep 13, 2022 at 10:38 AM Evan Galpin <egal...@apache.org> wrote:
>>>
>>>> Thanks for the quick reply, John!  I should also add that the root
>>>> issue is not so much the logging, but rather that these log messages
>>>> seem to be correlated with periods where producers are unable to
>>>> publish data to Kafka.  The inability to publish does not seem to
>>>> resolve until restarting or updating the pipeline.
>>>>
>>>> Here's my publisher config map:
>>>>
>>>>                 .withProducerConfigUpdates(
>>>>                     Map.ofEntries(
>>>>                         Map.entry(
>>>>                             ProducerConfig.PARTITIONER_CLASS_CONFIG,
>>>>                             DefaultPartitioner.class),
>>>>                         Map.entry(
>>>>                             ProducerConfig.COMPRESSION_TYPE_CONFIG,
>>>>                             CompressionType.GZIP.name),
>>>>                         Map.entry(
>>>>                             CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
>>>>                             SecurityProtocol.SASL_SSL.name),
>>>>                         Map.entry(
>>>>                             SaslConfigs.SASL_MECHANISM,
>>>>                             PlainSaslServer.PLAIN_MECHANISM),
>>>>                         Map.entry(
>>>>                             SaslConfigs.SASL_JAAS_CONFIG,
>>>>                             "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<api_key>\" password=\"<api_secret>\";")))
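In case it's useful for anyone following along: reconnect and timeout behavior for the underlying producer can be tuned through standard Kafka client settings, which could be added to the same config map passed to withProducerConfigUpdates. A minimal sketch of such overrides, using plain string keys to stay self-contained; the values here are illustrative assumptions, not tested recommendations:

```java
import java.util.Map;

public class ProducerResilienceConfig {

    // Standard Kafka producer config keys, written as string literals here
    // so this sketch has no kafka-clients dependency; the values are
    // placeholders to experiment with, not tuned recommendations.
    public static Map<String, Object> resilienceOverrides() {
        return Map.of(
            "reconnect.backoff.ms", 1000,       // initial wait before reconnecting to a failed node
            "reconnect.backoff.max.ms", 30000,  // cap on the exponential reconnect backoff
            "delivery.timeout.ms", 120000,      // total time allowed for a send to succeed or fail
            "request.timeout.ms", 30000,        // per-request timeout to the broker
            "metadata.max.age.ms", 60000        // force a cluster metadata refresh at least this often
        );
    }

    public static void main(String[] args) {
        Map<String, Object> cfg = resilienceOverrides();
        System.out.println(cfg.get("reconnect.backoff.max.ms")); // prints 30000
    }
}
```

These entries could be merged into the Map.ofEntries above so the KafkaIO producer picks them up alongside the security settings.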
>>>>
>>>> Thanks,
>>>> Evan
>>>>
>>>> On Tue, Sep 13, 2022 at 10:30 AM John Casey <johnjca...@google.com>
>>>> wrote:
>>>>
>>>>> Hi Evan,
>>>>>
>>>>> I haven't seen this before. Can you share your Kafka write
>>>>> configuration, and any other stack traces that could be relevant?
>>>>>
>>>>> John
>>>>>
>>>>> On Tue, Sep 13, 2022 at 10:23 AM Evan Galpin <egal...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I've recently started using the KafkaIO connector as a sink, and am
>>>>>> new to Kafka in general.  My Kafka clusters are hosted by Confluent
>>>>>> Cloud, and I'm using Beam SDK 2.41.0.  At least daily, the producers
>>>>>> in my Beam pipeline get stuck in a loop, frantically logging this
>>>>>> message:
>>>>>>
>>>>>> Node n disconnected.
>>>>>>
>>>>>> Resetting the last seen epoch of partition <topic-partition> to x
>>>>>> since the associated topicId changed from null to <topic_id>
>>>>>>
>>>>>> Updating the running pipeline "resolves" the issue, I believe because
>>>>>> doing so recreates the Kafka producer clients, but it seems that
>>>>>> as-is the KafkaIO producer clients are not resilient to node
>>>>>> disconnects.  Might I be missing a configuration option, or are there
>>>>>> any known issues like this?
>>>>>>
>>>>>> Thanks,
>>>>>> Evan
>>>>>>
>>>>>
