[ https://issues.apache.org/jira/browse/KAFKA-7882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16756751#comment-16756751 ]
John Roesler commented on KAFKA-7882:
-------------------------------------

Hi [~nijo],

I'll have to look into your examples carefully to think about what might be going on. You're right: I wouldn't expect to see the state stores randomly closing all the time, unless perhaps the consumer isn't checking in with the broker frequently enough and is kicking off a rebalance. Running out of buffer space will cause more writes to disk, but it shouldn't cause the store to close.

Since you mentioned it, I wanted to let you know that suppress was already released in 2.1, but unfortunately it only has an in-memory store at the moment. I've been working on an on-disk alternative, but I took the time to run a bunch of performance tests to evaluate different strategies, so I wasn't able to get it into 2.2. I'm currently aiming for 2.3. Anyhow, if your working set fits in memory, you could upgrade to 2.1 and give suppress a shot.

I'll take a closer look at your report and reply again with a direct answer.

Thanks,
-John

> StateStores are frequently closed during the 'transform' method
> ---------------------------------------------------------------
>
>                 Key: KAFKA-7882
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7882
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.0.0
>            Reporter: Mateusz Owczarek
>            Priority: Major
>
> Hello, I have a problem with the state store being closed frequently while
> transforming incoming records.
> To ensure only one record with the same key and window reaches the aggregate,
> I have created a custom Transformer (I know something similar is going to be
> introduced with the suppress method on KTable in the future, but my
> implementation is quite simple and, IMO, should work correctly) with the
> following implementation:
> {code:java}
> override def transform(key: Windowed[K], value: V): (Windowed[K], V) = {
>   val partition = context.partition()
>   if (partition != -1) store.put(key.key(), (value, partition), key.window().start())
>   else logger.warn(s"-1 partition")
>   null // Ensures no 1:1 forwarding; context.forward and commit logic live in the punctuator callback
> }
> {code}
>
> What I get is the following error:
> {code:java}
> Store MyStore is currently closed{code}
> This problem appears only when the number of stream threads (or input topic
> partitions) is greater than 1, even if I only write to the store and turn off
> punctuation.
> If punctuation is present, however, I sometimes get -1 as the partition value
> in the transform method. I'm familiar with the basic docs; however, I haven't
> found anything that could help me here.
> I build my state store like this:
> {code:java}
> val stateStore = Stores.windowStoreBuilder(
>   Stores.persistentWindowStore(
>     stateStoreName,
>     timeWindows.maintainMs() + timeWindows.sizeMs + TimeUnit.DAYS.toMillis(1),
>     timeWindows.segments,
>     timeWindows.sizeMs,
>     false
>   ),
>   serde[K],
>   serde[(V, Int)]
> )
> {code}
> and include it in the DSL like this:
> {code:java}
> builder.addStateStore(stateStore)
> (...).transform(new MyTransformer(...), "MyStore")
> {code}
> INB4: I don't close any state stores manually, and I gave them as long a
> retention time as possible for the debugging stage. I tried to hotfix the
> problem with a retry in the transform method: the state stores eventually
> reopen and the `put` method works, but this approach is questionable and I am
> not sure it actually works.
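The following is a minimal, Kafka-free sketch in plain Scala of what the transformer is meant to do. It separates the dedup logic from the store-lifecycle problem and makes several assumptions:

- `store.put(key, value, windowStart)` overwrites an earlier value for the same key and window start, which is the intended effect of building the window store with `retainDuplicates = false`.
- The punctuator forwards every buffered entry and then clears it.
- `DedupBuffer`, `put` and `punctuate` are hypothetical names, and a `mutable.LinkedHashMap` stands in for the persistent window store.

Because the store here is only a map, the sketch says nothing about why the real store is reported as closed.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the transformer plus window store. Entries are keyed by
// (record key, window start); a second put for the same pair overwrites the first,
// so only the latest value per key and window is kept until the next punctuation.
final class DedupBuffer[K, V] {
  private val entries = mutable.LinkedHashMap.empty[(K, Long), (V, Int)]

  // Mirrors transform(): buffer instead of forwarding. Records with partition -1
  // are skipped, as in the original, which only logs a warning for them.
  def put(key: K, windowStart: Long, value: V, partition: Int): Unit =
    if (partition != -1) entries.update((key, windowStart), (value, partition))

  // Mirrors the punctuator: emit one value per (key, window start), then clear.
  def punctuate(): List[((K, Long), V)] = {
    val out = entries.iterator.map { case (kw, (v, _)) => (kw, v) }.toList
    entries.clear()
    out
  }
}

val emitted: List[((String, Long), Int)] = {
  val buffer = new DedupBuffer[String, Int]
  buffer.put("user-1", 0L, 1, partition = 0)
  buffer.put("user-1", 0L, 2, partition = 0)     // same key and window: overwrites 1
  buffer.put("user-2", 0L, 5, partition = 1)
  buffer.put("user-1", 60000L, 7, partition = 0) // next window: kept separately
  buffer.put("user-3", 0L, 9, partition = -1)    // skipped, like the warn branch
  buffer.punctuate()
}
```

This sketch has no notion of window closure. An update that arrives after a punctuation for the same window is buffered again and emitted a second time.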
>
> Edit:
> Could this be because {code:java}StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG{code}
> is set to a low value? If I understand correctly, spilling to disk then happens
> more frequently; could that cause the store to close?

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
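On the Edit question: John's reply says a small cache should only lead to more writes, not to a closed store. Below is a minimal sketch of that principle, assuming a simplified write-back cache bounded by entry count rather than bytes.

- `WriteBackCache`, `capacity`, `writesFor` and the `mutable.Map` standing in for the persistent store are all hypothetical.
- The real Kafka Streams cache is sized in bytes via `cache.max.bytes.buffering` and is not implemented this way.

```scala
import scala.collection.mutable

// Hypothetical write-back cache in front of a store, bounded by entry count.
// When more than `capacity` entries are buffered, the eldest is written to the
// store. A smaller capacity means more store writes; the store is never closed.
final class WriteBackCache[K, V](capacity: Int, store: mutable.Map[K, V]) {
  require(capacity >= 0, "capacity must be non-negative")
  private val dirty = mutable.LinkedHashMap.empty[K, V]
  private var writes = 0

  def storeWrites: Int = writes

  def put(key: K, value: V): Unit = {
    dirty.remove(key)        // re-insert so this key becomes the most recent entry
    dirty.update(key, value)
    while (dirty.size > capacity) {
      val (k, v) = dirty.head // eldest entry in insertion order
      dirty.remove(k)
      store.update(k, v)
      writes += 1
    }
  }

  // Write out everything still buffered, roughly what a commit does.
  def flush(): Unit = {
    dirty.foreach { case (k, v) => store.update(k, v); writes += 1 }
    dirty.clear()
  }
}

// Same workload (100 updates spread over 5 keys) with different cache sizes.
def writesFor(capacity: Int): Int = {
  val store = mutable.Map.empty[String, Int]
  val cache = new WriteBackCache[String, Int](capacity, store)
  for (i <- 1 to 100) cache.put(s"key-${i % 5}", i)
  cache.flush()
  cache.storeWrites
}

val writesNoCache = writesFor(0)     // every update reaches the store
val writesSmallCache = writesFor(10) // only the final value per key does
```

With capacity 0, all 100 updates are written through. With room for all five keys, only five writes happen, at flush time. In both cases the store stays open and usable, which matches John's answer.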