ijuma commented on a change in pull request #8709: URL: https://github.com/apache/kafka/pull/8709#discussion_r429653946
########## File path: core/src/main/scala/kafka/server/DelayedFetch.scala ########## @@ -120,14 +119,6 @@ class DelayedFetch(delayMs: Long, accumulatedSize += bytesAvailable } } - - if (fetchMetadata.isFromFollower) { - // Case H check if the follower has the latest HW from the leader - if (partition.getReplica(fetchMetadata.replicaId) - .exists(r => offsetSnapshot.highWatermark.messageOffset > r.lastSentHighWatermark)) { - return forceComplete() - } - } Review comment: Interesting. This could cause an early return for the leader case too, right? ########## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ########## @@ -1100,10 +1100,8 @@ class ReplicaManager(val config: KafkaConfig, leaderLogEndOffset = readInfo.logEndOffset, followerLogStartOffset = followerLogStartOffset, fetchTimeMs = fetchTimeMs, - readSize = adjustedMaxBytes, Review comment: Do we still need to compute `adjustedMaxBytes`? Also, do you know why we don't need `readSize` anymore? What change made it unnecessary? 
########## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ########## @@ -109,9 +109,19 @@ case class LogReadResult(info: FetchDataInfo, def withEmptyFetchInfo: LogReadResult = copy(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY)) - override def toString = - s"Fetch Data: [$info], HW: [$highWatermark], leaderLogStartOffset: [$leaderLogStartOffset], leaderLogEndOffset: [$leaderLogEndOffset], " + - s"followerLogStartOffset: [$followerLogStartOffset], fetchTimeMs: [$fetchTimeMs], readSize: [$readSize], lastStableOffset: [$lastStableOffset], error: [$error]" + override def toString = { + "LogReadResult(" + + s"info=$info, " + + s"highWatermark=$highWatermark, " + + s"leaderLogStartOffset=$leaderLogStartOffset, " + + s"leaderLogEndOffset=$leaderLogEndOffset, " + + s"followerLogStartOffset=$followerLogStartOffset, " + + s"fetchTimeMs=$fetchTimeMs, " + + s"preferredReadReplica=$preferredReadReplica, " + + s"lastStableOffset=$lastStableOffset, " + + s"error=$error" + + ")" Review comment: Could we use the `toString` from the case class? ########## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ########## @@ -1100,10 +1100,8 @@ class ReplicaManager(val config: KafkaConfig, leaderLogEndOffset = readInfo.logEndOffset, followerLogStartOffset = followerLogStartOffset, fetchTimeMs = fetchTimeMs, - readSize = adjustedMaxBytes, Review comment: Thanks, makes sense. 
########## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ########## @@ -109,9 +109,19 @@ case class LogReadResult(info: FetchDataInfo, def withEmptyFetchInfo: LogReadResult = copy(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY)) - override def toString = - s"Fetch Data: [$info], HW: [$highWatermark], leaderLogStartOffset: [$leaderLogStartOffset], leaderLogEndOffset: [$leaderLogEndOffset], " + - s"followerLogStartOffset: [$followerLogStartOffset], fetchTimeMs: [$fetchTimeMs], readSize: [$readSize], lastStableOffset: [$lastStableOffset], error: [$error]" + override def toString = { + "LogReadResult(" + + s"info=$info, " + + s"highWatermark=$highWatermark, " + + s"leaderLogStartOffset=$leaderLogStartOffset, " + + s"leaderLogEndOffset=$leaderLogEndOffset, " + + s"followerLogStartOffset=$followerLogStartOffset, " + + s"fetchTimeMs=$fetchTimeMs, " + + s"preferredReadReplica=$preferredReadReplica, " + + s"lastStableOffset=$lastStableOffset, " + + s"error=$error" + + ")" Review comment: That's true. You have to write `toString` yourself if you want labels. We could write a utility method like so (didn't try to compile it): ```scala public def productToString(product: Product): String = { val builder = new StringBuilder sb.append(product.prefix) for (i <- 0 until productArity) { builder.append(productElementName(i)).append("=").append(productElement(i)) } sb.build() } ``` Then we can call that method from any case class `toString` where we want this format. Avoids some duplication and forgetting to update `toString` when new methods are added. 
########## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ########## @@ -109,9 +109,19 @@ case class LogReadResult(info: FetchDataInfo, def withEmptyFetchInfo: LogReadResult = copy(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY)) - override def toString = - s"Fetch Data: [$info], HW: [$highWatermark], leaderLogStartOffset: [$leaderLogStartOffset], leaderLogEndOffset: [$leaderLogEndOffset], " + - s"followerLogStartOffset: [$followerLogStartOffset], fetchTimeMs: [$fetchTimeMs], readSize: [$readSize], lastStableOffset: [$lastStableOffset], error: [$error]" + override def toString = { + "LogReadResult(" + + s"info=$info, " + + s"highWatermark=$highWatermark, " + + s"leaderLogStartOffset=$leaderLogStartOffset, " + + s"leaderLogEndOffset=$leaderLogEndOffset, " + + s"followerLogStartOffset=$followerLogStartOffset, " + + s"fetchTimeMs=$fetchTimeMs, " + + s"preferredReadReplica=$preferredReadReplica, " + + s"lastStableOffset=$lastStableOffset, " + + s"error=$error" + + ")" Review comment: That's true. You have to write `toString` yourself if you want labels. We could write a utility method like so (didn't try to compile it): ```scala def productToString(product: Product): String = { val builder = new StringBuilder sb.append(product.prefix) for (i <- 0 until productArity) { builder.append(productElementName(i)).append("=").append(productElement(i)) } sb.build() } ``` Then we can call that method from any case class `toString` where we want this format. Avoids some duplication and forgetting to update `toString` when new methods are added. 
########## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ########## @@ -109,9 +109,19 @@ case class LogReadResult(info: FetchDataInfo, def withEmptyFetchInfo: LogReadResult = copy(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY)) - override def toString = - s"Fetch Data: [$info], HW: [$highWatermark], leaderLogStartOffset: [$leaderLogStartOffset], leaderLogEndOffset: [$leaderLogEndOffset], " + - s"followerLogStartOffset: [$followerLogStartOffset], fetchTimeMs: [$fetchTimeMs], readSize: [$readSize], lastStableOffset: [$lastStableOffset], error: [$error]" + override def toString = { + "LogReadResult(" + + s"info=$info, " + + s"highWatermark=$highWatermark, " + + s"leaderLogStartOffset=$leaderLogStartOffset, " + + s"leaderLogEndOffset=$leaderLogEndOffset, " + + s"followerLogStartOffset=$followerLogStartOffset, " + + s"fetchTimeMs=$fetchTimeMs, " + + s"preferredReadReplica=$preferredReadReplica, " + + s"lastStableOffset=$lastStableOffset, " + + s"error=$error" + + ")" Review comment: That's true. You have to write `toString` yourself if you want labels. We could write a utility method like so (didn't try to compile it): ```scala def productToString(product: Product): String = { val builder = new StringBuilder sb.append(product.prefix) for (i <- 0 until product.productArity) { builder.append(product.productElementName(i)) .append("=") .append(product.productElement(i)) } sb.build() } ``` Then we can call that method from any case class `toString` where we want this format. Avoids some duplication and forgetting to update `toString` when new methods are added. 
########## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ########## @@ -109,9 +109,19 @@ case class LogReadResult(info: FetchDataInfo, def withEmptyFetchInfo: LogReadResult = copy(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY)) - override def toString = - s"Fetch Data: [$info], HW: [$highWatermark], leaderLogStartOffset: [$leaderLogStartOffset], leaderLogEndOffset: [$leaderLogEndOffset], " + - s"followerLogStartOffset: [$followerLogStartOffset], fetchTimeMs: [$fetchTimeMs], readSize: [$readSize], lastStableOffset: [$lastStableOffset], error: [$error]" + override def toString = { + "LogReadResult(" + + s"info=$info, " + + s"highWatermark=$highWatermark, " + + s"leaderLogStartOffset=$leaderLogStartOffset, " + + s"leaderLogEndOffset=$leaderLogEndOffset, " + + s"followerLogStartOffset=$followerLogStartOffset, " + + s"fetchTimeMs=$fetchTimeMs, " + + s"preferredReadReplica=$preferredReadReplica, " + + s"lastStableOffset=$lastStableOffset, " + + s"error=$error" + + ")" Review comment: That's true. You have to write `toString` yourself if you want labels. We could write a utility method like so (didn't try to compile it and skipped some details like commas): ```scala def productToString(product: Product): String = { val builder = new StringBuilder sb.append(product.prefix) for (i <- 0 until product.productArity) { builder.append(product.productElementName(i)) .append("=") .append(product.productElement(i)) } sb.build() } ``` Then we can call that method from any case class `toString` where we want this format. Avoids some duplication and forgetting to update `toString` when new methods are added. 
########## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ########## @@ -109,9 +109,19 @@ case class LogReadResult(info: FetchDataInfo, def withEmptyFetchInfo: LogReadResult = copy(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY)) - override def toString = - s"Fetch Data: [$info], HW: [$highWatermark], leaderLogStartOffset: [$leaderLogStartOffset], leaderLogEndOffset: [$leaderLogEndOffset], " + - s"followerLogStartOffset: [$followerLogStartOffset], fetchTimeMs: [$fetchTimeMs], readSize: [$readSize], lastStableOffset: [$lastStableOffset], error: [$error]" + override def toString = { + "LogReadResult(" + + s"info=$info, " + + s"highWatermark=$highWatermark, " + + s"leaderLogStartOffset=$leaderLogStartOffset, " + + s"leaderLogEndOffset=$leaderLogEndOffset, " + + s"followerLogStartOffset=$followerLogStartOffset, " + + s"fetchTimeMs=$fetchTimeMs, " + + s"preferredReadReplica=$preferredReadReplica, " + + s"lastStableOffset=$lastStableOffset, " + + s"error=$error" + + ")" Review comment: That's true. You have to write `toString` yourself if you want labels. We could write a utility method like so (didn't try to compile it and skipped some details like commas): ```scala def productToString(product: Product): String = { val builder = new StringBuilder sb.append(product.prefix) for (i <- 0 until product.productArity) { builder.append(product.productElementName(i)) .append("=") .append(product.productElement(i)) } sb.build() } ``` Then we can call that method from any case class `toString` where we want this format. Avoids some duplication and forgetting to update `toString` when new fields are added. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org