Yeah, I'll open a PR.

2017-07-12 11:07 GMT-06:00 Matthias J. Sax <matth...@confluent.io>:
> Oops. That is definitely a bug in the test. Thanks for pointing it out!
>
> Do you want to open a PR for it? If not, I can take care of it, too.
>
> -Matthias
>
> On 7/12/17 8:48 AM, Joel Dice . wrote:
>> After some debugging, I figured it out. The name of my custom store
>> was "mapStore", so the store tried to log changes to a topic with that
>> name, but my test case never created such a topic. Unsurprisingly,
>> the producer couldn't get metadata for a nonexistent topic, so it
>> failed with a timeout.
>>
>> With this trivial fix, the test passes:
>> https://github.com/dicej/kafka-eos-test/commit/0fdf400350e41092f1973a80e9e4196003859ddc
>>
>> BTW, in EosIntegrationTest, shouldn't this line read "outputTopic,"
>> instead of "inputTopic,":
>>
>> https://github.com/apache/kafka/blob/44167399157b06d566401a3f6c17e9ca901a8c20/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java#L195
>>
>> As written, the test seems to only verify that data produced to
>> inputTopic can be consumed from inputTopic, whereas the intention of
>> the test seems to be that data is copied from inputTopic to
>> outputTopic, so I would expect it to verify that it actually made it
>> to outputTopic.
>>
>>
>> 2017-07-11 16:25 GMT-06:00 Matthias J. Sax <matth...@confluent.io>:
>>> Seems Streams cannot connect (or loses connection) to the brokers. Not
>>> sure why.
>>>
>>> You can also have a look here for our own EOS Streams integration test:
>>> https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
>>>
>>>
>>> -Matthias
>>>
>>>
>>> On 7/11/17 2:13 PM, Joel Dice . wrote:
>>>> Hi all,
>>>>
>>>> I wrote an integration test to see how Kafka Streams handles exactly
>>>> once processing with a custom state store, and it's failing with this
>>>> error:
>>>>
>>>> org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms
>>>>
>>>> I'm not sure whether this is a bug in my code, a bug in Kafka, or a
>>>> bug in the EmbeddedKafkaCluster class I'm using for testing.
>>>>
>>>> The test is available here: https://github.com/dicej/kafka-eos-test,
>>>> and it can be run using "gradle test".
>>>>
>>>> The test class itself is
>>>> https://github.com/dicej/kafka-eos-test/blob/f892a856d25d6e17e16a6681066a2033b06c5b3f/src/test/java/KafkaIntegrationTest.java.
>>>> It's based loosely on the EosIntegrationTest class in the Kafka source
>>>> tree.
>>>>
>>>> This is the full trace:
>>>>
>>>> Exception in thread "appId-1-9f749063-1a4d-4498-8df4-8619269e36f3-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=inputTopic, topic=inputTopic, partition=0, offset=1
>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:203)
>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:679)
>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:557)
>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
>>>> Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_0] exception caught when producing
>>>>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:136)
>>>>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:87)
>>>>     at KafkaIntegrationTest$MapStore.put(KafkaIntegrationTest.java:259)
>>>>     at KafkaIntegrationTest$MapStore.put(KafkaIntegrationTest.java:236)
>>>>     at KafkaIntegrationTest$StatisticMachine.process(KafkaIntegrationTest.java:227)
>>>>     at KafkaIntegrationTest$StatisticMachine.process(KafkaIntegrationTest.java:203)
>>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
>>>>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
>>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
>>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
>>>>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:189)
>>>>     ... 3 more
>>>> Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
>>>>
>>>>
>>>> Any ideas what might be going wrong?
>>>>
>>>> Cheers,
>>>> Joel