Repository: kafka
Updated Branches:
  refs/heads/trunk dca263b4e -> 15e008783


KAFKA-3802; log mtimes reset on broker restart / shutdown

There seems to be a bug in the JDK: on some versions the mtime of
the file is modified by FileChannel.truncate() even though the javadoc states
`If the given size is greater than or equal to the file's current size then
 the file is not modified.`.

This causes problems with log retention, as all the files then appear to
Kafka to contain recent data. Therefore the truncate call is now only
performed if the channel size differs from the target size.

Author: Moritz Siuts <[email protected]>

Reviewers: Jun Rao <[email protected]>, Ismael Juma <[email protected]>

Closes #1497 from msiuts/KAFKA-3802-log_mtimes_reset_on_broker_shutdown


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/15e00878
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/15e00878
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/15e00878

Branch: refs/heads/trunk
Commit: 15e008783cf73dcaed851fe6cc587767031886e5
Parents: dca263b
Author: Moritz Siuts <[email protected]>
Authored: Wed Jul 6 14:33:07 2016 +0100
Committer: Ismael Juma <[email protected]>
Committed: Wed Jul 6 14:33:07 2016 +0100

----------------------------------------------------------------------
 .../main/scala/kafka/log/FileMessageSet.scala   | 20 ++++--
 .../unit/kafka/log/FileMessageSetTest.scala     | 67 +++++++++++++++++++-
 2 files changed, 79 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/15e00878/core/src/main/scala/kafka/log/FileMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala 
b/core/src/main/scala/kafka/log/FileMessageSet.scala
index a454f2c..c8bce65 100755
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -54,12 +54,12 @@ class FileMessageSet private[kafka](@volatile var file: 
File,
     if(isSlice)
       new AtomicInteger(end - start) // don't check the file size if this is 
just a slice view
     else
-      new AtomicInteger(math.min(channel.size().toInt, end) - start)
+      new AtomicInteger(math.min(channel.size.toInt, end) - start)
 
   /* if this is not a slice, update the file pointer to the end of the file */
   if (!isSlice)
     /* set the file position to the last byte in the file */
-    channel.position(math.min(channel.size().toInt, end))
+    channel.position(math.min(channel.size.toInt, end))
 
   /**
    * Create a file message set with no slicing.
@@ -157,7 +157,7 @@ class FileMessageSet private[kafka](@volatile var file: 
File,
    */
   def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: 
Int): Int = {
     // Ensure that the underlying size has not changed.
-    val newSize = math.min(channel.size().toInt, end) - start
+    val newSize = math.min(channel.size.toInt, end) - start
     if (newSize < _size.get()) {
       throw new KafkaException("Size of FileMessageSet %s has been truncated 
during write: old size %d, new size %d"
         .format(file.getAbsolutePath, _size.get(), newSize))
@@ -333,7 +333,11 @@ class FileMessageSet private[kafka](@volatile var file: 
File,
   /**
    * Truncate this file message set to the given size in bytes. Note that this 
API does no checking that the
    * given size falls on a valid message boundary.
-   * @param targetSize The size to truncate to.
+   * In some versions of the JDK truncating to the same size as the file 
message set will cause an
+   * update of the files mtime, so truncate is only performed if the 
targetSize is smaller than the
+   * size of the underlying FileChannel.
+   * It is expected that no other threads will do writes to the log when this 
function is called.
+   * @param targetSize The size to truncate to. Must be between 0 and 
sizeInBytes.
    * @return The number of bytes truncated off
    */
   def truncateTo(targetSize: Int): Int = {
@@ -341,9 +345,11 @@ class FileMessageSet private[kafka](@volatile var file: 
File,
     if(targetSize > originalSize || targetSize < 0)
       throw new KafkaException("Attempt to truncate log segment to " + 
targetSize + " bytes failed, " +
                                " size of this log segment is " + originalSize 
+ " bytes.")
-    channel.truncate(targetSize)
-    channel.position(targetSize)
-    _size.set(targetSize)
+    if (targetSize < channel.size.toInt) {
+      channel.truncate(targetSize)
+      channel.position(targetSize)
+      _size.set(targetSize)
+    }
     originalSize - targetSize
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/15e00878/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala 
b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
index 417aa75..a64454d 100644
--- a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
+++ b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
@@ -19,12 +19,14 @@ package kafka.log
 
 import java.io._
 import java.nio._
-import java.util.concurrent.atomic._
+import java.nio.channels._
 
 import kafka.common.LongRef
 import org.junit.Assert._
 import kafka.utils.TestUtils._
 import kafka.message._
+import kafka.common.KafkaException
+import org.easymock.EasyMock
 import org.junit.Test
 
 class FileMessageSetTest extends BaseMessageSetTestCases {
@@ -153,6 +155,69 @@ class FileMessageSetTest extends BaseMessageSetTestCases {
   }
 
   /**
+    * Test that truncateTo only calls truncate on the FileChannel if the size 
of the
+    * FileChannel is bigger than the target size. This is important because 
some JVMs
+    * change the mtime of the file, even if truncate should do nothing.
+    */
+  @Test
+  def testTruncateNotCalledIfSizeIsSameAsTargetSize() {
+    val channelMock = EasyMock.createMock(classOf[FileChannel])
+
+    EasyMock.expect(channelMock.size).andReturn(42L).atLeastOnce()
+    EasyMock.expect(channelMock.position(42L)).andReturn(null)
+    EasyMock.replay(channelMock)
+
+    val msgSet = new FileMessageSet(tempFile(), channelMock)
+    msgSet.truncateTo(42)
+
+    EasyMock.verify(channelMock)
+  }
+
+  /**
+    * Expect a KafkaException if targetSize is bigger than the size of
+    * the FileMessageSet.
+    */
+  @Test
+  def testTruncateNotCalledIfSizeIsBiggerThanTargetSize() {
+    val channelMock = EasyMock.createMock(classOf[FileChannel])
+
+    EasyMock.expect(channelMock.size).andReturn(42L).atLeastOnce()
+    EasyMock.expect(channelMock.position(42L)).andReturn(null)
+    EasyMock.replay(channelMock)
+
+    val msgSet = new FileMessageSet(tempFile(), channelMock)
+
+    try {
+      msgSet.truncateTo(43)
+      fail("Should throw KafkaException")
+    } catch {
+      case e: KafkaException => // expected
+    }
+
+    EasyMock.verify(channelMock)
+  }
+
+  /**
+    * see #testTruncateNotCalledIfSizeIsSameAsTargetSize
+    */
+  @Test
+  def testTruncateIfSizeIsDifferentToTargetSize() {
+    val channelMock = EasyMock.createMock(classOf[FileChannel])
+
+    EasyMock.expect(channelMock.size).andReturn(42L).atLeastOnce()
+    EasyMock.expect(channelMock.position(42L)).andReturn(null).once()
+    EasyMock.expect(channelMock.truncate(23L)).andReturn(null).once()
+    EasyMock.expect(channelMock.position(23L)).andReturn(null).once()
+    EasyMock.replay(channelMock)
+
+    val msgSet = new FileMessageSet(tempFile(), channelMock)
+    msgSet.truncateTo(23)
+
+    EasyMock.verify(channelMock)
+  }
+
+
+  /**
    * Test the new FileMessageSet with pre allocate as true
    */
   @Test

Reply via email to