Guozhang, Matthias

Thanks for confirming. With my current understanding, the mistake in the 
supplier implementation was quite clear as soon as I saw it.

The Javadoc is indeed clear about what to do; this ultimately came down to my 
reading the documentation provided on confluent / kafka.apache.org rather than 
the Javadoc. If I have any suggestion for improvement, it would be a one-liner 
in the non-Javadoc documentation explaining why a ProcessorSupplier / 
TransformerSupplier should return a new instance (there is no reference to 
this there at all as it stands), but the Javadoc is clear.

Thanks for the assistance
Adrian

-----Original Message-----
From: Guozhang Wang [mailto:wangg...@gmail.com] 
Sent: 16 June 2017 20:35
To: users@kafka.apache.org
Subject: Re: IllegalStateException when putting to state store in Transformer 
implementation

Adrian,

I see. That would explain what you see, i.e. all tasks, each with its own 
"processor context", are accessing the same state store instance; so for some 
task the processor context may not have been updated yet while another task is 
accessing that state store, which causes the issue.

If you are on Java 8 already, the recommended way to write this is

.transform(() -> new PhaseTransformer<>(..))
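
To illustrate why the supplier should hand out a new instance each time, here 
is a minimal sketch in plain Java. It does not use the Kafka Streams API: 
TaskContext, SketchTransformer and buildTasks are hypothetical stand-ins, 
assumed only for this illustration, for a per-task processor context, a 
Transformer that stores the context passed to init(), and the topology 
invoking the supplier once per task.

```java
import java.util.function.Supplier;

public class SupplierSketch {
    // Hypothetical stand-in for a per-task processor context.
    static final class TaskContext {
        final int taskId;
        TaskContext(int taskId) { this.taskId = taskId; }
    }

    // Hypothetical stand-in for a Transformer that keeps the context given to init().
    static final class SketchTransformer {
        private TaskContext context;
        void init(TaskContext context) { this.context = context; }
        int contextTaskId() { return context.taskId; }
    }

    // Invokes the supplier once per task and initialises each result
    // with that task's own context.
    static SketchTransformer[] buildTasks(Supplier<SketchTransformer> supplier, int tasks) {
        SketchTransformer[] result = new SketchTransformer[tasks];
        for (int i = 0; i < tasks; i++) {
            result[i] = supplier.get();
            result[i].init(new TaskContext(i));
        }
        return result;
    }

    public static void main(String[] args) {
        // Supplier that always returns the same object: the last init() wins.
        SketchTransformer shared = new SketchTransformer();
        SketchTransformer[] broken = buildTasks(() -> shared, 2);

        // Supplier that creates a new object per call: each task keeps its own context.
        SketchTransformer[] fixed = buildTasks(SketchTransformer::new, 2);

        System.out.println("shared: task 0 sees context of task " + broken[0].contextTaskId());
        System.out.println("fresh:  task 0 sees context of task " + fixed[0].contextTaskId());
    }
}
```

With the shared instance, task 0 ends up holding the context of task 1, 
because the second init() call overwrote the first. With a fresh instance per 
call, each task keeps its own context.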


Guozhang


On Fri, Jun 16, 2017 at 2:29 AM, Adrian McCague <adrian.mcca...@zopa.com>
wrote:

> Hi Guozhang
>
> It's just occurred to me that the transformer is added to the topology 
> like this:
>
> PhaseTransformer<S, E> transformer = new PhaseTransformer<>(evaluator, 
> storeName);
> ...
> .transform(() -> transformer, transformer.getStoreName())
>
> This means that the same transformer instance is returned whenever the 
> supplier is invoked; I wonder if this could cause some odd interaction in 
> this case. Now that I think about it, perhaps one instance of the 
> transformer is used per partition handled?
>
> Thanks
> Adrian
>
> -----Original Message-----
> From: Guozhang Wang [mailto:wangg...@gmail.com]
> Sent: 16 June 2017 02:54
> To: users@kafka.apache.org
> Subject: Re: IllegalStateException when putting to state store in 
> Transformer implementation
>
> Adrian,
>
> I looked through the 0.10.2.1 code but I cannot pin down any obvious 
> place where the processor context is set to null, which could trigger 
> your exception. Also, your stack trace gives no direct clues.
>
> Would you mind creating a JIRA and attaching the link to your JSON and a 
> sketch of your "PhaseTransformer" implementation, since what you observed 
> may be a real issue?
>
>
> Guozhang
>
>
> On Thu, Jun 15, 2017 at 1:42 AM, Adrian McCague 
> <adrian.mcca...@zopa.com>
> wrote:
>
> > Hi Guozhang, thanks for your reply
> >
> > I can confirm that the init method is quite basic:
> >
> > public void init(ProcessorContext context) {
> >     context.schedule(TIMEOUT.getMillis());
> >     this.context = context;
> >
> >     this.store = (KeyValueStore) this.context.getStateStore(storeName);
> > }
> >
> > Omitting try catch and logging.
> >
> > In case it's relevant, this is how the state store is created:
> >
> > StateStoreSupplier storeSupplier = Stores.create(storeName)
> >                 .withKeys(keyserde)
> >                 .withValues(valueserde)
> >                 .persistent()
> >                 .build();
> > builder.addStateStore(storeSupplier);
> >
> > I can confirm though that all stack traces we have clearly originate 
> > from `put()` being called from `transform()`
> >
> > Thanks
> > Adrian
> >
> > -----Original Message-----
> > From: Guozhang Wang [mailto:wangg...@gmail.com]
> > Sent: 14 June 2017 21:18
> > To: users@kafka.apache.org
> > Subject: Re: IllegalStateException when putting to state store in 
> > Transformer implementation
> >
> > Hello Adrian,
> >
> > When you call the "put()" overload that does not take a timestamp on 
> > the windowed state store, the context's `timestamp()` is retrieved and 
> > used as the default timestamp.
> >
> > ----------------------
> >
> > public synchronized void put(final K key, final V value) {
> >     put(key, value, context.timestamp());
> > }
> >
> > ----------------------
> >
> > The question is when you were calling `put()` in the Transformer: 
> > did you ever call it in the `init()` function?
> >
> >
> > Guozhang
> >
> >
> > On Wed, Jun 14, 2017 at 11:08 AM, Adrian McCague 
> > <adrian.mcca...@zopa.com>
> > wrote:
> >
> > > Hi All
> > >
> > > We have a transformer implementation in our Kafka Streams 
> > > application that sometimes raises this exception on startup.
> > >
> > > "java.lang.IllegalStateException: This should not happen as
> > > timestamp() should only be called while a record is processed"
> > >
> > > This happens when 'put' is called on a state store within the 
> > > `transform` method of a custom `Transformer`.
> > >
> > > The full trace can be seen here, apologies for the JSON formatting:
> > > https://pastebin.com/QYKE7bSH
> > >
> > >
> > >   *   We did not see this when the input and output topic of the
> > >       topology had only a single partition.
> > >   *   We do not see this when the streams thread is handling only a
> > >       single partition of data. (ie 4 partitions, 4 consumers in the
> > >       consumer group)
> > >   *   We see this when deploying the consumer group and the first
> > >       consumers to connect are handling multiple partitions (assumed).
> > >       Once all have started and each consumer is processing a single
> > >       partition each, the issue appears to go away.
> > >
> > > We are using Kafka Streams Client: 0.10.2.1
> > >
> > > Any suggestions would be welcome as for now I am assuming a 
> > > programming error.
> > > I can confirm that within our code, we never call `timestamp()` on 
> > > the context.
> > >
> > > Thanks
> > > Adrian
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>
>
>
> --
> -- Guozhang
>



--
-- Guozhang
