[jira] [Commented] (KAFKA-7158) Duplicates when searching kafka stream state store with caching

2018-08-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16573856#comment-16573856
 ] 

ASF GitHub Bot commented on KAFKA-7158:
---

guozhangwang closed pull request #5466: KAFKA-7158: Add unit test for window 
store range queries
URL: https://github.com/apache/kafka/pull/5466
 
 
   

This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is displayed below for the sake of provenance:

diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index 3bcb1a29b24..0e7f88ad63c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -16,18 +16,29 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Transformer;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -35,6 +46,8 @@
 import org.junit.Test;
 
 import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
 
 import static org.apache.kafka.common.utils.Utils.mkList;
 import static org.apache.kafka.streams.state.internals.ThreadCacheTest.memoryCacheEntrySize;
@@ -83,6 +96,97 @@ public void closeStore() {
         cachingStore.close();
     }
 
+    @Test
+    public void shouldNotReturnDuplicatesInRanges() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final StoreBuilder<WindowStore<String, String>> storeBuilder = Stores.windowStoreBuilder(
+            Stores.persistentWindowStore("store-name", 360L, 6L, false),
+            Serdes.String(),
+            Serdes.String())
+            .withCachingEnabled();
+
+        builder.addStateStore(storeBuilder);
+
+        builder.stream(topic,
+            Consumed.with(Serdes.String(), Serdes.String()))
+            .transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
+                private WindowStore<String, String> store;
+                private int numRecordsProcessed;
+
+                @Override
+                public void init(final ProcessorContext processorContext) {
+                    this.store = (WindowStore<String, String>) processorContext.getStateStore("store-name");
+                    int count = 0;
+
+                    final KeyValueIterator<Windowed<String>, String> all = store.all();
+                    while (all.hasNext()) {
+                        count++;
+                        all.next();
+                    }
+
+                    assertThat(count, equalTo(0));
+                }
+
+                @Override
+                public KeyValue<String, String> transform(final String key, final String value) {
+                    int count = 0;
+
+                    final KeyValueIterator<Windowed<String>, String> all = store.all();
+                    while (all.hasNext()) {
+                        count++;
+                        all.next();
+                    }
+                    assertThat(count, equalTo(numRecordsProcessed));
+
+                    store.put(value, value);
+
+                    numRecordsProcessed++;
+
+                    return new KeyValue<>(key, value);
+                }
+
+                @Override
+                public void close() {
+
+                }
+            }, "store-name");
+
+        final String bootstrapServers = "localhost:9092";
+        final Properties streamsConfiguration = new Properties

[jira] [Commented] (KAFKA-7158) Duplicates when searching kafka stream state store with caching

2018-08-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16570769#comment-16570769
 ] 

ASF GitHub Bot commented on KAFKA-7158:
---

guozhangwang opened a new pull request #5466: [NOT MERGE] KAFKA-7158: Add unit 
test for window store range queries
URL: https://github.com/apache/kafka/pull/5466
 
 
   While debugging the reported issue, I found that our current unit test lacks 
coverage to actually expose the underlying root cause.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Duplicates when searching kafka stream state store with caching
> ---
>
> Key: KAFKA-7158
> URL: https://issues.apache.org/jira/browse/KAFKA-7158
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Christian Henry
>Priority: Major
>
> See mailing list email with same name for initial discussion, reposting my 
> initial email here for convenience:
> {noformat}
> We have a kafka stream application, and one of our transform steps keeps a 
> state store to filter out messages with a previously seen GUID. That is, our 
> transform looks like:
> public KeyValue transform(byte[] key, String guid) {
>     try (WindowStoreIterator iterator = duplicateStore.fetch(correlationId, start, now)) {
>         if (iterator.hasNext()) {
>             return null;
>         } else {
>             duplicateStore.put(correlationId, some metadata);
>             return new KeyValue<>(key, message);
>         }
>     }
> }
> where the duplicateStore is a persistent windowed store with caching enabled. 
> I was debugging some tests and found that sometimes when calling all() or 
> fetchAll() on the duplicate store and stepping through the iterator, it would 
> return the same guid more than once, even if it was only inserted into the 
> store once. More specifically, if I had the following guids sent to the 
> stream: [1, 2, ... 9] (for 9 values total), sometimes it would 
> return 10 values, with one (or more) of the values being returned twice by 
> the iterator. However, this would not show up with a fetch(guid) on that 
> specific guid. For instance, if 1 was being returned twice by fetchAll(), 
> calling duplicateStore.fetch("1", start, end) will still return an 
> iterator with size of 1. 
> I dug into this a bit more by setting a breakpoint in 
> SegmentedCacheFunction#compareSegmentedKeys(cacheKey, storeKey) and watching 
> the two input values as I looped through the iterator using 
> "while(iterator.hasNext()) { print(iterator.next()) }". In one test, the 
> duplicate value was 6, and I saw the following behavior (trimming off the 
> segment values from the byte input):
> -- compareSegmentedKeys(cacheKey = 6, storeKey = 2)
> -- next() returns 6
> and 
> -- compareSegmentedKeys(cacheKey = 7, storeKey = 6)
> -- next() returns 6
> Besides those, the input values are the same and the output is as expected. 
> Additionally, a coworker noted that the number of duplicates always matches 
> the number of times Long.compare(cacheSegmentId, storeSegmentId) returns a 
> non-zero value, indicating that duplicates are likely arising due to the 
> segment comparison. {noformat}
>  
> Basically, what we're seeing is that if you have a persistent store with 
> caching enabled, you will sometimes get duplicate keys when querying for all 
> keys (using all() or fetchAll()) even though fetch(key) will only return one 
> result. That is, if you had a fresh store with nothing in it and did 
> something like:
> {code:java}
> IntStream.rangeClosed(1, 100).forEach(i -> store.put("key" + i, "value" + i));
> {code}
> then calling
> {code:java}
> store.fetchAll(start, end)
> {code}
> would return an iterator with MORE than 100 items, whereas if you explicitly 
> did
> {code:java}
> store.fetch("key" + i)
> {code}
> for i = 1 to 100, each fetch would only return a single item in the iterator. 
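
To make the reported expectation concrete, here is a rough sketch of the check being described (assuming an already-initialized {{WindowStore<String, String>}} named {{store}} with caching enabled, and {{start}}/{{end}} bounds covering all of the puts; these names are placeholders, not code from the reporter's application):

{code:java}
// Sketch only: "store", "start" and "end" are assumed to exist.
IntStream.rangeClosed(1, 100).forEach(i -> store.put("key" + i, "value" + i));

int fetchAllCount = 0;
try (final KeyValueIterator<Windowed<String>, String> all = store.fetchAll(start, end)) {
    while (all.hasNext()) {
        all.next();
        fetchAllCount++;
    }
}

int singleFetchTotal = 0;
for (int i = 1; i <= 100; i++) {
    try (final WindowStoreIterator<String> it = store.fetch("key" + i, start, end)) {
        while (it.hasNext()) {
            it.next();
            singleFetchTotal++;
        }
    }
}

// Expected: both counts are 100.
// Reported with caching enabled: fetchAllCount > 100, while singleFetchTotal stays at 100.
{code}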



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7158) Duplicates when searching kafka stream state store with caching

2018-08-04 Thread Christian Henry (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16569352#comment-16569352
 ] 

Christian Henry commented on KAFKA-7158:


Ah, glad you were able to reproduce it and that there's already a valid fix to 
backport! Thanks :) 



[jira] [Commented] (KAFKA-7158) Duplicates when searching kafka stream state store with caching

2018-08-04 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16569349#comment-16569349
 ] 

Guozhang Wang commented on KAFKA-7158:
--

Actually, I made that claim too quickly :)

I retried multiple times on trunk but cannot reproduce the issue. However, when 
running it on the 2.0 branch I can indeed reproduce it, which means that some 
patch in trunk has already fixed it.

I browsed through the commits since 2.0 and narrowed it down to this one: 
https://issues.apache.org/jira/browse/KAFKA-7080

I can confirm that with the commit right before this patch the issue still 
reproduces easily, and with the commit right after it the issue no longer 
reproduces. Looking into the PR carefully, I realize it is actually a pretty 
critical bug fix: with the wrong segmentInterval value, the segment id used by 
the cache can be wrong, so a put can land in the wrong cache segment, causing 
duplicate writes. This is exactly what [~cah6] has observed.


[~vvcephei], since you fixed KAFKA-7080, could you also file a PR for the 2.0 
branch, since it is a pretty critical bug?
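
For readers following along, here is a toy illustration of the mechanism (this is not the actual SegmentedCacheFunction / Segments code; the class, interval values, and timestamp are made up for the example). It shows how a record timestamp maps to a segment id, and why a cache and a store that disagree on the segment interval turn one logical key into two:

{code:java}
// Illustrative sketch only -- not Kafka Streams source code.
public class SegmentIdExample {

    // A segmented store/cache derives the segment id from the record timestamp.
    static long segmentId(final long timestamp, final long segmentInterval) {
        return timestamp / segmentInterval;
    }

    public static void main(final String[] args) {
        final long timestamp = 125_000L;

        // Hypothetical mismatch: the store derives one interval, the cache another.
        final long storeSegmentInterval = 60_000L;
        final long cacheSegmentInterval = 30_000L;

        final long storeSegmentId = segmentId(timestamp, storeSegmentInterval); // 2
        final long cacheSegmentId = segmentId(timestamp, cacheSegmentInterval); // 4

        // A merged range iterator that orders entries by (segmentId, key) now sees
        // the cached copy and the persisted copy of the same record as different
        // keys, so the record comes back twice.
        System.out.println("store segment = " + storeSegmentId
            + ", cache segment = " + cacheSegmentId
            + ", Long.compare = " + Long.compare(cacheSegmentId, storeSegmentId));
    }
}
{code}

With a consistent segmentInterval the two ids match, Long.compare returns 0, and the merged iterator collapses the cached and persisted copies into one entry, which is why the number of duplicates tracks the number of non-zero comparisons reported above.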



[jira] [Commented] (KAFKA-7158) Duplicates when searching kafka stream state store with caching

2018-08-03 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16568970#comment-16568970
 ] 

Guozhang Wang commented on KAFKA-7158:
--

Hi [~cah6], I've tried your code on my laptop 10+ times but still cannot 
reproduce this issue. On the other hand, could you try replacing

https://gist.github.com/cah6/adc2c52514f5386597a4bba6c429ff63#file-duplicateexample-java-L32

with a randomly generated folder, such as

{code}
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
{code}

and try again?
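
For completeness, a minimal Streams configuration along these lines might look like the sketch below; only the STATE_DIR_CONFIG line is the actual suggestion above, and the application id and bootstrap server are placeholders:

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.test.TestUtils;

final Properties streamsConfiguration = new Properties();
// Placeholder values -- adjust for the local environment.
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "duplicate-example");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Use a fresh, randomly generated state directory for each run so that RocksDB
// state left over from a previous run cannot influence the observed counts.
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
{code}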



[jira] [Commented] (KAFKA-7158) Duplicates when searching kafka stream state store with caching

2018-08-02 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16567559#comment-16567559
 ] 

Guozhang Wang commented on KAFKA-7158:
--

Thank you [~cah6]. I will try it out.



[jira] [Commented] (KAFKA-7158) Duplicates when searching kafka stream state store with caching

2018-08-02 Thread Christian Henry (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16567072#comment-16567072
 ] 

Christian Henry commented on KAFKA-7158:


Sorry for the delay; I finally got some time to provide a minimal example. I 
found that the easiest way to set up and share a minimal example was by 
running everything through the Confluent Platform, 
[https://www.confluent.io/download/]. I'm using v5.0.0 of that, which uses 
Kafka 2.0.0.

So my steps are:

1) start platform with defaults
{code:java}
./bin/confluent start{code}
2) create input topic
{code:java}
bin/kafka-topics --create --topic duplicate-examples-input --zookeeper 
localhost:2181 --partitions 1 --replication-factor 1{code}
3) Run [https://gist.github.com/cah6/adc2c52514f5386597a4bba6c429ff63] to 
create the application and monitor store values. I put my example in a local 
clone of [https://github.com/confluentinc/kafka-streams-examples] so that I 
didn't have to worry about project setup at all. Wait for "In init..." to be 
printed out.

4) Run [https://gist.github.com/cah6/04c09cc9747394d38182078d32b2a2d0] to push 
some values. Do this a few times, spaced apart by 5-10 seconds.

Running the driver 3 times on my machine resulted in the following. The final 
value SHOULD be 14:
{code:java}
In init: number of items in store is: 0
Number of items in store is: 0
Number of items in store is: 1
Number of items in store is: 2
Number of items in store is: 3
Number of items in store is: 4
Number of items in store is: 6
Number of items in store is: 7
Number of items in store is: 8
Number of items in store is: 9
Number of items in store is: 10
Number of items in store is: 17
Number of items in store is: 18
Number of items in store is: 19
Number of items in store is: 20
Number of items in store is: 21
{code}
Note that the incremental values in each run were fine, but somehow the number 
of objects in the store jumps between runs. 

 


[jira] [Commented] (KAFKA-7158) Duplicates when searching kafka stream state store with caching

2018-08-02 Thread Christian Henry (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16567054#comment-16567054
 ] 

Christian Henry commented on KAFKA-7158:


Sorry for the delay; I finally got some time to provide a minimal example. I 
found that the easiest way to set up and share a minimal example was by 
running everything through the Confluent Platform, 
[https://www.confluent.io/download/]. I'm using v5.0.0 of that, which uses 
Kafka 2.0.0.

So my steps are:

1) start platform with defaults
{code:java}
./bin/confluent start
{code}



[jira] [Commented] (KAFKA-7158) Duplicates when searching kafka stream state store with caching

2018-07-23 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16553499#comment-16553499
 ] 

Guozhang Wang commented on KAFKA-7158:
--

Hi Christian, I've tried the above example code on both 1.1 and trunk (2.0) but 
cannot reproduce the issue. It would be great if you could extract your test 
into a minimal snippet that exposes this issue.



[jira] [Commented] (KAFKA-7158) Duplicates when searching kafka stream state store with caching

2018-07-23 Thread Christian Henry (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16553210#comment-16553210
 ] 

Christian Henry commented on KAFKA-7158:


I don't think we explicitly set the cache size, so whatever the default is? I 
didn't initially attach a code snippet since I found this in our "integration" 
tests which have a fair amount of code to define the topology and get the 
application running, though I can try to go back and get some minimal snippet 
if it would help.

That snippet you provided is almost exactly what we did to reproduce this, just 
without the context.setTime line. We would find that the count was usually 
significantly higher than 10,000. 



[jira] [Commented] (KAFKA-7158) Duplicates when searching kafka stream state store with caching

2018-07-23 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16553171#comment-16553171
 ] 

Guozhang Wang commented on KAFKA-7158:
--

How large is your cache size? Also, if you have a code snippet that can 
reproduce this issue locally, could you share it with me?



[jira] [Commented] (KAFKA-7158) Duplicates when searching kafka stream state store with caching

2018-07-23 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16553170#comment-16553170
 ] 

Guozhang Wang commented on KAFKA-7158:
--

I tried the following test on the 1.1 branch; is it representative of your issue?

{code}
for (int i = 0; i < 10000; i++) {
    context.setTime(i);
    cachingStore.put(bytesKey("key" + i), bytesValue("value" + i));
}

final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = cachingStore.all();
int count = 0;
while (iterator.hasNext()) {
    System.out.println(iterator.next());
    count++;
}
System.out.println(count);
{code}



[jira] [Commented] (KAFKA-7158) Duplicates when searching kafka stream state store with caching

2018-07-23 Thread Christian Henry (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16553161#comment-16553161
 ] 

Christian Henry commented on KAFKA-7158:


Sure, the store builder we're providing to Kafka Streams looks like this:
{code:java}
StoreBuilder<WindowStore<String, String>> builder = Stores.windowStoreBuilder(
    Stores.persistentWindowStore("store-name", 360, 3, 6, false),
    Serdes.String(),
    Serdes.String())
    .withCachingEnabled();
{code}
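
For readers piecing the setup together, wiring a builder like that into a topology and using it from the de-duplication transform described in the issue would look roughly like the sketch below (the topic name, serdes, window bounds, and variable names are assumptions for illustration, not the reporter's actual code):

{code:java}
final StreamsBuilder topologyBuilder = new StreamsBuilder();
topologyBuilder.addStateStore(builder);

topologyBuilder.stream("input-topic", Consumed.with(Serdes.ByteArray(), Serdes.String()))
    .transform(() -> new Transformer<byte[], String, KeyValue<byte[], String>>() {
        private WindowStore<String, String> duplicateStore;
        private ProcessorContext context;

        @SuppressWarnings("unchecked")
        @Override
        public void init(final ProcessorContext context) {
            this.context = context;
            // Retrieve the store registered above by its name.
            this.duplicateStore = (WindowStore<String, String>) context.getStateStore("store-name");
        }

        @Override
        public KeyValue<byte[], String> transform(final byte[] key, final String guid) {
            final long now = context.timestamp();
            final long start = now - 60_000L; // assumed de-duplication window
            try (final WindowStoreIterator<String> iterator = duplicateStore.fetch(guid, start, now)) {
                if (iterator.hasNext()) {
                    return null; // guid already seen, drop the record
                }
                duplicateStore.put(guid, guid);
                return new KeyValue<>(key, guid);
            }
        }

        @Override
        public void close() { }
    }, "store-name");
{code}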



[jira] [Commented] (KAFKA-7158) Duplicates when searching kafka stream state store with caching

2018-07-23 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16553153#comment-16553153
 ] 

Guozhang Wang commented on KAFKA-7158:
--

Hello [~cah6], I'm trying to reproduce your observed issue locally so that I can 
dig further into it. Could you share your code snippet for defining the 
{{duplicateStore}}?




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)