Your code looks correct to me. If you write into the store, you should also be able to read it back from the store.
Can you reproduce the issue using `TopologyTestDriver`? How many
partitions does your input topic have? Is your stream partitioned by
key? Note that `transform()` does not do auto-repartitioning, in contrast
to `groupByKey()`.
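
To illustrate why partitioning matters here, below is a minimal sketch in plain Kotlin. It does not use the Kafka Streams API: `Task`, `process`, and `byKey` are hypothetical names that only mimic how each input partition is processed by its own task with its own local store instance. Assuming that model, if records with the same key arrive on different partitions, the second lookup can hit a different store instance and return null.

```kotlin
// Hypothetical simulation, NOT the Kafka Streams API: one Task per input
// partition, and each Task owns a separate local key-value store.
class Task {
    private val store = HashMap<String, Int>()

    // Mirrors the get/put pattern in transform(): returns the previous
    // aggregate for the key (or null) and stores the updated aggregate.
    fun process(key: String, value: Int): Int? {
        val previous = store[key]
        store[key] = (previous ?: 0) + value
        return previous
    }
}

// Key-based partitioning: the same key always maps to the same partition.
fun byKey(key: String, numPartitions: Int): Int =
    Math.floorMod(key.hashCode(), numPartitions)

fun main() {
    val numPartitions = 2

    // Case 1: the stream is not partitioned by key, so two records with
    // key "a" end up on different partitions (and thus different stores).
    val unkeyed = List(numPartitions) { Task() }
    val first = unkeyed[0].process("a", 1)
    val second = unkeyed[1].process("a", 1)
    println("not partitioned by key: first=$first, second=$second")

    // Case 2: the stream is partitioned by key, so both records reach
    // the same task and the second lookup sees the stored aggregate.
    val keyed = List(numPartitions) { Task() }
    val k1 = keyed[byKey("a", numPartitions)].process("a", 1)
    val k2 = keyed[byKey("a", numPartitions)].process("a", 1)
    println("partitioned by key: first=$k1, second=$k2")
}
```

In the first case both lookups return null even though the value was stored in between, which would look like what you describe. If this matches your setup, repartitioning the stream by key before the transformer should make the lookups consistent.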
-Matthias
On 3/25/20 3:49 AM, Jan Bols wrote:
> Hi all,
> I'm trying to aggregate a stream of messages and return a stream of
> aggregated results using kafka streams.
> At some point, depending on the incoming message, the old aggregate needs
> to be closed and a new aggregate needs to be created, just like a session
> that is closed due to some close event and at the same time a new session
> is started.
>
> For this I'm using transformValues, where I store the result of an
> aggregation similar to how a groupByKey().aggregate() is done. When the old
> session needs to be closed, it's sent first, followed by the new value.
>
> The state store returns null for a given key at first retrieval and the new
> aggregation result is stored under the same key.
> However, at the second pass, the value for the same key is still null even
> though it has just been stored before.
>
> How can this be possible?
>
>
>
> I'm using transformValues in the following way:
>
> val storeName = "aggregateOverflow_binReportAgg"
> val store = Stores.keyValueStoreBuilder<K, V>(
>     Stores.persistentKeyValueStore(storeName), serde.serde(), serde.serde())
> streamsBuilder.addStateStore(store)
>
> ...
>
> stream
>     .flatTransformValues(
>         ValueTransformerWithKeySupplier { AggregateOverflow(storeName, transformation) },
>         storeName)
>
>
> where AggregateOverflow gets the previous value from the state store and
> transforms it into an AggregateOverflowResult.
> AggregateOverflowResult is a data class containing the current value and an
> optional overflow value like this:
>
> data class AggregateOverflowResult<V>(val current: V, val overflow: V?)
>
> When the overflow value is not null, it's sent downstream first, followed by
> the current value. In each case, the current result is stored in the state
> store for later retrieval like the following:
>
> class AggregateOverflow<K, V, VR : Any>(
>     private val storeName: String,
>     private val transformation: (K, V, VR?) -> AggregateOverflowResult<VR>?
> ) : ValueTransformerWithKey<K, V, Iterable<VR>> {
>     private val logger = KotlinLogging.logger {}
>     private lateinit var state: KeyValueStore<K, VR>
>
>     init {
>         logger.debug { "$storeName: created" }
>     }
>
>     override fun init(context: ProcessorContext) {
>         logger.debug { "$storeName: init called" }
>         this.state = context.getStateStore(storeName) as KeyValueStore<K, VR>
>     }
>
>     override fun transform(key: K, value: V): Iterable<VR> {
>         val acc = state.get(key)
>         if (acc == null) logger.debug { "$storeName: Found empty value for $key" }
>         val result = transformation(key, value, acc)
>         state.put(key, result?.current)
>         logger.trace { "$storeName: \n Key: $key\n Value: $value\n aggregate old: $acc\n aggregate new: $result" }
>         // the overflow (previous aggregate) is forwarded first if not null
>         return listOfNotNull(result?.overflow, result?.current)
>     }
>
>     override fun close() {
>         logger.debug { "$storeName: close called" }
>     }
> }
>
> In the log file you can see that the first invocation returns an empty
> value for the given key. You can also see that the new value is being
> serialized into the store.
> At the second invocation a few seconds later, the value for the same key is
> still null.
>
> Any ideas why this is?
> Best regards
> Jan
>
