Re: [PR] KAFKA-16389: ConsumerEventHandler does not support incremental assignment changes causing failure in system test [kafka]
lucasbru commented on code in PR #15661: URL: https://github.com/apache/kafka/pull/15661#discussion_r1559790393 ## tests/kafkatest/services/verifiable_consumer.py: ## @@ -135,6 +135,28 @@ def last_commit(self, tp): else: return None +# This needs to be used for cooperative and consumer protocol +class IncrementalAssignmentConsumerEventHandler(ConsumerEventHandler): Review Comment: Seems to me that it would have been cleaner to have a single implementation that treats the value that is passed in `onAssigned` correctly (it is always incremental by contract, just happens to be non-incremental in the eager case), instead of having two implementations now. But I'll leave it to a follow-up PR to clean it up, if you agree, and merge this now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16389: ConsumerEventHandler does not support incremental assignment changes causing failure in system test [kafka]
lucasbru merged PR #15661: URL: https://github.com/apache/kafka/pull/15661 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16389: ConsumerEventHandler does not support incremental assignment changes causing failure in system test [kafka]
philipnee commented on code in PR #15661: URL: https://github.com/apache/kafka/pull/15661#discussion_r1559662337 ## tests/kafkatest/services/verifiable_consumer.py: ## @@ -135,6 +135,28 @@ def last_commit(self, tp): else: return None +# This needs to be used for cooperative and consumer protocol +class IncrementalAssignmentConsumerEventHandler(ConsumerEventHandler): Review Comment: I believe the current listener assumes Eager protocol so it is not making the incorrect assumptions. This (incremental handler) would probably work for Eager as well but I thought it would be clearer to distinguish the two. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16389: ConsumerEventHandler does not support incremental assignment changes causing failure in system test [kafka]
lucasbru commented on code in PR #15661: URL: https://github.com/apache/kafka/pull/15661#discussion_r1559094045 ## tests/kafkatest/services/verifiable_consumer.py: ## @@ -135,6 +135,28 @@ def last_commit(self, tp): else: return None +# This needs to be used for cooperative and consumer protocol +class IncrementalAssignmentConsumerEventHandler(ConsumerEventHandler): Review Comment: Why can we not implement this in the normal ConsumerEventHandler? It seems it's making incorrect assumptions about the consumer rebalance listener (since previously owned partitions are not guaranteed to be reported in onassinged) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16389: ConsumerEventHandler does not support incremental assignment changes causing failure in system test [kafka]
philipnee commented on PR #15661: URL: https://github.com/apache/kafka/pull/15661#issuecomment-2040290159 @cadonna @lucasbru - Is it possible for me to ask for a review on this issue? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16389: ConsumerEventHandler does not support incremental assignment changes causing failure in system test [kafka]
philipnee commented on PR #15661: URL: https://github.com/apache/kafka/pull/15661#issuecomment-2038266652 Can we change the message to : `err_msg="expecting valid assignments of %d partitions for node %d but got: %s" % \` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16389: ConsumerEventHandler does not support incremental assignment changes causing failure in system test [kafka]
philipnee commented on PR #15661: URL: https://github.com/apache/kafka/pull/15661#issuecomment-2038263452 Thanks for reviewing this. This is the test results: ``` SESSION REPORT (ALL TESTS) ducktape version: 0.11.4 session_id: 2024-04-04--035 run time: 9 minutes 46.055 seconds tests run:14 passed: 14 flaky:0 failed: 0 ignored: 0 test_id: kafkatest.tests.client.consumer_test.AssignmentValidationTest.test_valid_assignment.assignment_strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=False status: PASS run time: 43.462 seconds test_id: kafkatest.tests.client.consumer_test.AssignmentValidationTest.test_valid_assignment.assignment_strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=classic status: PASS run time: 42.289 seconds test_id: kafkatest.tests.client.consumer_test.AssignmentValidationTest.test_valid_assignment.assignment_strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor.metadata_quorum=ZK.use_new_coordinator=False status: PASS run time: 45.432 seconds test_id: kafkatest.tests.client.consumer_test.AssignmentValidationTest.test_valid_assignment.assignment_strategy=org.apache.kafka.clients.consumer.RangeAssignor.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=False status: PASS run time: 37.334 seconds test_id: kafkatest.tests.client.consumer_test.AssignmentValidationTest.test_valid_assignment.assignment_strategy=org.apache.kafka.clients.consumer.RangeAssignor.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=classic status: PASS run time: 36.570 seconds test_id: kafkatest.tests.client.consumer_test.AssignmentValidationTest.test_valid_assignment.assignment_strategy=org.apache.kafka.clients.consumer.RangeAssignor.metadata_quorum=ZK.use_new_coordinator=False status: PASS run time: 39.302 seconds test_id: kafkatest.tests.client.consumer_test.AssignmentValidationTest.test_valid_assignment.assignment_strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=False status: PASS run time: 36.570 seconds test_id: kafkatest.tests.client.consumer_test.AssignmentValidationTest.test_valid_assignment.assignment_strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=classic status: PASS run time: 36.298 seconds test_id: kafkatest.tests.client.consumer_test.AssignmentValidationTest.test_valid_assignment.assignment_strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor.metadata_quorum=ZK.use_new_coordinator=False status: PASS run time: 39.076 seconds test_id: kafkatest.tests.client.consumer_test.AssignmentValidationTest.test_valid_assignment.assignment_strategy=org.apache.kafka.clients.consumer.StickyAssignor.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=False status: PASS run time: 35.942 seconds test_id: kafkatest.tests.client.consumer_test.AssignmentValidationTest.test_valid_assignment.assignment_strategy=org.apache.kafka.clients.consumer.StickyAssignor.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=classic status: PASS run time: 36.115 seconds test_id: kafkatest.tests.client.consumer_test.AssignmentValidationTest.test_valid_assignment.assignment_strategy=org.apache.kafka.clients.consumer.StickyAssignor.metadata_quorum=ZK.use_new_coordinator=False status: PASS run time: 39.770 seconds -
Re: [PR] KAFKA-16389: ConsumerEventHandler does not support incremental assignment changes causing failure in system test [kafka]
philipnee commented on PR #15661: URL: https://github.com/apache/kafka/pull/15661#issuecomment-2038186018 @kirktrue - This inherits from the patch you attached to the jira ticket. Would you have time to review this PR? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16389: ConsumerEventHandler does not support incremental assignment changes causing failure in system test [kafka]
philipnee opened a new pull request, #15661: URL: https://github.com/apache/kafka/pull/15661 The current AssignmentValidationTest only tests EAGER assignment protocol and does not support incremental assignment like CooperativeStickyAssignor and consumer protocol. Therefore in the ConsumerEventHandler, I subclassed the existing handler overridden the assigned and revoke event handling methods, to permit incremental changes to the current assignments. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org