Hi,

It is likely due to the timestamps you are extracting and using as the
record timestamps. Kafka uses the record timestamps (not the file times on
disk) for time-based retention, so if the extracted business timestamps are
already older than the topic's retention period, segments are scheduled for
deletion as soon as they roll. I suspect that is what is happening here.
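
For illustration, a minimal sketch of such an extractor (the class name and
the assumption that the value carries the business time as epoch millis are
placeholders of mine, not taken from your application). Whatever this method
returns becomes the record timestamp, and Kafka Streams stamps the records
it writes to your intermediate topic with that same timestamp, so the broker
evaluates time-based retention against the business time rather than the
append time:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Hypothetical extractor: the record value is assumed to carry the business
// time as epoch milliseconds. The returned value is used as the record
// timestamp throughout the topology, including for records produced to the
// intermediate topic, and the broker applies retention.ms against it.
public class BusinessTimeExtractor implements TimestampExtractor {

    @Override
    public long extract(final ConsumerRecord<Object, Object> record,
                        final long previousTimestamp) {
        final Object value = record.value();
        if (value instanceof Long) {
            return (Long) value;  // an old business time makes the segment look expired
        }
        // Otherwise fall back to the timestamp already carried by the record.
        return record.timestamp() >= 0 ? record.timestamp() : previousTimestamp;
    }
}

If that is what is happening, the usual ways around it are to extend
retention.ms on the intermediate topic to well beyond the business-time lag,
or to set message.timestamp.type=LogAppendTime on that topic so retention is
based on append time. Both are topic-level configs; which one fits depends
on whether your downstream logic relies on the event time.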

Thanks,
Damian

On Fri, 15 Dec 2017 at 11:49 Wim Van Leuven <wim.vanleu...@highestpoint.biz>
wrote:

> Hello all,
>
> We are running some Kafka Streams processing apps over Confluent OS
> (v3.2.0) and I'm seeing unexpected but 'consistent' behaviour regarding
> segment and index deletion.
>
> So, we have a topic 'input' that contains about 30M records to ingest. A
> first processor transforms the data and pipes it onto a second,
> intermediate topic. A second processor picks up the records, processes
> them and sends them out.
>
> On our test environment the intermediate topic was set up with a
> retention of 1 hour, because we only need to keep the data while it is
> being processed.
>
> On a test run we saw the second processor exit with an exception saying
> it couldn't read offsets. We do not reset offsets automatically because
> this situation should not occur:
>
> org.apache.kafka.streams.errors.StreamsException: No valid committed offset
> found for input topic cdr-raw-arch (partition 1) and no valid reset policy
> configured. You need to set configuration parameter "auto.offset.reset" or
> specify a topic specific reset policy via
> KStreamBuilder#stream(StreamsConfig.AutoOffsetReset offsetReset, ...) or
> KStreamBuilder#table(StreamsConfig.AutoOffsetReset offsetReset, ...)
>
> As we thought the topic data was expiring (processing takes longer than
> 1 hour), we changed the topic to retain the data for 1 day.
>
> On rerun, however, we saw exactly the same behaviour. That's why I call
> it 'consistent' behaviour above.
>
> In the server logs, we see that Kafka is rolling segments but
> immediately scheduling them for deletion:
>
> [2017-12-15 11:01:46,992] INFO Rolled new log segment for
> 'cdr-raw-arch-1' in 1 ms. (kafka.log.Log)
> [2017-12-15 11:01:46,993] INFO Scheduling log segment 7330185 for log
> cdr-raw-arch-1 for deletion. (kafka.log.Log)
> [2017-12-15 11:01:46,995] INFO Rolled new log segment for
> 'cdr-raw-arch-0' in 2 ms. (kafka.log.Log)
> [2017-12-15 11:01:46,995] INFO Scheduling log segment 7335872 for log
> cdr-raw-arch-0 for deletion. (kafka.log.Log)
> [2017-12-15 11:02:46,995] INFO Deleting segment 7330185 from log
> cdr-raw-arch-1. (kafka.log.Log)
> [2017-12-15 11:02:46,996] INFO Deleting segment 7335872 from log
> cdr-raw-arch-0. (kafka.log.Log)
> [2017-12-15 11:02:47,170] INFO Deleting index
> /data/4/kafka/cdr-raw-arch-1/00000000000007330185.index.deleted
> (kafka.log.OffsetIndex)
> [2017-12-15 11:02:47,171] INFO Deleting index
> /data/4/kafka/cdr-raw-arch-1/00000000000007330185.timeindex.deleted
> (kafka.log.TimeIndex)
> [2017-12-15 11:02:47,172] INFO Deleting index
> /data/3/kafka/cdr-raw-arch-0/00000000000007335872.index.deleted
> (kafka.log.OffsetIndex)
> [2017-12-15 11:02:47,173] INFO Deleting index
> /data/3/kafka/cdr-raw-arch-0/00000000000007335872.timeindex.deleted
> (kafka.log.TimeIndex)
>
>
> However, I do not understand the behaviour: why is Kafka deleting the
> data on the intermediate topic before it has been processed, almost
> immediately even?
>
> We do use timestamp extractors to pull business time from the records.
> Is that taken into account for the retention time? Or is retention based
> only on the timestamps of the files on disk?
>
> Thank you for shedding any light on this problem!
>
> Kind regards!
> -wim
>
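
Regarding the "No valid committed offset ... no valid reset policy
configured" exception above: the simplest of the remedies it names is to
set "auto.offset.reset" in the properties you pass to StreamsConfig. A
minimal sketch (the application id and bootstrap servers are placeholders);
note that this only tells Streams what to do when no valid committed offset
exists, it does not address the underlying retention problem:

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class ResetPolicyConfig {

    public static Properties streamsProperties() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "cdr-processor");     // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // "auto.offset.reset": fallback used when no committed offset is
        // found: "earliest" replays from the beginning of the topic,
        // "latest" starts from the end.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }
}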
