ijuma commented on a change in pull request #8709:
URL: https://github.com/apache/kafka/pull/8709#discussion_r429653946



##########
File path: core/src/main/scala/kafka/server/DelayedFetch.scala
##########
@@ -120,14 +119,6 @@ class DelayedFetch(delayMs: Long,
                   accumulatedSize += bytesAvailable
               }
             }
-
-            if (fetchMetadata.isFromFollower) {
-              // Case H check if the follower has the latest HW from the leader
-              if (partition.getReplica(fetchMetadata.replicaId)
-                .exists(r => offsetSnapshot.highWatermark.messageOffset > 
r.lastSentHighWatermark)) {
-                return forceComplete()
-              }
-            }

Review comment:
       Interesting. This could cause an early return for the leader case too, 
right?

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1100,10 +1100,8 @@ class ReplicaManager(val config: KafkaConfig,
             leaderLogEndOffset = readInfo.logEndOffset,
             followerLogStartOffset = followerLogStartOffset,
             fetchTimeMs = fetchTimeMs,
-            readSize = adjustedMaxBytes,

Review comment:
       Do we still need to compute `adjustedMaxBytes`? Also, do you know why we 
don't need `readSize` anymore? What change made it unnecessary?

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -109,9 +109,19 @@ case class LogReadResult(info: FetchDataInfo,
   def withEmptyFetchInfo: LogReadResult =
     copy(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, 
MemoryRecords.EMPTY))
 
-  override def toString =
-    s"Fetch Data: [$info], HW: [$highWatermark], leaderLogStartOffset: 
[$leaderLogStartOffset], leaderLogEndOffset: [$leaderLogEndOffset], " +
-    s"followerLogStartOffset: [$followerLogStartOffset], fetchTimeMs: 
[$fetchTimeMs], readSize: [$readSize], lastStableOffset: [$lastStableOffset], 
error: [$error]"
+  override def toString = {
+    "LogReadResult(" +
+      s"info=$info, " +
+      s"highWatermark=$highWatermark, " +
+      s"leaderLogStartOffset=$leaderLogStartOffset, " +
+      s"leaderLogEndOffset=$leaderLogEndOffset, " +
+      s"followerLogStartOffset=$followerLogStartOffset, " +
+      s"fetchTimeMs=$fetchTimeMs, " +
+      s"preferredReadReplica=$preferredReadReplica, " +
+      s"lastStableOffset=$lastStableOffset, " +
+      s"error=$error" +
+      ")"

Review comment:
       Could we use the `toString` from the case class?

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1100,10 +1100,8 @@ class ReplicaManager(val config: KafkaConfig,
             leaderLogEndOffset = readInfo.logEndOffset,
             followerLogStartOffset = followerLogStartOffset,
             fetchTimeMs = fetchTimeMs,
-            readSize = adjustedMaxBytes,

Review comment:
       Thanks, makes sense.

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -109,9 +109,19 @@ case class LogReadResult(info: FetchDataInfo,
   def withEmptyFetchInfo: LogReadResult =
     copy(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, 
MemoryRecords.EMPTY))
 
-  override def toString =
-    s"Fetch Data: [$info], HW: [$highWatermark], leaderLogStartOffset: 
[$leaderLogStartOffset], leaderLogEndOffset: [$leaderLogEndOffset], " +
-    s"followerLogStartOffset: [$followerLogStartOffset], fetchTimeMs: 
[$fetchTimeMs], readSize: [$readSize], lastStableOffset: [$lastStableOffset], 
error: [$error]"
+  override def toString = {
+    "LogReadResult(" +
+      s"info=$info, " +
+      s"highWatermark=$highWatermark, " +
+      s"leaderLogStartOffset=$leaderLogStartOffset, " +
+      s"leaderLogEndOffset=$leaderLogEndOffset, " +
+      s"followerLogStartOffset=$followerLogStartOffset, " +
+      s"fetchTimeMs=$fetchTimeMs, " +
+      s"preferredReadReplica=$preferredReadReplica, " +
+      s"lastStableOffset=$lastStableOffset, " +
+      s"error=$error" +
+      ")"

Review comment:
       That's true. You have to write `toString` yourself if you want labels. 
We could write a utility method like so (didn't try too compile it):
   
   ```scala
   public def productToString(product: Product): String = {
     val builder = new StringBuilder
     sb.append(product.prefix)
     for (i <- 0 until productArity) {
       
builder.append(productElementName(i)).append("=").append(productElement(i))
     }
     sb.build()
   }
   ```
   
   Then we can call that method from any case class `toString` where we want 
this format. Avoids some duplication and forgetting to update `toString` when 
new methods are added.

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -109,9 +109,19 @@ case class LogReadResult(info: FetchDataInfo,
   def withEmptyFetchInfo: LogReadResult =
     copy(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, 
MemoryRecords.EMPTY))
 
-  override def toString =
-    s"Fetch Data: [$info], HW: [$highWatermark], leaderLogStartOffset: 
[$leaderLogStartOffset], leaderLogEndOffset: [$leaderLogEndOffset], " +
-    s"followerLogStartOffset: [$followerLogStartOffset], fetchTimeMs: 
[$fetchTimeMs], readSize: [$readSize], lastStableOffset: [$lastStableOffset], 
error: [$error]"
+  override def toString = {
+    "LogReadResult(" +
+      s"info=$info, " +
+      s"highWatermark=$highWatermark, " +
+      s"leaderLogStartOffset=$leaderLogStartOffset, " +
+      s"leaderLogEndOffset=$leaderLogEndOffset, " +
+      s"followerLogStartOffset=$followerLogStartOffset, " +
+      s"fetchTimeMs=$fetchTimeMs, " +
+      s"preferredReadReplica=$preferredReadReplica, " +
+      s"lastStableOffset=$lastStableOffset, " +
+      s"error=$error" +
+      ")"

Review comment:
       That's true. You have to write `toString` yourself if you want labels. 
We could write a utility method like so (didn't try too compile it):
   
   ```scala
   def productToString(product: Product): String = {
     val builder = new StringBuilder
     sb.append(product.prefix)
     for (i <- 0 until productArity) {
       
builder.append(productElementName(i)).append("=").append(productElement(i))
     }
     sb.build()
   }
   ```
   
   Then we can call that method from any case class `toString` where we want 
this format. Avoids some duplication and forgetting to update `toString` when 
new methods are added.

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -109,9 +109,19 @@ case class LogReadResult(info: FetchDataInfo,
   def withEmptyFetchInfo: LogReadResult =
     copy(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, 
MemoryRecords.EMPTY))
 
-  override def toString =
-    s"Fetch Data: [$info], HW: [$highWatermark], leaderLogStartOffset: 
[$leaderLogStartOffset], leaderLogEndOffset: [$leaderLogEndOffset], " +
-    s"followerLogStartOffset: [$followerLogStartOffset], fetchTimeMs: 
[$fetchTimeMs], readSize: [$readSize], lastStableOffset: [$lastStableOffset], 
error: [$error]"
+  override def toString = {
+    "LogReadResult(" +
+      s"info=$info, " +
+      s"highWatermark=$highWatermark, " +
+      s"leaderLogStartOffset=$leaderLogStartOffset, " +
+      s"leaderLogEndOffset=$leaderLogEndOffset, " +
+      s"followerLogStartOffset=$followerLogStartOffset, " +
+      s"fetchTimeMs=$fetchTimeMs, " +
+      s"preferredReadReplica=$preferredReadReplica, " +
+      s"lastStableOffset=$lastStableOffset, " +
+      s"error=$error" +
+      ")"

Review comment:
       That's true. You have to write `toString` yourself if you want labels. 
We could write a utility method like so (didn't try too compile it):
   
   ```scala
   def productToString(product: Product): String = {
     val builder = new StringBuilder
     sb.append(product.prefix)
     for (i <- 0 until product.productArity) {
       builder.append(product.productElementName(i))
         .append("=")
         .append(product.productElement(i))
     }
     sb.build()
   }
   ```
   
   Then we can call that method from any case class `toString` where we want 
this format. Avoids some duplication and forgetting to update `toString` when 
new methods are added.

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -109,9 +109,19 @@ case class LogReadResult(info: FetchDataInfo,
   def withEmptyFetchInfo: LogReadResult =
     copy(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, 
MemoryRecords.EMPTY))
 
-  override def toString =
-    s"Fetch Data: [$info], HW: [$highWatermark], leaderLogStartOffset: 
[$leaderLogStartOffset], leaderLogEndOffset: [$leaderLogEndOffset], " +
-    s"followerLogStartOffset: [$followerLogStartOffset], fetchTimeMs: 
[$fetchTimeMs], readSize: [$readSize], lastStableOffset: [$lastStableOffset], 
error: [$error]"
+  override def toString = {
+    "LogReadResult(" +
+      s"info=$info, " +
+      s"highWatermark=$highWatermark, " +
+      s"leaderLogStartOffset=$leaderLogStartOffset, " +
+      s"leaderLogEndOffset=$leaderLogEndOffset, " +
+      s"followerLogStartOffset=$followerLogStartOffset, " +
+      s"fetchTimeMs=$fetchTimeMs, " +
+      s"preferredReadReplica=$preferredReadReplica, " +
+      s"lastStableOffset=$lastStableOffset, " +
+      s"error=$error" +
+      ")"

Review comment:
       That's true. You have to write `toString` yourself if you want labels. 
We could write a utility method like so (didn't try too compile it and skipped 
some details like commas):
   
   ```scala
   def productToString(product: Product): String = {
     val builder = new StringBuilder
     sb.append(product.prefix)
     for (i <- 0 until product.productArity) {
       builder.append(product.productElementName(i))
         .append("=")
         .append(product.productElement(i))
     }
     sb.build()
   }
   ```
   
   Then we can call that method from any case class `toString` where we want 
this format. Avoids some duplication and forgetting to update `toString` when 
new methods are added.

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -109,9 +109,19 @@ case class LogReadResult(info: FetchDataInfo,
   def withEmptyFetchInfo: LogReadResult =
     copy(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, 
MemoryRecords.EMPTY))
 
-  override def toString =
-    s"Fetch Data: [$info], HW: [$highWatermark], leaderLogStartOffset: 
[$leaderLogStartOffset], leaderLogEndOffset: [$leaderLogEndOffset], " +
-    s"followerLogStartOffset: [$followerLogStartOffset], fetchTimeMs: 
[$fetchTimeMs], readSize: [$readSize], lastStableOffset: [$lastStableOffset], 
error: [$error]"
+  override def toString = {
+    "LogReadResult(" +
+      s"info=$info, " +
+      s"highWatermark=$highWatermark, " +
+      s"leaderLogStartOffset=$leaderLogStartOffset, " +
+      s"leaderLogEndOffset=$leaderLogEndOffset, " +
+      s"followerLogStartOffset=$followerLogStartOffset, " +
+      s"fetchTimeMs=$fetchTimeMs, " +
+      s"preferredReadReplica=$preferredReadReplica, " +
+      s"lastStableOffset=$lastStableOffset, " +
+      s"error=$error" +
+      ")"

Review comment:
       That's true. You have to write `toString` yourself if you want labels. 
We could write a utility method like so (didn't try too compile it and skipped 
some details like commas):
   
   ```scala
   def productToString(product: Product): String = {
     val builder = new StringBuilder
     sb.append(product.prefix)
     for (i <- 0 until product.productArity) {
       builder.append(product.productElementName(i))
         .append("=")
         .append(product.productElement(i))
     }
     sb.build()
   }
   ```
   
   Then we can call that method from any case class `toString` where we want 
this format. Avoids some duplication and forgetting to update `toString` when 
new fields are added.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to