Byron Nikolaidis created KAFKA-4408:
---------------------------------------
Summary: KTable doesn't work with ProcessorTopologyTestDriver in
Kafka 0.10.1.0
Key: KAFKA-4408
URL: https://issues.apache.org/jira/browse/KAFKA-4408
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 0.10.1.0
Environment: Linux
Reporter: Byron Nikolaidis
In Kafka 0.10.1.0, the ProcessorTopologyTestDriver no longer works with
KTables. The below test code worked fine under Kafka 0.10.0.1 but now produces
this error:
Exception in thread "main" org.apache.kafka.streams.errors.StreamsException:
task [0_0] Could not find partition info for topic: alertInputTopic
at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:174)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)
at
org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:175)
at
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:85)
at
org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:62)
at
org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
at
org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:120)
at
org.apache.kafka.test.ProcessorTopologyTestDriver.<init>(ProcessorTopologyTestDriver.java:174)
at
mil.navy.icap.kafka.streams.processor.track.ProcessorDriverTest2.main(ProcessorDriverTest2.java:41)
{code}
package mil.navy.icap.kafka.streams.processor.track;
import java.io.IOException;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes.StringSerde;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.test.ProcessorTopologyTestDriver;
public class ProcessorDriverTest2 {
public static void main(String[] args) throws IOException,
InterruptedException {
System.out.println("ProcessorDriverTest2");
Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "ProcessorDriverTest2");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
StreamsConfig streamsConfig = new StreamsConfig(props);
// topology
KStreamBuilder kstreamBuilder = new KStreamBuilder();
StringSerde stringSerde = new StringSerde();
KTable<String, String> table = kstreamBuilder.table(stringSerde,
stringSerde, "alertInputTopic");
table.to(stringSerde, stringSerde, "alertOutputTopic");
// create test driver
ProcessorTopologyTestDriver testDriver = new ProcessorTopologyTestDriver(
streamsConfig,
kstreamBuilder,
"alertStore");
StringSerializer serializer = new StringSerializer();
StringDeserializer deserializer = new StringDeserializer();
// send data to input topic
testDriver.process("alertInputTopic",
"the Key", "the Value", serializer, serializer);
// read data from output topic
ProducerRecord<String, String> rec = testDriver.readOutput("alertOutputTopic",
deserializer, deserializer);
System.out.println("rec: " + rec);
}
}
{code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)