[jira] [Commented] (KAFKA-6492) LogSemgent.truncateTo() should always resize the index file

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16349179#comment-16349179
 ] 

ASF GitHub Bot commented on KAFKA-6492:
---

becketqin closed pull request #4498: KAFKA-6492: Fix log truncation to empty segment
URL: https://github.com/apache/kafka/pull/4498
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 45c820bff8d..5970f42f6d9 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -28,7 +28,7 @@ import kafka.utils._
 import org.apache.kafka.common.errors.CorruptRecordException
 import org.apache.kafka.common.record.FileRecords.LogOffsetPosition
 import org.apache.kafka.common.record._
-import org.apache.kafka.common.utils.{Time}
+import org.apache.kafka.common.utils.Time
 
 import scala.collection.JavaConverters._
 import scala.math._
@@ -345,20 +345,23 @@ class LogSegment private[log] (val log: FileRecords,
    */
   @nonthreadsafe
   def truncateTo(offset: Long): Int = {
+    // Do offset translation before truncating the index to avoid needless scanning
+    // in case we truncate the full index
     val mapping = translateOffset(offset)
-    if (mapping == null)
-      return 0
     offsetIndex.truncateTo(offset)
     timeIndex.truncateTo(offset)
     txnIndex.truncateTo(offset)
-    // after truncation, reset and allocate more space for the (new currently  active) index
+
+    // After truncation, reset and allocate more space for the (new currently active) index
     offsetIndex.resize(offsetIndex.maxIndexSize)
     timeIndex.resize(timeIndex.maxIndexSize)
-    val bytesTruncated = log.truncateTo(mapping.position)
-    if(log.sizeInBytes == 0) {
+
+    val bytesTruncated = if (mapping == null) 0 else log.truncateTo(mapping.position)
+    if (log.sizeInBytes == 0) {
       created = time.milliseconds
       rollingBasedTimestamp = None
     }
+
     bytesSinceLastIndexEntry = 0
     if (maxTimestampSoFar >= 0)
       loadLargestTimestamp()
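Stepping outside the patch for a moment, the reordered control flow can be sketched with toy classes (hypothetical names, not Kafka's actual API): truncate and resize the indexes unconditionally, and only then short-circuit when the offset maps to nothing, instead of returning before the resize as the old code did.

```java
// Toy model of the patched truncateTo() ordering; names are illustrative,
// not Kafka's real classes.
final class ToyIndex {
    private int entries = 0;
    private int capacity;

    ToyIndex(int capacity) {
        this.capacity = capacity;
    }

    void truncateTo(long offset) {
        entries = 0; // drop all entries at or past the target offset
    }

    void resize(int newCapacity) {
        capacity = newCapacity;
    }

    boolean isFull() {
        return entries >= capacity; // a zero-capacity index is "full" immediately
    }
}

final class ToySegment {
    static final int MAX_INDEX_SIZE = 1000;
    final ToyIndex offsetIndex = new ToyIndex(0); // shrunk to 0 when the segment was closed

    /** mapping == null models an empty segment; returns bytes truncated from the log. */
    int truncateTo(Long mapping) {
        // Patched order: truncate and resize the index unconditionally...
        offsetIndex.truncateTo(0L);
        offsetIndex.resize(MAX_INDEX_SIZE);
        // ...and only then short-circuit when there is nothing to cut from the log.
        if (mapping == null)
            return 0;
        return 42; // stand-in for log.truncateTo(mapping.position)
    }
}
```

With the old ordering, `truncateTo(null)` returned before the `resize` calls, leaving a zero-capacity index behind; here the resize happens on every path.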
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 469b3cca40e..c45ed0d2986 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -22,7 +22,7 @@ import kafka.utils.TestUtils
 import kafka.utils.TestUtils.checkEquals
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record._
-import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.common.utils.{MockTime, Time, Utils}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 
@@ -36,13 +36,16 @@ class LogSegmentTest {
   var logDir: File = _
 
   /* create a segment with the given base offset */
-  def createSegment(offset: Long, indexIntervalBytes: Int = 10): LogSegment = {
+  def createSegment(offset: Long,
+                    indexIntervalBytes: Int = 10,
+                    maxSegmentMs: Int = Int.MaxValue,
+                    time: Time = Time.SYSTEM): LogSegment = {
     val ms = FileRecords.open(Log.logFile(logDir, offset))
     val idx = new OffsetIndex(Log.offsetIndexFile(logDir, offset), offset, maxIndexSize = 1000)
     val timeIdx = new TimeIndex(Log.timeIndexFile(logDir, offset), offset, maxIndexSize = 1500)
     val txnIndex = new TransactionIndex(offset, Log.transactionIndexFile(logDir, offset))
-    val seg = new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, maxSegmentMs = Int.MaxValue,
-      maxSegmentBytes = Int.MaxValue, Time.SYSTEM)
+    val seg = new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, maxSegmentMs = maxSegmentMs,
+      maxSegmentBytes = Int.MaxValue, time)
     segments += seg
     seg
   }
@@ -157,6 +160,47 @@ class LogSegmentTest {
     }
   }
 
+  @Test
+  def testTruncateEmptySegment() {
+    // This tests the scenario in which the follower truncates to an empty segment. In this
+    // case we must ensure that the index is resized so that the log segment is not mistakenly
+    // rolled due to a full index
+
+    val maxSegmentMs = 30
+    val time = new MockTime
+    val seg = createSegment(0, maxSegmentMs = maxSegmentMs, time = time)
+    seg.close()
+
+    val reopened = createSegment(0, maxSegmentMs = maxSegmentMs, time = time)
+    assertEquals(0, seg.timeIndex.sizeInBytes)
+    assertEquals(0, seg.offsetIndex.sizeInBytes)
+
+    time.sleep(500)
+    reopened.truncateTo(57)
+    assertEquals(0, reopened.timeWaitedForRoll(time.milliseconds(), RecordBatch.NO_TIMESTAMP))
+

[jira] [Commented] (KAFKA-6492) LogSemgent.truncateTo() should always resize the index file

2018-01-31 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16347816#comment-16347816
 ] 

Jason Gustafson commented on KAFKA-6492:


[~becket_qin] Beat you to it. Let me know if the PR I submitted makes sense.

> LogSemgent.truncateTo() should always resize the index file
> ---
>
> Key: KAFKA-6492
> URL: https://issues.apache.org/jira/browse/KAFKA-6492
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.0.2, 0.10.1.1, 0.10.2.1, 1.0.0, 0.11.0.2
>Reporter: Jiangjie Qin
>Priority: Major
> Fix For: 1.1.0
>
>
> The bug is the following:
>  # Initially on a follower broker there are two segments: segment 0 and 
> segment 1. Segment 0 is empty (maybe due to log compaction).
>  # The log is truncated to offset 0.
>  # LogSegment.truncateTo() will not find a message to truncate in segment 0, so 
> it will skip resizing the index/timeindex files. 
>  # When a new message is fetched, Log.maybeRoll() will try to roll a new 
> segment because the index file of segment 0 is already full (its max size is 0).
>  # After creating the new segment 0, the replica fetcher thread finds that 
> a segment 0 already exists, so it just throws an exception and dies.
> The fix is to have the broker ensure that the index files of active segments 
> are always resized properly.
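The failure mode in steps 4 and 5 can be illustrated with a toy sketch (hypothetical names; not Kafka's real Log class): once the never-resized index reports itself full, the roll attempt lands on the base offset of the still-present empty segment and the fetcher thread dies.

```java
import java.util.TreeMap;

// Toy model of the crash in steps 4-5; illustrative only, not Kafka's real Log.
final class ToyLog {
    private final TreeMap<Long, String> segments = new TreeMap<>();
    private int indexCapacity = 0; // a truncated-but-never-resized index

    ToyLog() {
        segments.put(0L, "segment-0"); // the empty segment left after truncation
    }

    boolean indexIsFull() {
        return indexCapacity == 0; // max size 0 means "full" before any append
    }

    void maybeRoll(long baseOffset) {
        if (indexIsFull()) {
            // Rolling at the base offset of the still-present empty segment is
            // the fatal condition the replica fetcher thread hit.
            if (segments.containsKey(baseOffset))
                throw new IllegalStateException(
                    "segment with base offset " + baseOffset + " already exists");
            segments.put(baseOffset, "segment-" + baseOffset);
        }
    }
}
```

Resizing the index during truncation (the fix above) makes `indexIsFull()` false, so no roll is attempted and the collision never occurs.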



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6492) LogSemgent.truncateTo() should always resize the index file

2018-01-31 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16347670#comment-16347670
 ] 

Jiangjie Qin commented on KAFKA-6492:
-

[~hachikuji] You are right. This issue affects earlier versions as well. 
I'll submit a PR.



[jira] [Commented] (KAFKA-6492) LogSemgent.truncateTo() should always resize the index file

2018-01-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16347612#comment-16347612
 ] 

ASF GitHub Bot commented on KAFKA-6492:
---

hachikuji opened a new pull request #4498: KAFKA-6492: Fix log truncation to empty segment
URL: https://github.com/apache/kafka/pull/4498
 
 
   This patch ensures that truncation to an empty segment forces resizing of the index file in order to prevent premature rolling.
   
   I have added unit tests which verify that appends are permitted following truncation to an empty segment. Without the fix, this test case reproduces the failure in which the rolled segment matches the current active segment.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> LogSemgent.truncateTo() should always resize the index file
> ---
>
> Key: KAFKA-6492
> URL: https://issues.apache.org/jira/browse/KAFKA-6492
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.0.0
>Reporter: Jiangjie Qin
>Priority: Major
> Fix For: 1.1.0
>
>
> The bug is the following:
>  # Initially on a follower broker there are two segments 0 and segment 1. 
> Segment 0 is empty (maybe due to log compaction)
>  # log is truncated to 0.
>  # LogSemgent.Truncate() will not find a message to truncate in segment 0, so 
> it will skip resizing the index/timeindex files. 
>  # When a new message is fetched, Log.maybeRoll() will try to roll a new 
> segment because the index file of segment 0 is already full (max size is 0)
>  # After creating the new segment 0, the replica fetcher thread finds that 
> there is already a segment 0 exists. So it just throws exception and dies.
> The fix would be let the broker make sure the index files of active segments 
> are always resized properly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6492) LogSemgent.truncateTo() should always resize the index file

2018-01-31 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16347394#comment-16347394
 ] 

Jason Gustafson commented on KAFKA-6492:


[~becket_qin] Does this only affect 1.0.0? As far as I can tell, it can happen 
on older versions as well. I recently saw an instance of this failure during 
log rolling on 0.10.1. It was a compacted topic, but I cannot confirm whether 
it was the specific truncation scenario you describe above (I don't have all 
the historic logs).
