Hi all,

We are debugging an issue with a Kafka Streams application that is producing 
incorrect output. The application is a simple group-by on a key followed by a 
count. As expected, the application creates a repartitioning topic for the 
group-by stage. The problem appears to be that messages are getting lost in 
the repartitioning topic.
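
For reference, the topology is essentially the following. This is only a 
minimal sketch: the class name and the input/output topic names are made up, 
and it assumes the 1.0 Streams API; the application.id and the store name are 
taken from the repartitioning topic name in the broker logs below.

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

public class SiteStatsApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        // The application.id is the prefix of the repartitioning topic name
        props.put(StreamsConfig.APPLICATION_ID_CONFIG,
                "streamsapp_site_stats-v290118-debug_1517239899448");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> events = builder.stream("log_events"); // input topic name made up

        // Re-keying via groupBy() is what makes Streams create the repartitioning topic;
        // the store name below is what appears as "...-localstore_log_event_counts-repartition".
        KTable<String, Long> counts = events
                .groupBy((key, value) -> key)
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(
                        "localstore_log_event_counts"));

        counts.toStream().to("log_event_counts", // output topic name made up
                Produced.with(Serdes.String(), Serdes.Long()));

        new KafkaStreams(builder.build(), props).start();
    }
}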

From the Kafka broker logs, it appears that the log segments for the 
repartitioning topic are being marked for deletion very aggressively (within 
~2 seconds of being created), so quickly that some segments are deleted before 
the count stage of the Kafka Streams application has had a chance to consume 
the messages.

I have checked the configuration and I cannot see a reason why the log segments 
should be getting deleted so quickly. The following line reports the creation 
of the repartitioning topic:

[2018-01-29 15:31:39,992] INFO Created log for partition 
[streamsapp_site_stats-v290118-debug_1517239899448-localstore_log_event_counts-repartition,0]
 in /kafka-data with properties {compression.type -> producer, 
message.format.version -> 0.11.0-IV2, file.delete.delay.ms -> 100000, 
max.message.bytes -> 1000012, min.compaction.lag.ms -> 0, 
message.timestamp.type -> CreateTime, min.insync.replicas -> 1, 
segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, 
index.interval.bytes -> 4096, unclean.leader.election.enable -> false, 
retention.bytes -> 1073741824, delete.retention.ms -> 86400000, cleanup.policy 
-> delete, flush.ms -> 9223372036854775807, segment.ms -> 3600000, 
segment.bytes -> 1073741824, retention.ms -> 86400000, 
message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes 
-> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager)

As you can see, the retention time is set to 24 hours, the retention size to 
1 GB, the segment rolling time to 1 hour, and the segment size to 1 GB. For 
test purposes we are running the Streams app on a fixed input of 7,000 
messages with a total size of only about 5.5 MB, so we shouldn't be getting 
anywhere near the segment or retention limits. The input topic has only one 
partition.
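
In case it helps, this is roughly how the effective topic configuration can be 
double-checked programmatically (a sketch using the AdminClient; the bootstrap 
server address is a placeholder):

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

public class DumpTopicConfig {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        String topic = "streamsapp_site_stats-v290118-debug_1517239899448-"
                + "localstore_log_event_counts-repartition";

        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
            Map<ConfigResource, Config> configs =
                    admin.describeConfigs(Collections.singleton(resource)).all().get();
            // Print every entry, noting whether it is a topic-level override or a broker default
            configs.get(resource).entries().forEach(entry ->
                    System.out.printf("%s = %s (default: %s)%n",
                            entry.name(), entry.value(), entry.isDefault()));
        }
    }
}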

Just two seconds after the topic is created, the broker reports that it is 
rolling log segments and scheduling old log segments for deletion:

[2018-01-29 15:31:41,923] INFO Rolled new log segment for 
'streamsapp_site_stats-v290118-debug_1517239899448-localstore_log_event_counts-repartition-0'
 in 1 ms. (kafka.log.Log)
[2018-01-29 15:31:41,924] INFO Scheduling log segment 0 for log 
streamsapp_site_stats-v290118-debug_1517239899448-localstore_log_event_counts-repartition-0
 for deletion. (kafka.log.Log)
[2018-01-29 15:31:41,945] INFO Cleared earliest 0 entries from epoch cache 
based on passed offset 6582 leaving 1 in EpochFile for partition 
streamsapp_site_stats-v290118-debug_1517239899448-localstore_log_event_counts-repartition-0
 (kafka.server.epoch.LeaderEpochFileCache)
[2018-01-29 15:31:42,923] INFO Rolled new log segment for 
'streamsapp_site_stats-v290118-debug_1517239899448-localstore_log_event_counts-repartition-0'
 in 2 ms. (kafka.log.Log)
[2018-01-29 15:31:42,924] INFO Scheduling log segment 6582 for log 
streamsapp_site_stats-v290118-debug_1517239899448-localstore_log_event_counts-repartition-0
 for deletion. (kafka.log.Log)

100 seconds later (consistent with file.delete.delay.ms, which is set to 
100000 ms), the files are actually deleted:

[2018-01-29 15:33:21,923] INFO Deleting segment 0 from log 
streamsapp_site_stats-v290118-debug_1517239899448-localstore_log_event_counts-repartition-0.
 (kafka.log.Log)
[2018-01-29 15:33:21,929] INFO Deleting index 
/kafka-data/streamsapp_site_stats-v290118-debug_1517239899448-localstore_log_event_counts-repartition-0/00000000000000000000.index.deleted
 (kafka.log.OffsetIndex)
[2018-01-29 15:33:21,929] INFO Deleting index 
/kafka-data/streamsapp_site_stats-v290118-debug_1517239899448-localstore_log_event_counts-repartition-0/00000000000000000000.timeindex.deleted
 (kafka.log.TimeIndex)
[2018-01-29 15:33:22,925] INFO Deleting segment 6582 from log 
streamsapp_site_stats-v290118-debug_1517239899448-localstore_log_event_counts-repartition-0.
 (kafka.log.Log)
[2018-01-29 15:33:22,926] INFO Deleting index 
/kafka-data/streamsapp_site_stats-v290118-debug_1517239899448-localstore_log_event_counts-repartition-0/00000000000000006582.index.deleted
 (kafka.log.OffsetIndex)
[2018-01-29 15:33:22,927] INFO Deleting index 
/kafka-data/streamsapp_site_stats-v290118-debug_1517239899448-localstore_log_event_counts-repartition-0/00000000000000006582.timeindex.deleted
 (kafka.log.TimeIndex)

Does anyone know what might be causing the messages in the repartitioning topic 
to be deleted so aggressively?

Thanks,
Martin

