[ https://issues.apache.org/jira/browse/KAFKA-7536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16690719#comment-16690719 ]
ASF GitHub Bot commented on KAFKA-7536: --------------------------------------- guozhangwang opened a new pull request #5923: KAFKA-7536: Initialize TopologyTestDriver with non-null topic URL: https://github.com/apache/kafka/pull/5923 In TopologyTestDriver constructor set non-null topic; and in unit test intentionally turn on caching to verify this case. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > TopologyTestDriver cannot pre-populate KTable or GlobalKTable > ------------------------------------------------------------- > > Key: KAFKA-7536 > URL: https://issues.apache.org/jira/browse/KAFKA-7536 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 2.0.0 > Reporter: Dmitry Minkovsky > Priority: Minor > > I have a GlobalKTable that's defined as > {code} > GlobalKTable<String, ByteString> userIdsByEmail = topology > .globalTable(USER_IDS_BY_EMAIL.name, > USER_IDS_BY_EMAIL.consumed(), > Materialized.as("user-ids-by-email")); > {code} > And the following test in Spock: > {code} > def topology = // my topology > def driver = new TopologyTestDriver(topology, config()) > def cleanup() { > driver.close() > } > def "create from email request"() { > def store = driver.getKeyValueStore('user-ids-by-email') > store.put('string', ByteString.copyFrom(new byte[0])) > // more, but it fails at the `put` above > {code} > When I run this, I get the following: > {code} > [2018-10-23 19:35:27,055] INFO > (org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl) mock > Restoring state for global store user-ids-by-email > java.lang.NullPointerException > at > org.apache.kafka.streams.processor.internals.AbstractProcessorContext.topic(AbstractProcessorContext.java:115) > at > org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:237) > at > org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:220) > at > org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:38) > at > org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:206) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117) > at pony.message.MessageWriteStreamsTest.create from mailgun email > request(MessageWriteStreamsTest.groovy:52) > [2018-10-23 19:35:27,189] INFO > (org.apache.kafka.streams.processor.internals.StateDirectory) stream-thread > [main] Deleting state directory 0_0 for task 0_0 as user calling cleanup. > {code} > The same issue applies to KTable. > I've noticed that I can {{put()}} to the store if I first write to it with > {{driver.pipeInput}}. But otherwise I get the above error. -- This message was sent by Atlassian JIRA (v7.6.3#76005)