Hi,

This happens because you are calling `context.timestamp` during `commit`. At that point there is no `RecordContext` associated with the `ProcessorContext`, which is why `recordContext` is null and `timestamp()` throws. The `RecordContext` is only set while Streams is processing a record. You probably want to log the change when you write to the store, rather than from `flush`.
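A minimal sketch of that idea, in case it helps. It deliberately does not use the real Kafka Streams classes: `SketchContext`, `SketchChangeLogger` and `SketchStore` are hypothetical stand-ins that only model the rule "the timestamp is available while a record is being processed, and not otherwise". The point is where `logChange` is called from, not the exact API.

```scala
import scala.collection.mutable

// Hypothetical stand-in for ProcessorContext: the timestamp exists only
// while a record is being processed, mirroring the behaviour described above.
final class SketchContext {
  private var current: Option[Long] = None

  // Runs `body` with a record timestamp set, then clears it again.
  def withRecord[A](ts: Long)(body: => A): A = {
    current = Some(ts)
    try body finally current = None
  }

  def timestamp: Long =
    current.getOrElse(
      throw new IllegalStateException("timestamp() called while no record is being processed"))
}

// Hypothetical stand-in for the change logger: it records what would be
// sent to the changelog topic instead of talking to a RecordCollector.
final class SketchChangeLogger[K, V](context: SketchContext) {
  val sent = mutable.ArrayBuffer.empty[(K, V, Long)]

  def logChange(key: K, value: V): Unit =
    sent += ((key, value, context.timestamp))
}

// Hypothetical store: the change is logged on write, so flush() never
// needs the record timestamp.
final class SketchStore[K, V](context: SketchContext) {
  private val data = mutable.Map.empty[K, V]
  val changeLogger = new SketchChangeLogger[K, V](context)

  def put(key: K, value: V): Unit = {
    data(key) = value
    changeLogger.logChange(key, value) // a record is in flight here
  }

  def get(key: K): Option[V] = data.get(key)

  // Called at commit time, outside record processing: no timestamp lookup.
  def flush(): Unit = ()
}

object ChangelogOnWriteSketch {
  def main(args: Array[String]): Unit = {
    val context = new SketchContext
    val store = new SketchStore[String, Long](context)

    // Simulates the processor handling one record and writing to the store.
    context.withRecord(1498925460000L) { store.put("GET /index.html", 1L) }

    // Simulates the commit path; this is safe because flush() logs nothing.
    store.flush()
    println(store.changeLogger.sent)
  }
}
```

In your real store the same shape would apply: call `logChange` from the write path (for example `put`), and keep `flush` free of anything that reads `context.timestamp`.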

Thanks,
Damian

On Sat, 1 Jul 2017 at 19:14 Debasish Ghosh <ghosh.debas...@gmail.com> wrote:

> Just to give some more information, the ProcessorContext that gets passed
> to the init method of the custom store has a null RecordContext. Gave the
> following debug statement ..
>
> println(context.asInstanceOf[ProcessorContextImpl].recordContext)
>
> and got null.
>
> regards.
>
> On Sat, Jul 1, 2017 at 9:41 PM, Debasish Ghosh <ghosh.debas...@gmail.com>
> wrote:
>
> > Hi -
> >
> > I have implemented a custom state store named BFStore with a change
> > logger as follows:
> >
> > class BFStoreChangeLogger[K, V](val storeName: String,
> >                                 val context: ProcessorContext,
> >                                 val partition: Int,
> >                                 val serialization: StateSerdes[K, V]) {
> >
> >   private val topic =
> >     ProcessorStateManager.storeChangelogTopic(context.applicationId, storeName)
> >   private val collector =
> >     context.asInstanceOf[RecordCollector.Supplier].recordCollector
> >
> >   def this(storeName: String, context: ProcessorContext, serialization: StateSerdes[K, V]) {
> >     this(storeName, context, context.taskId.partition, serialization)
> >   }
> >
> >   def logChange(key: K, value: V): Unit = {
> >     if (collector != null) {
> >       val keySerializer = serialization.keySerializer
> >       val valueSerializer = serialization.valueSerializer
> >       collector.send(this.topic, key, value, this.partition,
> >         context.timestamp, keySerializer, valueSerializer) //**//
> >     }
> >   }
> > }
> >
> > In my driver program I build the topology and start the streams as
> > follows:
> >
> > val builder: TopologyBuilder = new TopologyBuilder()
> >
> > builder.addSource("Source", config.fromTopic)
> >   .addProcessor("Process", () => new WeblogProcessor(), "Source")
> >   .addStateStore(new BFStoreSupplier[String](LOG_COUNT_STATE_STORE,
> >     stringSerde, true, changelogConfig), "Process")
> >   .addSink("Sink", "weblog-count-topic", "Process")
> >
> > val streams = new KafkaStreams(builder, streamingConfig)
> > streams.start()
> >
> > When I run the program, immediately I get the following exception ..
> >
> > Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed to flush state store log-counts
> >   at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:337)
> >   at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:72)
> >   at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> >   at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
> >   at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
> >   at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
> >   at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
> >   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
> >   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> > *Caused by: java.lang.IllegalStateException: This should not happen as timestamp() should only be called while a record is processed*
> >   at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.timestamp(AbstractProcessorContext.java:150)
> >   at com.lightbend.fdp.sample.kstream.processor.BFStoreChangeLogger.logChange(BFStoreChangeLogger.scala:24)
> >   at com.lightbend.fdp.sample.kstream.processor.BFStore.flush(BFStore.scala:86)
> >   at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:335)
> >   ... 8 more
> >
> > Not sure I understand the whole trace but looks like this may be related
> > to https://issues.apache.org/jira/browse/KAFKA-5528. It comes from the
> > class BFStoreChangeLogger in the line I marked above with //**//.
> >
> > Any help / workaround will be appreciated ..
> >
> > regards.
> > --
> > Debasish Ghosh
> > http://manning.com/ghosh2
> > http://manning.com/ghosh
> >
> > Twttr: @debasishg
> > Blog: http://debasishg.blogspot.com
> > Code: http://github.com/debasishg
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg