[ https://issues.apache.org/jira/browse/KAFKA-17962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17904549#comment-17904549 ]
Chia-Ping Tsai commented on KAFKA-17962:
----------------------------------------
{quote}
Are you saying that all connect system tests do NOT use the new consumer
because the configuration is not wired correctly?
{quote}
There are two issues when running Connect end-to-end (e2e) tests with the new
consumer:
issue_1: Incorrect Configuration of session.timeout.ms:
Connect e2e tests should not set session.timeout.ms when using the new
consumer, because the new consumer rejects session.timeout.ms as an invalid
configuration. KAFKA-18156 adjusts the configurations of all Connect e2e tests
so that session.timeout.ms is not added when the new consumer is used.
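As a rough illustration of the issue_1 fix, here is a minimal Python sketch of a test harness building consumer properties so that session.timeout.ms is only emitted for the classic protocol. The helper names `consumer_properties` and `to_properties_text` are hypothetical. This is not the actual KAFKA-18156 change. It also assumes that, with the new protocol, the session timeout is governed on the broker side rather than by the client.

```python
# Hypothetical helpers: not the real kafkatest / KAFKA-18156 code.
CLASSIC = "classic"
CONSUMER = "consumer"


def consumer_properties(group_protocol, session_timeout_ms=None):
    """Return consumer config entries for the given group protocol.

    Assumption: with group.protocol=consumer the client rejects
    session.timeout.ms, so the key is omitted entirely instead of being
    set to some default value.
    """
    if group_protocol not in (CLASSIC, CONSUMER):
        raise ValueError(f"unknown group protocol: {group_protocol!r}")
    props = {"group.protocol": group_protocol}
    # Only the classic protocol gets a client-side session timeout.
    if group_protocol == CLASSIC and session_timeout_ms is not None:
        props["session.timeout.ms"] = str(session_timeout_ms)
    return props


def to_properties_text(props):
    """Render entries as a Java .properties body, sorted for stable output."""
    return "\n".join(f"{key}={value}" for key, value in sorted(props.items()))


if __name__ == "__main__":
    print(to_properties_text(consumer_properties(CLASSIC, 10000)))
    print(to_properties_text(consumer_properties(CONSUMER, 10000)))
```

The design choice is to drop the key rather than blank it. An empty or default value would still count as "set" to a client that validates the presence of unsupported configs.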
issue_2: Inconsistent Behaviors Between New and Classic Consumers:
1) Unlike the classic consumer, the new consumer does not handle
WakeupException and InterruptedException thrown by the rebalance listener.
2) Unlike the classic consumer, the new consumer does not mark partitions as
fetchable after the listener encounters an error.
KAFKA-18160 aims to fix these inconsistencies. Additionally, I have included a
test in the PR
(https://github.com/apache/kafka/pull/18089#issuecomment-2525265697) to
demonstrate the inconsistent behavior.
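The two issue_2 differences can be pictured with a deliberately simplified Python model of the classic-consumer side. It is hypothetical. The names `invoke_on_assigned`, `WakeupError`, and `InterruptError` are invented stand-ins, and the model assumes that "handling" wakeup/interrupt errors means rethrowing them to the caller of poll() rather than swallowing them. The real logic lives in the Java client and differs in detail. The model makes no claim about fetchability on the rethrow path.

```python
class WakeupError(Exception):
    """Hypothetical stand-in for Kafka's WakeupException."""


class InterruptError(Exception):
    """Hypothetical stand-in for an interrupt raised inside the listener."""


# Errors that this classic-style model rethrows instead of capturing (assumption).
PROPAGATED = (WakeupError, InterruptError)


def invoke_on_assigned(listener, partitions, fetchable):
    """Run listener(partitions) roughly as the comment describes the classic consumer.

    Returns the captured listener error, or None on success.
    Re-raises PROPAGATED errors to the caller.
    """
    try:
        listener(partitions)
    except PROPAGATED:
        # Rethrown to the caller; this model leaves `fetchable` untouched here.
        raise
    except Exception as exc:
        error = exc  # ordinary listener failures are captured, not raised
    else:
        error = None
    # issue_2, point 2: partitions become fetchable even if the listener failed.
    fetchable.update(partitions)
    return error
```

Per the comment, the new consumer (before KAFKA-18160) diverges on both branches. It does not give wakeup/interrupt errors this treatment, and it does not add the partitions to the fetchable set after a listener error.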
> Upgrade Connect distributed system tests to handle CONSUMER group protocol
> --------------------------------------------------------------------------
>
> Key: KAFKA-17962
> URL: https://issues.apache.org/jira/browse/KAFKA-17962
> Project: Kafka
> Issue Type: Sub-task
> Components: clients, connect, consumer, system tests
> Affects Versions: 3.9.0
> Reporter: Kirk True
> Assignee: Chia-Ping Tsai
> Priority: Major
> Labels: kip-848-client-support
> Fix For: 4.0.0
>
>
> Many of the system tests fail in {{connect_distributed_test.py}} because the
> parameterized value for {{group_protocol}} is not being consistently passed
> down through all parts of the system test. This can be confirmed by looking
> through the system test's results directory (e.g.
> {{results/2024-11-25--002/ConnectDistributedTest/test_pause_and_resume_sink/connect_protocol=compatible.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer/1}})
> after the test fails.
> Here's what it shows for me:
> {code:bash}
> $ grep -R "group.protocol = " ConnectDistributedService-0-281473237465016
> ConnectDistributedService-0-281473237465016/ducker06/connect.log:	group.protocol = classic
> ConnectDistributedService-0-281473237465016/ducker06/connect.log:	group.protocol = classic
> ConnectDistributedService-0-281473237465016/ducker06/connect.log:	group.protocol = classic
> ConnectDistributedService-0-281473237465016/ducker05/connect.log:	group.protocol = classic
> ConnectDistributedService-0-281473237465016/ducker05/connect.log:	group.protocol = classic
> ConnectDistributedService-0-281473237465016/ducker05/connect.log:	group.protocol = classic
> ConnectDistributedService-0-281473237465016/ducker05/connect.log:	group.protocol = consumer
> ConnectDistributedService-0-281473237465016/ducker04/connect.log:	group.protocol = classic
> ConnectDistributedService-0-281473237465016/ducker04/connect.log:	group.protocol = classic
> ConnectDistributedService-0-281473237465016/ducker04/connect.log:	group.protocol = classic
> {code}
> The {{ConsumerConfig}} output clearly shows that {{group.protocol}} is not
> consistently applied when configuring each {{KafkaConsumer}} in the test: only
> one of the ten logged consumer configurations uses {{consumer}}, while the other
> nine use {{classic}}. This results in failures such as this one from
> {{test_pause_and_resume_sink.connect_protocol}}:
> {noformat}
> TimeoutError('Failed to consume messages after resuming sink connector')
> Traceback (most recent call last):
>   File "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/runner_client.py", line 351, in _do_run
>     data = self.run_test()
>   File "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/runner_client.py", line 411, in run_test
>     return self.test_context.function(self.test)
>   File "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/mark/_mark.py", line 438, in wrapper
>     return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/home/semaphore/kafka-overlay/kafka/tests/kafkatest/tests/connect/connect_distributed_test.py", line 415, in test_pause_and_resume_sink
>     wait_until(lambda: len(self.sink.received_messages()) > num_messages, timeout_sec=30,
>   File "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/utils/util.py", line 58, in wait_until
>     raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from last_exception
> ducktape.errors.TimeoutError: Failed to consume messages after resuming sink connector
> {noformat}
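The grep check from the issue description can also be scripted, so that a run flags any worker that logged the wrong protocol. Below is a minimal Python sketch. It assumes that ConsumerConfig dumps lines of the form `group.protocol = <value>` into each node's `connect.log`, as in the grep output above. The function names are hypothetical and not part of kafkatest.

```python
import re
from collections import Counter
from pathlib import Path

# Matches ConsumerConfig dump lines such as "\tgroup.protocol = classic" (assumed format).
PROTOCOL_LINE = re.compile(r"\bgroup\.protocol = (\w+)")


def tally(named_logs):
    """Count group.protocol values per log.

    named_logs: iterable of (name, log_text) pairs.
    Returns {name: Counter({protocol: occurrences})}.
    """
    return {name: Counter(PROTOCOL_LINE.findall(text)) for name, text in named_logs}


def tally_results_dir(results_dir):
    """Apply tally() to every connect.log below results_dir (hypothetical layout)."""
    root = Path(results_dir)
    logs = sorted(root.rglob("connect.log"))
    return tally((str(p.relative_to(root)), p.read_text(errors="replace")) for p in logs)


def misconfigured(tallies, expected):
    """Return the logs that recorded any group.protocol other than `expected`."""
    return [name for name, counts in tallies.items() if any(p != expected for p in counts)]
```

For the run shown above, `misconfigured(tally_results_dir(...), "consumer")` would be expected to list all three ducker logs, since each of them logged at least one `classic` consumer configuration.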