The only place where I call commit is Processor.process(). Here it is:

class WeblogProcessor extends AbstractProcessor[String, String] {
  private var bfStore: BFStore[String] = _

  override def init(context: ProcessorContext): Unit = {
    super.init(context)
    this.context.schedule(1000)
    bfStore = this.context
      .getStateStore(WeblogDriver.LOG_COUNT_STATE_STORE)
      .asInstanceOf[BFStore[String]]
  }

  override def process(dummy: String, record: String): Unit =
    LogParseUtil.parseLine(record) match {
      case Success(r) => {
        bfStore + r.host
        context.commit()
        context.forward(dummy, r.host)
      }
      case Failure(ex) => throw ex
    }

  override def punctuate(timestamp: Long): Unit = super.punctuate(timestamp)
  override def close(): Unit = {}
}

The commit invokes flush() on my store. Here is the store's flush()
method:

override def flush(): Unit = {
  if (loggingEnabled) {
    changeLogger.logChange(changelogKey, bf)
  }
}

which in turn calls logChange that gives the error.

Am I missing something ?
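
One possible reading of the suggestion quoted below (log the change when
writing to the store, not in flush()) is sketched here. This is only an
illustration under stated assumptions: it does not use the Kafka Streams
API, and SketchContext, SketchChangeLogger, SketchStore and
LogOnWriteSketch are hypothetical stand-ins invented for the example. The
stand-in context only exposes a timestamp while a record is being
processed, which is the behaviour the exception message describes.

```scala
import scala.collection.mutable
import scala.util.Try

// Hypothetical stand-in for ProcessorContext: the timestamp is only
// available while a record is being processed.
final class SketchContext {
  private var current: Option[Long] = None

  def timestamp: Long = current.getOrElse(
    throw new IllegalStateException(
      "timestamp() should only be called while a record is processed"))

  // Simulates the runtime setting the record context around process().
  def withRecord[A](ts: Long)(body: => A): A = {
    current = Some(ts)
    try body finally current = None
  }
}

// Hypothetical stand-in for BFStoreChangeLogger: it remembers what it
// would have sent to the changelog topic.
final class SketchChangeLogger(context: SketchContext) {
  val sent = mutable.ArrayBuffer.empty[(String, Set[String], Long)]

  def logChange(key: String, value: Set[String]): Unit =
    sent += ((key, value, context.timestamp))
}

// Hypothetical store that logs each change at write time, so that
// flush() never needs the record timestamp.
final class SketchStore(changeLogger: SketchChangeLogger) {
  private var hosts = Set.empty[String]

  def +(host: String): Unit = {
    hosts += host
    changeLogger.logChange("changelogKey", hosts) // inside process()
  }

  def flush(): Unit = () // nothing left to log here
}

object LogOnWriteSketch {
  // Returns (number of changes logged on write,
  //          whether logging outside a record fails).
  def demo(): (Int, Boolean) = {
    val context = new SketchContext
    val logger  = new SketchChangeLogger(context)
    val store   = new SketchStore(logger)

    // The write happens while a record is being processed.
    context.withRecord(1000L) { store + "host-a" }
    store.flush()

    // Calling logChange with no record in flight, as flush() did.
    val outsideRecord = Try(logger.logChange("changelogKey", Set("host-a")))
    (logger.sent.size, outsideRecord.isFailure)
  }
}
```

In the real store the equivalent change would presumably be to call
changeLogger.logChange from the write path and leave flush() without a
logChange call, but I have not verified that against the actual
BFStore code.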

regards.

On Mon, Jul 3, 2017 at 2:27 PM, Damian Guy <damian....@gmail.com> wrote:

> Hi,
>
> It is because you are calling `context.timestamp` during `commit`. At this
> point there is no `RecordContext` associated with the `ProcessorContext`,
> hence the null pointer. The `RecordContext` is only set when streams is
> processing a record. You probably want to log the change when you write to
> the store.
>
> Thanks,
> Damian
>
> On Sat, 1 Jul 2017 at 19:14 Debasish Ghosh <ghosh.debas...@gmail.com>
> wrote:
>
>> Just to give some more information, the ProcessorContext that gets passed
>> to the init method of the custom store has a null RecordContext. I added
>> the following debug statement:
>>
>> println(context.asInstanceOf[ProcessorContextImpl].recordContext)
>>
>> and got null.
>>
>> regards.
>>
>> On Sat, Jul 1, 2017 at 9:41 PM, Debasish Ghosh <ghosh.debas...@gmail.com>
>> wrote:
>>
>> > Hi -
>> >
>> > I have implemented a custom state store named BFStore with a change
>> > logger as follows:
>> >
>> > class BFStoreChangeLogger[K, V](val storeName: String,
>> >                                 val context: ProcessorContext,
>> >                                 val partition: Int,
>> >                                 val serialization: StateSerdes[K, V]) {
>> >
>> >   private val topic =
>> >     ProcessorStateManager.storeChangelogTopic(context.applicationId, storeName)
>> >   private val collector =
>> >     context.asInstanceOf[RecordCollector.Supplier].recordCollector
>> >
>> >   def this(storeName: String, context: ProcessorContext,
>> >            serialization: StateSerdes[K, V]) {
>> >     this(storeName, context, context.taskId.partition, serialization)
>> >   }
>> >
>> >   def logChange(key: K, value: V): Unit = {
>> >     if (collector != null) {
>> >       val keySerializer = serialization.keySerializer
>> >       val valueSerializer = serialization.valueSerializer
>> >       collector.send(this.topic, key, value, this.partition,
>> >         context.timestamp, keySerializer, valueSerializer)  //**//
>> >     }
>> >   }
>> > }
>> >
>> > In my driver program I build the topology and start the streams as
>> > follows:
>> >
>> > val builder: TopologyBuilder = new TopologyBuilder()
>> >
>> > builder.addSource("Source", config.fromTopic)
>> >        .addProcessor("Process", () => new WeblogProcessor(), "Source")
>> >        .addStateStore(
>> >          new BFStoreSupplier[String](LOG_COUNT_STATE_STORE, stringSerde, true, changelogConfig),
>> >          "Process")
>> >        .addSink("Sink", "weblog-count-topic", "Process")
>> >
>> > val streams = new KafkaStreams(builder, streamingConfig)
>> > streams.start()
>> >
>> > When I run the program, immediately I get the following exception ..
>> >
>> > Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed to flush state store log-counts
>> > at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:337)
>> > at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:72)
>> > at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>> > at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
>> > at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
>> > at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
>> > at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
>> > at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
>> > at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>> > *Caused by: java.lang.IllegalStateException: This should not happen as timestamp() should only be called while a record is processed*
>> > at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.timestamp(AbstractProcessorContext.java:150)
>> > at com.lightbend.fdp.sample.kstream.processor.BFStoreChangeLogger.logChange(BFStoreChangeLogger.scala:24)
>> > at com.lightbend.fdp.sample.kstream.processor.BFStore.flush(BFStore.scala:86)
>> > at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:335)
>> > ... 8 more
>> >
>> > Not sure I understand the whole trace but looks like this may be related
>> > to https://issues.apache.org/jira/browse/KAFKA-5528. It comes from the
>> > class BFStoreChangeLogger in the line I marked above with //**//.
>> >
>> > Any help / workaround will be appreciated ..
>> >
>> > regards.
>> > --
>> > Debasish Ghosh
>> > http://manning.com/ghosh2
>> > http://manning.com/ghosh
>> >
>> > Twttr: @debasishg
>> > Blog: http://debasishg.blogspot.com
>> > Code: http://github.com/debasishg
>> >
>

