Adrian,

I see. That would explain what you are seeing: all tasks, each with its own
"processor context", end up accessing the same state store instance through
the shared transformer. For some task, its processor context may not have
been updated yet while another task is accessing that state store, which
causes the issue.

If you are on Java 8 already, the recommended way is to create a new
instance inside the supplier, so that every call returns its own transformer:

.transform(() -> new PhaseTransformer<>(..))
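
To illustrate the difference, here is a minimal plain-Java sketch. It does not
use the Kafka Streams API at all: FakeContext, FakeTransformer and
initTwoTasks are hypothetical stand-ins, and the only assumption is that each
task asks the supplier for a transformer and then calls init() on it with its
own context.

```java
import java.util.function.Supplier;

public class SupplierDemo {
    // Hypothetical stand-in for a task's processor context (not the Kafka class).
    static final class FakeContext {
        final int taskId;
        FakeContext(int taskId) { this.taskId = taskId; }
    }

    // Hypothetical stand-in for a transformer that keeps the context passed to init().
    static final class FakeTransformer {
        private FakeContext context;
        void init(FakeContext context) { this.context = context; }
        int contextTaskId() { return context.taskId; }
    }

    // Mimics two tasks that each obtain a transformer from the supplier and
    // initialise it with their own context. Returns the task id that each
    // task's transformer ends up holding.
    static int[] initTwoTasks(Supplier<FakeTransformer> supplier) {
        FakeTransformer first = supplier.get();
        first.init(new FakeContext(0));
        FakeTransformer second = supplier.get();
        second.init(new FakeContext(1));
        return new int[] { first.contextTaskId(), second.contextTaskId() };
    }

    public static void main(String[] args) {
        // Shared instance: the second init() overwrites the first task's context.
        FakeTransformer shared = new FakeTransformer();
        int[] sharedResult = initTwoTasks(() -> shared);

        // Fresh instance per call: each task keeps its own context.
        int[] freshResult = initTwoTasks(FakeTransformer::new);

        System.out.println("shared: task 0 sees context of task " + sharedResult[0]);
        System.out.println("fresh:  task 0 sees context of task " + freshResult[0]);
    }
}
```

With the shared instance, task 0 ends up holding the context of task 1; with a
new instance per call, each task keeps its own.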


Guozhang


On Fri, Jun 16, 2017 at 2:29 AM, Adrian McCague <adrian.mcca...@zopa.com>
wrote:

> Hi Guozhang
>
> It's just occurred to me that the transformer is added to the topology
> like this:
>
> PhaseTransformer<S, E> transformer = new PhaseTransformer<>(evaluator,
> storeName);
> ...
> .transform(() -> transformer, transformer.getStoreName())
>
> This means that the same transformer instance is returned whenever the
> supplier is invoked, and I wonder if this could cause some odd interaction
> in this case. Now that I think about it, perhaps one transformer instance
> is used per partition handled?
>
> Thanks
> Adrian
>
> -----Original Message-----
> From: Guozhang Wang [mailto:wangg...@gmail.com]
> Sent: 16 June 2017 02:54
> To: users@kafka.apache.org
> Subject: Re: IllegalStateException when putting to state store in
> Transformer implementation
>
> Adrian,
>
> I looked through the 0.10.2.1 code but I cannot pin down any obvious
> place where the processor context is set to null, which could trigger your
> exception. Also, your stack trace does not give any direct clues.
>
> Would you mind creating a JIRA and attaching the link to your JSON and a
> sketch of your "PhaseTransformer" implementation, since what you observed
> may be a real issue?
>
>
> Guozhang
>
>
> On Thu, Jun 15, 2017 at 1:42 AM, Adrian McCague <adrian.mcca...@zopa.com>
> wrote:
>
> > Hi Guozhang, thanks for your reply
> >
> > I can confirm that the init method is quite basic:
> >
> > public void init(ProcessorContext context) {
> >     context.schedule(TIMEOUT.getMillis());
> >     this.context = context;
> >
> >     this.store = (KeyValueStore)this.context.getStateStore(storeName);
> > }
> >
> > Omitting try catch and logging.
> >
> > In case it's relevant, this is how the state store is created:
> >
> > StateStoreSupplier storeSupplier = Stores.create(storeName)
> >                 .withKeys(keyserde)
> >                 .withValues(valueserde)
> >                 .persistent()
> >                 .build();
> > builder.addStateStore(storeSupplier);
> >
> > I can confirm though that all stack traces we have clearly originate
> > from `put()` being called from `transform()`
> >
> > Thanks
> > Adrian
> >
> > -----Original Message-----
> > From: Guozhang Wang [mailto:wangg...@gmail.com]
> > Sent: 14 June 2017 21:18
> > To: users@kafka.apache.org
> > Subject: Re: IllegalStateException when putting to state store in
> > Transformer implementation
> >
> > Hello Adrian,
> >
> > When you call the "put()" variant that does not take a timestamp on a
> > windowed state store, the context's `timestamp()` is retrieved and used
> > as the default timestamp.
> >
> > ----------------------
> >
> > public synchronized void put(final K key, final V value) {
> >     put(key, value, context.timestamp());
> > }
> >
> > ----------------------
> >
> > The question is where you were calling `put()` in the Transformer: did
> > you ever call it in the `init()` function?
> >
> >
> > Guozhang
> >
> >
> > On Wed, Jun 14, 2017 at 11:08 AM, Adrian McCague
> > <adrian.mcca...@zopa.com>
> > wrote:
> >
> > > Hi All
> > >
> > > We have a transformer implementation in our Kafka Streams
> > > application that raises this exception, sometimes, when starting.
> > >
> > > "java.lang.IllegalStateException: This should not happen as
> > > timestamp() should only be called while a record is processed"
> > >
> > > This happens when 'put' is called on a state store within the
> > > `transform` method of a custom `Transformer`.
> > >
> > > The full trace can be seen here, apologies for the JSON formatting:
> > > https://pastebin.com/QYKE7bSH
> > >
> > >
> > >   *   We did not see this when the input and output topic of the
> > > topology had only a single partition.
> > >   *   We do not see this when the streams thread is handling only a
> > > single partition of data. (ie 4 partitions, 4 consumers in the
> > > consumer group)
> > >   *   We see this when deploying the consumer group and the first
> > > consumers to connect are handling multiple partitions (assumed).
> > > Once all have started and each consumer is processing a single
> > > partition each, the issue appears to go away.
> > >
> > > We are using Kafka Streams Client: 0.10.2.1
> > >
> > > Any suggestions would be welcome as for now I am assuming a
> > > programming error.
> > > I can confirm that within our code, we never call `timestamp()` on
> > > the context.
> > >
> > > Thanks
> > > Adrian
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>
>
>
> --
> -- Guozhang
>



-- 
-- Guozhang
