Ok, Matthias,
thanks for the clarification. This makes sense to me.
Glad I learned something new about Kafka Streams, even if it was the hard
way ;-)

Greetings
Jan

On Wed, Apr 1, 2020 at 11:52 PM Matthias J. Sax <mj...@apache.org> wrote:

> That is expected behavior.
>
> And yes, there is a `Transformer` instance per partition with its own
> store that holds one shard of the overall state. The reason is that you
> could run one KafkaStreams instance per partition on different
> hosts/servers and thus, we need to have a `Transformer` and state store
> per partition.
>
> It's also by design that `transform()` does not do auto-repartitioning,
> because it's a Processor API integration, and when using the Processor
> API it's the developer's responsibility to reason about correct data
> partitioning.
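>
> As a rough sketch (the topic name is hypothetical; note that a topic
> used with `through()` must be created manually, with the same number of
> partitions as the input), the repartitioning would look something like:
>
> stream
>     // writes to and re-reads from the topic, so each record lands on
>     // the partition that matches its current key
>     .through("my-repartition-topic")
>     .flatTransformValues(
>         ValueTransformerWithKeySupplier { AggregateOverflow(storeName, transformation) },
>         storeName)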
>
>
> -Matthias
>
> On 4/1/20 2:05 PM, Jan Bols wrote:
> > Ok, Matthias,
> >
> > thanks for the hint:
> > *Even if any upstream operation was key-changing, no auto-repartition is
> > triggered. If repartitioning is required, a call to through()
> > <https://kafka.apache.org/23/javadoc/org/apache/kafka/streams/kstream/KStream.html#through-java.lang.String->
> > should be performed before flatTransformValues().*
> >
> > Of course, I didn't call *through* before calling the transformer. As a
> > result some calls were being processed by another instance of the
> > transformer running on a different partition. Calling *store.get(key)* on
> > an instance would then not return any value even though another instance
> > did a *store.put(key, value)* before. Is this expected behaviour? Is
> > there a transformer for each partition and does it get its own state
> > store?
> >
> > Best regards
> >
> > Jan
> >
> > On Fri, Mar 27, 2020 at 12:59 AM Matthias J. Sax <mj...@apache.org> wrote:
> >
> >> Your code looks correct to me. If you write into the store, you should
> >> also be able to read it back from the store.
> >>
> >> Can you reproduce the issue using `TopologyTestDriver`? How many
> >> partitions does your input topic have? Is your stream partitioned by
> >> key? Note that `transform()` does not do auto-repartitioning in contrast
> >> to `groupByKey()`.
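> >>
> >> A minimal sketch of such a test (assuming Kafka 2.4+, String serdes,
> >> and placeholder topic names; note the test driver runs the whole
> >> topology as a single task, so a partition-related problem would likely
> >> not reproduce there):
> >>
> >> val props = Properties()
> >> props[StreamsConfig.APPLICATION_ID_CONFIG] = "test"
> >> props[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = "dummy:1234"
> >>
> >> TopologyTestDriver(streamsBuilder.build(), props).use { driver ->
> >>   val input = driver.createInputTopic("input-topic",
> >>       Serdes.String().serializer(), Serdes.String().serializer())
> >>   val output = driver.createOutputTopic("output-topic",
> >>       Serdes.String().deserializer(), Serdes.String().deserializer())
> >>   input.pipeInput("key", "first")
> >>   input.pipeInput("key", "second")  // second pass should see the stored value
> >>   println(output.readValuesToList())
> >> }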
> >>
> >>
> >> -Matthias
> >>
> >> On 3/25/20 3:49 AM, Jan Bols wrote:
> >>> Hi all,
> >>> I'm trying to aggregate a stream of messages and return a stream of
> >>> aggregated results using Kafka Streams.
> >>> At some point, depending on the incoming message, the old aggregate
> >>> needs to be closed and a new aggregate needs to be created, just like
> >>> a session that is closed due to some close event while at the same
> >>> time a new session is started.
> >>>
> >>> For this I'm using transformValues where I store the result of an
> >>> aggregation similar to how a groupByKey().aggregate() is done. When
> >>> the old session needs to be closed, it's sent downstream first,
> >>> followed by the new value.
> >>>
> >>> The state store returns null for a given key at first retrieval and
> >>> the new aggregation result is stored under the same key.
> >>> However, at the second pass, the value for the same key is still null
> >>> even though it has just been stored before.
> >>>
> >>> How can this be possible?
> >>>
> >>>
> >>>
> >>> I'm using transformValues in the following way:
> >>>
> >>> val storeName = "aggregateOverflow_binReportAgg"
> >>> val store = Stores.keyValueStoreBuilder<K, V>(
> >>>     Stores.persistentKeyValueStore(storeName), serde.serde(), serde.serde())
> >>> streamsBuilder.addStateStore(store)
> >>>
> >>> ...
> >>>
> >>> stream
> >>>   .flatTransformValues(ValueTransformerWithKeySupplier {
> >>>       AggregateOverflow(storeName, transformation) }, storeName)
> >>>
> >>>
> >>> where AggregateOverflow gets the previous value from the state store
> >>> and transforms the result into an AggregateOverflowResult.
> >>> AggregateOverflowResult is a data class containing the current value
> >>> and an optional overflow value like this:
> >>>
> >>> data class AggregateOverflowResult<V>(val current: V, val overflow: V?)
> >>>
> >>> When the overflow value is not null, it's sent downstream first,
> >>> followed by the current value. In each case, the current result is
> >>> stored in the state store for later retrieval, like the following:
> >>>
> >>> class AggregateOverflow<K, V, VR : Any>(
> >>>   private val storeName: String,
> >>>   private val transformation: (K, V, VR?) -> AggregateOverflowResult<VR>?
> >>> ) : ValueTransformerWithKey<K, V, Iterable<VR>> {
> >>>   private val logger = KotlinLogging.logger {}
> >>>   private lateinit var state: KeyValueStore<K, VR>
> >>>
> >>>   init {
> >>>     logger.debug { "$storeName: created" }
> >>>   }
> >>>
> >>>   override fun init(context: ProcessorContext) {
> >>>     logger.debug { "$storeName: init called" }
> >>>     this.state = context.getStateStore(storeName) as KeyValueStore<K, VR>
> >>>   }
> >>>
> >>>   override fun transform(key: K, value: V): Iterable<VR> {
> >>>     val acc = state.get(key)
> >>>     if (acc == null) logger.debug { "$storeName: Found empty value for $key" }
> >>>     val result = transformation(key, value, acc)
> >>>     state.put(key, result?.current)
> >>>     logger.trace { "$storeName: \n Key: $key\n Value: $value\n aggregate old: $acc\n aggregate new: $result" }
> >>>     // the overflow (the closed aggregate) is forwarded first if not null
> >>>     return listOfNotNull(result?.overflow, result?.current)
> >>>   }
> >>>
> >>>   override fun close() {
> >>>     logger.debug { "$storeName: close called" }
> >>>   }
> >>> }
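> >>>
> >>> For illustration, a hypothetical transformation (Event and Session are
> >>> made-up types) that closes the running aggregate when a close event
> >>> arrives and starts a new one:
> >>>
> >>> val transformation: (String, Event, Session?) -> AggregateOverflowResult<Session>? =
> >>>   { key, event, acc ->
> >>>     when {
> >>>       acc == null -> AggregateOverflowResult(Session.from(event), null)  // first event: new session
> >>>       event.isClose -> AggregateOverflowResult(Session.from(event), acc) // close: emit old session first
> >>>       else -> AggregateOverflowResult(acc.add(event), null)              // keep accumulating
> >>>     }
> >>>   }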
> >>>
> >>> In the log file you can see that the first invocation returns an
> >>> empty value for the given key; you can also see that the new value is
> >>> being serialized into the store.
> >>> At the second invocation a few seconds later, the value for the same
> >>> key is still null.
> >>>
> >>> Any ideas why this is?
> >>> Best regards
> >>> Jan
> >>>
> >>
> >>
> >
>
>
