jeffkbkim commented on PR #12783:
URL: https://github.com/apache/kafka/pull/12783#issuecomment-1315804689

   error from `testAlterReplicaLogDirs(String)`:
   ```
   [2022-11-15 10:24:37,184] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
fetcherId=0] Unexpected error occurred while processing data for partition 
topic-0 at offset 15 (kafka.server.ReplicaFetcherThread:76)
   java.lang.NullPointerException
        at kafka.log.OffsetIndex.$anonfun$lookup$1(OffsetIndex.scala:90)
   ...
   ```
   we hit NPE because the replica fetcher thread tries to find the high 
watermark from the to-be-deleted log segment's offset index, where the 
currently replicating segment's offset index mmap is null. the offset index is 
unmapped during `LogManager.replaceCurrentWithFutureLog(TopicPartition)`.
   
   in UnifiedLog, we check whether the memory map buffer is closed when 
retrieiving HWM:
   ```
     private def fetchHighWatermarkMetadata: LogOffsetMetadata = {
       localLog.checkIfMemoryMappedBufferClosed()
       ...
     }
   ```
   which makes it seem okay to read from the offset index. however, we should 
not allow any disk io if an offset index is unmapped. moreover, if a local log 
is closed, we should set `isMemoryMappedBufferClosed` to `true` in 
`LocalLog.close()`:
   ```
     private[log] def close(): Unit = {
       maybeHandleIOException(s"Error while renaming dir for $topicPartition in 
dir ${dir.getParent}") {
         checkIfMemoryMappedBufferClosed()
         segments.close()
         // isMemoryMappedBufferClosed = true?
       }
     }
   ```
   
   @dajac i'm not sure if this is a bug or there's another reason we do not set 
the mmap flag when closing a log. is my understanding correct? if so, i can 
create a jira. besides altering replica log dir UnifiedLog.close() is only 
called during shutdown.
   
   as for the replica fetcher, maybe we can directly retrieve 
`highWatermarkMetadata` like
   ```
     def maybeUpdateHighWatermark(hw: Long): Option[Long] = {
       lock.synchronized {
         val oldHighWatermark = highWatermarkMetadata
         updateHighWatermark(LogOffsetMetadata(hw)) match {
           case oldHighWatermark.messageOffset =>
             None
           case newHighWatermark =>
             Some(newHighWatermark)
         }
       }
     }
   ``` 
   instead of calling `fetchHighWatermarkMetadata()` for the old HWM since the 
follower does not care about the HWM's base offset or relative position in 
segment. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to