After some debugging, I figured it out.  The name of my custom store
was "mapStore", so the store tried to log changes to a topic with that
name, but my test case never created such a topic.  Unsurprisingly,
the producer couldn't get metadata for a nonexistent topic, so it
failed with a timeout.

With this trivial fix, the test passes:
https://github.com/dicej/kafka-eos-test/commit/0fdf400350e41092f1973a80e9e4196003859ddc
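
A minimal sketch of a pre-flight check that would flag this kind of
problem before the streams application starts. It is plain Java with no
Kafka dependency; the class name TopicPreflight, the helper
missingTopics, and the assumption that the test had already created
"inputTopic" and "outputTopic" are all hypothetical and only for
illustration. In a real test the set of existing topics would have to
come from the cluster itself.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class TopicPreflight {
    // Returns the required topics that do not exist yet, in the order given.
    public static List<String> missingTopics(Set<String> existing,
                                             Collection<String> required) {
        List<String> missing = new ArrayList<>();
        for (String topic : required) {
            if (!existing.contains(topic)) {
                missing.add(topic);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Topics the test is assumed to have created (illustrative only).
        Set<String> existing =
            new HashSet<>(Arrays.asList("inputTopic", "outputTopic"));
        // Topics the topology writes to, including the topic the custom
        // store logs its changes to (named after the store in this case).
        List<String> required =
            Arrays.asList("inputTopic", "outputTopic", "mapStore");
        System.out.println(missingTopics(existing, required)); // prints [mapStore]
    }
}
```

Failing fast on a non-empty result gives a much clearer message than
waiting 60 seconds for the producer's metadata timeout.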

BTW, in EosIntegrationTest, shouldn't this line read "outputTopic,"
instead of "inputTopic,"?

https://github.com/apache/kafka/blob/44167399157b06d566401a3f6c17e9ca901a8c20/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java#L195

As written, the test only seems to verify that data produced to
inputTopic can be consumed from inputTopic.  The intent, though, seems
to be that data is copied from inputTopic to outputTopic, so I would
expect the test to verify that the data actually arrived in
outputTopic.
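
To illustrate the concern, here is a toy model in plain Java. A map of
lists stands in for the topics, and the class and method names
(VerifyTopicSketch, produce, consume) are made up for this sketch and
are not Kafka API. The copy step is deliberately left out, yet a check
against the input topic still succeeds:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class VerifyTopicSketch {
    // Appends records to the named topic in the toy "cluster".
    public static void produce(Map<String, List<String>> topics,
                               String topic, List<String> records) {
        topics.computeIfAbsent(topic, t -> new ArrayList<>()).addAll(records);
    }

    // Returns a copy of everything written to the named topic so far.
    public static List<String> consume(Map<String, List<String>> topics,
                                       String topic) {
        return new ArrayList<>(topics.getOrDefault(topic, new ArrayList<>()));
    }

    public static void main(String[] args) {
        Map<String, List<String>> topics = new HashMap<>();
        List<String> input = Arrays.asList("a", "b", "c");
        produce(topics, "inputTopic", input);

        // No copy from inputTopic to outputTopic happens here, simulating
        // a broken pipeline.

        // Checking the input topic cannot detect the broken copy.
        System.out.println(consume(topics, "inputTopic").equals(input));  // prints true
        // Checking the output topic does detect it.
        System.out.println(consume(topics, "outputTopic").equals(input)); // prints false
    }
}
```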


2017-07-11 16:25 GMT-06:00 Matthias J. Sax <matth...@confluent.io>:
> Seems Streams cannot connect (or loses connection) to the brokers. Not
> sure why.
>
> You can also have a look here for our own EOS Streams integration test:
> https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
>
>
> -Matthias
>
>
> On 7/11/17 2:13 PM, Joel Dice . wrote:
>> Hi all,
>>
>> I wrote an integration test to see how Kafka Streams handles exactly
>> once processing with a custom state store, and it's failing with this
>> error:
>>
>>   org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms
>>
>> I'm not sure whether this is a bug in my code, a bug in Kafka, or a
>> bug in the EmbeddedKafkaCluster class I'm using for testing.
>>
>> The test is available here: https://github.com/dicej/kafka-eos-test,
>> and it can be run using "gradle test".
>>
>> The test class itself is
>> https://github.com/dicej/kafka-eos-test/blob/f892a856d25d6e17e16a6681066a2033b06c5b3f/src/test/java/KafkaIntegrationTest.java.
>> It's based loosely on the EosIntegrationTest class in the Kafka source
>> tree.
>>
>> This is the full trace:
>>
>>     Exception in thread "appId-1-9f749063-1a4d-4498-8df4-8619269e36f3-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=inputTopic, topic=inputTopic, partition=0, offset=1
>>         at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:203)
>>         at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:679)
>>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:557)
>>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
>>     Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_0] exception caught when producing
>>         at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:136)
>>         at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:87)
>>         at KafkaIntegrationTest$MapStore.put(KafkaIntegrationTest.java:259)
>>         at KafkaIntegrationTest$MapStore.put(KafkaIntegrationTest.java:236)
>>         at KafkaIntegrationTest$StatisticMachine.process(KafkaIntegrationTest.java:227)
>>         at KafkaIntegrationTest$StatisticMachine.process(KafkaIntegrationTest.java:203)
>>         at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
>>         at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
>>         at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
>>         at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
>>         at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
>>         at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:189)
>>         ... 3 more
>>     Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
>>
>>
>> Any ideas what might be going wrong?
>>
>> Cheers,
>> Joel
>>
>
