That is correct. You need to return a new instance on each call. I am not sure how to improve the docs though:
1) it's a supplier pattern (that should explain it)

2) also the `TransformerSupplier#get()` method JavaDoc says:

> /**
>  * Return a new {@link Transformer} instance.
>  *
>  * @return a new {@link Transformer} instance
>  */
> Transformer<K, V, R> get();

Is the "new instance" not clear enough? If you have any suggestions on how to
improve this, please let us know.


-Matthias

On 6/16/17 10:30 AM, Adrian McCague wrote:
> Just to follow up, after making the change to
>
>     .transform(() -> new PhaseTransformer<>(evaluator, storeName),
>         transformer.getStoreName())
>
> The problem appears to have gone away so I think my previous hypothesis was
> correct? Please let me know if this should have made no difference.
> May be worth adding more documentation around the purpose of the supplier as
> I couldn't initially find much on the topic.
>
> Thank you for your insights and taking the time to look at our issue.
>
> Thanks
> Adrian
>
> -----Original Message-----
> From: Adrian McCague [mailto:adrian.mcca...@zopa.com]
> Sent: 16 June 2017 10:30
> To: users@kafka.apache.org
> Subject: RE: IllegalStateException when putting to state store in Transformer
> implementation
>
> Hi Guozhang
>
> It's just occurred to me that the transformer is added to the topology like
> this:
>
> PhaseTransformer<S, E> transformer = new PhaseTransformer<>(evaluator,
>     storeName); ...
> .transform(() -> transformer, transformer.getStoreName())
>
> Thus meaning that the same transformer is used whenever the supplier is
> invoked, I wonder if this could cause some odd interaction in this case.
> Now that I think about it perhaps one instance of transformer is used per
> partition handled?
>
> Thanks
> Adrian
>
> -----Original Message-----
> From: Guozhang Wang [mailto:wangg...@gmail.com]
> Sent: 16 June 2017 02:54
> To: users@kafka.apache.org
> Subject: Re: IllegalStateException when putting to state store in Transformer
> implementation
>
> Adrian,
>
> I looked though the 0.10.2.1 code but I cannot nail down to any obvious
> places where the processor context is set to null, which could trigger your
> exception. Also from your stack trace there is no direct clues available.
>
> Would you mind creating a JIRA and attach the link to your JSON and your
> "PhaseTransformer" implementation sketch, since what you observed may be a
> real issue?
>
>
> Guozhang
>
>
> On Thu, Jun 15, 2017 at 1:42 AM, Adrian McCague <adrian.mcca...@zopa.com>
> wrote:
>
>> Hi Guozhang, thanks for your reply
>>
>> I can confirm that the init method is quite basic:
>>
>> public void init(ProcessorContext context) {
>>     context.schedule(TIMEOUT.getMillis());
>>     this.context = context;
>>
>>     this.store = (KeyValueStore)this.context.getStateStore(storeName);
>> }
>>
>> Omitting try catch and logging.
>>
>> In case it's relevant, this is how the state store is created:
>>
>> StateStoreSupplier storeSupplier = Stores.create(storeName)
>>         .withKeys(keyserde)
>>         .withValues(valueserde)
>>         .persistent()
>>         .build();
>> builder.addStateStore(storeSupplier);
>>
>> I can confirm though that all stack traces we have clearly originate
>> from `put()` being called from `transform()`
>>
>> Thanks
>> Adrian
>>
>> -----Original Message-----
>> From: Guozhang Wang [mailto:wangg...@gmail.com]
>> Sent: 14 June 2017 21:18
>> To: users@kafka.apache.org
>> Subject: Re: IllegalStateException when putting to state store in
>> Transformer implementation
>>
>> Hello Adrian,
>>
>> When you call "put()" on the windowed state store that does not
>> specify a timestamp, then the `timestamp()` is retrieved to use as the
>> default timestamp.
>>
>> ----------------------
>>
>> public synchronized void put(final K key, final V value) {
>>     put(key, value, context.timestamp());
>> }
>>
>> ----------------------
>>
>> The question is when were you calling `put()` in the Transformer, did
>> you ever call it in `init()` function?
>>
>>
>> Guozhang
>>
>>
>> On Wed, Jun 14, 2017 at 11:08 AM, Adrian McCague
>> <adrian.mcca...@zopa.com>
>> wrote:
>>
>>> Hi All
>>>
>>> We have a transformer implementation in our Kafka Streams
>>> application that raises this exception, sometimes, when starting.
>>>
>>> "java.lang.IllegalStateException: This should not happen as
>>> timestamp() should only be called while a record is processed"
>>>
>>> This happens when 'put' is called on a state store within the
>>> `transform` method of a custom `Transformer`.
>>>
>>> The full trace can be seen here, apologies for the JSON formatting:
>>> https://pastebin.com/QYKE7bSH
>>>
>>>
>>> * We did not see this when the input and output topic of the topology
>>>   had only a single partition.
>>> * We do not see this when the streams thread is handling only a single
>>>   partition of data. (ie 4 partitions, 4 consumers in the consumer group)
>>> * We see this when deploying the consumer group and the first
>>>   consumers to connect are handling multiple partitions (assumed).
>>>   Once all have started and each consumer is processing a single
>>>   partition each, the issue appears to go away.
>>>
>>> We are using Kafka Streams Client: 0.10.2.1
>>>
>>> Any suggestions would be welcome as for now I am assuming a
>>> programming error.
>>> I can confirm that within our code, we never call `timestamp()` on
>>> the context.
>>>
>>> Thanks
>>> Adrian
>>>
>>
>>
>>
>> --
>> -- Guozhang
>>
>
>
>
> --
> -- Guozhang
>
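
For anyone reading this thread later, here is a minimal sketch of the difference between the two supplier lambdas discussed above. It deliberately does not use the Kafka Streams API: `Worker`, `SupplierDemo`, and the "task-0"/"task-1" strings are hypothetical stand-ins, and the sketch assumes only what the thread suggests, namely that the supplier is invoked more than once and that each returned object is initialised with its own context.

```java
import java.util.function.Supplier;

final class SupplierDemo {

    // Hypothetical stand-in for a stateful Transformer. It only mimics the
    // part that matters here: init() stores a per-task context in a field.
    static final class Worker {
        private String context;

        void init(String taskContext) {
            this.context = taskContext;
        }

        String context() {
            return context;
        }
    }

    // True when two consecutive get() calls return different objects.
    static boolean suppliesFreshInstances(Supplier<Worker> supplier) {
        return supplier.get() != supplier.get();
    }

    // Simulates two tasks that each ask the supplier for a worker and
    // initialise it; returns the context the first task sees afterwards.
    static String contextSeenByFirstTask(Supplier<Worker> supplier) {
        Worker first = supplier.get();
        first.init("task-0");
        Worker second = supplier.get();
        second.init("task-1");
        return first.context();
    }

    public static void main(String[] args) {
        Worker shared = new Worker();
        Supplier<Worker> sharing = () -> shared; // same object on every call
        Supplier<Worker> fresh = Worker::new;    // new object on every call

        System.out.println(suppliesFreshInstances(sharing)); // false
        System.out.println(suppliesFreshInstances(fresh));   // true
        System.out.println(contextSeenByFirstTask(sharing)); // task-1
        System.out.println(contextSeenByFirstTask(fresh));   // task-0
    }
}
```

With the sharing supplier, the second `init()` call overwrites the context the first task was relying on. That is the same shape as `() -> transformer` in the original topology, whereas `() -> new PhaseTransformer<>(...)` corresponds to the fresh supplier.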