[jira] [Commented] (KAFKA-7536) TopologyTestDriver cannot pre-populate KTable or GlobalKTable

2018-11-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693853#comment-16693853
 ] 

ASF GitHub Bot commented on KAFKA-7536:
---

guozhangwang closed pull request #5923: KAFKA-7536: Initialize 
TopologyTestDriver with non-null topic
URL: https://github.com/apache/kafka/pull/5923
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index 0753b2a8e96..af8b073092b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -33,7 +33,7 @@
 
 public abstract class AbstractProcessorContext implements 
InternalProcessorContext {
 
-static final String NONEXIST_TOPIC = "__null_topic__";
+public static final String NONEXIST_TOPIC = "__null_topic__";
 private final TaskId taskId;
 private final String applicationId;
 private final StreamsConfig config;
diff --git 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 2abfd6354be..a11ae6b8df0 100644
--- 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -325,7 +325,7 @@ public void onRestoreEnd(final TopicPartition 
topicPartition, final String store
 new LogContext()
 );
 globalStateTask.initialize();
-globalProcessorContext.setRecordContext(new 
ProcessorRecordContext(0L, -1L, -1, null, new RecordHeaders()));
+globalProcessorContext.setRecordContext(new 
ProcessorRecordContext(0L, -1L, -1, ProcessorContextImpl.NONEXIST_TOPIC, new 
RecordHeaders()));
 } else {
 globalStateManager = null;
 globalStateTask = null;
@@ -352,7 +352,7 @@ public void onRestoreEnd(final TopicPartition 
topicPartition, final String store
 task.initializeStateStores();
 task.initializeTopology();
 context = (InternalProcessorContext) task.context();
-context.setRecordContext(new ProcessorRecordContext(0L, -1L, -1, 
null, new RecordHeaders()));
+context.setRecordContext(new ProcessorRecordContext(0L, -1L, -1, 
ProcessorContextImpl.NONEXIST_TOPIC, new RecordHeaders()));
 } else {
 task = null;
 context = null;
diff --git 
a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
 
b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index bead079ce8d..3e95c731d9e 100644
--- 
a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ 
b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -889,6 +889,23 @@ private void flushStore() {
 public void close() {}
 }
 
+@Test
+public void shouldAllowPrePopulatingStatesStoresWithCachingEnabled() {
+final Topology topology = new Topology();
+topology.addSource("sourceProcessor", "input-topic");
+topology.addProcessor("aggregator", new CustomMaxAggregatorSupplier(), 
"sourceProcessor");
+topology.addStateStore(Stores.keyValueStoreBuilder(
+Stores.inMemoryKeyValueStore("aggStore"),
+Serdes.String(),
+Serdes.Long()).withCachingEnabled(), // intentionally turn on 
caching to achieve better test coverage
+"aggregator");
+
+testDriver = new TopologyTestDriver(topology, config);
+
+store = testDriver.getKeyValueStore("aggStore");
+store.put("a", 21L);
+}
+
 @Test
 public void shouldCleanUpPersistentStateStoresOnClose() {
 final Topology topology = new Topology();


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> TopologyTestDriver cannot pre-populate KTable or GlobalKTable
> -
>
> Key: KAFKA-7536
> URL: https://issues.apache.org/jira/browse/KAFKA-7536
> Project: Kafka
>  

[jira] [Commented] (KAFKA-7536) TopologyTestDriver cannot pre-populate KTable or GlobalKTable

2018-11-18 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16691128#comment-16691128
 ] 

Guozhang Wang commented on KAFKA-7536:
--

Yeah I think so.

> TopologyTestDriver cannot pre-populate KTable or GlobalKTable
> -
>
> Key: KAFKA-7536
> URL: https://issues.apache.org/jira/browse/KAFKA-7536
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Dmitry Minkovsky
>Priority: Minor
>
> I have a GlobalKTable that's defined as
> {code}
> GlobalKTable userIdsByEmail = topology  
>.globalTable(USER_IDS_BY_EMAIL.name,
>USER_IDS_BY_EMAIL.consumed(),
>Materialized.as("user-ids-by-email"));
> {code}
> And the following test in Spock:
> {code}
> def topology = // my topology
> def driver = new TopologyTestDriver(topology, config())
> def cleanup() {
> driver.close()
> }
> def "create from email request"() {
> def store = driver.getKeyValueStore('user-ids-by-email')
> store.put('string', ByteString.copyFrom(new byte[0]))
> // more, but it fails at the `put` above
> {code}
> When I run this, I get the following:
> {code}
> [2018-10-23 19:35:27,055] INFO 
> (org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl) mock 
> Restoring state for global store user-ids-by-email
> java.lang.NullPointerException
>   at 
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.topic(AbstractProcessorContext.java:115)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:237)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:220)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:38)
>   at 
> org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:206)
>   at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117)
>   at pony.message.MessageWriteStreamsTest.create from mailgun email 
> request(MessageWriteStreamsTest.groovy:52)
> [2018-10-23 19:35:27,189] INFO 
> (org.apache.kafka.streams.processor.internals.StateDirectory) stream-thread 
> [main] Deleting state directory 0_0 for task 0_0 as user calling cleanup.
> {code}
> The same issue applies to KTable.
> I've noticed that I can {{put()}} to the store if I first write to it with 
> {{driver.pipeInput}}. But otherwise I get the above error.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7536) TopologyTestDriver cannot pre-populate KTable or GlobalKTable

2018-11-18 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16691010#comment-16691010
 ] 

Matthias J. Sax commented on KAFKA-7536:


Does this also affect 1.1.1 release? If yes, we should backport to 1.0 branch, 
too.

> TopologyTestDriver cannot pre-populate KTable or GlobalKTable
> -
>
> Key: KAFKA-7536
> URL: https://issues.apache.org/jira/browse/KAFKA-7536
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Dmitry Minkovsky
>Priority: Minor
>
> I have a GlobalKTable that's defined as
> {code}
> GlobalKTable userIdsByEmail = topology  
>.globalTable(USER_IDS_BY_EMAIL.name,
>USER_IDS_BY_EMAIL.consumed(),
>Materialized.as("user-ids-by-email"));
> {code}
> And the following test in Spock:
> {code}
> def topology = // my topology
> def driver = new TopologyTestDriver(topology, config())
> def cleanup() {
> driver.close()
> }
> def "create from email request"() {
> def store = driver.getKeyValueStore('user-ids-by-email')
> store.put('string', ByteString.copyFrom(new byte[0]))
> // more, but it fails at the `put` above
> {code}
> When I run this, I get the following:
> {code}
> [2018-10-23 19:35:27,055] INFO 
> (org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl) mock 
> Restoring state for global store user-ids-by-email
> java.lang.NullPointerException
>   at 
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.topic(AbstractProcessorContext.java:115)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:237)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:220)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:38)
>   at 
> org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:206)
>   at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117)
>   at pony.message.MessageWriteStreamsTest.create from mailgun email 
> request(MessageWriteStreamsTest.groovy:52)
> [2018-10-23 19:35:27,189] INFO 
> (org.apache.kafka.streams.processor.internals.StateDirectory) stream-thread 
> [main] Deleting state directory 0_0 for task 0_0 as user calling cleanup.
> {code}
> The same issue applies to KTable.
> I've noticed that I can {{put()}} to the store if I first write to it with 
> {{driver.pipeInput}}. But otherwise I get the above error.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7536) TopologyTestDriver cannot pre-populate KTable or GlobalKTable

2018-11-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690719#comment-16690719
 ] 

ASF GitHub Bot commented on KAFKA-7536:
---

guozhangwang opened a new pull request #5923: KAFKA-7536: Initialize 
TopologyTestDriver with non-null topic
URL: https://github.com/apache/kafka/pull/5923
 
 
   In TopologyTestDriver constructor set non-null topic; and in unit test 
intentionally turn on caching to verify this case.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> TopologyTestDriver cannot pre-populate KTable or GlobalKTable
> -
>
> Key: KAFKA-7536
> URL: https://issues.apache.org/jira/browse/KAFKA-7536
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Dmitry Minkovsky
>Priority: Minor
>
> I have a GlobalKTable that's defined as
> {code}
> GlobalKTable userIdsByEmail = topology  
>.globalTable(USER_IDS_BY_EMAIL.name,
>USER_IDS_BY_EMAIL.consumed(),
>Materialized.as("user-ids-by-email"));
> {code}
> And the following test in Spock:
> {code}
> def topology = // my topology
> def driver = new TopologyTestDriver(topology, config())
> def cleanup() {
> driver.close()
> }
> def "create from email request"() {
> def store = driver.getKeyValueStore('user-ids-by-email')
> store.put('string', ByteString.copyFrom(new byte[0]))
> // more, but it fails at the `put` above
> {code}
> When I run this, I get the following:
> {code}
> [2018-10-23 19:35:27,055] INFO 
> (org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl) mock 
> Restoring state for global store user-ids-by-email
> java.lang.NullPointerException
>   at 
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.topic(AbstractProcessorContext.java:115)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:237)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:220)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:38)
>   at 
> org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:206)
>   at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117)
>   at pony.message.MessageWriteStreamsTest.create from mailgun email 
> request(MessageWriteStreamsTest.groovy:52)
> [2018-10-23 19:35:27,189] INFO 
> (org.apache.kafka.streams.processor.internals.StateDirectory) stream-thread 
> [main] Deleting state directory 0_0 for task 0_0 as user calling cleanup.
> {code}
> The same issue applies to KTable.
> I've noticed that I can {{put()}} to the store if I first write to it with 
> {{driver.pipeInput}}. But otherwise I get the above error.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7536) TopologyTestDriver cannot pre-populate KTable or GlobalKTable

2018-11-17 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690720#comment-16690720
 ] 

Guozhang Wang commented on KAFKA-7536:
--

[~dminkovsky] I've submitted a PR which I think should resolve your issue, 
please try it out: https://github.com/apache/kafka/pull/5923

> TopologyTestDriver cannot pre-populate KTable or GlobalKTable
> -
>
> Key: KAFKA-7536
> URL: https://issues.apache.org/jira/browse/KAFKA-7536
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Dmitry Minkovsky
>Priority: Minor
>
> I have a GlobalKTable that's defined as
> {code}
> GlobalKTable userIdsByEmail = topology  
>.globalTable(USER_IDS_BY_EMAIL.name,
>USER_IDS_BY_EMAIL.consumed(),
>Materialized.as("user-ids-by-email"));
> {code}
> And the following test in Spock:
> {code}
> def topology = // my topology
> def driver = new TopologyTestDriver(topology, config())
> def cleanup() {
> driver.close()
> }
> def "create from email request"() {
> def store = driver.getKeyValueStore('user-ids-by-email')
> store.put('string', ByteString.copyFrom(new byte[0]))
> // more, but it fails at the `put` above
> {code}
> When I run this, I get the following:
> {code}
> [2018-10-23 19:35:27,055] INFO 
> (org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl) mock 
> Restoring state for global store user-ids-by-email
> java.lang.NullPointerException
>   at 
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.topic(AbstractProcessorContext.java:115)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:237)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:220)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:38)
>   at 
> org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:206)
>   at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117)
>   at pony.message.MessageWriteStreamsTest.create from mailgun email 
> request(MessageWriteStreamsTest.groovy:52)
> [2018-10-23 19:35:27,189] INFO 
> (org.apache.kafka.streams.processor.internals.StateDirectory) stream-thread 
> [main] Deleting state directory 0_0 for task 0_0 as user calling cleanup.
> {code}
> The same issue applies to KTable.
> I've noticed that I can {{put()}} to the store if I first write to it with 
> {{driver.pipeInput}}. But otherwise I get the above error.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7536) TopologyTestDriver cannot pre-populate KTable or GlobalKTable

2018-10-26 Thread Dmitry Minkovsky (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16665271#comment-16665271
 ] 

Dmitry Minkovsky commented on KAFKA-7536:
-

You're welcome! Thank you for Kafka and Kafka Streams.

> TopologyTestDriver cannot pre-populate KTable or GlobalKTable
> -
>
> Key: KAFKA-7536
> URL: https://issues.apache.org/jira/browse/KAFKA-7536
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Dmitry Minkovsky
>Priority: Minor
>
> I have a GlobalKTable that's defined as
> {code}
> GlobalKTable userIdsByEmail = topology  
>.globalTable(USER_IDS_BY_EMAIL.name,
>USER_IDS_BY_EMAIL.consumed(),
>Materialized.as("user-ids-by-email"));
> {code}
> And the following test in Spock:
> {code}
> def topology = // my topology
> def driver = new TopologyTestDriver(topology, config())
> def cleanup() {
> driver.close()
> }
> def "create from email request"() {
> def store = driver.getKeyValueStore('user-ids-by-email')
> store.put('string', ByteString.copyFrom(new byte[0]))
> // more, but it fails at the `put` above
> {code}
> When I run this, I get the following:
> {code}
> [2018-10-23 19:35:27,055] INFO 
> (org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl) mock 
> Restoring state for global store user-ids-by-email
> java.lang.NullPointerException
>   at 
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.topic(AbstractProcessorContext.java:115)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:237)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:220)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:38)
>   at 
> org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:206)
>   at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117)
>   at pony.message.MessageWriteStreamsTest.create from mailgun email 
> request(MessageWriteStreamsTest.groovy:52)
> [2018-10-23 19:35:27,189] INFO 
> (org.apache.kafka.streams.processor.internals.StateDirectory) stream-thread 
> [main] Deleting state directory 0_0 for task 0_0 as user calling cleanup.
> {code}
> The same issue applies to KTable.
> I've noticed that I can {{put()}} to the store if I first write to it with 
> {{driver.pipeInput}}. But otherwise I get the above error.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7536) TopologyTestDriver cannot pre-populate KTable or GlobalKTable

2018-10-24 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662566#comment-16662566
 ] 

Matthias J. Sax commented on KAFKA-7536:


Thanks for the information! Really helpful.

> TopologyTestDriver cannot pre-populate KTable or GlobalKTable
> -
>
> Key: KAFKA-7536
> URL: https://issues.apache.org/jira/browse/KAFKA-7536
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Dmitry Minkovsky
>Priority: Minor
>
> I have a GlobalKTable that's defined as
> {code}
> GlobalKTable userIdsByEmail = topology  
>.globalTable(USER_IDS_BY_EMAIL.name,
>USER_IDS_BY_EMAIL.consumed(),
>Materialized.as("user-ids-by-email"));
> {code}
> And the following test in Spock:
> {code}
> def topology = // my topology
> def driver = new TopologyTestDriver(topology, config())
> def cleanup() {
> driver.close()
> }
> def "create from email request"() {
> def store = driver.getKeyValueStore('user-ids-by-email')
> store.put('string', ByteString.copyFrom(new byte[0]))
> // more, but it fails at the `put` above
> {code}
> When I run this, I get the following:
> {code}
> [2018-10-23 19:35:27,055] INFO 
> (org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl) mock 
> Restoring state for global store user-ids-by-email
> java.lang.NullPointerException
>   at 
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.topic(AbstractProcessorContext.java:115)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:237)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:220)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:38)
>   at 
> org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:206)
>   at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117)
>   at pony.message.MessageWriteStreamsTest.create from mailgun email 
> request(MessageWriteStreamsTest.groovy:52)
> [2018-10-23 19:35:27,189] INFO 
> (org.apache.kafka.streams.processor.internals.StateDirectory) stream-thread 
> [main] Deleting state directory 0_0 for task 0_0 as user calling cleanup.
> {code}
> The same issue applies to KTable.
> I've noticed that I can {{put()}} to the store if I first write to it with 
> {{driver.pipeInput}}. But otherwise I get the above error.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7536) TopologyTestDriver cannot pre-populate KTable

2018-10-24 Thread Dmitry Minkovsky (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662413#comment-16662413
 ] 

Dmitry Minkovsky commented on KAFKA-7536:
-

Thank you.

Yes, disabling cache makes the issue go away.

I tested, and this affects regular KTable as well. Disabling cache makes that 
go away too. 

Not sure how I will work around this, either by abstracting store creation to 
make disabling cache easy for tests, or just by piping input. Looks like it 
will depend on the situation.

> TopologyTestDriver cannot pre-populate KTable
> -
>
> Key: KAFKA-7536
> URL: https://issues.apache.org/jira/browse/KAFKA-7536
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Dmitry Minkovsky
>Priority: Minor
>
> I have a GlobalKTable that's defined as
> {code}
> GlobalKTable userIdsByEmail = topology  
>.globalTable(USER_IDS_BY_EMAIL.name,
>USER_IDS_BY_EMAIL.consumed(),
>Materialized.as("user-ids-by-email"));
> {code}
> And the following test in Spock:
> {code}
> def topology = // my topology
> def driver = new TopologyTestDriver(topology, config())
> def cleanup() {
> driver.close()
> }
> def "create from email request"() {
> def store = driver.getKeyValueStore('user-ids-by-email')
> store.put('string', ByteString.copyFrom(new byte[0]))
> // more, but it fails at the `put` above
> {code}
> When I run this, I get the following:
> {code}
> [2018-10-23 19:35:27,055] INFO 
> (org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl) mock 
> Restoring state for global store user-ids-by-email
> java.lang.NullPointerException
>   at 
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.topic(AbstractProcessorContext.java:115)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:237)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:220)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:38)
>   at 
> org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:206)
>   at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117)
>   at pony.message.MessageWriteStreamsTest.create from mailgun email 
> request(MessageWriteStreamsTest.groovy:52)
> [2018-10-23 19:35:27,189] INFO 
> (org.apache.kafka.streams.processor.internals.StateDirectory) stream-thread 
> [main] Deleting state directory 0_0 for task 0_0 as user calling cleanup.
> {code}
> I've noticed that I can {{put()}} to the store if I first write to it with 
> {{driver.pipeInput}}. But otherwise I get the above error.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)