[GitHub] [kafka] mumrah commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

2021-05-10 Thread GitBox


mumrah commented on a change in pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#discussion_r629541231



##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -161,19 +162,24 @@ final class KafkaMetadataLog private (
 
   override def truncateToLatestSnapshot(): Boolean = {
     val latestEpoch = log.latestEpoch.getOrElse(0)
-    latestSnapshotId().asScala match {
-      case Some(snapshotId) if (snapshotId.epoch > latestEpoch ||
-        (snapshotId.epoch == latestEpoch && snapshotId.offset > endOffset().offset)) =>
+    val (truncated, forgottenSnapshots) = latestSnapshotId().asScala match {

Review comment:
   Should we grab the `snapshots` lock for this whole match expression, like we do in deleteBeforeSnapshot? Is there possibly a race between this block and deleteBeforeSnapshot?
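
   For illustration, a minimal self-contained sketch of the pattern being suggested (the `SnapshotIndex` class and its members are hypothetical, not part of this patch): holding the lock across the whole match expression makes the check and the mutation atomic with respect to a competing delete path.

```scala
import scala.collection.mutable

// Hypothetical sketch, not the actual KafkaMetadataLog: both methods
// synchronize on `snapshots`, so the check-then-act sequence in
// truncateToLatest cannot interleave with deleteBefore.
final class SnapshotIndex {
  private val snapshots = mutable.TreeMap.empty[Long, Option[String]]

  def truncateToLatest(endOffset: Long): Boolean = snapshots.synchronized {
    snapshots.lastOption match {
      case Some((offset, _)) if offset > endOffset =>
        snapshots.clear() // mutation under the same lock as the check
        true
      case _ => false
    }
  }

  def deleteBefore(offset: Long): Unit = snapshots.synchronized {
    // TreeMap keys iterate in ascending order, so takeWhile collects
    // exactly the entries below `offset`.
    val stale = snapshots.keys.takeWhile(_ < offset).toList
    stale.foreach(snapshots.remove)
  }
}
```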

##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -242,85 +248,116 @@ final class KafkaMetadataLog private (
   }
 
   override def readSnapshot(snapshotId: OffsetAndEpoch): Optional[RawSnapshotReader] = {
-    try {
-      if (snapshotIds.contains(snapshotId)) {
-        Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
-      } else {
-        Optional.empty()
+    snapshots synchronized {
+      val reader = snapshots.get(snapshotId) match {
+        case None =>
+          // Snapshot doesn't exist
+          None
+        case Some(None) =>
+          // Snapshot exists but has never been read before
+          try {
+            val snapshotReader = Some(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+            snapshots.put(snapshotId, snapshotReader)
+            snapshotReader
+          } catch {
+            case _: NoSuchFileException =>
+              // Snapshot doesn't exist in the data dir; remove it
+              val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
+              warn(s"Couldn't read $snapshotId; expected to find snapshot file $path")
+              snapshots.remove(snapshotId)
+              None
+          }
+        case Some(value) =>
+          // Snapshot exists and it is already open; do nothing
+          value
       }
-    } catch {
-      case _: NoSuchFileException =>
-        Optional.empty()
+
+      reader.asJava.asInstanceOf[Optional[RawSnapshotReader]]
     }
   }
 
   override def latestSnapshotId(): Optional[OffsetAndEpoch] = {
-    val descending = snapshotIds.descendingIterator
-    if (descending.hasNext) {
-      Optional.of(descending.next)
-    } else {
-      Optional.empty()
+    snapshots synchronized {
+      snapshots.lastOption.map { case (snapshotId, _) => snapshotId }.asJava
    }
  }
 
   override def earliestSnapshotId(): Optional[OffsetAndEpoch] = {
-    val ascendingIterator = snapshotIds.iterator
-    if (ascendingIterator.hasNext) {
-      Optional.of(ascendingIterator.next)
-    } else {
-      Optional.empty()
+    snapshots synchronized {
+      snapshots.headOption.map { case (snapshotId, _) => snapshotId }.asJava
     }
   }
 
   override def onSnapshotFrozen(snapshotId: OffsetAndEpoch): Unit = {
-    snapshotIds.add(snapshotId)
+    snapshots synchronized {
+      snapshots.put(snapshotId, None)
+    }
   }
 
   override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    latestSnapshotId().asScala match {
-      case Some(snapshotId) if (snapshotIds.contains(logStartSnapshotId) &&
-        startOffset < logStartSnapshotId.offset &&
-        logStartSnapshotId.offset <= snapshotId.offset &&
-        log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
-        log.deleteOldSegments()
+    val (deleted, forgottenSnapshots) = snapshots synchronized {
+      latestSnapshotId().asScala match {
+        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
+          startOffset < logStartSnapshotId.offset &&
+          logStartSnapshotId.offset <= snapshotId.offset &&
+          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
+
+          // Delete all segments that have a "last offset" less than the log start offset
+          log.deleteOldSegments()
 
-        // Delete snapshot after increasing LogStartOffset
-        removeSnapshotFilesBefore(logStartSnapshotId)
+          // Forget snapshots less than the log start offset
+          (true, forgetSnapshotsBefore(logStartSnapshotId))
+        case _ =>
+          (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]])
+      }
+    }
 
-        true
+    removeSnapshots(forgottenSnapshots)
+    deleted
+  }
 
-      case _ => false
-    }
+  /**
+   * Forget the snapshots earlier than a given snapshot id and return the associated
+   * snapshot readers.
+   *
+   * This method assumes that the lock for `snapshots` is already held.
+   */
+  @nowarn("cat=deprecation") // Needed for TreeMap.until
+  private def 

[GitHub] [kafka] mumrah commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

2021-05-10 Thread GitBox


mumrah commented on a change in pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#discussion_r629531449



##
File path: raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotReader.java
##
@@ -54,8 +54,12 @@ public Records records() {
     }
 
     @Override
-    public void close() throws IOException {
-        fileRecords.close();
+    public void close() {

Review comment:
   Ok, sounds good. If we're strictly dealing with IOExceptions, maybe we 
can use UncheckedIOException?
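
   A minimal sketch of that idea (a hypothetical wrapper, not the actual FileRawSnapshotReader, which is Java; the pattern is the same in Scala):

```scala
import java.io.{Closeable, IOException, UncheckedIOException}

// Hypothetical sketch: wrapping the checked IOException in an
// UncheckedIOException lets close() drop its `throws IOException`
// clause while still propagating the underlying failure.
final class UncheckedCloser(underlying: Closeable) extends AutoCloseable {
  override def close(): Unit = {
    try underlying.close()
    catch { case e: IOException => throw new UncheckedIOException(e) }
  }
}
```

   java.io.UncheckedIOException exists for exactly this boundary: callers that treat close() as infallible still see the original IOException as the cause.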








[GitHub] [kafka] mumrah commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

2021-05-10 Thread GitBox


mumrah commented on a change in pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#discussion_r629529239



##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -242,85 +246,116 @@ final class KafkaMetadataLog private (
   }
 
   override def readSnapshot(snapshotId: OffsetAndEpoch): Optional[RawSnapshotReader] = {
-    try {
-      if (snapshotIds.contains(snapshotId)) {
-        Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
-      } else {
-        Optional.empty()
+    snapshots synchronized {
+      val reader = snapshots.get(snapshotId) match {
+        case None =>
+          // Snapshot doesn't exist
+          None
+        case Some(None) =>
+          // Snapshot exists but has never been read before
+          try {
+            val snapshotReader = Some(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+            snapshots.put(snapshotId, snapshotReader)
+            snapshotReader
+          } catch {
+            case _: NoSuchFileException =>
+              // Snapshot doesn't exist in the data dir; remove it
+              val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
+              warn(s"Couldn't read $snapshotId; expected to find snapshot file $path")
+              snapshots.remove(snapshotId)
+              None
+          }
+        case Some(value) =>
+          // Snapshot exists and it is already open; do nothing
+          value
      }
-    } catch {
-      case _: NoSuchFileException =>
-        Optional.empty()
+
+      reader.asJava.asInstanceOf[Optional[RawSnapshotReader]]
     }
   }
 
   override def latestSnapshotId(): Optional[OffsetAndEpoch] = {
-    val descending = snapshotIds.descendingIterator
-    if (descending.hasNext) {
-      Optional.of(descending.next)
-    } else {
-      Optional.empty()
+    snapshots synchronized {
+      snapshots.lastOption.map { case (snapshotId, _) => snapshotId }.asJava
    }
  }
 
   override def earliestSnapshotId(): Optional[OffsetAndEpoch] = {
-    val ascendingIterator = snapshotIds.iterator
-    if (ascendingIterator.hasNext) {
-      Optional.of(ascendingIterator.next)
-    } else {
-      Optional.empty()
+    snapshots synchronized {
+      snapshots.headOption.map { case (snapshotId, _) => snapshotId }.asJava
     }
   }
 
   override def onSnapshotFrozen(snapshotId: OffsetAndEpoch): Unit = {
-    snapshotIds.add(snapshotId)
+    snapshots synchronized {
+      snapshots.put(snapshotId, None)
+    }
   }
 
   override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    latestSnapshotId().asScala match {
-      case Some(snapshotId) if (snapshotIds.contains(logStartSnapshotId) &&
-        startOffset < logStartSnapshotId.offset &&
-        logStartSnapshotId.offset <= snapshotId.offset &&
-        log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
-        log.deleteOldSegments()
+    val (deleted, forgottenSnapshots) = snapshots synchronized {
+      latestSnapshotId().asScala match {
+        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
+          startOffset < logStartSnapshotId.offset &&
+          logStartSnapshotId.offset <= snapshotId.offset &&
+          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
+
+          // Delete all segments that have a "last offset" less than the log start offset
+          log.deleteOldSegments()
 
-        // Delete snapshot after increasing LogStartOffset
-        removeSnapshotFilesBefore(logStartSnapshotId)
+          // Forget snapshots less than the log start offset
+          (true, forgetSnapshotsBefore(logStartSnapshotId))
+        case _ =>
+          (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]])
+      }
+    }
 
-        true
+    removeSnapshots(forgottenSnapshots)
+    deleted
+  }
 
-      case _ => false
-    }
+  /**
+   * Forget the snapshots earlier than a given snapshot id and return the associated
+   * snapshot readers.
+   *
+   * This method assumes that the lock for `snapshots` is already held.
+   */
+  @nowarn("cat=deprecation") // Needed for TreeMap.until

Review comment:
   Thanks for the explanation.








[GitHub] [kafka] mumrah commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

2021-05-07 Thread GitBox


mumrah commented on a change in pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#discussion_r628415285



##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -161,19 +162,22 @@ final class KafkaMetadataLog private (
 
   override def truncateToLatestSnapshot(): Boolean = {
     val latestEpoch = log.latestEpoch.getOrElse(0)
-    latestSnapshotId().asScala match {
+    val (truncated, forgottenSnapshots) = latestSnapshotId().asScala match {
       case Some(snapshotId) if (snapshotId.epoch > latestEpoch ||
        (snapshotId.epoch == latestEpoch && snapshotId.offset > endOffset().offset)) =>

Review comment:
   Unrelated to your change, but this indentation threw me off here.
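
   For illustration, a standalone style sketch (hypothetical definitions, not the actual patch) of one way to indent the continued guard so it reads apart from the case body:

```scala
// Style sketch only: the guard continues at a deeper indent than the
// case body, so the `=>` boundary is easier to spot.
final case class OffsetAndEpoch(offset: Long, epoch: Int)

def shouldTruncate(latest: Option[OffsetAndEpoch], latestEpoch: Int, endOffset: Long): Boolean =
  latest match {
    case Some(snapshotId)
        if snapshotId.epoch > latestEpoch ||
           (snapshotId.epoch == latestEpoch && snapshotId.offset > endOffset) =>
      true
    case _ =>
      false
  }
```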

##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -242,85 +246,116 @@ final class KafkaMetadataLog private (
   }
 
   override def readSnapshot(snapshotId: OffsetAndEpoch): Optional[RawSnapshotReader] = {
-    try {
-      if (snapshotIds.contains(snapshotId)) {
-        Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
-      } else {
-        Optional.empty()
+    snapshots synchronized {
+      val reader = snapshots.get(snapshotId) match {
+        case None =>
+          // Snapshot doesn't exist
+          None
+        case Some(None) =>
+          // Snapshot exists but has never been read before
+          try {
+            val snapshotReader = Some(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+            snapshots.put(snapshotId, snapshotReader)
+            snapshotReader
+          } catch {
+            case _: NoSuchFileException =>
+              // Snapshot doesn't exist in the data dir; remove it
+              val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
+              warn(s"Couldn't read $snapshotId; expected to find snapshot file $path")
+              snapshots.remove(snapshotId)
+              None
+          }
+        case Some(value) =>
+          // Snapshot exists and it is already open; do nothing
+          value
      }
-    } catch {
-      case _: NoSuchFileException =>
-        Optional.empty()
+
+      reader.asJava.asInstanceOf[Optional[RawSnapshotReader]]
     }
   }
 
   override def latestSnapshotId(): Optional[OffsetAndEpoch] = {
-    val descending = snapshotIds.descendingIterator
-    if (descending.hasNext) {
-      Optional.of(descending.next)
-    } else {
-      Optional.empty()
+    snapshots synchronized {
+      snapshots.lastOption.map { case (snapshotId, _) => snapshotId }.asJava
    }
  }
 
   override def earliestSnapshotId(): Optional[OffsetAndEpoch] = {
-    val ascendingIterator = snapshotIds.iterator
-    if (ascendingIterator.hasNext) {
-      Optional.of(ascendingIterator.next)
-    } else {
-      Optional.empty()
+    snapshots synchronized {
+      snapshots.headOption.map { case (snapshotId, _) => snapshotId }.asJava
     }
   }
 
   override def onSnapshotFrozen(snapshotId: OffsetAndEpoch): Unit = {
-    snapshotIds.add(snapshotId)
+    snapshots synchronized {
+      snapshots.put(snapshotId, None)
+    }
   }
 
   override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    latestSnapshotId().asScala match {
-      case Some(snapshotId) if (snapshotIds.contains(logStartSnapshotId) &&
-        startOffset < logStartSnapshotId.offset &&
-        logStartSnapshotId.offset <= snapshotId.offset &&
-        log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
-        log.deleteOldSegments()
+    val (deleted, forgottenSnapshots) = snapshots synchronized {
+      latestSnapshotId().asScala match {
+        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
+          startOffset < logStartSnapshotId.offset &&
+          logStartSnapshotId.offset <= snapshotId.offset &&
+          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
+
+          // Delete all segments that have a "last offset" less than the log start offset
+          log.deleteOldSegments()
 
-        // Delete snapshot after increasing LogStartOffset
-        removeSnapshotFilesBefore(logStartSnapshotId)
+          // Forget snapshots less than the log start offset
+          (true, forgetSnapshotsBefore(logStartSnapshotId))
+        case _ =>
+          (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]])
+      }
+    }
 
-        true
+    removeSnapshots(forgottenSnapshots)
+    deleted
+  }
 
-      case _ => false
-    }
+  /**
+   * Forget the snapshots earlier than a given snapshot id and return the associated
+   * snapshot readers.
+   *
+   * This method assumes that the lock for `snapshots` is already held.
+   */
+  @nowarn("cat=deprecation") // Needed for TreeMap.until

Review comment:
   Can we avoid adding new code that uses a deprecated method?
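
   Assuming the build can rely on Scala 2.13, `rangeUntil` is the non-deprecated replacement for `TreeMap.until` and would remove the need for `@nowarn`; if the project still cross-builds for 2.12 (where `rangeUntil` does not exist), that may be why the annotation was used. A small sketch:

```scala
import scala.collection.mutable

// Sketch assuming Scala 2.13: rangeUntil returns the entries strictly
// before the given key, same as the deprecated until.
val snapshots = mutable.TreeMap(1L -> "a", 5L -> "b", 9L -> "c")
val forgotten = snapshots.rangeUntil(5L) // contains only 1L -> "a"
```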

##
File path: