In principle yes, but I don't see any Beam-level code to handle that. I'm a bit surprised it isn't handled in the Kafka producer layer itself.
On Tue, Sep 13, 2022 at 11:15 AM Evan Galpin <egal...@apache.org> wrote:

> I'm not certain based on the logs where the disconnect is starting. I
> have seen TimeoutExceptions like those mentioned in the SO issue you linked,
> so if we assume it's starting from the Kafka cluster side, my concern is
> that the producers don't seem to be able to gracefully recover. Given that
> restarting the pipeline (in this case, in Dataflow) makes the issue go
> away, I'm under the impression that producer clients in KafkaIO#write can
> get into a state that they're not able to recover from after experiencing a
> disconnect. Is graceful recovery after cluster unavailability something
> that would be expected to be supported by KafkaIO today?
>
> Thanks,
> Evan
>
> On Tue, Sep 13, 2022 at 11:07 AM John Casey via user <user@beam.apache.org> wrote:
>
>> Googling that error message returned
>> https://stackoverflow.com/questions/71077394/kafka-producer-resetting-the-last-seen-epoch-of-partition-resulting-in-timeout
>> and
>> https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L402
>>
>> which suggests that there is some sort of disconnect happening between
>> your pipeline and your Kafka instance.
>>
>> Do you see any logs when this disconnect starts, on the Beam or Kafka
>> side of things?
>>
>> On Tue, Sep 13, 2022 at 10:38 AM Evan Galpin <egal...@apache.org> wrote:
>>
>>> Thanks for the quick reply, John! I should also add that the root issue
>>> is not so much the logging, but rather that these log messages seem to be
>>> correlated with periods where producers are not able to publish data to
>>> Kafka. The issue of not being able to publish data does not seem to
>>> resolve until restarting or updating the pipeline.
>>>
>>> Here's my publisher config map:
>>>
>>> .withProducerConfigUpdates(
>>>     Map.ofEntries(
>>>         Map.entry(
>>>             ProducerConfig.PARTITIONER_CLASS_CONFIG,
>>>             DefaultPartitioner.class),
>>>         Map.entry(
>>>             ProducerConfig.COMPRESSION_TYPE_CONFIG,
>>>             CompressionType.GZIP.name),
>>>         Map.entry(
>>>             CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
>>>             SecurityProtocol.SASL_SSL.name),
>>>         Map.entry(
>>>             SaslConfigs.SASL_MECHANISM,
>>>             PlainSaslServer.PLAIN_MECHANISM),
>>>         Map.entry(
>>>             SaslConfigs.SASL_JAAS_CONFIG,
>>>             "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<api_key>\" password=\"<api_secret>\";")))
>>>
>>> Thanks,
>>> Evan
>>>
>>> On Tue, Sep 13, 2022 at 10:30 AM John Casey <johnjca...@google.com> wrote:
>>>
>>>> Hi Evan,
>>>>
>>>> I haven't seen this before. Can you share your Kafka write
>>>> configuration, and any other stack traces that could be relevant?
>>>>
>>>> John
>>>>
>>>> On Tue, Sep 13, 2022 at 10:23 AM Evan Galpin <egal...@apache.org> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I've recently started using the KafkaIO connector as a sink, and am
>>>>> new to Kafka in general. My Kafka clusters are hosted by Confluent Cloud,
>>>>> and I'm using Beam SDK 2.41.0. At least daily, the producers in my Beam
>>>>> pipeline get stuck in a loop, frantically logging this message:
>>>>>
>>>>> Node n disconnected.
>>>>>
>>>>> Resetting the last seen epoch of partition <topic-partition> to x
>>>>> since the associated topicId changed from null to <topic_id>
>>>>>
>>>>> Updating the running pipeline "resolves" the issue, I believe as a
>>>>> result of recreating the Kafka producer clients, but it seems that, as-is,
>>>>> the KafkaIO producer clients are not resilient to node disconnects. Might
>>>>> I be missing a configuration option, or are there any known issues like
>>>>> this?
>>>>>
>>>>> Thanks,
>>>>> Evan
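For anyone landing on this thread with the same symptom: one thing worth experimenting with is tightening the producer's metadata and delivery timeouts so a stale connection is abandoned and re-established sooner, rather than retried indefinitely. The sketch below shows timeout-related overrides that could be merged into the map passed to `withProducerConfigUpdates`. These are standard Kafka producer properties, but the specific values are assumptions for illustration, not tested recommendations for this issue; plain string keys are used so the snippet compiles without the Kafka client on the classpath.

```java
import java.util.Map;

public class ProducerTimeoutTuning {

    // Candidate overrides for KafkaIO#write's withProducerConfigUpdates.
    // The keys mirror ProducerConfig constants; the values are assumptions.
    static Map<String, Object> timeoutOverrides() {
        return Map.of(
            "metadata.max.age.ms", 60_000,       // refresh cluster metadata more often
            "connections.max.idle.ms", 120_000,  // close idle connections sooner
            "delivery.timeout.ms", 120_000,      // fail a send instead of retrying forever
            "request.timeout.ms", 30_000);       // per-request timeout
    }

    public static void main(String[] args) {
        timeoutOverrides().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Whether any of these actually prevents the stuck-producer state described above is unverified; they only bound how long a client clings to a dead connection or an unacknowledged send.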