Hi,

It is because you are calling `context.timestamp` during `commit`. At this
point there is no `RecordContext` associated with the `ProcessorContext`,
hence the null pointer. The `RecordContext` is only set when streams is
processing a record. You probably want to log the change when you write to
the store.

Thanks,
Damian

On Sat, 1 Jul 2017 at 19:14 Debasish Ghosh <ghosh.debas...@gmail.com> wrote:

> Just to give some more information, the ProcessorContext that gets passed
> to the init method of the custom store has a null RecordContext. Gave the
> following debug statement ..
>
> println(context.asInstanceOf[ProcessorContextImpl].recordContext)
>
> and got null.
>
> regards.
>
> On Sat, Jul 1, 2017 at 9:41 PM, Debasish Ghosh <ghosh.debas...@gmail.com>
> wrote:
>
> > Hi -
> >
> > I have implemented a custom state store named BFStore with a change
> > logger as follows:
> >
> > class BFStoreChangeLogger[K, V](val storeName: String,
> >                                 val context: ProcessorContext,
> >                                 val partition: Int,
> >                                 val serialization: StateSerdes[K, V]) {
> >
> >   private val topic =
> ProcessorStateManager.storeChangelogTopic(context.applicationId,
> > storeName)
> >   private val collector = context.asInstanceOf[RecordCollector.Supplier].
> > recordCollector
> >
> >   def this(storeName: String, context: ProcessorContext, serialization:
> > StateSerdes[K, V]) {
> >     this(storeName, context, context.taskId.partition, serialization)
> >   }
> >
> >   def logChange(key: K, value: V): Unit = {
> >     if (collector != null) {
> >       val keySerializer = serialization.keySerializer
> >       val valueSerializer = serialization.valueSerializer
> >       collector.send(this.topic, key, value, this.partition,
> > context.timestamp, keySerializer, valueSerializer)  //**//
> >     }
> >   }
> > }
> >
> > In my driver program I build the topology and start the streams as
> follows:
> >
> > val builder: TopologyBuilder = new TopologyBuilder()
> >
> > builder.addSource("Source", config.fromTopic)
> >        .addProcessor("Process", () => new WeblogProcessor(), "Source")
> >        .addStateStore(new BFStoreSupplier[String](LOG_COUNT_STATE_STORE,
> > stringSerde, true, changelogConfig), "Process")
> >        .addSink("Sink", "weblog-count-topic", "Process")
> >
> > val streams = new KafkaStreams(builder, streamingConfig)
> > streams.start()
> >
> > When I run the program, immediately I get the following exception ..
> >
> > Exception in thread "StreamThread-1"
> org.apache.kafka.streams.errors.ProcessorStateException:
> > task [0_0] Failed to flush state store log-counts
> > at org.apache.kafka.streams.processor.internals.
> > ProcessorStateManager.flush(ProcessorStateManager.java:337)
> > at org.apache.kafka.streams.processor.internals.
> > StreamTask$1.run(StreamTask.java:72)
> > at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> > measureLatencyNs(StreamsMetricsImpl.java:188)
> > at org.apache.kafka.streams.processor.internals.
> > StreamTask.commit(StreamTask.java:280)
> > at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(
> > StreamThread.java:807)
> > at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(
> > StreamThread.java:794)
> > at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(
> > StreamThread.java:769)
> > at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > StreamThread.java:647)
> > at org.apache.kafka.streams.processor.internals.
> > StreamThread.run(StreamThread.java:361)
> > *Caused by: java.lang.IllegalStateException: This should not happen as
> > timestamp() should only be called while a record is processed*
> > at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.
> > timestamp(AbstractProcessorContext.java:150)
> > at com.lightbend.fdp.sample.kstream.processor.
> > BFStoreChangeLogger.logChange(BFStoreChangeLogger.scala:24)
> > at com.lightbend.fdp.sample.kstream.processor.BFStore.
> > flush(BFStore.scala:86)
> > at org.apache.kafka.streams.processor.internals.
> > ProcessorStateManager.flush(ProcessorStateManager.java:335)
> > ... 8 more
> >
> > Not sure I understand the whole trace but looks like this may be related
> > to https://issues.apache.org/jira/browse/KAFKA-5528. It comes from the
> > class BFStoreChangeLogger in the line I marked above with //**//.
> >
> > Any help / workaround will be appreciated ..
> >
> > regards.
> > --
> > Debasish Ghosh
> > http://manning.com/ghosh2
> > http://manning.com/ghosh
> >
> > Twttr: @debasishg
> > Blog: http://debasishg.blogspot.com
> > Code: http://github.com/debasishg
> >
>
>
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>

Reply via email to