Hello, I have the following problem with a Kafka Streams Scala app and the exactly-once delivery guarantee:

A topic filled by the Kafka Streams app (exactly-once enabled) has a wrong ending offset. Broker and Streams API are version 0.11. When I run *kafka.tools.GetOffsetShell*, it reports ending offset 17, but the topic contains only 12 messages (retention is disabled). When the exactly-once guarantee is disabled, these offsets match. I tried to reset the Kafka Streams application according to this guide <https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/>, but the problem remains.

When I run *SimpleConsumerShell* with the *--print-offsets* option, the output is the following:

next offset = 1 {"timestamp": 149583551238149, "data": {...}}
next offset = 2 {"timestamp": 149583551238149, "data": {...}}
next offset = 4 {"timestamp": 149583551238149, "data": {...}}
next offset = 5 {"timestamp": 149583551238149, "data": {...}}
next offset = 7 {"timestamp": 149583551238149, "data": {...}}
next offset = 8 {"timestamp": 149583551238149, "data": {...}}
...

Some offsets are apparently skipped when the exactly-once delivery guarantee is enabled.

I'm reading from this topic with Spark, and it causes one of the following errors, depending on the version of the spark-streaming-kafka lib:

*assertion failed: Ran out of messages before reaching ending offset 17 for topic offset-test partition 0 start 0. This should not happen, and indicates that messages may have been lost*

or

*Got wrong record for spark-executor-<group.id> <topic> 0 even after seeking to offset*

We're running the Cloudera distribution of Kafka. My Kafka Streams app config is the following:

application.id = MultipleTopicsAppTest
application.server =
bootstrap.servers = [host:9092]
buffered.records.per.partition = 1000
cache.max.bytes.buffering = 10485760
client.id =
commit.interval.ms = 100
connections.max.idle.ms = 540000
default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndFailExceptionHandler
default.key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
default.value.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
key.serde = null
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
num.standby.replicas = 0
num.stream.threads = 1
partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
poll.ms = 100
processing.guarantee = exactly_once
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = 1
request.timeout.ms = 40000
retry.backoff.ms = 100
rocksdb.config.setter = null
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
state.dir = /tmp/kafka-streams
timestamp.extractor = null
value.serde = null
windowstore.changelog.additional.retention.ms = 86400000
zookeeper.connect =

What can cause this problem? Thanks!
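
For reference, this is roughly how the app is configured; a minimal sketch only, the topology and topic names below are illustrative placeholders, while the config values come from the listing above:

import java.util.Properties

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.KStreamBuilder
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}

object MultipleTopicsAppTest extends App {
  val props = new Properties()
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "MultipleTopicsAppTest")
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:9092")
  // Exactly-once processing -- the setting that correlates with the skipped offsets above
  props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE)
  props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "100")
  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)

  // Illustrative topology: copy records from an input topic into the topic read by Spark
  val builder = new KStreamBuilder()
  builder.stream[String, String]("input-topic").to("offset-test")

  val streams = new KafkaStreams(builder, props)
  streams.start()
  sys.addShutdownHook(streams.close())
}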
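
And this is roughly how the topic is read on the Spark side; a sketch using the spark-streaming-kafka-0-10 API, where the group id, batch interval, and topic name "offset-test" are placeholders (the actual code differs slightly between the lib versions that produce the two errors above):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}

object OffsetTestReader extends App {
  val conf = new SparkConf().setAppName("OffsetTestReader")
  val ssc = new StreamingContext(conf, Seconds(10))

  // Plain consumer settings; group id is a placeholder
  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "host:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "offset-test-reader",
    "auto.offset.reset" -> "earliest",
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )

  // Direct stream over the topic whose ending offset looks wrong
  val stream = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](Array("offset-test"), kafkaParams)
  )

  stream.map(record => record.value).print()

  ssc.start()
  ssc.awaitTermination()
}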
Topic filled with kafka-streams app(exactly once enabled) has wrong ending offset. Broker and streams API version 0.11. When i run *kafka.tools.GetOffsetShell*, it gives ending offset 17, but in topic there are just 12 messages (retention is disabled). When exactly once guarantee is disabled, these offsets are matching. I tried to reset kafka-streams according to this <https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/>, but problem still remains. When i run *SimpleConsumerShell* with *--print-offsets* option, output is folowing: next offset = 1 {"timestamp": 149583551238149, "data": {...}} next offset = 2 {"timestamp": 149583551238149, "data": {...}} next offset = 4 {"timestamp": 149583551238149, "data": {...}} next offset = 5 {"timestamp": 149583551238149, "data": {...}} next offset = 7 {"timestamp": 149583551238149, "data": {...}} next offset = 8 {"timestamp": 149583551238149, "data": {...}} ... Some offsets are apparently skipped when exactly-once dellivery guarantee is enabled. I’m reading from this topic with spark, and it is causing these errors: “*assertion failed*: *Ran out of messages before reaching ending offset 17 for topic offset-test partition 0 start 0. This should not happen, and indicates that messages may have been lost” *or* “**Got wrong record for spark-executor-<group.id <http://group.id>> <topic> 0 even after seeking to offset” – *depend on version of spark-streaming-kafka lib. We're running Cloudera distribution of Kafka. My kafka streams app config is following: application.id = MultipleTopicsAppTest application.server = bootstrap.servers = [host:9092] buffered.records.per.partition = 1000 cache.max.bytes.buffering = 10485760 client.id = commit.interval.ms = 100 connections.max.idle.ms = 540000 default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndFailExceptionHandler default.key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp default.value.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde key.serde = null metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 num.standby.replicas = 0 num.stream.threads = 1 partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper poll.ms = 100 processing.guarantee = exactly_once receive.buffer.bytes = 32768 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 replication.factor = 1 request.timeout.ms = 40000 retry.backoff.ms = 100 rocksdb.config.setter = null security.protocol = PLAINTEXT send.buffer.bytes = 131072 state.cleanup.delay.ms = 600000 state.dir = /tmp/kafka-streams timestamp.extractor = null value.serde = null windowstore.changelog.additional.retention.ms = 86400000 zookeeper.connect = What can cause this problem? Thanks!