Yeah, I'll open a PR.

2017-07-12 11:07 GMT-06:00 Matthias J. Sax <matth...@confluent.io>:
> Oops. That is definitely a bug in the test. Thanks for pointing it out!
>
> Do you want to open a PR for it? If not, I can take care of it, too.
>
> -Matthias
>
> On 7/12/17 8:48 AM, Joel Dice . wrote:
>> After some debugging, I figured it out.  The name of my custom store
>> was "mapStore", so the store tried to log changes to a topic with that
>> name, but my test case never created such a topic.  Unsurprisingly,
>> the producer couldn't get metadata for a nonexistent topic, so it
>> failed with a timeout.
>>
>> With this trivial fix, the test passes:
>> https://github.com/dicej/kafka-eos-test/commit/0fdf400350e41092f1973a80e9e4196003859ddc
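>>
>> The fix just creates the missing topic up front, roughly like this
>> (a sketch, assuming the createTopic helper on EmbeddedKafkaCluster,
>> with CLUSTER being the embedded cluster instance; "mapStore" is the
>> name of my store):
>>
>>     // my custom store logs changes to a topic named after the store,
>>     // so the test has to create that topic before the topology starts
>>     CLUSTER.createTopic("mapStore");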
>>
>> BTW, in EosIntegrationTest, shouldn't this line read "outputTopic,"
>> instead of "inputTopic,":
>>
>> https://github.com/apache/kafka/blob/44167399157b06d566401a3f6c17e9ca901a8c20/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java#L195
>>
>> As written, the test only seems to verify that data produced to
>> inputTopic can be consumed back from inputTopic. Since the intention
>> of the test appears to be that data is copied from inputTopic to
>> outputTopic, I would expect it to verify that the data actually made
>> it to outputTopic.
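>>
>> In other words, I'd expect the verification to consume from
>> outputTopic, roughly like this (a sketch: I'm using the
>> IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived helper from
>> Kafka's test utils, and the Long/Long types, consumerConfig, and
>> numRecords names are my placeholders):
>>
>>     // read the records back from the *output* topic, not the input
>>     // topic, to prove the topology actually copied the data across
>>     List<KeyValue<Long, Long>> received =
>>         IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
>>             consumerConfig, outputTopic, numRecords);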
>>
>>
>> 2017-07-11 16:25 GMT-06:00 Matthias J. Sax <matth...@confluent.io>:
>>> It seems Streams cannot connect (or loses its connection) to the
>>> brokers. Not sure why.
>>>
>>> You can also have a look here for our own EOS Streams integration test:
>>> https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
>>>
>>>
>>> -Matthias
>>>
>>>
>>> On 7/11/17 2:13 PM, Joel Dice . wrote:
>>>> Hi all,
>>>>
>>>> I wrote an integration test to see how Kafka Streams handles exactly
>>>> once processing with a custom state store, and it's failing with this
>>>> error:
>>>>
>>>>   org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms
>>>>
>>>> I'm not sure whether this is a bug in my code, a bug in Kafka, or a
>>>> bug in the EmbeddedKafkaCluster class I'm using for testing.
>>>>
>>>> The test is available here: https://github.com/dicej/kafka-eos-test,
>>>> and it can be run using "gradle test".
>>>>
>>>> The test class itself is
>>>> https://github.com/dicej/kafka-eos-test/blob/f892a856d25d6e17e16a6681066a2033b06c5b3f/src/test/java/KafkaIntegrationTest.java.
>>>> It's based loosely on the EosIntegrationTest class in the Kafka source
>>>> tree.
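>>>>
>>>> For reference, the test turns on exactly-once in the Streams config
>>>> the standard way (a sketch; "appId-1" matches the thread name in the
>>>> trace below, and the constants are the usual StreamsConfig ones):
>>>>
>>>>     Properties props = new Properties();
>>>>     props.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId-1");
>>>>     props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
>>>>     // setting the processing guarantee to EXACTLY_ONCE is what enables EOS
>>>>     props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);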
>>>>
>>>> This is the full trace:
>>>>
>>>>     Exception in thread "appId-1-9f749063-1a4d-4498-8df4-8619269e36f3-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=inputTopic, topic=inputTopic, partition=0, offset=1
>>>>         at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:203)
>>>>         at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:679)
>>>>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:557)
>>>>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
>>>>     Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_0] exception caught when producing
>>>>         at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:136)
>>>>         at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:87)
>>>>         at KafkaIntegrationTest$MapStore.put(KafkaIntegrationTest.java:259)
>>>>         at KafkaIntegrationTest$MapStore.put(KafkaIntegrationTest.java:236)
>>>>         at KafkaIntegrationTest$StatisticMachine.process(KafkaIntegrationTest.java:227)
>>>>         at KafkaIntegrationTest$StatisticMachine.process(KafkaIntegrationTest.java:203)
>>>>         at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
>>>>         at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
>>>>         at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
>>>>         at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
>>>>         at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
>>>>         at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:189)
>>>>         ... 3 more
>>>>     Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
>>>>
>>>> Any ideas what might be going wrong?
>>>>
>>>> Cheers,
>>>> Joel
>>>>
>>>
>
