[jira] [Commented] (KAFKA-7536) TopologyTestDriver cannot pre-populate KTable or GlobalKTable
[ https://issues.apache.org/jira/browse/KAFKA-7536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16693853#comment-16693853 ]

ASF GitHub Bot commented on KAFKA-7536:
---

guozhangwang closed pull request #5923: KAFKA-7536: Initialize TopologyTestDriver with non-null topic
URL: https://github.com/apache/kafka/pull/5923

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index 0753b2a8e96..af8b073092b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -33,7 +33,7 @@ public abstract class AbstractProcessorContext implements InternalProcessorContext {
-    static final String NONEXIST_TOPIC = "__null_topic__";
+    public static final String NONEXIST_TOPIC = "__null_topic__";
     private final TaskId taskId;
     private final String applicationId;
     private final StreamsConfig config;
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 2abfd6354be..a11ae6b8df0 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -325,7 +325,7 @@ public void onRestoreEnd(final TopicPartition topicPartition, final String store
                 new LogContext()
             );
             globalStateTask.initialize();
-            globalProcessorContext.setRecordContext(new ProcessorRecordContext(0L, -1L, -1, null, new RecordHeaders()));
+            globalProcessorContext.setRecordContext(new ProcessorRecordContext(0L, -1L, -1, ProcessorContextImpl.NONEXIST_TOPIC, new RecordHeaders()));
         } else {
             globalStateManager = null;
             globalStateTask = null;
@@ -352,7 +352,7 @@ public void onRestoreEnd(final TopicPartition topicPartition, final String store
             task.initializeStateStores();
             task.initializeTopology();
             context = (InternalProcessorContext) task.context();
-            context.setRecordContext(new ProcessorRecordContext(0L, -1L, -1, null, new RecordHeaders()));
+            context.setRecordContext(new ProcessorRecordContext(0L, -1L, -1, ProcessorContextImpl.NONEXIST_TOPIC, new RecordHeaders()));
         } else {
             task = null;
             context = null;
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index bead079ce8d..3e95c731d9e 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -889,6 +889,23 @@ private void flushStore() {
         public void close() {}
     }
 
+    @Test
+    public void shouldAllowPrePopulatingStatesStoresWithCachingEnabled() {
+        final Topology topology = new Topology();
+        topology.addSource("sourceProcessor", "input-topic");
+        topology.addProcessor("aggregator", new CustomMaxAggregatorSupplier(), "sourceProcessor");
+        topology.addStateStore(Stores.keyValueStoreBuilder(
+            Stores.inMemoryKeyValueStore("aggStore"),
+            Serdes.String(),
+            Serdes.Long()).withCachingEnabled(), // intentionally turn on caching to achieve better test coverage
+            "aggregator");
+
+        testDriver = new TopologyTestDriver(topology, config);
+
+        store = testDriver.getKeyValueStore("aggStore");
+        store.put("a", 21L);
+    }
+
     @Test
     public void shouldCleanUpPersistentStateStoresOnClose() {
         final Topology topology = new Topology();

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
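Why a sentinel string, rather than `null`, fixes the `NullPointerException`: the trace in the issue report fails inside `AbstractProcessorContext.topic()` while `CachingKeyValueStore.putInternal()` is reading the topic for its cache entry. My understanding (an assumption, not checked against every release) is that in 2.0 `topic()` calls `topic.equals(NONEXIST_TOPIC)` on the stored topic and maps the sentinel back to `null`. A dummy record context built with a `null` topic therefore throws, while one built with `NONEXIST_TOPIC` does not. The Java sketch that follows is a simplified, hypothetical model of that call path; `RecordContextModel`, `ProcessorContextModel`, `CachingStoreModel` and `putSucceeds` are invented for illustration and are not Kafka Streams classes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the call path in the KAFKA-7536 stack trace.
// These are NOT the real Kafka Streams classes; only the shape of the logic is mirrored.
public class NullTopicSentinelDemo {

    // Same value as AbstractProcessorContext.NONEXIST_TOPIC in the diff.
    static final String NONEXIST_TOPIC = "__null_topic__";

    // Stand-in for ProcessorRecordContext; only the topic field matters here.
    static final class RecordContextModel {
        private final String topic;

        RecordContextModel(final String topic) {
            this.topic = topic;
        }

        String topic() {
            return topic;
        }
    }

    // Stand-in for AbstractProcessorContext.topic(), ASSUMING it compares the
    // stored topic with the sentinel and maps the sentinel back to null.
    static final class ProcessorContextModel {
        private RecordContextModel recordContext;

        void setRecordContext(final RecordContextModel recordContext) {
            this.recordContext = recordContext;
        }

        String topic() {
            if (recordContext == null) {
                throw new IllegalStateException("topic() called outside of record processing");
            }
            final String topic = recordContext.topic();
            if (topic.equals(NONEXIST_TOPIC)) { // throws NullPointerException if topic is null
                return null;
            }
            return topic;
        }
    }

    // Stand-in for CachingKeyValueStore: put() asks the context for the topic,
    // as putInternal() does in the stack trace.
    static final class CachingStoreModel {
        private final ProcessorContextModel context;
        private final Map<String, String> values = new HashMap<>();
        private final Map<String, String> entryTopics = new HashMap<>();

        CachingStoreModel(final ProcessorContextModel context) {
            this.context = context;
        }

        void put(final String key, final String value) {
            entryTopics.put(key, context.topic()); // HashMap accepts a null topic value
            values.put(key, value);
        }

        String get(final String key) {
            return values.get(key);
        }
    }

    // Simulates "construct the driver with a dummy record context, then put()
    // into the store"; returns true if the put succeeds.
    static boolean putSucceeds(final String dummyTopic) {
        final ProcessorContextModel context = new ProcessorContextModel();
        context.setRecordContext(new RecordContextModel(dummyTopic));
        final CachingStoreModel store = new CachingStoreModel(context);
        try {
            store.put("a", "21");
            return "21".equals(store.get("a"));
        } catch (final NullPointerException e) {
            return false;
        }
    }

    public static void main(final String[] args) {
        System.out.println("null topic (before #5923):    " + putSucceeds(null));           // false
        System.out.println("sentinel topic (after #5923): " + putSucceeds(NONEXIST_TOPIC)); // true
        System.out.println("real topic (after pipeInput): " + putSucceeds("input-topic"));  // true
    }
}
```

The third case uses a hypothetical topic name, `"input-topic"`, and is presumably why the reporter's `driver.pipeInput` workaround helped: processing a record installs a record context that carries the real source topic, so the later `put()` no longer sees the `null` placeholder.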
[jira] [Commented] (KAFKA-7536) TopologyTestDriver cannot pre-populate KTable or GlobalKTable
[ https://issues.apache.org/jira/browse/KAFKA-7536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16691128#comment-16691128 ]

Guozhang Wang commented on KAFKA-7536:
--

Yeah I think so.

> TopologyTestDriver cannot pre-populate KTable or GlobalKTable
> -
>
>                 Key: KAFKA-7536
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7536
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.0.0
>            Reporter: Dmitry Minkovsky
>            Priority: Minor
>
> I have a GlobalKTable that's defined as
> {code}
> GlobalKTable userIdsByEmail = topology
>     .globalTable(USER_IDS_BY_EMAIL.name,
>         USER_IDS_BY_EMAIL.consumed(),
>         Materialized.as("user-ids-by-email"));
> {code}
> And the following test in Spock:
> {code}
> def topology = // my topology
> def driver = new TopologyTestDriver(topology, config())
> def cleanup() {
>     driver.close()
> }
> def "create from email request"() {
>     def store = driver.getKeyValueStore('user-ids-by-email')
>     store.put('string', ByteString.copyFrom(new byte[0]))
>     // more, but it fails at the `put` above
> {code}
> When I run this, I get the following:
> {code}
> [2018-10-23 19:35:27,055] INFO (org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl) mock Restoring state for global store user-ids-by-email
> java.lang.NullPointerException
>     at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.topic(AbstractProcessorContext.java:115)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:237)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:220)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:38)
>     at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:206)
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117)
>     at pony.message.MessageWriteStreamsTest.create from mailgun email request(MessageWriteStreamsTest.groovy:52)
> [2018-10-23 19:35:27,189] INFO (org.apache.kafka.streams.processor.internals.StateDirectory) stream-thread [main] Deleting state directory 0_0 for task 0_0 as user calling cleanup.
> {code}
> The same issue applies to KTable.
> I've noticed that I can {{put()}} to the store if I first write to it with
> {{driver.pipeInput}}. But otherwise I get the above error.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (KAFKA-7536) TopologyTestDriver cannot pre-populate KTable or GlobalKTable
[ https://issues.apache.org/jira/browse/KAFKA-7536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16691010#comment-16691010 ]

Matthias J. Sax commented on KAFKA-7536:

Does this also affect 1.1.1 release? If yes, we should backport to 1.0 branch, too.
[jira] [Commented] (KAFKA-7536) TopologyTestDriver cannot pre-populate KTable or GlobalKTable
[ https://issues.apache.org/jira/browse/KAFKA-7536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16690719#comment-16690719 ]

ASF GitHub Bot commented on KAFKA-7536:
---

guozhangwang opened a new pull request #5923: KAFKA-7536: Initialize TopologyTestDriver with non-null topic
URL: https://github.com/apache/kafka/pull/5923

In the TopologyTestDriver constructor, set a non-null topic; in the unit test, intentionally turn on caching to verify this case.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-7536) TopologyTestDriver cannot pre-populate KTable or GlobalKTable
[ https://issues.apache.org/jira/browse/KAFKA-7536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16690720#comment-16690720 ]

Guozhang Wang commented on KAFKA-7536:
--

[~dminkovsky] I've submitted a PR which I think should resolve your issue; please try it out: https://github.com/apache/kafka/pull/5923
[jira] [Commented] (KAFKA-7536) TopologyTestDriver cannot pre-populate KTable or GlobalKTable
[ https://issues.apache.org/jira/browse/KAFKA-7536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16665271#comment-16665271 ]

Dmitry Minkovsky commented on KAFKA-7536:
-

You're welcome! Thank you for Kafka and Kafka Streams.
[jira] [Commented] (KAFKA-7536) TopologyTestDriver cannot pre-populate KTable or GlobalKTable
[ https://issues.apache.org/jira/browse/KAFKA-7536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16662566#comment-16662566 ]

Matthias J. Sax commented on KAFKA-7536:

Thanks for the information! Really helpful.