[ 
https://issues.apache.org/jira/browse/KAFKA-4828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-4828 started by Hamidreza Afzali.
-----------------------------------------------
> ProcessorTopologyTestDriver does not work when using .through()
> ---------------------------------------------------------------
>
>                 Key: KAFKA-4828
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4828
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.0
>            Reporter: Hamidreza Afzali
>            Assignee: Hamidreza Afzali
>              Labels: unit-test
>
> *Problem:*
> ProcessorTopologyTestDriver does not work when testing a topology that uses 
> through().
> {code}
> org.apache.kafka.streams.errors.StreamsException: Store count2's change log 
> (count2-topic) does not contain partition 1
>       at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.validatePartitionExists(StoreChangelogReader.java:81)
> {code}
> *Example:*
> {code}
> object Topology1 {
>   def main(args: Array[String]): Unit = {
>     val inputTopic = "input"
>     val stateStore = "count"
>     val stateStore2 = "count2"
>     val outputTopic2 = "count2-topic"
>     val inputs = Seq[(String, Integer)](("A", 1), ("A", 2))
>     val props = new Properties
>     props.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString)
>     props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
>     val builder = new KStreamBuilder
>     builder.stream(Serdes.String, Serdes.Integer, inputTopic)
>       .groupByKey(Serdes.String, Serdes.Integer)
>       .count(stateStore)
>       .through(Serdes.String, Serdes.Long, outputTopic2, stateStore2)
>     val driver = new ProcessorTopologyTestDriver(new StreamsConfig(props), 
> builder, stateStore, stateStore2)
>     inputs.foreach {
>       case (key, value) => {
>         driver.process(inputTopic, key, value, Serdes.String.serializer, 
> Serdes.Integer.serializer)
>         val record = driver.readOutput(outputTopic2, 
> Serdes.String.deserializer, Serdes.Long.deserializer)
>         println(record)
>       }
>     }
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to