chia7712 commented on code in PR #19630:
URL: https://github.com/apache/kafka/pull/19630#discussion_r2080096579
##########
core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala:
##########
@@ -134,8 +135,9 @@ class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint,
new OffsetAndEpoch(localLogStartOffset, epoch.orElse(0))
}
- override def fetchEpochEndOffsets(partitions: collection.Map[TopicPartition,
EpochData]): Map[TopicPartition, EpochEndOffset] = {
- partitions.map { case (tp, epochData) =>
+ override def fetchEpochEndOffsets(partitions: util.Map[TopicPartition,
OffsetForLeaderEpochRequestData.OffsetForLeaderPartition]):
util.Map[TopicPartition, EpochEndOffset] = {
+ val tmpPartitions = partitions.asScala.toMap
+ tmpPartitions.map { case (tp, epochData) =>
Review Comment:
```
partitions.asScala.map { case (tp, epochData) =>
```
##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -116,20 +120,23 @@ abstract class AbstractFetcherThread(name: String,
private def maybeFetch(): Unit = {
val fetchRequestOpt = inLock(partitionMapLock) {
- val ResultWithPartitions(fetchRequestOpt, partitionsWithError) =
leader.buildFetch(partitionStates.partitionStateMap.asScala)
+ val result = leader.buildFetch(partitionStates.partitionStateMap)
+ val fetchRequestOpt = result.result
+ val partitionsWithError = result.partitionsWithError
- handlePartitionsWithErrors(partitionsWithError, "maybeFetch")
+ handlePartitionsWithErrors(partitionsWithError.asScala, "maybeFetch")
- if (fetchRequestOpt.isEmpty) {
+ if (!fetchRequestOpt.isPresent) {
Review Comment:
Please use `fetchRequestOpt.isEmpty` instead of `!fetchRequestOpt.isPresent`.
##########
core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala:
##########
@@ -156,48 +158,50 @@ class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint,
.setPartition(tp.partition)
.setErrorCode(Errors.forException(t).code)
}
- }
+ }.asJava
}
- override def buildFetch(partitions: Map[TopicPartition,
PartitionFetchState]): ResultWithPartitions[Option[ReplicaFetch]] = {
+ override def buildFetch(partitions: util.Map[TopicPartition,
PartitionFetchState]): ResultWithPartitions[util.Optional[ReplicaFetch]] = {
// Only include replica in the fetch request if it is not throttled.
if (quota.isQuotaExceeded) {
- ResultWithPartitions(None, Set.empty)
+ new ResultWithPartitions(util.Optional.empty[ReplicaFetch](),
util.Collections.emptySet[TopicPartition]())
} else {
- selectPartitionToFetch(partitions) match {
- case Some((tp, fetchState)) =>
- buildFetchForPartition(tp, fetchState)
- case None =>
- ResultWithPartitions(None, Set.empty)
+ val selectPartition = selectPartitionToFetch(partitions)
+ if (selectPartition.isPresent) {
+ val (tp, fetchState) = selectPartition.get()
+ buildFetchForPartition(tp, fetchState)
+ } else {
+ new ResultWithPartitions(util.Optional.empty[ReplicaFetch](),
util.Collections.emptySet[TopicPartition]())
}
}
}
- private def selectPartitionToFetch(partitions: Map[TopicPartition,
PartitionFetchState]): Option[(TopicPartition, PartitionFetchState)] = {
+ private def selectPartitionToFetch(partitions: util.Map[TopicPartition,
PartitionFetchState]): Optional[(TopicPartition, PartitionFetchState)] = {
// Only move one partition at a time to increase its catch-up rate and
thus reduce the time spent on
// moving any given replica. Replicas are selected in ascending order
(lexicographically by topic) from the
// partitions that are ready to fetch. Once selected, we will continue
fetching the same partition until it
// becomes unavailable or is removed.
inProgressPartition.foreach { tp =>
- val fetchStateOpt = partitions.get(tp)
+ val fetchStateOpt = Option(partitions.get(tp))
fetchStateOpt.filter(_.isReadyForFetch).foreach { fetchState =>
- return Some((tp, fetchState))
+ return Optional.of((tp, fetchState))
}
}
inProgressPartition = None
- val nextPartitionOpt = nextReadyPartition(partitions)
+ val nextPartitionOpt = nextReadyPartition(partitions.asScala.toMap)
nextPartitionOpt.foreach { case (tp, fetchState) =>
- inProgressPartition = Some(tp)
+ inProgressPartition = Option(tp)
info(s"Beginning/resuming copy of partition $tp from offset
${fetchState.fetchOffset}. " +
s"Including this partition, there are ${partitions.size} remaining
partitions to copy by this thread.")
+ return Optional.of((tp, fetchState))
}
- nextPartitionOpt
+ Optional.empty()
Review Comment:
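Why is this changed? Is returning `Optional.empty()` here (instead of converting `nextPartitionOpt`) necessary?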
##########
core/src/main/java/kafka/server/TierStateMachine.java:
##########
@@ -136,12 +136,13 @@ private OffsetForLeaderEpochResponseData.EpochEndOffset
fetchEarlierEpochEndOffs
// Find the end-offset for the epoch earlier to the given epoch from
the leader
Map<TopicPartition,
OffsetForLeaderEpochRequestData.OffsetForLeaderPartition> partitionsWithEpochs
= new HashMap<>();
partitionsWithEpochs.put(partition, new
OffsetForLeaderEpochRequestData.OffsetForLeaderPartition().setPartition(partition.partition()).setCurrentLeaderEpoch(currentLeaderEpoch).setLeaderEpoch(previousEpoch));
- Option<OffsetForLeaderEpochResponseData.EpochEndOffset>
maybeEpochEndOffset =
leader.fetchEpochEndOffsets(CollectionConverters.asScala(partitionsWithEpochs)).get(partition);
- if (maybeEpochEndOffset.isEmpty()) {
+ Map<TopicPartition, OffsetForLeaderEpochResponseData.EpochEndOffset>
endOffsets = leader.fetchEpochEndOffsets(partitionsWithEpochs);
Review Comment:
```java
var epochEndOffset =
leader.fetchEpochEndOffsets(partitionsWithEpochs).get(partition);
```
##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -204,11 +214,27 @@ abstract class AbstractFetcherThread(name: String,
* occur during truncation.
*/
private def truncateToEpochEndOffsets(latestEpochsForPartitions:
Map[TopicPartition, EpochData]): Unit = {
- val endOffsets = leader.fetchEpochEndOffsets(latestEpochsForPartitions)
- //Ensure we hold a lock during truncation.
+
+ val partitionsMap = new java.util.HashMap[TopicPartition,
OffsetForLeaderPartition]()
+
+ // Fill it with converted values
+ latestEpochsForPartitions.foreach { case (tp, epochData) =>
Review Comment:
Why not use `latestEpochsForPartitions.asJava`?
##########
core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala:
##########
@@ -156,48 +158,50 @@ class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint,
.setPartition(tp.partition)
.setErrorCode(Errors.forException(t).code)
}
- }
+ }.asJava
}
- override def buildFetch(partitions: Map[TopicPartition,
PartitionFetchState]): ResultWithPartitions[Option[ReplicaFetch]] = {
+ override def buildFetch(partitions: util.Map[TopicPartition,
PartitionFetchState]): ResultWithPartitions[util.Optional[ReplicaFetch]] = {
// Only include replica in the fetch request if it is not throttled.
if (quota.isQuotaExceeded) {
- ResultWithPartitions(None, Set.empty)
+ new ResultWithPartitions(util.Optional.empty[ReplicaFetch](),
util.Collections.emptySet[TopicPartition]())
} else {
- selectPartitionToFetch(partitions) match {
- case Some((tp, fetchState)) =>
- buildFetchForPartition(tp, fetchState)
- case None =>
- ResultWithPartitions(None, Set.empty)
+ val selectPartition = selectPartitionToFetch(partitions)
+ if (selectPartition.isPresent) {
+ val (tp, fetchState) = selectPartition.get()
+ buildFetchForPartition(tp, fetchState)
+ } else {
+ new ResultWithPartitions(util.Optional.empty[ReplicaFetch](),
util.Collections.emptySet[TopicPartition]())
}
}
}
- private def selectPartitionToFetch(partitions: Map[TopicPartition,
PartitionFetchState]): Option[(TopicPartition, PartitionFetchState)] = {
+ private def selectPartitionToFetch(partitions: util.Map[TopicPartition,
PartitionFetchState]): Optional[(TopicPartition, PartitionFetchState)] = {
// Only move one partition at a time to increase its catch-up rate and
thus reduce the time spent on
// moving any given replica. Replicas are selected in ascending order
(lexicographically by topic) from the
// partitions that are ready to fetch. Once selected, we will continue
fetching the same partition until it
// becomes unavailable or is removed.
inProgressPartition.foreach { tp =>
- val fetchStateOpt = partitions.get(tp)
+ val fetchStateOpt = Option(partitions.get(tp))
fetchStateOpt.filter(_.isReadyForFetch).foreach { fetchState =>
- return Some((tp, fetchState))
+ return Optional.of((tp, fetchState))
}
}
inProgressPartition = None
- val nextPartitionOpt = nextReadyPartition(partitions)
+ val nextPartitionOpt = nextReadyPartition(partitions.asScala.toMap)
nextPartitionOpt.foreach { case (tp, fetchState) =>
- inProgressPartition = Some(tp)
+ inProgressPartition = Option(tp)
Review Comment:
Why change `Some(tp)` to `Option(tp)`?
##########
core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala:
##########
@@ -126,14 +128,15 @@ class RemoteLeaderEndPoint(logPrefix: String,
}
}
- override def fetchEpochEndOffsets(partitions: Map[TopicPartition,
EpochData]): Map[TopicPartition, EpochEndOffset] = {
+ override def fetchEpochEndOffsets(partitions: java.util.Map[TopicPartition,
OffsetForLeaderEpochRequestData.OffsetForLeaderPartition]):
java.util.Map[TopicPartition, EpochEndOffset] = {
+ val tmpPartitions = partitions.asScala.toMap
if (partitions.isEmpty) {
debug("Skipping leaderEpoch request since all partitions do not have an
epoch")
- return Map.empty
+ return java.util.Collections.emptyMap()
Review Comment:
Please use `java.util.Map.of()` instead of `java.util.Collections.emptyMap()`.
##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -116,20 +120,23 @@ abstract class AbstractFetcherThread(name: String,
private def maybeFetch(): Unit = {
val fetchRequestOpt = inLock(partitionMapLock) {
- val ResultWithPartitions(fetchRequestOpt, partitionsWithError) =
leader.buildFetch(partitionStates.partitionStateMap.asScala)
+ val result = leader.buildFetch(partitionStates.partitionStateMap)
+ val fetchRequestOpt = result.result
+ val partitionsWithError = result.partitionsWithError
- handlePartitionsWithErrors(partitionsWithError, "maybeFetch")
+ handlePartitionsWithErrors(partitionsWithError.asScala, "maybeFetch")
- if (fetchRequestOpt.isEmpty) {
+ if (!fetchRequestOpt.isPresent) {
trace(s"There are no active partitions. Back off for $fetchBackOffMs
ms before sending a fetch request")
partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
}
fetchRequestOpt
}
- fetchRequestOpt.foreach { case ReplicaFetch(sessionPartitions,
fetchRequest) =>
- processFetchRequest(sessionPartitions, fetchRequest)
+ if (fetchRequestOpt.isPresent) {
Review Comment:
```scala
fetchRequestOpt.ifPresent(replicaFetch =>
processFetchRequest(replicaFetch.partitionData, replicaFetch.fetchRequest))
```
##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -218,16 +244,25 @@ abstract class AbstractFetcherThread(name: String,
curPartitionState != null && leaderEpochInRequest ==
curPartitionState.currentLeaderEpoch
}
- val ResultWithPartitions(fetchOffsets, partitionsWithError) =
maybeTruncateToEpochEndOffsets(epochEndOffsets, latestEpochsForPartitions)
- handlePartitionsWithErrors(partitionsWithError,
"truncateToEpochEndOffsets")
+ val result = maybeTruncateToEpochEndOffsets(epochEndOffsets.toMap,
latestEpochsForPartitions)
Review Comment:
```scala
val result = maybeTruncateToEpochEndOffsets(epochEndOffsets.toMap,
latestEpochsForPartitions)
handlePartitionsWithErrors(result.partitionsWithError.asScala,
"truncateToEpochEndOffsets")
updateFetchOffsetAndMaybeMarkTruncationComplete(result.result)
```
##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -151,14 +158,17 @@ abstract class AbstractFetcherThread(name: String,
partitionStates.partitionStateMap.forEach { (tp, state) =>
if (state.isTruncating) {
- latestEpoch(tp).toScala match {
- case Some(epoch) =>
- partitionsWithEpochs += tp -> new EpochData()
- .setPartition(tp.partition)
- .setCurrentLeaderEpoch(state.currentLeaderEpoch)
- .setLeaderEpoch(epoch)
- case _ =>
- partitionsWithoutEpochs += tp
+ val latestEpochOpt = latestEpoch(tp)
Review Comment:
Are those changes necessary?
##########
core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala:
##########
@@ -156,48 +158,50 @@ class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint,
.setPartition(tp.partition)
.setErrorCode(Errors.forException(t).code)
}
- }
+ }.asJava
}
- override def buildFetch(partitions: Map[TopicPartition,
PartitionFetchState]): ResultWithPartitions[Option[ReplicaFetch]] = {
+ override def buildFetch(partitions: util.Map[TopicPartition,
PartitionFetchState]): ResultWithPartitions[util.Optional[ReplicaFetch]] = {
// Only include replica in the fetch request if it is not throttled.
if (quota.isQuotaExceeded) {
- ResultWithPartitions(None, Set.empty)
+ new ResultWithPartitions(util.Optional.empty[ReplicaFetch](),
util.Collections.emptySet[TopicPartition]())
} else {
- selectPartitionToFetch(partitions) match {
- case Some((tp, fetchState)) =>
- buildFetchForPartition(tp, fetchState)
- case None =>
- ResultWithPartitions(None, Set.empty)
+ val selectPartition = selectPartitionToFetch(partitions)
+ if (selectPartition.isPresent) {
+ val (tp, fetchState) = selectPartition.get()
+ buildFetchForPartition(tp, fetchState)
+ } else {
+ new ResultWithPartitions(util.Optional.empty[ReplicaFetch](),
util.Collections.emptySet[TopicPartition]())
}
}
}
- private def selectPartitionToFetch(partitions: Map[TopicPartition,
PartitionFetchState]): Option[(TopicPartition, PartitionFetchState)] = {
+ private def selectPartitionToFetch(partitions: util.Map[TopicPartition,
PartitionFetchState]): Optional[(TopicPartition, PartitionFetchState)] = {
// Only move one partition at a time to increase its catch-up rate and
thus reduce the time spent on
// moving any given replica. Replicas are selected in ascending order
(lexicographically by topic) from the
// partitions that are ready to fetch. Once selected, we will continue
fetching the same partition until it
// becomes unavailable or is removed.
inProgressPartition.foreach { tp =>
- val fetchStateOpt = partitions.get(tp)
+ val fetchStateOpt = Option(partitions.get(tp))
fetchStateOpt.filter(_.isReadyForFetch).foreach { fetchState =>
- return Some((tp, fetchState))
+ return Optional.of((tp, fetchState))
}
}
inProgressPartition = None
- val nextPartitionOpt = nextReadyPartition(partitions)
+ val nextPartitionOpt = nextReadyPartition(partitions.asScala.toMap)
nextPartitionOpt.foreach { case (tp, fetchState) =>
- inProgressPartition = Some(tp)
+ inProgressPartition = Option(tp)
info(s"Beginning/resuming copy of partition $tp from offset
${fetchState.fetchOffset}. " +
s"Including this partition, there are ${partitions.size} remaining
partitions to copy by this thread.")
+ return Optional.of((tp, fetchState))
Review Comment:
Why is this early `return` inside `foreach` needed?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]