[ https://issues.apache.org/jira/browse/KAFKA-1539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14068010#comment-14068010 ]

Dmitry Bugaychenko commented on KAFKA-1539:
-------------------------------------------

Dug into the problem a bit more. It looks like calling flush() on new 
BufferedWriter(new FileWriter(temp)) only forces the buffered writer to dump 
its buffer into the FileOutputStream underneath the FileWriter and call 
flush() on it. However, according to 
http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/7u40-b43/java/io/FileOutputStream.java#FileOutputStream
FileOutputStream.flush() does nothing. To really force the data to be written 
to disk you need to call fos.getFD().sync(). With that in mind, the patch 
could look like this:

{code}
  def write(offsets: Map[TopicAndPartition, Long]) {
    lock synchronized {
      // write to temp file and then swap with the existing file
      val temp = new File(file.getAbsolutePath + ".tmp")

      val fileOutputStream = new FileOutputStream(temp)
      val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream))
      try {
        // write the current version
        writer.write(0.toString)
        writer.newLine()
      
        // write the number of entries
        writer.write(offsets.size.toString)
        writer.newLine()

        // write the entries
        offsets.foreach { case (topicPart, offset) =>
          writer.write("%s %d %d".format(topicPart.topic, topicPart.partition, 
offset))
          writer.newLine()
        }
      
        // flush and overwrite old file
        writer.flush()
        
        // Force fsync to disk
        fileOutputStream.getFD.sync()
      } finally {
        writer.close()
      }
      
      // swap new offset checkpoint file with previous one
      if(!temp.renameTo(file)) {
        // renameTo() fails on Windows if the destination file exists.
        file.delete()
        if(!temp.renameTo(file))
          throw new IOException("File rename from %s to %s 
failed.".format(temp.getAbsolutePath, file.getAbsolutePath))
      }
    }
  }
{code}

Note that the problem is easily reproducible only on XFS; ext3/ext4 seem to 
handle this case much better. We hope to be able to try the patch later this 
week and check whether it helps.
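
For anyone who wants to see the mechanism in isolation, here is a minimal 
standalone sketch of the same flush-then-sync pattern, outside the Kafka 
checkpoint code (file name and contents are made up for illustration):

{code}
import java.io._

// Illustration only: flush() empties the JVM-side buffers into the OS page
// cache, while getFD.sync() asks the OS to push the cached pages to the disk.
val fos = new FileOutputStream("checkpoint.tmp")
val writer = new BufferedWriter(new OutputStreamWriter(fos))
try {
  writer.write("0")            // version
  writer.newLine()
  writer.write("1")            // number of entries
  writer.newLine()
  writer.write("topic 0 42")   // topic, partition, offset
  writer.newLine()
  writer.flush()               // drain the writer buffers into the page cache
  fos.getFD.sync()             // force the cached data onto the physical device
} finally {
  writer.close()
}
{code}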

> Due to OS caching Kafka might loose offset files which causes full reset of 
> data
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-1539
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1539
>             Project: Kafka
>          Issue Type: Bug
>          Components: log
>    Affects Versions: 0.8.1.1
>            Reporter: Dmitry Bugaychenko
>            Assignee: Jay Kreps
>
> Seen this while testing power failures and disk failures. Due to caching at 
> the OS level (e.g. XFS can cache data for 30 seconds), after a failure we got 
> offset files of zero length. This dramatically slows down broker startup (it 
> has to re-check all segments), and if the high watermark offsets are lost it 
> simply erases all data and starts recovering from other brokers (looks funny: 
> first spending 2-3 hours re-checking logs and then deleting them all due to 
> the missing high watermark).
> Proposal: introduce offset file rotation. Keep two versions of the offset 
> file, write to the oldest, read from the newest valid one. That way we would 
> be able to configure the offset checkpoint interval so that at least one file 
> is always flushed and valid.
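
A rough sketch of how the proposed two-file rotation could look (file names 
and the validity check are illustrative, not part of the ticket):

{code}
import java.io.File

// Illustrative sketch of the proposal: keep two checkpoint files, always write
// the next checkpoint into the older one and read back from the newest file
// that passes a validity check (e.g. non-empty with a matching entry count).
class RotatingCheckpoint(dir: File) {
  private val files = Seq(new File(dir, "offsets.0"), new File(dir, "offsets.1"))

  // The older (or not yet existing) file is the one to overwrite next.
  def fileToWrite: File =
    files.minBy(f => if (f.exists) f.lastModified else 0L)

  // The newest valid file is the one to restore offsets from, if any.
  def fileToRead(isValid: File => Boolean): Option[File] =
    files.filter(f => f.exists && isValid(f)).sortBy(_.lastModified).lastOption
}
{code}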


