In principle yes, but I don't see any Beam-level code to handle that. I'm a
bit surprised it isn't handled in the Kafka producer layer itself.
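
For reference, the Kafka producer does expose client-side settings for
reconnect and retry behaviour, and they can be passed through
withProducerConfigUpdates like the other entries quoted below. A minimal
sketch, assuming plain string config keys (instead of the ProducerConfig
constants) so that it compiles without the Kafka client on the classpath.
The class name, method name, and every value are hypothetical, and nothing
in this thread confirms that any of these settings fixes the stuck-producer
state:

```java
import java.util.Map;

// Hypothetical helper: collects producer settings that govern how the Kafka
// client reconnects and retries. All values are illustrative assumptions,
// not recommendations from the Beam or Kafka projects.
final class ProducerResilienceConfig {

    private ProducerResilienceConfig() {}

    static Map<String, Object> resilienceOverrides() {
        return Map.of(
            // Wait before reconnecting to a broker, and the cap on that wait.
            "reconnect.backoff.ms", 500,
            "reconnect.backoff.max.ms", 10_000,
            // How long a single request may wait for a broker response.
            "request.timeout.ms", 30_000,
            // Upper bound on the total time to report success or failure of a send.
            "delivery.timeout.ms", 120_000,
            // Force a metadata refresh at least this often.
            "metadata.max.age.ms", 60_000);
    }
}
```

The returned map could then be supplied as
.withProducerConfigUpdates(ProducerResilienceConfig.resilienceOverrides()),
either instead of or merged with the existing entries.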

On Tue, Sep 13, 2022 at 11:15 AM Evan Galpin <egal...@apache.org> wrote:

> I'm not certain from the logs where the disconnect starts. I have seen
> TimeoutExceptions like the one mentioned in the SO issue you linked, so if
> we assume it starts on the Kafka cluster side, my concern is that the
> producers don't seem able to recover gracefully. Given that restarting the
> pipeline (in this case, in Dataflow) makes the issue go away, I'm under the
> impression that producer clients in KafkaIO#write can get into a state they
> cannot recover from after a disconnect. Is graceful recovery after cluster
> unavailability something KafkaIO is expected to support today?
>
> Thanks,
> Evan
>
> On Tue, Sep 13, 2022 at 11:07 AM John Casey via user <user@beam.apache.org>
> wrote:
>
>> Googling that error message returned
>> https://stackoverflow.com/questions/71077394/kafka-producer-resetting-the-last-seen-epoch-of-partition-resulting-in-timeout
>> and
>> https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L402
>>
>> Both suggest that there is some sort of disconnect happening between
>> your pipeline and your Kafka instance.
>>
>> Do you see any logs when this disconnect starts, on the Beam or Kafka
>> side of things?
>>
>> On Tue, Sep 13, 2022 at 10:38 AM Evan Galpin <egal...@apache.org> wrote:
>>
>>> Thanks for the quick reply, John! I should also add that the root issue
>>> is not so much the logging as the fact that these log messages seem to be
>>> correlated with periods where producers are unable to publish data to
>>> Kafka. The inability to publish does not seem to resolve until the
>>> pipeline is restarted or updated.
>>>
>>> Here's my publisher config map:
>>>
>>>                 .withProducerConfigUpdates(
>>>                     Map.ofEntries(
>>>                         Map.entry(
>>>                             ProducerConfig.PARTITIONER_CLASS_CONFIG,
>>>                             DefaultPartitioner.class),
>>>                         Map.entry(
>>>                             ProducerConfig.COMPRESSION_TYPE_CONFIG,
>>>                             CompressionType.GZIP.name),
>>>                         Map.entry(
>>>                             CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
>>>                             SecurityProtocol.SASL_SSL.name),
>>>                         Map.entry(
>>>                             SaslConfigs.SASL_MECHANISM,
>>>                             PlainSaslServer.PLAIN_MECHANISM),
>>>                         Map.entry(
>>>                             SaslConfigs.SASL_JAAS_CONFIG,
>>>                             "org.apache.kafka.common.security.plain.PlainLoginModule required "
>>>                                 + "username=\"<api_key>\" password=\"<api_secret>\";")))
>>>
>>> Thanks,
>>> Evan
>>>
>>> On Tue, Sep 13, 2022 at 10:30 AM John Casey <johnjca...@google.com>
>>> wrote:
>>>
>>>> Hi Evan,
>>>>
>>>> I haven't seen this before. Can you share your Kafka write
>>>> configuration, and any other stack traces that could be relevant?
>>>>
>>>> John
>>>>
>>>> On Tue, Sep 13, 2022 at 10:23 AM Evan Galpin <egal...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I've recently started using the KafkaIO connector as a sink, and am
>>>>> new to Kafka in general. My Kafka clusters are hosted by Confluent Cloud.
>>>>> I'm using Beam SDK 2.41.0. At least daily, the producers in my Beam
>>>>> pipeline get stuck in a loop, frantically logging these messages:
>>>>>
>>>>> Node n disconnected.
>>>>>
>>>>> Resetting the last seen epoch of partition <topic-partition> to x
>>>>> since the associated topicId changed from null to <topic_id>
>>>>>
>>>>> Updating the running pipeline "resolves" the issue, I believe because it
>>>>> recreates the Kafka producer clients, but it seems that, as-is, the
>>>>> KafkaIO producer clients are not resilient to node disconnects. Might I
>>>>> be missing a configuration option, or are there any known issues like
>>>>> this?
>>>>>
>>>>> Thanks,
>>>>> Evan
>>>>>
>>>>
