Hi -

I have implemented a custom state store named BFStore with a change logger
as follows:

import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.processor.internals.{ProcessorStateManager, RecordCollector}
import org.apache.kafka.streams.state.StateSerdes

class BFStoreChangeLogger[K, V](val storeName: String,
                                val context: ProcessorContext,
                                val partition: Int,
                                val serialization: StateSerdes[K, V]) {

  // changelog topic name derived from the application id and the store name
  private val topic =
    ProcessorStateManager.storeChangelogTopic(context.applicationId, storeName)
  private val collector =
    context.asInstanceOf[RecordCollector.Supplier].recordCollector

  def this(storeName: String, context: ProcessorContext, serialization: StateSerdes[K, V]) {
    this(storeName, context, context.taskId.partition, serialization)
  }

  // writes a single key/value update to the store's changelog topic
  def logChange(key: K, value: V): Unit = {
    if (collector != null) {
      val keySerializer = serialization.keySerializer
      val valueSerializer = serialization.valueSerializer
      collector.send(this.topic, key, value, this.partition,
        context.timestamp, keySerializer, valueSerializer)  //**//
    }
  }
}
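
For context, the logger is invoked from BFStore.flush (that is the BFStore.scala:86 frame in the trace further down). Roughly it looks like the simplified sketch below -- the changeLogger / storeKey / serializedContents names are placeholders of mine, and the real flush serializes the store contents first:

  // simplified view of BFStore.flush (placeholder names, not the real code):
  // push the current serialized store contents to the changelog topic
  override def flush(): Unit = {
    changeLogger.logChange(storeKey, serializedContents)  // -> context.timestamp
  }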

In my driver program I build the topology and start the streams as follows:

val builder: TopologyBuilder = new TopologyBuilder()

builder.addSource("Source", config.fromTopic)
       .addProcessor("Process", () => new WeblogProcessor(), "Source")
       .addStateStore(new BFStoreSupplier[String](LOG_COUNT_STATE_STORE,
          stringSerde, true, changelogConfig), "Process")
       .addSink("Sink", "weblog-count-topic", "Process")

val streams = new KafkaStreams(builder, streamingConfig)
streams.start()

When I run the program, I immediately get the following exception:

Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed to flush state store log-counts
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:337)
    at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:72)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
    at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
    at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
    at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
Caused by: java.lang.IllegalStateException: This should not happen as timestamp() should only be called while a record is processed
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.timestamp(AbstractProcessorContext.java:150)
    at com.lightbend.fdp.sample.kstream.processor.BFStoreChangeLogger.logChange(BFStoreChangeLogger.scala:24)
    at com.lightbend.fdp.sample.kstream.processor.BFStore.flush(BFStore.scala:86)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:335)
    ... 8 more

I'm not sure I understand the whole trace, but it looks like this may be related to
https://issues.apache.org/jira/browse/KAFKA-5528. The exception originates from the line in
BFStoreChangeLogger that I marked above with //**//, where logChange reads context.timestamp
while the store is being flushed, i.e. when no record is currently being processed.
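
One workaround I'm thinking of (just a rough, untested sketch -- the extra timestamp parameter below is my own idea, not existing code) is to stop reading context.timestamp inside logChange and instead pass in a timestamp that the store captured while a record was actually being processed:

  def logChange(key: K, value: V, timestamp: Long): Unit = {
    if (collector != null) {
      // use the caller-supplied timestamp instead of context.timestamp,
      // which is only valid while a record is being processed
      collector.send(this.topic, key, value, this.partition, timestamp,
        serialization.keySerializer, serialization.valueSerializer)
    }
  }

BFStore would then remember context.timestamp() inside put() (where it is legal to call) and forward that saved value when flush() logs the change. Not sure whether that's the right approach, though.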

Any help / workaround would be appreciated.

regards.
-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
