OK, so I made the following change. Is this the change that you suggested?
I removed commit() from process(), so process() now looks as follows:

  override def process(dummy: String, record: String): Unit =
    LogParseUtil.parseLine(record) match {
      case Success(r) => {
        bfStore + r.host
        bfStore.flush()
      }
      case Failure(ex) => throw ex
    }

Still I get the same exception. Just as a test, I removed the flush() as well from process() ..

  override def process(dummy: String, record: String): Unit =
    LogParseUtil.parseLine(record) match {
      case Success(r) => {
        bfStore + r.host
      }
      case Failure(ex) => throw ex
    }

and I still get the same exception, since Streams itself calls flush() after commit from within. Here's the trace ..

Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed to flush state store log-counts
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:337)
        at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:72)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
        at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
        at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
        at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
        at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
Caused by: java.lang.IllegalStateException: This should not happen as timestamp() should only be called while a record is processed
        at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.timestamp(AbstractProcessorContext.java:150)
        at com.lightbend.fdp.sample.kstream.processor.BFStoreChangeLogger.logChange(BFStoreChangeLogger.scala:25)
        at com.lightbend.fdp.sample.kstream.processor.BFStore.flush(BFStore.scala:89)
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:335)

regards.
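The exception persists because Streams calls flush() on every state store during its periodic commit (StreamThread.maybeCommit in the trace above), regardless of what process() does; at that point no record is in flight, so the context.timestamp() call inside BFStore.flush() throws. A minimal sketch of a flush() that stays clear of the RecordContext (illustrative only; names follow the code quoted in the replies below):

  // Inside BFStore (sketch): flush() runs on every Streams commit, with no
  // record in flight, so it must not call anything that needs the
  // RecordContext -- in particular context.timestamp() via logChange.
  override def flush(): Unit = {
    // no change logging here; per Damian's suggestion below, log on write instead
  }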
On Mon, Jul 3, 2017 at 2:55 PM, Damian Guy <damian....@gmail.com> wrote:

> `commit` is called by streams; you can see it in your stack trace above:
>
>   org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
>
> `commit` will subsequently call `flush` on any stores. At this point,
> though, there will be no `RecordContext`, as there are no records being
> processed. Note that calling `context.commit()` from your Processor isn't
> actually performing the commit; it is just signalling that a commit is
> necessary after this record has been processed. You may not want to do
> that, as it will probably impact throughput.
>
> You should log the change when you write to the store, i.e., I think when
> you do:
>
>   bfStore + r.host
>
> Does that help?
>
> Thanks,
> Damian
>
> On Mon, 3 Jul 2017 at 10:12 Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>
>> The only place where I am doing commit is from Processor.process() ..
>> Here it is ..
>>
>>   class WeblogProcessor extends AbstractProcessor[String, String] {
>>     private var bfStore: BFStore[String] = _
>>
>>     override def init(context: ProcessorContext): Unit = {
>>       super.init(context)
>>       this.context.schedule(1000)
>>       bfStore = this.context.getStateStore(WeblogDriver.LOG_COUNT_STATE_STORE)
>>                     .asInstanceOf[BFStore[String]]
>>     }
>>
>>     override def process(dummy: String, record: String): Unit =
>>       LogParseUtil.parseLine(record) match {
>>         case Success(r) => {
>>           bfStore + r.host
>>           context.commit()
>>           context.forward(dummy, r.host)
>>         }
>>         case Failure(ex) => throw ex
>>       }
>>
>>     override def punctuate(timestamp: Long): Unit = super.punctuate(timestamp)
>>     override def close(): Unit = {}
>>   }
>>
>> The commit invokes the flush() of my store. Here is the flush() method
>> of my store ..
>>
>>   override def flush(): Unit = {
>>     if (loggingEnabled) {
>>       changeLogger.logChange(changelogKey, bf)
>>     }
>>   }
>>
>> which in turn calls logChange, which gives the error.
>>
>> Am I missing something?
>>
>> regards.
>>
>> On Mon, Jul 3, 2017 at 2:27 PM, Damian Guy <damian....@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> It is because you are calling `context.timestamp` during `commit`. At
>>> this point there is no `RecordContext` associated with the
>>> `ProcessorContext`, hence the null pointer. The `RecordContext` is only
>>> set when streams is processing a record. You probably want to log the
>>> change when you write to the store.
>>>
>>> Thanks,
>>> Damian
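For concreteness, a minimal sketch of that suggestion: move the logChange call out of flush() and into the store's write path, which only ever runs inside process(), where the RecordContext (and hence context.timestamp()) is valid. The filter field bf and its add method are placeholders for whatever backs BFStore; `+`, changelogKey, loggingEnabled and changeLogger follow the code quoted above:

  // Inside BFStore (sketch): log the change at write time, while a record
  // is in flight. `+` is the method process() already calls as `bfStore + r.host`.
  def +(item: String): Unit = {
    bf.add(item)                              // placeholder for the real filter update
    if (loggingEnabled) {
      // safe here: called from process(), so context.timestamp() is defined
      changeLogger.logChange(changelogKey, bf)
    }
  }

  override def flush(): Unit = ()             // nothing left to do at commit time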
>>>
>>> On Sat, 1 Jul 2017 at 19:14 Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>
>>>> Just to give some more information, the ProcessorContext that gets
>>>> passed to the init method of the custom store has a null RecordContext.
>>>> I added the following debug statement ..
>>>>
>>>>   println(context.asInstanceOf[ProcessorContextImpl].recordContext)
>>>>
>>>> and got null.
>>>>
>>>> regards.
>>>>
>>>> On Sat, Jul 1, 2017 at 9:41 PM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>
>>>> > Hi -
>>>> >
>>>> > I have implemented a custom state store named BFStore with a change
>>>> > logger as follows:
>>>> >
>>>> >   class BFStoreChangeLogger[K, V](val storeName: String,
>>>> >                                   val context: ProcessorContext,
>>>> >                                   val partition: Int,
>>>> >                                   val serialization: StateSerdes[K, V]) {
>>>> >
>>>> >     private val topic =
>>>> >       ProcessorStateManager.storeChangelogTopic(context.applicationId, storeName)
>>>> >     private val collector =
>>>> >       context.asInstanceOf[RecordCollector.Supplier].recordCollector
>>>> >
>>>> >     def this(storeName: String, context: ProcessorContext,
>>>> >              serialization: StateSerdes[K, V]) {
>>>> >       this(storeName, context, context.taskId.partition, serialization)
>>>> >     }
>>>> >
>>>> >     def logChange(key: K, value: V): Unit = {
>>>> >       if (collector != null) {
>>>> >         val keySerializer = serialization.keySerializer
>>>> >         val valueSerializer = serialization.valueSerializer
>>>> >         collector.send(this.topic, key, value, this.partition,
>>>> >           context.timestamp, keySerializer, valueSerializer) //**//
>>>> >       }
>>>> >     }
>>>> >   }
>>>> >
>>>> > In my driver program I build the topology and start the streams as
>>>> > follows:
>>>> >
>>>> >   val builder: TopologyBuilder = new TopologyBuilder()
>>>> >
>>>> >   builder.addSource("Source", config.fromTopic)
>>>> >     .addProcessor("Process", () => new WeblogProcessor(), "Source")
>>>> >     .addStateStore(new BFStoreSupplier[String](LOG_COUNT_STATE_STORE,
>>>> >       stringSerde, true, changelogConfig), "Process")
>>>> >     .addSink("Sink", "weblog-count-topic", "Process")
>>>> >
>>>> >   val streams = new KafkaStreams(builder, streamingConfig)
>>>> >   streams.start()
>>>> >
>>>> > When I run the program, I immediately get the following exception ..
>>>> >
>>>> > Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed to flush state store log-counts
>>>> >         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:337)
>>>> >         at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:72)
>>>> >         at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>>>> >         at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
>>>> >         at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
>>>> >         at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
>>>> >         at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
>>>> >         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
>>>> >         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>>> > *Caused by: java.lang.IllegalStateException: This should not happen as timestamp() should only be called while a record is processed*
>>>> >         at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.timestamp(AbstractProcessorContext.java:150)
>>>> >         at com.lightbend.fdp.sample.kstream.processor.BFStoreChangeLogger.logChange(BFStoreChangeLogger.scala:24)
>>>> >         at com.lightbend.fdp.sample.kstream.processor.BFStore.flush(BFStore.scala:86)
>>>> >         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:335)
>>>> >         ... 8 more
>>>> >
>>>> > I am not sure I understand the whole trace, but it looks like this may
>>>> > be related to https://issues.apache.org/jira/browse/KAFKA-5528. It comes
>>>> > from the class BFStoreChangeLogger, in the line I marked above with //**//.
>>>> >
>>>> > Any help / workaround will be appreciated ..
>>>> >
>>>> > regards.
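One possible workaround for the logger quoted above (a sketch, not a tested fix): avoid reading context.timestamp inside logChange altogether and have the caller pass a timestamp in. The extra timestamp parameter is an assumption introduced here; collector.send is used with the same argument shapes as in the code above:

  // Inside BFStoreChangeLogger (sketch): take the timestamp as a parameter
  // instead of reading it from the ProcessorContext, so no RecordContext
  // is needed at send time and the method is safe to call from flush().
  def logChange(key: K, value: V, timestamp: Long): Unit = {
    if (collector != null) {
      collector.send(this.topic, key, value, this.partition, timestamp,
        serialization.keySerializer, serialization.valueSerializer)
    }
  }

The store could capture context.timestamp() in its write path, where it is valid, and hand that value to logChange later, falling back to, say, System.currentTimeMillis() when flushing outside of record processing.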
-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg