[ 
https://issues.apache.org/jira/browse/KAFKA-7467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635692#comment-16635692
 ] 

ASF GitHub Bot commented on KAFKA-7467:
---------------------------------------

bob-barrett opened a new pull request #5727: KAFKA-7467: NoSuchElementException 
is raised because controlBatch is …
URL: https://github.com/apache/kafka/pull/5727
 
 
   …empty
   
   This patch changes the behavior of MemoryRecords.filterTo() to always mark 
empty batches as non-control batches, rather than taking the attribute from the 
original batch. Empty control batches will cause fatal errors in segment 
recovery or the log cleaner due to trying access a record in an empty batch. In 
order to prevent errors from logs that are already in this state, this patch 
also adds a check to DefaultRecordBatch.isControlBatch() so that it will only 
return true if the batch is non-empty. Finally, it adds isControl to the output 
of DumpLogSegments. MemoryRecords and DefaultRecordsBatch changes were tested 
with unit tests, DumpLogSegments change was tested manually.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> NoSuchElementException is raised because controlBatch is empty
> --------------------------------------------------------------
>
>                 Key: KAFKA-7467
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7467
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 1.1.0
>            Reporter: Badai Aqrandista
>            Assignee: Bob Barrett
>            Priority: Major
>
> Somehow, log cleaner died because of NoSuchElementException when it calls 
> onControlBatchRead:
> {noformat}
> [2018-09-25 14:18:31,088] INFO Cleaner 0: Cleaning segment 0 in log 
> __consumer_offsets-45 (largest timestamp Fri Apr 27 16:12:39 CDT 2018) into 
> 0, discarding deletes. (kafka.log.LogCleaner)
> [2018-09-25 14:18:31,092] ERROR [kafka-log-cleaner-thread-0]: Error due to 
> (kafka.log.LogCleaner)
> java.util.NoSuchElementException
>   at java.util.Collections$EmptyIterator.next(Collections.java:4189)
>   at 
> kafka.log.CleanedTransactionMetadata.onControlBatchRead(LogCleaner.scala:945)
>   at 
> kafka.log.Cleaner.kafka$log$Cleaner$$shouldDiscardBatch(LogCleaner.scala:636)
>   at kafka.log.Cleaner$$anon$5.checkBatchRetention(LogCleaner.scala:573)
>   at 
> org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:157)
>   at 
> org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:138)
>   at kafka.log.Cleaner.cleanInto(LogCleaner.scala:604)
>   at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:518)
>   at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:462)
>   at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:461)
>   at scala.collection.immutable.List.foreach(List.scala:392)
>   at kafka.log.Cleaner.doClean(LogCleaner.scala:461)
>   at kafka.log.Cleaner.clean(LogCleaner.scala:438)
>   at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305)
>   at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> [2018-09-25 14:18:31,093] INFO [kafka-log-cleaner-thread-0]: Stopped 
> (kafka.log.LogCleaner)
> {noformat}
> The following code does not seem to expect the controlBatch to be empty:
> [https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/log/LogCleaner.scala#L946]
> {noformat}
>   def onControlBatchRead(controlBatch: RecordBatch): Boolean = {
>     consumeAbortedTxnsUpTo(controlBatch.lastOffset)
>     val controlRecord = controlBatch.iterator.next()
>     val controlType = ControlRecordType.parse(controlRecord.key)
>     val producerId = controlBatch.producerId
> {noformat}
> MemoryRecords.filterTo copies the original control attribute for empty 
> batches, which results in empty control batches. Trying to read the control 
> type of an empty batch causes the error.
> {noformat}
>   else if (batchRetention == BatchRetention.RETAIN_EMPTY) {
>     if (batchMagic < RecordBatch.MAGIC_VALUE_V2)
>         throw new IllegalStateException("Empty batches are only supported for 
> magic v2 and above");
>     
> bufferOutputStream.ensureRemaining(DefaultRecordBatch.RECORD_BATCH_OVERHEAD);
>     DefaultRecordBatch.writeEmptyHeader(bufferOutputStream.buffer(), 
> batchMagic, batch.producerId(),
>             batch.producerEpoch(), batch.baseSequence(), batch.baseOffset(), 
> batch.lastOffset(),
>             batch.partitionLeaderEpoch(), batch.timestampType(), 
> batch.maxTimestamp(),
>             batch.isTransactional(), batch.isControlBatch());
>     filterResult.updateRetainedBatchMetadata(batch, 0, true);
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to