That is correct. You need to return a new instance on each call.

I am not sure how to improve the docs though:

1) it's a supplier pattern (that should explain it)
2) also the `TransformerSupplier#get()` method JavaDoc says:

>     /**
>      * Return a new {@link Transformer} instance.
>      *
>      * @return a new {@link Transformer} instance
>      */
>     Transformer<K, V, R> get();

It the "new instance" not clear enough?

If you have any suggestion how to improve this, please let us know.


-Matthias

On 6/16/17 10:30 AM, Adrian McCague wrote:
> Just to follow up, after making the change to 
> 
> .transform(() -> new PhaseTransformer<>(evaluator, storeName), 
> transformer.getStoreName())
> 
> The problem appears to have gone away so I think my previous hypothesis was 
> correct? Please let me know if this should have made no difference. 
> May be worth adding more documentation around the purpose of the supplier as 
> I couldn't initially find much on the topic. 
> 
> Thank you for your insights and taking the time to look at our issue.
> 
> Thanks
> Adrian
> 
> -----Original Message-----
> From: Adrian McCague [mailto:adrian.mcca...@zopa.com] 
> Sent: 16 June 2017 10:30
> To: users@kafka.apache.org
> Subject: RE: IllegalStateException when putting to state store in Transformer 
> implementation
> 
> Hi Guozhang
> 
> It's just occurred to me that the transformer is added to the topology like 
> this:
> 
> PhaseTransformer<S, E> transformer = new PhaseTransformer<>(evaluator, 
> storeName); ...
> .transform(() -> transformer, transformer.getStoreName())
> 
> Thus meaning that the same transformer is used whenever the supplier is 
> invoked, I wonder if this could cause some odd interaction in this case. 
> Now that I think about it perhaps one instance of transformer is used per 
> partition handled?
> 
> Thanks
> Adrian
> 
> -----Original Message-----
> From: Guozhang Wang [mailto:wangg...@gmail.com]
> Sent: 16 June 2017 02:54
> To: users@kafka.apache.org
> Subject: Re: IllegalStateException when putting to state store in Transformer 
> implementation
> 
> Adrian,
> 
> I looked though the 0.10.2.1 code but I cannot nail down to any obvious 
> places where the processor context is set to null, which could trigger your 
> exception. Also from your stack trace there is no direct clues available.
> 
> Would you mind creating a JIRA and attach the link to your JSON and your "
> PhaseTransformer" implementation sketch, since what you observed may be a 
> real issue?
> 
> 
> Guozhang
> 
> 
> On Thu, Jun 15, 2017 at 1:42 AM, Adrian McCague <adrian.mcca...@zopa.com>
> wrote:
> 
>> Hi Guozhang, thanks for your reply
>>
>> I can confirm that the init method is quite basic:
>>
>> public void init(ProcessorContext context) {
>>     context.schedule(TIMEOUT.getMillis());
>>     this.context = context;
>>
>>     this.store = (KeyValueStore)this.context.getStateStore(storeName);
>> }
>>
>> Omitting try catch and logging.
>>
>> In case it's relevant, this is how the state store is created:
>>
>> StateStoreSupplier storeSupplier = Stores.create(storeName)
>>                 .withKeys(keyserde)
>>                 .withValues(valueserde)
>>                 .persistent()
>>                 .build();
>> builder.addStateStore(storeSupplier);
>>
>> I can confirm though that all stack traces we have clearly originate 
>> from `put()` being called from `transform()`
>>
>> Thanks
>> Adrian
>>
>> -----Original Message-----
>> From: Guozhang Wang [mailto:wangg...@gmail.com]
>> Sent: 14 June 2017 21:18
>> To: users@kafka.apache.org
>> Subject: Re: IllegalStateException when putting to state store in 
>> Transformer implementation
>>
>> Hello Adrian,
>>
>> When you call "put()" on the windowed state store that does not 
>> specify a timestamp, then the `timestamp()` is retrieved to use as the 
>> default timestamp.
>>
>> ----------------------
>>
>> public synchronized void put(final K key, final V value) {
>>     put(key, value, context.timestamp()); }
>>
>> ----------------------
>>
>> The question is when were you calling `put()` in the Transformer, did 
>> you ever call it in `init()` function?
>>
>>
>> Guozhang
>>
>>
>> On Wed, Jun 14, 2017 at 11:08 AM, Adrian McCague 
>> <adrian.mcca...@zopa.com>
>> wrote:
>>
>>> Hi All
>>>
>>> We have a transformer implementation in our Kafka Streams 
>>> application that raises this exception, sometimes, when starting.
>>>
>>> "java.lang.IllegalStateException: This should not happen as
>>> timestamp() should only be called while a record is processed"
>>>
>>> This happens when 'put' is called on a state store within the 
>>> `transform` method of a custom `Transformer`.
>>>
>>> The full trace can be seen here, apologies for the JSON formatting:
>>> https://pastebin.com/QYKE7bSH
>>>
>>>
>>>   *   We did not see this when the input and output topic of the topology
>>> had only a single partition.
>>>   *   We do not see this when the streams thread is handling only a
>> single
>>> partition of data. (ie 4 partitions, 4 consumers in the consumer group)
>>>   *   We see this when deploying the consumer group and the first
>>> consumers to connect are handling multiple partitions (assumed).
>>> Once all have started and each consumer is processing a single 
>>> partition each, the issue appears to go away.
>>>
>>> We are using Kafka Streams Client: 0.10.2.1
>>>
>>> Any suggestions would be welcome as for now I am assuming a 
>>> programming error.
>>> I can confirm that within our code, we never call `timestamp()` on 
>>> the context.
>>>
>>> Thanks
>>> Adrian
>>>
>>
>>
>>
>> --
>> -- Guozhang
>>
> 
> 
> 
> --
> -- Guozhang
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to