Ok, Matthias, thanks for the clarification. This makes sense to me. Glad I learned something new about Kafka Streams, even if it was the hard way ;-)
Greetings,
Jan

On Wed, Apr 1, 2020 at 11:52 PM Matthias J. Sax <mj...@apache.org> wrote:

> That is expected behavior.
>
> And yes, there is a `Transformer` instance per partition with its own
> store that holds one shard of the overall state. The reason is that you
> could run one KafkaStreams instance per partition on different
> hosts/servers, and thus we need to have a `Transformer` and state store
> per partition.
>
> It's also by design that `transform()` does not do auto-repartitioning,
> because it's Processor API integration, and when using the Processor API
> it's the developer's responsibility to reason about correct data
> partitioning.
>
> -Matthias
>
> On 4/1/20 2:05 PM, Jan Bols wrote:
> > Ok, Matthias,
> >
> > thanks for the hint:
> > *Even if any upstream operation was key-changing, no auto-repartition is
> > triggered. If repartitioning is required, a call to through()
> > <https://kafka.apache.org/23/javadoc/org/apache/kafka/streams/kstream/KStream.html#through-java.lang.String->
> > should be performed before flatTransformValues().*
> >
> > Of course, I didn't call *through* before calling the transformer. As a
> > result, some calls were being processed by another instance of the
> > transformer running on a different partition. Calling *store.get(key)* on
> > an instance would then not return any value even though another instance
> > did a *store.put(key, value)* before. Is this expected behaviour? Is there
> > a transformer for each partition, and does it get its own state store?
> >
> > Best regards
> >
> > Jan
> >
> > On Fri, Mar 27, 2020 at 12:59 AM Matthias J. Sax <mj...@apache.org> wrote:
> >
> >> Your code looks correct to me. If you write into the store, you should
> >> also be able to read it back from the store.
> >>
> >> Can you reproduce the issue using `TopologyTestDriver`? How many
> >> partitions does your input topic have? Is your stream partitioned by
> >> key?
> >> Note that `transform()` does not do auto-repartitioning, in contrast
> >> to `groupByKey()`.
> >>
> >> -Matthias
> >>
> >> On 3/25/20 3:49 AM, Jan Bols wrote:
> >>> Hi all,
> >>> I'm trying to aggregate a stream of messages and return a stream of
> >>> aggregated results using Kafka Streams.
> >>> At some point, depending on the incoming message, the old aggregate needs
> >>> to be closed and a new aggregate needs to be created, just like a session
> >>> that is closed due to some close event while at the same time a new
> >>> session is started.
> >>>
> >>> For this I'm using transformValues, where I store the result of an
> >>> aggregation similar to how a groupByKey().aggregate() is done. When the
> >>> old session needs to be closed, it's sent downstream first, followed by
> >>> the new value.
> >>>
> >>> The state store returns null for a given key at first retrieval, and the
> >>> new aggregation result is stored under the same key.
> >>> However, at the second pass, the value for the same key is still null
> >>> even though it has just been stored before.
> >>>
> >>> How can this be possible?
> >>>
> >>> I'm using transformValues in the following way:
> >>>
> >>> val storeName = "aggregateOverflow_binReportAgg"
> >>> val store = Stores.keyValueStoreBuilder<K, V>(
> >>>     Stores.persistentKeyValueStore(storeName), serde.serde(), serde.serde())
> >>> streamsBuilder.addStateStore(store)
> >>>
> >>> ...
> >>>
> >>> stream
> >>>     .flatTransformValues(ValueTransformerWithKeySupplier {
> >>>         AggregateOverflow(storeName, transformation) }, storeName)
> >>>
> >>> where AggregateOverflow gets the previous value from the state store and
> >>> transforms the result into an AggregateOverflowResult.
> >>> AggregateOverflowResult is a data class containing the current value and
> >>> an optional overflow value, like this:
> >>>
> >>> data class AggregateOverflowResult<V>(val current: V, val overflow: V?)
> >>>
> >>> When the overflow value is not null, it's sent downstream first, followed
> >>> by the current value. In each case, the current result is stored in the
> >>> state store for later retrieval, like the following:
> >>>
> >>> class AggregateOverflow<K, V, VR : Any>(
> >>>     private val storeName: String,
> >>>     private val transformation: (K, V, VR?) -> AggregateOverflowResult<VR>?
> >>> ) : ValueTransformerWithKey<K, V, Iterable<VR>> {
> >>>   private val logger = KotlinLogging.logger {}
> >>>   private lateinit var state: KeyValueStore<K, VR>
> >>>
> >>>   init {
> >>>     logger.debug { "$storeName: created" }
> >>>   }
> >>>
> >>>   override fun init(context: ProcessorContext) {
> >>>     logger.debug { "$storeName: init called" }
> >>>     this.state = context.getStateStore(storeName) as KeyValueStore<K, VR>
> >>>   }
> >>>
> >>>   override fun transform(key: K, value: V): Iterable<VR> {
> >>>     val acc = state.get(key)
> >>>     if (acc == null) logger.debug { "$storeName: Found empty value for $key" }
> >>>     val result = transformation(key, value, acc)
> >>>     state.put(key, result?.current)
> >>>     logger.trace { "$storeName: \n  Key: $key\n  Value: $value\n  aggregate old: $acc\n  aggregate new: $result" }
> >>>     return listOfNotNull(result?.overflow, result?.current) // overflow is forwarded first if not null
> >>>   }
> >>>
> >>>   override fun close() {
> >>>     logger.debug { "$storeName: close called" }
> >>>   }
> >>> }
> >>>
> >>> In the log file you can see that the first invocation is returning an
> >>> empty value for the given key; you can also see that the new value is
> >>> being serialized in the store.
> >>> At the second invocation a few seconds later, the value for the same key
> >>> is still null.
> >>>
> >>> Any ideas why this is?
> >>> Best regards
> >>> Jan
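[Editor's note] The forwarding order in the `transform()` method quoted above relies on `listOfNotNull` preserving argument order and skipping nulls, so the overflow (the closed-out session) goes downstream before the current aggregate. A minimal standalone sketch in plain Kotlin (`emitOrder` is an invented helper, not part of the original code):

```kotlin
// Standalone sketch of the forwarding order used in transform():
// the overflow (closed-out aggregate), when present, is emitted before
// the current one; a null overflow is simply skipped by listOfNotNull.
data class AggregateOverflowResult<V>(val current: V, val overflow: V?)

fun <V : Any> emitOrder(result: AggregateOverflowResult<V>?): List<V> =
    listOfNotNull(result?.overflow, result?.current)

fun main() {
    println(emitOrder(AggregateOverflowResult("current", "closedSession")))
    // [closedSession, current] -> the closed session goes downstream first
    println(emitOrder(AggregateOverflowResult("current", null)))
    // [current] -> no session was closed, only the running aggregate
}
```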
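[Editor's note] The per-partition state behaviour Matthias describes can be illustrated without Kafka at all. A library-free sketch in plain Kotlin (`ShardedStore` and `partitionFor` are invented for illustration; real partition assignment is done by Kafka, not by this code): each transformer instance owns one shard of the state, and after a key-changing operation without repartitioning, records carrying the same new key can still land on different shards because routing still follows the original key.

```kotlin
// Invented sketch of per-partition state: one map per partition mirrors
// one state-store shard per transformer task.
fun partitionFor(key: String, numPartitions: Int): Int =
    Math.floorMod(key.hashCode(), numPartitions)

class ShardedStore(numPartitions: Int) {
    private val shards = List(numPartitions) { mutableMapOf<String, String>() }
    fun put(partition: Int, key: String, value: String) { shards[partition][key] = value }
    fun get(partition: Int, key: String): String? = shards[partition][key]
}

fun main() {
    val store = ShardedStore(numPartitions = 4)
    // Record 1 was produced with original key "a" -> routed to shard 1.
    val p1 = partitionFor("a", 4)
    store.put(p1, "new-key", "aggregate-1")
    // Record 2 carries the same new key but original key "b" -> shard 2.
    val p2 = partitionFor("b", 4)
    val seen = store.get(p2, "new-key")
    println("shard for 'a'=$p1, shard for 'b'=$p2, seen=$seen")
    // seen is null: the put() happened on a different shard's store.
}
```

Inserting a `through()` (or, on newer Kafka versions, `repartition()`) after the key-changing step forces all records with the same new key onto the same partition, so one transformer instance sees both the put and the subsequent get.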