Hamidreza Afzali created KAFKA-4828:
---------------------------------------
Summary: ProcessorTopologyTestDriver does not work when using .through()
Key: KAFKA-4828
URL: https://issues.apache.org/jira/browse/KAFKA-4828
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 0.10.2.0
Reporter: Hamidreza Afzali
Assignee: Hamidreza Afzali
*Problem:*
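ProcessorTopologyTestDriver throws a StreamsException when testing a topology that uses through(). The changelog reader reports that the store's changelog topic does not contain the expected partition: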
{code}
org.apache.kafka.streams.errors.StreamsException: Store count2's change log (count2-topic) does not contain partition 1
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.validatePartitionExists(StoreChangelogReader.java:81)
{code}
*Example:*
{code}
import java.util.{Properties, UUID}

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.KStreamBuilder
// Test utility shipped with the Kafka Streams test sources.
import org.apache.kafka.test.ProcessorTopologyTestDriver

object Topology1 {

  def main(args: Array[String]): Unit = {
    val inputTopic = "input"
    val stateStore = "count"
    val stateStore2 = "count2"
    val outputTopic2 = "count2-topic"
    val inputs = Seq[(String, Integer)](("A", 1), ("A", 2))

    val props = new Properties
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString)
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

    // Count records per key, then send the counts through outputTopic2
    // and materialize them in a second store.
    val builder = new KStreamBuilder
    builder.stream(Serdes.String, Serdes.Integer, inputTopic)
      .groupByKey(Serdes.String, Serdes.Integer)
      .count(stateStore)
      .through(Serdes.String, Serdes.Long, outputTopic2, stateStore2)

    val driver = new ProcessorTopologyTestDriver(new StreamsConfig(props),
      builder, stateStore, stateStore2)

    // Feed each input record and print what arrives on the output topic.
    inputs.foreach {
      case (key, value) =>
        driver.process(inputTopic, key, value, Serdes.String.serializer,
          Serdes.Integer.serializer)
        val record = driver.readOutput(outputTopic2,
          Serdes.String.deserializer, Serdes.Long.deserializer)
        println(record)
    }
  }
}
{code}