Hello, i have following problem with kafka-streams scala app and Exactly
once delivery quarantee:



Topic filled with kafka-streams app(exactly once enabled) has wrong ending
offset. Broker and streams API version 0.11. When i run
*kafka.tools.GetOffsetShell*, it gives ending offset 17, but in topic there
are just 12 messages (retention is disabled). When exactly once guarantee
is disabled, these offsets are matching. I tried to reset kafka-streams
according to this
<https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/>,
but problem still remains.



When i run *SimpleConsumerShell* with *--print-offsets* option, output is
folowing:



next offset = 1

{"timestamp": 149583551238149, "data": {...}}

next offset = 2

{"timestamp": 149583551238149, "data": {...}}

next offset = 4

{"timestamp": 149583551238149, "data": {...}}

next offset = 5

{"timestamp": 149583551238149, "data": {...}}

next offset = 7

{"timestamp": 149583551238149, "data": {...}}

next offset = 8

{"timestamp": 149583551238149, "data": {...}}

...



Some offsets are apparently skipped when exactly-once dellivery guarantee
is enabled. I’m reading from this topic with spark, and it is causing these
errors: “*assertion failed*: *Ran out of messages before reaching ending
offset 17 for topic offset-test partition 0 start 0. This should not
happen, and indicates that messages may have been lost” *or* “**Got wrong
record for spark-executor-<group.id <http://group.id>> <topic> 0 even after
seeking to offset” – *depend on version of spark-streaming-kafka lib. We're
running Cloudera distribution of Kafka.



My kafka streams app config is following:



        application.id = MultipleTopicsAppTest

        application.server =

        bootstrap.servers = [host:9092]

        buffered.records.per.partition = 1000

        cache.max.bytes.buffering = 10485760

        client.id =

        commit.interval.ms = 100

        connections.max.idle.ms = 540000

        default.deserialization.exception.handler = class
org.apache.kafka.streams.errors.LogAndFailExceptionHandler

        default.key.serde = class
org.apache.kafka.common.serialization.Serdes$StringSerde

        default.timestamp.extractor = class
org.apache.kafka.streams.processor.FailOnInvalidTimestamp

        default.value.serde = class
org.apache.kafka.common.serialization.Serdes$StringSerde

        key.serde = null

        metadata.max.age.ms = 300000

        metric.reporters = []

        metrics.num.samples = 2

        metrics.recording.level = INFO

        metrics.sample.window.ms = 30000

        num.standby.replicas = 0

        num.stream.threads = 1

        partition.grouper = class
org.apache.kafka.streams.processor.DefaultPartitionGrouper

        poll.ms = 100

        processing.guarantee = exactly_once

        receive.buffer.bytes = 32768

        reconnect.backoff.max.ms = 1000

        reconnect.backoff.ms = 50

        replication.factor = 1

        request.timeout.ms = 40000

        retry.backoff.ms = 100

        rocksdb.config.setter = null

        security.protocol = PLAINTEXT

        send.buffer.bytes = 131072

        state.cleanup.delay.ms = 600000

        state.dir = /tmp/kafka-streams

        timestamp.extractor = null

        value.serde = null

        windowstore.changelog.additional.retention.ms = 86400000

        zookeeper.connect =



What can cause this problem? Thanks!

Reply via email to