[jira] [Created] (KAFKA-16582) Feature Request: Introduce max.record.size Configuration Parameter for Producers
Ramiz Mehran created KAFKA-16582: Summary: Feature Request: Introduce max.record.size Configuration Parameter for Producers Key: KAFKA-16582 URL: https://issues.apache.org/jira/browse/KAFKA-16582 Project: Kafka Issue Type: New Feature Components: producer Affects Versions: 3.6.2 Reporter: Ramiz Mehran {*}Summary{*}: Currently, Kafka producers have a {{max.request.size}} configuration that limits the size of the request sent to Kafka brokers, a check that covers both compressed and uncompressed data: it also acts as the maximum size of an individual record before it is compressed. This can lead to inefficiencies and unexpected behaviours, particularly when records are significantly large before compression but fit multiple times into {{max.request.size}} after compression. {*}Problem{*}: During spikes in data transmission, especially with large records, the large batches formed from compressed records increase latency and can create a processing backlog, even when each record compresses to within {{max.request.size}}. This problem is particularly pronounced with highly efficient compression algorithms like zstd, where the compressed size may allow for large batches that are inefficient to process. {*}Proposed Solution{*}: Introduce a new producer configuration parameter: {{max.record.size}}. This parameter would allow administrators to define the maximum size of a record before it is compressed, making system behavior more predictable by separating the uncompressed record size limit from the compressed request size limit. {*}Benefits{*}: # {*}Predictability{*}: Producers can reject records that exceed {{max.record.size}} before spending resources on compression. # {*}Efficiency{*}: Helps maintain efficient batch sizes and system throughput, especially under high load. # {*}System Stability{*}: Avoids the large batch processing that can negatively affect latency and throughput. {*}Example{*}: Consider a scenario where the producer sends records up to 20 MB in size which, when compressed, fit into a batch under the 25 MB {{max.request.size}} multiple times. Such batches can be problematic to process efficiently, even though they meet the current maximum request size constraint. With {{max.record.size}} capping individual uncompressed records, {{max.request.size}} could be reduced to limit only the compressed request size, say to 5 MB, preventing very large requests and the latency spikes they cause. {*}Steps to Reproduce{*}: # Configure a Kafka producer with {{max.request.size}} set to 25 MB. # Send multiple uncompressed records close to 20 MB each that compress to less than 25 MB. # Observe the impact on Kafka broker performance and client-side latency. {*}Expected Behavior{*}: The producer should allow administrators to set both a pre-compression record size limit and a post-compression total request size limit. -- This message was sent by Atlassian Jira (v8.20.10#820010)
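The proposed separation can be illustrated with plain producer configuration. A minimal sketch, assuming the new parameter would keep the producer's usual property-based configuration; {{max.record.size}} is the name proposed in this ticket and does not exist in Kafka as of 3.6.2:

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class ProposedConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd");
        // Existing config: today this caps both the compressed request size
        // and the uncompressed size of each individual record.
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 5 * 1024 * 1024);
        // Proposed config (hypothetical, from this ticket): would cap only the
        // pre-compression size of a single record, e.g. 20 MB.
        props.put("max.record.size", 20 * 1024 * 1024);
        System.out.println(props);
    }
}
{code}

With such a split, {{max.request.size}} could stay small to bound what brokers receive, while oversized records are rejected before any compression work is done.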
[jira] [Commented] (KAFKA-10034) Clarify Usage of "batch.size" and "max.request.size" Producer Configs
[ https://issues.apache.org/jira/browse/KAFKA-10034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838450#comment-17838450 ] Ramiz Mehran commented on KAFKA-10034: -- "Firstly, this configuration is a cap on the maximum uncompressed record batch size." should be changed to "Firstly, this configuration is a cap on the maximum uncompressed record size." The cap applies to the maximum size of a single uncompressed record, not a record batch. > Clarify Usage of "batch.size" and "max.request.size" Producer Configs > - > > Key: KAFKA-10034 > URL: https://issues.apache.org/jira/browse/KAFKA-10034 > Project: Kafka > Issue Type: Improvement > Components: docs, producer >Reporter: Mark Cox >Assignee: Badai Aqrandista >Priority: Minor > > The documentation around the producer configurations "batch.size" and > "max.request.size", and how they relate to one another, can be confusing. > In reality, the "max.request.size" is a hard limit on each individual record, > but the documentation makes it seem this is the maximum size of a request > sent to Kafka. If there is a situation where "batch.size" is set greater > than "max.request.size" (and each individual record is smaller than > "max.request.size") you could end up with larger requests than expected sent > to Kafka. > There are a few things that could be considered to make this clearer: > # Improve the documentation to clarify the two producer configurations and > how they relate to each other > # Provide a producer check, and possibly a warning, if "batch.size" is found > to be greater than "max.request.size" > # The producer could take the _minimum_ of "batch.size" or "max.request.size" > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] (KAFKA-10034) Clarify Usage of "batch.size" and "max.request.size" Producer Configs
[ https://issues.apache.org/jira/browse/KAFKA-10034 ] Ramiz Mehran deleted comment on KAFKA-10034: -- was (Author: JIRAUSER290918): "Firstly, this configuration is a cap on the maximum uncompressed record batch size." should be changed to "Firstly, this configuration is a cap on the maximum uncompressed record size." > Clarify Usage of "batch.size" and "max.request.size" Producer Configs > - > > Key: KAFKA-10034 > URL: https://issues.apache.org/jira/browse/KAFKA-10034 > Project: Kafka > Issue Type: Improvement > Components: docs, producer >Reporter: Mark Cox >Assignee: Badai Aqrandista >Priority: Minor > > The documentation around the producer configurations "batch.size" and > "max.request.size", and how they relate to one another, can be confusing. > In reality, the "max.request.size" is a hard limit on each individual record, > but the documentation makes it seem this is the maximum size of a request > sent to Kafka. If there is a situation where "batch.size" is set greater > than "max.request.size" (and each individual record is smaller than > "max.request.size") you could end up with larger requests than expected sent > to Kafka. > There are a few things that could be considered to make this clearer: > # Improve the documentation to clarify the two producer configurations and > how they relate to each other > # Provide a producer check, and possibly a warning, if "batch.size" is found > to be greater than "max.request.size" > # The producer could take the _minimum_ of "batch.size" or "max.request.size" > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-10034) Clarify Usage of "batch.size" and "max.request.size" Producer Configs
[ https://issues.apache.org/jira/browse/KAFKA-10034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838447#comment-17838447 ] Ramiz Mehran commented on KAFKA-10034: -- "Firstly, this configuration is a cap on the maximum uncompressed record batch size." should be changed to "Firstly, this configuration is a cap on the maximum uncompressed record size." > Clarify Usage of "batch.size" and "max.request.size" Producer Configs > - > > Key: KAFKA-10034 > URL: https://issues.apache.org/jira/browse/KAFKA-10034 > Project: Kafka > Issue Type: Improvement > Components: docs, producer >Reporter: Mark Cox >Assignee: Badai Aqrandista >Priority: Minor > > The documentation around the producer configurations "batch.size" and > "max.request.size", and how they relate to one another, can be confusing. > In reality, the "max.request.size" is a hard limit on each individual record, > but the documentation makes it seem this is the maximum size of a request > sent to Kafka. If there is a situation where "batch.size" is set greater > than "max.request.size" (and each individual record is smaller than > "max.request.size") you could end up with larger requests than expected sent > to Kafka. > There are a few things that could be considered to make this clearer: > # Improve the documentation to clarify the two producer configurations and > how they relate to each other > # Provide a producer check, and possibly a warning, if "batch.size" is found > to be greater than "max.request.size" > # The producer could take the _minimum_ of "batch.size" or "max.request.size" > -- This message was sent by Atlassian Jira (v8.20.10#820010)
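Suggestion #3 in the quoted issue can be made concrete with a one-line guard. A minimal sketch under assumed variable names and values; this is an illustration of taking the minimum of the two configs, not the actual producer code:

{code:java}
public class BatchSizeGuard {
    public static void main(String[] args) {
        int batchSize = 32 * 1024 * 1024;   // batch.size, assumed value
        int maxRequestSize = 1024 * 1024;   // max.request.size, assumed value
        // With batch.size > max.request.size the producer could otherwise
        // build larger requests than expected; clamping avoids the surprise.
        int effectiveBatchSize = Math.min(batchSize, maxRequestSize);
        System.out.println("effective batch size: " + effectiveBatchSize);
    }
}
{code}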
Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]
vamossagar12 commented on PR #15305: URL: https://github.com/apache/kafka/pull/15305#issuecomment-2062936947 @showuon, just checking: did you get a chance to look at the updated test? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16467) Add README to docs folder
[ https://issues.apache.org/jira/browse/KAFKA-16467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838438#comment-17838438 ] ASF GitHub Bot commented on KAFKA-16467: showuon merged PR #596: URL: https://github.com/apache/kafka-site/pull/596 > Add README to docs folder > - > > Key: KAFKA-16467 > URL: https://issues.apache.org/jira/browse/KAFKA-16467 > Project: Kafka > Issue Type: Improvement >Reporter: PoAn Yang >Assignee: PoAn Yang >Priority: Minor > > We don't have a guide in the project root folder or the docs folder showing how to > run the website locally. It would be good to provide a way to run the documentation with the kafka-site > repository. > > Option 1: Add links to the wiki pages > [https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes] > and > [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=67634793]. > Option 2: Show how to run the documentation within a container. For example: move > `site-docs` from the kafka repository to the kafka-site repository and run `./start-preview.sh`. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
showuon commented on PR #15616: URL: https://github.com/apache/kafka/pull/15616#issuecomment-2062889530 @FrankYang0529, there is a checkstyle error: `[2024-04-17T14:04:27.072Z] [ant:checkstyle] [ERROR] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-15616/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:70:34: Name 'futureDirPattern' must match pattern '(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)|(^log$)'. [ConstantName] ` Please help fix it. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
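Checkstyle's ConstantName rule wants static final fields in upper snake case. A sketch of the likely rename, assuming the field stays a compiled regex; the actual change in the PR may differ:

```java
import java.util.regex.Pattern;

class LogSegmentNamingSketch {
    // The ConstantName pattern '(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)|(^log$)'
    // accepts UPPER_SNAKE_CASE, so 'futureDirPattern' would become:
    private static final Pattern FUTURE_DIR_PATTERN =
            Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-future");
}
```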
Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
showuon commented on code in PR #15616: URL: https://github.com/apache/kafka/pull/15616#discussion_r1569846240

## storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java: ##

@@ -801,8 +803,23 @@ private Void deleteTypeIfExists(StorageAction delete, Stri
         try {
             if (delete.execute())
                 LOGGER.info("Deleted {} {}.", fileType, file.getAbsolutePath());
-            else if (logIfMissing)
-                LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+            else {
+                if (logIfMissing) {
+                    LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+                }
+
+                // During alter log dir, the log segment may be moved to a new directory, so async delete may fail.
+                // Fallback to delete the file in the new directory to avoid orphan file.
+                Pattern dirPattern = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-future");

Review Comment:
   Sounds good.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
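For context on what the fallback matches: future replica directories created during an alter-log-dir move are named `topic-partition.uniqueId-future`. A standalone sketch of what the regex from the hunk captures; the directory name below is made up for the demo:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class FutureDirPatternDemo {
    public static void main(String[] args) {
        // Same pattern as in the hunk above: <topic>-<partition>.<uniqueId>-future.
        Pattern futureDirPattern = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-future");
        Matcher m = futureDirPattern.matcher("test_topic-0.10bbf8ee-future");
        if (m.matches()) {
            // Prints: topic=test_topic partition=0 uniqueId=10bbf8ee
            System.out.printf("topic=%s partition=%s uniqueId=%s%n",
                    m.group(1), m.group(2), m.group(3));
        }
    }
}
```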
Re: [PR] KAFKA-16073: Increment the local-log-start-offset before deleting segments in memory table [kafka]
showuon merged PR #15747: URL: https://github.com/apache/kafka/pull/15747 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16073: Increment the local-log-start-offset before deleting segments in memory table [kafka]
showuon commented on PR #15747: URL: https://github.com/apache/kafka/pull/15747#issuecomment-2062884304 Failed tests are unrelated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16073: Increment the local-log-start-offset before deleting segments in memory table [kafka]
showuon commented on PR #15748: URL: https://github.com/apache/kafka/pull/15748#issuecomment-2062882937 Looks like `LogCleanerParameterizedIntegrationTest` failed due to this change. Please take a look. Thanks. https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15748/1/#showFailuresLink -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16561: Disable `allow.auto.create.topics` in MirrorMaker2 Consumer Config [kafka]
aaron-ai commented on code in PR #15728: URL: https://github.com/apache/kafka/pull/15728#discussion_r1569813058 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java: ## @@ -169,6 +170,7 @@ static Map sourceConsumerConfig(Map props) { result.putAll(Utils.entriesWithPrefix(props, CONSUMER_CLIENT_PREFIX)); result.putAll(Utils.entriesWithPrefix(props, SOURCE_PREFIX + CONSUMER_CLIENT_PREFIX)); result.put(ENABLE_AUTO_COMMIT_CONFIG, "false"); +result.put(ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false"); Review Comment: @C0urante OK, I will draft a KIP later. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16574) The metrics of LogCleaner disappear after reconfiguration
[ https://issues.apache.org/jira/browse/KAFKA-16574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838420#comment-17838420 ] Chia Chuan Yu commented on KAFKA-16574: --- Hi [~chia7712], can I have this one? Thanks! > The metrics of LogCleaner disappear after reconfiguration > - > > Key: KAFKA-16574 > URL: https://issues.apache.org/jira/browse/KAFKA-16574 > Project: Kafka > Issue Type: Bug >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > > see > [https://github.com/apache/kafka/blob/a3dcbd4e28a35f79f75ec1bf316ef0b39c0df164/core/src/main/scala/kafka/log/LogCleaner.scala#L227] > We don't rebuild the metrics after calling shutdown. The following test can > prove that.
> {code:java}
> @Test
> def testMetricsAfterReconfiguration(): Unit = {
>   val logCleaner = new LogCleaner(new CleanerConfig(true),
>     logDirs = Array(TestUtils.tempDir()),
>     logs = new Pool[TopicPartition, UnifiedLog](),
>     logDirFailureChannel = new LogDirFailureChannel(1),
>     time = time)
>
>   def check(): Unit =
>     LogCleaner.MetricNames.foreach(name =>
>       assertNotNull(KafkaYammerMetrics.defaultRegistry.allMetrics().get(logCleaner.metricsGroup.metricName(name, java.util.Collections.emptyMap())), s"$name is gone?"))
>
>   try {
>     check()
>     logCleaner.reconfigure(new KafkaConfig(TestUtils.createBrokerConfig(1, "localhost:2181")),
>       new KafkaConfig(TestUtils.createBrokerConfig(1, "localhost:2181")))
>     check()
>   } finally logCleaner.shutdown()
> } {code}
-- This message was sent by Atlassian Jira (v8.20.10#820010)
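The failure mode is easy to see in miniature: shutdown removes the metrics from the registry, and reconfiguration restarts the cleaner without re-registering them. A stand-in sketch in plain Java, using a HashMap in place of the real Yammer registry and an assumed metric name; this is not the actual LogCleaner code:

{code:java}
import java.util.HashMap;
import java.util.Map;

public class MetricsLifecycleDemo {
    private final Map<String, Object> registry = new HashMap<>();

    void startup() {
        // Metrics are registered only at startup.
        registry.put("cleaner-recopy-percent", 0);
    }

    void shutdown() {
        // Shutdown removes the metrics...
        registry.clear();
    }

    void reconfigure() {
        // ...but reconfiguration only bounces the cleaner and never
        // re-registers them: the gap the test above exposes.
        shutdown();
    }

    public static void main(String[] args) {
        MetricsLifecycleDemo demo = new MetricsLifecycleDemo();
        demo.startup();
        demo.reconfigure();
        System.out.println(demo.registry); // prints {}: the metric is gone
    }
}
{code}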
Re: [PR] MINOR: Modify SocketServer#sendResponse trace msg [kafka]
TaiJuWu commented on code in PR #15749: URL: https://github.com/apache/kafka/pull/15749#discussion_r1569739083

## core/src/main/scala/kafka/network/SocketServer.scala: ##

@@ -1082,7 +1082,7 @@ private[kafka] class Processor(
   // `protected` for test usage
   protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send): Unit = {
     val connectionId = response.request.context.connectionId
-    trace(s"Socket server received response to send to $connectionId, registering for write and sending data: $response")
+    trace(s"Socket server received response from $connectionId, registering for write and sending data: $response")

Review Comment:
   Oh, you’re right. Thanks.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Modify SocketServer#sendResponse trace msg [kafka]
TaiJuWu closed pull request #15749: MINOR: Modify SocketServer#sendResponse trace msg URL: https://github.com/apache/kafka/pull/15749 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16577) New consumer fails with stop within allotted timeout in consumer_test.py system test
[ https://issues.apache.org/jira/browse/KAFKA-16577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16577: -- Description: The {{consumer_test.py}} system test intermittently fails with the following error: {code} test_id: kafkatest.tests.client.consumer_test.OffsetValidationTest.test_consumer_bounce.clean_shutdown=True.bounce_mode=rolling.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer status: FAIL run time: 2 minutes 35.925 seconds AssertionError('Node ubuntu@worker5: did not stop within the specified timeout of 30 seconds') Traceback (most recent call last): File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", line 184, in _do_run data = self.run_test() File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", line 262, in run_test return self.test_context.function(self.test) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py", line 433, in wrapper return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_test.py", line 176, in test_consumer_bounce self.rolling_bounce_consumers(consumer, clean_shutdown=clean_shutdown) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_test.py", line 39, in rolling_bounce_consumers consumer.stop_node(node, clean_shutdown) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py", line 419, in stop_node (str(node.account), str(self.stop_timeout_sec)) AssertionError: Node ubuntu@worker5: did not stop within the specified timeout of 30 seconds {code} Affected tests: * {{test_broker_failure}} * {{test_consumer_bounce}} was: The {{consumer_test.py}} system test intermittently fails with the following error: {code} test_id: kafkatest.tests.client.consumer_test.OffsetValidationTest.test_consumer_bounce.clean_shutdown=True.bounce_mode=rolling.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer status: FAIL run time: 2 minutes 35.925 seconds AssertionError('Node ubuntu@worker5: did not stop within the specified timeout of 30 seconds') Traceback (most recent call last): File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", line 184, in _do_run data = self.run_test() File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", line 262, in run_test return self.test_context.function(self.test) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py", line 433, in wrapper return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_test.py", line 176, in test_consumer_bounce self.rolling_bounce_consumers(consumer, clean_shutdown=clean_shutdown) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_test.py", line 39, in rolling_bounce_consumers consumer.stop_node(node, clean_shutdown) File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py", line 419, in stop_node (str(node.account), str(self.stop_timeout_sec)) AssertionError: Node ubuntu@worker5: did not stop within the specified timeout of 30 seconds {code} Affected tests: * {{test_consumer_bounce}} > New consumer fails with stop within allotted timeout in consumer_test.py > system test > > > Key: KAFKA-16577 > URL: https://issues.apache.org/jira/browse/KAFKA-16577 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: flaky-test, kip-848-client-support, system-tests > Fix For: 3.8.0 > > > The {{consumer_test.py}} system test intermittently fails with the following > error: > {code} > test_id: > kafkatest.tests.client.consumer_test.OffsetValidationTest.test_consumer_bounce.clean_shutdown=True.bounce_mode=rolling.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer > status: FAIL >
[jira] [Updated] (KAFKA-16577) New consumer fails with stop within allotted timeout in consumer_test.py system test
[ https://issues.apache.org/jira/browse/KAFKA-16577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16577: -- Summary: New consumer fails with stop within allotted timeout in consumer_test.py system test (was: New consumer fails with stop within allotted timeout in consumer_test.py’s test_consumer_bounce system test) > New consumer fails with stop within allotted timeout in consumer_test.py > system test > > > Key: KAFKA-16577 > URL: https://issues.apache.org/jira/browse/KAFKA-16577 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: flaky-test, kip-848-client-support, system-tests > Fix For: 3.8.0 > > > The {{consumer_test.py}} system test intermittently fails with the following > error: > {code} > test_id: > kafkatest.tests.client.consumer_test.OffsetValidationTest.test_consumer_bounce.clean_shutdown=True.bounce_mode=rolling.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer > status: FAIL > run time: 2 minutes 35.925 seconds > AssertionError('Node ubuntu@worker5: did not stop within the specified > timeout of 30 seconds') > Traceback (most recent call last): > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 184, in _do_run > data = self.run_test() > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 262, in run_test > return self.test_context.function(self.test) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py", > line 433, in wrapper > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_test.py", > line 176, in test_consumer_bounce > self.rolling_bounce_consumers(consumer, clean_shutdown=clean_shutdown) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_test.py", > line 39, in rolling_bounce_consumers > consumer.stop_node(node, clean_shutdown) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py", > line 419, in stop_node > (str(node.account), str(self.stop_timeout_sec)) > AssertionError: Node ubuntu@worker5: did not stop within the specified > timeout of 30 seconds > {code} > Affected tests: > * {{test_consumer_bounce}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on PR #15685: URL: https://github.com/apache/kafka/pull/15685#issuecomment-2062753494 I noticed I need to do Quorum and Broker features, which are basically the same implementation. Stay tuned. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16581) Implement KafkaRaftClient unittest for membership change
José Armando García Sancio created KAFKA-16581: -- Summary: Implement KafkaRaftClient unittest for membership change Key: KAFKA-16581 URL: https://issues.apache.org/jira/browse/KAFKA-16581 Project: Kafka Issue Type: Sub-task Reporter: José Armando García Sancio Fix For: 3.8.0 Add a new test suite like KafkaRaftClientTest and KafkaRaftClientSnapshotTest that validates the implementation. In addition, KafkaRaftClientTest and KafkaRaftClientSnapshotTest should work with all versions of the RPCs. Test upgrades from kraft.version 0 to 1. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16580) Write simulation tests for kraft membership change
[ https://issues.apache.org/jira/browse/KAFKA-16580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] José Armando García Sancio updated KAFKA-16580: --- Description: Update RaftEventSimulationTest to validate kraft.version 1 and changes made for membership change. > Write simulation tests for kraft membership change > -- > > Key: KAFKA-16580 > URL: https://issues.apache.org/jira/browse/KAFKA-16580 > Project: Kafka > Issue Type: Sub-task >Reporter: José Armando García Sancio >Priority: Major > Fix For: 3.8.0 > > > Update RaftEventSimulationTest to validate kraft.version 1 and changes made > for membership change. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16580) Write simulation tests for kraft membership change
José Armando García Sancio created KAFKA-16580: -- Summary: Write simulation tests for kraft membership change Key: KAFKA-16580 URL: https://issues.apache.org/jira/browse/KAFKA-16580 Project: Kafka Issue Type: Sub-task Reporter: José Armando García Sancio Fix For: 3.8.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14094) KIP-853: KRaft controller membership changes
[ https://issues.apache.org/jira/browse/KAFKA-14094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] José Armando García Sancio updated KAFKA-14094: --- Fix Version/s: 3.8.0 > KIP-853: KRaft controller membership changes > > > Key: KAFKA-14094 > URL: https://issues.apache.org/jira/browse/KAFKA-14094 > Project: Kafka > Issue Type: Improvement > Components: kraft >Reporter: José Armando García Sancio >Assignee: Jose Armando Garcia Sancio >Priority: Major > Labels: 4.0-blocker > Fix For: 3.8.0 > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-853%3A+KRaft+Controller+Membership+Changes -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14094) KIP-853: KRaft controller membership changes
[ https://issues.apache.org/jira/browse/KAFKA-14094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] José Armando García Sancio updated KAFKA-14094: --- Labels: (was: 4.0-blocker) > KIP-853: KRaft controller membership changes > > > Key: KAFKA-14094 > URL: https://issues.apache.org/jira/browse/KAFKA-14094 > Project: Kafka > Issue Type: Improvement > Components: kraft >Reporter: José Armando García Sancio >Assignee: Jose Armando Garcia Sancio >Priority: Major > Fix For: 3.8.0 > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-853%3A+KRaft+Controller+Membership+Changes -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16557) Fix OffsetFetchRequestState.toString()
[ https://issues.apache.org/jira/browse/KAFKA-16557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838402#comment-17838402 ] Fu Qiao commented on KAFKA-16557: - Created a PR for this if you don't mind: [https://github.com/apache/kafka/pull/15750] Thanks > Fix OffsetFetchRequestState.toString() > -- > > Key: KAFKA-16557 > URL: https://issues.apache.org/jira/browse/KAFKA-16557 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, logging > Fix For: 3.8.0 > > > The code incorrectly overrides the {{toString()}} method instead of > overriding {{{}toStringBase(){}}}. This affects debugging and troubleshooting > consumer issues. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] KAFKA-16557: Fix toString of OffsetFetchRequestState [kafka]
phooq opened a new pull request, #15750: URL: https://github.com/apache/kafka/pull/15750

### Description
This PR is for [KAFKA-16557](https://issues.apache.org/jira/browse/KAFKA-16557). It enhances debugging and troubleshooting for consumer-related issues.

### Test
`./gradlew jar`
```
BUILD SUCCESSFUL in 3m 16s
```

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
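The underlying pattern is general Java: when a base class composes its toString() from a protected toStringBase() hook, subclasses must extend the hook rather than replace toString(). A minimal self-contained illustration with invented names; these are not the actual Kafka classes:

```java
abstract class RequestState {
    private final long deadlineMs = 1000L;

    // Subclasses extend this hook to add their own fields.
    protected String toStringBase() {
        return "deadlineMs=" + deadlineMs;
    }

    @Override
    public final String toString() {
        return getClass().getSimpleName() + "{" + toStringBase() + "}";
    }
}

class OffsetFetchState extends RequestState {
    private final String group = "my-group";

    // Correct: override toStringBase() so the base fields are kept.
    // Overriding toString() instead (the bug KAFKA-16557 describes)
    // would drop deadlineMs from the output.
    @Override
    protected String toStringBase() {
        return super.toStringBase() + ", group=" + group;
    }
}

public class ToStringDemo {
    public static void main(String[] args) {
        // Prints: OffsetFetchState{deadlineMs=1000, group=my-group}
        System.out.println(new OffsetFetchState());
    }
}
```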
[jira] [Updated] (KAFKA-16579) Revert changes to consumer_rolling_upgrade_test.py for the new async Consumer
[ https://issues.apache.org/jira/browse/KAFKA-16579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16579: -- Description: To test the new, asynchronous Kafka {{Consumer}} implementation, we migrated a slew of system tests to run both the "old" and "new" implementations. KAFKA-16271 updated the system tests in {{consumer_rolling_upgrade_test.py}} so it could test the new consumer. However, the test is tailored specifically to the "old" Consumer's protocol and assignment strategy upgrade. Unsurprisingly, when we run those system tests with the new {{AsyncKafkaConsumer}}, we get errors like the following: {code} test_id: kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer status: FAIL run time: 29.634 seconds AssertionError("Mismatched assignment: {frozenset(), frozenset({TopicPartition(topic='test_topic', partition=0), TopicPartition(topic='test_topic', partition=1)})}") Traceback (most recent call last): File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", line 184, in _do_run data = self.run_test() File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", line 262, in run_test return self.test_context.function(self.test) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py", line 433, in wrapper return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py", line 77, in rolling_update_test self._verify_range_assignment(consumer) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py", line 41, in _verify_range_assignment "Mismatched assignment: %s" % assignment AssertionError: Mismatched assignment: {frozenset(), frozenset({TopicPartition(topic='test_topic', partition=0), TopicPartition(topic='test_topic', partition=1)})} {code} The task here is to revert the changes made in KAFKA-16271. was: To test the new, asynchronous Kafka {{Consumer}} implementation, we migrated a slew of system tests to run both the "old" and "new" implementations. KAFKA-16271 updated the system tests in {{consumer_rolling_upgrade_test.py}} so it could test the new consumer. However, the test is tailored specifically to the "old" Consumer's protocol and assignment strategy upgrade. Unsurprisingly, when we run those system tests with the new {{AsyncKafkaConsumer}}, we get errors like the following: {code:java} test_id: kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_exactly_once_source.clean=False.connect_protocol=eager.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer status: FAIL run time: 6 minutes 3.899 seconds InsufficientResourcesError('Not enough nodes available to allocate. linux nodes requested: 1. 
linux nodes available: 0') Traceback (most recent call last): File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", line 184, in _do_run data = self.run_test() File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", line 262, in run_test return self.test_context.function(self.test) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py", line 433, in wrapper return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/connect/connect_distributed_test.py", line 919, in test_exactly_once_source consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.source.topic, consumer_timeout_ms=1000, print_key=True) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/console_consumer.py", line 97, in __init__ BackgroundThreadService.__init__(self, context, num_nodes) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/background_thread.py", line 26, in __init__ super(BackgroundThreadService, self).__init__(context, num_nodes, cluster_spec, *args, **kwargs) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/service.py", line 107, in __init__ self.allocate_nodes() File
[jira] [Updated] (KAFKA-16579) Revert changes to consumer_rolling_upgrade_test.py for the new async Consumer
[ https://issues.apache.org/jira/browse/KAFKA-16579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16579: -- Description: To test the new, asynchronous Kafka {{Consumer}} implementation, we migrated a slew of system tests to run both the "old" and "new" implementations. KAFKA-16271 updated the system tests in {{consumer_rolling_upgrade_test.py}} so it could test the new consumer. However, the test is tailored specifically to the "old" Consumer's protocol and assignment strategy upgrade. Unsurprisingly, when we run those system tests with the new {{AsyncKafkaConsumer}}, we get errors like the following: {code:java} test_id: kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_exactly_once_source.clean=False.connect_protocol=eager.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer status: FAIL run time: 6 minutes 3.899 seconds InsufficientResourcesError('Not enough nodes available to allocate. linux nodes requested: 1. linux nodes available: 0') Traceback (most recent call last): File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", line 184, in _do_run data = self.run_test() File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", line 262, in run_test return self.test_context.function(self.test) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py", line 433, in wrapper return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/connect/connect_distributed_test.py", line 919, in test_exactly_once_source consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.source.topic, consumer_timeout_ms=1000, print_key=True) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/console_consumer.py", line 97, in __init__ BackgroundThreadService.__init__(self, context, num_nodes) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/background_thread.py", line 26, in __init__ super(BackgroundThreadService, self).__init__(context, num_nodes, cluster_spec, *args, **kwargs) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/service.py", line 107, in __init__ self.allocate_nodes() File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/service.py", line 217, in allocate_nodes self.nodes = self.cluster.alloc(self.cluster_spec) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/cluster/cluster.py", line 54, in alloc allocated = self.do_alloc(cluster_spec) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/cluster/finite_subcluster.py", line 31, in do_alloc allocated = self._available_nodes.remove_spec(cluster_spec) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/cluster/node_container.py", line 117, in remove_spec raise InsufficientResourcesError("Not enough nodes available to allocate. 
" + msg) ducktape.cluster.node_container.InsufficientResourcesError: Not enough nodes available to allocate. linux nodes requested: 1. linux nodes available: 0 {code} The task here is to revert the changes made in KAFKA-16272 [PR 15576|https://github.com/apache/kafka/pull/15576]. was: To test the new, asynchronous Kafka {{Consumer}} implementation, we migrated a slew of system tests to run both the "old" and "new" implementations. KAFKA-16272 updated the system tests in {{connect_distributed_test.py}} so it could test the new consumer with Connect. However, we are not supporting Connect with the new consumer in 3.8.0. Unsurprisingly, when we run the Connect system tests with the new {{{}AsyncKafkaConsumer{}}}, we get errors like the following: {code:java} test_id: kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_exactly_once_source.clean=False.connect_protocol=eager.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer status: FAIL run time: 6 minutes 3.899 seconds InsufficientResourcesError('Not enough nodes available to allocate. linux nodes requested: 1. linux nodes available: 0') Traceback (most recent call last): File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
[jira] [Updated] (KAFKA-16578) Revert changes to connect_distributed_test.py for the new async Consumer
[ https://issues.apache.org/jira/browse/KAFKA-16578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16578: -- Description: To test the new, asynchronous Kafka {{Consumer}} implementation, we migrated a slew of system tests to run both the "old" and "new" implementations. KAFKA-16272 updated the system tests in {{connect_distributed_test.py}} so it could test the new consumer with Connect. However, we are not supporting Connect with the new consumer in 3.8.0. Unsurprisingly, when we run the Connect system tests with the new {{AsyncKafkaConsumer}}, we get errors like the following: {code} test_id: kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_exactly_once_source.clean=False.connect_protocol=eager.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer status: FAIL run time: 6 minutes 3.899 seconds InsufficientResourcesError('Not enough nodes available to allocate. linux nodes requested: 1. linux nodes available: 0') Traceback (most recent call last): File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", line 184, in _do_run data = self.run_test() File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", line 262, in run_test return self.test_context.function(self.test) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py", line 433, in wrapper return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/connect/connect_distributed_test.py", line 919, in test_exactly_once_source consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.source.topic, consumer_timeout_ms=1000, print_key=True) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/console_consumer.py", line 97, in __init__ BackgroundThreadService.__init__(self, context, num_nodes) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/background_thread.py", line 26, in __init__ super(BackgroundThreadService, self).__init__(context, num_nodes, cluster_spec, *args, **kwargs) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/service.py", line 107, in __init__ self.allocate_nodes() File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/service.py", line 217, in allocate_nodes self.nodes = self.cluster.alloc(self.cluster_spec) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/cluster/cluster.py", line 54, in alloc allocated = self.do_alloc(cluster_spec) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/cluster/finite_subcluster.py", line 31, in do_alloc allocated = self._available_nodes.remove_spec(cluster_spec) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/cluster/node_container.py", line 117, in remove_spec raise InsufficientResourcesError("Not enough nodes available to allocate. 
" + msg) ducktape.cluster.node_container.InsufficientResourcesError: Not enough nodes available to allocate. linux nodes requested: 1. linux nodes available: 0 {code} The task here is to revert the changes made in KAFKA-16272 [PR 15576|https://github.com/apache/kafka/pull/15576]. was: To test the new, asynchronous Kafka {{Consumer}} implementation, we migrated a slew of system tests to run both the "old" and "new" implementations. KAFKA-16272 updated the system tests in {{connect_distributed_test.py}} so it could test the new consumer with Connect. However, we are not supporting Connect with the new consumer in 3.8.0. Unsurprisingly, when we run the Connect system tests with the new {{{}AsyncKafkaConsumer{}}}, we get errors like the following: {code:java} test_id: kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_exactly_once_source.clean=False.connect_protocol=eager.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer status: FAIL run time: 6 minutes 3.899 seconds InsufficientResourcesError('Not enough nodes available to allocate. linux nodes requested: 1. linux nodes available: 0') Traceback (most recent call last): File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", line 184, in _do_run
[jira] [Created] (KAFKA-16579) Revert changes to consumer_rolling_upgrade_test.py for the new async Consumer
Kirk True created KAFKA-16579: - Summary: Revert changes to consumer_rolling_upgrade_test.py for the new async Consumer Key: KAFKA-16579 URL: https://issues.apache.org/jira/browse/KAFKA-16579 Project: Kafka Issue Type: Task Components: clients, consumer, system tests Affects Versions: 3.8.0 Reporter: Kirk True Assignee: Kirk True Fix For: 3.8.0 To test the new, asynchronous Kafka {{Consumer}} implementation, we migrated a slew of system tests to run both the "old" and "new" implementations. KAFKA-16272 updated the system tests in {{connect_distributed_test.py}} so it could test the new consumer with Connect. However, we are not supporting Connect with the new consumer in 3.8.0. Unsurprisingly, when we run the Connect system tests with the new {{{}AsyncKafkaConsumer{}}}, we get errors like the following: {code:java} test_id: kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_exactly_once_source.clean=False.connect_protocol=eager.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer status: FAIL run time: 6 minutes 3.899 seconds InsufficientResourcesError('Not enough nodes available to allocate. linux nodes requested: 1. linux nodes available: 0') Traceback (most recent call last): File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", line 184, in _do_run data = self.run_test() File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", line 262, in run_test return self.test_context.function(self.test) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py", line 433, in wrapper return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/connect/connect_distributed_test.py", line 919, in test_exactly_once_source consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.source.topic, consumer_timeout_ms=1000, print_key=True) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/console_consumer.py", line 97, in __init__ BackgroundThreadService.__init__(self, context, num_nodes) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/background_thread.py", line 26, in __init__ super(BackgroundThreadService, self).__init__(context, num_nodes, cluster_spec, *args, **kwargs) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/service.py", line 107, in __init__ self.allocate_nodes() File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/service.py", line 217, in allocate_nodes self.nodes = self.cluster.alloc(self.cluster_spec) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/cluster/cluster.py", line 54, in alloc allocated = self.do_alloc(cluster_spec) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/cluster/finite_subcluster.py", line 31, in do_alloc allocated = self._available_nodes.remove_spec(cluster_spec) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/cluster/node_container.py", line 117, 
in remove_spec raise InsufficientResourcesError("Not enough nodes available to allocate. " + msg) ducktape.cluster.node_container.InsufficientResourcesError: Not enough nodes available to allocate. linux nodes requested: 1. linux nodes available: 0 {code} The task here is to revert the changes made in KAFKA-16272 [PR 15576|https://github.com/apache/kafka/pull/15576]. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16579) Revert changes to consumer_rolling_upgrade_test.py for the new async Consumer
[ https://issues.apache.org/jira/browse/KAFKA-16579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-16579: - Assignee: Philip Nee (was: Kirk True) > Revert changes to consumer_rolling_upgrade_test.py for the new async Consumer > - > > Key: KAFKA-16579 > URL: https://issues.apache.org/jira/browse/KAFKA-16579 > Project: Kafka > Issue Type: Task > Components: clients, consumer, system tests >Affects Versions: 3.8.0 >Reporter: Kirk True >Assignee: Philip Nee >Priority: Critical > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > > To test the new, asynchronous Kafka {{Consumer}} implementation, we migrated > a slew of system tests to run both the "old" and "new" implementations. > KAFKA-16272 updated the system tests in {{connect_distributed_test.py}} so it > could test the new consumer with Connect. However, we are not supporting > Connect with the new consumer in 3.8.0. Unsurprisingly, when we run the > Connect system tests with the new {{{}AsyncKafkaConsumer{}}}, we get errors > like the following: > {code:java} > test_id: > kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_exactly_once_source.clean=False.connect_protocol=eager.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer > status: FAIL > run time: 6 minutes 3.899 seconds > InsufficientResourcesError('Not enough nodes available to allocate. linux > nodes requested: 1. linux nodes available: 0') > Traceback (most recent call last): > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 184, in _do_run > data = self.run_test() > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 262, in run_test > return self.test_context.function(self.test) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py", > line 433, in wrapper > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/connect/connect_distributed_test.py", > line 919, in test_exactly_once_source > consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, > self.source.topic, consumer_timeout_ms=1000, print_key=True) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/console_consumer.py", > line 97, in __init__ > BackgroundThreadService.__init__(self, context, num_nodes) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/background_thread.py", > line 26, in __init__ > super(BackgroundThreadService, self).__init__(context, num_nodes, > cluster_spec, *args, **kwargs) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/service.py", > line 107, in __init__ > self.allocate_nodes() > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/service.py", > line 217, in allocate_nodes > self.nodes = self.cluster.alloc(self.cluster_spec) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/cluster/cluster.py", > line 54, in alloc > allocated = self.do_alloc(cluster_spec) > File > 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/cluster/finite_subcluster.py", > line 31, in do_alloc > allocated = self._available_nodes.remove_spec(cluster_spec) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/cluster/node_container.py", > line 117, in remove_spec > raise InsufficientResourcesError("Not enough nodes available to allocate. > " + msg) > ducktape.cluster.node_container.InsufficientResourcesError: Not enough nodes > available to allocate. linux nodes requested: 1. linux nodes available: 0 > {code} > The task here is to revert the changes made in KAFKA-16272 [PR > 15576|https://github.com/apache/kafka/pull/15576]. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16578) Revert changes to connect_distributed_test.py for the new async Consumer
Kirk True created KAFKA-16578: - Summary: Revert changes to connect_distributed_test.py for the new async Consumer Key: KAFKA-16578 URL: https://issues.apache.org/jira/browse/KAFKA-16578 Project: Kafka Issue Type: Task Components: clients, consumer, system tests Affects Versions: 3.8.0 Reporter: Kirk True Assignee: Kirk True Fix For: 3.8.0 To test the new, asynchronous Kafka {{Consumer}} implementation, we migrated a slew of system tests to run both the "old" and "new" implementations. KAFKA-16272 updated the system tests in {{connect_distributed_test.py}} so it could test the new consumer with Connect. However, we are not supporting Connect with the new consumer in 3.8.0. Unsurprisingly, when we run the Connect system tests with the new {{{}AsyncKafkaConsumer{}}}, we get errors like the following: {code:java} test_id: kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_exactly_once_source.clean=False.connect_protocol=eager.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer status: FAIL run time: 6 minutes 3.899 seconds InsufficientResourcesError('Not enough nodes available to allocate. linux nodes requested: 1. linux nodes available: 0') Traceback (most recent call last): File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", line 184, in _do_run data = self.run_test() File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", line 262, in run_test return self.test_context.function(self.test) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py", line 433, in wrapper return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/connect/connect_distributed_test.py", line 919, in test_exactly_once_source consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.source.topic, consumer_timeout_ms=1000, print_key=True) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/console_consumer.py", line 97, in __init__ BackgroundThreadService.__init__(self, context, num_nodes) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/background_thread.py", line 26, in __init__ super(BackgroundThreadService, self).__init__(context, num_nodes, cluster_spec, *args, **kwargs) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/service.py", line 107, in __init__ self.allocate_nodes() File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/service.py", line 217, in allocate_nodes self.nodes = self.cluster.alloc(self.cluster_spec) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/cluster/cluster.py", line 54, in alloc allocated = self.do_alloc(cluster_spec) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/cluster/finite_subcluster.py", line 31, in do_alloc allocated = self._available_nodes.remove_spec(cluster_spec) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/cluster/node_container.py", line 117, in 
remove_spec raise InsufficientResourcesError("Not enough nodes available to allocate. " + msg) ducktape.cluster.node_container.InsufficientResourcesError: Not enough nodes available to allocate. linux nodes requested: 1. linux nodes available: 0 {code} The task here is to revert the changes made in KAFKA-16272 [PR 15576|https://github.com/apache/kafka/pull/15576]. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16576) New consumer fails with assert in consumer_test.py’s test_consumer_failure system test
[ https://issues.apache.org/jira/browse/KAFKA-16576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16576: -- Labels: flaky-test kip-848-client-support system-tests (was: kip-848-client-support system-tests) > New consumer fails with assert in consumer_test.py’s test_consumer_failure > system test > -- > > Key: KAFKA-16576 > URL: https://issues.apache.org/jira/browse/KAFKA-16576 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: flaky-test, kip-848-client-support, system-tests > Fix For: 3.8.0 > > > The {{consumer_test.py}} system test intermittently fails with the following > error: > {code} > test_id: > kafkatest.tests.client.consumer_test.OffsetValidationTest.test_consumer_failure.clean_shutdown=True.enable_autocommit=True.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer > status: FAIL > run time: 42.582 seconds > AssertionError() > Traceback (most recent call last): > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 184, in _do_run > data = self.run_test() > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 262, in run_test > return self.test_context.function(self.test) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py", > line 433, in wrapper > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_test.py", > line 399, in test_consumer_failure > assert partition_owner is not None > AssertionError > Notify > {code} > Affected tests: > * {{test_consumer_failure}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16577) New consumer fails with stop within allotted timeout in consumer_test.py’s test_consumer_bounce system test
[ https://issues.apache.org/jira/browse/KAFKA-16577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16577: -- Labels: flaky-test kip-848-client-support system-tests (was: kip-848-client-support system-tests) > New consumer fails with stop within allotted timeout in consumer_test.py’s > test_consumer_bounce system test > --- > > Key: KAFKA-16577 > URL: https://issues.apache.org/jira/browse/KAFKA-16577 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: flaky-test, kip-848-client-support, system-tests > Fix For: 3.8.0 > > > The {{consumer_test.py}} system test intermittently fails with the following > error: > {code} > test_id: > kafkatest.tests.client.consumer_test.OffsetValidationTest.test_consumer_bounce.clean_shutdown=True.bounce_mode=rolling.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer > status: FAIL > run time: 2 minutes 35.925 seconds > AssertionError('Node ubuntu@worker5: did not stop within the specified > timeout of 30 seconds') > Traceback (most recent call last): > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 184, in _do_run > data = self.run_test() > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 262, in run_test > return self.test_context.function(self.test) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py", > line 433, in wrapper > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_test.py", > line 176, in test_consumer_bounce > self.rolling_bounce_consumers(consumer, clean_shutdown=clean_shutdown) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_test.py", > line 39, in rolling_bounce_consumers > consumer.stop_node(node, clean_shutdown) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py", > line 419, in stop_node > (str(node.account), str(self.stop_timeout_sec)) > AssertionError: Node ubuntu@worker5: did not stop within the specified > timeout of 30 seconds > {code} > Affected tests: > * {{test_consumer_bounce}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16577) New consumer fails with stop within allotted timeout in consumer_test.py’s test_consumer_bounce system test
[ https://issues.apache.org/jira/browse/KAFKA-16577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16577: -- Description: The {{consumer_test.py}} system test intermittently fails with the following error: {code} test_id: kafkatest.tests.client.consumer_test.OffsetValidationTest.test_consumer_bounce.clean_shutdown=True.bounce_mode=rolling.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer status: FAIL run time: 2 minutes 35.925 seconds AssertionError('Node ubuntu@worker5: did not stop within the specified timeout of 30 seconds') Traceback (most recent call last): File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", line 184, in _do_run data = self.run_test() File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", line 262, in run_test return self.test_context.function(self.test) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py", line 433, in wrapper return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_test.py", line 176, in test_consumer_bounce self.rolling_bounce_consumers(consumer, clean_shutdown=clean_shutdown) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_test.py", line 39, in rolling_bounce_consumers consumer.stop_node(node, clean_shutdown) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py", line 419, in stop_node (str(node.account), str(self.stop_timeout_sec)) AssertionError: Node ubuntu@worker5: did not stop within the specified timeout of 30 seconds {code} Affected tests: * {{test_consumer_bounce}} was: The {{consumer_test.py}} system test intermittently fails with the following error: {code} test_id: kafkatest.tests.client.consumer_test.OffsetValidationTest.test_consumer_failure.clean_shutdown=True.enable_autocommit=True.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer status: FAIL run time: 42.582 seconds AssertionError() Traceback (most recent call last): File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", line 184, in _do_run data = self.run_test() File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", line 262, in run_test return self.test_context.function(self.test) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py", line 433, in wrapper return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_test.py", line 399, in test_consumer_failure assert partition_owner is not None AssertionError Notify {code} Affected tests: * {{test_consumer_failure}} > New consumer fails with stop within allotted timeout in consumer_test.py’s > test_consumer_bounce system test > --- > > Key: KAFKA-16577 > URL: https://issues.apache.org/jira/browse/KAFKA-16577 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Affects Versions: 3.7.0 
>Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > > The {{consumer_test.py}} system test intermittently fails with the following > error: > {code} > test_id: > kafkatest.tests.client.consumer_test.OffsetValidationTest.test_consumer_bounce.clean_shutdown=True.bounce_mode=rolling.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer > status: FAIL > run time: 2 minutes 35.925 seconds > AssertionError('Node ubuntu@worker5: did not stop within the specified > timeout of 30 seconds') > Traceback (most recent call last): > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 184, in _do_run > data = self.run_test() > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 262, in run_test > return
[jira] [Updated] (KAFKA-16577) New consumer fails with stop within allotted timeout in consumer_test.py’s test_consumer_bounce system test
[ https://issues.apache.org/jira/browse/KAFKA-16577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16577: -- Summary: New consumer fails with stop within allotted timeout in consumer_test.py’s test_consumer_bounce system test (was: New consumer fails with stop timeout in consumer_test.py’s test_consumer_bounce system test) > New consumer fails with stop within allotted timeout in consumer_test.py’s > test_consumer_bounce system test > --- > > Key: KAFKA-16577 > URL: https://issues.apache.org/jira/browse/KAFKA-16577 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > > The {{consumer_test.py}} system test intermittently fails with the following > error: > {code} > test_id: > kafkatest.tests.client.consumer_test.OffsetValidationTest.test_consumer_failure.clean_shutdown=True.enable_autocommit=True.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer > status: FAIL > run time: 42.582 seconds > AssertionError() > Traceback (most recent call last): > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 184, in _do_run > data = self.run_test() > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 262, in run_test > return self.test_context.function(self.test) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py", > line 433, in wrapper > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_test.py", > line 399, in test_consumer_failure > assert partition_owner is not None > AssertionError > Notify > {code} > Affected tests: > * {{test_consumer_failure}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16577) New consumer fails with stop timeout in consumer_test.py’s test_consumer_bounce system test
Kirk True created KAFKA-16577: - Summary: New consumer fails with stop timeout in consumer_test.py’s test_consumer_bounce system test Key: KAFKA-16577 URL: https://issues.apache.org/jira/browse/KAFKA-16577 Project: Kafka Issue Type: Bug Components: clients, consumer, system tests Affects Versions: 3.7.0 Reporter: Kirk True Assignee: Kirk True Fix For: 3.8.0 The {{consumer_test.py}} system test intermittently fails with the following error: {code} test_id: kafkatest.tests.client.consumer_test.OffsetValidationTest.test_consumer_failure.clean_shutdown=True.enable_autocommit=True.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer status: FAIL run time: 42.582 seconds AssertionError() Traceback (most recent call last): File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", line 184, in _do_run data = self.run_test() File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", line 262, in run_test return self.test_context.function(self.test) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py", line 433, in wrapper return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_test.py", line 399, in test_consumer_failure assert partition_owner is not None AssertionError Notify {code} Affected tests: * {{test_consumer_failure}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16576) New consumer fails with assert in consumer_test.py’s test_consumer_failure system test
[ https://issues.apache.org/jira/browse/KAFKA-16576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16576: -- Description: The {{consumer_test.py}} system test intermittently fails with the following error: {code} test_id: kafkatest.tests.client.consumer_test.OffsetValidationTest.test_consumer_failure.clean_shutdown=True.enable_autocommit=True.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer status: FAIL run time: 42.582 seconds AssertionError() Traceback (most recent call last): File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", line 184, in _do_run data = self.run_test() File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", line 262, in run_test return self.test_context.function(self.test) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py", line 433, in wrapper return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_test.py", line 399, in test_consumer_failure assert partition_owner is not None AssertionError Notify {code} Affected tests: * {{test_consumer_failure}} was: The {{consumer_test.py}} system test fails with the following errors: {quote} * Timed out waiting for consumption {quote} Affected tests: * {{test_broker_failure}} * {{test_consumer_bounce}} * {{test_static_consumer_bounce}} > New consumer fails with assert in consumer_test.py’s test_consumer_failure > system test > -- > > Key: KAFKA-16576 > URL: https://issues.apache.org/jira/browse/KAFKA-16576 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > > The {{consumer_test.py}} system test intermittently fails with the following > error: > {code} > test_id: > kafkatest.tests.client.consumer_test.OffsetValidationTest.test_consumer_failure.clean_shutdown=True.enable_autocommit=True.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer > status: FAIL > run time: 42.582 seconds > AssertionError() > Traceback (most recent call last): > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 184, in _do_run > data = self.run_test() > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 262, in run_test > return self.test_context.function(self.test) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py", > line 433, in wrapper > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_test.py", > line 399, in test_consumer_failure > assert partition_owner is not None > AssertionError > Notify > {code} > Affected tests: > * {{test_consumer_failure}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16576) New consumer fails with assert in consumer_test.py’s test_consumer_failure system test
Kirk True created KAFKA-16576: - Summary: New consumer fails with assert in consumer_test.py’s test_consumer_failure system test Key: KAFKA-16576 URL: https://issues.apache.org/jira/browse/KAFKA-16576 Project: Kafka Issue Type: Bug Components: clients, consumer, system tests Affects Versions: 3.7.0 Reporter: Kirk True Assignee: Kirk True Fix For: 3.8.0 The {{consumer_test.py}} system test fails with the following errors: {quote} * Timed out waiting for consumption {quote} Affected tests: * {{test_broker_failure}} * {{test_consumer_bounce}} * {{test_static_consumer_bounce}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16557) Fix OffsetFetchRequestState.toString()
[ https://issues.apache.org/jira/browse/KAFKA-16557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838384#comment-17838384 ] Fu Qiao commented on KAFKA-16557: - Hey [~kirktrue] : I am new to Kafka, and would like to start with some tasks. Would you mind if I pick this up? Thanks > Fix OffsetFetchRequestState.toString() > -- > > Key: KAFKA-16557 > URL: https://issues.apache.org/jira/browse/KAFKA-16557 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, logging > Fix For: 3.8.0 > > > The code incorrectly overrides the {{toString()}} method instead of > overriding {{{}toStringBase(){}}}. This affects debugging and troubleshooting > consumer issues. -- This message was sent by Atlassian Jira (v8.20.10#820010)
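[Editor's note] As background, a minimal sketch of the {{toString()}}/{{toStringBase()}} split the ticket refers to (hypothetical class and field names, not the actual consumer internals):
{code:java}
// Hypothetical sketch: shared state is rendered by toStringBase(), and the
// base class composes the final toString() from it.
abstract class RequestStateBase {
    protected String toStringBase() {
        return "attempts=3, backoffMs=100"; // shared fields, rendered once
    }

    @Override
    public String toString() {
        return getClass().getSimpleName() + "{" + toStringBase() + "}";
    }
}

class OffsetFetchState extends RequestStateBase {
    // Correct: extend toStringBase() so the base fields stay in log output.
    @Override
    protected String toStringBase() {
        return super.toStringBase() + ", partitions=[...]";
    }
    // Overriding toString() here instead would drop the shared fields,
    // which is the kind of bug the ticket describes.
}
{code}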
Re: [PR] MINOR: Modify SocketServer#sendResponse trace msg [kafka]
splett2 commented on code in PR #15749: URL: https://github.com/apache/kafka/pull/15749#discussion_r1569578496 ## core/src/main/scala/kafka/network/SocketServer.scala: ## @@ -1082,7 +1082,7 @@ private[kafka] class Processor( // `protected` for test usage protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send): Unit = { val connectionId = response.request.context.connectionId -trace(s"Socket server received response to send to $connectionId, registering for write and sending data: $response") +trace(s"Socket server received response from $connectionId, registering for write and sending data: $response") Review Comment: why is the suggested comment more correct? we are sending the response _to_ the connection, we are not receiving a response from the connection. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]
junrao commented on code in PR #15634: URL: https://github.com/apache/kafka/pull/15634#discussion_r1569562746 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long, /** * Update high watermark with offset metadata. The new high watermark will be lower - * bounded by the log start offset and upper bounded by the log end offset. + * bounded by the local-log-start-offset and upper bounded by the log-end-offset. * * @param highWatermarkMetadata the suggested high watermark with offset metadata * @return the updated high watermark offset */ def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = { val endOffsetMetadata = localLog.logEndOffsetMetadata -val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) { - new LogOffsetMetadata(logStartOffset) +val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) { Review Comment: @kamalcph : Thanks for the explanation. I understand the problem now. As for the fix, it seems that it could work for HWM. However, I am not sure that we could always do the same thing for the LastStableOffset. For example, if we lose the local data in all replicas, the lastStableOffset could still be in the middle of a tiered segment, and moving it to localLogStartOffset immediately would be incorrect. Here is another potential approach. Note that OffsetMetadata (segmentBaseOffset and relativePositionInSegment) is only used in DelayedFetch for estimating the amount of available bytes. If occasionally OffsetMetadata is not available, we don't have to force an exception in convertToOffsetMetadataOrThrow(). Instead, we can leave the OffsetMetadata as empty and just use a conservative 1 byte for estimating the amount of available bytes. This approach will apply to both HWM and LSO. The inaccurate byte estimate will be ok as long as it's infrequent. What do you think? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
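[Editor's note] To make the suggestion concrete, a rough sketch of the fallback byte estimation (hypothetical types and names; the real DelayedFetch code in Kafka core differs in detail):
```java
import java.util.OptionalLong;

// Sketch only: when the high watermark's position metadata is unavailable
// (e.g. the offset falls inside a tiered segment), estimate a conservative
// 1 byte instead of throwing, as proposed in the comment above.
class AvailableBytesEstimator {
    long estimateAvailableBytes(OptionalLong hwmPositionInLog, long fetchPositionInLog) {
        if (hwmPositionInLog.isEmpty()) {
            // Metadata missing: assume 1 byte is available. The delayed fetch
            // may complete slightly early or late, which is acceptable as long
            // as the metadata is only rarely unavailable.
            return 1L;
        }
        return Math.max(0L, hwmPositionInLog.getAsLong() - fetchPositionInLog);
    }
}
```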
Re: [PR] KAFKA-16555: Consumer's RequestState has incorrect logic to determine if inflight [kafka]
philipnee commented on code in PR #15723: URL: https://github.com/apache/kafka/pull/15723#discussion_r1569554430 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java: ## @@ -98,15 +98,15 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { return EMPTY; if (coordinatorRequestState.canSendRequest(currentTimeMs)) { -NetworkClientDelegate.UnsentRequest request = makeFindCoordinatorRequest(currentTimeMs); +NetworkClientDelegate.UnsentRequest request = makeFindCoordinatorRequest(); return new NetworkClientDelegate.PollResult(request); } return new NetworkClientDelegate.PollResult(coordinatorRequestState.remainingBackoffMs(currentTimeMs)); } -NetworkClientDelegate.UnsentRequest makeFindCoordinatorRequest(final long currentTimeMs) { -coordinatorRequestState.onSendAttempt(currentTimeMs); +NetworkClientDelegate.UnsentRequest makeFindCoordinatorRequest() { +coordinatorRequestState.onSendAttempt(); Review Comment: Thanks. This will fix some of the no-backoff issues. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
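[Editor's note] For readers following the thread, a simplified sketch of the in-flight/backoff bookkeeping under discussion (hypothetical, not the actual `RequestState` class in the clients module):
```java
// Hypothetical sketch: a new request may be sent only when no previous
// attempt is still in flight and the retry backoff has elapsed.
class SimpleRequestState {
    private final long retryBackoffMs;
    private boolean inflight = false;
    private long lastAttemptMs = -1L;

    SimpleRequestState(long retryBackoffMs) {
        this.retryBackoffMs = retryBackoffMs;
    }

    boolean canSendRequest(long nowMs) {
        return !inflight && (lastAttemptMs < 0 || nowMs - lastAttemptMs >= retryBackoffMs);
    }

    // Marking a request in flight needs no timestamp, which is one plausible
    // reading of why the currentTimeMs parameter is dropped in the diff above.
    void onSendAttempt() {
        inflight = true;
    }

    // A completed attempt (success or failure) clears the flag and starts
    // the backoff clock.
    void onResponse(long nowMs) {
        inflight = false;
        lastAttemptMs = nowMs;
    }
}
```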
[PR] MINOR: Modify SocketServer#sendResponse trace msg [kafka]
TaiJuWu opened a new pull request, #15749: URL: https://github.com/apache/kafka/pull/15749 As title. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16528: Client HB timing fix [kafka]
lianetm commented on PR #15698: URL: https://github.com/apache/kafka/pull/15698#issuecomment-2062253184 Thanks for the comments @kirktrue, all addressed. Regarding your comment regarding tests [here](https://github.com/apache/kafka/pull/15698#pullrequestreview-2004676007), we have that covered with the existing `testHeartbeatResponseOnErrorHandling`. That one is validating that we get the right `nextHeartbeatMs` time (which considers the timer), for each specific error type. Is that what you were looking for? Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16528: Client HB timing fix [kafka]
lianetm commented on code in PR #15698: URL: https://github.com/apache/kafka/pull/15698#discussion_r1569511540 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -231,6 +231,35 @@ public void testTimerNotDue() { assertEquals(Long.MAX_VALUE, result.timeUntilNextPollMs); } +@Test +public void testHeartbeatNotSentIfAnotherOneInFlight() { +mockStableMember(); +time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); + +// Heartbeat sent (no response received) +NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); +assertEquals(1, result.unsentRequests.size()); +NetworkClientDelegate.UnsentRequest inflightReq = result.unsentRequests.get(0); + +result = heartbeatRequestManager.poll(time.milliseconds()); +assertEquals(0, result.unsentRequests.size(), "No heartbeat should be sent while a " + +"previous on in-flight"); Review Comment: Fixed (not nit-picky at all, ugly typo, good catch!) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16571: reassign_partitions_test.bounce_brokers should wait for messages to be sent to every partition [kafka]
jolshan commented on PR #15739: URL: https://github.com/apache/kafka/pull/15739#issuecomment-2062215298 I ran the test 3 times and it seems to be working ok. I can run a few more times if we want to confirm the flakiness is gone. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16528: Client HB timing fix [kafka]
lianetm commented on code in PR #15698: URL: https://github.com/apache/kafka/pull/15698#discussion_r1569504137 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -482,6 +482,15 @@ public long nextHeartbeatMs(final long currentTimeMs) { return heartbeatTimer.remainingMs(); } +public void onFailedAttempt(final long currentTimeMs) { +// Reset timer to allow sending HB after a failure without waiting for the interval. +// After a failure, a next HB may be needed with backoff (ex. errors that lead to +// retries, like coordinator load error), or immediately (ex. errors that lead to +// rejoining, like fencing errors). +heartbeatTimer.reset(0); +super.onFailedAttempt(currentTimeMs); Review Comment: no, it doesn't. The timer is only there to indicate that an interval should be respected. In cases of failure, we don't want to follow the interval (so we reset the timer to 0). Each error will either:
- send a next HB based on other conditions (e.g. as soon as the coordinator is discovered, or when releasing the assignment finishes after getting fenced), or
- not send a next HB at all (fatal errors)
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
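[Editor's note] A tiny sketch of the reset-on-failure behavior described here (illustrative only; time advancement elided):
```java
// Illustrative only: zeroing the interval timer after a failure means the
// next heartbeat is gated by backoff or rejoin logic, not the full interval.
class HeartbeatIntervalTimer {
    private final long intervalMs;
    private long remainingMs;

    HeartbeatIntervalTimer(long intervalMs) {
        this.intervalMs = intervalMs;
        this.remainingMs = intervalMs;
    }

    void onHeartbeatSent() { remainingMs = intervalMs; } // normal cadence
    void onFailedAttempt() { remainingMs = 0; }          // skip the interval wait
    boolean expired()      { return remainingMs <= 0; }  // may a heartbeat be sent?
}
```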
Re: [PR] KAFKA-16528: Client HB timing fix [kafka]
lianetm commented on code in PR #15698: URL: https://github.com/apache/kafka/pull/15698#discussion_r1569498394 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -380,7 +380,7 @@ private void onErrorResponse(final ConsumerGroupHeartbeatResponse response, break; case UNRELEASED_INSTANCE_ID: -logger.error("GroupHeartbeatRequest failed due to the instance id {} was not released: {}", +logger.error("GroupHeartbeatRequest failed due to unreleased instance id {}: {}", Review Comment: both. The change for the default log is needed: it was not including the errorMessage, and that makes it hard to know what happened when you get errors like INVALID_REQUEST (I personally got it and lost time investigating, so fixed it). The other log changes are just improvements because I was already there. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Remove unneeded explicit type arguments [kafka]
mimaison merged PR #15736: URL: https://github.com/apache/kafka/pull/15736 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]
OmniaGM commented on code in PR #15569: URL: https://github.com/apache/kafka/pull/15569#discussion_r1569436277 ## core/src/main/scala/kafka/server/ZkAdminManager.scala: ## @@ -80,10 +80,10 @@ class ZkAdminManager(val config: KafkaConfig, private val configHelper = new ConfigHelper(metadataCache, config, new ZkConfigRepository(adminZkClient)) private val createTopicPolicy = -Option(config.getConfiguredInstance(ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, classOf[CreateTopicPolicy])) +Option(config.getConfiguredInstance(CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG, classOf[CreateTopicPolicy])) private val alterConfigPolicy = -Option(config.getConfiguredInstance(ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, classOf[AlterConfigPolicy])) +Option(config.getConfiguredInstance(CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG, classOf[AlterConfigPolicy])) Review Comment: fix it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Remove unneeded explicit type arguments [kafka]
mimaison commented on PR #15736: URL: https://github.com/apache/kafka/pull/15736#issuecomment-2062104700 None of the test failures are related, merging to trunk -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16554: Online downgrade triggering and group type conversion [kafka]
dongnuo123 commented on code in PR #15721: URL: https://github.com/apache/kafka/pull/15721#discussion_r1569413973 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -777,6 +778,59 @@ public ClassicGroup classicGroup( } } +public boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String memberId) { +if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) { +log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.", +consumerGroup.groupId()); +return false; +} else if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) { +log.debug("Cannot downgrade consumer group {} to classic group because not all its members use the classic protocol.", +consumerGroup.groupId()); +return false; +} else if (consumerGroup.numMembers() <= 1) { +log.info("Skip downgrading the consumer group {} to classic group because it's empty.", +consumerGroup.groupId()); +return false; +} else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) { +log.info("Cannot downgrade consumer group {} to classic group because its group size is greater than classic group max size.", +consumerGroup.groupId()); +} +return true; +} + +public CompletableFuture convertToClassicGroup(ConsumerGroup consumerGroup, String leavingMemberId, List records) { +consumerGroup.createGroupTombstoneRecords(records); +ClassicGroup classicGroup; +try { +classicGroup = consumerGroup.toClassicGroup( +leavingMemberId, +logContext, +time, +consumerGroupSessionTimeoutMs, +metadataImage, +records +); +} catch (SchemaException e) { +log.warn("Cannot downgrade the consumer group " + consumerGroup.groupId() + ": fail to parse " + +"the Consumer Protocol " + ConsumerProtocol.PROTOCOL_TYPE + ".", e); + +throw new GroupIdNotFoundException(String.format("Cannot downgrade the classic group %s: %s.", +consumerGroup.groupId(), e.getMessage())); +} + +groups.put(consumerGroup.groupId(), classicGroup); Review Comment: Ah yes. Thanks for the catch! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Fix typo [kafka]
birdoplank closed pull request #15743: Fix typo URL: https://github.com/apache/kafka/pull/15743 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16554: Online downgrade triggering and group type conversion [kafka]
dongnuo123 commented on code in PR #15721: URL: https://github.com/apache/kafka/pull/15721#discussion_r1569405350 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -775,6 +777,126 @@ public ClassicGroup classicGroup( } } +public boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String memberId) { +if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) { +log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.", +consumerGroup.groupId()); +return false; +} else if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) { +log.debug("Cannot downgrade consumer group {} to classic group because not all its members use the classic protocol.", +consumerGroup.groupId()); +return false; +} else if (consumerGroup.numMembers() <= 1) { +log.info("Skip downgrading the consumer group {} to classic group because it's empty.", +consumerGroup.groupId()); +return false; +} else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) { +log.info("Cannot downgrade consumer group {} to classic group because its group size is greater than classic group max size.", +consumerGroup.groupId()); +} +return true; +} + +public CompletableFuture convertToClassicGroup(ConsumerGroup consumerGroup, String leavingMemberId, List records) { +consumerGroup.createGroupTombstoneRecords(records); +ClassicGroup classicGroup; +try { +classicGroup = consumerGroup.toClassicGroup( +leavingMemberId, +logContext, +time, +consumerGroupSessionTimeoutMs, +metadataImage, +records +); +} catch (SchemaException e) { +log.warn("Cannot downgrade the consumer group " + consumerGroup.groupId() + ": fail to parse " + +"the Consumer Protocol " + ConsumerProtocol.PROTOCOL_TYPE + ".", e); + +throw new GroupIdNotFoundException(String.format("Cannot downgrade the classic group %s: %s.", +consumerGroup.groupId(), e.getMessage())); +} + +groups.put(consumerGroup.groupId(), classicGroup); +metrics.onClassicGroupStateTransition(null, classicGroup.currentState()); + +CompletableFuture appendFuture = new CompletableFuture<>(); +appendFuture.whenComplete((__, t) -> { +if (t == null) { +classicGroup.allMembers().forEach(member -> rescheduleClassicGroupMemberHeartbeat(classicGroup, member)); +prepareRebalance(classicGroup, String.format("Downgrade group %s.", classicGroup.groupId())); Review Comment: Yeah that makes sense. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16565: IncrementalAssignmentConsumerEventHandler throws error when attempting to remove a partition that isn't assigned [kafka]
lianetm commented on code in PR #15737: URL: https://github.com/apache/kafka/pull/15737#discussion_r1569396277 ## tests/kafkatest/services/verifiable_consumer.py: ## @@ -140,22 +150,32 @@ class IncrementalAssignmentConsumerEventHandler(ConsumerEventHandler): def __init__(self, node, verify_offsets, idx): super().__init__(node, verify_offsets, idx) -def handle_partitions_revoked(self, event): +def handle_partitions_revoked(self, event, node, logger): self.revoked_count += 1 self.state = ConsumerState.Rebalancing self.position = {} +revoked = [] + for topic_partition in event["partitions"]: -topic = topic_partition["topic"] -partition = topic_partition["partition"] -self.assignment.remove(TopicPartition(topic, partition)) +tp = _create_partition_from_dict(topic_partition) -def handle_partitions_assigned(self, event): +if tp in self.assignment: +self.assignment.remove(tp) +revoked.append(tp) +else: +logger.warn("Could not remove topic partition %s from assignment as it was not previously assigned to %s" % (tp, node.account.hostname)) Review Comment: do we understand why this situation is happening? Is it related maybe to the mismatch assignment failure we've seen elsewhere in the tests? My point is just to make sure we're not hiding the real failure with this change. I wouldn't expect that the consumer would ever receive a partition to revoke if it was not previously assigned right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16073: Increment the local-log-start-offset before deleting segments in memory table [kafka]
kamalcph commented on PR #15631: URL: https://github.com/apache/kafka/pull/15631#issuecomment-2062050733 > @kamalcph : Do you want this PR to be in 3.7 and 3.6? If so, could you create a separate cherry-pick RP? Thanks. Opened #15747 and #15748 to backport it to 3.7 and 3.6 branches. PTAL. Thanks for the review! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16073: Increment the local-log-start-offset before deleting segments in memory table [kafka]
kamalcph opened a new pull request, #15748: URL: https://github.com/apache/kafka/pull/15748 cherry-picked from: d092787487047178c674bd60a182129a8d997c4a Co-authored-by: hzh0425 <642256...@qq.com> ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]
kamalcph commented on code in PR #15634: URL: https://github.com/apache/kafka/pull/15634#discussion_r1569356239 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1223,6 +1223,12 @@ class UnifiedLog(@volatile var logStartOffset: Long, s"but we only have log segments starting from offset: $logStartOffset.") } + private def checkLocalLogStartOffset(offset: Long): Unit = { Review Comment: Agree on this. The `checkLocalLogStartOffset` is used only in the `convertToOffsetMetadataOrThrow` method which reads from local-disk. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]
kamalcph commented on code in PR #15634: URL: https://github.com/apache/kafka/pull/15634#discussion_r1563885907 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long, /** * Update high watermark with offset metadata. The new high watermark will be lower - * bounded by the log start offset and upper bounded by the log end offset. + * bounded by the local-log-start-offset and upper bounded by the log-end-offset. * * @param highWatermarkMetadata the suggested high watermark with offset metadata * @return the updated high watermark offset */ def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = { val endOffsetMetadata = localLog.logEndOffsetMetadata -val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) { - new LogOffsetMetadata(logStartOffset) +val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) { Review Comment: > In the rare case, the restarted broker is elected as the leader before caught up through unclean election. Is this the case that you want to address? yes, we want to address this case too. And the issue can also happen during clean preferred-leader-election:
```
Call stack:

The replica (1002) has full data but HW is invalid, then the fetch-offset will be equal to LeaderLog(1001).highWatermark

Leader (1001):
KafkaApis.handleFetchRequest
ReplicaManager.fetchMessages
ReplicaManager.readFromLocalLog
Partition.fetchRecords
Partition.updateFollowerFetchState
Partition.maybeExpandIsr
Partition.submitAlterPartition
...
...
...
# If there is not enough data to respond and there is no remote data, we will let the fetch request wait for new data.
# parks the request in the DelayedFetchPurgatory

Another thread runs Preferred-Leader-Election in controller (1003); since the replica 1002 joined the ISR list, it can be elected as the preferred leader. The controller sends LeaderAndIsr requests to all the brokers.

KafkaController.processReplicaLeaderElection
KafkaController.onReplicaElection
PartitionStateMachine.handleStateChanges
PartitionStateMachine.doHandleStateChanges
PartitionStateMachine.electLeaderForPartitions
ControllerChannelManager.sendRequestsToBrokers

Replica 1002 got elected as Leader and has an invalid highWatermark since it didn't process the fetch-response from the previous leader 1001; it throws an OFFSET_OUT_OF_RANGE error when processing the LeaderAndIsr request. Note that in a LeaderAndIsr request, even if one partition fails, the remaining partitions in that request won't be processed.

KafkaApis.handleLeaderAndIsrRequest
ReplicaManager.becomeLeaderOrFollower
ReplicaManager.makeLeaders
Partition.makeLeader
Partition.maybeIncrementLeaderHW
UnifiedLog.maybeIncrementHighWatermark (LeaderLog)
UnifiedLog.fetchHighWatermarkMetadata

The controller assumes that the current-leader for the tp0 is 1002, but the broker 1002 couldn't process the LISR. The controller retries the LISR until the broker 1002 becomes leader for tp0. During this time, the producers won't be able to send messages, as the node 1002 sends a NOT_LEADER_FOR_PARTITION error-code to the producer.

During this time, if a follower sends a FETCH request to read from the current-leader 1002, then an OFFSET_OUT_OF_RANGE error will be returned by the leader:

KafkaApis.handleFetchRequest
ReplicaManager.fetchMessages
ReplicaManager.readFromLog
Partition.fetchRecords # readFromLocalLog
Partition.updateFollowerFetchState
Partition.maybeIncrementLeaderHW
LeaderLog.maybeIncrementHighWatermark
UnifiedLog.fetchHighWatermarkMetadata
UnifiedLog.convertToOffsetMetadataOrThrow
LocalLog.convertToOffsetMetadataOrThrow
LocalLog.read # OffsetOutOfRangeException exception
```
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16073: Increment the local-log-start-offset before deleting segments in memory table [kafka]
kamalcph opened a new pull request, #15747: URL: https://github.com/apache/kafka/pull/15747 cherry-picked from: d092787487047178c674bd60a182129a8d997c4a Co-authored-by: hzh0425 <642256...@qq.com> ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16575) Automatically remove KTable aggregation result when group becomes empty
Matthias J. Sax created KAFKA-16575: --- Summary: Automatically remove KTable aggregation result when group becomes empty Key: KAFKA-16575 URL: https://issues.apache.org/jira/browse/KAFKA-16575 Project: Kafka Issue Type: Improvement Components: streams Reporter: Matthias J. Sax `KTable.groupBy(...).aggregate(...)` can handle updates (inserts, deletes, actual updates) of the input KTable by calling the provided `Adder` and `Subtractor`. However, when all records from the input table (which map to the same group/row in the result table) get removed, the result entry is not removed automatically. For example, if we implement a "count", the count goes to zero for a group by default, instead of removing the row from the result, if all input records for this group got deleted. Users can let their `Subtractor` return `null` for this case to actually delete the row, but it's not well documented, and it seems it should be a built-in feature of the table-aggregation to remove "empty groups" from the result, instead of relying on "correct" behavior of user-code. (Also, the built-in `count()` does not return `null`, but actually zero...) An internal counter of how many elements are in a group should be sufficient. Of course, there are backward compatibility questions we need to answer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
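[Editor's note] For illustration, a minimal sketch of the manual `Subtractor`-returns-`null` workaround described above (made-up topic and store-free setup, value used as the group key, serde configuration omitted); this is not the proposed built-in behavior:
{code:java}
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class EmptyGroupDeletionSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // Hypothetical input topic; default serdes are assumed to be configured.
        KTable<String, String> input = builder.table("input-topic");

        KTable<String, Long> countsPerGroup = input
            // Re-key each row; here the value doubles as the group key.
            .groupBy((key, value) -> KeyValue.pair(value, value))
            .aggregate(
                () -> 0L,                           // initializer
                (groupKey, value, agg) -> agg + 1,  // adder: a row joined the group
                (groupKey, value, agg) -> {         // subtractor: a row left the group
                    long updated = agg - 1;
                    // Returning null deletes the result row once the group is
                    // empty, instead of leaving a zero behind.
                    return updated == 0L ? null : updated;
                }
            );
    }
}
{code}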
Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]
kamalcph commented on code in PR #15634: URL: https://github.com/apache/kafka/pull/15634#discussion_r1569328393 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long, /** * Update high watermark with offset metadata. The new high watermark will be lower - * bounded by the log start offset and upper bounded by the log end offset. + * bounded by the local-log-start-offset and upper bounded by the log-end-offset. * * @param highWatermarkMetadata the suggested high watermark with offset metadata * @return the updated high watermark offset */ def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = { val endOffsetMetadata = localLog.logEndOffsetMetadata -val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) { - new LogOffsetMetadata(logStartOffset) +val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) { Review Comment: > For the makeLeaders path, it will call UnifiedLog.convertToOffsetMetadataOrThrow. Within it, checkLogStartOffset(offset) shouldn't throw OFFSET_OUT_OF_RANGE since we are comparing the offset with logStartOffset. Do you know which part throws OFFSET_OUT_OF_RANGE error? The next line `localLog.convertToOffsetMetadataOrThrow` in the [convertToOffsetMetadataOrThrow](https://sourcegraph.com/github.com/apache/kafka@a3dcbd4e28a35f79f75ec1bf316ef0b39c0df164/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L1429) method reads the segment from disk; that is where the error is thrown. The call [segments.floorSegment(startOffset)](https://sourcegraph.com/github.com/apache/kafka@a3dcbd4e28a35f79f75ec1bf316ef0b39c0df164/-/blob/core/src/main/scala/kafka/log/LocalLog.scala?L366) in LocalLog fails to find the segment with the log-start-offset, and then an OffsetOutOfRangeException is thrown. > For the follower fetch path, it's bounded by LogEndOffset. So it shouldn't need to call UnifiedLog.fetchHighWatermarkMetadata, right? The regular consumer will call UnifiedLog.fetchHighWatermarkMetadata. yes, you're right. I attached the wrong call stack for handling the follower request. Please find the updated call stack below. A leader with an invalid high-watermark handles the FETCH requests from a follower and throws an OFFSET_OUT_OF_RANGE error:
```
KafkaApis.handleFetchRequest
ReplicaManager.fetchMessages
ReplicaManager.readFromLog
Partition.fetchRecords # readFromLocalLog
Partition.updateFollowerFetchState
Partition.maybeIncrementLeaderHW
LeaderLog.maybeIncrementHighWatermark
UnifiedLog.fetchHighWatermarkMetadata
UnifiedLog.convertToOffsetMetadataOrThrow
LocalLog.convertToOffsetMetadataOrThrow
LocalLog.read # OffsetOutOfRangeException exception
```
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16572: allow defining number of disks per broker in ClusterTest [kafka]
chia7712 commented on code in PR #15745: URL: https://github.com/apache/kafka/pull/15745#discussion_r1569310965 ## core/src/test/java/kafka/testkit/TestKitNodes.java: ## @@ -82,6 +82,10 @@ public Builder setNumBrokerNodes(int numBrokerNodes) { return setBrokerNodes(numBrokerNodes, 1); } +public Builder setNumBrokerNodes(int numBrokerNodes, int disksPerBroker) { Review Comment: Why do we need this method? It is equal to `setBrokerNodes`. ## core/src/test/java/kafka/test/ClusterConfig.java: ## @@ -155,29 +161,31 @@ public ClusterConfig copyOf() { } public static Builder defaultClusterBuilder() { -return new Builder(Type.ZK, 1, 1, true, SecurityProtocol.PLAINTEXT, MetadataVersion.latestTesting()); +return new Builder(Type.ZK, 1, 1, 1, true, SecurityProtocol.PLAINTEXT, MetadataVersion.latestTesting()); } -public static Builder clusterBuilder(Type type, int brokers, int controllers, boolean autoStart, +public static Builder clusterBuilder(Type type, int brokers, int controllers, int disksPerBroker, boolean autoStart, SecurityProtocol securityProtocol, MetadataVersion metadataVersion) { -return new Builder(type, brokers, controllers, autoStart, securityProtocol, metadataVersion); +return new Builder(type, brokers, controllers, disksPerBroker, autoStart, securityProtocol, metadataVersion); } public static class Builder { private Type type; private int brokers; private int controllers; +private int disksPerBroker; private String name; private boolean autoStart; private SecurityProtocol securityProtocol; private String listenerName; private File trustStoreFile; private MetadataVersion metadataVersion; -Builder(Type type, int brokers, int controllers, boolean autoStart, SecurityProtocol securityProtocol, MetadataVersion metadataVersion) { +Builder(Type type, int brokers, int controllers, int disksPerBroker, boolean autoStart, SecurityProtocol securityProtocol, MetadataVersion metadataVersion) { Review Comment: Could we call setters instead of having a complicated constructor? We should not waste the builder pattern... this is unrelated to this PR, and it already has a Jira (https://issues.apache.org/jira/browse/KAFKA-16560). However, we can do a bit of refactoring here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
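[Editor's note] For context, a small sketch of the setter-based builder shape being suggested (hypothetical field set, not the actual ClusterConfig code):
```java
// Hypothetical illustration: the builder starts from defaults and exposes
// chained setters, so call sites only mention the fields they change.
// Usage: new ClusterConfigBuilder().setBrokers(3).setDisksPerBroker(2)
class ClusterConfigBuilder {
    private int brokers = 1;        // sensible defaults
    private int controllers = 1;
    private int disksPerBroker = 1;

    ClusterConfigBuilder setBrokers(int brokers) {
        this.brokers = brokers;
        return this;
    }

    ClusterConfigBuilder setControllers(int controllers) {
        this.controllers = controllers;
        return this;
    }

    ClusterConfigBuilder setDisksPerBroker(int disksPerBroker) {
        this.disksPerBroker = disksPerBroker;
        return this;
    }
}
```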
[jira] [Created] (KAFKA-16574) The metrics of LogCleaner disappear after reconfiguration
Chia-Ping Tsai created KAFKA-16574: -- Summary: The metrics of LogCleaner disappear after reconfiguration Key: KAFKA-16574 URL: https://issues.apache.org/jira/browse/KAFKA-16574 Project: Kafka Issue Type: Bug Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai see [https://github.com/apache/kafka/blob/a3dcbd4e28a35f79f75ec1bf316ef0b39c0df164/core/src/main/scala/kafka/log/LogCleaner.scala#L227] We don't rebuild the metrics after calling shutdown. The following test can prove that.
{code:java}
@Test
def testMetricsAfterReconfiguration(): Unit = {
  val logCleaner = new LogCleaner(new CleanerConfig(true),
    logDirs = Array(TestUtils.tempDir()),
    logs = new Pool[TopicPartition, UnifiedLog](),
    logDirFailureChannel = new LogDirFailureChannel(1),
    time = time)

  def check(): Unit =
    LogCleaner.MetricNames.foreach(name =>
      assertNotNull(KafkaYammerMetrics.defaultRegistry.allMetrics().get(
        logCleaner.metricsGroup.metricName(name, java.util.Collections.emptyMap())), s"$name is gone?"))

  try {
    check()
    logCleaner.reconfigure(new KafkaConfig(TestUtils.createBrokerConfig(1, "localhost:2181")),
      new KafkaConfig(TestUtils.createBrokerConfig(1, "localhost:2181")))
    check()
  } finally logCleaner.shutdown()
}
{code}
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16522) Admin client changes for adding and removing voters
[ https://issues.apache.org/jira/browse/KAFKA-16522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838347#comment-17838347 ] José Armando García Sancio commented on KAFKA-16522: [~phong260702] thanks for assigning this issue to yourself. I updated the issue to show that it is currently blocked by two other issues that need to be implemented first. > Admin client changes for adding and removing voters > --- > > Key: KAFKA-16522 > URL: https://issues.apache.org/jira/browse/KAFKA-16522 > Project: Kafka > Issue Type: Sub-task >Reporter: José Armando García Sancio >Assignee: Quoc Phong Dang >Priority: Major > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-853%3A+KRaft+Controller+Membership+Changes#KIP853:KRaftControllerMembershipChanges-Admin -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-16405) Mismatch assignment error when running consumer rolling upgrade system tests
[ https://issues.apache.org/jira/browse/KAFKA-16405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True resolved KAFKA-16405. --- Reviewer: Lucas Brutschy Resolution: Fixed > Mismatch assignment error when running consumer rolling upgrade system tests > > > Key: KAFKA-16405 > URL: https://issues.apache.org/jira/browse/KAFKA-16405 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > > relevant to [https://github.com/apache/kafka/pull/15578] > > We are seeing: > {code:java} > > SESSION REPORT (ALL TESTS) > ducktape version: 0.11.4 > session_id: 2024-03-21--001 > run time: 3 minutes 24.632 seconds > tests run:7 > passed: 5 > flaky:0 > failed: 2 > ignored: 0 > > test_id: > kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=COMBINED_KRAFT.use_new_coordinator=True.group_protocol=classic > status: PASS > run time: 24.599 seconds > > test_id: > kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=COMBINED_KRAFT.use_new_coordinator=True.group_protocol=consumer > status: FAIL > run time: 26.638 seconds > AssertionError("Mismatched assignment: {frozenset(), > frozenset({TopicPartition(topic='test_topic', partition=3), > TopicPartition(topic='test_topic', partition=0), > TopicPartition(topic='test_topic', partition=1), > TopicPartition(topic='test_topic', partition=2)})}") > Traceback (most recent call last): > File > "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", > line 186, in _do_run > data = self.run_test() > File > "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", > line 246, in run_test > return self.test_context.function(self.test) > File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line > 433, in wrapper > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) > File > "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py", > line 77, in rolling_update_test > self._verify_range_assignment(consumer) > File > "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py", > line 38, in _verify_range_assignment > assert assignment == set([ > AssertionError: Mismatched assignment: {frozenset(), > frozenset({TopicPartition(topic='test_topic', partition=3), > TopicPartition(topic='test_topic', partition=0), > TopicPartition(topic='test_topic', partition=1), > TopicPartition(topic='test_topic', partition=2)})} > > test_id: > kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=False > status: PASS > run time: 29.815 seconds > > test_id: > kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True > status: PASS > run time: 29.766 seconds > > test_id: > kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=classic > status: PASS > run time: 30.086 seconds > > test_id: > kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer > status: FAIL > run 
time: 35.965 seconds > AssertionError("Mismatched assignment: {frozenset(), > frozenset({TopicPartition(topic='test_topic', partition=3), > TopicPartition(topic='test_topic', partition=0), > TopicPartition(topic='test_topic', partition=1), > TopicPartition(topic='test_topic', partition=2)})}") > Traceback (most recent call last): > File > "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", > line 186, in _do_run >
Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]
kamalcph commented on code in PR #15634: URL: https://github.com/apache/kafka/pull/15634#discussion_r1563885907

## core/src/main/scala/kafka/log/UnifiedLog.scala:
@@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 /**
  * Update high watermark with offset metadata. The new high watermark will be lower
- * bounded by the log start offset and upper bounded by the log end offset.
+ * bounded by the local-log-start-offset and upper bounded by the log-end-offset.
  *
  * @param highWatermarkMetadata the suggested high watermark with offset metadata
  * @return the updated high watermark offset
  */
 def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
 val endOffsetMetadata = localLog.logEndOffsetMetadata
-val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) {
- new LogOffsetMetadata(logStartOffset)
+val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) {

Review Comment:
> In the rare case, the restarted broker is elected as the leader before caught up through unclean election. Is this the case that you want to address?

yes, we want to address this case too. And, the issue can also happen during clean preferred-leader-election:

```
Call stack:

The replica (1002) has full data but its HW is invalid, so the fetch offset will be equal to LeaderLog(1001).highWatermark

Leader (1001):
KafkaApis.handleFetchRequest
ReplicaManager.fetchMessages
ReplicaManager.readFromLocalLog
Partition.fetchRecords
Partition.updateFollowerFetchState
Partition.maybeExpandIsr
Partition.submitAlterPartition
...
...
...

# If there is not enough data to respond and there is no remote data, we will let the fetch request wait for new data.
# parks the request in the DelayedFetchPurgatory

Another thread runs preferred-leader-election in the controller (1003); since replica 1002 joined the ISR list, it can be elected as the preferred leader. The controller sends LeaderAndIsr requests to all the brokers.

KafkaController.processReplicaLeaderElection
KafkaController.onReplicaElection
PartitionStateMachine.handleStateChanges
PartitionStateMachine.doHandleStateChanges
PartitionStateMachine.electLeaderForPartitions
ControllerChannelManager.sendRequestsToBrokers

Replica 1002 got elected as leader but has an invalid highWatermark since it didn't process the fetch response from the previous leader 1001, so it throws an OFFSET_OUT_OF_RANGE error when processing the LeaderAndIsr request. Note that in a LeaderAndIsr request, if even one partition fails, the remaining partitions in that request won't be processed.

KafkaApis.handleLeaderAndIsrRequest
ReplicaManager.becomeLeaderOrFollower
ReplicaManager.makeLeaders
Partition.makeLeader
Partition.maybeIncrementLeaderHW
UnifiedLog.maybeIncrementHighWatermark (LeaderLog)
UnifiedLog.fetchHighWatermarkMetadata

The controller assumes that the current leader for tp0 is 1002, but broker 1002 couldn't process the LISR. The controller retries the LISR until broker 1002 becomes leader for tp0. During this time, producers won't be able to send messages, as node 1002 sends the NOT_LEADER_FOR_PARTITION error code to the producer.
During this time, if a follower sends a FETCH request to read from the current leader 1002, then an OFFSET_OUT_OF_RANGE error will be returned by the leader:

KafkaApis.handleFetchRequest
ReplicaManager.fetchMessages
ReplicaManager.readFromLog
Partition.fetchRecords
Partition.readRecords
UnifiedLog.read
UnifiedLog.fetchHighWatermarkMetadata
UnifiedLog.convertToOffsetMetadataOrThrow
LocalLog.convertToOffsetMetadataOrThrow
LocalLog.read
```

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
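The bounding rule in the diff above is a simple clamp. A minimal Java sketch of the equivalent logic (hypothetical helper; the real method is Scala in UnifiedLog):

```java
// Hypothetical sketch: clamp a proposed high watermark into
// [localLogStartOffset, logEndOffset].
final class HighWatermarkSketch {
    static long boundHighWatermark(long proposed, long localLogStartOffset, long logEndOffset) {
        if (proposed < localLogStartOffset) {
            return localLogStartOffset; // never point below locally available data
        }
        return Math.min(proposed, logEndOffset); // never point past the end of the log
    }
}
```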
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
chia7712 commented on code in PR #15136: URL: https://github.com/apache/kafka/pull/15136#discussion_r1569178614

## core/src/main/scala/kafka/log/LogManager.scala:
@@ -1189,50 +1216,63 @@ class LogManager(logDirs: Seq[File], val sourceLog = currentLogs.get(topicPartition) val destLog = futureLogs.get(topicPartition) - info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition") if (sourceLog == null) throw new KafkaStorageException(s"The current replica for $topicPartition is offline") if (destLog == null) throw new KafkaStorageException(s"The future replica for $topicPartition is offline") - destLog.renameDir(UnifiedLog.logDirName(topicPartition), shouldReinitialize = true) - // the metrics tags still contain "future", so we have to remove it. - // we will add metrics back after sourceLog remove the metrics - destLog.removeLogMetrics() - destLog.updateHighWatermark(sourceLog.highWatermark) + replaceCurrentWithFutureLog(Option(sourceLog), destLog, updateHighWatermark = true) +} + } - // Now that future replica has been successfully renamed to be the current replica - // Update the cached map and log cleaner as appropriate. - futureLogs.remove(topicPartition) - currentLogs.put(topicPartition, destLog) - if (cleaner != null) { -cleaner.alterCheckpointDir(topicPartition, sourceLog.parentDirFile, destLog.parentDirFile) -resumeCleaning(topicPartition) + def replaceCurrentWithFutureLog(sourceLog: Option[UnifiedLog], destLog: UnifiedLog, updateHighWatermark: Boolean = false): Unit = { +val topicPartition = destLog.topicPartition +info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition")

Review Comment: `sourceLog` could be empty now, so maybe we need to revise the log message.

## core/src/main/scala/kafka/log/LogManager.scala:
@@ -1178,6 +1178,33 @@ class LogManager(logDirs: Seq[File], } } + def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = { +val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage) +abandonedFutureLogs.foreach { case (futureLog, currentLog) => + val tp = futureLog.topicPartition + if (cleaner != null) {

Review Comment: we can replace this with `abortAndPauseCleaning(tp)`

## core/src/main/scala/kafka/log/LogManager.scala:
@@ -1178,6 +1178,33 @@ class LogManager(logDirs: Seq[File], } } + def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = { +val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage) +abandonedFutureLogs.foreach { case (futureLog, currentLog) => + val tp = futureLog.topicPartition + if (cleaner != null) { +cleaner.abortAndPauseCleaning(tp) + } + + replaceCurrentWithFutureLog(currentLog, futureLog) + + info(s"Successfully renamed abandoned future log for $tp")

Review Comment: line #1189 will print `info(s"The current replica is successfully replaced with the future replica for $topicPartition")`, and that no longer matches what actually happened, right?

## core/src/main/scala/kafka/log/LogManager.scala:
@@ -1178,6 +1178,33 @@ class LogManager(logDirs: Seq[File], } } + def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = { +val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage) +abandonedFutureLogs.foreach { case (futureLog, currentLog) => + val tp = futureLog.topicPartition + if (cleaner != null) {

Review Comment: I guess `cleaner.abortAndPauseCleaning` is added because we call `resumeCleaning` later, and it will cause an error if we don't call `abortAndPauseCleaning` here?
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
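The thread above turns on pairing `abortAndPauseCleaning` with a later `resumeCleaning`. A minimal Java sketch of that invariant (hypothetical class; the real LogCleaner tracks a richer per-partition cleaning state):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the pause/resume pairing discussed above.
final class CleaningStateSketch {
    private final Set<String> paused = ConcurrentHashMap.newKeySet();

    void abortAndPauseCleaning(String topicPartition) {
        paused.add(topicPartition); // must happen before any later resume
    }

    void resumeCleaning(String topicPartition) {
        if (!paused.remove(topicPartition)) {
            // mirrors the error the reviewer anticipates when resume is
            // called without a matching pause
            throw new IllegalStateException("Cleaning was not paused for " + topicPartition);
        }
    }
}
```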
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
splett2 commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1569269269 ## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: ## @@ -2117,6 +2117,20 @@ ListPartitionReassignmentsResponseData listPartitionReassignments( return response; } +ControllerResult getPartitionElrUpdatesForConfigChanges(Optional topicName) { +if (!isElrEnabled()) return ControllerResult.of(Collections.emptyList(), null); + +List records = new ArrayList<>(); +if (topicName.isPresent()) { +generateLeaderAndIsrUpdates("handleMinIsrUpdate", NO_LEADER, NO_LEADER, NO_LEADER, records, + brokersToElrs.partitionsWithElr(topicsByName.get(topicName.get(; +} else { +generateLeaderAndIsrUpdates("handleMinIsrUpdate", NO_LEADER, NO_LEADER, NO_LEADER, records, +brokersToElrs.partitionsWithElr()); +} Review Comment: @CalvinConfluent makes sense, thanks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16003) The znode /config/topics is not updated during KRaft migration in "dual-write" mode
[ https://issues.apache.org/jira/browse/KAFKA-16003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe updated KAFKA-16003: - Fix Version/s: 3.7.1 > The znode /config/topics is not updated during KRaft migration in > "dual-write" mode > --- > > Key: KAFKA-16003 > URL: https://issues.apache.org/jira/browse/KAFKA-16003 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 3.6.1 >Reporter: Paolo Patierno >Assignee: Mickael Maison >Priority: Major > Fix For: 3.8.0, 3.7.1 > > > I tried the following scenario ... > I have a ZooKeeper-based cluster and create a my-topic-1 topic (without > specifying any specific configuration for it). The correct znodes are created > under /config/topics and /brokers/topics. > I start a migration to KRaft but do not move forward from "dual write" mode. > While in this mode, I create a new my-topic-2 topic (still without any > specific config). I see that a new znode is created under /brokers/topics but > NOT under /config/topics. It seems that the KRaft controller is not updating > this information in ZooKeeper during the dual-write. The controller log shows > that the write to ZooKeeper was done, but not everything was written, I would say: > {code:java} > 2023-12-13 10:23:26,229 TRACE [KRaftMigrationDriver id=3] Create Topic > my-topic-2, ID Macbp8BvQUKpzmq2vG_8dA. Transitioned migration state from > ZkMigrationLeadershipState{kraftControllerId=3, kraftControllerEpoch=7, > kraftMetadataOffset=445, kraftMetadataEpoch=7, > lastUpdatedTimeMs=1702462785587, migrationZkVersion=236, controllerZkEpoch=3, > controllerZkVersion=3} to ZkMigrationLeadershipState{kraftControllerId=3, > kraftControllerEpoch=7, kraftMetadataOffset=445, kraftMetadataEpoch=7, > lastUpdatedTimeMs=1702462785587, migrationZkVersion=237, controllerZkEpoch=3, > controllerZkVersion=3} > (org.apache.kafka.metadata.migration.KRaftMigrationDriver) > [controller-3-migration-driver-event-handler] > 2023-12-13 10:23:26,229 DEBUG [KRaftMigrationDriver id=3] Made the following > ZK writes when handling KRaft delta: {CreateTopic=1} > (org.apache.kafka.metadata.migration.KRaftMigrationDriver) > [controller-3-migration-driver-event-handler] {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16480: Bump ListOffsets version, IBP version and mark last version of ListOffsets as unstable [kafka]
junrao commented on code in PR #15673: URL: https://github.com/apache/kafka/pull/15673#discussion_r1569221453 ## clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java: ## @@ -70,7 +70,7 @@ else if (isolationLevel == IsolationLevel.READ_COMMITTED) minVersion = 2; else if (requireTimestamp) minVersion = 1; -return new Builder(minVersion, ApiKeys.LIST_OFFSETS.latestVersion(), CONSUMER_REPLICA_ID, isolationLevel); +return new Builder(minVersion, ApiKeys.LIST_OFFSETS.latestVersion(false), CONSUMER_REPLICA_ID, isolationLevel); Review Comment: Hmm, does that mean we can't test the latest unstable version in the client? ## core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala: ## @@ -219,7 +219,7 @@ class ListOffsetsRequestTest extends BaseRequestTest { TestUtils.generateAndProduceMessages(servers, topic, 9) TestUtils.produceMessage(servers, topic, "test-10", System.currentTimeMillis() + 10L) -for (version <- ApiKeys.LIST_OFFSETS.oldestVersion to ApiKeys.LIST_OFFSETS.latestVersion) { +for (version <- ApiKeys.LIST_OFFSETS.oldestVersion to ApiKeys.LIST_OFFSETS.latestVersion(false)) { Review Comment: Do we have tests that enable the latest unstable version? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]
mimaison commented on code in PR #15569: URL: https://github.com/apache/kafka/pull/15569#discussion_r1569237791 ## core/src/main/scala/kafka/server/ZkAdminManager.scala: ## @@ -80,10 +80,10 @@ class ZkAdminManager(val config: KafkaConfig, private val configHelper = new ConfigHelper(metadataCache, config, new ZkConfigRepository(adminZkClient)) private val createTopicPolicy = -Option(config.getConfiguredInstance(ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, classOf[CreateTopicPolicy])) +Option(config.getConfiguredInstance(CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG, classOf[CreateTopicPolicy])) private val alterConfigPolicy = -Option(config.getConfiguredInstance(ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, classOf[AlterConfigPolicy])) +Option(config.getConfiguredInstance(CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG, classOf[AlterConfigPolicy])) Review Comment: This one was correct! We need to undo this -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
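The diff above shows the two policy lookups accidentally sharing one config key. A Java sketch of the corrected pairing (the real code is Scala in ZkAdminManager; the key constants are spelled out here for self-containment):

```java
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.server.policy.AlterConfigPolicy;
import org.apache.kafka.server.policy.CreateTopicPolicy;

// Sketch of the fix: each policy must be looked up under its own config key.
final class PolicyWiringSketch {
    static final String CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG = "create.topic.policy.class.name";
    static final String ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG = "alter.config.policy.class.name";

    static CreateTopicPolicy createTopicPolicy(AbstractConfig config) {
        return config.getConfiguredInstance(CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG, CreateTopicPolicy.class);
    }

    static AlterConfigPolicy alterConfigPolicy(AbstractConfig config) {
        // the bug in the diff above: this lookup used the create-topic key
        return config.getConfiguredInstance(ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, AlterConfigPolicy.class);
    }
}
```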
Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]
kirktrue commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1569228945 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -616,6 +620,80 @@ public void testCommitSyncTriggersFencedExceptionFromCommitAsync() { assertEquals("Get fenced exception for group.instance.id groupInstanceId1", e.getMessage()); } +@Test +public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() { +time = new MockTime(1); +consumer = newConsumer(); + +// Commit async (incomplete) + doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); + doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); +final TopicPartition tp = new TopicPartition("foo", 0); +consumer.assign(Collections.singleton(tp)); +consumer.seek(tp, 20); +consumer.commitAsync(); + +// Commit async is not completed yet, so commit sync should wait for it to complete (time out) +assertThrows(TimeoutException.class, () -> consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100))); + +// Complete async commit event +final ArgumentCaptor commitEventCaptor = ArgumentCaptor.forClass(AsyncCommitEvent.class); +verify(applicationEventHandler).add(commitEventCaptor.capture()); Review Comment: Interesting. Good to know. Fuzzy temporal logic 🤷‍♂️ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]
kirktrue commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1569232314 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -616,6 +620,80 @@ public void testCommitSyncTriggersFencedExceptionFromCommitAsync() { assertEquals("Get fenced exception for group.instance.id groupInstanceId1", e.getMessage()); } +@Test +public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() { +time = new MockTime(1); +consumer = newConsumer(); + +// Commit async (incomplete) + doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); + doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); +final TopicPartition tp = new TopicPartition("foo", 0); +consumer.assign(Collections.singleton(tp)); +consumer.seek(tp, 20); +consumer.commitAsync(); + +// Commit async is not completed yet, so commit sync should wait for it to complete (time out) +assertThrows(TimeoutException.class, () -> consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100))); + +// Complete async commit event +final ArgumentCaptor commitEventCaptor = ArgumentCaptor.forClass(AsyncCommitEvent.class); +verify(applicationEventHandler).add(commitEventCaptor.capture()); +final AsyncCommitEvent commitEvent = commitEventCaptor.getValue(); +commitEvent.future().complete(null); + +// Commit async is completed, so commit sync completes immediately (since offsets are empty) +assertDoesNotThrow(() -> consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100))); +} + +@Test +public void testCommitSyncAwaitsCommitAsyncCompletionWithNonEmptyOffsets() { +final TopicPartition tp = new TopicPartition("foo", 0); +testSyncCommitTimesoutAfterIncompleteAsyncCommit(tp); + +// Complete async commit event and sync commit event +final ArgumentCaptor commitEventCaptor = ArgumentCaptor.forClass(AsyncCommitEvent.class); +verify(applicationEventHandler).add(commitEventCaptor.capture()); +final AsyncCommitEvent commitEvent = commitEventCaptor.getValue(); +commitEvent.future().complete(null); + +// Commit async is completed, so commit sync completes immediately (since offsets are empty) +assertDoesNotThrow(() -> consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(20)), Duration.ofMillis(100))); +} + +@Test +public void testCommitSyncAwaitsCommitAsyncButDoesNotFail() { +final TopicPartition tp = new TopicPartition("foo", 0); +testSyncCommitTimesoutAfterIncompleteAsyncCommit(tp); + +// Complete exceptionally async commit event and sync commit event +final ArgumentCaptor commitEventCaptor = ArgumentCaptor.forClass(AsyncCommitEvent.class); +verify(applicationEventHandler).add(commitEventCaptor.capture()); +final AsyncCommitEvent commitEvent = commitEventCaptor.getValue(); +commitEvent.future().completeExceptionally(new KafkaException("Test exception")); + +// Commit async is completed exceptionally, but this will be handled by commit callback - commit sync should not fail. 
+assertDoesNotThrow(() -> consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(20)), Duration.ofMillis(100))); +} + +private void testSyncCommitTimesoutAfterIncompleteAsyncCommit(TopicPartition tp) { +time = new MockTime(1); +consumer = newConsumer(); + +// Commit async (incomplete) + doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); + doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); +consumer.assign(Collections.singleton(tp)); +consumer.seek(tp, 20); Review Comment: Yes, that's what I meant -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]
kirktrue commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1569231774 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -616,6 +620,80 @@ public void testCommitSyncTriggersFencedExceptionFromCommitAsync() { assertEquals("Get fenced exception for group.instance.id groupInstanceId1", e.getMessage()); } +@Test +public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() { +time = new MockTime(1); +consumer = newConsumer(); + +// Commit async (incomplete) + doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); + doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); +final TopicPartition tp = new TopicPartition("foo", 0); +consumer.assign(Collections.singleton(tp)); +consumer.seek(tp, 20); +consumer.commitAsync(); Review Comment: When I see duplicate (or nearly duplicated) code, my brain turns up its sensitivity because I assume there's some devil-in-the-details reason that the code wasn't reused. I'd assumed that it could be ever-so-slightly refactored into using `testSyncCommitTimesoutAfterIncompleteAsyncCommit()`, but no? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]
kirktrue commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1569226961 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -793,9 +795,9 @@ public void commitAsync(Map offsets, OffsetCo } private CompletableFuture commit(final CommitEvent commitEvent) { -maybeThrowFencedInstanceException(); -maybeInvokeCommitCallbacks(); maybeThrowInvalidGroupIdException(); +maybeThrowFencedInstanceException(); +offsetCommitCallbackInvoker.executeCallbacks(); Review Comment: Works for me. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16298: Ensure rebalance listener exceptions are propagated to the user on consumer poll [kafka]
kirktrue commented on code in PR #15742: URL: https://github.com/apache/kafka/pull/15742#discussion_r1569217494 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -228,13 +228,16 @@ private void process(final ErrorEvent event) { } private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { -ApplicationEvent invokedEvent = invokeRebalanceCallbacks( +ConsumerRebalanceListenerCallbackCompletedEvent invokedEvent = invokeRebalanceCallbacks( rebalanceListenerInvoker, event.methodName(), event.partitions(), event.future() ); applicationEventHandler.add(invokedEvent); +if (invokedEvent.error().isPresent()) { +throw invokedEvent.error().get(); Review Comment: `ConsumerRebalanceListenerCallbackNeededEvent` handles 'assign', 'revoke', and 'lose' callbacks. It was my understanding (I could be wrong) that we wanted to _wait_ to throw the exception until after the reconciliation was fully processed. That is, not necessarily on the first callback 🤔 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -228,13 +228,16 @@ private void process(final ErrorEvent event) { } private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { -ApplicationEvent invokedEvent = invokeRebalanceCallbacks( +ConsumerRebalanceListenerCallbackCompletedEvent invokedEvent = invokeRebalanceCallbacks( rebalanceListenerInvoker, event.methodName(), event.partitions(), event.future() ); applicationEventHandler.add(invokedEvent); +if (invokedEvent.error().isPresent()) { +throw invokedEvent.error().get(); Review Comment: However, this implementation has the interesting property that it will both throw the exception _and_ continue processing. It seems like this could potentially yield **two** exceptions, if, say, both the `onPartitionsRevoked()` _and_ `onPartitionsAssigned()` threw exceptions. Is that the intent? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
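One way to resolve the double-exception concern raised above is to remember only the first callback failure and rethrow it exactly once, after the whole reconciliation has been processed. A minimal sketch under that assumption (hypothetical class, not the actual AsyncKafkaConsumer code):

```java
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.common.KafkaException;

// Hypothetical sketch: capture the first rebalance-callback error and
// surface it once, e.g. from poll(), after reconciliation completes.
final class RebalanceCallbackErrors {
    private final AtomicReference<KafkaException> firstError = new AtomicReference<>();

    void record(KafkaException e) {
        firstError.compareAndSet(null, e); // keep the first error, drop later ones
    }

    void maybeThrow() {
        KafkaException e = firstError.getAndSet(null);
        if (e != null) {
            throw e; // thrown to the user exactly once
        }
    }
}
```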
Re: [PR] MINOR: move topic configuration defaults [kafka]
cmccabe closed pull request #10589: MINOR: move topic configuration defaults URL: https://github.com/apache/kafka/pull/10589 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
CalvinConfluent commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1569214320 ## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: ## @@ -2117,6 +2117,20 @@ ListPartitionReassignmentsResponseData listPartitionReassignments( return response; } +ControllerResult getPartitionElrUpdatesForConfigChanges(Optional topicName) { +if (!isElrEnabled()) return ControllerResult.of(Collections.emptyList(), null); + +List records = new ArrayList<>(); +if (topicName.isPresent()) { +generateLeaderAndIsrUpdates("handleMinIsrUpdate", NO_LEADER, NO_LEADER, NO_LEADER, records, + brokersToElrs.partitionsWithElr(topicsByName.get(topicName.get()))); +} else { +generateLeaderAndIsrUpdates("handleMinIsrUpdate", NO_LEADER, NO_LEADER, NO_LEADER, records, +brokersToElrs.partitionsWithElr()); +} Review Comment: Yes, we should only update the partition (remove the ELRs) when the min ISR decreases: if the min ISR decreases, the partition can advance the HWM with fewer ISR members, which can invalidate the ELR as a potential leader. Sure, I can add them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
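A tiny Java sketch of the rule described in the comment above (hypothetical helper, not the real ReplicationControlManager):

```java
// Only a *decrease* of min.insync.replicas invalidates the ELR: with a
// smaller min ISR the HWM can advance with fewer ISR members, so an ELR
// member may be missing committed records.
final class MinIsrChangeSketch {
    static boolean shouldClearElr(int oldMinIsr, int newMinIsr) {
        return newMinIsr < oldMinIsr;
    }
}
```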
Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]
mumrah commented on code in PR #15744: URL: https://github.com/apache/kafka/pull/15744#discussion_r1569214270 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -3003,9 +3008,10 @@ class KafkaApis(val requestChannel: RequestChannel, } } +// Forwarding has not happened yet, so handle both ZK and KRaft cases here if (remaining.resources().isEmpty) { sendResponse(Some(new IncrementalAlterConfigsResponseData())) -} else if ((!request.isForwarded) && metadataSupport.canForward()) { +} else if ((!request.isForwarded) && metadataSupport.canForward() && isKRaftController) { Review Comment: It's kind of inconsequential that the ZK controller was handling a forwarded IncrementalAlterConfigs. All of the IAC handling is done in KafkaApis and never even checks if it's the active controller. Basically when handling the broker-specific configs, the broker handling the forwarded request (i.e., the ZK controller) would see that the broker ID didn't match and fail. TBH, a better fix would be to always forward to the ZK controller, and update the ZK brokers to react to the ZNode change event to process BROKER and BROKER_LOGGER changes. But that's a much bigger change :-/ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Fix some flaky behavior in DeleteConsumerGroupsTest and clean up [kafka]
cmccabe closed pull request #8115: MINOR: Fix some flaky behavior in DeleteConsumerGroupsTest and clean up URL: https://github.com/apache/kafka/pull/8115 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14370: Simplify the ImageWriter API by adding freeze [kafka]
cmccabe closed pull request #12833: KAFKA-14370: Simplify the ImageWriter API by adding freeze URL: https://github.com/apache/kafka/pull/12833 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
CalvinConfluent commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1569210550 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -2360,4 +2363,9 @@ void setNewNextWriteOffset(long newNextWriteOffset) { void handleUncleanBrokerShutdown(int brokerId, List records) { replicationControl.handleBrokerUncleanShutdown(brokerId, records); } + +void maybeTriggerMinIsrConfigUpdate(Optional topicName) throws InterruptedException, ExecutionException { +appendWriteEvent("partitionUpdateForMinIsrChange", OptionalLong.empty(), +() -> replicationControl.getPartitionElrUpdatesForConfigChanges(topicName)).get(); +} Review Comment: Makes sense; I had a misunderstanding about the controller events. Will update. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]
mumrah commented on code in PR #15744: URL: https://github.com/apache/kafka/pull/15744#discussion_r1569208039 ## core/src/main/scala/kafka/zk/KafkaZkClient.scala: ## @@ -467,13 +470,33 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo * @param rootEntityType entity type * @param sanitizedEntityName entity name * @throws KeeperException if there is an error while setting or creating the znode + * @throws ControllerMovedException if no controller is defined, or a KRaft controller is defined */ def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: String, config: Properties): Unit = { +val controllerRegistration = getControllerRegistration match { + case Some(registration) => registration + case None => +// This case is mainly here to make tests less flaky. In practice, there will always be a /controller ZNode +throw new ControllerMovedException(s"Cannot set entity configs when there is no controller.") +} + +// If there is a KRaft controller defined, don't even attempt this write. The broker will soon get a UMR +// from the new KRaft controller that lets it know about the new controller. It will then forward +// IncrementalAlterConfig requests instead of processing directly. +if (controllerRegistration.kraftEpoch.exists(epoch => epoch > 0)) { + throw new ControllerMovedException(s"Cannot set entity configs directly when there is a KRaft controller.") +} Review Comment: I don't think this method is in the dual-write path. All of the ZK writes for configs in dual-write mode should happen in ZkConfigMigrationClient -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16570) FenceProducers API returns "unexpected error" when successful
[ https://issues.apache.org/jira/browse/KAFKA-16570?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan updated KAFKA-16570: --- Description: When we want to fence a producer using the admin client, we send an InitProducerId request. There is logic in that API to fence (and abort) any ongoing transactions and that is what the API relies on to fence the producer. However, this handling also returns CONCURRENT_TRANSACTIONS. In normal usage, this is good because we want to actually get a new producer ID and want to retry until the ID is supplied or we time out. [https://github.com/apache/kafka/blob/5193eb93237ba9093ae444d73a1eaa2d6abcc9c1/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L170] [https://github.com/apache/kafka/blob/a3dcbd4e28a35f79f75ec1bf316ef0b39c0df164/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L1322] In the case of fence producer, we don't retry and instead we have no handling for concurrent transactions and log a message about an unexpected error. [https://github.com/apache/kafka/blob/a3dcbd4e28a35f79f75ec1bf316ef0b39c0df164/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java#L112] This is not unexpected though and the operation was successful. We should just swallow this error and treat this as a successful run of the command. was: When we want to fence a producer using the admin client, we send an InitProducerId request. There is logic in that API to fence (and abort) any ongoing transactions and that is what the API relies on to fence the producer. However, this handling also returns CONCURRENT_TRANSACTIONS. In normal usage, this is good because we want to actually get a new producer ID and want to retry until the ID is supplied or we time out. [https://github.com/apache/kafka/blob/5193eb93237ba9093ae444d73a1eaa2d6abcc9c1/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L170] [https://github.com/confluentinc/ce-kafka/blob/b626db8bd94fe971adef3551518761a7be7de454/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L1322] In the case of fence producer, we don't retry and instead we have no handling for concurrent transactions and log a message about an unexpected error. [https://github.com/confluentinc/ce-kafka/blob/b626db8bd94fe971adef3551518761a7be7de454/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java#L112] This is not unexpected though and the operation was successful. We should just swallow this error and treat this as a successful run of the command. > FenceProducers API returns "unexpected error" when successful > - > > Key: KAFKA-16570 > URL: https://issues.apache.org/jira/browse/KAFKA-16570 > Project: Kafka > Issue Type: Bug >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Major > > When we want to fence a producer using the admin client, we send an > InitProducerId request. > There is logic in that API to fence (and abort) any ongoing transactions and > that is what the API relies on to fence the producer. However, this handling > also returns CONCURRENT_TRANSACTIONS. In normal usage, this is good because > we want to actually get a new producer ID and want to retry until the ID > is supplied or we time out. 
> [https://github.com/apache/kafka/blob/5193eb93237ba9093ae444d73a1eaa2d6abcc9c1/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L170] > > [https://github.com/apache/kafka/blob/a3dcbd4e28a35f79f75ec1bf316ef0b39c0df164/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L1322] > > In the case of fence producer, we don't retry and instead we have no handling > for concurrent transactions and log a message about an unexpected error. > [https://github.com/apache/kafka/blob/a3dcbd4e28a35f79f75ec1bf316ef0b39c0df164/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java#L112] > > This is not unexpected though and the operation was successful. We should > just swallow this error and treat this as a successful run of the command. -- This message was sent by Atlassian Jira (v8.20.10#820010)
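A minimal Java sketch of the proposed handling (hypothetical method, not the actual FenceProducersHandler code): CONCURRENT_TRANSACTIONS from InitProducerId means the fencing round already aborted the ongoing transaction, so it should be treated as success rather than logged as unexpected.

{code:java}
import org.apache.kafka.common.protocol.Errors;

// Hypothetical sketch of the error mapping the issue proposes.
final class FenceProducersErrorSketch {
    static boolean isFencingSuccess(Errors error) {
        // CONCURRENT_TRANSACTIONS here just signals that the ongoing
        // transaction was fenced/aborted, which is the goal of the command.
        return error == Errors.NONE || error == Errors.CONCURRENT_TRANSACTIONS;
    }
}
{code}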
Re: [PR] KAFKA-13152: Kip 770 buffer size fix [kafka]
ableegoldman commented on PR #13283: URL: https://github.com/apache/kafka/pull/13283#issuecomment-2061827689 No pressure! Just wanted to make sure you weren't still waiting on me for this. Or rather, just wanted to say, if you do want to try and get this into 3.8, I will make sure to help you. But it's no problem if you don't have time or have other things on your plate. Again, just sorry this one was neglected for so long. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16561: Disable `allow.auto.create.topics` in MirrorMaker2 Consumer Config [kafka]
C0urante commented on code in PR #15728: URL: https://github.com/apache/kafka/pull/15728#discussion_r1569197938 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java: ## @@ -169,6 +170,7 @@ static Map sourceConsumerConfig(Map props) { result.putAll(Utils.entriesWithPrefix(props, CONSUMER_CLIENT_PREFIX)); result.putAll(Utils.entriesWithPrefix(props, SOURCE_PREFIX + CONSUMER_CLIENT_PREFIX)); result.put(ENABLE_AUTO_COMMIT_CONFIG, "false"); +result.put(ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false"); Review Comment: While this may be more ideal for many use cases, it's a potentially-breaking change that I don't believe we can make without waiting for a major release (right now, that'd be 4.0.0), and an accepted KIP. If you're unfamiliar with the KIP process, you can read more about it here: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals#KafkaImprovementProposals-Purpose -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16461: New consumer fails to consume records in security_test.py system test [kafka]
kirktrue opened a new pull request, #15746: URL: https://github.com/apache/kafka/pull/15746 The system test was failing because the `VerifiableConsumer` failed with a `NullPointerException` during startup. The reason for the NPE was an attempt to put a `null` as the value of `--group-remote-assignor` in the `Consumer`'s configuration. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
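The failure mode is the standard `Properties`/`HashMap` pitfall: `Properties.put` throws a `NullPointerException` for a null value. A sketch of the kind of guard the fix implies (variable names are hypothetical; see the PR for the actual change):

```java
import java.util.Properties;

// Hypothetical sketch: only set the config when the CLI option was supplied,
// since Properties rejects null values with a NullPointerException.
final class AssignorConfigSketch {
    static void maybeSetRemoteAssignor(Properties consumerProps, String groupRemoteAssignor) {
        if (groupRemoteAssignor != null) {
            consumerProps.put("group.remote.assignor", groupRemoteAssignor);
        }
    }
}
```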
Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]
akhileshchg commented on code in PR #15744: URL: https://github.com/apache/kafka/pull/15744#discussion_r1569190991 ## core/src/main/scala/kafka/zk/KafkaZkClient.scala: ## @@ -467,13 +470,33 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo * @param rootEntityType entity type * @param sanitizedEntityName entity name * @throws KeeperException if there is an error while setting or creating the znode + * @throws ControllerMovedException if no controller is defined, or a KRaft controller is defined */ def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: String, config: Properties): Unit = { +val controllerRegistration = getControllerRegistration match { + case Some(registration) => registration + case None => +// This case is mainly here to make tests less flaky. In practice, there will always be a /controller ZNode +throw new ControllerMovedException(s"Cannot set entity configs when there is no controller.") +} + +// If there is a KRaft controller defined, don't even attempt this write. The broker will soon get a UMR +// from the new KRaft controller that lets it know about the new controller. It will then forward +// IncrementalAlterConfig requests instead of processing directly. +if (controllerRegistration.kraftEpoch.exists(epoch => epoch > 0)) { + throw new ControllerMovedException(s"Cannot set entity configs directly when there is a KRaft controller.") +} Review Comment: Wouldn't this fail when the KRaft controller tries to perform a dual write to both the KRaft log and ZK? ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -3003,9 +3008,10 @@ class KafkaApis(val requestChannel: RequestChannel, } } +// Forwarding has not happened yet, so handle both ZK and KRaft cases here if (remaining.resources().isEmpty) { sendResponse(Some(new IncrementalAlterConfigsResponseData())) -} else if ((!request.isForwarded) && metadataSupport.canForward()) { +} else if ((!request.isForwarded) && metadataSupport.canForward() && isKRaftController) { Review Comment: I have one question. Why can't the ZK controller take care of the forwarded alterConfig requests for other BROKER config entities? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16572: allow defining number of disks per broker in ClusterTest [kafka]
gaurav-narula commented on code in PR #15745: URL: https://github.com/apache/kafka/pull/15745#discussion_r1569160539 ## core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java: ## @@ -93,7 +93,7 @@ public List getAdditionalExtensions() { TestKitNodes nodes = new TestKitNodes.Builder(). setBootstrapMetadataVersion(clusterConfig.metadataVersion()). setCombined(isCombined). -setNumBrokerNodes(clusterConfig.numBrokers()). +setNumBrokerNodes(clusterConfig.numBrokers(), clusterConfig.numDisksPerBroker()). Review Comment: ```suggestion setBrokerNodes(clusterConfig.numBrokers(), clusterConfig.numDisksPerBroker()). ``` ## core/src/test/java/kafka/test/ClusterTestExtensionsTest.java: ## @@ -114,6 +118,25 @@ public void testClusterTests() { } } +@ClusterTests({ +@ClusterTest(name = "cluster-tests-1", clusterType = Type.ZK, disksPerBroker = 1), +@ClusterTest(name = "cluster-tests-2", clusterType = Type.KRAFT, disksPerBroker = 2), +@ClusterTest(name = "cluster-tests-3", clusterType = Type.CO_KRAFT, disksPerBroker = 2) +}) +public void testClusterTestWithDisksPerBroker() throws ExecutionException, InterruptedException { +Admin admin = clusterInstance.createAdminClient(); + +DescribeLogDirsResult result = admin.describeLogDirs(clusterInstance.brokerIds()); +result.allDescriptions().get().forEach((brokerId, logDirDescriptionMap) -> { +if (clusterInstance.clusterType().equals(ClusterInstance.ClusterType.ZK)) { +Assertions.assertEquals(1, logDirDescriptionMap.size()); +} else if (clusterInstance.clusterType().equals(ClusterInstance.ClusterType.RAFT)) { +Assertions.assertEquals(2, logDirDescriptionMap.size()); +} else { +Assertions.fail("Unknown cluster type " + clusterInstance.clusterType()); +} +}); +} Review Comment: Nit: new line after the function -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
chia7712 commented on PR #15679: URL: https://github.com/apache/kafka/pull/15679#issuecomment-2061751961 @FrankYang0529 Could you reduce the partition count of the offsets topic? It seems the timeout is caused by the coordinator waiting for the offsets-topic partition, and our CI could be too busy to complete the assignments. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
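A sketch of the suggested tweak (the exact test-harness hook may differ from this): shrinking `__consumer_offsets` means the coordinator only has to wait for a single partition assignment on busy CI machines.

```java
import java.util.Properties;

// Broker-config overrides for an integration test; both keys are standard
// Kafka broker configs.
final class OffsetsTopicSketch {
    static Properties brokerOverrides() {
        Properties props = new Properties();
        props.setProperty("offsets.topic.num.partitions", "1");
        props.setProperty("offsets.topic.replication.factor", "1");
        return props;
    }
}
```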
Re: [PR] KAFKA-16073: Increment the local-log-start-offset before deleting segments in memory table [kafka]
junrao commented on PR #15631: URL: https://github.com/apache/kafka/pull/15631#issuecomment-2061739168 @kamalcph : Do you want this PR to be in 3.7 and 3.6? If so, could you create a separate cherry-pick PR? Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16073: Increment the local-log-start-offset before deleting segments in memory table [kafka]
junrao merged PR #15631: URL: https://github.com/apache/kafka/pull/15631 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16073: Increment the local-log-start-offset before deleting segments in memory table [kafka]
junrao commented on PR #15631: URL: https://github.com/apache/kafka/pull/15631#issuecomment-2061732009 @kamalcph : Thanks for triaging the failed tests. The PR LGTM. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
gaurav-narula commented on code in PR #15136: URL: https://github.com/apache/kafka/pull/15136#discussion_r1569131419 ## core/src/main/scala/kafka/log/LogManager.scala: ## @@ -1189,50 +1216,61 @@ class LogManager(logDirs: Seq[File], val sourceLog = currentLogs.get(topicPartition) val destLog = futureLogs.get(topicPartition) - info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition") if (sourceLog == null) throw new KafkaStorageException(s"The current replica for $topicPartition is offline") if (destLog == null) throw new KafkaStorageException(s"The future replica for $topicPartition is offline") - destLog.renameDir(UnifiedLog.logDirName(topicPartition), shouldReinitialize = true) - // the metrics tags still contain "future", so we have to remove it. - // we will add metrics back after sourceLog remove the metrics - destLog.removeLogMetrics() - destLog.updateHighWatermark(sourceLog.highWatermark) + replaceCurrentWithFutureLog(Option(sourceLog), destLog, updateHighWatermark = true) +} + } + + def replaceCurrentWithFutureLog(sourceLog: Option[UnifiedLog], destLog: UnifiedLog, updateHighWatermark: Boolean = false): Unit = { +val topicPartition = destLog.topicPartition +info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition") - // Now that future replica has been successfully renamed to be the current replica - // Update the cached map and log cleaner as appropriate. - futureLogs.remove(topicPartition) - currentLogs.put(topicPartition, destLog) - if (cleaner != null) { -cleaner.alterCheckpointDir(topicPartition, sourceLog.parentDirFile, destLog.parentDirFile) -resumeCleaning(topicPartition) - } +destLog.renameDir(UnifiedLog.logDirName(topicPartition), shouldReinitialize = true) +// the metrics tags still contain "future", so we have to remove it. +// we will add metrics back after sourceLog remove the metrics +destLog.removeLogMetrics() +if (updateHighWatermark && sourceLog.isDefined) { + destLog.updateHighWatermark(sourceLog.get.highWatermark) +} - try { -sourceLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), shouldReinitialize = true) +// Now that future replica has been successfully renamed to be the current replica +// Update the cached map and log cleaner as appropriate. +futureLogs.remove(topicPartition) +currentLogs.put(topicPartition, destLog) +if (cleaner != null) { + cleaner.alterCheckpointDir(topicPartition, sourceLog.map(_.parentDirFile), destLog.parentDirFile) Review Comment: Addressed in [062e932](https://github.com/apache/kafka/pull/15136/commits/062e932f260ce9e1df9571b2fc982c63cbaf0f7c) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16573) Streams does not specify where a Serde is needed
Ayoub Omari created KAFKA-16573: --- Summary: Streams does not specify where a Serde is needed Key: KAFKA-16573 URL: https://issues.apache.org/jira/browse/KAFKA-16573 Project: Kafka Issue Type: Improvement Components: streams Affects Versions: 3.7.0 Reporter: Ayoub Omari

Example topology:
{code:java}
builder
  .table("input", Consumed.`with`(Serdes.String(), Serdes.String()))
  .groupBy((key, value) => new KeyValue(value, key))
  .count()
  .toStream()
  .to("output", Produced.`with`(Serdes.String(), Serdes.Long()))
{code}

At runtime, we get the following exception:
{code:java}
Please specify a key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
org.apache.kafka.common.config.ConfigException: Please specify a key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
    at org.apache.kafka.streams.StreamsConfig.defaultKeySerde(StreamsConfig.java:1857)
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.keySerde(AbstractProcessorContext.java:92)
    at org.apache.kafka.streams.processor.internals.SerdeGetter.keySerde(SerdeGetter.java:47)
    at org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareSerde(WrappingNullableUtils.java:63)
    at org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde(WrappingNullableUtils.java:90)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.initStoreSerde(MeteredKeyValueStore.java:188)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:143)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232)
    at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102)
    at org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258)
{code}

The error does not give information about the line or the processor causing the issue. Here a Grouped was missing inside the groupBy, but because the groupBy API doesn't force you to define a Grouped, it can easily be missed, and it can be difficult to spot in a more complex topology. This also matters for anyone who needs control over the serdes in the topology and doesn't want to define default serdes.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
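Since the JIRA example is Scala, here is a Java equivalent showing the fix the report implies: supplying a `Grouped` to `groupBy` sidesteps the default-serde lookup that fails above.

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Produced;

// Same topology as above, with the missing piece supplied: Grouped tells
// the repartition step which serdes to use for the swapped key/value.
final class GroupByWithSerdes {
    static void build(StreamsBuilder builder) {
        builder.table("input", Consumed.with(Serdes.String(), Serdes.String()))
               .groupBy((key, value) -> new KeyValue<>(value, key),
                        Grouped.with(Serdes.String(), Serdes.String()))
               .count()
               .toStream()
               .to("output", Produced.with(Serdes.String(), Serdes.Long()));
    }
}
{code}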
Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]
mimaison commented on code in PR #15569:
URL: https://github.com/apache/kafka/pull/15569#discussion_r1569073857

## core/src/main/scala/kafka/server/ZkAdminManager.scala:
## @@ -79,10 +80,10 @@ class ZkAdminManager(val config: KafkaConfig,
   private val configHelper = new ConfigHelper(metadataCache, config, new ZkConfigRepository(adminZkClient))

   private val createTopicPolicy =
-    Option(config.getConfiguredInstance(KafkaConfig.CreateTopicPolicyClassNameProp, classOf[CreateTopicPolicy]))
+    Option(config.getConfiguredInstance(ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, classOf[CreateTopicPolicy]))

Review Comment:
   This should be `CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG`

## core/src/test/scala/unit/kafka/log/LogConfigTest.scala:
## @@ -399,18 +400,18 @@ class LogConfigTest {
     }
   }

-  /* Verify that when the deprecated config LogMessageTimestampDifferenceMaxMsProp has non default value the new configs
-   * LogMessageTimestampBeforeMaxMsProp and LogMessageTimestampAfterMaxMsProp are not changed from the default we are using
+  /* Verify that when the deprecated config LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_PROP has non default value the new configs

Review Comment:
   `LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_PROP` has been renamed to `LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG`
   Same below

## core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
## @@ -1152,12 +1151,12 @@ class KafkaConfigTest {
     defaults.setProperty(KafkaConfig.BrokerIdProp, "1")
     defaults.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:1122")
     defaults.setProperty(KafkaConfig.MaxConnectionsPerIpOverridesProp, "127.0.0.1:2, 127.0.0.2:3")
-    defaults.setProperty(KafkaConfig.LogDirProp, "/tmp1,/tmp2")
-    defaults.setProperty(KafkaConfig.LogRollTimeHoursProp, "12")
-    defaults.setProperty(KafkaConfig.LogRollTimeJitterHoursProp, "11")
-    defaults.setProperty(KafkaConfig.LogRetentionTimeHoursProp, "10")
-    //For LogFlushIntervalMsProp
-    defaults.setProperty(KafkaConfig.LogFlushSchedulerIntervalMsProp, "123")
+    defaults.setProperty(KafkaLogConfigs.LOG_DIR_CONFIG, "/tmp1,/tmp2")
+    defaults.setProperty(KafkaLogConfigs.LOG_ROLL_TIME_HOURS_CONFIG, "12")
+    defaults.setProperty(KafkaLogConfigs.LOG_ROLL_TIME_JITTER_HOURS_CONFIG, "11")
+    defaults.setProperty(KafkaLogConfigs.LOG_RETENTION_TIME_HOURS_CONFIG, "10")
+    //For LOG_FLUSH_INTERVAL_MS_PROP

Review Comment:
   This has been renamed to `LOG_FLUSH_INTERVAL_MS_CONFIG`

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16572: allow defining number of disks per broker in ClusterTest [kafka]
FrankYang0529 opened a new pull request, #15745:
URL: https://github.com/apache/kafka/pull/15745

We introduced `disksPerBroker` in `TestKitNodes` in https://issues.apache.org/jira/browse/KAFKA-16559. Supporting it in `ClusterTest` as well makes it more convenient to configure from integration tests; a hypothetical usage sketch follows this message.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
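If the PR lands as described, a test might look roughly like the sketch below. Only `disksPerBroker` is what the PR proposes; the import paths, the `brokers` attribute, and the test body are assumptions for illustration, not the PR's actual surface.

```scala
// Hypothetical usage, assuming ClusterTest gains a disksPerBroker attribute
// wired through to TestKitNodes (KAFKA-16559).
import kafka.test.ClusterInstance
import kafka.test.annotation.ClusterTest

class LogDirsIntegrationTest {
  @ClusterTest(brokers = 3, disksPerBroker = 2)
  def testMultipleLogDirs(cluster: ClusterInstance): Unit = {
    // each of the three brokers is expected to start with two log directories;
    // assertions against the running cluster are elided in this sketch
  }
}
```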
Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]
mumrah commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1569103206

## core/src/main/scala/kafka/zk/KafkaZkClient.scala:
## @@ -467,13 +470,33 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
    * @param rootEntityType entity type
    * @param sanitizedEntityName entity name
    * @throws KeeperException if there is an error while setting or creating the znode
+   * @throws ControllerMovedException if no controller is defined, or a KRaft controller is defined
    */
   def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: String, config: Properties): Unit = {
+    val controllerRegistration = getControllerRegistration match {
+      case Some(registration) => registration
+      case None =>
+        // This case is mainly here to make tests less flaky. In practice, there will always be a /controller ZNode

Review Comment:
   I found that the integration test would fail occasionally because the config change would happen before an active controller was seen (so zkVersion was 0). Adding this exception allows the test to retry the alter-configs call. In production, this could only be hit if someone was actively calling alter configs while the cluster was being deployed, and the call was handled between the time the first broker came up and the time the controller became active.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
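The retry behaviour the comment describes could look roughly like this self-contained sketch. `withControllerRetry`, its attempt count, and the linear backoff are illustrative, not `KafkaZkClient`'s actual mechanism; only `ControllerMovedException` is a real class from the Kafka clients library.

```scala
import org.apache.kafka.common.errors.ControllerMovedException

object RetryUtil {
  // Retry an operation that may race the controller election: if the call
  // lands before a /controller registration exists, setOrCreateEntityConfigs
  // throws ControllerMovedException and we simply try again.
  def withControllerRetry[T](maxAttempts: Int)(op: => T): T = {
    var attempt = 1
    while (true) {
      try {
        return op
      } catch {
        case _: ControllerMovedException if attempt < maxAttempts =>
          Thread.sleep(100L * attempt) // simple linear backoff before retrying
          attempt += 1
      }
    }
    throw new IllegalStateException("unreachable")
  }
}

// usage sketch: RetryUtil.withControllerRetry(5) { alterConfigsCall() }
```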
Re: [PR] KAFKA-16554: Online downgrade triggering and group type conversion [kafka]
dajac commented on code in PR #15721:
URL: https://github.com/apache/kafka/pull/15721#discussion_r1569077848

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
## @@ -777,6 +778,59 @@ public ClassicGroup classicGroup(
         }
     }

+    public boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String memberId) {
+        if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
+            log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.",
+                consumerGroup.groupId());
+            return false;
+        } else if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {
+            log.debug("Cannot downgrade consumer group {} to classic group because not all its members use the classic protocol.",
+                consumerGroup.groupId());
+            return false;
+        } else if (consumerGroup.numMembers() <= 1) {
+            log.info("Skip downgrading the consumer group {} to classic group because it's empty.",
+                consumerGroup.groupId());
+            return false;
+        } else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) {
+            log.info("Cannot downgrade consumer group {} to classic group because its group size is greater than classic group max size.",
+                consumerGroup.groupId());
+        }
+        return true;
+    }
+
+    public CompletableFuture convertToClassicGroup(ConsumerGroup consumerGroup, String leavingMemberId, List records) {
+        consumerGroup.createGroupTombstoneRecords(records);
+        ClassicGroup classicGroup;
+        try {
+            classicGroup = consumerGroup.toClassicGroup(
+                leavingMemberId,
+                logContext,
+                time,
+                consumerGroupSessionTimeoutMs,
+                metadataImage,
+                records
+            );
+        } catch (SchemaException e) {
+            log.warn("Cannot downgrade the consumer group " + consumerGroup.groupId() + ": fail to parse " +
+                "the Consumer Protocol " + ConsumerProtocol.PROTOCOL_TYPE + ".", e);
+
+            throw new GroupIdNotFoundException(String.format("Cannot downgrade the classic group %s: %s.",
+                consumerGroup.groupId(), e.getMessage()));
+        }
+
+        groups.put(consumerGroup.groupId(), classicGroup);

Review Comment:
   I think that we should explicitly remove the previous group before adding the new one because we update metrics when the previous group is removed. We could likely call `removeGroup` for this purpose.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
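The reviewer's suggestion, reduced to a self-contained sketch: a plain mutable Map stands in for the coordinator's groups store, the String values stand in for the group objects, and the metrics hook is simulated with a print. The point is the ordering: remove first so removal-side metrics fire, then install the downgraded group.

```scala
import scala.collection.mutable

object DowngradeSketch {
  private val groups = mutable.Map[String, String]()

  // Stand-in for removeGroup: removal is where the metrics update happens.
  private def removeGroup(groupId: String): Unit =
    groups.remove(groupId).foreach(_ => println(s"metrics: consumer group $groupId removed"))

  def replaceWithClassic(groupId: String, classicGroup: String): Unit = {
    removeGroup(groupId)              // metrics see the consumer group leave first
    groups.put(groupId, classicGroup) // then the classic group is registered
  }
}
```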