Re: [PR] MINOR Re-add action queue parameter removed from appendRecords [kafka]

2023-11-14 Thread via GitHub


jolshan merged PR #14753:
URL: https://github.com/apache/kafka/pull/14753


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR Re-add action queue parameter removed from appendRecords [kafka]

2023-11-14 Thread via GitHub


jolshan commented on PR #14753:
URL: https://github.com/apache/kafka/pull/14753#issuecomment-1811575702

   Given the scope of the change, these test failures are not related. Will 
follow up elsewhere.





Re: [PR] MINOR Re-add action queue parameter removed from appendRecords [kafka]

2023-11-14 Thread via GitHub


jolshan commented on code in PR #14753:
URL: https://github.com/apache/kafka/pull/14753#discussion_r1393093702


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -898,20 +900,19 @@ class ReplicaManager(val config: KafkaConfig,
 }
 
 actionQueue.add {
-  () =>
-allResults.foreach { case (topicPartition, result) =>
-  val requestKey = TopicPartitionOperationKey(topicPartition)
-  result.info.leaderHwChange match {
-case LeaderHwChange.INCREASED =>
-  // some delayed operations may be unblocked after HW changed
-  delayedProducePurgatory.checkAndComplete(requestKey)
-  delayedFetchPurgatory.checkAndComplete(requestKey)
-  delayedDeleteRecordsPurgatory.checkAndComplete(requestKey)
-case LeaderHwChange.SAME =>
-  // probably unblock some follower fetch requests since log end offset has been updated
-  delayedFetchPurgatory.checkAndComplete(requestKey)
-case LeaderHwChange.NONE =>
-// nothing
+  () => allResults.foreach { case (topicPartition, result) =>
+val requestKey = TopicPartitionOperationKey(topicPartition)
+result.info.leaderHwChange match {
+  case LeaderHwChange.INCREASED =>
+// some delayed operations may be unblocked after HW changed
+delayedProducePurgatory.checkAndComplete(requestKey)
+delayedFetchPurgatory.checkAndComplete(requestKey)
+delayedDeleteRecordsPurgatory.checkAndComplete(requestKey)
+  case LeaderHwChange.SAME =>
+// probably unblock some follower fetch requests since log end offset has been updated
+delayedFetchPurgatory.checkAndComplete(requestKey)
+  case LeaderHwChange.NONE =>
+  // nothing

Review Comment:
   I think it was the same here, but I can move it. 
https://github.com/apache/kafka/commit/7d147cf2413e5d361422728e5c9306574658c78d






Re: [PR] MINOR Re-add action queue parameter removed from appendRecords [kafka]

2023-11-14 Thread via GitHub


hachikuji commented on code in PR #14753:
URL: https://github.com/apache/kafka/pull/14753#discussion_r1392979091


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -898,20 +900,19 @@ class ReplicaManager(val config: KafkaConfig,
 }
 
 actionQueue.add {
-  () =>
-allResults.foreach { case (topicPartition, result) =>
-  val requestKey = TopicPartitionOperationKey(topicPartition)
-  result.info.leaderHwChange match {
-case LeaderHwChange.INCREASED =>
-  // some delayed operations may be unblocked after HW changed
-  delayedProducePurgatory.checkAndComplete(requestKey)
-  delayedFetchPurgatory.checkAndComplete(requestKey)
-  delayedDeleteRecordsPurgatory.checkAndComplete(requestKey)
-case LeaderHwChange.SAME =>
-  // probably unblock some follower fetch requests since log end offset has been updated
-  delayedFetchPurgatory.checkAndComplete(requestKey)
-case LeaderHwChange.NONE =>
-// nothing
+  () => allResults.foreach { case (topicPartition, result) =>
+val requestKey = TopicPartitionOperationKey(topicPartition)
+result.info.leaderHwChange match {
+  case LeaderHwChange.INCREASED =>
+// some delayed operations may be unblocked after HW changed
+delayedProducePurgatory.checkAndComplete(requestKey)
+delayedFetchPurgatory.checkAndComplete(requestKey)
+delayedDeleteRecordsPurgatory.checkAndComplete(requestKey)
+  case LeaderHwChange.SAME =>
+// probably unblock some follower fetch requests since log end offset has been updated
+delayedFetchPurgatory.checkAndComplete(requestKey)
+  case LeaderHwChange.NONE =>
+  // nothing

Review Comment:
   nit: looks misaligned






Re: [PR] MINOR Re-add action queue parameter removed from appendRecords [kafka]

2023-11-14 Thread via GitHub


jolshan commented on code in PR #14753:
URL: https://github.com/apache/kafka/pull/14753#discussion_r1392978203


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -813,7 +813,8 @@ class ReplicaManager(val config: KafkaConfig,
   recordConversionStatsCallback,
   timeout,
   responseCallback,
-  delayedProduceLock
+  delayedProduceLock,
+  actionQueue

Review Comment:
   Applied the new name to the class value.






Re: [PR] MINOR Re-add action queue parameter removed from appendRecords [kafka]

2023-11-14 Thread via GitHub


jolshan commented on code in PR #14753:
URL: https://github.com/apache/kafka/pull/14753#discussion_r1392953096


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -813,7 +813,8 @@ class ReplicaManager(val config: KafkaConfig,
   recordConversionStatsCallback,
   timeout,
   responseCallback,
-  delayedProduceLock
+  delayedProduceLock,
+  actionQueue

Review Comment:
   Yeah. I think it could be useful to rename the action queue member so that 
the compiler catches such a change.






Re: [PR] MINOR Re-add action queue parameter removed from appendRecords [kafka]

2023-11-14 Thread via GitHub


artemlivshits commented on code in PR #14753:
URL: https://github.com/apache/kafka/pull/14753#discussion_r1392946436


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -813,7 +813,8 @@ class ReplicaManager(val config: KafkaConfig,
   recordConversionStatsCallback,
   timeout,
   responseCallback,
-  delayedProduceLock
+  delayedProduceLock,
+  actionQueue

Review Comment:
   Wow, this is so subtle. I stared at the change for some time to understand 
what it actually does; it's almost impossible to spot in code review, and 
the compiler cannot help either. What can we do to catch these issues in the 
future?  I think we should rename the member variable to something like 
`defaultActionQueue` so that if we don't pass the `actionQueue`, the compiler 
would catch it.
   
   Another question (probably for @dajac): is passing the `actionQueue` in the 
argument just an optimization or a correctness issue?  If it's just an 
optimization, I wonder what the effects would be of removing it to reduce the 
overall system complexity.
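
A minimal sketch of the shadowing pitfall described in this comment, as I read
it. All names here (`ActionQueue`, `ManagerBefore`, `ManagerAfter`,
`ShadowingDemo`) are hypothetical stand-ins and not Kafka's actual classes;
only `actionQueue` and `defaultActionQueue` come from the discussion. It
assumes the problem is that a member and a method parameter share a name, so
dropping the parameter still compiles.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for an action queue; it only records what was added.
final class ActionQueue(val name: String) {
  val actions: ArrayBuffer[String] = ArrayBuffer.empty[String]
  def add(action: String): Unit = actions += action
}

// The member and the method parameter used to share the name `actionQueue`.
// Here the parameter has been dropped by mistake: the body still compiles,
// because `actionQueue` now resolves to the member.
class ManagerBefore {
  val actionQueue = new ActionQueue("member")

  def appendRecords(records: String): String = {
    actionQueue.add(records) // silently uses the member queue
    actionQueue.name
  }
}

// With the member renamed, the parameter is the only thing called
// `actionQueue`. Dropping it would leave the body referring to an
// undefined name, which the compiler rejects.
class ManagerAfter {
  val defaultActionQueue = new ActionQueue("member")

  def appendRecords(records: String,
                    actionQueue: ActionQueue = this.defaultActionQueue): String = {
    actionQueue.add(records)
    actionQueue.name
  }
}

object ShadowingDemo {
  def main(args: Array[String]): Unit = {
    val callerQueue = new ActionQueue("caller")
    println(new ManagerBefore().appendRecords("batch-1"))             // member
    println(new ManagerAfter().appendRecords("batch-1", callerQueue)) // caller
    println(new ManagerAfter().appendRecords("batch-1"))              // member
  }
}
```

The rename does not change behavior when the argument is passed; it only turns
an accidental removal of the parameter into a compile error instead of a
silent fallback to the member.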


