[ https://issues.apache.org/jira/browse/KAFKA-7536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16661567#comment-16661567 ]
Matthias J. Sax commented on KAFKA-7536:
----------------------------------------

Another workaround might be to disable caching for the global store. Can you try this?

> TopologyTestDriver cannot pre-populate GlobalKTable
> ---------------------------------------------------
>
>                 Key: KAFKA-7536
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7536
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.0.0
>            Reporter: Dmitry Minkovsky
>            Priority: Minor
>
> I have a GlobalKTable that's defined as
> {code}
> GlobalKTable<String, ByteString> userIdsByEmail = topology
>   .globalTable(USER_IDS_BY_EMAIL.name,
>     USER_IDS_BY_EMAIL.consumed(),
>     Materialized.as("user-ids-by-email"));
> {code}
> And the following test in Spock:
> {code}
> def topology = // my topology
> def driver = new TopologyTestDriver(topology, config())
>
> def cleanup() {
>     driver.close()
> }
>
> def "create from email request"() {
>     def store = driver.getKeyValueStore('user-ids-by-email')
>     store.put('string', ByteString.copyFrom(new byte[0]))
>     // more, but it fails at the `put` above
> {code}
> When I run this, I get the following:
> {code}
> [2018-10-23 19:35:27,055] INFO (org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl) mock Restoring state for global store user-ids-by-email
> java.lang.NullPointerException
>     at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.topic(AbstractProcessorContext.java:115)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:237)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:220)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:38)
>     at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:206)
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117)
>     at pony.message.MessageWriteStreamsTest.create from mailgun email request(MessageWriteStreamsTest.groovy:52)
> [2018-10-23 19:35:27,189] INFO (org.apache.kafka.streams.processor.internals.StateDirectory) stream-thread [main] Deleting state directory 0_0 for task 0_0 as user calling cleanup.
> {code}
> I've noticed that I can {{put()}} to the store if I first write to it with {{driver.pipeInput}}. But otherwise I get the above error.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
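
For reference, a minimal sketch of what disabling caching for the global store could look like. It is not taken from the reporter's project: the class name, the topic name `user-ids-by-email-topic`, and the String value serde (standing in for the reporter's `ByteString` serde and `USER_IDS_BY_EMAIL` helper) are all assumptions made for illustration. `Materialized#withCachingDisabled()` should keep the store from being wrapped in the `CachingKeyValueStore` that appears in the stack trace; whether that is enough to make a direct `put()` succeed is what the comment asks the reporter to verify.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

// Hypothetical class name; requires kafka-streams on the classpath.
public class GlobalTableWithoutCache {

    static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        // Topic name and String serdes are placeholders (assumptions);
        // only the store name matches the one in the report.
        GlobalKTable<String, String> userIdsByEmail = builder.globalTable(
            "user-ids-by-email-topic",
            Consumed.with(Serdes.String(), Serdes.String()),
            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("user-ids-by-email")
                // Skip the caching layer for this store.
                .withCachingDisabled());

        return builder.build();
    }
}
```

The resulting topology would be handed to `TopologyTestDriver` exactly as in the Spock test quoted above.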