[ https://issues.apache.org/jira/browse/KAFKA-6742?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16426277#comment-16426277 ]
Matthias J. Sax commented on KAFKA-6742: ---------------------------------------- Your description of the JIRA is a little unclear to me: bq. This junit test simply fails: What does this exactly mean? Is there an exception? Or does an assertion fail (if yet, which one)? > TopologyTestDriver error when dealing with stores from GlobalKTable > ------------------------------------------------------------------- > > Key: KAFKA-6742 > URL: https://issues.apache.org/jira/browse/KAFKA-6742 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 1.1.0 > Reporter: Valentino Proietti > Priority: Minor > > {color:#ff0000}This junit test simply fails:{color} > @Test > *public* *void* globalTable() { > StreamsBuilder builder = *new* StreamsBuilder(); > @SuppressWarnings("unused") > *final* KTable<String,String> localTable = builder > .table("local", > Consumed._with_(Serdes._String_(), Serdes._String_()), > Materialized._as_("localStore")) > ; > @SuppressWarnings("unused") > *final* GlobalKTable<String,String> globalTable = builder > .globalTable("global", > Consumed._with_(Serdes._String_(), Serdes._String_()), > Materialized._as_("globalStore")) > ; > // > Properties props = *new* Properties(); > props.setProperty(StreamsConfig.*_APPLICATION_ID_CONFIG_*, "test"); > props.setProperty(StreamsConfig.*_BOOTSTRAP_SERVERS_CONFIG_*, "localhost"); > TopologyTestDriver testDriver = *new* TopologyTestDriver(builder.build(), > props); > // > *final* KeyValueStore<String,String> localStore = > testDriver.getKeyValueStore("localStore"); > Assert._assertNotNull_(localStore); > Assert._assertNotNull_(testDriver.getAllStateStores().get("localStore")); > // > *final* KeyValueStore<String,String> globalStore = > testDriver.getKeyValueStore("globalStore"); > Assert._assertNotNull_(globalStore); > Assert._assertNotNull_(testDriver.getAllStateStores().get("globalStore")); > // > *final* ConsumerRecordFactory<String,String> crf = *new* > ConsumerRecordFactory<>(*new* StringSerializer(), *new* StringSerializer()); > testDriver.pipeInput(crf.create("local", "one", "TheOne")); > testDriver.pipeInput(crf.create("global", "one", "TheOne")); > // > Assert._assertEquals_("TheOne", localStore.get("one")); > Assert._assertEquals_("TheOne", globalStore.get("one")); > > > {color:#ff0000}to make it work I had to modify the TopologyTestDriver class > as follow:{color} > ... > *public* Map<String, StateStore> getAllStateStores() { > // final Map<String, StateStore> allStores = new HashMap<>(); > // for (final String storeName : > internalTopologyBuilder.allStateStoreName()) > { // allStores.put(storeName, ((ProcessorContextImpl) > task.context()).getStateMgr().getStore(storeName)); // } > // return allStores; > {color:#ff0000}// *FIXME*{color} > *final* ProcessorStateManager psm = ((ProcessorContextImpl) > task.context()).getStateMgr(); > *final* Map<String, StateStore> allStores = *new* HashMap<>(); > *for* (*final* String storeName : > internalTopologyBuilder.allStateStoreName()) { > StateStore res = psm.getStore(storeName); > if (res == null) > res = psm.getGlobalStore(storeName); > allStores.put(storeName, res); > } > *return* allStores; > } > ... > *public* StateStore getStateStore(*final* String name) { > // return ((ProcessorContextImpl) > task.context()).getStateMgr().getStore(name); > {color:#ff0000}// *FIXME*{color} > *final* ProcessorStateManager psm = ((ProcessorContextImpl) > task.context()).getStateMgr(); > StateStore res = psm.getStore(name); > *if* (res == *null*) > res = psm.getGlobalStore(name); > *return* res; > } > > {color:#ff0000}moreover I think it would be very useful to make the internal > MockProducer public for testing cases where a producer is used along side > with the "normal" stream processing by adding the method:{color} > /** > * *@return* records sent with this producer are automatically streamed > to the topology. > */ > *public* *final* Producer<*byte*[], *byte*[]> getProducer() { > return producer; > } > > {color:#ff0000}unfortunately this introduces another problem that could be > verified by adding the following lines to the previous junit test:{color} > ... > ** > // > ConsumerRecord<*byte*[],*byte*[]> cr = crf.create("dummy", "two", "Second"); > // just to serialize keys and values > testDriver.getProducer().send(*new* ProducerRecord<>("local", *null*, > cr.timestamp(), cr.key(), cr.value())); > testDriver.getProducer().send(*new* ProducerRecord<>("global", *null*, > cr.timestamp(), cr.key(), cr.value())); > testDriver.advanceWallClockTime(0); > Assert._assertEquals_("TheOne", localStore.get("one")); > Assert._assertEquals_("Second", localStore.get("two")); > Assert._assertEquals_("TheOne", globalStore.get("one")); > Assert._assertEquals_("Second", globalStore.get("two")); > } > > {color:#ff0000}that could be fixed with:{color} > > *private* *void* captureOutputRecords() { > // Capture all the records sent to the producer ... > *final* List<ProducerRecord<*byte*[], *byte*[]>> output = > producer.history(); > producer.clear(); > *for* (*final* ProducerRecord<*byte*[], *byte*[]> record : output) { > Queue<ProducerRecord<*byte*[], *byte*[]>> outputRecords = > outputRecordsByTopic.get(record.topic()); > *if* (outputRecords == *null*) > { outputRecords = *new* LinkedList<>(); > outputRecordsByTopic.put(record.topic(), outputRecords); } > outputRecords.add(record); > > // Forward back into the topology if the produced record is to an > internal or a source topic ... > *final* String outputTopicName = record.topic(); > *if* (internalTopics.contains(outputTopicName) || > processorTopology.sourceTopics().contains(outputTopicName) > || globalPartitionsByTopic.containsKey(outputTopicName)) { > {color:#ff0000}// *FIXME*{color} > *final* *byte*[] serializedKey = record.key(); > *final* *byte*[] serializedValue = record.value(); > > pipeInput(*new* ConsumerRecord<>( > outputTopicName, > -1, > -1L, > record.timestamp(), > TimestampType.*_CREATE_TIME_*, > 0L, > serializedKey == *null* ? 0 : serializedKey.length, > serializedValue == *null* ? 0 : serializedValue.length, > serializedKey, > serializedValue)); > } > } > } > > > > *Thank you* -- This message was sent by Atlassian JIRA (v7.6.3#76005)