Rainer,

Thanks for sharing the logs.

With respect to option "c"  in you orginal email, given that you are using
a custom processor and state store would setting a StateRestoreListener in
the custom store suit your needs?

Would you be comfortable sharing your code so I can see if there is an
acceptable alternative I can workout for you?

Thanks,
Bill




On Thu, Nov 9, 2017 at 2:26 PM, Rainer Guessner <raguess...@gmx.com> wrote:

> I have a few logs below.
>
> Thank you.
> Rainer
>
> 14:21:04,304 INFO  [StreamThread] stream-thread [t10_nr3_metadatarecovery-
> 17935bc8-6ec2-4fcc-b62f-15a63c9a051c-StreamThread-1] Creating restore
> consumer client
> 14:21:04,393 INFO  [StreamThread] stream-thread [t10_nr3_metadatarecovery-
> 17935bc8-6ec2-4fcc-b62f-15a63c9a051c-StreamThread-1] Creating shared
> producer client
> 14:21:04,429 INFO  [StreamThread] stream-thread [t10_nr3_metadatarecovery-
> 17935bc8-6ec2-4fcc-b62f-15a63c9a051c-StreamThread-1] Creating consumer
> client
> 14:21:04,620 INFO  [StreamThread] stream-thread [t10_nr3_metadatarecovery-
> 17935bc8-6ec2-4fcc-b62f-15a63c9a051c-StreamThread-1] Starting
> 14:21:04,622 INFO  [StreamThread] stream-thread [t10_nr3_metadatarecovery-
> 17935bc8-6ec2-4fcc-b62f-15a63c9a051c-StreamThread-1] State transition
> from CREATED to RUNNING
> 14:21:04,639 INFO  [StreamThread] stream-thread [t10_nr3_metadatarecovery-
> 17935bc8-6ec2-4fcc-b62f-15a63c9a051c-StreamThread-1] State transition
> from RUNNING to PARTITIONS_REVOKED
> 14:21:04,639 INFO  [StreamThread] stream-thread [t10_nr3_metadatarecovery-
> 17935bc8-6ec2-4fcc-b62f-15a63c9a051c-StreamThread-1] partition revocation
> took 0 ms.
>  suspended active tasks: []
>  suspended standby tasks: []
> 14:21:04,748 INFO  [RestApplication] Adding listener: http://0.0.0.0:8082
> 14:21:04,768 INFO  [StreamThread] stream-thread [t10_nr3_metadatarecovery-
> 17935bc8-6ec2-4fcc-b62f-15a63c9a051c-StreamThread-1] State transition
> from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> 14:21:04,783 INFO  [StreamThread] stream-thread [t10_nr3_metadatarecovery-
> 17935bc8-6ec2-4fcc-b62f-15a63c9a051c-StreamThread-1] partition assignment
> took 15 ms.
>  current active tasks: [0_0]
>  current standby tasks: []
>  previous active tasks: []
>
> STATESTORE INIT
> 14:21:04,873 INFO  ... our stuff here
> PROCESSOR INIT
> 14:21:05,637 INFO  ... our stuff here
> RESTORE CALLED
> ...
> RESTORE CALLED
> 14:21:05,680 INFO  [StreamThread] stream-thread [t10_nr3_metadatarecovery-
> 17935bc8-6ec2-4fcc-b62f-15a63c9a051c-StreamThread-1] State transition
> from PARTITIONS_ASSIGNED to RUNNING
>
> ==================
> Sent: Thursday, November 09, 2017 at 1:43 PM
> From: "Bill Bejeck" <b...@confluent.io>
> To: users@kafka.apache.org
> Subject: Re: Kafka Streams - Custom processor "init" method called before
> state store has data restored into it
> Hi Rainer,
>
> Thanks for reporting this issue. Do you have any log data you can share?
>
> In the meantime, I'll look into the issue.
>
> Thanks,
> Bill
>
> On Thu, Nov 9, 2017 at 1:23 PM, Rainer Guessner <raguess...@gmx.com>
> wrote:
>
> > I have a custom processor that implements AbstractProcessor and a custom
> > store that implements StateStore.
> >
> > Before Kafka 1.0.0 the processors "init" method gets called after the
> > state store is restored from changelog and that is good.
> > With Kafka 1.0.0 the processors "init" method is called BEFORE the state
> > store is restored from changelog and that is bad.
> >
> > My processor can only initialize when it has access to the state. However
> > at the time KStreams calls "init" on the processor the state store may
> not
> > have any data. It is not an option for me to initialize the processor
> > lazily when a record arrives, or to re-initialize it when "onRestoreEnd"
> is
> > called (its only called on restore; The state store "init" gets called
> > before processor "init" regardless of restore or not.)
> >
> > I think I need to have either of these:
> > a) know whether or not a state restore will take place and when not
> > b) or get a call to the state store regardless of whether state restore
> > took place or not
> > c) or I need a "ready" method on the processor that gets called when the
> > state store has completed restoring and is actually usable
> >
> > Please help, thank you in advance.
> > Rainer
> >
>

Reply via email to