Hi - I have implemented a custom state store named BFStore with a change logger as follows:

class BFStoreChangeLogger[K, V](val storeName: String,
                                val context: ProcessorContext,
                                val partition: Int,
                                val serialization: StateSerdes[K, V]) {

  private val topic = ProcessorStateManager.storeChangelogTopic(context.applicationId, storeName)
  private val collector = context.asInstanceOf[RecordCollector.Supplier].recordCollector

  def this(storeName: String, context: ProcessorContext, serialization: StateSerdes[K, V]) {
    this(storeName, context, context.taskId.partition, serialization)
  }

  def logChange(key: K, value: V): Unit = {
    if (collector != null) {
      val keySerializer = serialization.keySerializer
      val valueSerializer = serialization.valueSerializer
      collector.send(this.topic, key, value, this.partition, context.timestamp, keySerializer, valueSerializer) //**//
    }
  }
}

In my driver program I build the topology and start the streams as follows:

val builder: TopologyBuilder = new TopologyBuilder()

builder.addSource("Source", config.fromTopic)
  .addProcessor("Process", () => new WeblogProcessor(), "Source")
  .addStateStore(new BFStoreSupplier[String](LOG_COUNT_STATE_STORE, stringSerde, true, changelogConfig), "Process")
  .addSink("Sink", "weblog-count-topic", "Process")

val streams = new KafkaStreams(builder, streamingConfig)
streams.start()

When I run the program, immediately I get the following exception:

Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed to flush state store log-counts
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:337)
    at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:72)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
    at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
    at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
    at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
*Caused by: java.lang.IllegalStateException: This should not happen as timestamp() should only be called while a record is processed*
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.timestamp(AbstractProcessorContext.java:150)
    at com.lightbend.fdp.sample.kstream.processor.BFStoreChangeLogger.logChange(BFStoreChangeLogger.scala:24)
    at com.lightbend.fdp.sample.kstream.processor.BFStore.flush(BFStore.scala:86)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:335)
    ... 8 more

I am not sure I understand the whole trace, but it looks like this may be related to https://issues.apache.org/jira/browse/KAFKA-5528. The exception is raised from the class BFStoreChangeLogger, at the line I marked above with //**//, where context.timestamp is read. According to the trace, logChange is being called from BFStore.flush during a commit, not while a record is being processed.

Any help or workaround will be appreciated.

regards.

-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
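
P.S. To make the failure mode in the trace concrete, here is a minimal, self-contained Scala sketch with no Kafka dependency. It assumes only what the exception message states: the context exposes a timestamp solely while a record is being processed. It then shows one possible workaround, which is to capture the timestamp on the store's write path and reuse it at flush time. TimestampSource, FakeContext, Change and BufferingChangeLogger are hypothetical names made up for illustration; they are not Kafka Streams APIs, and this has not been tried against the real BFStore.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the one ProcessorContext method used by the logger.
trait TimestampSource {
  def timestamp: Long
}

// Mimics the behaviour reported in the trace: the timestamp is only
// available between beginRecord and endRecord.
final class FakeContext extends TimestampSource {
  private var current: Option[Long] = None
  def beginRecord(ts: Long): Unit = { current = Some(ts) }
  def endRecord(): Unit = { current = None }
  def timestamp: Long = current.getOrElse(
    throw new IllegalStateException(
      "timestamp() should only be called while a record is processed"))
}

// A pending changelog entry, carrying the timestamp captured at write time.
final case class Change[K, V](key: K, value: V, timestamp: Long)

final class BufferingChangeLogger[K, V](context: TimestampSource,
                                        send: Change[K, V] => Unit) {
  // Latest pending change per key, in first-insertion order.
  private val pending = mutable.LinkedHashMap.empty[K, Change[K, V]]

  // Call from the store's write path, i.e. while a record is being processed.
  def record(key: K, value: V): Unit =
    pending.update(key, Change(key, value, context.timestamp))

  // Call from flush(); it never reads context.timestamp.
  def flush(): Unit = {
    pending.values.foreach(send)
    pending.clear()
  }
}

object BufferingChangeLoggerDemo {
  def run(): List[Change[String, Int]] = {
    val ctx = new FakeContext
    val sent = mutable.ListBuffer.empty[Change[String, Int]]
    val logger = new BufferingChangeLogger[String, Int](ctx, c => { sent += c; () })

    ctx.beginRecord(1000L)      // a record arrives
    logger.record("host-a", 1)  // timestamp captured here
    ctx.endRecord()             // processing of that record is over

    logger.flush()              // commit time: no record is in flight
    sent.toList
  }
}
```

In this sketch, BufferingChangeLoggerDemo.run() returns a single Change("host-a", 1, 1000), and flush() succeeds even though reading ctx.timestamp at that point would throw. In the real store, send would presumably wrap the collector.send call from logChange, with the captured timestamp passed in place of context.timestamp.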