Hi all, I wrote an integration test to see how Kafka Streams handles exactly once processing with a custom state store, and it's failing with this error:
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms I'm not sure whether this is a bug in my code, a bug in Kafka, or a bug in the EmbeddedKafkaCluster class I'm using for testing. The test is available here: https://github.com/dicej/kafka-eos-test, and it can be run using "gradle test". The test class itself is https://github.com/dicej/kafka-eos-test/blob/f892a856d25d6e17e16a6681066a2033b06c5b3f/src/test/java/KafkaIntegrationTest.java. It's based loosely on the EosIntegrationTest class in the Kafka source tree. This is the full trace: Exception in thread "appId-1-9f749063-1a4d-4498-8df4-8619269e36f3-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=inputTopic, topic=inputTopic, partition=0, offset=1 at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:203) at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:679) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:557) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527) Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_0] exception caught when producing at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:136) at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:87) at KafkaIntegrationTest$MapStore.put(KafkaIntegrationTest.java:259) at KafkaIntegrationTest$MapStore.put(KafkaIntegrationTest.java:236) at KafkaIntegrationTest$StatisticMachine.process(KafkaIntegrationTest.java:227) at KafkaIntegrationTest$StatisticMachine.process(KafkaIntegrationTest.java:203) at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47) at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187) at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82) at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80) at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:189) ... 3 more Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms. Any ideas what might be going wrong? Cheers, Joel