Re: [PR] KAFKA-15879: Add documentation and examples for docker image [kafka]
omkreddy commented on code in PR #14938: URL: https://github.com/apache/kafka/pull/14938#discussion_r1418474509 ## docker/examples/jvm/cluster/docker-compose.yml: ## @@ -0,0 +1,110 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--- +version: '1' Review Comment: It would be great if we could add a multi-node example with multiple listeners like SSL and SASL_SSL (PLAIN mechanism) with an authorizer configured. Can be added in another PR. ## docker/examples/jvm/cluster/docker-compose.yml: ## @@ -0,0 +1,110 @@ (license header elided; same hunk as above) +--- +version: '1' Review Comment: Can we also add a compose file for multi-node combined mode? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14132; Replace EasyMock with Mockito in StandaloneHerderTest [kafka]
mdedetrich commented on PR #12728: URL: https://github.com/apache/kafka/pull/12728#issuecomment-1844822620 I have just pushed a merge commit which adds the `testCreateConnectorWithStoppedInitialState()` test. The translation of the test was quite straightforward; the only thing to note is that the original test used `PowerMock.verifyAll();` as an escape hatch. I replaced it with `verify(loaderSwap).close();`; not sure if you want to add any more verifications (or even remove `verify(loaderSwap).close();` as being misleading).
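The migration pattern described above — replacing PowerMock's blanket `PowerMock.verifyAll()` with an explicit Mockito-style `verify(loaderSwap).close()` — boils down to asserting that `close()` was invoked exactly once on the mock. A minimal plain-Java sketch of what that single verification checks (`LoaderSwap` here is a hypothetical stand-in, not the real Connect class):

```java
import java.util.concurrent.atomic.AtomicInteger;

public class VerifyCloseSketch {
    // Hypothetical stand-in for Connect's LoaderSwap; records close() calls
    // so we can assert on them, which is what verify(loaderSwap).close() does.
    static class LoaderSwap implements AutoCloseable {
        final AtomicInteger closeCalls = new AtomicInteger();
        @Override
        public void close() { closeCalls.incrementAndGet(); }
    }

    // Code under test: try-with-resources guarantees close() runs exactly once.
    static void useLoader(LoaderSwap swap) {
        try (LoaderSwap s = swap) {
            // ... work that needs the plugin class loader swapped in ...
        }
    }

    public static void main(String[] args) {
        LoaderSwap swap = new LoaderSwap();
        useLoader(swap);
        // Equivalent of Mockito's verify(swap).close(): exactly one invocation.
        if (swap.closeCalls.get() != 1) {
            throw new AssertionError("expected exactly one close(), got " + swap.closeCalls.get());
        }
        System.out.println("close() invoked " + swap.closeCalls.get() + " time(s)");
    }
}
```

Unlike `verifyAll()`, which implicitly checked every recorded expectation, Mockito makes each verified interaction explicit, which is why the reviewer can now debate whether `verify(loaderSwap).close();` is sufficient or misleading.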
Re: [PR] KAFKA-15971: Re-enable consumer integration tests for new consumer [kafka]
dajac commented on PR #14925: URL: https://github.com/apache/kafka/pull/14925#issuecomment-1844810244 Triggered a new build.
Re: [PR] KAFKA-15784: Ensure atomicity of in memory update and write when transactionally committing offsets [kafka]
dajac commented on PR #14774: URL: https://github.com/apache/kafka/pull/14774#issuecomment-1844806301 @junrao @jolshan I am working on implementing the transactional offset commits in the new world. I will take care of adding the verification steps there when this PR is done. We can replicate the same pattern there.
Re: [PR] KAFKA-14589 ConsumerGroupCommand options and case classes rewritten [kafka]
tledkov commented on code in PR #14856: URL: https://github.com/apache/kafka/pull/14856#discussion_r1418448844 ## tools/src/main/java/org/apache/kafka/tools/consumergroup/ConsumerGroupCommandOptions.java: ## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.consumergroup; + +import joptsimple.OptionSpec; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.util.CommandDefaultOptions; +import org.apache.kafka.server.util.CommandLineUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class ConsumerGroupCommandOptions extends CommandDefaultOptions { +public static final Logger LOGGER = LoggerFactory.getLogger(ConsumerGroupCommandOptions.class); + +public static final String BOOTSTRAP_SERVER_DOC = "REQUIRED: The server(s) to connect to."; +public static final String GROUP_DOC = "The consumer group we wish to act on."; +public static final String TOPIC_DOC = "The topic whose consumer group information should be deleted or topic whose should be included in the reset offset process. 
" + +"In `reset-offsets` case, partitions can be specified using this format: `topic1:0,1,2`, where 0,1,2 are the partition to be included in the process. " + +"Reset-offsets also supports multiple topic inputs."; +public static final String ALL_TOPICS_DOC = "Consider all topics assigned to a group in the `reset-offsets` process."; +public static final String LIST_DOC = "List all consumer groups."; +public static final String DESCRIBE_DOC = "Describe consumer group and list offset lag (number of messages not yet processed) related to given group."; +public static final String ALL_GROUPS_DOC = "Apply to all consumer groups."; +public static final String NL = System.getProperty("line.separator"); +public static final String DELETE_DOC = "Pass in groups to delete topic partition offsets and ownership information " + +"over the entire consumer group. For instance --group g1 --group g2"; +public static final String TIMEOUT_MS_DOC = "The timeout that can be set for some use cases. For example, it can be used when describing the group " + +"to specify the maximum amount of time in milliseconds to wait before the group stabilizes (when the group is just created, " + +"or is going through some changes)."; +public static final String COMMAND_CONFIG_DOC = "Property file containing configs to be passed to Admin Client and Consumer."; +public static final String RESET_OFFSETS_DOC = "Reset offsets of consumer group. Supports one consumer group at the time, and instances should be inactive" + NL + +"Has 2 execution options: --dry-run (the default) to plan which offsets to reset, and --execute to update the offsets. " + +"Additionally, the --export option is used to export the results to a CSV format." + NL + +"You must choose one of the following reset specifications: --to-datetime, --by-duration, --to-earliest, " + +"--to-latest, --shift-by, --from-file, --to-current, --to-offset." + NL + +"To define the scope use --all-topics or --topic. 
One scope must be specified unless you use '--from-file'."; +public static final String DRY_RUN_DOC = "Only show results without executing changes on Consumer Groups. Supported operations: reset-offsets."; +public static final String EXECUTE_DOC = "Execute operation. Supported operations: reset-offsets."; +public static final String EXPORT_DOC = "Export operation execution to a CSV file. Supported operations: reset-offsets."; +public static final String RESET_TO_OFFSET_DOC = "Reset offsets to a specific offset."; +public static final String RESET_FROM_FILE_DOC = "Reset offsets to values defined in CSV file."; +public static final String RESET_TO_DATETIME_DOC = "Reset offsets to offset from datetime. Format: '-MM-DDTHH:mm:SS.sss'"; +public static final String RESET_BY_DURATION_DOC = "Reset offsets to offset by duration from current timestamp. Format:
Re: [PR] KAFKA-15903: Add Github Actions Workflow for RC Docker Image [kafka]
omkreddy merged PR #14940: URL: https://github.com/apache/kafka/pull/14940
Re: [PR] KAFKA-15880: Add Github Actions Workflow for promoting docker image [kafka]
omkreddy merged PR #14941: URL: https://github.com/apache/kafka/pull/14941
Re: [PR] KAFKA-14589 ConsumerGroupCommand options and case classes rewritten [kafka]
tledkov commented on code in PR #14856: URL: https://github.com/apache/kafka/pull/14856#discussion_r1418426538 ## tools/src/main/java/org/apache/kafka/tools/consumergroup/ConsumerGroupCommandOptions.java: ## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.consumergroup; + +import joptsimple.OptionSpec; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.util.CommandDefaultOptions; +import org.apache.kafka.server.util.CommandLineUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class ConsumerGroupCommandOptions extends CommandDefaultOptions { +public static final Logger LOGGER = LoggerFactory.getLogger(ConsumerGroupCommandOptions.class); + +public static final String BOOTSTRAP_SERVER_DOC = "REQUIRED: The server(s) to connect to."; +public static final String GROUP_DOC = "The consumer group we wish to act on."; +public static final String TOPIC_DOC = "The topic whose consumer group information should be deleted or topic whose should be included in the reset offset process. 
" + +"In `reset-offsets` case, partitions can be specified using this format: `topic1:0,1,2`, where 0,1,2 are the partition to be included in the process. " + +"Reset-offsets also supports multiple topic inputs."; +public static final String ALL_TOPICS_DOC = "Consider all topics assigned to a group in the `reset-offsets` process."; +public static final String LIST_DOC = "List all consumer groups."; +public static final String DESCRIBE_DOC = "Describe consumer group and list offset lag (number of messages not yet processed) related to given group."; +public static final String ALL_GROUPS_DOC = "Apply to all consumer groups."; +public static final String NL = System.getProperty("line.separator"); Review Comment: ```suggestion public static final String NL = System.lineSeparator(); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15980: add KIP-1001 CurrentControllerId metric [kafka]
cmccabe merged PR #14749: URL: https://github.com/apache/kafka/pull/14749
Re: [PR] KAFKA-15956: MetadataShell must take the log directory lock when reading [kafka]
cmccabe commented on PR #14899: URL: https://github.com/apache/kafka/pull/14899#issuecomment-1844406974 Jenkins had an internal issue. Re-pushing to get a clean build.
Re: [PR] KAFKA-15347: fix unit test [kafka]
mjsax merged PR #14947: URL: https://github.com/apache/kafka/pull/14947
Re: [PR] KAFKA-13109: SourceTask#poll not enforcing retries in case of RetriableException [kafka]
github-actions[bot] commented on PR #13726: URL: https://github.com/apache/kafka/pull/13726#issuecomment-1844200113 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] KAFKA-15178: Improve ConsumerCoordinator.poll perf [kafka]
github-actions[bot] commented on PR #13993: URL: https://github.com/apache/kafka/pull/13993#issuecomment-1844199926 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
[PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
mjsax opened a new pull request, #14951: URL: https://github.com/apache/kafka/pull/14951 Stacked on other PRs. Still not completed. But early feedback is welcome.
Re: [PR] KAFKA-15629: Update upgrade-guide.html for TimestampedKeyQuery and TimestampedRangeQuery [kafka]
mjsax merged PR #14898: URL: https://github.com/apache/kafka/pull/14898
Re: [PR] KAFKA-15629: Update upgrade-guide.html for TimestampedKeyQuery and TimestampedRangeQuery [kafka]
mjsax commented on code in PR #14898: URL: https://github.com/apache/kafka/pull/14898#discussion_r1418202440 ## docs/streams/upgrade-guide.html: ## @@ -139,6 +139,10 @@ Streams API <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-985%3A+Add+reverseRange+and+reverseAll+query+over+kv-store+in+IQv2">KIP-985</a> extends this functionality by adding .withDescendingKeys() to allow user to receive data in descending order. + +<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-992%3A+Proposal+to+introduce+IQv2+Query+Types%3A+TimestampedKeyQuery+and+TimestampedRangeQuery">KIP-992</a> add two new query types, namely TimestampedKeyQuery and TimestampedRangeQuery. Both should be used to query a timestamped key-value store, to retrieve a ValueAndTimestamp result. The existing KeyQuery and RangeQuery are changes to always return the value only, also for timestamped key-value stores. Review Comment: ```suggestion <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-992%3A+Proposal+to+introduce+IQv2+Query+Types%3A+TimestampedKeyQuery+and+TimestampedRangeQuery">KIP-992</a> adds two new query types, namely TimestampedKeyQuery and TimestampedRangeQuery. Both should be used to query a timestamped key-value store, to retrieve a ValueAndTimestamp result. The existing KeyQuery and RangeQuery are changed to always return the value only for timestamped key-value stores. ```
Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]
kirktrue commented on code in PR #14937: URL: https://github.com/apache/kafka/pull/14937#discussion_r1418183946 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ## @@ -274,79 +269,18 @@ private void closeInternal(final Duration timeout) { } void cleanup() { +log.trace("Closing the consumer network thread"); +Timer timer = time.timer(closeTimeout); try { -log.trace("Closing the consumer network thread"); -Timer timer = time.timer(closeTimeout); -maybeAutocommitOnClose(timer); runAtClose(requestManagers.entries(), networkClientDelegate, timer); -maybeLeaveGroup(timer); } catch (Exception e) { log.error("Unexpected error during shutdown. Proceed with closing.", e); } finally { +networkClientDelegate.awaitPendingRequests(timer); Review Comment: Network requests are tied to the `CompletableApplicationEvent`s, right? Can we just rely on the events to wait for their network I/O to complete via the `addAndGet()` method? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerCloseApplicationEvent.java: ## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals.events; + +public class ConsumerCloseApplicationEvent extends CompletableApplicationEvent { Review Comment: Why not just have separate event types as per the rest of the codebase? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerCloseApplicationEvent.java: ## @@ -0,0 +1,66 @@ (license header elided; same as the hunk above) +package org.apache.kafka.clients.consumer.internals.events; + +public class ConsumerCloseApplicationEvent extends CompletableApplicationEvent { Review Comment: I'm happy to have a superclass for 'close' events, but having a type and a task gets a bit muddy, doesn't it? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java: ## @@ -45,15 +46,19 @@ public class ApplicationEventProcessor extends EventProcessor private final Logger log; private final ConsumerMetadata metadata; private final RequestManagers requestManagers; +private final NetworkClientDelegate networkClientDelegate; Review Comment: I'm uncomfortable with introducing the `NetworkClientDelegate` at this layer. 
It's centralized in `ConsumerNetworkThread` so that we can reason about where the various network I/O is performed. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java: ## @@ -113,6 +118,10 @@ public void process(ApplicationEvent event) { process((UnsubscribeApplicationEvent) event); return; +case PREP_CLOSING: +processPrepClosingEvent((ConsumerCloseApplicationEvent) event); +return; + Review Comment: Any reason we can't have these as separate types like the other events? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java: ## @@ -223,23 +232,54 @@ private void process(final TopicMetadataApplicationEvent event) { event.chain(future); }
[jira] [Commented] (KAFKA-12670) KRaft support for unclean.leader.election.enable
[ https://issues.apache.org/jira/browse/KAFKA-12670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17793974#comment-17793974 ] Luke Chen commented on KAFKA-12670: --- [~cmccabe], do you have any idea when KRaft will support unclean leader election? > KRaft support for unclean.leader.election.enable > > > Key: KAFKA-12670 > URL: https://issues.apache.org/jira/browse/KAFKA-12670 > Project: Kafka > Issue Type: Task >Reporter: Colin McCabe >Assignee: Ryan Dielhenn >Priority: Major > > Implement KRaft support for the unclean.leader.election.enable > configurations. These configurations can be set at the topic, broker, or > cluster level. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: remove the use of ConsumerTestBuilder from MembershipManagerImplTest [kafka]
kirktrue commented on PR #14950: URL: https://github.com/apache/kafka/pull/14950#issuecomment-1843973958 @lucasbru—please add the `ctr` tag. If you can also review the code, even better.
[PR] MINOR: remove the use of ConsumerTestBuilder from MembershipManagerImplTest [kafka]
kirktrue opened a new pull request, #14950: URL: https://github.com/apache/kafka/pull/14950 This is one in a series of changes to reduce/remove reliance on the `ConsumerTestBuilder` for CTR-related tests.
[jira] [Created] (KAFKA-15984) Client disconnections can cause hanging transactions on __consumer_offsets
Justine Olshan created KAFKA-15984: -- Summary: Client disconnections can cause hanging transactions on __consumer_offsets Key: KAFKA-15984 URL: https://issues.apache.org/jira/browse/KAFKA-15984 Project: Kafka Issue Type: Task Reporter: Justine Olshan When investigating frequent hanging transactions on __consumer_offsets partitions, we realized that many of them were caused by the same offset being committed in duplicate, with one of the duplicates having `"isDisconnectedClient":true`. TxnOffsetCommits do not have sequence numbers and thus are not protected against duplicates in the same way idempotent produce requests are. Thus, when a client is disconnected (and flushes its requests), we may see the duplicate get appended to the log. KIP-890 part 1 should protect against this, as the duplicate will not succeed verification. KIP-890 part 2 strengthens this further, as duplicates (from previous transactions) cannot be added to new transactions if the partition is re-added, since the epoch will be bumped. Another possible solution is to do duplicate checking on the group coordinator side when the request comes in. This solution could be used instead of KIP-890 part 1 to prevent hanging transactions, but given that part 1 only has one open PR remaining, we may not need to do this. However, this can also prevent duplicates from being added to a new transaction – something only part 2 will protect against.
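The gap KAFKA-15984 describes — idempotent produce requests carry sequence numbers that let the partition leader drop a disconnected client's retried duplicate, while TxnOffsetCommit requests do not — can be illustrated with a conceptual sketch (hypothetical names; not the actual broker code):

```java
import java.util.HashMap;
import java.util.Map;

// Conceptual sketch: why sequence numbers let a leader reject the duplicate
// that a disconnected client's retry produces, and why a request without
// them (like TxnOffsetCommit) gets appended twice.
public class DedupSketch {
    // Last sequence number accepted per producerId (per partition, in reality).
    private final Map<Long, Integer> lastSeq = new HashMap<>();
    private int appendedRecords = 0;

    // Idempotent produce path: a duplicate (same or older sequence) is dropped.
    boolean appendWithSequence(long producerId, int sequence) {
        Integer last = lastSeq.get(producerId);
        if (last != null && sequence <= last) {
            return false; // duplicate detected, not appended
        }
        lastSeq.put(producerId, sequence);
        appendedRecords++;
        return true;
    }

    // TxnOffsetCommit-like path: no sequence, so a retried duplicate is appended.
    void appendWithoutSequence() {
        appendedRecords++;
    }

    int appendedRecords() { return appendedRecords; }

    public static void main(String[] args) {
        DedupSketch log = new DedupSketch();
        log.appendWithSequence(1L, 0);
        log.appendWithSequence(1L, 0); // retry after disconnect: dropped
        log.appendWithoutSequence();
        log.appendWithoutSequence();   // retry after disconnect: duplicated
        System.out.println("appended=" + log.appendedRecords()); // 3, not 4
    }
}
```

The KIP-890 verification and epoch-bump mechanisms mentioned in the issue close this gap differently, by rejecting records that do not belong to a currently verified transaction rather than by adding sequence numbers.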
[jira] [Comment Edited] (KAFKA-15556) Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, and tryConnect
[ https://issues.apache.org/jira/browse/KAFKA-15556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17793972#comment-17793972 ] Phuc Hong Tran edited comment on KAFKA-15556 at 12/7/23 12:51 AM: -- There is a duplicated method "nodeUnavailable" which has the same functionality as "isUnavailable" in the NetworkClientDelegate class. was (Author: JIRAUSER301295): There is a duplicated method "nodeUnavailable" which have the same functionalit as "isUnavailable" in the NetworkClientDelegate class > Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, > and tryConnect > - > > Key: KAFKA-15556 > URL: https://issues.apache.org/jira/browse/KAFKA-15556 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Phuc Hong Tran >Priority: Major > Labels: consumer-threading-refactor > > The "new consumer" (i.e. {{{}PrototypeAsyncConsumer{}}}) was designed to > handle networking details in a more centralized way. However, in order to > reuse code between the existing {{KafkaConsumer}} and the new > {{{}PrototypeAsyncConsumer{}}}, that design goal was "relaxed" when the > {{NetworkClientDelegate}} capitulated and -stole- copied three methods from > {{ConsumerNetworkClient}} related to detecting node status: > # {{isUnavailable}} > # {{maybeThrowAuthFailure}} > # {{tryConnect}} > Unfortunately, these have found their way into the {{FetchRequestManager}} > and {{OffsetsRequestManager}} implementations. We should review if we can > clean up—or even remove—this leaky abstraction.
[jira] [Commented] (KAFKA-15556) Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, and tryConnect
[ https://issues.apache.org/jira/browse/KAFKA-15556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17793972#comment-17793972 ] Phuc Hong Tran commented on KAFKA-15556: There is a duplicated method "nodeUnavailable" which has the same functionality as "isUnavailable" in the NetworkClientDelegate class. > Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, > and tryConnect > - > > Key: KAFKA-15556 > URL: https://issues.apache.org/jira/browse/KAFKA-15556 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Phuc Hong Tran >Priority: Major > Labels: consumer-threading-refactor > > The "new consumer" (i.e. {{{}PrototypeAsyncConsumer{}}}) was designed to > handle networking details in a more centralized way. However, in order to > reuse code between the existing {{KafkaConsumer}} and the new > {{{}PrototypeAsyncConsumer{}}}, that design goal was "relaxed" when the > {{NetworkClientDelegate}} capitulated and -stole- copied three methods from > {{ConsumerNetworkClient}} related to detecting node status: > # {{isUnavailable}} > # {{maybeThrowAuthFailure}} > # {{tryConnect}} > Unfortunately, these have found their way into the {{FetchRequestManager}} > and {{OffsetsRequestManager}} implementations. We should review if we can > clean up—or even remove—this leaky abstraction.
[jira] [Updated] (KAFKA-15983) Kafka-acls should return authorization already done if repeating work is issued
[ https://issues.apache.org/jira/browse/KAFKA-15983?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chen He updated KAFKA-15983: Summary: Kafka-acls should return authorization already done if repeating work is issued (was: Kafka-acls should return authorization already done if repeat work is issued) > Kafka-acls should return authorization already done if repeating work is > issued > --- > > Key: KAFKA-15983 > URL: https://issues.apache.org/jira/browse/KAFKA-15983 > Project: Kafka > Issue Type: Improvement > Components: security >Affects Versions: 3.6.0 >Reporter: Chen He >Priority: Minor > Labels: newbie > > kafka-acls.sh cmd will always issue normal operation for a cmd if customer > already authorized a user. It should report something like "user {} already > authorized with {} resources" instead of doing it again and again.
[jira] [Updated] (KAFKA-15983) Kafka-acls should return authorization already done if repeat work is issued
[ https://issues.apache.org/jira/browse/KAFKA-15983?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chen He updated KAFKA-15983: Labels: newbie (was: ) > Kafka-acls should return authorization already done if repeat work is issued > > > Key: KAFKA-15983 > URL: https://issues.apache.org/jira/browse/KAFKA-15983 > Project: Kafka > Issue Type: Improvement > Components: security >Affects Versions: 3.6.0 >Reporter: Chen He >Priority: Minor > Labels: newbie > > The kafka-acls.sh command will always perform the requested operation even if the > user is already authorized. It should report something like "user {} already > authorized with {} resources" instead of doing it again and again.
[jira] [Created] (KAFKA-15983) Kafka-acls should return authorization already done if repeat work is issued
Chen He created KAFKA-15983: --- Summary: Kafka-acls should return authorization already done if repeat work is issued Key: KAFKA-15983 URL: https://issues.apache.org/jira/browse/KAFKA-15983 Project: Kafka Issue Type: Improvement Components: security Affects Versions: 3.6.0 Reporter: Chen He The kafka-acls.sh command will always perform the requested operation even if the user is already authorized. It should report something like "user {} already authorized with {} resources" instead of doing it again and again.
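The improvement requested above amounts to an idempotency check before writing the ACL. A minimal sketch of that check, using a simplified `AclBinding` stand-in rather than Kafka's real authorizer classes:

```java
import java.util.HashSet;
import java.util.Set;

class AclStore {
    // Simplified stand-in for an ACL binding; Kafka's real class carries
    // resource type, pattern type, host, and permission type as well.
    record AclBinding(String principal, String operation, String resource) {}

    private final Set<AclBinding> bindings = new HashSet<>();

    // Proposed behavior: report repeat work instead of silently re-adding.
    String addAcl(String principal, String operation, String resource) {
        AclBinding binding = new AclBinding(principal, operation, resource);
        if (!bindings.add(binding)) {
            return String.format("user %s already authorized with %s on %s",
                    principal, operation, resource);
        }
        return "added";
    }
}
```

Since `Set.add` returns `false` for an element that is already present, the existence check and the insert happen in one step, avoiding a check-then-act race on the in-memory view.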
Re: [PR] KAFKA-15208: Upgrade Jackson dependencies to version 2.15.3 [kafka]
bmscomp commented on PR #13662: URL: https://github.com/apache/kafka/pull/13662#issuecomment-1843889093 @mimaison Done! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]
philipnee commented on code in PR #14937: URL: https://github.com/apache/kafka/pull/14937#discussion_r1418129061 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ## @@ -274,79 +269,18 @@ private void closeInternal(final Duration timeout) { } void cleanup() { Review Comment: There is really not much to do when shutting down the network thread - we will try one more time to send the unsent requests and poll the network client to make sure all requests are sent.
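The cleanup semantics described in that comment - flush unsent requests once, then poll once so they make progress - can be sketched with a queue-based stand-in (illustrative names, not the real ConsumerNetworkThread/NetworkClient API):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class NetworkThreadSketch {
    private final Deque<String> unsent = new ArrayDeque<>();
    final List<String> sent = new ArrayList<>();
    int polls = 0;

    void enqueue(String request) {
        unsent.add(request);
    }

    void cleanup() {
        // One final attempt to hand every unsent request to the client...
        while (!unsent.isEmpty()) {
            sent.add(unsent.poll());
        }
        // ...and one final poll so those sends actually hit the wire.
        polls++;
    }
}
```

The point of the single final poll is that handing a request to the client only buffers it; without at least one more poll of the network client, nothing is written to the socket.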
Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]
philipnee commented on code in PR #14937: URL: https://github.com/apache/kafka/pull/14937#discussion_r1418128177 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1024,15 +1037,14 @@ private void close(Duration timeout, boolean swallowException) { final Timer closeTimer = time.timer(timeout); clientTelemetryReporter.ifPresent(reporter -> reporter.initiateClose(timeout.toMillis())); closeTimer.update(); - +// Prepare shutting down the network thread +prepareShutdown(closeTimer, firstException); Review Comment: Completing the tasks before shutting down the network thread. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14132; Replace EasyMock with Mockito in StandaloneHerderTest [kafka]
mdedetrich commented on PR #12728: URL: https://github.com/apache/kafka/pull/12728#issuecomment-1843872683 > @mdedetrich Thanks, the changes look good to go. But the build is failing because there are new tests on trunk that still use EasyMock/PowerMock. Can you rebase on the latest from trunk? Yup doing so now
Re: [PR] KAFKA-15784: Ensure atomicity of in memory update and write when transactionally committing offsets [kafka]
hachikuji commented on code in PR #14774: URL: https://github.com/apache/kafka/pull/14774#discussion_r1418127368 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -941,39 +851,209 @@ class ReplicaManager(val config: KafkaConfig, } } - private def partitionEntriesForVerification(verificationGuards: mutable.Map[TopicPartition, VerificationGuard], - entriesPerPartition: Map[TopicPartition, MemoryRecords], - verifiedEntries: mutable.Map[TopicPartition, MemoryRecords], - unverifiedEntries: mutable.Map[TopicPartition, MemoryRecords], - errorEntries: mutable.Map[TopicPartition, Errors]): Unit= { -val transactionalProducerIds = mutable.HashSet[Long]() + /** + * Handles the produce request by starting any transactional verification before appending. + * + * @param timeout maximum time we will wait to append before returning + * @param requiredAcks number of replicas who must acknowledge the append before sending the response + * @param internalTopicsAllowed boolean indicating whether internal topics can be appended to + * @param originsource of the append request (ie, client, replication, coordinator) + * @param entriesPerPartition the records per partition to be appended + * @param responseCallback callback for sending the response + * @param delayedProduceLocklock for the delayed actions + * @param recordValidationStatsCallback callback for updating stats on record conversions + * @param requestLocal container for the stateful instances scoped to this request -- this must correspond to the + * thread calling this method + * @param actionQueue the action queue to use. ReplicaManager#defaultActionQueue is used by default. 
+ * @param verificationGuardsthe mapping from topic partition to verification guards if transaction verification is used + */ + def handleProduceAppend(timeout: Long, + requiredAcks: Short, + internalTopicsAllowed: Boolean, + origin: AppendOrigin, + transactionalId: String, + entriesPerPartition: Map[TopicPartition, MemoryRecords], + responseCallback: Map[TopicPartition, PartitionResponse] => Unit, + recordValidationStatsCallback: Map[TopicPartition, RecordValidationStats] => Unit = _ => (), + requestLocal: RequestLocal = RequestLocal.NoCaching, + actionQueue: ActionQueue = this.defaultActionQueue): Unit = { + +val transactionalProducerInfo = mutable.HashSet[(Long, Short)]() +val topicPartitionBatchInfo = mutable.Map[TopicPartition, Int]() entriesPerPartition.foreach { case (topicPartition, records) => - try { -// Produce requests (only requests that require verification) should only have one batch per partition in "batches" but check all just to be safe. -val transactionalBatches = records.batches.asScala.filter(batch => batch.hasProducerId && batch.isTransactional) -transactionalBatches.foreach(batch => transactionalProducerIds.add(batch.producerId)) - -if (transactionalBatches.nonEmpty) { - // We return VerificationGuard if the partition needs to be verified. If no state is present, no need to verify. - val firstBatch = records.firstBatch - val verificationGuard = getPartitionOrException(topicPartition).maybeStartTransactionVerification(firstBatch.producerId, firstBatch.baseSequence, firstBatch.producerEpoch) - if (verificationGuard != VerificationGuard.SENTINEL) { -verificationGuards.put(topicPartition, verificationGuard) -unverifiedEntries.put(topicPartition, records) - } else -verifiedEntries.put(topicPartition, records) -} else { - // If there is no producer ID or transactional records in the batches, no need to verify. 
- verifiedEntries.put(topicPartition, records) -} - } catch { -case e: Exception => errorEntries.put(topicPartition, Errors.forException(e)) - } + // Produce requests (only requests that require verification) should only have one batch per partition in "batches" but check all just to be safe. + val transactionalBatches = records.batches.asScala.filter(batch => batch.hasProducerId && batch.isTransactional) + transactionalBatches.foreach(batch => transactionalProducerInfo.add(batch.producerId, batch.producerEpoch)) + if (!transactionalBatches.isEmpty)
Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]
philipnee commented on PR #14937: URL: https://github.com/apache/kafka/pull/14937#issuecomment-1843871662 Hi @kirktrue - I rewrote the previous PR based on your feedback. I thought driving the close via an event is a better and clearer pattern, so thanks for the suggestions. Would you have time to take a look at this PR? @lucasbru - Thanks for reviewing the PR - I've decided, per your suggestion, to use Kirk's approach to close the consumer. Let me know what you think.
[PR] KAFKA-15215: docs for KIP-954 [kafka]
agavra opened a new pull request, #14949: URL: https://github.com/apache/kafka/pull/14949 (no comment)
Re: [PR] KAFKA-15696: Refactor AsyncConsumer close procedure [kafka]
philipnee commented on PR #14920: URL: https://github.com/apache/kafka/pull/14920#issuecomment-1843868318 Thanks @lucasbru and @kirktrue - I'm closing this one and reopening another one.
Re: [PR] KAFKA-15696: Refactor AsyncConsumer close procedure [kafka]
philipnee closed pull request #14920: KAFKA-15696: Refactor AsyncConsumer close procedure URL: https://github.com/apache/kafka/pull/14920
[PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
mjsax opened a new pull request, #14948: URL: https://github.com/apache/kafka/pull/14948 Part of KIP-714. Adds support to expose the producer client instance id. This PR only adds support for the thread producer, with the state updater disabled.
Re: [PR] Refactor closing consumer [kafka]
philipnee commented on code in PR #14937: URL: https://github.com/apache/kafka/pull/14937#discussion_r1418122441 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1050,6 +1062,81 @@ private void close(Duration timeout, boolean swallowException) { } } +/** + * Prior to closing the network thread, we need to make sure the following operations happen in the right sequence: + * 1. autocommit offsets + * 2. revoke all partitions + */ +private void prepareShutdown(final Timer timer, final AtomicReference firstException) { +if (!groupMetadata.isPresent()) +return; +maybeAutoCommitSync(timer, firstException); +timer.update(); +waitOnEventCompletion(new ConsumerCloseApplicationEvent(ConsumerCloseApplicationEvent.Task.COMMIT, timer.remainingMs()), timer, firstException); +maybeInvokeCommitCallbacks(); +maybeRevokePartitions(timer, firstException); +waitOnEventCompletion(new ConsumerCloseApplicationEvent(ConsumerCloseApplicationEvent.Task.LEAVE_GROUP, timer.remainingMs()), timer, firstException); +} + +private void waitOnEventCompletion(final ConsumerCloseApplicationEvent event, + final Timer timer, + final AtomicReference firstException) { +try { +applicationEventHandler.addAndGet(event, timer); +} catch (TimeoutException e) { Review Comment: We don't really throw timeout exceptions during closing, because if the user tries to close with a 0 duration then all ops would be timed out. The current implementation just polls, but since we cannot directly poll the client, we need to wait until the future either completes or times out, and then keep going.
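The close behavior discussed in that review thread - each shutdown step draws from one shared time budget, and a step that runs out of budget is skipped rather than surfaced as a TimeoutException (mirroring what `close(Duration.ZERO)` would otherwise do) - can be sketched as follows. The names are illustrative, not the actual AsyncKafkaConsumer internals:

```java
import java.util.ArrayList;
import java.util.List;

class CloseBudget {
    private long remainingMs;

    CloseBudget(long budgetMs) {
        this.remainingMs = budgetMs;
    }

    // Run shutdown steps in order; assume each step costs costPerStepMs.
    List<String> runSteps(List<String> steps, long costPerStepMs) {
        List<String> completed = new ArrayList<>();
        for (String step : steps) {
            if (remainingMs <= 0) {
                break;                     // budget exhausted: stop, don't throw
            }
            remainingMs -= costPerStepMs;  // each step updates the shared timer
            completed.add(step);
        }
        return completed;
    }
}
```

With a zero budget no step runs at all, which is why throwing on every per-step timeout would turn `close(Duration.ZERO)` into a cascade of exceptions instead of a best-effort shutdown.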
Re: [PR] KAFKA-15347: add support for 'single key multi timestamp' IQs with versioned state stores (KIP-968) [kafka]
aliehsaeedii commented on PR #14626: URL: https://github.com/apache/kafka/pull/14626#issuecomment-1843861522 > @aliehsaeedii @mjsax The last build has many failed tests: https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14626/26/tests. I also see them in other builds now. Are they related to this PR? Thanks @dajac. We will merge our fixing PR after Jenkins builds.
Re: [PR] KAFKA-15784: Ensure atomicity of in memory update and write when transactionally committing offsets [kafka]
jolshan commented on code in PR #14774: URL: https://github.com/apache/kafka/pull/14774#discussion_r1418097344 ## core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala: ## @@ -349,146 +468,68 @@ class GroupMetadataManager(brokerId: Int, consumerId: String, offsetMetadata: immutable.Map[TopicIdPartition, OffsetAndMetadata], responseCallback: immutable.Map[TopicIdPartition, Errors] => Unit, - transactionalId: String = null, producerId: Long = RecordBatch.NO_PRODUCER_ID, producerEpoch: Short = RecordBatch.NO_PRODUCER_EPOCH, requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = { -// first filter out partitions with offset metadata size exceeding limit -val filteredOffsetMetadata = offsetMetadata.filter { case (_, offsetAndMetadata) => - validateOffsetMetadataLength(offsetAndMetadata.metadata) -} - group.inLock { if (!group.hasReceivedConsistentOffsetCommits) warn(s"group: ${group.groupId} with leader: ${group.leaderOrNull} has received offset commits from consumers as well " + s"as transactional producers. Mixing both types of offset commits will generally result in surprises and " + s"should be avoided.") } -val isTxnOffsetCommit = producerId != RecordBatch.NO_PRODUCER_ID -// construct the message set to append +val filteredOffsetMetadata = offsetMetadata.filter { case (_, offsetAndMetadata) => + validateOffsetMetadataLength(offsetAndMetadata.metadata) +} if (filteredOffsetMetadata.isEmpty) { // compute the final error codes for the commit response val commitStatus = offsetMetadata.map { case (k, _) => k -> Errors.OFFSET_METADATA_TOO_LARGE } responseCallback(commitStatus) -} else { - getMagic(partitionFor(group.groupId)) match { -case Some(magicValue) => - // We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically. 
- val timestampType = TimestampType.CREATE_TIME - val timestamp = time.milliseconds() - - val records = filteredOffsetMetadata.map { case (topicIdPartition, offsetAndMetadata) => -val key = GroupMetadataManager.offsetCommitKey(group.groupId, topicIdPartition.topicPartition) -val value = GroupMetadataManager.offsetCommitValue(offsetAndMetadata, interBrokerProtocolVersion) -new SimpleRecord(timestamp, key, value) - } - val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId)) - val buffer = ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, compressionType, records.asJava)) - - if (isTxnOffsetCommit && magicValue < RecordBatch.MAGIC_VALUE_V2) -throw Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT.exception("Attempting to make a transaction offset commit with an invalid magic: " + magicValue) - - val builder = MemoryRecords.builder(buffer, magicValue, compressionType, timestampType, 0L, time.milliseconds(), -producerId, producerEpoch, 0, isTxnOffsetCommit, RecordBatch.NO_PARTITION_LEADER_EPOCH) - - records.foreach(builder.append) - val entries = Map(offsetTopicPartition -> builder.build()) - - // set the callback function to insert offsets into cache after log append completed - def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = { -// the append response should only contain the topics partition -if (responseStatus.size != 1 || !responseStatus.contains(offsetTopicPartition)) - throw new IllegalStateException("Append status %s should only have one partition %s" -.format(responseStatus, offsetTopicPartition)) - -// construct the commit response status and insert -// the offset and metadata to cache if the append status has no error -val status = responseStatus(offsetTopicPartition) - -val responseError = group.inLock { - if (status.error == Errors.NONE) { -if (!group.is(Dead)) { - filteredOffsetMetadata.forKeyValue { (topicIdPartition, offsetAndMetadata) => -if (isTxnOffsetCommit) 
- group.onTxnOffsetCommitAppend(producerId, topicIdPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata)) -else - group.onOffsetCommitAppend(topicIdPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata)) - } -} - -// Record the number of offsets committed to the log -offsetCommitsSensor.record(records.size) - -
Re: [PR] KAFKA-15784: Ensure atomicity of in memory update and write when transactionally committing offsets [kafka]
jolshan commented on code in PR #14774: URL: https://github.com/apache/kafka/pull/14774#discussion_r1418096753 ## core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala: ## @@ -339,156 +340,183 @@ class GroupMetadataManager(brokerId: Int, delayedProduceLock = Some(group.lock), responseCallback = callback, requestLocal = requestLocal, - transactionalId = transactionalId) + verificationGuards = verificationGuards, + preAppendErrors = preAppendErrors) + } + + def generateOffsetRecords(magicValue: Byte, +isTxnOffsetCommit: Boolean, +groupId: String, +offsetTopicPartition: TopicPartition, +filteredOffsetMetadata: Map[TopicIdPartition, OffsetAndMetadata], +producerId: Long, +producerEpoch: Short): Map[TopicPartition, MemoryRecords] = { + // We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically. + val timestampType = TimestampType.CREATE_TIME + val timestamp = time.milliseconds() + + val records = filteredOffsetMetadata.map { case (topicIdPartition, offsetAndMetadata) => +val key = GroupMetadataManager.offsetCommitKey(groupId, topicIdPartition.topicPartition) +val value = GroupMetadataManager.offsetCommitValue(offsetAndMetadata, interBrokerProtocolVersion) +new SimpleRecord(timestamp, key, value) + } + val buffer = ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, compressionType, records.asJava)) + + if (isTxnOffsetCommit && magicValue < RecordBatch.MAGIC_VALUE_V2) +throw Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT.exception("Attempting to make a transaction offset commit with an invalid magic: " + magicValue) + + val builder = MemoryRecords.builder(buffer, magicValue, compressionType, timestampType, 0L, time.milliseconds(), +producerId, producerEpoch, 0, isTxnOffsetCommit, RecordBatch.NO_PARTITION_LEADER_EPOCH) + + records.foreach(builder.append) + Map(offsetTopicPartition -> builder.build()) + } + + def createPutCacheCallback(isTxnOffsetCommit: Boolean, + group: GroupMetadata, + 
consumerId: String, + offsetMetadata: immutable.Map[TopicIdPartition, OffsetAndMetadata], + filteredOffsetMetadata: Map[TopicIdPartition, OffsetAndMetadata], + responseCallback: immutable.Map[TopicIdPartition, Errors] => Unit, + producerId: Long = RecordBatch.NO_PRODUCER_ID, + records: Map[TopicPartition, MemoryRecords], + preAppendErrors: Map[TopicPartition, LogAppendResult] = Map.empty): Map[TopicPartition, PartitionResponse] => Unit = { +val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId)) +// set the callback function to insert offsets into cache after log append completed +def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = { + // the append response should only contain the topics partition + if (responseStatus.size != 1 || !responseStatus.contains(offsetTopicPartition)) +throw new IllegalStateException("Append status %s should only have one partition %s" + .format(responseStatus, offsetTopicPartition)) + + // construct the commit response status and insert + // the offset and metadata to cache if the append status has no error + val status = responseStatus(offsetTopicPartition) + + val responseError = group.inLock { +if (status.error == Errors.NONE) { + if (!group.is(Dead)) { +filteredOffsetMetadata.forKeyValue { (topicIdPartition, offsetAndMetadata) => + if (isTxnOffsetCommit) +group.onTxnOffsetCommitAppend(producerId, topicIdPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata)) + else +group.onOffsetCommitAppend(topicIdPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata)) +} + } + + // Record the number of offsets committed to the log + offsetCommitsSensor.record(records.size) + + Errors.NONE +} else { + if (!group.is(Dead)) { +if (!group.hasPendingOffsetCommitsFromProducer(producerId)) + removeProducerGroup(producerId, group.groupId) +filteredOffsetMetadata.forKeyValue { (topicIdPartition, 
offsetAndMetadata) => + if (isTxnOffsetCommit) +group.failPendingTxnOffsetCommit(producerId, topicIdPartition) + else +
Re: [PR] KAFKA-15784: Ensure atomicity of in memory update and write when transactionally committing offsets [kafka]
jolshan commented on PR #14774: URL: https://github.com/apache/kafka/pull/14774#issuecomment-1843841126 @junrao Thanks for taking a look. I was just rewriting the code to make this clearer. I will take a look at @artemlivshits and your questions about locking now.
Re: [PR] KAFKA-15784: Ensure atomicity of in memory update and write when transactionally committing offsets [kafka]
jolshan commented on code in PR #14774: URL: https://github.com/apache/kafka/pull/14774#discussion_r1418093193 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -708,23 +708,40 @@ class KafkaApis(val requestChannel: RequestChannel, } } -if (authorizedRequestInfo.isEmpty) - sendResponseCallback(Map.empty) -else { - val internalTopicsAllowed = request.header.clientId == AdminUtils.ADMIN_CLIENT_ID +val internalTopicsAllowed = request.header.clientId == AdminUtils.ADMIN_CLIENT_ID +val transactionVerificationEntries = new ReplicaManager.TransactionVerificationEntries - // call the replica manager to append messages to the replicas +def postVerificationCallback(newRequestLocal: RequestLocal) Review Comment: The new code fixes this. ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -941,39 +857,129 @@ class ReplicaManager(val config: KafkaConfig, } } - private def partitionEntriesForVerification(verificationGuards: mutable.Map[TopicPartition, VerificationGuard], - entriesPerPartition: Map[TopicPartition, MemoryRecords], - verifiedEntries: mutable.Map[TopicPartition, MemoryRecords], - unverifiedEntries: mutable.Map[TopicPartition, MemoryRecords], - errorEntries: mutable.Map[TopicPartition, Errors]): Unit= { + private def sendInvalidRequiredAcksResponse(entries: Map[TopicPartition, MemoryRecords], + responseCallback: Map[TopicPartition, PartitionResponse] => Unit): Unit = { +// If required.acks is outside accepted range, something is wrong with the client +// Just return an error and don't handle the request at all +val responseStatus = entries.map { case (topicPartition, _) => + topicPartition -> new PartitionResponse( +Errors.INVALID_REQUIRED_ACKS, +LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.firstOffset, +RecordBatch.NO_TIMESTAMP, +LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.logStartOffset + ) +} +responseCallback(responseStatus) + } + + /** + * Apply the postVerificationCallback asynchronously only after verifying the partitions have been added to the 
transaction. + * The postVerificationCallback takes the arguments of the requestLocal for the thread that will be doing the append as + * well as a mapping of topic partitions to LogAppendResult for the partitions that saw errors when verifying. + * + * This method will start the verification process for all the topicPartitions in entriesPerPartition and supply the + * postVerificationCallback to be run on a request handler thread when the response is received. + * + * @param entriesPerPartitionthe records per partition to be appended and therefore need verification + * @param transactionVerificationEntries the object that will store the entries to verify, the errors, and the verification guards + * @param transactionalIdthe id for the transaction + * @param requestLocal container for the stateful instances scoped to this request -- this must correspond to the + * thread calling this method + * @param postVerificationCallback the method to be called when verification completes and the verification errors + * and the thread's RequestLocal are supplied + */ + def appendRecordsWithTransactionVerification(entriesPerPartition: Map[TopicPartition, MemoryRecords], + transactionVerificationEntries: TransactionVerificationEntries, + transactionalId: String, + requestLocal: RequestLocal, + postVerificationCallback: RequestLocal => Map[TopicPartition, LogAppendResult] => Unit): Unit = { +if (transactionalId != null && config.transactionPartitionVerificationEnable && addPartitionsToTxnManager.isDefined) + partitionEntriesForVerification(transactionVerificationEntries, entriesPerPartition) + +val onVerificationComplete: (RequestLocal, Map[TopicPartition, Errors]) => Unit = + executePostVerificationCallback( +transactionVerificationEntries, +postVerificationCallback, + ) + +if (transactionVerificationEntries.unverified.isEmpty) { + onVerificationComplete(requestLocal, transactionVerificationEntries.errors.toMap) +} else { + // For unverified entries, send a request to verify. 
When verified, the append process will proceed via the callback. + // We verify above that all partitions use the same producer ID. + val batchInfo =
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
soarez commented on PR #14903: URL: https://github.com/apache/kafka/pull/14903#issuecomment-1843813913 @junrao: Thanks for reviewing this. It seems there are currently some issues with failing tests. See https://github.com/apache/kafka/pull/14838#issuecomment-1843693525 The test failures for this PR don't seem to be related to the changes, but to be sure we can also wait a bit longer and see if the state of the build improves.
Re: [PR] KAFKA-15347: fix unit test [kafka]
mjsax commented on code in PR #14947: URL: https://github.com/apache/kafka/pull/14947#discussion_r1418059234 ## streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatterTest.java: ## @@ -230,13 +230,19 @@ public void shouldFindAll() { segmentValue.findAll(testCase.records.get(testCase.records.size() - 1).timestamp, testCase.records.get(0).timestamp); int i = 0; +int index = 0; for (final TestRecord expectedRecord : testCase.records) { -final long expectedValidTo = i == 0 ? testCase.nextTimestamp : testCase.records.get(i - 1).timestamp; -assertThat(results.get(i).index(), equalTo(i)); +if (expectedRecord.value == null) { // should not return tombstones +index++; +continue; +} +final long expectedValidTo = index== 0 ? testCase.nextTimestamp : testCase.records.get(index - 1).timestamp; Review Comment: ```suggestion final long expectedValidTo = index == 0 ? testCase.nextTimestamp : testCase.records.get(index - 1).timestamp; ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
wcarlson5 commented on code in PR #14935: URL: https://github.com/apache/kafka/pull/14935#discussion_r1418058902 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ## @@ -1477,6 +1528,27 @@ public Object getStateLock() { return stateLock; } +public Map> consumerClientInstanceIds(final Duration timeout) { Review Comment: and here ## streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java: ## @@ -454,4 +485,19 @@ public void shutdown() { public Map consumerMetrics() { return Collections.unmodifiableMap(globalConsumer.metrics()); } + +public KafkaFuture globalConsumerInstanceId(final Duration timeout) { Review Comment: Same comment here if you don't mind
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
wcarlson5 commented on code in PR #14922: URL: https://github.com/apache/kafka/pull/14922#discussion_r1418054625 ## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ## @@ -1802,7 +1805,7 @@ protected int processStreamThread(final Consumer consumer) { * @throws TimeoutException Indicates that a request timed out. * @throws StreamsException For any other error that might occur. */ -public ClientInstanceIds clientInstanceIds(final Duration timeout) { +public synchronized ClientInstanceIds clientInstanceIds(final Duration timeout) { Review Comment: I think this should take care of most of the threading issues, but it does leave it easy to introduce bugs in the future ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ## @@ -1477,6 +1528,27 @@ public Object getStateLock() { return stateLock; } +public Map> consumerClientInstanceIds(final Duration timeout) { Review Comment: Can we add a comment here that this isn't thread safe? Just for the next person who tries to use it and introduces a nasty race condition. The fact that it returns futures might make people think it is. ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ## @@ -713,6 +727,48 @@ boolean runLoop() { return true; } +// visible for testing +void maybeGetClientInstanceIds() { Review Comment: That is actually okay, as each time the call is made it overwrites `mainConsumerInstanceIdFuture` anyway, so there is only ever one future to complete. Not that this couldn't become an issue, but with the caller being synchronized it won't be for this feature.
Re: [PR] KAFKA-15363: Broker log directory failure changes [kafka]
rondagostino commented on code in PR #14790: URL: https://github.com/apache/kafka/pull/14790#discussion_r1418016489 ## core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala: ## @@ -76,13 +83,69 @@ class ReplicaAlterLogDirsThread(name: String, futureLog.updateHighWatermark(partitionData.highWatermark) futureLog.maybeIncrementLogStartOffset(partitionData.logStartOffset, LogStartOffsetIncrementReason.LeaderOffsetIncremented) -if (partition.maybeReplaceCurrentWithFutureReplica()) - removePartitions(Set(topicPartition)) +directoryEventHandler match { + case DirectoryEventHandler.NOOP => +if (partition.maybeReplaceCurrentWithFutureReplica()) + removePartitions(Set(topicPartition)) + case _ => +maybePromoteFutureReplica(topicPartition, partition) +} quota.record(records.sizeInBytes) logAppendInfo } + override def removePartitions(topicPartitions: Set[TopicPartition]): Map[TopicPartition, PartitionFetchState] = { +// Schedule assignment request to revert any queued request before cancelling +for { + topicPartition <- topicPartitions + partitionState <- partitionAssignmentRequestState(topicPartition) + if partitionState == QUEUED + partition = replicaMgr.getPartitionOrException(topicPartition) + topicId <- partition.topicId + directoryId <- partition.logDirectoryId() + topicIdPartition = new TopicIdPartition(topicId, topicPartition.partition()) +} yield directoryEventHandler.handleAssignment(topicIdPartition, directoryId) + +super.removePartitions(topicPartitions) + } + + // Visible for testing + def updatedAssignmentRequestState(topicPartition: TopicPartition)(state: ReplicaAlterLogDirsThread.DirectoryEventRequestState): Unit = { +assignmentRequestStates.put(topicPartition, state) + } + private def maybePromoteFutureReplica(topicPartition: TopicPartition, partition: Partition) = { Review Comment: nit: missing newline separator between defs ## core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala: ## @@ -76,13 +83,69 @@ class 
ReplicaAlterLogDirsThread(name: String, futureLog.updateHighWatermark(partitionData.highWatermark) futureLog.maybeIncrementLogStartOffset(partitionData.logStartOffset, LogStartOffsetIncrementReason.LeaderOffsetIncremented) -if (partition.maybeReplaceCurrentWithFutureReplica()) - removePartitions(Set(topicPartition)) +directoryEventHandler match { + case DirectoryEventHandler.NOOP => +if (partition.maybeReplaceCurrentWithFutureReplica()) + removePartitions(Set(topicPartition)) + case _ => +maybePromoteFutureReplica(topicPartition, partition) +} quota.record(records.sizeInBytes) logAppendInfo } + override def removePartitions(topicPartitions: Set[TopicPartition]): Map[TopicPartition, PartitionFetchState] = { +// Schedule assignment request to revert any queued request before cancelling +for { + topicPartition <- topicPartitions + partitionState <- partitionAssignmentRequestState(topicPartition) + if partitionState == QUEUED + partition = replicaMgr.getPartitionOrException(topicPartition) + topicId <- partition.topicId + directoryId <- partition.logDirectoryId() + topicIdPartition = new TopicIdPartition(topicId, topicPartition.partition()) +} yield directoryEventHandler.handleAssignment(topicIdPartition, directoryId) Review Comment: Seems odd to use `yield` when `directoryEventHandler.handleAssignment()` is a void method. Is there a reason to use `yield`? 
## core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala: ## @@ -268,6 +268,200 @@ class ReplicaAlterLogDirsThreadTest { assertEquals(0, thread.partitionCount) } + def updateAssignmentRequestState(thread: ReplicaAlterLogDirsThread, partitionId:Int, newState: ReplicaAlterLogDirsThread.DirectoryEventRequestState) = { +topicNames.get(topicId).map(topicName => { + thread.updatedAssignmentRequestState(new TopicPartition(topicName, partitionId))(newState) +}) + } + @Test Review Comment: nit: missing newline separator ## core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala: ## @@ -76,13 +83,69 @@ class ReplicaAlterLogDirsThread(name: String, futureLog.updateHighWatermark(partitionData.highWatermark) futureLog.maybeIncrementLogStartOffset(partitionData.logStartOffset, LogStartOffsetIncrementReason.LeaderOffsetIncremented) -if (partition.maybeReplaceCurrentWithFutureReplica()) - removePartitions(Set(topicPartition)) +directoryEventHandler match { + case DirectoryEventHandler.NOOP => +if (partition.maybeReplaceCurrentWithFutureReplica()) + removePartitions(Set(topicPartition)) + case _ => +maybePromoteFutureReplica(topicPartition, partition) +
Re: [PR] KAFKA-15978: Update member information on HB response [kafka]
AndrewJSchofield commented on PR #14945: URL: https://github.com/apache/kafka/pull/14945#issuecomment-1843752569 Investigating failure of new test
Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]
rondagostino merged PR #14838: URL: https://github.com/apache/kafka/pull/14838
[jira] [Created] (KAFKA-15982) Move GenericGroup state metrics to `GroupCoordinatorMetricsShard`
Jeff Kim created KAFKA-15982: Summary: Move GenericGroup state metrics to `GroupCoordinatorMetricsShard` Key: KAFKA-15982 URL: https://issues.apache.org/jira/browse/KAFKA-15982 Project: Kafka Issue Type: Sub-task Reporter: Jeff Kim Assignee: Jeff Kim Currently, the generic group state metrics exist inside `GroupCoordinatorMetrics` as global metrics. This causes issues as during unload, we need to traverse through all groups and decrement the group size counters. Move the generic group state metrics to the shard level so that when a partition is unloaded we automatically remove the counter. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15784: Ensure atomicity of in memory update and write when transactionally committing offsets [kafka]
junrao commented on PR #14774: URL: https://github.com/apache/kafka/pull/14774#issuecomment-1843729483 @jolshan : Thanks for the PR. Should we fix the `append` call in `CoordinatorRuntime` (https://github.com/apache/kafka/pull/14705) too? There, a partition level lock in `CoordinatorRuntime` is held while checking/updating the coordinator state and calling `append`.
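The locking concern above can be sketched roughly like this (all names hypothetical, not the actual `CoordinatorRuntime` code): a partition-level lock held across both the in-memory state check/update and the append means no other thread can interleave between the two steps.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Rough sketch (names hypothetical) of the pattern discussed above: a
// per-partition lock held across BOTH the in-memory state update and the
// append, so no other thread can interleave between the two steps.
class PartitionState {
    private final ReentrantLock lock = new ReentrantLock();
    private int committedOffset = -1;                    // in-memory state
    private final List<Integer> log = new ArrayList<>(); // stand-in for the log

    boolean commitOffset(int offset) {
        lock.lock();
        try {
            if (offset <= committedOffset)
                return false;         // stale commit, rejected under the lock
            committedOffset = offset; // in-memory update ...
            log.add(offset);          // ... and append, atomically together
            return true;
        } finally {
            lock.unlock();
        }
    }

    int committedOffset() { return committedOffset; }
}

public class AtomicCommitDemo {
    public static void main(String[] args) {
        PartitionState p = new PartitionState();
        System.out.println(p.commitOffset(5));   // true
        System.out.println(p.commitOffset(3));   // false: stale
        System.out.println(p.committedOffset()); // 5
    }
}
```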
Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]
cmccabe commented on PR #14838: URL: https://github.com/apache/kafka/pull/14838#issuecomment-1843728301 @rondagostino : I think we can merge this now because I don't think it's adding more failures. I do think we need to do another sweep looking for people calling `Exit.exit`, but that doesn't need to block here.
[jira] [Created] (KAFKA-15981) only record GenericGroup state metrics once per group during load
Jeff Kim created KAFKA-15981: Summary: only record GenericGroup state metrics once per group during load Key: KAFKA-15981 URL: https://issues.apache.org/jira/browse/KAFKA-15981 Project: Kafka Issue Type: Sub-task Reporter: Jeff Kim Assignee: Jeff Kim Currently, we increment generic group metrics whenever we create a new GenericGroup object while we load the partition. This is incorrect as the partition may contain several records for the same group if they are in the active segment or if the segment has not yet been compacted. Instead, increment the metrics for each group we successfully loaded.
Re: [PR] KAFKA-15563: Provide informative error messages when Connect REST requests time out [kafka]
gharris1727 commented on code in PR #14562: URL: https://github.com/apache/kafka/pull/14562#discussion_r1418003736 ## connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java: ## @@ -771,6 +778,104 @@ private Map defaultSinkConnectorProps(String topics) { return props; } +@Test +public void testRequestTimeouts() throws Exception { +final String configTopic = "test-request-timeout-configs"; +workerProps.put(CONFIG_TOPIC_CONFIG, configTopic); +// Workaround for KAFKA-15676, which can cause the scheduled rebalance delay to +// be spuriously triggered after the group coordinator for a Connect cluster is bounced +// Set to 1 instead of 0 as another workaround for KAFKA-15693, which can cause +// connectors and tasks to be unassigned indefinitely if the scheduled rebalance delay +// is set to 0 +workerProps.put(SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, "1"); +connect = connectBuilder +.numBrokers(1) +.numWorkers(1) +.build(); +connect.start(); +connect.assertions().assertAtLeastNumWorkersAreUp(1, +"Worker did not start in time"); + +Map connectorConfig1 = defaultSourceConnectorProps(TOPIC_NAME); +Map connectorConfig2 = new HashMap<>(connectorConfig1); +connectorConfig2.put(TASKS_MAX_CONFIG, Integer.toString(NUM_TASKS + 1)); + +// Create a connector to ensure that the worker has completed startup +log.info("Creating initial connector"); +connect.configureConnector(CONNECTOR_NAME, connectorConfig1); +connect.assertions().assertConnectorAndExactlyNumTasksAreRunning( +CONNECTOR_NAME, NUM_TASKS, "connector and tasks did not start in time" +); + +// Bring down Kafka, which should cause some REST requests to fail +log.info("Stopping Kafka cluster"); +connect.kafka().stopOnlyKafka(); +// Allow for the workers to discover that the coordinator is unavailable, wait is +// heartbeat timeout * 2 + 4sec +Thread.sleep(TimeUnit.SECONDS.toMillis(10)); + +connect.requestTimeout(5_000); +// Try to reconfigure the connector, which should fail with 
a timeout error +log.info("Trying to reconfigure connector while Kafka cluster is down"); +ConnectRestException e = assertThrows( +ConnectRestException.class, +() -> connect.configureConnector(CONNECTOR_NAME, connectorConfig2) +); +assertEquals(INTERNAL_SERVER_ERROR.getStatusCode(), e.statusCode()); +assertNotNull(e.getMessage()); +assertTrue( +"Message '" + e.getMessage() + "' does not match expected format", +e.getMessage().contains("Request timed out. The worker is currently flushing updates to the status topic") Review Comment: Ah I see. So the `polling the group coordinator for up to ... or until interrupted` stage is only temporary and `flushing updates to the status topic` is permanent, so it'll always eventually get stuck on this stage. Do you think it's possible for the preceding `Thread.sleep()` to cause a flake here, if the worker is in the "polling the group coordinator" stage for too long? Perhaps we could replace the sleep with a wait-until-condition that repeatedly makes the request until the flushing status store error appears.
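The wait-until-condition idea suggested above could look roughly like this (a hypothetical sketch, not Connect's actual test utilities): poll the condition repeatedly up to a deadline instead of relying on a fixed sleep that flakes when the preceding stage runs long.

```java
import java.util.function.Supplier;

// A minimal sketch (names hypothetical) of the wait-until-condition idea:
// retry an action until its result matches, instead of a fixed Thread.sleep.
public class WaitUntil {
    public static boolean waitUntil(Supplier<Boolean> condition, long timeoutMs, long pollMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            if (condition.get()) return true;                 // condition met
            if (System.currentTimeMillis() >= deadline) return false; // timed out
            try {
                Thread.sleep(pollMs);                         // back off before retrying
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // Condition becomes true once ~50 ms has elapsed.
        boolean ok = waitUntil(() -> System.currentTimeMillis() - start >= 50, 1_000, 10);
        System.out.println(ok); // true
    }
}
```

In the test above, the supplier would make the REST request and check whether the response contains the "flushing updates to the status topic" message, returning true once it appears.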
Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]
rondagostino commented on PR #14838: URL: https://github.com/apache/kafka/pull/14838#issuecomment-1843693525 Looking at the [trunk build jobs](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/activity?branch=trunk): https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/2432/tests completed 6 days ago with `25 tests have failed. There are 21 new tests failing, 4 existing failing ` https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/2437/tests completed 5 days ago with `60 tests have failed. There are 36 new tests failing, 24 existing failing` https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/2442/tests completed 3 days ago with `30 tests have failed. There are 14 new tests failing, 16 existing failing` https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/2444/tests completed 2 days ago with `53 tests have failed. There are 37 new tests failing, 16 existing failing` https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/2445/tests completed 2 days ago with `92 tests have failed. There are 17 new tests failing, 75 existing failing` https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/2452/tests completed 11 hours ago with `111 tests have failed. There are 49 new tests failing, 62 existing failing` https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/2453/tests completed 7 hours ago with `24 tests have failed. There are 24 new tests failing, 0 existing failing` https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/2454/tests completed 1 hour ago with `101 tests have failed. There are 39 new tests failing, 62 existing failing` It feels to me like this is just flakiness -- either in the test or in the infrastructure. There's no rhyme or reason to it. I am inclined to commit this PR. WDYT @cmccabe? 
Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]
rondagostino commented on PR #14838: URL: https://github.com/apache/kafka/pull/14838#issuecomment-1843668221 I think the issue is not with our PR. Here's another [PR that is basically a 2-line change in a single test](https://github.com/apache/kafka/pull/14944) and that has [144 test failures!](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14944/2/tests).
Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]
rondagostino commented on PR #14838: URL: https://github.com/apache/kafka/pull/14838#issuecomment-1843665517 [Latest builds for JDKs 8 and 21 failed](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14838/16/pipeline/). Restarted the build: https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14838/17/pipeline So frustrating.
[PR] KAFKA-15347: fix unit test [kafka]
aliehsaeedii opened a new pull request, #14947: URL: https://github.com/apache/kafka/pull/14947 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
junrao commented on code in PR #14903: URL: https://github.com/apache/kafka/pull/14903#discussion_r1417911671 ## core/src/main/scala/kafka/server/BrokerLifecycleManager.scala: ## @@ -364,10 +377,30 @@ class BrokerLifecycleManager( } _channelManager.sendRequest(new BrokerRegistrationRequest.Builder(data), new BrokerRegistrationResponseHandler()) +communicationInFlight = true } + // the response handler is not invoked from the event handler thread, + // so it is not safe to update state here, instead, schedule an event + // to continue handling the response on the event handler thread private class BrokerRegistrationResponseHandler extends ControllerRequestCompletionHandler { override def onComplete(response: ClientResponse): Unit = { + eventQueue.prepend(new BrokerRegistrationResponseEvent(response, false)) Review Comment: @cmccabe : Do you have concerns on the usage of `prepend` here? ## core/src/main/scala/kafka/server/BrokerLifecycleManager.scala: ## @@ -166,6 +166,19 @@ class BrokerLifecycleManager( */ private var registered = false + /** + * True if a request has been sent and a response or timeout has not yet been processed. + * This variable can only be read or written from the event queue thread. + */ + private var communicationInFlight = false Review Comment: Yes, I agree that it's not straightforward. We can just leave the code as it is.
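A simplified sketch of the pattern in the comment above (a Java stand-in with hypothetical names; the real `KafkaEventQueue` is more involved): the network-thread callback only enqueues an event, and all state mutation happens when the event-handler thread drains the queue. `prepend` puts the response ahead of already-queued events so it is handled first.

```java
import java.util.concurrent.LinkedBlockingDeque;

// Rough sketch (types simplified, names hypothetical): a network-thread
// callback never mutates manager state directly; it enqueues an event that
// the single event-handler thread drains, so all state updates happen on
// one thread. prepend() puts the response at the front of the queue so it
// is handled before events that were already queued.
public class EventQueueDemo {
    private final LinkedBlockingDeque<Runnable> queue = new LinkedBlockingDeque<>();
    private int state = 0; // only touched while draining the queue

    void prepend(Runnable event) { queue.addFirst(event); }
    void append(Runnable event)  { queue.addLast(event); }

    // Called from the network thread when a response arrives.
    void onResponse(int value) {
        prepend(() -> state = value); // defer the state update to the queue
    }

    // Drains pending events; in the real code this runs on a dedicated thread.
    int drainAndGetState() {
        Runnable e;
        while ((e = queue.poll()) != null) e.run();
        return state;
    }

    public static void main(String[] args) {
        EventQueueDemo demo = new EventQueueDemo();
        demo.append(() -> { });   // some already-queued work
        demo.onResponse(42);      // response jumps the queue via prepend
        System.out.println(demo.drainAndGetState()); // 42
    }
}
```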
Re: [PR] Refactor closing consumer [kafka]
philipnee commented on code in PR #14937: URL: https://github.com/apache/kafka/pull/14937#discussion_r1417650715 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -778,18 +783,23 @@ public void testGroupIdNull() { @Test public void testGroupIdNotNullAndValid() { +// close the default consumer +shutDown(); Review Comment: The test spins up another consumer, so we should shut down the one created in the BeforeEach.
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
philipnee commented on PR #14873: URL: https://github.com/apache/kafka/pull/14873#issuecomment-1843583099 Hi @lianetm @cadonna @lucasbru - Thanks for taking the time to review my PR. I addressed your comments and left one for @cadonna.
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
philipnee commented on code in PR #14873: URL: https://github.com/apache/kafka/pull/14873#discussion_r1417898446 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -399,14 +400,45 @@ public void testHeartbeatState() { new ConsumerGroupHeartbeatResponseData.Assignment(); assignmentTopic1.setTopicPartitions(Collections.singletonList(tpTopic1)); ConsumerGroupHeartbeatResponse rs1 = new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData() -.setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS) -.setMemberId(memberId) -.setMemberEpoch(1) -.setAssignment(assignmentTopic1)); +.setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS) +.setMemberId(memberId) +.setMemberEpoch(1) +.setAssignment(assignmentTopic1)); membershipManager.onHeartbeatResponseReceived(rs1.data()); assertEquals(MemberState.RECONCILING, membershipManager.state()); } +@Test +public void testEnsureLeaveGroupWhenPollTimerExpires() { +membershipManager.transitionToJoining(); Review Comment: you are 100% correct. I submitted a patch for this specific test. Thanks for catching this.
Re: [PR] KAFKA-15978: Update member information on HB response [kafka]
lianetm commented on code in PR #14945: URL: https://github.com/apache/kafka/pull/14945#discussion_r1417895767 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -224,6 +224,13 @@ boolean canAutoCommit() { return autoCommitState.isPresent() && !subscriptions.allConsumed().isEmpty(); } +/** + * Updates the member ID and epoch upon receiving ConsumerGroupHeartbeatResponse. + */ +void updateMemberInformation(String memberId, int memberEpoch) { +groupState.generation = new GroupState.Generation(memberEpoch, memberId, null); +} Review Comment: Just for the record, with the fix in the commit PR the approach is having this CommitManager as a StateListener that gets notified when the epoch changes, so it gets the last value from the MembershipManager. I did the listener approach to avoid a circular dep (the MembershipMgr uses the CommitMgr to commit), and truly, this Commit does not need the whole MembershipMgr, it just needs to have the latest ID/epoch, so the listener seems appropriate. Anyway, let's have this conversation on the other PR when it is in review.
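The listener approach described above might be sketched like this (all names hypothetical, heavily simplified from the real MembershipManager/CommitRequestManager wiring): the membership side pushes ID/epoch updates to registered listeners, so the commit side always holds the latest values without keeping a reference back to the membership manager, which avoids the circular dependency.

```java
import java.util.ArrayList;
import java.util.List;

// A minimal sketch (all names hypothetical) of the listener approach:
// the membership side pushes ID/epoch changes to registered listeners,
// so the commit side tracks the latest values without a reference back
// to the membership manager (avoiding the circular dependency).
interface MemberStateListener {
    void onMemberEpochUpdated(int memberEpoch, String memberId);
}

class Membership {
    private final List<MemberStateListener> listeners = new ArrayList<>();

    void register(MemberStateListener listener) {
        listeners.add(listener);
    }

    void updateEpoch(int epoch, String memberId) {
        // Notify every listener of the new member ID and epoch.
        for (MemberStateListener l : listeners)
            l.onMemberEpochUpdated(epoch, memberId);
    }
}

class CommitSide implements MemberStateListener {
    String memberId;
    int memberEpoch;

    @Override
    public void onMemberEpochUpdated(int memberEpoch, String memberId) {
        this.memberEpoch = memberEpoch;
        this.memberId = memberId;
    }
}

public class ListenerDemo {
    public static void main(String[] args) {
        Membership membership = new Membership();
        CommitSide commit = new CommitSide();
        membership.register(commit);
        membership.updateEpoch(5, "member-1");
        System.out.println(commit.memberId + ":" + commit.memberEpoch); // member-1:5
    }
}
```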
Re: [PR] KAFKA-15978: Update member information on HB response [kafka]
lianetm commented on code in PR #14945: URL: https://github.com/apache/kafka/pull/14945#discussion_r1417892970 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -224,6 +224,13 @@ boolean canAutoCommit() { return autoCommitState.isPresent() && !subscriptions.allConsumed().isEmpty(); } +/** + * Updates the member ID and epoch upon receiving ConsumerGroupHeartbeatResponse. + */ +void updateMemberInformation(String memberId, int memberEpoch) { +groupState.generation = new GroupState.Generation(memberEpoch, memberId, null); +} Review Comment: Yes, I'm on this and it will come out in the commit PR, so if we want to unblock the e2e I'm ok with keeping this as a temporary fix.
Re: [PR] KAFKA-15978: Update member information on HB response [kafka]
dajac commented on code in PR #14945: URL: https://github.com/apache/kafka/pull/14945#discussion_r1417891269 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -224,6 +224,13 @@ boolean canAutoCommit() { return autoCommitState.isPresent() && !subscriptions.allConsumed().isEmpty(); } +/** + * Updates the member ID and epoch upon receiving ConsumerGroupHeartbeatResponse. + */ +void updateMemberInformation(String memberId, int memberEpoch) { +groupState.generation = new GroupState.Generation(memberEpoch, memberId, null); +} Review Comment: There are two other places where the member epoch is updated. Should we also call `updateMemberInformation` for those for completeness?
Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]
dajac commented on PR #14845: URL: https://github.com/apache/kafka/pull/14845#issuecomment-1843565757 It looks like the new failing tests are coming from https://github.com/apache/kafka/pull/14626.
Re: [PR] KAFKA-14589 ConsumerGroupCommand rewritten in java [kafka]
nizhikov commented on code in PR #14471: URL: https://github.com/apache/kafka/pull/14471#discussion_r1417877630 ## tools/src/main/java/org/apache/kafka/tools/consumergroup/CsvUtils.java: ## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tools.consumergroup; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectReader; +import com.fasterxml.jackson.databind.ObjectWriter; +import com.fasterxml.jackson.dataformat.csv.CsvMapper; +import com.fasterxml.jackson.dataformat.csv.CsvSchema; + +public class CsvUtils { +private final CsvMapper mapper = new CsvMapper(); + +ObjectReader readerFor(Class clazz) { +return mapper.readerFor(clazz).with(getSchema(clazz)); +} + +ObjectWriter writerFor(Class clazz) { +return mapper.writerFor(clazz).with(getSchema(clazz)); +} + +private CsvSchema getSchema(Class clazz) { +String[] fields; +if (CsvRecordWithGroup.class == clazz) +fields = CsvRecordWithGroup.FIELDS; +else if (CsvRecordNoGroup.class == clazz) +fields = CsvRecordNoGroup.FIELDS; +else +throw new IllegalStateException("Unhandled class " + clazz); + +return mapper.schemaFor(clazz).sortedBy(fields); +} + +public interface CsvRecord { +} + +public static class CsvRecordWithGroup implements CsvRecord { Review Comment: fixed. Thanks
Re: [PR] MINOR: Support to update Async Consumer group member label (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14946: URL: https://github.com/apache/kafka/pull/14946#discussion_r1417877859 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -321,6 +332,9 @@ public void onHeartbeatResponseReceived(ConsumerGroupHeartbeatResponseData respo this.memberEpoch = response.memberEpoch(); ConsumerGroupHeartbeatResponseData.Assignment assignment = response.assignment(); +clientTelemetryReporter.ifPresent(reporter -> reporter.updateMetricsLabels( +Collections.singletonMap(ClientTelemetryProvider.GROUP_MEMBER_ID, memberId))); Review Comment: @AndrewJSchofield I have added the check with a comment, please see if it makes sense.
Re: [PR] KAFKA-15347: add support for 'single key multi timestamp' IQs with versioned state stores (KIP-968) [kafka]
dajac commented on PR #14626: URL: https://github.com/apache/kafka/pull/14626#issuecomment-1843551173 @aliehsaeedii @mjsax The last build has many failed tests: https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14626/26/tests. I also see them in other builds now. Are they related to this PR?
[jira] [Resolved] (KAFKA-15972) Add support to exclude labels in client telemetry
[ https://issues.apache.org/jira/browse/KAFKA-15972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apoorv Mittal resolved KAFKA-15972. --- Resolution: Done > Add support to exclude labels in client telemetry > - > > Key: KAFKA-15972 > URL: https://issues.apache.org/jira/browse/KAFKA-15972 > Project: Kafka > Issue Type: Sub-task >Reporter: Apoorv Mittal >Assignee: Apoorv Mittal >Priority: Major > > Some of the labels/tags which are present in metric should be skipped while > collecting telemetry as data might already be known to broker hence, we > should minimize the data transfer. One of such labels is client_id which is > already present in RequestContext hence broker can append that label prior > emitting metrics to telemetry backend.
[jira] [Resolved] (KAFKA-15877) Support change of temporality in Java client
[ https://issues.apache.org/jira/browse/KAFKA-15877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apoorv Mittal resolved KAFKA-15877. --- Resolution: Done > Support change of temporality in Java client > - > > Key: KAFKA-15877 > URL: https://issues.apache.org/jira/browse/KAFKA-15877 > Project: Kafka > Issue Type: Sub-task >Reporter: Apoorv Mittal >Assignee: Apoorv Mittal >Priority: Major > > Details: https://github.com/apache/kafka/pull/14620#discussion_r1401554867
[jira] [Resolved] (KAFKA-15684) Add support to describe all subscriptions through utility
[ https://issues.apache.org/jira/browse/KAFKA-15684?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-15684. - Fix Version/s: 3.7.0 Resolution: Fixed merged the PR to trunk. > Add support to describe all subscriptions through utility > - > > Key: KAFKA-15684 > URL: https://issues.apache.org/jira/browse/KAFKA-15684 > Project: Kafka > Issue Type: Sub-task >Reporter: Apoorv Mittal >Assignee: Apoorv Mittal >Priority: Major > Fix For: 3.7.0 > > > Open PR to support client-metrics through kafka-configs.sh doesn't list all > subscriptions. The functionality is missing because of missing support to > list client subscription in config repository and admin client. This task > should provide a workaround to fetch all subscriptions from config repository > by adding a method in KRaftMetadataCache. Later a KIP might be needed to add > support in AdminClient.
[jira] [Commented] (KAFKA-15877) Support change of temporality in Java client
[ https://issues.apache.org/jira/browse/KAFKA-15877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17793896#comment-17793896 ] Apoorv Mittal commented on KAFKA-15877: --- Changes were committed as part of PR: https://github.com/apache/kafka/pull/14620 https://github.com/apache/kafka/pull/14620#issuecomment-1826401674 > Support change of temporality in Java client > - > > Key: KAFKA-15877 > URL: https://issues.apache.org/jira/browse/KAFKA-15877 > Project: Kafka > Issue Type: Sub-task >Reporter: Apoorv Mittal >Assignee: Apoorv Mittal >Priority: Major > > Details: https://github.com/apache/kafka/pull/14620#discussion_r1401554867
Re: [PR] KAFKA-15684: Support to describe all client metrics resources (KIP-714) [kafka]
junrao merged PR #14933: URL: https://github.com/apache/kafka/pull/14933
[jira] [Created] (KAFKA-15979) Add KIP-1001 CurrentControllerId metric
Colin McCabe created KAFKA-15979: Summary: Add KIP-1001 CurrentControllerId metric Key: KAFKA-15979 URL: https://issues.apache.org/jira/browse/KAFKA-15979 Project: Kafka Issue Type: Improvement Reporter: Colin McCabe Assignee: Colin McCabe
[jira] [Created] (KAFKA-15980) Add KIP-1001 CurrentControllerId metric
Colin McCabe created KAFKA-15980: Summary: Add KIP-1001 CurrentControllerId metric Key: KAFKA-15980 URL: https://issues.apache.org/jira/browse/KAFKA-15980 Project: Kafka Issue Type: Improvement Reporter: Colin McCabe Assignee: Colin McCabe
Re: [PR] KAFKA-15684: Support to describe all client metrics resources (KIP-714) [kafka]
apoorvmittal10 commented on PR #14933: URL: https://github.com/apache/kafka/pull/14933#issuecomment-1843520738 > @apoorvmittal10 : Thanks for the updated PR. Just a minor comment. @junrao Thanks again for reviewing the PR. I have added my thoughts on the comment and verified the same locally as well. I have also verified that the build and test failures are not related to the PR changes.
Re: [PR] MINOR: Support to update Async Consumer group member label (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14946: URL: https://github.com/apache/kafka/pull/14946#discussion_r1417849266 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -321,6 +332,9 @@ public void onHeartbeatResponseReceived(ConsumerGroupHeartbeatResponseData respo this.memberEpoch = response.memberEpoch(); ConsumerGroupHeartbeatResponseData.Assignment assignment = response.assignment(); +clientTelemetryReporter.ifPresent(reporter -> reporter.updateMetricsLabels( +Collections.singletonMap(ClientTelemetryProvider.GROUP_MEMBER_ID, memberId))); Review Comment: Rechecked the code and it seems the placement of the update code is right, but I can optimize by checking that the `memberId` actually changed before updating it. Making the change.
[jira] [Updated] (KAFKA-15816) Typos in tests leak network sockets
[ https://issues.apache.org/jira/browse/KAFKA-15816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-15816: Description: There are a few tests which leak network sockets due to small typos in the tests themselves. Clients: [https://github.com/apache/kafka/pull/14750] (DONE) * NioEchoServer * KafkaConsumerTest * KafkaProducerTest * SelectorTest * SslTransportLayerTest * SslTransportTls12Tls13Test * SslVersionsTransportLayerTest * SaslAuthenticatorTest Core: [https://github.com/apache/kafka/pull/14754] * MiniKdc * GssapiAuthenticationTest * MirrorMakerIntegrationTest * SocketServerTest * EpochDrivenReplicationProtocolAcceptanceTest * LeaderEpochIntegrationTest Trogdor: [https://github.com/apache/kafka/pull/14771] * AgentTest Mirror: [https://github.com/apache/kafka/pull/14761] (DONE) * DedicatedMirrorIntegrationTest * MirrorConnectorsIntegrationTest * MirrorConnectorsWithCustomForwardingAdminIntegrationTest Runtime: [https://github.com/apache/kafka/pull/14764] * ConnectorTopicsIntegrationTest * ExactlyOnceSourceIntegrationTest * WorkerTest * WorkerGroupMemberTest Streams: [https://github.com/apache/kafka/pull/14769] (DONE) * IQv2IntegrationTest * MetricsReporterIntegrationTest * NamedTopologyIntegrationTest * PurgeRepartitionTopicIntegrationTest These can be addressed by just fixing the tests. was: There are a few tests which leak network sockets due to small typos in the tests themselves. 
Clients: [https://github.com/apache/kafka/pull/14750] (DONE) * NioEchoServer * KafkaConsumerTest * KafkaProducerTest * SelectorTest * SslTransportLayerTest * SslTransportTls12Tls13Test * SslVersionsTransportLayerTest * SaslAuthenticatorTest Core: [https://github.com/apache/kafka/pull/14754] * MiniKdc * GssapiAuthenticationTest * MirrorMakerIntegrationTest * SocketServerTest * EpochDrivenReplicationProtocolAcceptanceTest * LeaderEpochIntegrationTest Trogdor: [https://github.com/apache/kafka/pull/14771] * AgentTest Mirror: [https://github.com/apache/kafka/pull/14761] * DedicatedMirrorIntegrationTest * MirrorConnectorsIntegrationTest * MirrorConnectorsWithCustomForwardingAdminIntegrationTest Runtime: [https://github.com/apache/kafka/pull/14764] * ConnectorTopicsIntegrationTest * ExactlyOnceSourceIntegrationTest * WorkerTest * WorkerGroupMemberTest Streams: [https://github.com/apache/kafka/pull/14769] (DONE) * IQv2IntegrationTest * MetricsReporterIntegrationTest * NamedTopologyIntegrationTest * PurgeRepartitionTopicIntegrationTest These can be addressed by just fixing the tests. > Typos in tests leak network sockets > --- > > Key: KAFKA-15816 > URL: https://issues.apache.org/jira/browse/KAFKA-15816 > Project: Kafka > Issue Type: Bug > Components: unit tests >Affects Versions: 3.6.0 >Reporter: Greg Harris >Assignee: Greg Harris >Priority: Minor > > There are a few tests which leak network sockets due to small typos in the > tests themselves. 
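The "typos" tracked in this issue are generally resources opened in tests without a guaranteed close. A generic illustration of the leaky pattern and its try-with-resources fix (illustrative only, not code from any of the listed tests):

```java
import java.net.ServerSocket;

public class SocketLeakDemo {
    // Leaky pattern: the socket stays open if the caller forgets close(),
    // or if an assertion between open and close throws.
    static int leakyPort() throws Exception {
        ServerSocket server = new ServerSocket(0); // bind an ephemeral port
        return server.getLocalPort();              // server is never closed
    }

    // Fixed pattern: try-with-resources closes the socket on every path.
    static int safePort() throws Exception {
        try (ServerSocket server = new ServerSocket(0)) {
            return server.getLocalPort();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(safePort() > 0); // prints true
    }
}
```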
Re: [PR] KAFKA-15816: Fix leaked sockets in mirror tests [kafka]
gharris1727 merged PR #14761: URL: https://github.com/apache/kafka/pull/14761
Re: [PR] KAFKA-15816: Fix leaked sockets in mirror tests [kafka]
gharris1727 commented on PR #14761: URL: https://github.com/apache/kafka/pull/14761#issuecomment-1843486780 Test failures appear unrelated.
Re: [PR] KAFKA-15684: Support to describe all client metrics resources (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14933: URL: https://github.com/apache/kafka/pull/14933#discussion_r1417826343 ## core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala: ## @@ -1690,13 +1690,33 @@ class ConfigCommandTest extends Logging { } @Test - def shouldNotDescribeClientMetricsConfigWithoutEntityName(): Unit = { + def shouldDescribeClientMetricsConfigWithoutEntityName(): Unit = { val describeOpts = new ConfigCommandOptions(Array("--bootstrap-server", "localhost:9092", "--entity-type", "client-metrics", "--describe")) -val exception = assertThrows(classOf[IllegalArgumentException], () => describeOpts.checkArgs()) -assertEquals("an entity name must be specified with --describe of client-metrics", exception.getMessage) +val resourceCustom = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, "1") +val configEntry = new ConfigEntry("metrics", "*") +val future = new KafkaFutureImpl[util.Map[ConfigResource, Config]] +val describeResult: DescribeConfigsResult = mock(classOf[DescribeConfigsResult]) +when(describeResult.all()).thenReturn(future) + +val node = new Node(1, "localhost", 9092) +val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) { + override def describeConfigs(resources: util.Collection[ConfigResource], options: DescribeConfigsOptions): DescribeConfigsResult = { +assertTrue(options.includeSynonyms()) +assertEquals(1, resources.size) +val resource = resources.iterator.next +assertEquals(ConfigResource.Type.CLIENT_METRICS, resource.`type`) +assertTrue(resourceCustom.name == resource.name) +future.complete(Map(resourceCustom -> new Config(util.Collections.singletonList(configEntry))).asJava) +describeResult + } +} + mockAdminClient.incrementalAlterConfigs(util.Collections.singletonMap(resourceCustom, + util.Collections.singletonList(new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET))), new AlterConfigsOptions()) +ConfigCommand.describeConfig(mockAdminClient, describeOpts) +verify(describeResult).all() 
Review Comment: @junrao The verification confirms that `.all` was called exactly once on the returned result (i.e. that it was actually consumed) in `ConfigCommand.scala`, by the code mentioned above.
Re: [PR] MINOR: fully encapsulate user restore listener in the DelegatingRestoreListener [kafka]
ableegoldman merged PR #14886: URL: https://github.com/apache/kafka/pull/14886
Re: [PR] MINOR: Support to update Async Consumer group member label (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14946: URL: https://github.com/apache/kafka/pull/14946#discussion_r1417815612 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -321,6 +332,9 @@ public void onHeartbeatResponseReceived(ConsumerGroupHeartbeatResponseData respo this.memberEpoch = response.memberEpoch(); ConsumerGroupHeartbeatResponseData.Assignment assignment = response.assignment(); +clientTelemetryReporter.ifPresent(reporter -> reporter.updateMetricsLabels( +Collections.singletonMap(ClientTelemetryProvider.GROUP_MEMBER_ID, memberId))); Review Comment: Yeah, I needed that feedback; that's how I noticed multiple label updates happening and optimized away the duplicate labels. Handling duplicates was needed anyway, but let me check what the right place to call this update is.
Re: [PR] KAFKA-15684: Support to describe all client metrics resources (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14933: URL: https://github.com/apache/kafka/pull/14933#discussion_r1417807074 ## core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala: ## @@ -1690,13 +1690,33 @@ class ConfigCommandTest extends Logging { } @Test - def shouldNotDescribeClientMetricsConfigWithoutEntityName(): Unit = { + def shouldDescribeClientMetricsConfigWithoutEntityName(): Unit = { val describeOpts = new ConfigCommandOptions(Array("--bootstrap-server", "localhost:9092", "--entity-type", "client-metrics", "--describe")) -val exception = assertThrows(classOf[IllegalArgumentException], () => describeOpts.checkArgs()) -assertEquals("an entity name must be specified with --describe of client-metrics", exception.getMessage) +val resourceCustom = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, "1") +val configEntry = new ConfigEntry("metrics", "*") +val future = new KafkaFutureImpl[util.Map[ConfigResource, Config]] +val describeResult: DescribeConfigsResult = mock(classOf[DescribeConfigsResult]) +when(describeResult.all()).thenReturn(future) + +val node = new Node(1, "localhost", 9092) +val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) { + override def describeConfigs(resources: util.Collection[ConfigResource], options: DescribeConfigsOptions): DescribeConfigsResult = { +assertTrue(options.includeSynonyms()) +assertEquals(1, resources.size) +val resource = resources.iterator.next +assertEquals(ConfigResource.Type.CLIENT_METRICS, resource.`type`) +assertTrue(resourceCustom.name == resource.name) +future.complete(Map(resourceCustom -> new Config(util.Collections.singletonList(configEntry))).asJava) +describeResult + } +} + mockAdminClient.incrementalAlterConfigs(util.Collections.singletonMap(resourceCustom, + util.Collections.singletonList(new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET))), new AlterConfigsOptions()) +ConfigCommand.describeConfig(mockAdminClient, describeOpts) +verify(describeResult).all() 
Review Comment: `.all` is needed as it verifies the returned result future is actually consumed in `ConfigCommand.scala` here: https://github.com/apache/kafka/blob/46852eea1c620ff786f4c4c1ff4cbd47f912a1d9/core/src/main/scala/kafka/admin/ConfigCommand.scala#L610 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
philipnee commented on code in PR #14873: URL: https://github.com/apache/kafka/pull/14873#discussion_r1417795477 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -201,13 +224,35 @@ public long maximumTimeToWait(long currentTimeMs) { return heartbeatNow ? 0L : heartbeatRequestState.nextHeartbeatMs(currentTimeMs); } -private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() { +/** + * When consumer polls, we need to reset the pollTimer. If the poll timer has expired, we rejoin only when the + * member is in the {@link MemberState#UNSUBSCRIBED} state. + */ +public void resetPollTimer() { +pollTimer.reset(maxPollIntervalMs); +} + +private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long currentTimeMs, + final boolean ignoreResponse) { +NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(ignoreResponse); +heartbeatRequestState.onSendAttempt(currentTimeMs); +membershipManager.onHeartbeatRequestSent(); +return request; +} + +private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final boolean ignoreResponse) { NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest( -new ConsumerGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()), -coordinatorRequestManager.coordinator()); +new ConsumerGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()), +coordinatorRequestManager.coordinator()); return request.whenComplete((response, exception) -> { if (response != null) { onResponse((ConsumerGroupHeartbeatResponse) response.responseBody(), request.handler().completionTimeMs()); +// The response is only ignored when the member becomes stale. This is because the member needs to +// rejoin on the next poll regardless of whether the server has responded to the heartbeat.
+if (!ignoreResponse) { + membershipManager.onHeartbeatResponseReceived(((ConsumerGroupHeartbeatResponse) response.responseBody()).data()); +maybeSendGroupMetadataUpdateEvent(); Review Comment: @cadonna - can you comment on this change?
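The `resetPollTimer` addition above implements standard max-poll-interval bookkeeping: each `poll()` pushes the deadline out, and an expired deadline means the member must leave and later rejoin the group. A minimal sketch of those semantics, with hypothetical names (not the actual `HeartbeatRequestManager` code):

```java
// Hypothetical sketch of poll-timer semantics: poll() resets the deadline;
// an expired deadline signals that the member must leave the group.
public class PollTimerDemo {
    static final long MAX_POLL_INTERVAL_MS = 300_000;
    private long deadlineMs;

    PollTimerDemo(long nowMs) { deadlineMs = nowMs + MAX_POLL_INTERVAL_MS; }

    // Called on every consumer poll, mirroring resetPollTimer() above.
    void resetOnPoll(long nowMs) { deadlineMs = nowMs + MAX_POLL_INTERVAL_MS; }

    // True once the application has gone too long without polling.
    boolean expired(long nowMs) { return nowMs >= deadlineMs; }

    public static void main(String[] args) {
        long t0 = 0;
        PollTimerDemo timer = new PollTimerDemo(t0);
        // No poll for a full interval: timer expires, member must leave.
        System.out.println(timer.expired(t0 + MAX_POLL_INTERVAL_MS)); // prints true
        // A poll partway through resets the deadline.
        timer.resetOnPoll(t0 + 100_000);
        System.out.println(timer.expired(t0 + MAX_POLL_INTERVAL_MS)); // prints false
    }
}
```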
[jira] [Resolved] (KAFKA-15871) Implement kafka-client-metrics.sh tool
[ https://issues.apache.org/jira/browse/KAFKA-15871?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-15871. - Resolution: Fixed merged the PR to trunk > Implement kafka-client-metrics.sh tool > -- > > Key: KAFKA-15871 > URL: https://issues.apache.org/jira/browse/KAFKA-15871 > Project: Kafka > Issue Type: Sub-task > Components: admin >Affects Versions: 3.7.0 >Reporter: Andrew Schofield >Assignee: Andrew Schofield >Priority: Major > Fix For: 3.7.0 > > > Implement the `kafka-client-metrics.sh` tool which is introduced in KIP-714 > and enhanced in KIP-1000.
Re: [PR] KAFKA-14132; Replace EasyMock with Mockito in StandaloneHerderTest [kafka]
C0urante commented on PR #12728: URL: https://github.com/apache/kafka/pull/12728#issuecomment-1843418398 @mdedetrich Thanks, the changes look good to go. But the build is failing because there are new tests on trunk that still use EasyMock/PowerMock. Can you rebase on the latest from trunk?
Re: [PR] KAFKA-15871: kafka-client-metrics.sh [kafka]
junrao merged PR #14926: URL: https://github.com/apache/kafka/pull/14926
Re: [PR] KAFKA-15684: Support to describe all client metrics resources (KIP-714) [kafka]
junrao commented on code in PR #14933: URL: https://github.com/apache/kafka/pull/14933#discussion_r1417759706 ## core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala: ## @@ -1690,13 +1690,33 @@ class ConfigCommandTest extends Logging { } @Test - def shouldNotDescribeClientMetricsConfigWithoutEntityName(): Unit = { + def shouldDescribeClientMetricsConfigWithoutEntityName(): Unit = { val describeOpts = new ConfigCommandOptions(Array("--bootstrap-server", "localhost:9092", "--entity-type", "client-metrics", "--describe")) -val exception = assertThrows(classOf[IllegalArgumentException], () => describeOpts.checkArgs()) -assertEquals("an entity name must be specified with --describe of client-metrics", exception.getMessage) +val resourceCustom = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, "1") +val configEntry = new ConfigEntry("metrics", "*") +val future = new KafkaFutureImpl[util.Map[ConfigResource, Config]] +val describeResult: DescribeConfigsResult = mock(classOf[DescribeConfigsResult]) +when(describeResult.all()).thenReturn(future) + +val node = new Node(1, "localhost", 9092) +val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) { + override def describeConfigs(resources: util.Collection[ConfigResource], options: DescribeConfigsOptions): DescribeConfigsResult = { +assertTrue(options.includeSynonyms()) +assertEquals(1, resources.size) +val resource = resources.iterator.next +assertEquals(ConfigResource.Type.CLIENT_METRICS, resource.`type`) +assertTrue(resourceCustom.name == resource.name) +future.complete(Map(resourceCustom -> new Config(util.Collections.singletonList(configEntry))).asJava) +describeResult + } +} + mockAdminClient.incrementalAlterConfigs(util.Collections.singletonMap(resourceCustom, + util.Collections.singletonList(new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET))), new AlterConfigsOptions()) +ConfigCommand.describeConfig(mockAdminClient, describeOpts) +verify(describeResult).all()
Review Comment: It seems that we don't need to call `.all()` since we don't do anything with the return value?
Re: [PR] MINOR: Support to update Async Consumer group member label (KIP-714) [kafka]
AndrewJSchofield commented on code in PR #14946: URL: https://github.com/apache/kafka/pull/14946#discussion_r1417760787 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -321,6 +332,9 @@ public void onHeartbeatResponseReceived(ConsumerGroupHeartbeatResponseData respo this.memberEpoch = response.memberEpoch(); ConsumerGroupHeartbeatResponseData.Assignment assignment = response.assignment(); +clientTelemetryReporter.ifPresent(reporter -> reporter.updateMetricsLabels( +Collections.singletonMap(ClientTelemetryProvider.GROUP_MEMBER_ID, memberId))); Review Comment: This will work, but you are updating the metrics labels on every single heartbeat response. The member ID and member epoch are always sent.
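The optimization discussed in this thread — only push the `GROUP_MEMBER_ID` label when the member ID actually changes, instead of on every heartbeat response — can be sketched as follows. This is a hypothetical illustration, not the actual `MembershipManagerImpl` code:

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: guard the telemetry label update so it only fires
// when the member ID changes, not on every single heartbeat response.
public class MemberIdLabelGuard {
    private String lastMemberId;
    final AtomicInteger reporterUpdates = new AtomicInteger();

    void onHeartbeatResponse(String memberId) {
        // Objects.equals handles the initial null lastMemberId safely.
        if (!Objects.equals(lastMemberId, memberId)) {
            lastMemberId = memberId;
            // Stand-in for reporter.updateMetricsLabels(
            //     Collections.singletonMap(GROUP_MEMBER_ID, memberId))
            reporterUpdates.incrementAndGet();
        }
    }

    public static void main(String[] args) {
        MemberIdLabelGuard guard = new MemberIdLabelGuard();
        guard.onHeartbeatResponse("m-1"); // first response: label pushed
        guard.onHeartbeatResponse("m-1"); // same id: skipped
        guard.onHeartbeatResponse("m-2"); // id changed: label pushed again
        System.out.println(guard.reporterUpdates.get()); // prints 2
    }
}
```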
Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]
C0urante commented on code in PR #13801: URL: https://github.com/apache/kafka/pull/13801#discussion_r1417751374 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ## @@ -279,15 +290,54 @@ public Future set(Map values, Callback callb throw new IllegalStateException("At least one non-null offset store must be provided"); } -return primaryStore.set(values, (primaryWriteError, ignored) -> { +List partitionsWithTombstoneOffsets = values.entrySet().stream() +.filter(offsetEntry -> offsetEntry.getValue() == null) +.map(Map.Entry::getKey).collect(Collectors.toList()); + +Map tombstoneOffsets = new HashMap<>(); +for (ByteBuffer partition : partitionsWithTombstoneOffsets) { +tombstoneOffsets.put(partition, null); +} +Map regularOffsets = values.entrySet().stream() +.filter(offsetEntry -> offsetEntry.getValue() != null) +.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + +// If the supplied offsets contain tombstone values, then tombstone offsets are extracted first, +// written to the secondary store in a synchronous manner and then to the primary store. +// This is because, if a tombstone offset is successfully written to the per-connector offsets topic, +// but cannot be written to the global offsets topic, then the global offsets topic will still contain that +// source offset, but the per-connector topic will not. Due to the fallback-on-global logic used by the worker, +// if a task requests offsets for one of the tombstoned partitions, the worker will provide it with the +// offsets present in the global offsets topic, instead of indicating to the task that no offsets can be found. 
+CompletableFuture offsetWriteFuture = CompletableFuture.completedFuture(null); +if (secondaryStore != null && !tombstoneOffsets.isEmpty()) { +offsetWriteFuture.thenAccept(v -> { Review Comment: This causes the passed-in lambda to be executed after `offsetWriteFuture` completes, but it doesn't cause `offsetWriteFuture::get` to block on the completion of that lambda, which means that if the caller of `ConnectorOffsetBackingStore::set` waits for the returned `Future` to complete right now, it'll complete immediately even though it's likely no offsets will have been written to either the primary or secondary store. Also, nit: when researching this behavior, I ran into `CompletableFuture::thenRun`, which is a slightly better fit than `CompletableFuture::thenAccept` (since we don't use the return value). ```suggestion offsetWriteFuture = offsetWriteFuture.thenRun(() -> { ```
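The bug flagged in this review is generic `CompletableFuture` behavior: `thenRun`/`thenAccept` return a *new* future, and unless that returned future is kept, waiting on the original one does not wait for the continuation. A standalone, deterministic illustration (plain JDK, unrelated to the Connect code itself):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

public class FutureChainingDemo {
    public static void main(String[] args) throws Exception {
        AtomicBoolean workDone = new AtomicBoolean(false);
        CountDownLatch release = new CountDownLatch(1);

        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        // Buggy pattern: the continuation is created but `chain` is never
        // reassigned, so chain.get() does not wait for the work below.
        chain.thenRunAsync(() -> {
            try { release.await(); } catch (InterruptedException ignored) { }
            workDone.set(true);
        });
        chain.get(); // returns immediately; the continuation has not run yet
        System.out.println("buggy pattern waited for work: " + workDone.get());
        release.countDown(); // let the stuck continuation finish

        // Fixed pattern: reassign so get() waits for the continuation too,
        // as in the review suggestion `offsetWriteFuture = offsetWriteFuture.thenRun(...)`.
        AtomicBoolean workDone2 = new AtomicBoolean(false);
        CompletableFuture<Void> fixed = CompletableFuture.completedFuture(null)
                .thenRunAsync(() -> workDone2.set(true));
        fixed.get(); // blocks until the runnable has completed
        System.out.println("fixed pattern waited for work: " + workDone2.get());
    }
}
```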
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
mjsax commented on code in PR #14922: URL: https://github.com/apache/kafka/pull/14922#discussion_r1417761864 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ## @@ -1477,6 +1533,29 @@ public Object getStateLock() { return stateLock; } +public Map> consumerClientInstanceIds(final Duration timeout) { +final Map> result = new HashMap<>(); + +synchronized (fetchDeadlines) { +boolean addDeadline = false; Review Comment: This PR itself does not need it yet (but we need it later when we add support for restore consumer and producer)
Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]
C0urante commented on code in PR #13801: URL: https://github.com/apache/kafka/pull/13801#discussion_r1417677509 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ## @@ -279,15 +290,54 @@ public Future set(Map values, Callback callb throw new IllegalStateException("At least one non-null offset store must be provided"); } -return primaryStore.set(values, (primaryWriteError, ignored) -> { +List partitionsWithTombstoneOffsets = values.entrySet().stream() +.filter(offsetEntry -> offsetEntry.getValue() == null) +.map(Map.Entry::getKey).collect(Collectors.toList()); + +Map tombstoneOffsets = new HashMap<>(); +for (ByteBuffer partition : partitionsWithTombstoneOffsets) { +tombstoneOffsets.put(partition, null); +} +Map regularOffsets = values.entrySet().stream() +.filter(offsetEntry -> offsetEntry.getValue() != null) +.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + +// If the supplied offsets contain tombstone values, then tombstone offsets are extracted first, +// written to the secondary store in a synchronous manner and then to the primary store. +// This is because, if a tombstone offset is successfully written to the per-connector offsets topic, +// but cannot be written to the global offsets topic, then the global offsets topic will still contain that +// source offset, but the per-connector topic will not. Due to the fallback-on-global logic used by the worker, +// if a task requests offsets for one of the tombstoned partitions, the worker will provide it with the +// offsets present in the global offsets topic, instead of indicating to the task that no offsets can be found. Review Comment: This largely duplicates information present in the Javadoc for the method. 
IMO we can remove this entire comment block and, if we really want to make it easy for maintainers to understand not just the "what" but the "why" for this logic, we can link to https://issues.apache.org/jira/browse/KAFKA-15018 in the Javadoc for the method: ```java * * @see <a href="https://issues.apache.org/jira/browse/KAFKA-15018">KAFKA-15018</a> for context on the three-step * write sequence ``` ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ## @@ -279,15 +290,54 @@ public Future set(Map values, Callback callb throw new IllegalStateException("At least one non-null offset store must be provided"); } -return primaryStore.set(values, (primaryWriteError, ignored) -> { +List partitionsWithTombstoneOffsets = values.entrySet().stream() +.filter(offsetEntry -> offsetEntry.getValue() == null) +.map(Map.Entry::getKey).collect(Collectors.toList()); + +Map tombstoneOffsets = new HashMap<>(); +for (ByteBuffer partition : partitionsWithTombstoneOffsets) { +tombstoneOffsets.put(partition, null); +} +Map regularOffsets = values.entrySet().stream() +.filter(offsetEntry -> offsetEntry.getValue() != null) +.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + +// If the supplied offsets contain tombstone values, then tombstone offsets are extracted first, +// written to the secondary store in a synchronous manner and then to the primary store. +// This is because, if a tombstone offset is successfully written to the per-connector offsets topic, +// but cannot be written to the global offsets topic, then the global offsets topic will still contain that +// source offset, but the per-connector topic will not. Due to the fallback-on-global logic used by the worker, +// if a task requests offsets for one of the tombstoned partitions, the worker will provide it with the +// offsets present in the global offsets topic, instead of indicating to the task that no offsets can be found.
+CompletableFuture offsetWriteFuture = CompletableFuture.completedFuture(null); +if (secondaryStore != null && !tombstoneOffsets.isEmpty()) { +offsetWriteFuture.thenAccept(v -> { +Future secondaryWriteFuture = secondaryStore.set(tombstoneOffsets, new FutureCallback<>()); +try { +if (exactlyOnce) { +secondaryWriteFuture.get(); +} else { +secondaryWriteFuture.get(offsetFlushTimeoutMs, TimeUnit.MILLISECONDS); +} +} catch (ExecutionException e) { +log.error("{} Flush of tombstone(s) offsets to secondary store threw an unexpected exception: ", this, e.getCause());
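The ordering implemented by the diff above can be illustrated with a minimal, self-contained sketch. Note this is not the real Connect API: `Store`, `write`, and all names below are simplified stand-ins, and the real code chains further writes (including the regular offsets to the primary store) with richer error handling. The key point shown is that tombstone offsets are flushed to the secondary (global) store, and the result is awaited, before the primary (per-connector) store is written:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Simplified sketch (hypothetical names, not the real Connect classes) of the
// write ordering: tombstone offsets (null values) are written to the secondary
// store synchronously, so a failed secondary write prevents the primary write.
class OffsetWriteSketch {
    // Stand-in for an offset backing store; returns a future for the write.
    interface Store {
        Future<Void> set(Map<String, String> offsets);
    }

    static void write(Map<String, String> values, Store primary, Store secondary,
                      long flushTimeoutMs) throws Exception {
        // Extract the tombstone offsets (entries whose value is null).
        Map<String, String> tombstones = new HashMap<>();
        values.forEach((k, v) -> {
            if (v == null) tombstones.put(k, null);
        });

        // Step 1: flush tombstones to the secondary store and block on the result.
        if (secondary != null && !tombstones.isEmpty()) {
            secondary.set(tombstones).get(flushTimeoutMs, TimeUnit.MILLISECONDS);
        }
        // Step 2: only after the tombstones are safely in the secondary store,
        // write the batch to the primary store.
        primary.set(values).get(flushTimeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        List<String> order = new ArrayList<>();
        Store secondary = m -> { order.add("secondary"); return CompletableFuture.completedFuture(null); };
        Store primary = m -> { order.add("primary"); return CompletableFuture.completedFuture(null); };
        Map<String, String> values = new HashMap<>();
        values.put("partition-1", null);        // tombstone offset
        values.put("partition-2", "offset-42"); // regular offset
        write(values, primary, secondary, 1000L);
        System.out.println(order); // prints [secondary, primary]
    }
}
```

Under this ordering, a secondary-store failure surfaces before the primary store is touched, which avoids the inconsistent state described in the review comment (tombstone applied per-connector but stale offsets still readable via the global topic fallback).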
[jira] [Resolved] (KAFKA-15932) Flaky test - PlaintextConsumerTest.testSeek("kraft+kip-848","consumer")
[ https://issues.apache.org/jira/browse/KAFKA-15932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andrew Schofield resolved KAFKA-15932.
--------------------------------------
    Resolution: Fixed

> Flaky test - PlaintextConsumerTest.testSeek("kraft+kip-848","consumer")
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-15932
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15932
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 3.7.0
>            Reporter: Andrew Schofield
>            Assignee: Andrew Schofield
>            Priority: Major
>              Labels: flaky-test
>
> Intermittently failing test for the new consumer.
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14859/1/tests/
> ```
> Error
> org.opentest4j.AssertionFailedError: Timed out before consuming expected 1 records. The number consumed was 0.
> Stacktrace
> org.opentest4j.AssertionFailedError: Timed out before consuming expected 1 records. The number consumed was 0.
>     at app//org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
>     at app//org.junit.jupiter.api.Assertions.fail(Assertions.java:134)
>     at app//kafka.api.AbstractConsumerTest.consumeRecords(AbstractConsumerTest.scala:161)
>     at app//kafka.api.AbstractConsumerTest.consumeAndVerifyRecords(AbstractConsumerTest.scala:128)
>     at app//kafka.api.PlaintextConsumerTest.testSeek(PlaintextConsumerTest.scala:616)
>     at java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at java.base@11.0.16.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base@11.0.16.1/java.lang.reflect.Method.invoke(Method.java:566)
>     at app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728)
>     at app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
>     at app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
>     at app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
>     at app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
>     at app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:94)
>     at app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
>     at app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
>     at app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
>     at app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
>     at app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
>     at app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
>     at app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
>     at app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
>     at app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:218)
>     at app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:214)
>     at app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:139)
>     at app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:69)
>     at app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
>     at app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
>     at app//org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
>     at
Re: [PR] KAFKA-15932: Wait for responses in consumer operations [kafka]
cadonna merged PR #14912: URL: https://github.com/apache/kafka/pull/14912 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15764: Missing Tests for Transactions [kafka]
jolshan commented on PR #14702: URL: https://github.com/apache/kafka/pull/14702#issuecomment-1843376087

I've restarted the build every day since the approval and have yet to get a clean build.
Re: [PR] MINOR: Support to update Async Consumer group member label (KIP-714) [kafka]
apoorvmittal10 commented on PR #14946: URL: https://github.com/apache/kafka/pull/14946#issuecomment-1843373904

@kirktrue @philipnee @AndrewJSchofield Can you please review the change and let me know whether it is in the right place for the new consumer? cc: @mjsax @wcarlson5
[PR] MINOR: Support to update Async Consumer group member label (KIP-714) [kafka]
apoorvmittal10 opened a new pull request, #14946: URL: https://github.com/apache/kafka/pull/14946

- Since KIP-848 introduced a new Kafka Consumer, KIP-714 requires further changes to capture `group_member_id`.
- The change also includes a fix to label updates: previously, calling `updateLabels` multiple times for the same key added a new resource attribute each time. Now the update is applied to a map first and the labels are constructed from it, so an existing label is replaced with the new value.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
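The label-update fix described in the PR can be sketched as follows. This is a hypothetical, minimal illustration (the class and method names `LabelStore`, `updateLabels`, and `labels` are stand-ins, not the actual KIP-714 client code): updates are merged into a map keyed by label name, and the label set is rebuilt from that map, so repeated updates for the same key replace the earlier value instead of appending a duplicate attribute:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of map-backed label updates: a repeated key overwrites
// the previous value rather than adding a duplicate resource attribute.
class LabelStore {
    private final Map<String, String> labelMap = new LinkedHashMap<>();

    // Merge the supplied labels into the map; existing keys are overwritten.
    void updateLabels(Map<String, String> updates) {
        labelMap.putAll(updates);
    }

    // Construct the final label view from the map.
    Map<String, String> labels() {
        return new LinkedHashMap<>(labelMap);
    }

    public static void main(String[] args) {
        LabelStore store = new LabelStore();
        store.updateLabels(Map.of("group_member_id", "member-1"));
        store.updateLabels(Map.of("group_member_id", "member-2"));
        // The second update replaces the first instead of adding a duplicate.
        System.out.println(store.labels().size());                 // prints 1
        System.out.println(store.labels().get("group_member_id")); // prints member-2
    }
}
```

The design choice here is simply to defer label construction until after all updates are merged, which makes the operation idempotent for repeated keys.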