Hamidreza Afzali created KAFKA-4828:
---------------------------------------

             Summary: ProcessorTopologyTestDriver does not work when using .through()
                 Key: KAFKA-4828
                 URL: https://issues.apache.org/jira/browse/KAFKA-4828
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 0.10.2.0
            Reporter: Hamidreza Afzali
            Assignee: Hamidreza Afzali


*Problem:*

ProcessorTopologyTestDriver does not work when testing a topology that uses 
through().

{code}
org.apache.kafka.streams.errors.StreamsException: Store count2's change log (count2-topic) does not contain partition 1
        at org.apache.kafka.streams.processor.internals.StoreChangelogReader.validatePartitionExists(StoreChangelogReader.java:81)
{code}

*Example:*

{code}
/**
 * Minimal reproduction: a counted stream is piped through an intermediate
 * topic with `.through()` and the resulting topology is driven by
 * ProcessorTopologyTestDriver.
 */
object Topology1 {

  /**
   * Builds the topology, feeds every element of `inputs` into the input topic
   * and prints what is read back from `outputTopic2` after each one.
   *
   * @param args command-line arguments; not used
   */
  def main(args: Array[String]): Unit = {

    // Topic and state-store names used by the topology below.
    val inputTopic = "input"
    val stateStore = "count"
    val stateStore2 = "count2"
    val outputTopic2 = "count2-topic"
    // Two records sharing the key "A".
    val inputs = Seq[(String, Integer)](("A", 1), ("A", 2))

    // A fresh random application id is generated on every run.
    val props = new Properties
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString)
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

    // input -> groupByKey -> count (store "count")
    //       -> through "count2-topic" (store "count2")
    val builder = new KStreamBuilder
    builder.stream(Serdes.String, Serdes.Integer, inputTopic)
      .groupByKey(Serdes.String, Serdes.Integer)
      .count(stateStore)
      .through(Serdes.String, Serdes.Long, outputTopic2, stateStore2)

    // Both store names are handed to the driver together with the builder.
    val driver = new ProcessorTopologyTestDriver(new StreamsConfig(props), 
builder, stateStore, stateStore2)
    // For each input pair: push one record in, then read one record from the
    // through-topic and print it.
    inputs.foreach {
      case (key, value) => {
        driver.process(inputTopic, key, value, Serdes.String.serializer, 
Serdes.Integer.serializer)
        val record = driver.readOutput(outputTopic2, 
Serdes.String.deserializer, Serdes.Long.deserializer)
        println(record)
      }
    }
  }
}
{code}




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to