If the last message (or all the messages) in the earliest segment has no 
timestamp it will use the filesystem timestamp for expiring.
Since the timestamps on your 3 brokers got reset then it will be 
log.retention.hours=24 (1 day) before these segments can be deleted (unless you 
reset the file timestamp back to something over a day ago).
Even though later segments have timestamps in the messages they cannot be 
expired until all the earlier segments are deleted so they are stuck waiting 
for 24 hours as well.

The latest distribution of Kafka is 0.10.2.1 so if you can, you should also 
probably upgrade to a newer version but that is a separate discussion.

-hans




> On May 25, 2017, at 11:50 AM, Milind Vaidya <kava...@gmail.com> wrote:
> 
> In  short it should work regardless as per "During the migration phase, if
> the first message in a segment does not have a timestamp, the log rolling
> will still be based on the (current time - create time of the segment)."
> 
> But that is not happening This is also for 3 out of 6 brokers.
> The 3 good ones deleted the data properly but these 3 do not show the same
> behaviour.
> 
> I came across this JIRA : https://issues.apache.org/jira/browse/KAFKA-3802 
> <https://issues.apache.org/jira/browse/KAFKA-3802>
> 
> It says it is fixed in next version 0.10.0.1
> <https://issues.apache.org/jira/browse/KAFKA/fixforversion/12334962 
> <https://issues.apache.org/jira/browse/KAFKA/fixforversion/12334962>>. I
> even tried that. On QA hosts it retains TS for .log files across restart.
> But when tried the new version on one of the prod host, same old story.
> 
> So internal or File system ts, it should get deleted when expired. What
> could be other reason and way out ot  this ?
> 
> On Thu, May 25, 2017 at 10:43 AM, Hans Jespersen <h...@confluent.io 
> <mailto:h...@confluent.io>> wrote:
> 
>> I quoted the wrong paragraph in my earlier response. The same KIP has a
>> section on log retention as well.
>> 
>> "Enforce time based log retention
>> 
>> To enforce time based log retention, the broker will check from the oldest
>> segment forward to the latest segment. For each segment, the broker checks
>> the last time index entry of a log segment. The timestamp will be the
>> latest timestamp of the messages in the log segment. So if that timestamp
>> expires, the broker will delete the log segment. The broker will stop at
>> the first segment which is not expired. i.e. the broker will not expire a
>> segment even if it is expired, unless all the older segment has been
>> expired."
>> 
>> If none of the messages in a segment has a timestamp, last modified time
>> will be used.
>> 
>> -hans
>> 
>> /**
>> * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
>> * h...@confluent.io (650)924-2670
>> */
>> 
>> On Thu, May 25, 2017 at 9:53 AM, Hans Jespersen <h...@confluent.io> wrote:
>> 
>>> 0.10.x format messages have timestamps within them so retention and
>>> expiring of messages isn't entirely based on the filesystem timestamp of
>>> the log segments anymore.
>>> 
>>> From KIP-33 - https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>> 33+-+Add+a+time+based+log+index#KIP-33-Addatimebasedlogindex-
>>> Enforcetimebasedlogrolling
>>> 
>>> "Enforce time based log rolling
>>> 
>>> Currently time based log rolling is based on the creating time of the log
>>> segment. With this KIP, the time based rolling would be changed to only
>>> based on the message timestamp. More specifically, if the first message
>> in
>>> the log segment has a timestamp, A new log segment will be rolled out if
>>> timestamp in the message about to be appended is greater than the
>> timestamp
>>> of the first message in the segment + log.roll.ms. When
>>> message.timestamp.type=CreateTime, user should set
>>> max.message.time.difference.ms appropriately together with log.roll.ms
>> to
>>> avoid frequent log segment roll out.
>>> 
>>> During the migration phase, if the first message in a segment does not
>>> have a timestamp, the log rolling will still be based on the (current
>> time
>>> - create time of the segment)."
>>> 
>>> -hans
>>> 
>>> /**
>>> * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
>>> * h...@confluent.io <mailto:h...@confluent.io> (650)924-2670 
>>> <(650)%20924-2670>
>>> */
>>> 
>>> On Thu, May 25, 2017 at 12:44 AM, Milind Vaidya <kava...@gmail.com>
>> wrote:
>>> 
>>>> I have 6 broker cluster.
>>>> 
>>>> I upgraded it from 0.8.1.1 to 0.10.0.0.
>>>> 
>>>> Kafka Producer to cluster to consumer (apache storm) upgrade went smooth
>>>> without any errors.
>>>> Initially keeping protocol to 0.8 and after clients were upgraded it was
>>>> promoted to 0.10.
>>>> 
>>>> Out of 6 brokers, 3 are honouring  log.retention.hours. For other 3 when
>>>> broker is restarted the time stamp for segment changes to current time.
>>>> That leads to segments not getting deleted hence disk gets full.
>>>> 
>>>> du -khc /disk1/kafka-broker/topic-1
>>>> 
>>>> 71G     /disk1/kafka-broker/topic-1
>>>> 
>>>> 71G     total
>>>> 
>>>> Latest segment timestamp : May 25 07:34
>>>> 
>>>> Oldest segment timestamp : May 25 07:16
>>>> 
>>>> 
>>>> It is impossible that 71 GB data was collected in mere 15 mins of
>>>> time. The log.retention.hours=24
>>>> and this is not new broker so oldest data should be around 24 hrs old.
>>>> 
>>>> As mentioned above only 3 out of 6 are showing same behaviour.  Why is
>>>> this
>>>> happening ?

Reply via email to