There are a few related log lines, but there isn't a full stack trace, as the info originates from a logger statement [1] rather than a thrown exception. The related log lines are like so:
org.apache.kafka.clients.NetworkClient [Producer clientId=producer-109]
Disconnecting from node 10 due to socket connection setup timeout. The
timeout value is 11436 ms. [2]

and

org.apache.kafka.clients.NetworkClient [Producer clientId=producer-109]
Node 10 disconnected. [3]

[1] https://github.com/apache/kafka/blob/f653cb7b5889fd619ab0e6a25216bd981a9d82bf/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L402
[2] https://github.com/apache/kafka/blob/1135f22eaf404fdf76489302648199578876c4ac/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L820
[3] https://github.com/apache/kafka/blob/1135f22eaf404fdf76489302648199578876c4ac/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L937

On Tue, Sep 13, 2022 at 12:17 PM Alexey Romanenko <aromanenko....@gmail.com> wrote:

> Do you have by any chance the full stacktrace of this error?
>
> —
> Alexey
>
> On 13 Sep 2022, at 18:05, Evan Galpin <egal...@apache.org> wrote:
>
> Ya likewise, I'd expect this to be handled in the Kafka code without the
> need for special handling by Beam. I'll reach out to the Kafka mailing
> list as well and try to get a better understanding of the root issue.
> Thanks for your time so far, John! I'll ping this thread with any
> interesting findings or insights.
>
> Thanks,
> Evan
>
> On Tue, Sep 13, 2022 at 11:38 AM John Casey via user <user@beam.apache.org> wrote:
>
>> In principle yes, but I don't see any Beam-level code to handle that.
>> I'm a bit surprised it isn't handled in the Kafka producer layer itself.
>>
>> On Tue, Sep 13, 2022 at 11:15 AM Evan Galpin <egal...@apache.org> wrote:
>>
>>> I'm not certain based on the logs where the disconnect is starting. I
>>> have seen TimeoutExceptions like those mentioned in the SO issue you
>>> linked, so if we assume it's starting from the Kafka cluster side, my
>>> concern is that the producers don't seem to be able to gracefully
>>> recover.
>>> Given that restarting the pipeline (in this case, in Dataflow) makes
>>> the issue go away, I'm under the impression that producer clients in
>>> KafkaIO#write can get into a state that they're not able to recover
>>> from after experiencing a disconnect. Is graceful recovery after
>>> cluster unavailability something that would be expected to be
>>> supported by KafkaIO today?
>>>
>>> Thanks,
>>> Evan
>>>
>>> On Tue, Sep 13, 2022 at 11:07 AM John Casey via user <user@beam.apache.org> wrote:
>>>
>>>> Googling that error message returned
>>>> https://stackoverflow.com/questions/71077394/kafka-producer-resetting-the-last-seen-epoch-of-partition-resulting-in-timeout
>>>> and
>>>> https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L402
>>>>
>>>> which suggests that there is some sort of disconnect happening between
>>>> your pipeline and your Kafka instance.
>>>>
>>>> Do you see any logs when this disconnect starts, on the Beam or Kafka
>>>> side of things?
>>>>
>>>> On Tue, Sep 13, 2022 at 10:38 AM Evan Galpin <egal...@apache.org> wrote:
>>>>
>>>>> Thanks for the quick reply, John! I should also add that the root
>>>>> issue is not so much the logging, but rather that these log messages
>>>>> seem to be correlated with periods where producers are not able to
>>>>> publish data to Kafka. The issue of not being able to publish data
>>>>> does not seem to resolve until restarting or updating the pipeline.
>>>>>
>>>>> Here's my publisher config map:
>>>>>
>>>>> .withProducerConfigUpdates(
>>>>>     Map.ofEntries(
>>>>>         Map.entry(
>>>>>             ProducerConfig.PARTITIONER_CLASS_CONFIG,
>>>>>             DefaultPartitioner.class),
>>>>>         Map.entry(
>>>>>             ProducerConfig.COMPRESSION_TYPE_CONFIG,
>>>>>             CompressionType.GZIP.name),
>>>>>         Map.entry(
>>>>>             CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
>>>>>             SecurityProtocol.SASL_SSL.name),
>>>>>         Map.entry(
>>>>>             SaslConfigs.SASL_MECHANISM,
>>>>>             PlainSaslServer.PLAIN_MECHANISM),
>>>>>         Map.entry(
>>>>>             SaslConfigs.SASL_JAAS_CONFIG,
>>>>>             "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<api_key>\" password=\"<api_secret>\";")))
>>>>>
>>>>> Thanks,
>>>>> Evan
>>>>>
>>>>> On Tue, Sep 13, 2022 at 10:30 AM John Casey <johnjca...@google.com> wrote:
>>>>>
>>>>>> Hi Evan,
>>>>>>
>>>>>> I haven't seen this before. Can you share your Kafka write
>>>>>> configuration, and any other stack traces that could be relevant?
>>>>>>
>>>>>> John
>>>>>>
>>>>>> On Tue, Sep 13, 2022 at 10:23 AM Evan Galpin <egal...@apache.org> wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> I've recently started using the KafkaIO connector as a sink, and am
>>>>>>> new to Kafka in general. My Kafka clusters are hosted by Confluent
>>>>>>> Cloud. I'm using Beam SDK 2.41.0. At least daily, the producers in
>>>>>>> my Beam pipeline get stuck in a loop, frantically logging this
>>>>>>> message:
>>>>>>>
>>>>>>> Node n disconnected.
>>>>>>>
>>>>>>> Resetting the last seen epoch of partition <topic-partition> to x
>>>>>>> since the associated topicId changed from null to <topic_id>
>>>>>>>
>>>>>>> Updating the running pipeline "resolves" the issue, I believe as a
>>>>>>> result of recreating the Kafka producer clients, but it seems that
>>>>>>> as-is the KafkaIO producer clients are not resilient to node
>>>>>>> disconnects.
>>>>>>> Might I be missing a configuration option, or are there any known
>>>>>>> issues like this?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Evan
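For readers hitting the same symptom: the timeout named in the "Disconnecting from node ... due to socket connection setup timeout" log line, and the backoff between reconnect attempts, are governed by standard Kafka producer config keys, which can be passed through the same withProducerConfigUpdates map quoted above. Below is a minimal sketch using the plain config key strings; the class name and the specific values are illustrative assumptions, not tested recommendations from this thread:

```java
import java.util.Map;

public class ReconnectTuning {

    // Producer overrides aimed at surviving transient broker disconnects.
    // Keys are standard Kafka producer config names; the values here are
    // illustrative assumptions, not recommendations.
    static Map<String, Object> reconnectOverrides() {
        return Map.of(
            // Upper bound for the (exponentially growing) socket connection
            // setup timeout reported in the "Disconnecting from node" log.
            "socket.connection.setup.timeout.max.ms", 30000,
            // Cap on the exponential backoff between reconnect attempts.
            "reconnect.backoff.max.ms", 10000,
            // Total time a record send may spend retrying before failing.
            "delivery.timeout.ms", 180000);
    }

    public static void main(String[] args) {
        // These entries would be merged into the Map passed to
        // KafkaIO.Write#withProducerConfigUpdates alongside the
        // security/serialization settings shown earlier in the thread.
        System.out.println(reconnectOverrides());
    }
}
```

Raising these values only papers over long broker unavailability; if the producers truly never recover after the cluster comes back, that still points at the client-lifecycle question raised above.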