Hi all,

I wrote an integration test to see how Kafka Streams handles exactly
once processing with a custom state store, and it's failing with this
error:

  org.apache.kafka.common.errors.TimeoutException: Failed to update
metadata after 60000 ms

I'm not sure whether this is a bug in my code, a bug in Kafka, or a
bug in the EmbeddedKafkaCluster class I'm using for testing.

The test is available here: https://github.com/dicej/kafka-eos-test,
and it can be run using "gradle test".

The test class itself is
https://github.com/dicej/kafka-eos-test/blob/f892a856d25d6e17e16a6681066a2033b06c5b3f/src/test/java/KafkaIntegrationTest.java.
It's based loosely on the EosIntegrationTest class in the Kafka source
tree.

This is the full trace:

    Exception in thread
"appId-1-9f749063-1a4d-4498-8df4-8619269e36f3-StreamThread-1"
org.apache.kafka.streams.errors.StreamsException: Exception caught in
process. taskId=0_0, processor=inputTopic, topic=inputTopic,
partition=0, offset=1

        at 
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:203)

        at 
org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:679)

        at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:557)

        at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)

    Caused by: org.apache.kafka.streams.errors.StreamsException: task
[0_0] exception caught when producing

        at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:136)

        at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:87)

        at KafkaIntegrationTest$MapStore.put(KafkaIntegrationTest.java:259)

        at KafkaIntegrationTest$MapStore.put(KafkaIntegrationTest.java:236)

        at 
KafkaIntegrationTest$StatisticMachine.process(KafkaIntegrationTest.java:227)

        at 
KafkaIntegrationTest$StatisticMachine.process(KafkaIntegrationTest.java:203)

        at 
org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)

        at 
org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)

        at 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)

        at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)

        at 
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)

        at 
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:189)

        ... 3 more

    Caused by: org.apache.kafka.common.errors.TimeoutException: Failed
to update metadata after 60000 ms.


Any ideas what might be going wrong?

Cheers,
Joel

Reply via email to