Hi Bill,
 
I was on 0.10.2.1.

Thank you
Rainer



Sent: Monday, November 13, 2017 at 4:02 PM
From: "Bill Bejeck" <b...@confluent.io>
To: users@kafka.apache.org
Subject: Re: Kafka Streams - Custom processor "init" method called before state 
store has data restored into it
Rainer,

Thanks for the info.

What version were you using before upgrading to 1.0.0?

Thanks,
Bill

On Sun, Nov 12, 2017 at 7:06 PM, Rainer Guessner <raguess...@gmx.com> wrote:

> Hi Bill,
>
> thanks for the suggestion towards StateRestoreListener, however that does
> not solve my issue as it's a global listener and doesn't help the processor
> itself.
>
> Please find the simple one-class code for reproducing the issue below.
> Please create two topics:
> kafka-topics --zookeeper localhost:2181 --create --topic sampleapp-input
> --partitions 1 --replication-factor 1 --config cleanup.policy=delete
> kafka-topics --zookeeper localhost:2181 --create --topic
> sampleapp-mystatestore-changelog --partitions 1 --replication-factor 1
> --config cleanup.policy=compact
>
> Please run "testStepOne", delete the KStreams state, next run
> "testStepTwo".
>
> I have included log output after the code.
>
> Thank you.
> Rainer
>
>
> <code>
> public class KStreamsTester extends TestCase {
> private final static Logger log = LoggerFactory.getLogger(
> KStreamsTester.class);
>
> private final static String STATESTORE_NAME = "mystatestore";
>
> public void testStepOne() throws Exception {
> log.info("Starting step one");
> KafkaStreams streams = getKStreams();
> Thread.sleep(45000);
> streams.close();
> }
>
> /**
> * Delete state store between running step one and step two.
> */
>
> public void testStepTwo() throws Exception {
> log.info("Starting step two");
> KafkaStreams streams = getKStreams();
> Thread.sleep(30000);
> }
>
> public KafkaStreams getKStreams() {
> Topology builder = new Topology();
> Properties props = new Properties();
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "sampleapp");
> StreamsConfig config = new StreamsConfig(props);
>
> MyProcessorSupplier supplier = new MyProcessorSupplier();
> builder.addSource("source", new StringDeserializer(), new
> StringDeserializer(), "sampleapp-input");
> builder.addProcessor("processor", supplier, "source");
> builder.addStateStore(new MyStateStoreBuilder(), "processor");
>
> KafkaStreams streaming = new KafkaStreams(builder, config);
> streaming.start();
> return streaming;
> }
>
> public static class MyStateStoreBuilder implements StoreBuilder {
> public StoreBuilder withCachingEnabled() {
> return this;
> }
>
> public StoreBuilder withLoggingEnabled(Map map) {
> return this;
> }
>
> public StoreBuilder withLoggingDisabled() {
> return this;
> }
>
> public StateStore build() {
> return new MyStateStore();
> }
>
> public Map<String, String> logConfig() {
> return Collections.emptyMap();
> }
>
> public boolean loggingEnabled() {
> return true;
> }
>
> public String name() {
> return STATESTORE_NAME;
> }
> }
>
> public static class MyStateStore implements StateStore {
> private final static Logger log = LoggerFactory.getLogger(
> MyStateStore.class);
>
> private boolean open;
> private ProcessorContext processorContext;
>
> public String name() {
> return STATESTORE_NAME;
> }
>
> public void init(ProcessorContext processorContext, StateStore
> stateStore) {
> log.info(".init");
> this.processorContext = processorContext;
> open = true;
> processorContext.register(stateStore, true, new
> AbstractNotifyingRestoreCallback() {
> public void onRestoreStart(TopicPartition topicPartition,
> String storeName, long startingOffset, long endingOffset) {
> log.info(".onRestoreStart");
> }
>
> public void onBatchRestored(TopicPartition topicPartition,
> String storeName, long batchEndOffset, long numRestored) {
> log.info(".onBatchRestored");
> }
>
> public void onRestoreEnd(TopicPartition topicPartition,
> String storeName, long totalRestored) {
> log.info(".onRestoreEnd");
> }
>
> public void restore(byte[] bytes, byte[] bytes1) {
> log.info(".restore");
> }
> });
> }
>
> public void flush() {
> log.info(".flush");
> RecordCollector collector = ((RecordCollector.Supplier)
> processorContext).recordCollector();
> int partition = processorContext.taskId().partition;
> String topic = ProcessorStateManager.storeChangelogTopic(
> processorContext.applicationId(), STATESTORE_NAME);
> collector.send(topic, "ABC".getBytes(), "DEF".getBytes(),
> partition, System.currentTimeMillis(), new ByteArraySerializer(), new
> ByteArraySerializer());
> }
>
> public void close() {
> log.info(".close");
> open = false;
> }
>
> public boolean persistent() {
> return true;
> }
>
> public boolean isOpen() {
> return open;
> }
> }
>
> public static class MyProcessorSupplier implements ProcessorSupplier {
> public Processor get() {
> return new MyProcessor();
> }
> }
>
> public static class MyProcessor extends AbstractProcessor<Object,
> Object> {
> private final static Logger log = LoggerFactory.getLogger(
> MyProcessor.class);
>
> public void init(ProcessorContext context) {
> super.init(context);
> log.info(".init");
> }
>
> public void process(Object o, Object o2) {
> log.info(".process");
> }
> }
> }
> </code>
>
>
> 19:04:36,668 INFO [KStreamsTester$MyStateStore] .init
> 19:04:36,668 INFO [KStreamsTester$MyProcessor] .init
> 19:04:36,690 INFO [KStreamsTester$MyStateStore] .onRestoreStart
> 19:04:36,706 INFO [KStreamsTester$MyStateStore] .restore
> 19:04:36,706 INFO [KStreamsTester$MyStateStore] .restore
> 19:04:36,706 INFO [KStreamsTester$MyStateStore] .restore
> 19:04:36,706 INFO [KStreamsTester$MyStateStore] .onBatchRestored
> 19:04:36,706 INFO [KStreamsTester$MyStateStore] .onRestoreEnd
> 19:04:36,706 INFO [StreamThread] stream-thread
> [sampleapp-07ac897b-7bfe-40c6-84c7-fb1f7fbd1b24-StreamThread-1] State
> transition from PARTITIONS_ASSIGNED to RUNNING
> 19:04:36,706 INFO [KafkaStreams] stream-client
> [sampleapp-07ac897b-7bfe-40c6-84c7-fb1f7fbd1b24]State transition from
> REBALANCING to RUNNING
> 19:04:36,706 INFO [KStreamsTester$MyStateStore] .flush
>
>
> =============
>
> Sent: Sunday, November 12, 2017 at 1:01 PM
> From: "Bill Bejeck" <b...@confluent.io>
> To: users@kafka.apache.org
> Subject: Re: Kafka Streams - Custom processor "init" method called before
> state store has data restored into it
> Rainer,
>
> Thanks for sharing the logs.
>
> With respect to option "c" in your original email, given that you are using
> a custom processor and state store, would setting a StateRestoreListener in
> the custom store suit your needs?
>
> Would you be comfortable sharing your code so I can see if there is an
> acceptable alternative I can work out for you?
>
> Thanks,
> Bill
>
>
>
>
> On Thu, Nov 9, 2017 at 2:26 PM, Rainer Guessner <raguess...@gmx.com>
> wrote:
>
> > I have a few logs below.
> >
> > Thank you.
> > Rainer
> >
> > 14:21:04,304 INFO [StreamThread] stream-thread [t10_nr3_metadatarecovery-
> > 17935bc8-6ec2-4fcc-b62f-15a63c9a051c-StreamThread-1] Creating restore
> > consumer client
> > 14:21:04,393 INFO [StreamThread] stream-thread [t10_nr3_metadatarecovery-
> > 17935bc8-6ec2-4fcc-b62f-15a63c9a051c-StreamThread-1] Creating shared
> > producer client
> > 14:21:04,429 INFO [StreamThread] stream-thread [t10_nr3_metadatarecovery-
> > 17935bc8-6ec2-4fcc-b62f-15a63c9a051c-StreamThread-1] Creating consumer
> > client
> > 14:21:04,620 INFO [StreamThread] stream-thread [t10_nr3_metadatarecovery-
> > 17935bc8-6ec2-4fcc-b62f-15a63c9a051c-StreamThread-1] Starting
> > 14:21:04,622 INFO [StreamThread] stream-thread [t10_nr3_metadatarecovery-
> > 17935bc8-6ec2-4fcc-b62f-15a63c9a051c-StreamThread-1] State transition
> > from CREATED to RUNNING
> > 14:21:04,639 INFO [StreamThread] stream-thread [t10_nr3_metadatarecovery-
> > 17935bc8-6ec2-4fcc-b62f-15a63c9a051c-StreamThread-1] State transition
> > from RUNNING to PARTITIONS_REVOKED
> > 14:21:04,639 INFO [StreamThread] stream-thread [t10_nr3_metadatarecovery-
> > 17935bc8-6ec2-4fcc-b62f-15a63c9a051c-StreamThread-1] partition revocation
> > took 0 ms.
> > suspended active tasks: []
> > suspended standby tasks: []
> > 14:21:04,748 INFO [RestApplication] Adding listener: http://0.0.0.0:8082
> > 14:21:04,768 INFO [StreamThread] stream-thread [t10_nr3_metadatarecovery-
> > 17935bc8-6ec2-4fcc-b62f-15a63c9a051c-StreamThread-1] State transition
> > from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > 14:21:04,783 INFO [StreamThread] stream-thread [t10_nr3_metadatarecovery-
> > 17935bc8-6ec2-4fcc-b62f-15a63c9a051c-StreamThread-1] partition assignment
> > took 15 ms.
> > current active tasks: [0_0]
> > current standby tasks: []
> > previous active tasks: []
> >
> > STATESTORE INIT
> > 14:21:04,873 INFO ... our stuff here
> > PROCESSOR INIT
> > 14:21:05,637 INFO ... our stuff here
> > RESTORE CALLED
> > ...
> > RESTORE CALLED
> > 14:21:05,680 INFO [StreamThread] stream-thread [t10_nr3_metadatarecovery-
> > 17935bc8-6ec2-4fcc-b62f-15a63c9a051c-StreamThread-1] State transition
> > from PARTITIONS_ASSIGNED to RUNNING
> >
> > ==================
> > Sent: Thursday, November 09, 2017 at 1:43 PM
> > From: "Bill Bejeck" <b...@confluent.io>
> > To: users@kafka.apache.org
> > Subject: Re: Kafka Streams - Custom processor "init" method called before
> > state store has data restored into it
> > Hi Rainer,
> >
> > Thanks for reporting this issue. Do you have any log data you can share?
> >
> > In the meantime, I'll look into the issue.
> >
> > Thanks,
> > Bill
> >
> > On Thu, Nov 9, 2017 at 1:23 PM, Rainer Guessner <raguess...@gmx.com>
> > wrote:
> >
> > > > I have a custom processor that extends AbstractProcessor and a custom
> > > > store that implements StateStore.
> > >
> > > > Before Kafka 1.0.0 the processor's "init" method was called after the
> > > > state store was restored from the changelog, and that is good.
> > > > With Kafka 1.0.0 the processor's "init" method is called BEFORE the
> > > > state store is restored from the changelog, and that is bad.
> > >
> > > > My processor can only initialize when it has access to the state.
> > > > However, at the time KStreams calls "init" on the processor the state
> > > > store may not have any data. It is not an option for me to initialize
> > > > the processor lazily when a record arrives, or to re-initialize it when
> > > > "onRestoreEnd" is called (it's only called on restore; the state store
> > > > "init" gets called before processor "init" regardless of restore or
> > > > not).
> > >
> > > > I think I need one of these:
> > > > a) a way to know whether or not a state restore will take place, and
> > > > when it will not
> > > > b) or a call to the state store regardless of whether a state restore
> > > > took place or not
> > > > c) or a "ready" method on the processor that gets called when the
> > > > state store has completed restoring and is actually usable
> > >
> > > Please help, thank you in advance.
> > > Rainer
> > >
> >
>
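
Option "c" from the original message (a "ready" call once the store is
usable) can be illustrated with a small gate object. This is only a sketch
in plain Java and does not use any Kafka classes: ReadyGate, whenReady,
markReady and ReadyGateDemo are hypothetical names, not part of Kafka
Streams. It assumes the application has some signal that fires whether or
not a restore took place. One candidate, as an assumption, is the
transition to RUNNING, which the logs in this thread show after
.onRestoreEnd.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical gate: runs deferred initialization once the store is usable. */
final class ReadyGate {
    private final AtomicBoolean ready = new AtomicBoolean(false);
    private final List<Runnable> pending = new ArrayList<>();

    /** Register work that needs restored state; runs at once if already ready. */
    synchronized void whenReady(Runnable action) {
        if (ready.get()) {
            action.run();
        } else {
            pending.add(action);
        }
    }

    /** Signal that the store is usable; repeated signals are ignored. */
    synchronized void markReady() {
        if (ready.compareAndSet(false, true)) {
            for (Runnable action : pending) {
                action.run();
            }
            pending.clear();
        }
    }

    boolean isReady() {
        return ready.get();
    }
}

/** Replays the call order reported in the thread against the gate. */
final class ReadyGateDemo {
    static List<String> run() {
        List<String> events = new ArrayList<>();
        ReadyGate gate = new ReadyGate();

        // Processor "init": the store may still be empty, so only register.
        events.add("processor.init");
        gate.whenReady(() -> events.add("processor.ready"));

        // Restore happens here, or is skipped when there is nothing to restore.
        events.add("store.restore");

        // Assumed signal that fires in both cases (for example, RUNNING).
        gate.markReady();
        gate.markReady(); // duplicate signal has no effect
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The processor's state-dependent setup moves out of "init" and into the
action passed to whenReady, so it does not depend on whether "onRestoreEnd"
was ever called.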
