It seems Streams cannot connect (or loses its connection) to the brokers. Not
sure why.
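
The "Failed to update metadata" TimeoutException usually means the producer
cannot reach the brokers to fetch topic metadata, so the first thing I would
double check is that the Streams config really points at the embedded cluster.
As a minimal sketch (the CLUSTER field and the config below are my assumptions,
not taken from your test):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    // Assumption: CLUSTER is the test's EmbeddedKafkaCluster rule.
    final Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId-1");
    // Must point at the embedded brokers; an unreachable or stale value here
    // typically surfaces as "Failed to update metadata after 60000 ms".
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    // Enable exactly-once processing (requires brokers 0.11.0+).
    props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);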

You can also have a look at our own EOS Streams integration test:
https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
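
One thing that test does that is easy to miss: it creates all topics on the
embedded cluster before Streams starts. If the topic your custom store produces
to does not exist yet (and auto-creation is off), the producer can also block on
a metadata update until it times out. Roughly (topic names are only examples,
not taken from your code):

    // Assumption: CLUSTER is the EmbeddedKafkaCluster used by the test.
    CLUSTER.createTopic("inputTopic");
    CLUSTER.createTopic("outputTopic");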


-Matthias


On 7/11/17 2:13 PM, Joel Dice . wrote:
> Hi all,
> 
> I wrote an integration test to see how Kafka Streams handles exactly
> once processing with a custom state store, and it's failing with this
> error:
> 
>   org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms
> 
> I'm not sure whether this is a bug in my code, a bug in Kafka, or a
> bug in the EmbeddedKafkaCluster class I'm using for testing.
> 
> The test is available here: https://github.com/dicej/kafka-eos-test,
> and it can be run using "gradle test".
> 
> The test class itself is
> https://github.com/dicej/kafka-eos-test/blob/f892a856d25d6e17e16a6681066a2033b06c5b3f/src/test/java/KafkaIntegrationTest.java.
> It's based loosely on the EosIntegrationTest class in the Kafka source
> tree.
> 
> This is the full trace:
> 
>     Exception in thread "appId-1-9f749063-1a4d-4498-8df4-8619269e36f3-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=inputTopic, topic=inputTopic, partition=0, offset=1
>         at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:203)
>         at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:679)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:557)
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
>     Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_0] exception caught when producing
>         at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:136)
>         at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:87)
>         at KafkaIntegrationTest$MapStore.put(KafkaIntegrationTest.java:259)
>         at KafkaIntegrationTest$MapStore.put(KafkaIntegrationTest.java:236)
>         at KafkaIntegrationTest$StatisticMachine.process(KafkaIntegrationTest.java:227)
>         at KafkaIntegrationTest$StatisticMachine.process(KafkaIntegrationTest.java:203)
>         at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
>         at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
>         at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
>         at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
>         at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
>         at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:189)
>         ... 3 more
>     Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
> 
> 
> Any ideas what might be going wrong?
> 
> Cheers,
> Joel
> 
