Re: [PR] MINOR Re-add action queue parameter removed from appendRecords [kafka]
jolshan merged PR #14753: URL: https://github.com/apache/kafka/pull/14753 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR Re-add action queue parameter removed from appendRecords [kafka]
jolshan commented on PR #14753: URL: https://github.com/apache/kafka/pull/14753#issuecomment-1811575702 Given the scope of the change, these test failures are not related. Will followup elsewhere. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR Re-add action queue parameter removed from appendRecords [kafka]
jolshan commented on code in PR #14753: URL: https://github.com/apache/kafka/pull/14753#discussion_r1393093702 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -898,20 +900,19 @@ class ReplicaManager(val config: KafkaConfig, } actionQueue.add { - () => -allResults.foreach { case (topicPartition, result) => - val requestKey = TopicPartitionOperationKey(topicPartition) - result.info.leaderHwChange match { -case LeaderHwChange.INCREASED => - // some delayed operations may be unblocked after HW changed - delayedProducePurgatory.checkAndComplete(requestKey) - delayedFetchPurgatory.checkAndComplete(requestKey) - delayedDeleteRecordsPurgatory.checkAndComplete(requestKey) -case LeaderHwChange.SAME => - // probably unblock some follower fetch requests since log end offset has been updated - delayedFetchPurgatory.checkAndComplete(requestKey) -case LeaderHwChange.NONE => -// nothing + () => allResults.foreach { case (topicPartition, result) => +val requestKey = TopicPartitionOperationKey(topicPartition) +result.info.leaderHwChange match { + case LeaderHwChange.INCREASED => +// some delayed operations may be unblocked after HW changed +delayedProducePurgatory.checkAndComplete(requestKey) +delayedFetchPurgatory.checkAndComplete(requestKey) +delayedDeleteRecordsPurgatory.checkAndComplete(requestKey) + case LeaderHwChange.SAME => +// probably unblock some follower fetch requests since log end offset has been updated +delayedFetchPurgatory.checkAndComplete(requestKey) + case LeaderHwChange.NONE => + // nothing Review Comment: I think it was the same here, but I can move it. https://github.com/apache/kafka/commit/7d147cf2413e5d361422728e5c9306574658c78d -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR Re-add action queue parameter removed from appendRecords [kafka]
hachikuji commented on code in PR #14753: URL: https://github.com/apache/kafka/pull/14753#discussion_r1392979091 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -898,20 +900,19 @@ class ReplicaManager(val config: KafkaConfig, } actionQueue.add { - () => -allResults.foreach { case (topicPartition, result) => - val requestKey = TopicPartitionOperationKey(topicPartition) - result.info.leaderHwChange match { -case LeaderHwChange.INCREASED => - // some delayed operations may be unblocked after HW changed - delayedProducePurgatory.checkAndComplete(requestKey) - delayedFetchPurgatory.checkAndComplete(requestKey) - delayedDeleteRecordsPurgatory.checkAndComplete(requestKey) -case LeaderHwChange.SAME => - // probably unblock some follower fetch requests since log end offset has been updated - delayedFetchPurgatory.checkAndComplete(requestKey) -case LeaderHwChange.NONE => -// nothing + () => allResults.foreach { case (topicPartition, result) => +val requestKey = TopicPartitionOperationKey(topicPartition) +result.info.leaderHwChange match { + case LeaderHwChange.INCREASED => +// some delayed operations may be unblocked after HW changed +delayedProducePurgatory.checkAndComplete(requestKey) +delayedFetchPurgatory.checkAndComplete(requestKey) +delayedDeleteRecordsPurgatory.checkAndComplete(requestKey) + case LeaderHwChange.SAME => +// probably unblock some follower fetch requests since log end offset has been updated +delayedFetchPurgatory.checkAndComplete(requestKey) + case LeaderHwChange.NONE => + // nothing Review Comment: nit: looks misaligned -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR Re-add action queue parameter removed from appendRecords [kafka]
jolshan commented on code in PR #14753: URL: https://github.com/apache/kafka/pull/14753#discussion_r1392978203 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -813,7 +813,8 @@ class ReplicaManager(val config: KafkaConfig, recordConversionStatsCallback, timeout, responseCallback, - delayedProduceLock + delayedProduceLock, + actionQueue Review Comment: added the new name to the class value -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR Re-add action queue parameter removed from appendRecords [kafka]
jolshan commented on code in PR #14753: URL: https://github.com/apache/kafka/pull/14753#discussion_r1392953096 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -813,7 +813,8 @@ class ReplicaManager(val config: KafkaConfig, recordConversionStatsCallback, timeout, responseCallback, - delayedProduceLock + delayedProduceLock, + actionQueue Review Comment: yeah. I think it could be useful to rename action queue so that the compiler catches such a change. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR Re-add action queue parameter removed from appendRecords [kafka]
artemlivshits commented on code in PR #14753: URL: https://github.com/apache/kafka/pull/14753#discussion_r1392946436 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -813,7 +813,8 @@ class ReplicaManager(val config: KafkaConfig, recordConversionStatsCallback, timeout, responseCallback, - delayedProduceLock + delayedProduceLock, + actionQueue Review Comment: Wow, this is so subtle, I stared at the change for some time to understand what it actually does; it's almost impossible to spot in the code review and the compiler cannot help either, what can we do to catch these issues in the future? I think we should rename the member variable to be something like `defaultActionQueue` so that if we don't pass the `actionQueue` the compiler would catch it. Another question (probably to @dajac) -- is passing the `actionQueue` in the argument just an optimization or a correctness issue? If it's just an optimization, I wonder what would be the effects of removing it and reduce the overall system complexity. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org