Your code looks correct to me. If you write into the store, you should
also be able to read it back from the store.
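
As an illustration only, here is a stdlib-only model of the get/put loop in `AggregateOverflow.transform()`. `InMemoryStore` and `transformOnce` are hypothetical stand-ins for a single `KeyValueStore` instance and the transformer, not Kafka APIs. The model mirrors one documented detail worth ruling out: the `KeyValueStore.put()` Javadoc says a null value is interpreted as a delete, so `state.put(key, result?.current)` removes the key whenever `transformation` returns null. Within one store instance, a value you put is what the next `get()` returns, unless a null was written in between.

```kotlin
data class AggregateOverflowResult<V>(val current: V, val overflow: V?)

// Hypothetical stand-in for ONE KeyValueStore instance (not the real Kafka API).
class InMemoryStore<K, V : Any> {
    private val data = HashMap<K, V>()

    fun get(key: K): V? = data[key]

    // Mirrors KeyValueStore.put(): a null value deletes the key instead of storing null.
    fun put(key: K, value: V?) {
        if (value == null) {
            data.remove(key)
        } else {
            data[key] = value
        }
    }
}

// Same shape as AggregateOverflow.transform(): read, transform, write back, emit.
fun <K, V, VR : Any> transformOnce(
    store: InMemoryStore<K, VR>,
    key: K,
    value: V,
    transformation: (K, V, VR?) -> AggregateOverflowResult<VR>?
): List<VR> {
    val acc = store.get(key)
    val result = transformation(key, value, acc)
    store.put(key, result?.current)                         // null result => key deleted
    return listOfNotNull(result?.overflow, result?.current) // overflow is emitted first
}

fun main() {
    val store = InMemoryStore<String, Int>()
    // Hypothetical running sum; a negative input returns null to simulate "no result".
    val sum: (String, Int, Int?) -> AggregateOverflowResult<Int>? = { _, v, acc ->
        if (v < 0) null else AggregateOverflowResult((acc ?: 0) + v, null)
    }
    println(transformOnce(store, "a", 5, sum))  // [5]
    println(transformOnce(store, "a", 3, sum))  // [8]  previous value was read back
    println(transformOnce(store, "a", -1, sum)) // []   null was written, key deleted
    println(store.get("a"))                     // null
}
```

If the real topology still reads null after a non-null put, the store instance being read is probably not the one that was written, which is what the partitioning questions below are getting at.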

Can you reproduce the issue using `TopologyTestDriver`? How many
partitions does your input topic have? Is your stream partitioned by
key? Note that `transform()` does not repartition automatically, in
contrast to `groupByKey()`.
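
To illustrate why partitioning matters: each stream task owns its own shard of the state store, one per input partition, so a `get()` only sees values written by records from the same partition. The model below is hypothetical and stdlib-only: `partitionOf` uses `hashCode()` modulo the partition count rather than Kafka's murmur2-based default partitioner, and `Event`, `userId` and `sessionKey` are made-up names. If records were written to the input topic keyed by one field (`userId`) but the transformer aggregates on another (`sessionKey`), the second record for a session can land in a different shard and read null.

```kotlin
// Hypothetical record: produced keyed by userId, but aggregated by sessionKey.
data class Event(val userId: String, val sessionKey: String, val amount: Int)

// Simplified partitioner (NOT Kafka's murmur2 default): non-negative hash modulo partitions.
fun partitionOf(key: String, partitions: Int): Int = Math.floorMod(key.hashCode(), partitions)

// Running sum per sessionKey with one store shard per partition (i.e. per task).
// Returns what state.get(sessionKey) saw for each record, in order.
fun aggregate(events: List<Event>, partitions: Int, routeBy: (Event) -> String): List<Int?> {
    val shards = List(partitions) { HashMap<String, Int>() }
    val previousSeen = mutableListOf<Int?>()
    for (e in events) {
        val shard = shards[partitionOf(routeBy(e), partitions)]
        val acc = shard[e.sessionKey]
        previousSeen += acc
        shard[e.sessionKey] = (acc ?: 0) + e.amount
    }
    return previousSeen
}

fun main() {
    val events = listOf(Event("u1", "s1", 5), Event("u2", "s1", 3))
    println(aggregate(events, 2) { it.userId })     // [null, null]: second lookup hits another shard
    println(aggregate(events, 2) { it.sessionKey }) // [null, 5]: same shard, previous value found
}
```

In Kafka Streams the usual remedies are to write the input topic keyed by the aggregation key, or to repartition explicitly before the stateful transformer, e.g. with `through()` or, in Kafka Streams 2.6 and later, `repartition()`.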


-Matthias

On 3/25/20 3:49 AM, Jan Bols wrote:
> Hi all,
> I'm trying to aggregate a stream of messages and return a stream of
> aggregated results using Kafka Streams.
> At some point, depending on the incoming message, the old aggregate needs
> to be closed and a new aggregate needs to be created, just like a session
> that is closed due to some close event and at the same time a new session
> is started.
> 
> For this I'm using flatTransformValues, storing the result of the
> aggregation in a state store, similar to what groupByKey().aggregate()
> does. When the old session needs to be closed, it is sent downstream
> first, followed by the new value.
> 
> The state store returns null for a given key on the first retrieval, and
> the new aggregation result is stored under the same key.
> However, on the second pass, the value for the same key is still null,
> even though it was just stored.
> 
> How is this possible?
> 
> 
> 
> I'm using flatTransformValues in the following way:
> 
> val storeName = "aggregateOverflow_binReportAgg"
> val store = Stores.keyValueStoreBuilder<K, V>(
>     Stores.persistentKeyValueStore(storeName), serde.serde(), serde.serde())
> streamsBuilder.addStateStore(store)
> 
> ...
> 
> stream
>     .flatTransformValues(
>         ValueTransformerWithKeySupplier { AggregateOverflow(storeName, transformation) },
>         storeName)
> 
> 
> where AggregateOverflow gets the previous value from the state store and
> transforms the result into an AggregateOverflowResult.
> AggregateOverflowResult is a data class containing the current value and an
> optional overflow value:
> 
> data class AggregateOverflowResult<V>(val current: V, val overflow: V?)
> 
> When the overflow value is not null, it is sent downstream first, before
> the current value. In either case, the current result is stored in the
> state store for later retrieval, as follows:
> 
> import mu.KotlinLogging
> import org.apache.kafka.streams.kstream.ValueTransformerWithKey
> import org.apache.kafka.streams.processor.ProcessorContext
> import org.apache.kafka.streams.state.KeyValueStore
> 
> class AggregateOverflow<K, V, VR : Any>(
>     private val storeName: String,
>     private val transformation: (K, V, VR?) -> AggregateOverflowResult<VR>?
> ) : ValueTransformerWithKey<K, V, Iterable<VR>> {
>     private val logger = KotlinLogging.logger {}
>     private lateinit var state: KeyValueStore<K, VR>
> 
>     init {
>         logger.debug { "$storeName: created" }
>     }
> 
>     @Suppress("UNCHECKED_CAST")
>     override fun init(context: ProcessorContext) {
>         logger.debug { "$storeName: init called" }
>         // Look up the store registered via addStateStore() and named in flatTransformValues().
>         this.state = context.getStateStore(storeName) as KeyValueStore<K, VR>
>     }
> 
>     override fun transform(key: K, value: V): Iterable<VR> {
>         val acc = state.get(key)
>         if (acc == null) logger.debug { "$storeName: Found empty value for $key" }
>         val result = transformation(key, value, acc)
>         // Note: KeyValueStore.put(key, null) is interpreted as a delete.
>         state.put(key, result?.current)
>         logger.trace {
>             "$storeName: \n Key: $key\n Value: $value\n aggregate old: $acc\n aggregate new: $result"
>         }
>         // The overflow (the closed previous aggregate) is forwarded first, if not null.
>         return listOfNotNull(result?.overflow, result?.current)
>     }
> 
>     override fun close() {
>         logger.debug { "$storeName: close called" }
>     }
> }
> 
> In the log file you can see that the first invocation returns null for
> the given key, and that the new value is serialized into the store.
> On the second invocation, a few seconds later, the value for the same key
> is still null.
> 
> Any ideas why this is?
> Best regards
> Jan
> 
