The only place where I commit is from Processor.process(). Here it is:
class WeblogProcessor extends AbstractProcessor[String, String] {
  private var bfStore: BFStore[String] = _

  override def init(context: ProcessorContext): Unit = {
    super.init(context)
    this.context.schedule(1000)
    bfStore = this.context.getStateStore(WeblogDriver.LOG_COUNT_STATE_STORE)
      .asInstanceOf[BFStore[String]]
  }

  override def process(dummy: String, record: String): Unit =
    LogParseUtil.parseLine(record) match {
      case Success(r) =>
        bfStore + r.host
        context.commit()
        context.forward(dummy, r.host)
      case Failure(ex) => throw ex
    }

  override def punctuate(timestamp: Long): Unit = super.punctuate(timestamp)

  override def close(): Unit = {}
}

The commit invokes the flush() of my store. Here is the flush() method:

override def flush(): Unit = {
  if (loggingEnabled) {
    changeLogger.logChange(changelogKey, bf)
  }
}

which in turn calls logChange, and that is what gives the error. Am I missing something?

regards.

On Mon, Jul 3, 2017 at 2:27 PM, Damian Guy <damian....@gmail.com> wrote:

> Hi,
>
> It is because you are calling `context.timestamp` during `commit`. At this
> point there is no `RecordContext` associated with the `ProcessorContext`,
> hence the null pointer. The `RecordContext` is only set when streams is
> processing a record. You probably want to log the change when you write to
> the store.
>
> Thanks,
> Damian
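A minimal sketch of the change Damian suggests, assuming BFStore[T] keeps the bf, changelogKey, loggingEnabled, changeLogger and context members used elsewhere in this thread, and that `+` is the store's write method (matching the `bfStore + r.host` call above); bf.add is a placeholder for however the bloom filter is actually updated:

// Inside BFStore[T] -- a sketch, not the actual store implementation.
def +(item: T): Unit = {
  bf.add(item)                               // hypothetical bloom-filter update
  if (loggingEnabled) {
    // We are inside process() here, so the ProcessorContext still has a
    // RecordContext and context.timestamp (read in logChange) is valid.
    changeLogger.logChange(changelogKey, bf)
  }
}

// With logging done per write, flush() no longer needs to touch the
// changelog, so commit() cannot trip over a missing RecordContext.
override def flush(): Unit = ()

Logging at write time means every update reaches the changelog with a valid record timestamp, and flush() is free to become a no-op (or just flush the underlying structure).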
> On Sat, 1 Jul 2017 at 19:14 Debasish Ghosh <ghosh.debas...@gmail.com>
> wrote:
>
>> Just to give some more information, the ProcessorContext that gets passed
>> to the init method of the custom store has a null RecordContext. I added
>> the following debug statement:
>>
>> println(context.asInstanceOf[ProcessorContextImpl].recordContext)
>>
>> and got null.
>>
>> regards.
>>
>> On Sat, Jul 1, 2017 at 9:41 PM, Debasish Ghosh <ghosh.debas...@gmail.com>
>> wrote:
>>
>> > Hi -
>> >
>> > I have implemented a custom state store named BFStore with a change
>> > logger as follows:
>> >
>> > class BFStoreChangeLogger[K, V](val storeName: String,
>> >                                 val context: ProcessorContext,
>> >                                 val partition: Int,
>> >                                 val serialization: StateSerdes[K, V]) {
>> >
>> >   private val topic =
>> >     ProcessorStateManager.storeChangelogTopic(context.applicationId, storeName)
>> >   private val collector =
>> >     context.asInstanceOf[RecordCollector.Supplier].recordCollector
>> >
>> >   def this(storeName: String, context: ProcessorContext,
>> >            serialization: StateSerdes[K, V]) {
>> >     this(storeName, context, context.taskId.partition, serialization)
>> >   }
>> >
>> >   def logChange(key: K, value: V): Unit = {
>> >     if (collector != null) {
>> >       val keySerializer = serialization.keySerializer
>> >       val valueSerializer = serialization.valueSerializer
>> >       collector.send(this.topic, key, value, this.partition,
>> >         context.timestamp, keySerializer, valueSerializer) //**//
>> >     }
>> >   }
>> > }
>> >
>> > In my driver program I build the topology and start the streams as
>> > follows:
>> >
>> > val builder: TopologyBuilder = new TopologyBuilder()
>> >
>> > builder.addSource("Source", config.fromTopic)
>> >   .addProcessor("Process", () => new WeblogProcessor(), "Source")
>> >   .addStateStore(new BFStoreSupplier[String](LOG_COUNT_STATE_STORE,
>> >     stringSerde, true, changelogConfig), "Process")
>> >   .addSink("Sink", "weblog-count-topic", "Process")
>> >
>> > val streams = new KafkaStreams(builder, streamingConfig)
>> > streams.start()
>> >
>> > When I run the program, I immediately get the following exception:
>> >
>> > Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException:
>> > task [0_0] Failed to flush state store log-counts
>> >   at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:337)
>> >   at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:72)
>> >   at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>> >   at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
>> >   at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
>> >   at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
>> >   at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
>> >   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
>> >   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>> > Caused by: java.lang.IllegalStateException: This should not happen as
>> > timestamp() should only be called while a record is processed
>> >   at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.timestamp(AbstractProcessorContext.java:150)
>> >   at com.lightbend.fdp.sample.kstream.processor.BFStoreChangeLogger.logChange(BFStoreChangeLogger.scala:24)
>> >   at com.lightbend.fdp.sample.kstream.processor.BFStore.flush(BFStore.scala:86)
>> >   at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:335)
>> >   ... 8 more
>> >
>> > Not sure I understand the whole trace, but it looks like this may be
>> > related to https://issues.apache.org/jira/browse/KAFKA-5528. It comes
>> > from the class BFStoreChangeLogger, in the line I marked above with //**//.
>> >
>> > Any help / workaround will be appreciated.
>> >
>> > regards.

--
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
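If the changelog write has to stay in flush(), an alternative workaround is to capture the timestamp while a record is being processed and pass it explicitly at commit time. A sketch only: the lastTimestamp field and the three-argument logChange overload below are assumed additions, not part of the code in this thread:

// In BFStore[T]: remember the timestamp of the last processed record.
private var lastTimestamp: Long = 0L

def +(item: T): Unit = {
  bf.add(item)                        // placeholder for the real update
  lastTimestamp = context.timestamp   // valid: a record is being processed
}

override def flush(): Unit = {
  if (loggingEnabled) {
    // Do not read context.timestamp here -- it throws during commit.
    changeLogger.logChange(changelogKey, bf, lastTimestamp)
  }
}

// In BFStoreChangeLogger: an overload taking the timestamp explicitly,
// reusing the same RecordCollector.send call as the original logChange.
def logChange(key: K, value: V, timestamp: Long): Unit = {
  if (collector != null) {
    collector.send(this.topic, key, value, this.partition, timestamp,
      serialization.keySerializer, serialization.valueSerializer)
  }
}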