Just to follow up, after making the change to 

.transform(() -> new PhaseTransformer<>(evaluator, storeName), 
transformer.getStoreName())

the problem appears to have gone away, so I think my previous hypothesis was 
correct. Please let me know if this should have made no difference. 
It may be worth adding more documentation on the purpose of the supplier, as I 
couldn't initially find much on the topic. 

Thank you for your insights and taking the time to look at our issue.

Thanks
Adrian

-----Original Message-----
From: Adrian McCague [mailto:adrian.mcca...@zopa.com] 
Sent: 16 June 2017 10:30
To: users@kafka.apache.org
Subject: RE: IllegalStateException when putting to state store in Transformer 
implementation

Hi Guozhang

It's just occurred to me that the transformer is added to the topology like 
this:

PhaseTransformer<S, E> transformer = new PhaseTransformer<>(evaluator, storeName);
...
.transform(() -> transformer, transformer.getStoreName())

This means that the same transformer instance is returned whenever the supplier 
is invoked; I wonder if this could cause some odd interaction in this case. 
Now that I think about it, perhaps one transformer instance is used per 
partition handled?
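
A minimal sketch of the concern, using only the JDK. DemoTransformer and
SupplierDemo are hypothetical stand-ins, not Kafka Streams classes, and the
"one get() call per task" behaviour is an assumption about how the runtime
uses the supplier. It shows that a supplier which always returns the same
object lets a later init() overwrite the state captured by an earlier one:

```java
import java.util.function.Supplier;

// Hypothetical stand-in for a Transformer; this is not the Kafka Streams interface.
class DemoTransformer {
    private String taskId;   // per-task state captured in init()

    void init(String taskId) { this.taskId = taskId; }
    String taskId() { return taskId; }
}

class SupplierDemo {
    // Simulates a runtime that asks the supplier for one transformer per task
    // and initialises each one with that task's id (assumed behaviour).
    static DemoTransformer[] buildTasks(Supplier<DemoTransformer> supplier, String... taskIds) {
        DemoTransformer[] result = new DemoTransformer[taskIds.length];
        for (int i = 0; i < taskIds.length; i++) {
            result[i] = supplier.get();
            result[i].init(taskIds[i]);
        }
        return result;
    }

    public static void main(String[] args) {
        // Supplier that hands out one shared instance: the second init() wins.
        DemoTransformer shared = new DemoTransformer();
        DemoTransformer[] sharedTasks = buildTasks(() -> shared, "0_0", "0_1");

        // Supplier that creates a new instance per call: each task keeps its own state.
        DemoTransformer[] freshTasks = buildTasks(DemoTransformer::new, "0_0", "0_1");

        System.out.println(sharedTasks[0].taskId()); // prints 0_1
        System.out.println(freshTasks[0].taskId());  // prints 0_0
    }
}
```

If the real transformer stores its context and state store in init(), the
shared-instance variant would leave every task using whichever context was
assigned last.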

Thanks
Adrian

-----Original Message-----
From: Guozhang Wang [mailto:wangg...@gmail.com]
Sent: 16 June 2017 02:54
To: users@kafka.apache.org
Subject: Re: IllegalStateException when putting to state store in Transformer 
implementation

Adrian,

I looked through the 0.10.2.1 code but I cannot pin down any obvious places 
where the processor context is set to null, which could trigger your exception. 
Also, there are no direct clues in your stack trace.
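
For reference, the failure mode under discussion can be modelled roughly as
follows. This is a simplified sketch with hypothetical stand-in classes
(DemoContext, DemoStore, TimestampGuardDemo), not the Kafka Streams source: a
two-argument put() falls back to the context's timestamp, and the context
refuses to provide one unless a record is currently being processed.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a processor context; not the Kafka Streams class.
class DemoContext {
    private Long currentTimestamp;   // null while no record is being processed

    void startRecord(long timestamp) { currentTimestamp = timestamp; }
    void finishRecord() { currentTimestamp = null; }

    long timestamp() {
        if (currentTimestamp == null) {
            throw new IllegalStateException(
                "timestamp() should only be called while a record is processed");
        }
        return currentTimestamp;
    }
}

// Hypothetical stand-in for a store whose two-argument put() defaults the timestamp.
class DemoStore {
    private final DemoContext context;
    private final Map<String, String> values = new HashMap<>();
    private final Map<String, Long> timestamps = new HashMap<>();

    DemoStore(DemoContext context) { this.context = context; }

    // No timestamp given: ask the context, which may throw.
    void put(String key, String value) { put(key, value, context.timestamp()); }

    void put(String key, String value, long timestamp) {
        values.put(key, value);
        timestamps.put(key, timestamp);
    }

    Long timestampOf(String key) { return timestamps.get(key); }
}

class TimestampGuardDemo {
    public static void main(String[] args) {
        DemoContext context = new DemoContext();
        DemoStore store = new DemoStore(context);

        context.startRecord(42L);
        store.put("a", "x");          // succeeds: a record is in flight
        context.finishRecord();

        try {
            store.put("b", "y");      // fails: this context has no record in flight
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In this model, a put() issued against a context that is not the one currently
processing the record would fail in the same way.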

Would you mind creating a JIRA and attaching the link to your JSON and a sketch 
of your "PhaseTransformer" implementation, since what you observed may be a real 
issue?


Guozhang


On Thu, Jun 15, 2017 at 1:42 AM, Adrian McCague <adrian.mcca...@zopa.com>
wrote:

> Hi Guozhang, thanks for your reply
>
> I can confirm that the init method is quite basic:
>
> public void init(ProcessorContext context) {
>     context.schedule(TIMEOUT.getMillis());
>     this.context = context;
>
>     this.store = (KeyValueStore)this.context.getStateStore(storeName);
> }
>
> Omitting try catch and logging.
>
> In case it's relevant, this is how the state store is created:
>
> StateStoreSupplier storeSupplier = Stores.create(storeName)
>                 .withKeys(keyserde)
>                 .withValues(valueserde)
>                 .persistent()
>                 .build();
> builder.addStateStore(storeSupplier);
>
> I can confirm though that all stack traces we have clearly originate 
> from `put()` being called from `transform()`
>
> Thanks
> Adrian
>
> -----Original Message-----
> From: Guozhang Wang [mailto:wangg...@gmail.com]
> Sent: 14 June 2017 21:18
> To: users@kafka.apache.org
> Subject: Re: IllegalStateException when putting to state store in 
> Transformer implementation
>
> Hello Adrian,
>
> When you call "put()" on the windowed state store without specifying 
> a timestamp, `timestamp()` is called on the context to obtain the 
> default timestamp.
>
> ----------------------
>
> public synchronized void put(final K key, final V value) {
>     put(key, value, context.timestamp());
> }
>
> ----------------------
>
> The question is when you were calling `put()` in the Transformer: did 
> you ever call it in the `init()` function?
>
>
> Guozhang
>
>
> On Wed, Jun 14, 2017 at 11:08 AM, Adrian McCague 
> <adrian.mcca...@zopa.com>
> wrote:
>
> > Hi All
> >
> > We have a transformer implementation in our Kafka Streams 
> > application that raises this exception, sometimes, when starting.
> >
> > "java.lang.IllegalStateException: This should not happen as
> > timestamp() should only be called while a record is processed"
> >
> > This happens when 'put' is called on a state store within the 
> > `transform` method of a custom `Transformer`.
> >
> > The full trace can be seen here, apologies for the JSON formatting:
> > https://pastebin.com/QYKE7bSH
> >
> >
> >   *   We did not see this when the input and output topic of the topology
> > had only a single partition.
> >   *   We do not see this when the streams thread is handling only a single
> > partition of data (i.e. 4 partitions, 4 consumers in the consumer group).
> >   *   We see this when deploying the consumer group and the first
> > consumers to connect are handling multiple partitions (assumed).
> > Once all have started and each consumer is processing a single 
> > partition, the issue appears to go away.
> >
> > We are using Kafka Streams Client: 0.10.2.1
> >
> > Any suggestions would be welcome as for now I am assuming a 
> > programming error.
> > I can confirm that within our code, we never call `timestamp()` on 
> > the context.
> >
> > Thanks
> > Adrian
> >
>
>
>
> --
> -- Guozhang
>



--
-- Guozhang
