Rainer,

Thanks for the info. What version were you using before upgrading to 1.0.0?

Thanks,
Bill

On Sun, Nov 12, 2017 at 7:06 PM, Rainer Guessner <raguess...@gmx.com> wrote:

> Hi Bill,
>
> Thanks for the suggestion towards StateRestoreListener; however, that does
> not solve my issue, as it's a global listener and doesn't help the
> processor itself.
>
> Please find the simple one-class code for reproducing the issue below.
> Please create two topics:
> kafka-topics -zookeeper localhost:2181 kafka-topics.cmd -create -topic sampleapp-input -partitions 1 -replication-factor 1 --config cleanup.policy=delete
> kafka-topics -zookeeper localhost:2181 kafka-topics.cmd -create -topic sampleapp-mystatestore-changelog -partitions 1 -replication-factor 1 --config cleanup.policy=compact
>
> Please run "testStepOne", delete the KStreams state, then run
> "testStepTwo".
>
> I have included log output after the code.
>
> Thank you.
> Rainer
>
>
> <code>
> public class KStreamsTester extends TestCase {
>     private final static Logger log = LoggerFactory.getLogger(KStreamsTester.class);
>
>     private final static String STATESTORE_NAME = "mystatestore";
>
>     public void testStepOne() throws Exception {
>         log.info("Starting step one");
>         KafkaStreams streams = getKStreams();
>         Thread.sleep(45000);
>         streams.close();
>     }
>
>     /**
>      * Delete state store between running step one and step two.
>      */
>
>     public void testStepTwo() throws Exception {
>         log.info("Starting step two");
>         KafkaStreams streams = getKStreams();
>         Thread.sleep(30000);
>     }
>
>     public KafkaStreams getKStreams() {
>         Topology builder = new Topology();
>         Properties props = new Properties();
>         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
>         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "sampleapp");
>         StreamsConfig config = new StreamsConfig(props);
>
>         MyProcessorSupplier supplier = new MyProcessorSupplier();
>         builder.addSource("source", new StringDeserializer(), new StringDeserializer(), "sampleapp-input");
>         builder.addProcessor("processor", supplier, "source");
>         builder.addStateStore(new MyStateStoreBuilder(), "processor");
>
>         KafkaStreams streaming = new KafkaStreams(builder, config);
>         streaming.start();
>         return streaming;
>     }
>
>     public static class MyStateStoreBuilder implements StoreBuilder {
>         public StoreBuilder withCachingEnabled() {
>             return this;
>         }
>
>         public StoreBuilder withLoggingEnabled(Map map) {
>             return this;
>         }
>
>         public StoreBuilder withLoggingDisabled() {
>             return this;
>         }
>
>         public StateStore build() {
>             return new MyStateStore();
>         }
>
>         public Map<String, String> logConfig() {
>             return Collections.emptyMap();
>         }
>
>         public boolean loggingEnabled() {
>             return true;
>         }
>
>         public String name() {
>             return STATESTORE_NAME;
>         }
>     }
>
>     public static class MyStateStore implements StateStore {
>         private final static Logger log = LoggerFactory.getLogger(MyStateStore.class);
>
>         private boolean open;
>         private ProcessorContext processorContext;
>
>         public String name() {
>             return STATESTORE_NAME;
>         }
>
>         public void init(ProcessorContext processorContext, StateStore stateStore) {
>             log.info(".init");
>             this.processorContext = processorContext;
>             open = true;
>             processorContext.register(stateStore, true, new AbstractNotifyingRestoreCallback() {
>                 public void onRestoreStart(TopicPartition topicPartition, String storeName,
>                         long startingOffset, long endingOffset) {
>                     log.info(".onRestoreStart");
>                 }
>
>                 public void onBatchRestored(TopicPartition topicPartition, String storeName, long batchEndOffset, long numRestored) {
>                     log.info(".onBatchRestored");
>                 }
>
>                 public void onRestoreEnd(TopicPartition topicPartition, String storeName, long totalRestored) {
>                     log.info(".onRestoreEnd");
>                 }
>
>                 public void restore(byte[] bytes, byte[] bytes1) {
>                     log.info(".restore");
>                 }
>             });
>         }
>
>         public void flush() {
>             log.info(".flush");
>             RecordCollector collector = ((RecordCollector.Supplier) processorContext).recordCollector();
>             int partition = processorContext.taskId().partition;
>             String topic = ProcessorStateManager.storeChangelogTopic(processorContext.applicationId(), STATESTORE_NAME);
>             collector.send(topic, "ABC".getBytes(), "DEF".getBytes(), partition, System.currentTimeMillis(), new ByteArraySerializer(), new ByteArraySerializer());
>         }
>
>         public void close() {
>             log.info(".close");
>             open = false;
>         }
>
>         public boolean persistent() {
>             return true;
>         }
>
>         public boolean isOpen() {
>             return open;
>         }
>     }
>
>     public static class MyProcessorSupplier implements ProcessorSupplier {
>         public Processor get() {
>             return new MyProcessor();
>         }
>     }
>
>     public static class MyProcessor extends AbstractProcessor<Object, Object> {
>         private final static Logger log = LoggerFactory.getLogger(MyProcessor.class);
>
>         public void init(ProcessorContext context) {
>             super.init(context);
>             log.info(".init");
>         }
>
>         public void process(Object o, Object o2) {
>             log.info(".process");
>         }
>     }
> }
> </code>
>
>
> 19:04:36,668 INFO [KStreamsTester$MyStateStore] .init
> 19:04:36,668 INFO [KStreamsTester$MyProcessor] .init
> 19:04:36,690 INFO [KStreamsTester$MyStateStore] .onRestoreStart
> 19:04:36,706 INFO [KStreamsTester$MyStateStore] .restore
> 19:04:36,706 INFO [KStreamsTester$MyStateStore] .restore
> 19:04:36,706 INFO [KStreamsTester$MyStateStore] .restore
> 19:04:36,706 INFO [KStreamsTester$MyStateStore] .onBatchRestored
> 19:04:36,706 INFO [KStreamsTester$MyStateStore] .onRestoreEnd
> 19:04:36,706 INFO [StreamThread] stream-thread [sampleapp-07ac897b-7bfe-40c6-84c7-fb1f7fbd1b24-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
> 19:04:36,706 INFO [KafkaStreams] stream-client [sampleapp-07ac897b-7bfe-40c6-84c7-fb1f7fbd1b24]State transition from REBALANCING to RUNNING
> 19:04:36,706 INFO [KStreamsTester$MyStateStore] .flush
>
>
> =============
>
> Sent: Sunday, November 12, 2017 at 1:01 PM
> From: "Bill Bejeck" <b...@confluent.io>
> To: users@kafka.apache.org
> Subject: Re: Kafka Streams - Custom processor "init" method called before state store has data restored into it
> Rainer,
>
> Thanks for sharing the logs.
>
> With respect to option "c" in your original email, given that you are
> using a custom processor and state store, would setting a
> StateRestoreListener in the custom store suit your needs?
>
> Would you be comfortable sharing your code so I can see if there is an
> acceptable alternative I can work out for you?
>
> Thanks,
> Bill
>
>
> On Thu, Nov 9, 2017 at 2:26 PM, Rainer Guessner <raguess...@gmx.com> wrote:
>
> > I have a few logs below.
> >
> > Thank you.
> > Rainer
> >
> > 14:21:04,304 INFO [StreamThread] stream-thread [t10_nr3_metadatarecovery-17935bc8-6ec2-4fcc-b62f-15a63c9a051c-StreamThread-1] Creating restore consumer client
> > 14:21:04,393 INFO [StreamThread] stream-thread [t10_nr3_metadatarecovery-17935bc8-6ec2-4fcc-b62f-15a63c9a051c-StreamThread-1] Creating shared producer client
> > 14:21:04,429 INFO [StreamThread] stream-thread [t10_nr3_metadatarecovery-17935bc8-6ec2-4fcc-b62f-15a63c9a051c-StreamThread-1] Creating consumer client
> > 14:21:04,620 INFO [StreamThread] stream-thread [t10_nr3_metadatarecovery-17935bc8-6ec2-4fcc-b62f-15a63c9a051c-StreamThread-1] Starting
> > 14:21:04,622 INFO [StreamThread] stream-thread [t10_nr3_metadatarecovery-17935bc8-6ec2-4fcc-b62f-15a63c9a051c-StreamThread-1] State transition from CREATED to RUNNING
> > 14:21:04,639 INFO [StreamThread] stream-thread [t10_nr3_metadatarecovery-17935bc8-6ec2-4fcc-b62f-15a63c9a051c-StreamThread-1] State transition from RUNNING to PARTITIONS_REVOKED
> > 14:21:04,639 INFO [StreamThread] stream-thread [t10_nr3_metadatarecovery-17935bc8-6ec2-4fcc-b62f-15a63c9a051c-StreamThread-1] partition revocation took 0 ms.
> >     suspended active tasks: []
> >     suspended standby tasks: []
> > 14:21:04,748 INFO [RestApplication] Adding listener: http://0.0.0.0:8082
> > 14:21:04,768 INFO [StreamThread] stream-thread [t10_nr3_metadatarecovery-17935bc8-6ec2-4fcc-b62f-15a63c9a051c-StreamThread-1] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > 14:21:04,783 INFO [StreamThread] stream-thread [t10_nr3_metadatarecovery-17935bc8-6ec2-4fcc-b62f-15a63c9a051c-StreamThread-1] partition assignment took 15 ms.
> >     current active tasks: [0_0]
> >     current standby tasks: []
> >     previous active tasks: []
> >
> > STATESTORE INIT
> > 14:21:04,873 INFO ... our stuff here
> > PROCESSOR INIT
> > 14:21:05,637 INFO ... our stuff here
> > RESTORE CALLED
> > ...
> > RESTORE CALLED
> > 14:21:05,680 INFO [StreamThread] stream-thread [t10_nr3_metadatarecovery-17935bc8-6ec2-4fcc-b62f-15a63c9a051c-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
> >
> > ==================
> > Sent: Thursday, November 09, 2017 at 1:43 PM
> > From: "Bill Bejeck" <b...@confluent.io>
> > To: users@kafka.apache.org
> > Subject: Re: Kafka Streams - Custom processor "init" method called before state store has data restored into it
> > Hi Rainer,
> >
> > Thanks for reporting this issue. Do you have any log data you can share?
> >
> > In the meantime, I'll look into the issue.
> >
> > Thanks,
> > Bill
> >
> > On Thu, Nov 9, 2017 at 1:23 PM, Rainer Guessner <raguess...@gmx.com> wrote:
> >
> > > I have a custom processor that extends AbstractProcessor and a custom
> > > store that implements StateStore.
> > >
> > > Before Kafka 1.0.0, the processor's "init" method was called after the
> > > state store was restored from the changelog, and that is good.
> > > With Kafka 1.0.0, the processor's "init" method is called BEFORE the
> > > state store is restored from the changelog, and that is bad.
> > >
> > > My processor can only initialize when it has access to the state.
> > > However, at the time KStreams calls "init" on the processor, the state
> > > store may not have any data. It is not an option for me to initialize
> > > the processor lazily when a record arrives, or to re-initialize it when
> > > "onRestoreEnd" is called (it's only called on restore; the state store
> > > "init" gets called before processor "init" regardless of restore or
> > > not).
> > >
> > > I think I need to have either of these:
> > > a) know whether or not a state restore will take place and when not
> > > b) or get a call to the state store regardless of whether state restore
> > >    took place or not
> > > c) or I need a "ready" method on the processor that gets called when
> > >    the state store has completed restoring and is actually usable
> > >
> > > Please help, thank you in advance.
> > > Rainer