`commit` is called by Streams itself; you can see it in your stack trace:

> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)

`commit` will subsequently call `flush` on every store. At this point,
though, there is no `RecordContext`, because no record is being
processed. Note that calling `context.commit()` from your Processor doesn't
actually perform the commit; it only signals that a commit should happen
after the current record has been processed. You may not want to do that,
as it will probably reduce throughput.
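
If the per-record `context.commit()` does turn out to hurt throughput, one
option is to drop it and rely on the periodic commit that Streams performs
anyway. A minimal sketch, assuming the standard `commit.interval.ms` setting;
the application id, broker address and the 5000 ms value are placeholders I
made up, not values from your setup:

```scala
import java.util.Properties

object CommitIntervalSketch {
  // Builds Streams properties in which commits are driven by the periodic
  // commit interval rather than by a context.commit() call on every record.
  def streamsProps(commitIntervalMs: Long): Properties = {
    val props = new Properties()
    props.put("application.id", "weblog-processing")   // placeholder value
    props.put("bootstrap.servers", "localhost:9092")   // placeholder value
    // Kafka Streams commits each task roughly every commit.interval.ms.
    props.put("commit.interval.ms", commitIntervalMs.toString)
    props
  }
}
```

These properties would go into whatever you already use to build
`streamingConfig`; how small the interval can be is something to measure.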

You should log the change when you write to the store, i.e., I think, when
you do:

    bfStore + r.host
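
To illustrate the idea, here is a rough, self-contained sketch of logging at
write time. It deliberately does not use the Kafka Streams classes:
`SketchContext`, `SketchChangeLogger` and `SketchStore` are hypothetical
stand-ins for your context, change logger and `BFStore`, and the only
behaviour modelled is that the timestamp exists only while a record is being
processed.

```scala
import scala.collection.mutable

// Stand-in for the relevant part of the processor context: the timestamp is
// only available while a record is being processed.
final class SketchContext {
  private var current: Option[Long] = None

  def timestamp: Long = current.getOrElse(
    throw new IllegalStateException("timestamp() called while no record is being processed"))

  // Runs `body` with a record timestamp set, as is the case inside process().
  def withRecord[A](ts: Long)(body: => A): A = {
    current = Some(ts)
    try body finally current = None
  }
}

// Stand-in for the change logger: it remembers (key, value, timestamp)
// instead of sending to a changelog topic.
final class SketchChangeLogger(context: SketchContext) {
  val sent = mutable.ArrayBuffer.empty[(String, String, Long)]

  def logChange(key: String, value: String): Unit =
    sent += ((key, value, context.timestamp))
}

// Stand-in for the store: the change is logged on write, so flush() never
// needs the record timestamp.
final class SketchStore(logger: SketchChangeLogger) {
  private val hosts = mutable.Set.empty[String]

  def +(host: String): Unit = {
    hosts += host
    logger.logChange("hosts", host) // a record is in flight here, so timestamp is set
  }

  // Called at commit time, when no record is being processed.
  def flush(): Unit = ()
}
```

With this shape, `flush()` can be called at commit time without touching
`timestamp`, which is the call that fails in your trace. What exactly you log
per write (the whole filter or just the added element) is a separate choice
that this sketch does not try to settle.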


Does that help?

Thanks,
Damian


On Mon, 3 Jul 2017 at 10:12 Debasish Ghosh <ghosh.debas...@gmail.com> wrote:

> The only place where I am doing commit is from Processor.process() ..
> Here it is ..
>
> class WeblogProcessor extends AbstractProcessor[String, String] {
>   private var bfStore: BFStore[String] = _
>
>   override def init(context: ProcessorContext): Unit = {
>     super.init(context)
>     this.context.schedule(1000)
>     bfStore = this.context
>       .getStateStore(WeblogDriver.LOG_COUNT_STATE_STORE)
>       .asInstanceOf[BFStore[String]]
>   }
>
>   override def process(dummy: String, record: String): Unit =
>     LogParseUtil.parseLine(record) match {
>       case Success(r) => {
>         bfStore + r.host
>         context.commit()
>         context.forward(dummy, r.host)
>       }
>       case Failure(ex) => throw ex
>     }
>
>   override def punctuate(timestamp: Long): Unit = super.punctuate(timestamp)
>   override def close(): Unit = {}
> }
>
> The commit invokes the flush() of my Store. Here is the flush() method of
> my store ..
>
> override def flush(): Unit = {
>   if (loggingEnabled) {
>     changeLogger.logChange(changelogKey, bf)
>   }
> }
>
> which in turn calls logChange that gives the error.
>
> Am I missing something ?
>
> regards.
>
> On Mon, Jul 3, 2017 at 2:27 PM, Damian Guy <damian....@gmail.com> wrote:
>
>> Hi,
>>
>> It is because you are calling `context.timestamp` during `commit`. At
>> this point there is no `RecordContext` associated with the
>> `ProcessorContext`, hence the null pointer. The `RecordContext` is only set
>> when streams is processing a record. You probably want to log the change
>> when you write to the store.
>>
>> Thanks,
>> Damian
>>
>> On Sat, 1 Jul 2017 at 19:14 Debasish Ghosh <ghosh.debas...@gmail.com>
>> wrote:
>>
>>> Just to give some more information, the ProcessorContext that gets passed
>>> to the init method of the custom store has a null RecordContext. Gave the
>>> following debug statement ..
>>>
>>> println(context.asInstanceOf[ProcessorContextImpl].recordContext)
>>>
>>> and got null.
>>>
>>> regards.
>>>
>>> On Sat, Jul 1, 2017 at 9:41 PM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>
>>> > Hi -
>>> >
>>> > I have implemented a custom state store named BFStore with a change
>>> > logger as follows:
>>> >
>>> > class BFStoreChangeLogger[K, V](val storeName: String,
>>> >                                 val context: ProcessorContext,
>>> >                                 val partition: Int,
>>> >                                 val serialization: StateSerdes[K, V]) {
>>> >
>>> >   private val topic =
>>> >     ProcessorStateManager.storeChangelogTopic(context.applicationId, storeName)
>>> >   private val collector =
>>> >     context.asInstanceOf[RecordCollector.Supplier].recordCollector
>>> >
>>> >   def this(storeName: String, context: ProcessorContext,
>>> >            serialization: StateSerdes[K, V]) {
>>> >     this(storeName, context, context.taskId.partition, serialization)
>>> >   }
>>> >
>>> >   def logChange(key: K, value: V): Unit = {
>>> >     if (collector != null) {
>>> >       val keySerializer = serialization.keySerializer
>>> >       val valueSerializer = serialization.valueSerializer
>>> >       collector.send(this.topic, key, value, this.partition,
>>> >         context.timestamp, keySerializer, valueSerializer)  //**//
>>> >     }
>>> >   }
>>> > }
>>> >
>>> > In my driver program I build the topology and start the streams as follows:
>>> >
>>> > val builder: TopologyBuilder = new TopologyBuilder()
>>> >
>>> > builder.addSource("Source", config.fromTopic)
>>> >        .addProcessor("Process", () => new WeblogProcessor(), "Source")
>>> >        .addStateStore(
>>> >          new BFStoreSupplier[String](LOG_COUNT_STATE_STORE, stringSerde, true, changelogConfig),
>>> >          "Process")
>>> >        .addSink("Sink", "weblog-count-topic", "Process")
>>> >
>>> > val streams = new KafkaStreams(builder, streamingConfig)
>>> > streams.start()
>>> >
>>> > When I run the program, immediately I get the following exception ..
>>> >
>>> > Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed to flush state store log-counts
>>> > at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:337)
>>> > at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:72)
>>> > at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>>> > at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
>>> > at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
>>> > at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
>>> > at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
>>> > at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
>>> > at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>> > *Caused by: java.lang.IllegalStateException: This should not happen as timestamp() should only be called while a record is processed*
>>> > at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.timestamp(AbstractProcessorContext.java:150)
>>> > at com.lightbend.fdp.sample.kstream.processor.BFStoreChangeLogger.logChange(BFStoreChangeLogger.scala:24)
>>> > at com.lightbend.fdp.sample.kstream.processor.BFStore.flush(BFStore.scala:86)
>>> > at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:335)
>>> > ... 8 more
>>> >
>>> > Not sure I understand the whole trace but looks like this may be related
>>> > to https://issues.apache.org/jira/browse/KAFKA-5528. It comes from the
>>> > class BFStoreChangeLogger in the line I marked above with //**//.
>>> >
>>> > Any help / workaround will be appreciated ..
>>> >
>>> > regards.
>>> > --
>>> > Debasish Ghosh
>>> > http://manning.com/ghosh2
>>> > http://manning.com/ghosh
>>> >
>>> > Twttr: @debasishg
>>> > Blog: http://debasishg.blogspot.com
>>> > Code: http://github.com/debasishg
>>> >
>>
