[GitHub] [kafka] atu-sharm commented on pull request #13633: KAFKA-14839: Exclude protected variable from JavaDocs
atu-sharm commented on PR #13633: URL: https://github.com/apache/kafka/pull/13633#issuecomment-1545201789 Thanks @vvcephei, @mjsax, and @machi1990 for your valuable suggestions. I think it's better to apply this property to the whole project, so I have changed the PR accordingly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dengziming merged pull request #13679: KAFKA-14291: KRaft controller should return right finalized features in ApiVersionResponse
dengziming merged PR #13679: URL: https://github.com/apache/kafka/pull/13679
[GitHub] [kafka] dengziming commented on pull request #13679: KAFKA-14291: KRaft controller should return right finalized features in ApiVersionResponse
dengziming commented on PR #13679: URL: https://github.com/apache/kafka/pull/13679#issuecomment-1545197689 The failed tests are flaky and also failed recently in other PRs.
[GitHub] [kafka] dajac commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given
dajac commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1191927989

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
## @@ -186,6 +187,7 @@ public ConsumerCoordinator(GroupRebalanceConfig rebalanceConfig,
 this.completedOffsetCommits = new ConcurrentLinkedQueue<>();
 this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix);
 this.interceptors = interceptors;
+this.inFlightAsyncCommits = new AtomicInteger();
 this.pendingAsyncCommits = new AtomicInteger();

Review Comment:
Okay. I understand this a bit better now. `pendingAsyncCommits` is really only meant to track inflight coordinator lookups. Then, in `close`, we wait on those to complete before calling `close` of the super class which waits on all the inflight requests to complete.
[jira] [Assigned] (KAFKA-13349) Allow Iterator.remove on KeyValueIterator
[ https://issues.apache.org/jira/browse/KAFKA-13349?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mahesh Madushan reassigned KAFKA-13349:
---
Assignee: Mahesh Madushan

> Allow Iterator.remove on KeyValueIterator
> -
>
> Key: KAFKA-13349
> URL: https://issues.apache.org/jira/browse/KAFKA-13349
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Guozhang Wang
> Assignee: Mahesh Madushan
> Priority: Major
> Labels: needs-kip, newbie++
>
> Today Stream's state store's range iterator does not support `remove`. We could consider adding such support for all the built-in state stores:
> * RocksDB's native iterator does not support removal, but we can always do a delete(key) concurrently while the iterator is open on the snapshot.
> * In-Memory: straightforward implementation.
> The benefit of that is then for range-and-delete truncation operation we do not necessarily have to be cautious about concurrent modification exceptions.
> This could also help GC with in-memory stores.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-13349) Allow Iterator.remove on KeyValueIterator
[ https://issues.apache.org/jira/browse/KAFKA-13349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17722007#comment-17722007 ] Mahesh Madushan commented on KAFKA-13349: - [~mjsax] Thank you for the response. Will work on this.
[jira] [Commented] (KAFKA-13349) Allow Iterator.remove on KeyValueIterator
[ https://issues.apache.org/jira/browse/KAFKA-13349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17722005#comment-17722005 ] Matthias J. Sax commented on KAFKA-13349: - Yes, we want to add `remove()` to the interface [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java] and thus all implementations will need to support it.
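For the in-memory case mentioned in the ticket, a minimal sketch of range-and-delete through an iterator might look like the following. It assumes a store backed by a `TreeMap`; the class `InMemoryRangeStoreSketch` and its methods are hypothetical and are not part of Kafka Streams or the `KeyValueIterator` interface.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical in-memory store; not Kafka Streams' actual implementation. */
final class InMemoryRangeStoreSketch {
    private final NavigableMap<String, String> map = new TreeMap<>();

    void put(String key, String value) {
        map.put(key, value);
    }

    int size() {
        return map.size();
    }

    /** Iterator over the inclusive range [from, to]; its remove() deletes from the backing map. */
    Iterator<Map.Entry<String, String>> range(String from, String to) {
        // subMap returns a live view, so remove() on the view's iterator writes through.
        return map.subMap(from, true, to, true).entrySet().iterator();
    }

    /** Range-and-delete truncation that goes through the iterator, avoiding ConcurrentModificationException. */
    int deleteRange(String from, String to) {
        int removed = 0;
        Iterator<Map.Entry<String, String>> it = range(from, to);
        while (it.hasNext()) {
            it.next();
            it.remove();
            removed++;
        }
        return removed;
    }
}
```

Deleting through the iterator, rather than calling a delete on the map while iterating, is what keeps the truncation loop safe in this sketch.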
[GitHub] [kafka] aaron-ai opened a new pull request, #13710: MINOR: Fix the outdated comments of ConfigDef
aaron-ai opened a new pull request, #13710: URL: https://github.com/apache/kafka/pull/13710 Fix the outdated comments of `ConfigDef` since the signature of the corresponding method has been updated. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-14911) Add system tests for rolling upgrade path of KIP-904
[ https://issues.apache.org/jira/browse/KAFKA-14911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17722004#comment-17722004 ]

Matthias J. Sax commented on KAFKA-14911:
-
No worries. And thanks for helping with reviewing! Equally important.

> Add system tests for rolling upgrade path of KIP-904
> -
>
> Key: KAFKA-14911
> URL: https://issues.apache.org/jira/browse/KAFKA-14911
> Project: Kafka
> Issue Type: Test
> Reporter: Farooq Qaiser
> Assignee: Victoria Xia
> Priority: Major
> Fix For: 3.5.0
>
> As per [~mjsax] comment [here|https://github.com/apache/kafka/pull/10747#pullrequestreview-1376539752], we should add a system test to test the rolling upgrade path for [KIP-904|https://cwiki.apache.org/confluence/x/P5VbDg] which introduces a new serialization format for groupBy internal repartition topics and was implemented as part of https://issues.apache.org/jira/browse/KAFKA-12446
> There is `StreamsUpgradeTest.java` and `streams_upgrade_test.py` (cf `test_rolling_upgrade_with_2_bounces`) as a starting point.
> Might be best to do a similar thing as for FK-joins, and add a new test variation.
> The tricky thing about the test would be to ensure that the repartition topic is not empty when we do the bounce, so the test should be set up accordingly.
[jira] [Commented] (KAFKA-14981) Set `group.instance.id` in streams consumer so that rebalance will not happen if a instance is restarted
[ https://issues.apache.org/jira/browse/KAFKA-14981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17722003#comment-17722003 ]

Matthias J. Sax commented on KAFKA-14981:
-
I was not aware that there were (or maybe still are) issues. Are there any tickets for them?

> Set `group.instance.id` in streams consumer so that rebalance will not happen if a instance is restarted
> -
>
> Key: KAFKA-14981
> URL: https://issues.apache.org/jira/browse/KAFKA-14981
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Hao Li
> Priority: Minor
>
> `group.instance.id` enables static membership so that if a consumer is restarted within `session.timeout.ms`, a rebalance will not be triggered and the original assignment can be returned directly from the broker. We can set this id in Kafka Streams using `threadId` so that no rebalance is triggered within `session.timeout.ms`.
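As a rough illustration of the static-membership settings the ticket refers to, the sketch below builds consumer properties with `group.instance.id` and `session.timeout.ms`. The helper class and the `applicationId + "-" + threadId` naming scheme are assumptions made for illustration; the ticket only says the id could be derived from `threadId`.

```java
import java.util.Properties;

/** Hypothetical helper; the id naming scheme is an assumption, not what Kafka Streams does today. */
final class StaticMembershipConfigSketch {
    static Properties consumerProps(String applicationId, String threadId, int sessionTimeoutMs) {
        Properties props = new Properties();
        props.setProperty("group.id", applicationId);
        // A stable id per stream thread lets the broker treat a restarted consumer as the same member.
        props.setProperty("group.instance.id", applicationId + "-" + threadId);
        // A restart must complete within this window for the rebalance to be avoided.
        props.setProperty("session.timeout.ms", Integer.toString(sessionTimeoutMs));
        return props;
    }
}
```

The id has to be unique per consumer within the group and stable across restarts, which is why a thread-based identifier is a natural candidate here.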
[GitHub] [kafka] showuon merged pull request #13517: KAFKA-14752: Kafka examples improvements - demo changes
showuon merged PR #13517: URL: https://github.com/apache/kafka/pull/13517
[GitHub] [kafka] showuon commented on pull request #13517: KAFKA-14752: Kafka examples improvements - demo changes
showuon commented on PR #13517:
URL: https://github.com/apache/kafka/pull/13517#issuecomment-1545024000

Failed tests are unrelated:
```
Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testConnectorBoundary
Build / JDK 8 and Scala 2.12 / org.apache.kafka.controller.QuorumControllerTest.testTimeouts()
Build / JDK 17 and Scala 2.13 / kafka.admin.ReassignPartitionsIntegrationTest.testAlterReassignmentThrottle(String).quorum=kraft
Build / JDK 17 and Scala 2.13 / kafka.api.TransactionsTest.testBumpTransactionalEpoch(String).quorum=kraft
```
[jira] [Commented] (KAFKA-8154) Buffer Overflow exceptions between brokers and with clients
[ https://issues.apache.org/jira/browse/KAFKA-8154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721959#comment-17721959 ]

tony mancill commented on KAFKA-8154:
-
Thank you for that observation [~keith.paulson]. We have been seeing this issue with BouncyCastle in our environment. We use client versions 2.8.2 and 3.4.0, but so far I've only tested 2.8.2. The following patch addresses the issue for us with 2.8.2, clearing up both exceptions and the accompanying connection resets.

{code:java}
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
index b9879ad6da..effd5fb80d 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
@@ -344,9 +344,9 @@ public class SslTransportLayer implements TransportLayer {
                 netWriteBuffer.compact();
                 netWriteBuffer = Utils.ensureCapacity(netWriteBuffer, currentNetWriteBufferSize);
                 netWriteBuffer.flip();
-                if (netWriteBuffer.limit() >= currentNetWriteBufferSize) {
+                if (netWriteBuffer.limit() > currentNetWriteBufferSize) {
                     throw new IllegalStateException("Buffer overflow when available data size (" + netWriteBuffer.limit() +
-                        ") >= network buffer size (" + currentNetWriteBufferSize + ")");
+                        ") > network buffer size (" + currentNetWriteBufferSize + ")");
                 }
             } else if (handshakeResult.getStatus() == Status.BUFFER_UNDERFLOW) {
                 throw new IllegalStateException("Should not have received BUFFER_UNDERFLOW during handshake WRAP.");
@@ -564,6 +564,7 @@ public class SslTransportLayer implements TransportLayer {
             netReadBuffer.flip();
             SSLEngineResult unwrapResult;
             try {
+                appReadBuffer = Utils.ensureCapacity(appReadBuffer, netReadBufferSize());
                 unwrapResult = sslEngine.unwrap(netReadBuffer, appReadBuffer);
                 if (state == State.POST_HANDSHAKE && appReadBuffer.position() != 0) {
                     // For TLSv1.3, we have finished processing post-handshake messages since we are now processing data
@@ -598,7 +599,7 @@ public class SslTransportLayer implements TransportLayer {
                         ") >= application buffer size (" + currentApplicationBufferSize + ")");
                 }
 
-                // appReadBuffer will extended upto currentApplicationBufferSize
+                // appReadBuffer will extend up to currentApplicationBufferSize
                 // we need to read the existing content into dst before we can do unwrap again. If there are no space in dst
                 // we can break here.
                 if (dst.hasRemaining())
{code}

A couple of notes, which is why I haven't submitted a PR yet - that is, I don't think the patch is complete yet:
* First, I'm not sure whether the change in the hunk starting at line 564 is needed. It was in my working tree while I tried to address this previously, so I need to test again without it.
* There are other places in SslTransportLayer.java where Utils.ensureCapacity() is called and the following comparison uses >= instead of strictly >. It would be nice to address them all at once.

> Buffer Overflow exceptions between brokers and with clients
> ---
>
> Key: KAFKA-8154
> URL: https://issues.apache.org/jira/browse/KAFKA-8154
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 2.1.0
> Reporter: Rajesh Nataraja
> Priority: Major
> Attachments: server.properties.txt
>
> https://github.com/apache/kafka/pull/6495
> https://github.com/apache/kafka/pull/5785
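To make the boundary case behind the `>=` versus `>` change concrete, here is a small standalone sketch. It assumes the write buffer is completely full and already at the target size when the compact/ensureCapacity/flip sequence runs. The `ensureCapacity` method below is a stand-in written for this example, not Kafka's `Utils.ensureCapacity`, and the class name is hypothetical.

```java
import java.nio.ByteBuffer;

/** Standalone illustration of the boundary condition; not the SslTransportLayer code. */
final class BufferBoundarySketch {
    /** Stand-in for a grow-if-needed helper: allocate a larger buffer only when required. */
    static ByteBuffer ensureCapacity(ByteBuffer existing, int newLength) {
        if (newLength > existing.capacity()) {
            ByteBuffer grown = ByteBuffer.allocate(newLength);
            existing.flip();
            grown.put(existing);
            return grown;
        }
        return existing;
    }

    /** Runs the compact/ensureCapacity/flip sequence and returns the resulting limit. */
    static int limitAfterResize(ByteBuffer buffer, int targetSize) {
        buffer.compact();                             // keep unread bytes, switch to write mode
        buffer = ensureCapacity(buffer, targetSize);
        buffer.flip();                                // back to read mode: limit = number of unread bytes
        return buffer.limit();
    }

    static boolean originalCheckFails(int limit, int size) {
        return limit >= size;                         // also fires when the buffer is exactly full
    }

    static boolean patchedCheckFails(int limit, int size) {
        return limit > size;                          // fires only when data exceeds the target size
    }

    public static void main(String[] args) {
        ByteBuffer full = ByteBuffer.allocate(8);
        full.put(new byte[8]).flip();                 // 8 unread bytes in an 8-byte buffer
        int limit = limitAfterResize(full, 8);
        System.out.println(originalCheckFails(limit, 8) + " " + patchedCheckFails(limit, 8));
    }
}
```

Under these assumptions the original comparison reports an overflow for a buffer that is merely full, while the strict comparison does not; whether this is the exact path hit with BouncyCastle is not established by the comment.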
[GitHub] [kafka] mehbey opened a new pull request, #13709: Added log timestamp validation to prevent records with timestamps tha…
mehbey opened a new pull request, #13709:
URL: https://github.com/apache/kafka/pull/13709

### What changed
Added a validation that checks whether the record timestamp is in the future compared to the broker's timestamp and throws an exception to reject the record. The current validation of the record's timestamp based on the configured `timestampDiffMaxMs` remains unchanged. This new validation covers scenarios where producers send a future timestamp for a record. Specific changes are:
- Updated validation logic in LogValidator
- Added unit test coverage for the change
- Updated unit tests that failed because of the new validation logic

### Why?
https://issues.apache.org/jira/browse/KAFKA-14991
Improves the accuracy of the log validation logic and avoids unexpected gotchas for customers

### Testing
- Added relevant unit tests
- Reproduced the issue by setting nanoseconds instead of milliseconds in the producer logic and verified that validation is working as expected.

Example API response
```
"responses":[{"name":"myTopic1","partitionResponses":[{"index":0,"errorCode":32,"baseOffset":-1,"logAppendTimeMs":-1,"logStartOffset":0,"recordErrors":[
{"batchIndex":0,"batchIndexErrorMessage":"Timestamp 1755933141855875 of message with offset 0 is ahead of the server's time. 1683838582815"},
{"batchIndex":1,"batchIndexErrorMessage":"Timestamp 1755933754530625 of message with offset 1 is ahead of the server's time. 1683838582815"},
],"errorMessage":"One or more records have been rejected due to invalid timestamp"}]}]
```

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] mehbey commented on pull request #13681: KAFKA-14133: Migrate ActiveTaskCreator mock in TaskManagerTest to Mockito
mehbey commented on PR #13681: URL: https://github.com/apache/kafka/pull/13681#issuecomment-1544909428 > Thank you for the review @clolov - I will address the comments shortly
[jira] [Created] (KAFKA-14992) Add partition limit or improve error msg for adminzkclient
Alyssa Huang created KAFKA-14992:

Summary: Add partition limit or improve error msg for adminzkclient
Key: KAFKA-14992
URL: https://issues.apache.org/jira/browse/KAFKA-14992
Project: Kafka
Issue Type: Improvement
Reporter: Alyssa Huang

Create-topic requests with a large number of partitions yield an exception that is technically unrelated and confusing:
`partitions should be a consecutive 0-based integer sequence`
What is really happening is that we exceed maxInt at this line [https://github.com/apache/kafka/blame/trunk/core/src/main/scala/kafka/zk/AdminZkClient.scala#L154], which causes the following check to fail. We should account for this case better.
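The linked line is not reproduced in this message, so the following is only a guess at the failure mode: it assumes the check compares the sum of partition ids against an expected value n(n-1)/2 computed in 32-bit arithmetic. The class and method names are hypothetical.

```java
/** Hypothetical illustration of 32-bit overflow in an expected-sum check; not the AdminZkClient code. */
final class PartitionSumOverflowSketch {
    /** Expected sum of ids 0..n-1 in int arithmetic; the product can wrap for large n. */
    static int expectedSumInt(int n) {
        return n * (n - 1) / 2;
    }

    /** Same formula in 64-bit arithmetic, which does not wrap for any int n. */
    static long expectedSumLong(int n) {
        return (long) n * (n - 1) / 2;
    }

    /** True when the 32-bit result is still trustworthy for this partition count. */
    static boolean intSumIsExact(int n) {
        return expectedSumInt(n) == expectedSumLong(n);
    }
}
```

If the real check has this shape, a consecutive 0-based sequence of partitions would be rejected once the product no longer fits in an int, which would match the misleading error message quoted above.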
[jira] [Commented] (KAFKA-14941) Document which configuration options are applicable only to processes with broker role or controller role
[ https://issues.apache.org/jira/browse/KAFKA-14941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721935#comment-17721935 ]

Bikash Adhikari commented on KAFKA-14941:
-
I will work on this documentation.

> Document which configuration options are applicable only to processes with broker role or controller role
> -
>
> Key: KAFKA-14941
> URL: https://issues.apache.org/jira/browse/KAFKA-14941
> Project: Kafka
> Issue Type: Improvement
> Reporter: Jakub Scholz
> Priority: Major
>
> When running in KRaft mode, some of the configuration options are applicable only to nodes with the broker process role and some are applicable only to nodes with the controller process role. It would be great if this information were part of the documentation (e.g. in the [Broker Configs|https://kafka.apache.org/documentation/#brokerconfigs] table on the website), and also part of the config classes, so that it can be used in situations when the configuration is dynamically configured, for example to filter the options applicable to different nodes. This would allow having configuration files with only the actually used configuration options and, for example, help to reduce unnecessary restarts when rolling out new configurations etc.
> For some options, it seems clear and the Kafka node would refuse to start if they are set - for example the configurations of the non-controller listeners in controller-only nodes. For others, it seems a bit less clear (Does the {{compression.type}} option apply to controller-only nodes? Or the configurations for the offset topic? etc.).
[GitHub] [kafka] splett2 commented on a diff in pull request #13707: KAFKA-14990: Dynamic producer ID expiration should be applied on a broker restart
splett2 commented on code in PR #13707:
URL: https://github.com/apache/kafka/pull/13707#discussion_r1191702662

## core/src/main/scala/kafka/server/KafkaConfig.scala:
## @@ -1956,10 +1956,10 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
 val transactionTopicSegmentBytes = getInt(KafkaConfig.TransactionsTopicSegmentBytesProp)
 val transactionAbortTimedOutTransactionCleanupIntervalMs = getInt(KafkaConfig.TransactionsAbortTimedOutTransactionCleanupIntervalMsProp)
 val transactionRemoveExpiredTransactionalIdCleanupIntervalMs = getInt(KafkaConfig.TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp)
-
+
 val transactionPartitionVerificationEnable = getBoolean(KafkaConfig.TransactionPartitionVerificationEnableProp)
- val producerIdExpirationMs = getInt(KafkaConfig.ProducerIdExpirationMsProp)
+ def producerIdExpirationMs = getInt(KafkaConfig.ProducerIdExpirationMsProp)

Review Comment:
The dynamic broker config code is quite hairy. I took a look, and my high-level understanding of what it does is the following:

`DynamicBrokerConfig.updateCurrentConfig`: Try to generate a new `KafkaConfig` from the set of properties persisted to Zookeeper. If the current config is equal to the new config, no-op. Otherwise, determine the set of reconfigurables that need to be updated based on the currently registered set of reconfigurables, and apply those updates. Then update the current config.

I added some logging and it looks like what is happening is the following:
1. During `KafkaServer.startup()` we call `config.dynamicConfig.initialize(Some(zkClient))`. At this point, the set of reconfigurables is empty.
2. Many lines of code later, we call:
```
/* Add all reconfigurables for config change notification before starting config handlers */
config.dynamicConfig.addReconfigurables(this)
Option(logManager.cleaner).foreach(config.dynamicConfig.addBrokerReconfigurable)
```
3. Eventually we start up the config manager, which tries to reload props from Zookeeper.

Step #1 loads broker overrides from Zookeeper, but doesn't apply any changes since we have not added the reconfigurables yet. This means that the props just get applied to the current KafkaConfig, and the reconfiguration hooks defined in `DynamicBrokerConfig` don't fire. However, we do update the current KafkaConfig to include the updated props.
Step #2 adds the reconfigurables so that post-startup configuration changes alter components.
Step #3 tries to load the base props from Zookeeper, but because #1 has already updated the current KafkaConfig to match the existing ZK state, we no-op again.
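The ordering problem described in this comment can be modelled in a few lines. The sketch below is a simplified, hypothetical model (a map for the config, plain callbacks for the reconfigurables); it is not Kafka's `DynamicBrokerConfig`, and the property key is used only as an example value.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Simplified, hypothetical model of the update flow; not Kafka's DynamicBrokerConfig. */
final class ReconfigOrderingSketch {
    private Map<String, String> currentConfig = new HashMap<>();
    private final List<Consumer<Map<String, String>>> reconfigurables = new ArrayList<>();

    void addReconfigurable(Consumer<Map<String, String>> hook) {
        reconfigurables.add(hook);
    }

    /** Applies persisted props and returns how many reconfiguration hooks fired. */
    int updateCurrentConfig(Map<String, String> persisted) {
        if (persisted.equals(currentConfig)) {
            return 0;                                  // config unchanged: no-op
        }
        int fired = 0;
        for (Consumer<Map<String, String>> hook : reconfigurables) {
            hook.accept(persisted);                    // notify only hooks registered so far
            fired++;
        }
        currentConfig = new HashMap<>(persisted);
        return fired;
    }

    public static void main(String[] args) {
        ReconfigOrderingSketch config = new ReconfigOrderingSketch();
        Map<String, String> persisted = new HashMap<>();
        persisted.put("producer.id.expiration.ms", "1000");
        List<String> seenByComponent = new ArrayList<>();

        int firedAtInit = config.updateCurrentConfig(persisted);    // step 1: no hooks registered yet
        config.addReconfigurable(p -> seenByComponent.add(p.get("producer.id.expiration.ms"))); // step 2
        int firedAtReload = config.updateCurrentConfig(persisted);  // step 3: equal config, no-op
        System.out.println(firedAtInit + " " + firedAtReload + " " + seenByComponent);
    }
}
```

In this model the component registered in step 2 never observes the persisted override, because the only update that changed the config happened before any hook existed.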
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java
rreddy-22 commented on code in PR #13663: URL: https://github.com/apache/kafka/pull/13663#discussion_r1191678769 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java: ## @@ -0,0 +1,958 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.coordinator.group.generic; + +import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; +import org.apache.kafka.common.message.JoinGroupResponseData; +import org.apache.kafka.common.message.ListGroupsResponseData; +import org.apache.kafka.common.message.SyncGroupResponseData; +import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.SchemaException; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; + +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE; + +/** + * + * This class holds metadata for a generic group. + */ +public class GenericGroup { + +/** + * Empty generation. + */ +public static final int NO_GENERATION = -1; + +/** + * Protocol with empty name. + */ +public static final String NO_PROTOCOL_NAME = ""; + +/** + * No leader. + */ +public static final String NO_LEADER = ""; + +/** + * Delimiter used to join a randomly generated UUID + * with client id or group instance id. 
+ */ +private static final String MEMBER_ID_DELIMITER = "-"; + +/** + * The slf4j log context, used to create new loggers. + */ +private final LogContext logContext; + +/** + * The slf4j logger. + */ +private final Logger log; + +/** + * The group id. + */ +private final String groupId; + +/** + * The time. + */ +private final Time time; + +/** + * The current group state. + */ +private GenericGroupState state; + +/** + * The timestamp of when the group transitioned + * to its current state. + */ +private Optional currentStateTimestamp; + +/** + * The protocol type used for rebalance. + */ +private Optional protocolType = Optional.empty(); + +/** + * The protocol name used for rebalance. + */ +private Optional protocolName = Optional.empty(); + +/** + * The generation id. + */ +private int generationId = 0; + +/** + * The id of the group's leader. + */ +private Optional leaderId = Optional.empty(); + +/** + * The members of the group. + */ +private final Map members = new HashMap<>(); + +/** + * The static members of the group. + */ +private final Map staticMembers = new HashMap<>(); + +/** + * Members who have yet to (re)join the group Review Comment: oya yep mb -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeffkbkim commented on pull request #13704: KAFKA-14500; [3/N] add GroupMetadataKey/Value record helpers
jeffkbkim commented on PR #13704:
URL: https://github.com/apache/kafka/pull/13704#issuecomment-1544658481

I'm seeing
```
Class Data Abstraction Coupling is 31 (max allowed is 25) classes [ApiMessageAndVersion, ClientAssignor, ConsumerGroupCurrentMemberAssignmentKey, ConsumerGroupCurrentMemberAssignmentValue, ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions, ConsumerGroupMember.Builder, ConsumerGroupMemberMetadataKey, ConsumerGroupMemberMetadataValue, ConsumerGroupMemberMetadataValue.Assignor, ConsumerGroupMetadataKey, ConsumerGroupMetadataValue, ConsumerGroupPartitionMetadataKey, ConsumerGroupPartitionMetadataValue, ConsumerGroupPartitionMetadataValue.TopicMetadata, ConsumerGroupTargetAssignmentMemberKey, ConsumerGroupTargetAssignmentMemberValue, ConsumerGroupTargetAssignmentMemberValue.TopicPartition, ConsumerGroupTargetAssignmentMetadataKey, ConsumerGroupTargetAssignmentMetadataValue, GenericGroup, GenericGroupMember, GroupMetadataKey, GroupMetadataValue, GroupMetadataValue.MemberMetadata, LinkedHashMap, LogContext, MockTime, Protocol, Record, TopicMetadata, VersionedMetadata]. [ClassDataAbstractionCoupling]
```
The checkstyle documentation at https://checkstyle.sourceforge.io/apidocs/com/puppycrawl/tools/checkstyle/checks/metrics/ClassDataAbstractionCouplingCheck.html#:~:text=Generally%20speaking%2C%20any%20data%20type,the%20structure%20of%20the%20class says:
> Generally speaking, any data type with other data types as members or local variable that is an instantiation (object) of another class has data abstraction coupling (DAC). The higher the DAC, the more complex the structure of the class.

Can we suppress this error?
[GitHub] [kafka] philipnee commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given
philipnee commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1191665112

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
## @@ -1223,6 +1233,26 @@ public void maybeAutoCommitOffsetsAsync(long now) {
 }
 }
+private boolean waitForPendingAsyncCommits(Timer timer) {

Review Comment:
The `sendOffsetCommitRequest` method basically does the following:
1. If the offsets are empty, complete the future and return immediately.
2. If there's no coordinator (`checkAndGetCoordinator`), complete the future with an exception (like `completeExceptionally`) and return.
3. Otherwise, create the request data and send it: `client.send(coordinator, builder).compose(new OffsetCommitResponseHandler(offsets, generation));`

What I'm suggesting is not to change that method, but to change `commitOffsetsSync`. My suggestions are: currently, it returns immediately if the offsets are empty. But we don't want that, because we also want to check whether there are any `inFlightAsyncCommits`. If we can't return immediately, we will need to send these commits, and the requirement is to pass the `coordinatorUnknownAndUnreadySync` check. This check is already in place, so I think you just need to try to send the async commit after this check. So there's not very much code change there :)

The main concern David and I have is managing two atomic ints, because at some point one might forget to update one of them and cause some weird bug.
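A minimal sketch of the suggested shape, with a single counter and no early return for empty offsets, could look like this. It is a standalone model: `poll` stands in for driving the network client, the class and method names are hypothetical, and the actual sending of the synchronous commit is omitted.

```java
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical standalone model; not the ConsumerCoordinator implementation. */
final class InFlightCommitSketch {
    // Single counter for async commits that were sent but have not completed yet.
    private final AtomicInteger inFlightAsyncCommits = new AtomicInteger();

    void onAsyncCommitSent() {
        inFlightAsyncCommits.incrementAndGet();
    }

    void onAsyncCommitCompleted() {
        inFlightAsyncCommits.decrementAndGet();
    }

    int inFlight() {
        return inFlightAsyncCommits.get();
    }

    /** Runs poll until no async commit is in flight; returns false if the timeout expires first. */
    boolean awaitInFlightAsyncCommits(Runnable poll, long timeoutMs) {
        long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (inFlightAsyncCommits.get() > 0) {
            if (System.nanoTime() - deadlineNanos >= 0) {
                return false;
            }
            poll.run(); // stand-in for polling the network client, which completes responses
        }
        return true;
    }

    /** Sync commit that waits for earlier async commits even when offsets is empty. */
    boolean commitOffsetsSync(Map<String, Long> offsets, Runnable poll, long timeoutMs) {
        // No early return for empty offsets: earlier async commits may still be in flight.
        boolean asyncDone = awaitInFlightAsyncCommits(poll, timeoutMs);
        // A real implementation would also send `offsets` here when non-empty (omitted in this sketch).
        return asyncDone;
    }
}
```

Keeping one counter means there is only one place to increment and one to decrement, which is the maintenance concern raised at the end of the review comment.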
[GitHub] [kafka] jeffkbkim opened a new pull request, #13708: KAFKA-14500; [4/N] Add purgatory interface
jeffkbkim opened a new pull request, #13708: URL: https://github.com/apache/kafka/pull/13708

Adds the purgatory interface that will be used by the new group coordinator.

* PurgatoryScheduler: used by GroupMetadataManager (introduced in https://github.com/apache/kafka/pull/13639) to add and remove operations
* RebalancePurgatory: used by the new runtime framework to complete and expire operations in the purgatory.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Created] (KAFKA-14991) Improving Producer's record timestamp validation
Mehari Beyene created KAFKA-14991:
-------------------------------------

             Summary: Improving Producer's record timestamp validation
                 Key: KAFKA-14991
                 URL: https://issues.apache.org/jira/browse/KAFKA-14991
             Project: Kafka
          Issue Type: Improvement
          Components: core, log
            Reporter: Mehari Beyene

When time-based retention is configured, the timestamp provided by the producer is used by default to determine the retention period of the log. Customers have the option of changing the configuration to use the broker's timestamp by overriding the configuration for "log.message.timestamp.type", but by default, the producer's timestamp is used.

The producer's record timestamp can be in the past or future. Kafka determines the retention time of the log by comparing the broker's time with the record's time. Arguably, there can be use cases for a producer to send records with timestamps that are in the past (for example, for replaying old data), but it is inaccurate for records to have a timestamp that is far in the future compared to the broker's current time.

There is a configurable property called "message.timestamp.difference.max.ms" that customers can use to control the allowed time difference between the broker's current time and the record timestamp. However, the validation from the Kafka code side can be improved by rejecting records with future timestamps from being written in the first place.

Customers have run into this issue in the past where a producer is configured erroneously to set the record timestamp in nanoseconds instead of milliseconds, resulting in a record timestamp that is in the future, and the time-based retention policy did not kick in as expected.

The improvement I am proposing is to add basic validation in org.apache.kafka.storage.internals.log.LogValidator to reject record timestamps that are in the future compared to the broker current timestamp after accounting for a sensible tolerance for potential clock skew.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
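As an illustration of the proposed check, the sketch below rejects only timestamps that lie further in the future than a tolerance and leaves past timestamps alone. It is not `LogValidator` code: the class name, the method, and the one-hour tolerance are assumptions chosen for the example, since the issue only asks for "a sensible tolerance".

```java
// Hypothetical sketch of a future-timestamp check; not part of Kafka.
public final class TimestampValidationSketch {
    // Assumed tolerance for clock skew between producer and broker.
    public static final long MAX_FUTURE_SKEW_MS = 60 * 60 * 1000L;

    private TimestampValidationSketch() { }

    // Past timestamps stay valid (for example when replaying old data);
    // only timestamps too far ahead of the broker clock are rejected.
    public static boolean isAcceptable(long recordTimestampMs, long brokerNowMs) {
        return recordTimestampMs <= brokerNowMs + MAX_FUTURE_SKEW_MS;
    }
}
```

With a broker clock of `1_683_900_000_000L` milliseconds, a producer that mistakenly sends the same instant in nanoseconds produces a value about a million times larger, so `isAcceptable` returns false for it, while a record from the previous day is still accepted.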
[GitHub] [kafka] jolshan commented on pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append
jolshan commented on PR #13608: URL: https://github.com/apache/kafka/pull/13608#issuecomment-1544627294

I forgot that this also covers the race where the end txn marker is written before the produce completes. (We reset the verification status to empty on the marker, so the produce request will fail.) I will add a test for this and include it in the description.
[GitHub] [kafka] dajac commented on a diff in pull request #13704: KAFKA-14500; [3/N] add GroupMetadataKey/Value record helpers
dajac commented on code in PR #13704: URL: https://github.com/apache/kafka/pull/13704#discussion_r1191653253

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java: ##
@@ -399,8 +403,12 @@ public void testNewCurrentAssignmentTombstoneRecord() {
         ));
     }
 
-    @Test
-    public void testNewGroupMetadataRecord() {
+    @ParameterizedTest
+    @MethodSource("metadataToExpectedGroupMetadataValue")

Review Comment:

Ah. I see. I missed the version part. We can keep it as it is but could we move the method to just before this test? It is easier to read them together.
[GitHub] [kafka] splett2 commented on a diff in pull request #13707: KAFKA-14990: Dynamic producer ID expiration should be applied on a broker restart
splett2 commented on code in PR #13707: URL: https://github.com/apache/kafka/pull/13707#discussion_r1191611324

## core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala: ##
@@ -183,7 +183,11 @@ class ProducerIdExpirationTest extends KafkaServerTestHarness {
     )
 
     // Update the expiration time to a low value again.
-    admin.incrementalAlterConfigs(producerIdExpirationConfig("100"))
+    admin.incrementalAlterConfigs(producerIdExpirationConfig("100")).all().get()

Review Comment:

Yes
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13704: KAFKA-14500; [3/N] add GroupMetadataKey/Value record helpers
jeffkbkim commented on code in PR #13704: URL: https://github.com/apache/kafka/pull/13704#discussion_r1191604696

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java: ##
@@ -399,8 +403,12 @@ public void testNewCurrentAssignmentTombstoneRecord() {
         ));
     }
 
-    @Test
-    public void testNewGroupMetadataRecord() {
+    @ParameterizedTest
+    @MethodSource("metadataToExpectedGroupMetadataValue")

Review Comment:

The reason I used `@MethodSource` is to include the corresponding expected version. If we use `@EnumSource`, we can add if/else statements, but I thought the existing approach would be cleaner. What do you think?
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java
jeffkbkim commented on code in PR #13663: URL: https://github.com/apache/kafka/pull/13663#discussion_r1191602652 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java: ## @@ -0,0 +1,1012 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.generic; + +import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; +import org.apache.kafka.common.message.JoinGroupResponseData; +import org.apache.kafka.common.message.ListGroupsResponseData; +import org.apache.kafka.common.message.SyncGroupResponseData; +import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.SchemaException; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.coordinator.group.Group; +import org.slf4j.Logger; + +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE; + +/** + * This class holds metadata for a generic group where the + * member assignment is driven solely from the client side. + * + * The APIs members use to make changes to the group membership + * consist of JoinGroup, SyncGroup, and LeaveGroup. + */ +public class GenericGroup implements Group { + +/** + * Empty generation. + */ +public static final int NO_GENERATION = -1; + +/** + * Protocol with empty name. + */ +public static final String NO_PROTOCOL_NAME = ""; + +/** + * No leader. 
+ */ +public static final String NO_LEADER = ""; + +/** + * Delimiter used to join a randomly generated UUID + * with a client id or a group instance id. + */ +private static final String MEMBER_ID_DELIMITER = "-"; + +/** + * The slf4j logger. + */ +private final Logger log; + +/** + * The group id. + */ +private final String groupId; + +/** + * The time. + */ +private final Time time; + +/** + * The current group state. + */ +private GenericGroupState state; + +/** + * The timestamp of when the group transitioned + * to its current state. + */ +private Optional currentStateTimestamp; + +/** + * The protocol type used for rebalance. + */ +private Optional protocolType = Optional.empty(); + +/** + * The protocol name used for rebalance. + */ +private Optional protocolName = Optional.empty(); + +/** + * The generation id. + */ +private int generationId = 0; + +/** + * The id of the group's leader. + */ +private Optional leaderId = Optional.empty(); + +/** + * The members of the group. + */ +private final Map members = new HashMap<>(); + +/** + * The static members of the group. + */ +private final Map staticMembers = new HashMap<>(); + +/** + * Members who have yet to (re)join the group + * during the join group phase. + */ +private final Set pendingJoinMembers = new HashSet<>(); + +/** + * The number of members awaiting a join response. + */ +private int numMembersAwaitingJoinResponse = 0; + +/** + * Map of protocol names to the number of members that support them. + */ +private final Map supportedProtocols = new HashMap<>(); + +/** + * Members who have yet to sync with the group + * during the sync group
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java
jeffkbkim commented on code in PR #13663: URL: https://github.com/apache/kafka/pull/13663#discussion_r1191602126 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java: ## @@ -0,0 +1,998 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.generic; + +import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; +import org.apache.kafka.common.message.JoinGroupResponseData; +import org.apache.kafka.common.message.ListGroupsResponseData; +import org.apache.kafka.common.message.SyncGroupResponseData; +import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.SchemaException; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.coordinator.group.Group; +import org.slf4j.Logger; + +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE; + +/** + * + * This class holds metadata for a generic group where the + * member assignment is driven solely from the client side. + * + * The APIs members use to make changes to the group membership + * consist of JoinGroup, SyncGroup, and LeaveGroup. + */ +public class GenericGroup implements Group { + +/** + * Empty generation. + */ +public static final int NO_GENERATION = -1; + +/** + * Protocol with empty name. + */ +public static final String NO_PROTOCOL_NAME = ""; + +/** + * No leader. 
+ */ +public static final String NO_LEADER = ""; + +/** + * Delimiter used to join a randomly generated UUID + * with a client id or a group instance id. + */ +private static final String MEMBER_ID_DELIMITER = "-"; + +/** + * The slf4j log context, used to create new loggers. + */ +private final LogContext logContext; + +/** + * The slf4j logger. + */ +private final Logger log; + +/** + * The group id. + */ +private final String groupId; + +/** + * The time. + */ +private final Time time; + +/** + * The current group state. + */ +private GenericGroupState state; + +/** + * The timestamp of when the group transitioned + * to its current state. + */ +private Optional currentStateTimestamp; Review Comment: This is a bit awkward as the existing GroupMetadata updates this field when we read the group metadata record (https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L1225). So we should expect the new group metadata manager introduced in https://github.com/apache/kafka/pull/13639 to perform this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] Hangleton commented on a diff in pull request #13584: MINOR: Add log segment unit tests, If the maximum offset beyond index, appen…
Hangleton commented on code in PR #13584: URL: https://github.com/apache/kafka/pull/13584#discussion_r1191598672

## core/src/test/scala/unit/kafka/log/LogSegmentTest.scala: ##
@@ -65,6 +68,32 @@ class LogSegmentTest {
     Utils.delete(logDir)
   }
 
+  /**
+   * LogSegmentOffsetOverflowException should be thrown while appending the logs if:
+   * 1. largestOffset < 0
+   * 2. largestOffset - baseOffset < 0
+   * 3. largestOffset - baseOffset > Integer.MAX_VALUE
+   */
+  @ParameterizedTest
+  @CsvSource(Array(
+    "0, -2147483648",
+    "0, 2147483648",
+    "1, 0",
+    "100, 10",
+    "2147483648, 0",
+    "-2147483648, 0",
+    "2147483648,4294967296"
+  ))
+  def testAppendForLogSegmentOffsetOverflowException(baseOffset: Long, largestOffset: Long): Unit = {

Review Comment:

I see, thanks for checking. Could we maybe confirm whether the overflow exception comes from this line in `TimeIndex#maybeAppend`:

```
mmap.putInt(relativeOffset(offset))
```

And if that is the case, why is the relative offset < 0 or > max_int?
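The three conditions listed in the test's doc comment can be written out as a small predicate. This is only a sketch of those conditions as stated in the comment; the class and method names are hypothetical, and it makes no claim about which line in Kafka actually throws the exception, which is the open question in the review.

```java
// Hypothetical helper mirroring the three conditions from the doc comment.
public final class RelativeOffsetSketch {
    private RelativeOffsetSketch() { }

    // True when largestOffset cannot be stored as a non-negative 32-bit
    // offset relative to baseOffset.
    public static boolean overflows(long baseOffset, long largestOffset) {
        if (largestOffset < 0) {
            return true; // condition 1
        }
        long relative = largestOffset - baseOffset;
        // condition 2 (negative) and condition 3 (does not fit in an int)
        return relative < 0 || relative > Integer.MAX_VALUE;
    }
}
```

Each pair in the `@CsvSource` list above satisfies at least one condition. For instance, `(100, 10)` gives a relative offset of -90, and `(2147483648, 4294967296)` gives 2147483648, which is one more than `Integer.MAX_VALUE`.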
[GitHub] [kafka] splett2 commented on a diff in pull request #13707: KAFKA-14990: Dynamic producer ID expiration should be applied on a broker restart
splett2 commented on code in PR #13707: URL: https://github.com/apache/kafka/pull/13707#discussion_r1191586683

## core/src/main/scala/kafka/server/KafkaConfig.scala: ##
@@ -1956,10 +1956,10 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
   val transactionTopicSegmentBytes = getInt(KafkaConfig.TransactionsTopicSegmentBytesProp)
   val transactionAbortTimedOutTransactionCleanupIntervalMs = getInt(KafkaConfig.TransactionsAbortTimedOutTransactionCleanupIntervalMsProp)
   val transactionRemoveExpiredTransactionalIdCleanupIntervalMs = getInt(KafkaConfig.TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp)
-
+
   val transactionPartitionVerificationEnable = getBoolean(KafkaConfig.TransactionPartitionVerificationEnableProp)
-  val producerIdExpirationMs = getInt(KafkaConfig.ProducerIdExpirationMsProp)
+  def producerIdExpirationMs = getInt(KafkaConfig.ProducerIdExpirationMsProp)

Review Comment:

This is a good question. It is probably worth making sure we have similar coverage for other dynamic broker configs. Looking at some other dynamic configs, e.g. `LogCleanerThreadsProp`, I would expect them to run into similar issues.
[GitHub] [kafka] jolshan commented on a diff in pull request #13707: KAFKA-14990: Dynamic producer ID expiration should be applied on a broker restart
jolshan commented on code in PR #13707: URL: https://github.com/apache/kafka/pull/13707#discussion_r1191577885

## core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala: ##
@@ -183,7 +183,11 @@ class ProducerIdExpirationTest extends KafkaServerTestHarness {
     )
 
     // Update the expiration time to a low value again.
-    admin.incrementalAlterConfigs(producerIdExpirationConfig("100"))
+    admin.incrementalAlterConfigs(producerIdExpirationConfig("100")).all().get()

Review Comment:

Does the `all()` + `get()` ensure the call actually completes?
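The difference between firing an asynchronous call and blocking on its future can be shown with a self-contained analogy. The sketch uses `CompletableFuture` from the JDK rather than the Kafka Admin client, and `AwaitSketch`, `alterAsync`, and the 50 ms delay are invented for illustration; in the test above, `all()` returns a future and `get()` blocks on it in the same spirit.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a client that applies a config change asynchronously.
public class AwaitSketch {
    private final AtomicInteger expirationMs = new AtomicInteger(86_400_000);

    // Returns immediately; the value changes only after a simulated round trip.
    public CompletableFuture<Void> alterAsync(int newValue) {
        return CompletableFuture.runAsync(() -> {
            pause(50);
            expirationMs.set(newValue);
        });
    }

    public int expirationMs() { return expirationMs.get(); }

    private static void pause(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Calling `alterAsync(100)` and reading `expirationMs()` right away may still observe the old value, whereas `alterAsync(100).join()` returns only after the update has been applied. `join()` is used here instead of `get()` only to avoid checked exceptions in the sketch.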
[GitHub] [kafka] jolshan commented on a diff in pull request #13707: KAFKA-14990: Dynamic producer ID expiration should be applied on a broker restart
jolshan commented on code in PR #13707: URL: https://github.com/apache/kafka/pull/13707#discussion_r1191576738

## core/src/main/scala/kafka/server/KafkaConfig.scala: ##
@@ -1956,10 +1956,10 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
   val transactionTopicSegmentBytes = getInt(KafkaConfig.TransactionsTopicSegmentBytesProp)
   val transactionAbortTimedOutTransactionCleanupIntervalMs = getInt(KafkaConfig.TransactionsAbortTimedOutTransactionCleanupIntervalMsProp)
   val transactionRemoveExpiredTransactionalIdCleanupIntervalMs = getInt(KafkaConfig.TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp)
-
+
   val transactionPartitionVerificationEnable = getBoolean(KafkaConfig.TransactionPartitionVerificationEnableProp)
-  val producerIdExpirationMs = getInt(KafkaConfig.ProducerIdExpirationMsProp)
+  def producerIdExpirationMs = getInt(KafkaConfig.ProducerIdExpirationMsProp)

Review Comment:

Are the other dynamic configs `def`s here? Just trying to figure out how this was missed.
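The `val` to `def` change in the diff above swaps a value computed once at construction for one that is re-read on every access. A rough Java analogy, with a hypothetical `ConfigSnapshotSketch` class and an illustrative key name (neither is from `KafkaConfig`), looks like this:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical config holder; it only mimics the val-versus-def distinction.
public class ConfigSnapshotSketch {
    private static final String KEY = "expiration.ms"; // illustrative key
    private final Map<String, Integer> props = new HashMap<>();

    // Like a Scala `val`: read once, when the object is constructed.
    private final int expirationMsVal;

    public ConfigSnapshotSketch(int initial) {
        props.put(KEY, initial);
        expirationMsVal = props.get(KEY);
    }

    // Simulates a dynamic override arriving after construction.
    public void applyDynamicOverride(int newValue) { props.put(KEY, newValue); }

    public int expirationMsVal() { return expirationMsVal; }

    // Like a Scala `def`: looked up again on every call.
    public int expirationMsDef() { return props.get(KEY); }
}
```

After `applyDynamicOverride(100)`, `expirationMsVal()` still returns the construction-time value while `expirationMsDef()` returns 100. The sketch does not model how the broker loads dynamic configs on restart; it only shows why a snapshot field can go stale.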
[GitHub] [kafka] splett2 opened a new pull request, #13707: KAFKA-14990: Dynamic producer ID expiration should be applied on a broker restart
splett2 opened a new pull request, #13707: URL: https://github.com/apache/kafka/pull/13707

Dynamic overrides for the producer ID expiration config are not picked up on broker restart in Zookeeper mode. Based on the integration test, this does not apply to KRaft mode. Adds a broker restart that fails without the corresponding KafkaConfig change.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Created] (KAFKA-14990) Dynamic producer ID expiration is not applied on broker restart
David Mao created KAFKA-14990:
---------------------------------

             Summary: Dynamic producer ID expiration is not applied on broker restart
                 Key: KAFKA-14990
                 URL: https://issues.apache.org/jira/browse/KAFKA-14990
             Project: Kafka
          Issue Type: Bug
            Reporter: David Mao
            Assignee: David Mao
[GitHub] [kafka] hoangphuocbk commented on pull request #12371: KAFKA-14035: Fix NPE caused by missing null check in SnapshottableHashTable::mergeFrom()
hoangphuocbk commented on PR #12371: URL: https://github.com/apache/kafka/pull/12371#issuecomment-1544342962

@iblislin, I had the same problem. Did your solution work?
[GitHub] [kafka] dajac commented on a diff in pull request #13704: KAFKA-14500; [3/N] add GroupMetadataKey/Value record helpers
dajac commented on code in PR #13704: URL: https://github.com/apache/kafka/pull/13704#discussion_r1191453623

## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java: ##
@@ -365,6 +365,20 @@ public short listOffsetRequestVersion() {
         }
     }
 
+    public short groupMetadataValueVersion() {

Review Comment:

nit: Should we add a unit test in MetadataVersionTest as well?
[GitHub] [kafka] dajac commented on a diff in pull request #13704: KAFKA-14500; [3/N] add GroupMetadataKey/Value record helpers
dajac commented on code in PR #13704: URL: https://github.com/apache/kafka/pull/13704#discussion_r1191452753

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java: ##
@@ -399,8 +403,12 @@ public void testNewCurrentAssignmentTombstoneRecord() {
         ));
     }
 
-    @Test
-    public void testNewGroupMetadataRecord() {
+    @ParameterizedTest
+    @MethodSource("metadataToExpectedGroupMetadataValue")

Review Comment:

You could use:

```
@ParameterizedTest
@EnumSource(value = MetadataVersion.class, names = {"IBP_3_0_IV1", "IBP_3_3_IV2"})
```
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13704: KAFKA-14500; [3/N] add GroupMetadataKey/Value record helpers
jeffkbkim commented on code in PR #13704: URL: https://github.com/apache/kafka/pull/13704#discussion_r1191438825 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java: ## @@ -383,4 +398,84 @@ public void testNewCurrentAssignmentTombstoneRecord() { "member-id" )); } + +@Test +public void testNewGroupMetadataRecord() { +Time time = new MockTime(); + +List expectedMembers = new ArrayList<>(); +expectedMembers.add( +new GroupMetadataValue.MemberMetadata() +.setMemberId("member-1") +.setClientId("client-1") +.setClientHost("host-1") +.setRebalanceTimeout(1000) +.setSessionTimeout(1500) +.setGroupInstanceId("group-instance-1") +.setSubscription(new byte[]{0, 1}) +.setAssignment(new byte[]{1, 2}) +); + +expectedMembers.add( +new GroupMetadataValue.MemberMetadata() +.setMemberId("member-2") +.setClientId("client-2") +.setClientHost("host-2") +.setRebalanceTimeout(1000) +.setSessionTimeout(1500) +.setGroupInstanceId("group-instance-2") +.setSubscription(new byte[]{1, 2}) +.setAssignment(new byte[]{2, 3}) +); + +Record expectedRecord = new Record( +new ApiMessageAndVersion( +new GroupMetadataKey() +.setGroup("group-id"), +(short) 2), +new ApiMessageAndVersion( +new GroupMetadataValue() +.setProtocol("range") +.setProtocolType("consumer") +.setLeader("member-1") +.setGeneration(1) +.setCurrentStateTimestamp(time.milliseconds()) +.setMembers(expectedMembers), +(short) 3)); + +GenericGroup group = new GenericGroup( +new LogContext(), +"group-id", +GenericGroupState.PREPARING_REBALANCE, +time +); + +Map memberAssignments = new HashMap<>(); +expectedMembers.forEach(member -> { +memberAssignments.put(member.memberId(), member.assignment()); +group.add(new GenericGroupMember( +member.memberId(), +Optional.of(member.groupInstanceId()), +member.clientId(), +member.clientHost(), +member.rebalanceTimeout(), +member.sessionTimeout(), +"consumer", +Collections.singletonList(new Protocol( +"range", +member.subscription() +)), +member.assignment() 
+)); +}); + +group.initNextGeneration(); +Record groupMetadataRecord = RecordHelpers.newGroupMetadataRecord( +group, +memberAssignments, +MetadataVersion.IBP_3_5_IV2 +); + +assertEquals(expectedRecord, groupMetadataRecord); +} } Review Comment: i updated `testNewGroupMetadataRecord` for 3). can you take a look? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] fvaleri commented on pull request #13517: KAFKA-14752: Kafka examples improvements - demo changes
fvaleri commented on PR #13517: URL: https://github.com/apache/kafka/pull/13517#issuecomment-1544315826

@showuon thanks for taking the time to review all these PRs. We should be good now.
[GitHub] [kafka] fvaleri commented on a diff in pull request #13517: KAFKA-14752: Kafka examples improvements - demo changes
fvaleri commented on code in PR #13517:
URL: https://github.com/apache/kafka/pull/13517#discussion_r1191435518

## examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java: ##
@@ -16,29 +16,59 @@
  */
 package kafka.examples;
 
-import org.apache.kafka.common.errors.TimeoutException;
-
 import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+/**
+ * This example can be decomposed into the following stages:
+ *
+ * 1. Clean any topics left from previous runs.
+ * 2. Create a producer thread to send a set of records to topic1.
+ * 3. Create a consumer thread to fetch all previously sent records from topic1.
+ *
+ * If you are using IntelliJ IDEA, the above arguments should be put in `Modify Run Configuration - Program Arguments`.
+ * You can also set an output log file in `Modify Run Configuration - Modify options - Save console output to file` to
+ * record all the log output together.
+ */
 public class KafkaConsumerProducerDemo {
-    public static void main(String[] args) throws InterruptedException {
-        boolean isAsync = args.length == 0 || !args[0].trim().equalsIgnoreCase("sync");
-        CountDownLatch latch = new CountDownLatch(2);
-        Producer producerThread = new Producer(
-            "producer", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT, KafkaProperties.TOPIC, isAsync, null, false, 1, -1, latch);
-        producerThread.start();
-
-        Consumer consumerThread = new Consumer(
-            "consumer", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT, KafkaProperties.TOPIC, "DemoConsumer", Optional.empty(), false, 1, latch);
-        consumerThread.start();
-
-        if (!latch.await(5, TimeUnit.MINUTES)) {
-            throw new TimeoutException("Timeout after 5 minutes waiting for demo producer and consumer to finish");
-        }
+    public static final String BOOTSTRAP_SERVERS = "localhost:9092";
+    public static final String TOPIC_NAME = "my-topic";
+    public static final String GROUP_NAME = "my-group";
+
+    public static void main(String[] args) {
+        try {
+            if (args.length == 0) {
+                Utils.printHelp("This example takes 2 parameters (i.e. 1 sync):%n" +
+                    "- records: total number of records to send (required)%n" +
+                    "- mode: pass 'sync' to send records synchronously (optional)");
+                return;
+            }
 
-        consumerThread.shutdown();
-        System.out.println("All finished!");
+            int numRecords = Integer.parseInt(args[0]);
+            boolean isAsync = args.length == 1 || !args[1].trim().equalsIgnoreCase("sync");
+
+            // stage 1: clean any topics left from previous runs
+            Utils.recreateTopics(BOOTSTRAP_SERVERS, -1, TOPIC_NAME);
+            CountDownLatch latch = new CountDownLatch(2);
+
+            // stage 2: produce records to topic1
+            Producer producerThread = new Producer(
+                "producer", BOOTSTRAP_SERVERS, TOPIC_NAME, isAsync, null, false, numRecords, -1, latch);
+            producerThread.start();
+
+            // stage 3: consume records from topic1
+            Consumer consumerThread = new Consumer(
+                "consumer", BOOTSTRAP_SERVERS, TOPIC_NAME, GROUP_NAME, Optional.empty(), false, numRecords, latch);
+            consumerThread.start();
+
+            if (!latch.await(5, TimeUnit.MINUTES)) {
+                Utils.printErr("Timeout after 5 minutes waiting for termination");
+                //producerThread.shutdown();
+                consumerThread.shutdown();

Review Comment:
   Rebase issue, fixed.

## examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java: ##
@@ -16,182 +16,90 @@
  */
 package kafka.examples;
 
-import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.errors.TopicExistsException;
-import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
-
-import java.util.Arrays;
-import java.util.List;
 import java.util.Optional;
-import java.util.Properties;
-import java.util.Set;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 /**
- * This exactly once demo driver takes 3 arguments:
- *   - partition: number of partitions for input/output topic
- *   - instances: number of instances
- *   - records: number of records
- * An example argument list would be `6 3 5`.
- *
- * If you are using IntelliJ IDEA, the above arguments should be put in the configuration's `Program Arguments`.
- * Also recommended to set an output log file by `Edit Configuration -> Logs -> Save console
- * output to file` to record
[GitHub] [kafka] clolov commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java
clolov commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1191430711

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java: ##
@@ -0,0 +1,998 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.Group;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE;
+
+/**
+ *
+ * This class holds metadata for a generic group where the
+ * member assignment is driven solely from the client side.
+ *
+ * The APIs members use to make changes to the group membership
+ * consist of JoinGroup, SyncGroup, and LeaveGroup.
+ */
+public class GenericGroup implements Group {
+
+    /**
+     * Empty generation.
+     */
+    public static final int NO_GENERATION = -1;
+
+    /**
+     * Protocol with empty name.
+     */
+    public static final String NO_PROTOCOL_NAME = "";
+
+    /**
+     * No leader.
+     */
+    public static final String NO_LEADER = "";
+
+    /**
+     * Delimiter used to join a randomly generated UUID
+     * with a client id or a group instance id.
+     */
+    private static final String MEMBER_ID_DELIMITER = "-";
+
+    /**
+     * The slf4j log context, used to create new loggers.
+     */
+    private final LogContext logContext;
+
+    /**
+     * The slf4j logger.
+     */
+    private final Logger log;
+
+    /**
+     * The group id.
+     */
+    private final String groupId;
+
+    /**
+     * The time.
+     */
+    private final Time time;
+
+    /**
+     * The current group state.
+     */
+    private GenericGroupState state;
+
+    /**
+     * The timestamp of when the group transitioned
+     * to its current state.
+     */
+    private Optional currentStateTimestamp;

Review Comment:
   Does this need to be an Optional? As far as I can see, we set it immediately in the constructor and never set it to an empty value. Am I missing something obvious?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
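To make the question above concrete, here is a minimal sketch of the two options, assuming the field holds a millisecond timestamp that the constructor always sets. The class `StateTimestampSketch`, its nested classes, and all method names are hypothetical and are not taken from the PR; whether an empty value can occur elsewhere (for example when loading persisted state) is not visible in this excerpt.

```java
import java.util.Optional;

// Hypothetical sketch (not the PR's GenericGroup): two ways to hold a state
// timestamp that the constructor always sets.
class StateTimestampSketch {

    // Variant A: Optional field. Worth it only if "no timestamp" is a real state.
    static final class WithOptional {
        private Optional<Long> currentStateTimestamp;

        WithOptional(long nowMs) {
            this.currentStateTimestamp = Optional.of(nowMs);
        }

        void transitionAt(long nowMs) {
            this.currentStateTimestamp = Optional.of(nowMs);
        }

        // Callers must unwrap, even though the value is never empty here.
        long currentStateTimestampOrDefault() {
            return currentStateTimestamp.orElse(-1L);
        }
    }

    // Variant B: primitive field. Simpler when the value is always present.
    static final class WithPrimitive {
        private long currentStateTimestampMs;

        WithPrimitive(long nowMs) {
            this.currentStateTimestampMs = nowMs;
        }

        void transitionAt(long nowMs) {
            this.currentStateTimestampMs = nowMs;
        }

        long currentStateTimestampMs() {
            return currentStateTimestampMs;
        }
    }

    public static void main(String[] args) {
        WithOptional a = new WithOptional(100L);
        WithPrimitive b = new WithPrimitive(100L);
        a.transitionAt(250L);
        b.transitionAt(250L);
        // Both variants report the same timestamp after the same transition.
        System.out.println(a.currentStateTimestampOrDefault() == b.currentStateTimestampMs());
    }
}
```

If an empty timestamp is never a legal state, Variant B removes the unwrapping at every call site; if it is, Variant A keeps that case explicit.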
[GitHub] [kafka] satishd merged pull request #13060: KAFKA-14559: Fix JMX tool to handle the object names with wild cards and optional attributes
satishd merged PR #13060: URL: https://github.com/apache/kafka/pull/13060
[GitHub] [kafka] satishd commented on pull request #13060: KAFKA-14559: Fix JMX tool to handle the object names with wild cards and optional attributes
satishd commented on PR #13060: URL: https://github.com/apache/kafka/pull/13060#issuecomment-1544287256 There were a couple of test failures that are unrelated to this PR, will merge to trunk.
[GitHub] [kafka] philipnee commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given
philipnee commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1191400253

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ##
@@ -186,6 +187,7 @@ public ConsumerCoordinator(GroupRebalanceConfig rebalanceConfig,
         this.completedOffsetCommits = new ConcurrentLinkedQueue<>();
         this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix);
         this.interceptors = interceptors;
+        this.inFlightAsyncCommits = new AtomicInteger();
         this.pendingAsyncCommits = new AtomicInteger();

Review Comment:
   Thanks @dajac - Do you mean the code isn't changing the counter in the commit callback? I don't know the exact reason, but my guess is that the commit isn't sent until a coordinator is ready, and it is therefore called "pending". If the coordinator is connected, then there's no point in incrementing and decrementing the counter, because there will be a response anyway.

   I'm supportive of the suggestions to consolidate. For me, it is mentally difficult to manage multiple different states correctly.
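The counter discussed above can be illustrated with a minimal sketch: increment when an async commit is sent, decrement in the completion callback regardless of outcome. This assumes the response is modeled as a `CompletableFuture`; the class `InFlightCommitSketch` and its methods are hypothetical and do not reproduce `ConsumerCoordinator`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch (not ConsumerCoordinator): tracks async commits that
// have been sent but not yet answered.
class InFlightCommitSketch {
    private final AtomicInteger inFlightAsyncCommits = new AtomicInteger();

    // Simulates sending a commit. The returned future stands in for the
    // broker response and is completed by the caller in this sketch.
    CompletableFuture<Void> sendAsyncCommit() {
        inFlightAsyncCommits.incrementAndGet();
        CompletableFuture<Void> response = new CompletableFuture<>();
        // Decrement on success and on failure, so the counter cannot leak.
        response.whenComplete((ok, err) -> inFlightAsyncCommits.decrementAndGet());
        return response;
    }

    int inFlight() {
        return inFlightAsyncCommits.get();
    }

    public static void main(String[] args) {
        InFlightCommitSketch sketch = new InFlightCommitSketch();
        CompletableFuture<Void> first = sketch.sendAsyncCommit();
        CompletableFuture<Void> second = sketch.sendAsyncCommit();
        first.complete(null);
        second.completeExceptionally(new RuntimeException("simulated failure"));
        System.out.println("in flight after both responses: " + sketch.inFlight());
    }
}
```

A single counter maintained this way covers both the "waiting for a coordinator" and "waiting for a response" cases only if the increment happens before the coordinator lookup; where the increment belongs is the design question raised in the thread.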
[GitHub] [kafka] dajac commented on a diff in pull request #13704: KAFKA-14500; [3/N] add GroupMetadataKey/Value record helpers
dajac commented on code in PR #13704:
URL: https://github.com/apache/kafka/pull/13704#discussion_r1191395592

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java: ##
@@ -357,6 +366,94 @@ public static Record newCurrentAssignmentTombstoneRecord(
         );
     }
 
+    /**
+     * Creates a GroupMetadata record.
+     *
+     * @param group              The generic group.
+     * @param memberAssignments  The assignment by member id.
+     * @param metadataVersion    The metadata version.
+     * @return The record.
+     */
+    public static Record newGroupMetadataRecord(
+        GenericGroup group,
+        Map memberAssignments,

Review Comment:
   Hmm... don't we store it in MemberMetadata in the current implementation? We set it in `setAndPropagateAssignment`. I think that we need it because we need the ability to provide it to the member at any time.
[GitHub] [kafka] jlprat merged pull request #13706: MINOR: clean up unused methods in core utils
jlprat merged PR #13706: URL: https://github.com/apache/kafka/pull/13706
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13704: KAFKA-14500; [3/N] add GroupMetadataKey/Value record helpers
jeffkbkim commented on code in PR #13704:
URL: https://github.com/apache/kafka/pull/13704#discussion_r1191390607

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java: ##
@@ -357,6 +366,94 @@ public static Record newCurrentAssignmentTombstoneRecord(
         );
     }
 
+    /**
+     * Creates a GroupMetadata record.
+     *
+     * @param group              The generic group.
+     * @param memberAssignments  The assignment by member id.
+     * @param metadataVersion    The metadata version.
+     * @return The record.
+     */
+    public static Record newGroupMetadataRecord(
+        GenericGroup group,
+        Map memberAssignments,

Review Comment:
   The assignment comes directly from the sync group request; I don't think we need to store the assignment inside the group.
[GitHub] [kafka] philipnee commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given
philipnee commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1191387973

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ##
@@ -1223,6 +1233,26 @@ public void maybeAutoCommitOffsetsAsync(long now) {
         }
     }
 
+    private boolean waitForPendingAsyncCommits(Timer timer) {

Review Comment:
   Could you elaborate on `logic will spread around and understandability will suffer a lot`? I was suggesting to first check if there are any in-flight commits, then wait for the coordinator to be connected. After the connection is established, you can try to send the async commits.
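One possible reading of the ordering suggested above, as a hedged sketch: return immediately when nothing is in flight, otherwise make sure the coordinator is reachable, then poll until the counter drains or the timeout expires. The class `AwaitCommitsSketch`, the method `awaitInFlightCommits`, and the `ensureCoordinatorReady` and `pollOnce` callbacks are all hypothetical stand-ins, not the consumer's real API.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical sketch (not ConsumerCoordinator): waits for in-flight async
// commits, checking the counter before touching the coordinator.
class AwaitCommitsSketch {

    // Returns true if no async commit is in flight before the timeout expires.
    static boolean awaitInFlightCommits(AtomicInteger inFlight,
                                        BooleanSupplier ensureCoordinatorReady,
                                        Runnable pollOnce,
                                        long timeoutMs) {
        // Step 1: nothing in flight, so there is nothing to wait for.
        if (inFlight.get() == 0) {
            return true;
        }
        long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        // Step 2: responses can only arrive once the coordinator is reachable.
        if (!ensureCoordinatorReady.getAsBoolean()) {
            return false;
        }
        // Step 3: poll for responses until the counter drains or time runs out.
        while (inFlight.get() > 0) {
            if (System.nanoTime() - deadlineNanos >= 0) {
                return false;
            }
            pollOnce.run();
        }
        return true;
    }

    public static void main(String[] args) {
        AtomicInteger inFlight = new AtomicInteger(2);
        // Each simulated poll delivers one response.
        boolean drained = awaitInFlightCommits(
            inFlight, () -> true, () -> inFlight.decrementAndGet(), 1000L);
        System.out.println("drained: " + drained);
    }
}
```

Checking the counter first means the common case, with no outstanding async commits, never blocks on coordinator discovery.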
[GitHub] [kafka] jlprat commented on pull request #13706: MINOR: clean up unused methods in core utils
jlprat commented on PR #13706: URL: https://github.com/apache/kafka/pull/13706#issuecomment-1544246745 All test failures seem to be unrelated to the change.
[GitHub] [kafka] dajac commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java
dajac commented on code in PR #13663: URL: https://github.com/apache/kafka/pull/13663#discussion_r1191381128 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java: ## @@ -0,0 +1,1012 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.generic; + +import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; +import org.apache.kafka.common.message.JoinGroupResponseData; +import org.apache.kafka.common.message.ListGroupsResponseData; +import org.apache.kafka.common.message.SyncGroupResponseData; +import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.SchemaException; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.coordinator.group.Group; +import org.slf4j.Logger; + +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE; + +/** + * This class holds metadata for a generic group where the + * member assignment is driven solely from the client side. + * + * The APIs members use to make changes to the group membership + * consist of JoinGroup, SyncGroup, and LeaveGroup. + */ +public class GenericGroup implements Group { + +/** + * Empty generation. + */ +public static final int NO_GENERATION = -1; + +/** + * Protocol with empty name. + */ +public static final String NO_PROTOCOL_NAME = ""; + +/** + * No leader. 
+ */ +public static final String NO_LEADER = ""; + +/** + * Delimiter used to join a randomly generated UUID + * with a client id or a group instance id. + */ +private static final String MEMBER_ID_DELIMITER = "-"; + +/** + * The slf4j logger. + */ +private final Logger log; + +/** + * The group id. + */ +private final String groupId; + +/** + * The time. + */ +private final Time time; + +/** + * The current group state. + */ +private GenericGroupState state; + +/** + * The timestamp of when the group transitioned + * to its current state. + */ +private Optional currentStateTimestamp; + +/** + * The protocol type used for rebalance. + */ +private Optional protocolType = Optional.empty(); + +/** + * The protocol name used for rebalance. + */ +private Optional protocolName = Optional.empty(); + +/** + * The generation id. + */ +private int generationId = 0; + +/** + * The id of the group's leader. + */ +private Optional leaderId = Optional.empty(); + +/** + * The members of the group. + */ +private final Map members = new HashMap<>(); + +/** + * The static members of the group. + */ +private final Map staticMembers = new HashMap<>(); + +/** + * Members who have yet to (re)join the group + * during the join group phase. + */ +private final Set pendingJoinMembers = new HashSet<>(); + +/** + * The number of members awaiting a join response. + */ +private int numMembersAwaitingJoinResponse = 0; + +/** + * Map of protocol names to the number of members that support them. + */ +private final Map supportedProtocols = new HashMap<>(); + +/** + * Members who have yet to sync with the group + * during the sync group phase.
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java
jeffkbkim commented on code in PR #13663: URL: https://github.com/apache/kafka/pull/13663#discussion_r1191374954 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java: ## @@ -0,0 +1,935 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.generic; + +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; +import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; +import org.apache.kafka.common.message.JoinGroupResponseData; +import org.apache.kafka.common.message.SyncGroupResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.STABLE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class GenericGroupTest { +private final String protocolType = "consumer"; +private final String groupInstanceId = "groupInstanceId"; +private final String memberId = "memberId"; +private final String clientId = "clientId"; +private final String clientHost = "clientHost"; +private final int rebalanceTimeoutMs = 6; +private final int sessionTimeoutMs = 1; + + +private GenericGroup group = null; + +@BeforeEach +public void 
initialize() { +group = new GenericGroup(new LogContext(), "groupId", EMPTY, Time.SYSTEM); +} + +@Test +public void testCanRebalanceWhenStable() { +assertTrue(group.canRebalance()); +} + +@Test +public void testCanRebalanceWhenCompletingRebalance() { +group.transitionTo(PREPARING_REBALANCE); +group.transitionTo(COMPLETING_REBALANCE); +assertTrue(group.canRebalance()); +} + +@Test +public void testCannotRebalanceWhenPreparingRebalance() { +group.transitionTo(PREPARING_REBALANCE); +assertFalse(group.canRebalance()); +} + +@Test +public void testCannotRebalanceWhenDead() { +group.transitionTo(PREPARING_REBALANCE); +group.transitionTo(EMPTY); +group.transitionTo(DEAD); +assertFalse(group.canRebalance()); +} + +@Test +public void testStableToPreparingRebalanceTransition() { +group.transitionTo(PREPARING_REBALANCE); +assertState(group, PREPARING_REBALANCE); +} + +@Test +public void testStableToDeadTransition() { +group.transitionTo(DEAD); +assertState(group, DEAD); +} + +@Test +public void testAwaitingRebalanceToPreparingRebalanceTransition() { +group.transitionTo(PREPARING_REBALANCE); +group.transitionTo(COMPLETING_REBALANCE); +group.transitionTo(PREPARING_REBALANCE); +assertState(group, PREPARING_REBALANCE); +} + +@Test +public void testPreparingRebalanceToDeadTransition() { +group.transitionTo(PREPARING_REBALANCE); +group.transitionTo(DEAD); +assertState(group, DEAD); +} + +@Test +public void testPreparingRebalanceToEmptyTransition() { +group.transitionTo(PREPARING_REBALANCE); +group.transitionTo(EMPTY); +assertState(group, EMPTY); +
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java
jeffkbkim commented on code in PR #13663: URL: https://github.com/apache/kafka/pull/13663#discussion_r1191359030 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java: ## @@ -0,0 +1,935 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.generic; + +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; +import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; +import org.apache.kafka.common.message.JoinGroupResponseData; +import org.apache.kafka.common.message.SyncGroupResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.STABLE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class GenericGroupTest { +private final String protocolType = "consumer"; +private final String groupInstanceId = "groupInstanceId"; +private final String memberId = "memberId"; +private final String clientId = "clientId"; +private final String clientHost = "clientHost"; +private final int rebalanceTimeoutMs = 6; +private final int sessionTimeoutMs = 1; + + +private GenericGroup group = null; + +@BeforeEach +public void 
initialize() { +group = new GenericGroup(new LogContext(), "groupId", EMPTY, Time.SYSTEM); +} + +@Test +public void testCanRebalanceWhenStable() { +assertTrue(group.canRebalance()); +} + +@Test +public void testCanRebalanceWhenCompletingRebalance() { +group.transitionTo(PREPARING_REBALANCE); +group.transitionTo(COMPLETING_REBALANCE); +assertTrue(group.canRebalance()); +} + +@Test +public void testCannotRebalanceWhenPreparingRebalance() { +group.transitionTo(PREPARING_REBALANCE); +assertFalse(group.canRebalance()); +} + +@Test +public void testCannotRebalanceWhenDead() { +group.transitionTo(PREPARING_REBALANCE); +group.transitionTo(EMPTY); +group.transitionTo(DEAD); +assertFalse(group.canRebalance()); +} + +@Test +public void testStableToPreparingRebalanceTransition() { +group.transitionTo(PREPARING_REBALANCE); +assertState(group, PREPARING_REBALANCE); +} + +@Test +public void testStableToDeadTransition() { +group.transitionTo(DEAD); +assertState(group, DEAD); +} + +@Test +public void testAwaitingRebalanceToPreparingRebalanceTransition() { +group.transitionTo(PREPARING_REBALANCE); +group.transitionTo(COMPLETING_REBALANCE); +group.transitionTo(PREPARING_REBALANCE); +assertState(group, PREPARING_REBALANCE); +} + +@Test +public void testPreparingRebalanceToDeadTransition() { +group.transitionTo(PREPARING_REBALANCE); +group.transitionTo(DEAD); +assertState(group, DEAD); +} + +@Test +public void testPreparingRebalanceToEmptyTransition() { +group.transitionTo(PREPARING_REBALANCE); +group.transitionTo(EMPTY); +assertState(group, EMPTY); +
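Editorial sketch: the transitions exercised by the quoted tests can be modelled as a small state machine in which each state lists the states it may be entered from. The class below is a hypothetical illustration, not the code under review. The name `GroupStateSketch` and the valid-previous-state table are assumptions, chosen only to be consistent with the assertions visible in the quoted diff (for example, `canRebalance()` is true from EMPTY and COMPLETING_REBALANCE and false from PREPARING_REBALANCE and DEAD).

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; not the GenericGroup implementation from the PR.
public class GroupStateSketch {

    public enum State { EMPTY, PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE, DEAD }

    // For each target state, the set of states it may be entered from.
    // This table is an assumption that matches the quoted test cases.
    private static final Map<State, Set<State>> VALID_PREVIOUS = new EnumMap<>(State.class);

    static {
        VALID_PREVIOUS.put(State.EMPTY, EnumSet.of(State.PREPARING_REBALANCE));
        VALID_PREVIOUS.put(State.PREPARING_REBALANCE,
            EnumSet.of(State.STABLE, State.COMPLETING_REBALANCE, State.EMPTY));
        VALID_PREVIOUS.put(State.COMPLETING_REBALANCE, EnumSet.of(State.PREPARING_REBALANCE));
        VALID_PREVIOUS.put(State.STABLE, EnumSet.of(State.COMPLETING_REBALANCE));
        VALID_PREVIOUS.put(State.DEAD, EnumSet.allOf(State.class));
    }

    // The quoted tests construct the group in the EMPTY state.
    private State state = State.EMPTY;

    public State state() {
        return state;
    }

    // A rebalance can start only if PREPARING_REBALANCE is reachable from the current state.
    public boolean canRebalance() {
        return VALID_PREVIOUS.get(State.PREPARING_REBALANCE).contains(state);
    }

    // Moves to the target state, rejecting transitions the table does not allow.
    public void transitionTo(State target) {
        if (!VALID_PREVIOUS.get(target).contains(state)) {
            throw new IllegalStateException("Cannot move from " + state + " to " + target);
        }
        state = target;
    }
}
```

Keeping the rules in one table means `canRebalance()` and `transitionTo()` cannot disagree about which transitions are legal.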
[jira] [Updated] (KAFKA-14978) ExactlyOnceWorkerSourceTask does not remove parent metrics
[ https://issues.apache.org/jira/browse/KAFKA-14978?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-14978: -- Fix Version/s: 3.4.1 > ExactlyOnceWorkerSourceTask does not remove parent metrics > -- > > Key: KAFKA-14978 > URL: https://issues.apache.org/jira/browse/KAFKA-14978 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Daniel Urban >Assignee: Daniel Urban >Priority: Major > Fix For: 3.4.1, 3.6.0 > > > ExactlyOnceWorkerSourceTask removeMetrics does not invoke > super.removeMetrics, meaning that only the transactional metrics are removed, > and common source task metrics are not. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java
jeffkbkim commented on code in PR #13663: URL: https://github.com/apache/kafka/pull/13663#discussion_r1191340188 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java: ## @@ -0,0 +1,998 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.generic; + +import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; +import org.apache.kafka.common.message.JoinGroupResponseData; +import org.apache.kafka.common.message.ListGroupsResponseData; +import org.apache.kafka.common.message.SyncGroupResponseData; +import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.SchemaException; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.coordinator.group.Group; +import org.slf4j.Logger; + +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE; + +/** + * + * This class holds metadata for a generic group where the + * member assignment is driven solely from the client side. + * + * The APIs members use to make changes to the group membership + * consist of JoinGroup, SyncGroup, and LeaveGroup. + */ +public class GenericGroup implements Group { + +/** + * Empty generation. + */ +public static final int NO_GENERATION = -1; + +/** + * Protocol with empty name. + */ +public static final String NO_PROTOCOL_NAME = ""; + +/** + * No leader. 
+ */ +public static final String NO_LEADER = ""; + +/** + * Delimiter used to join a randomly generated UUID + * with a client id or a group instance id. + */ +private static final String MEMBER_ID_DELIMITER = "-"; + +/** + * The slf4j log context, used to create new loggers. + */ +private final LogContext logContext; + +/** + * The slf4j logger. + */ +private final Logger log; + +/** + * The group id. + */ +private final String groupId; + +/** + * The time. + */ +private final Time time; + +/** + * The current group state. + */ +private GenericGroupState state; + +/** + * The timestamp of when the group transitioned + * to its current state. + */ +private Optional currentStateTimestamp; + +/** + * The protocol type used for rebalance. + */ +private Optional protocolType = Optional.empty(); + +/** + * The protocol name used for rebalance. + */ +private Optional protocolName = Optional.empty(); + +/** + * The generation id. + */ +private int generationId = 0; + +/** + * The id of the group's leader. + */ +private Optional leaderId = Optional.empty(); + +/** + * The members of the group. + */ +private final Map members = new HashMap<>(); + +/** + * The static members of the group. + */ +private final Map staticMembers = new HashMap<>(); + +/** + * Members who have yet to (re)join the group + * during the join group phase. + */ +private final Set pendingJoinMembers = new HashSet<>(); + +/** + * The number of members awaiting a join response. + */ +private int numMembersAwaitingJoinResponse = 0; + +/** + * Map of protocol names to the number of members that support them. + */ +private final Map
[GitHub] [kafka] kirktrue commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying
kirktrue commented on code in PR #12685: URL: https://github.com/apache/kafka/pull/12685#discussion_r1191281659 ## clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java: ## @@ -1383,9 +1386,13 @@ private ClusterResourceListeners configureClusterResourceListeners(Serializer * can be used (the partition is then calculated by built-in * partitioning logic). */ -private int partition(ProducerRecord record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) { Review Comment: Lest my eyes deceive me, this is just formatting, correct? ## clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java: ## @@ -31,13 +34,27 @@ public interface Partitioner extends Configurable, Closeable { * * @param topic The topic name * @param key The key to partition on (or null if no key) - * @param keyBytes The serialized key to partition on( or null if no key) + * @param keyBytes The serialized key to partition on(or null if no key) Review Comment: nit: Can we change: `The serialized key to partition on(or null if no key)` to `The serialized key to partition on (or null if no key)` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java
jeffkbkim commented on code in PR #13663: URL: https://github.com/apache/kafka/pull/13663#discussion_r1191303709 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java: ## @@ -0,0 +1,998 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.generic; + +import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; +import org.apache.kafka.common.message.JoinGroupResponseData; +import org.apache.kafka.common.message.ListGroupsResponseData; +import org.apache.kafka.common.message.SyncGroupResponseData; +import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.SchemaException; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.coordinator.group.Group; +import org.slf4j.Logger; + +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE; + +/** + * + * This class holds metadata for a generic group where the + * member assignment is driven solely from the client side. + * + * The APIs members use to make changes to the group membership + * consist of JoinGroup, SyncGroup, and LeaveGroup. + */ +public class GenericGroup implements Group { + +/** + * Empty generation. + */ +public static final int NO_GENERATION = -1; + +/** + * Protocol with empty name. + */ +public static final String NO_PROTOCOL_NAME = ""; + +/** + * No leader. 
+ */ +public static final String NO_LEADER = ""; + +/** + * Delimiter used to join a randomly generated UUID + * with a client id or a group instance id. + */ +private static final String MEMBER_ID_DELIMITER = "-"; + +/** + * The slf4j log context, used to create new loggers. + */ +private final LogContext logContext; + +/** + * The slf4j logger. + */ +private final Logger log; + +/** + * The group id. + */ +private final String groupId; + +/** + * The time. + */ +private final Time time; + +/** + * The current group state. + */ +private GenericGroupState state; + +/** + * The timestamp of when the group transitioned + * to its current state. + */ +private Optional currentStateTimestamp; + +/** + * The protocol type used for rebalance. + */ +private Optional protocolType = Optional.empty(); + +/** + * The protocol name used for rebalance. + */ +private Optional protocolName = Optional.empty(); + +/** + * The generation id. + */ +private int generationId = 0; + +/** + * The id of the group's leader. + */ +private Optional leaderId = Optional.empty(); + +/** + * The members of the group. + */ +private final Map members = new HashMap<>(); + +/** + * The static members of the group. + */ +private final Map staticMembers = new HashMap<>(); + +/** + * Members who have yet to (re)join the group + * during the join group phase. + */ +private final Set pendingJoinMembers = new HashSet<>(); + +/** + * The number of members awaiting a join response. + */ +private int numMembersAwaitingJoinResponse = 0; + +/** + * Map of protocol names to the number of members that support them. + */ +private final Map
[GitHub] [kafka] lanshiqin commented on a diff in pull request #13584: MINOR: Add log segment unit tests, If the maximum offset beyond index, appen…
lanshiqin commented on code in PR #13584: URL: https://github.com/apache/kafka/pull/13584#discussion_r1191256878 ## core/src/test/scala/unit/kafka/log/LogSegmentTest.scala: ## @@ -65,6 +68,32 @@ class LogSegmentTest { Utils.delete(logDir) } + /** + * LogSegmentOffsetOverflowException should be thrown while appending the logs if: + * 1. largestOffset < 0 Review Comment: Thanks for your advice, I have revised the description. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lanshiqin commented on a diff in pull request #13584: MINOR: Add log segment unit tests, If the maximum offset beyond index, appen…
lanshiqin commented on code in PR #13584: URL: https://github.com/apache/kafka/pull/13584#discussion_r1191252780 ## core/src/test/scala/unit/kafka/log/LogSegmentTest.scala: ## @@ -65,6 +68,32 @@ class LogSegmentTest { Utils.delete(logDir) } + /** + * LogSegmentOffsetOverflowException should be thrown while appending the logs if: + * 1. largestOffset < 0 + * 2. largestOffset - baseOffset < 0 + * 3. largestOffset - baseOffset > Integer.MAX_VALUE + */ + @ParameterizedTest + @CsvSource(Array( +"0, -2147483648", +"0, 2147483648", +"1, 0", +"100, 10", +"2147483648, 0", +"-2147483648, 0", +"2147483648,4294967296" Review Comment: Yes, it was intentionally written as 2147483648. The test method and LogSegment receive offset arguments of type Long in order to test scenarios exceeding Int.MAX_VALUE -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
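Editorial sketch: the three conditions listed in the quoted Javadoc can be checked with plain `long` arithmetic, which is why the test takes `Long` arguments such as 2147483648. The class and method names below (`OffsetOverflowSketch`, `overflows`) are hypothetical and only illustrate the conditions; this is not the LogSegment code.

```java
// Hypothetical sketch of the three overflow conditions from the quoted Javadoc.
public class OffsetOverflowSketch {

    // Returns true if the offset of largestOffset relative to baseOffset
    // does not fit in a non-negative 32-bit int.
    public static boolean overflows(long baseOffset, long largestOffset) {
        // Condition 1: largestOffset < 0
        if (largestOffset < 0) {
            return true;
        }
        long relative;
        try {
            relative = Math.subtractExact(largestOffset, baseOffset);
        } catch (ArithmeticException e) {
            // The difference does not even fit in a long, so it cannot fit in an int.
            return true;
        }
        // Condition 2: largestOffset - baseOffset < 0
        // Condition 3: largestOffset - baseOffset > Integer.MAX_VALUE
        return relative < 0 || relative > Integer.MAX_VALUE;
    }

    public static void main(String[] args) {
        // The (baseOffset, largestOffset) pairs from the quoted @CsvSource.
        long[][] pairs = {
            {0L, -2147483648L}, {0L, 2147483648L}, {1L, 0L}, {100L, 10L},
            {2147483648L, 0L}, {-2147483648L, 0L}, {2147483648L, 4294967296L}
        };
        for (long[] p : pairs) {
            System.out.println(p[0] + ", " + p[1] + " -> " + overflows(p[0], p[1]));
        }
    }
}
```

Every pair in the quoted `@CsvSource` satisfies at least one of the three conditions, so under this sketch each one would be flagged as an overflow.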
[GitHub] [kafka] lanshiqin commented on a diff in pull request #13584: MINOR: Add log segment unit tests, If the maximum offset beyond index, appen…
lanshiqin commented on code in PR #13584: URL: https://github.com/apache/kafka/pull/13584#discussion_r1191248219 ## core/src/test/scala/unit/kafka/log/LogSegmentTest.scala: ## @@ -65,6 +68,32 @@ class LogSegmentTest { Utils.delete(logDir) } + /** + * LogSegmentOffsetOverflowException should be thrown while appending the logs if: + * 1. largestOffset < 0 + * 2. largestOffset - baseOffset < 0 + * 3. largestOffset - baseOffset > Integer.MAX_VALUE + */ + @ParameterizedTest + @CsvSource(Array( +"0, -2147483648", +"0, 2147483648", +"1, 0", +"100, 10", +"2147483648, 0", +"-2147483648, 0", +"2147483648,4294967296" + )) + def testAppendForLogSegmentOffsetOverflowException(baseOffset: Long, largestOffset: Long): Unit = { Review Comment: When baseOffset == largestOffset, the teardown method in the test class is called at the end of the test, which in turn calls the logSegment _.close() => timeIndex.maybeAppend. Eventually an exception is thrown (Integer overflow for offset: 0), but that exception is not what this test method is meant to cover. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13704: KAFKA-14500; [3/N] add GroupMetadataKey/Value record helpers
dajac commented on code in PR #13704: URL: https://github.com/apache/kafka/pull/13704#discussion_r1191225920 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java: ## @@ -357,6 +366,94 @@ public static Record newCurrentAssignmentTombstoneRecord( ); } +/** + * Creates a GroupMetadata record. + * + * @param group The generic group. + * @param memberAssignments The assignment by member id. + * @param metadataVersionThe metadata version. + * @return The record. + */ +public static Record newGroupMetadataRecord( +GenericGroup group, +Map memberAssignments, Review Comment: Any reason why we can't get the assignments from the group? ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java: ## @@ -357,6 +366,94 @@ public static Record newCurrentAssignmentTombstoneRecord( ); } +/** + * Creates a GroupMetadata record. + * + * @param group The generic group. + * @param memberAssignments The assignment by member id. + * @param metadataVersionThe metadata version. + * @return The record. + */ +public static Record newGroupMetadataRecord( +GenericGroup group, +Map memberAssignments, +MetadataVersion metadataVersion +) { +short version; +if (metadataVersion.isLessThan(IBP_0_10_1_IV0)) { +version = (short) 0; +} else if (metadataVersion.isLessThan(IBP_2_1_IV0)) { +version = (short) 1; +} else if (metadataVersion.isLessThan(IBP_2_3_IV0)) { +version = (short) 2; +} else { +version = (short) 3; Review Comment: nit: Should we keep that comment? ``` // Serialize with the highest supported non-flexible version // until a tagged field is introduced or the version is bumped. 
``` ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java: ## @@ -383,4 +398,84 @@ public void testNewCurrentAssignmentTombstoneRecord() { "member-id" )); } + +@Test +public void testNewGroupMetadataRecord() { +Time time = new MockTime(); + +List expectedMembers = new ArrayList<>(); +expectedMembers.add( +new GroupMetadataValue.MemberMetadata() +.setMemberId("member-1") +.setClientId("client-1") +.setClientHost("host-1") +.setRebalanceTimeout(1000) +.setSessionTimeout(1500) +.setGroupInstanceId("group-instance-1") +.setSubscription(new byte[]{0, 1}) +.setAssignment(new byte[]{1, 2}) +); + +expectedMembers.add( +new GroupMetadataValue.MemberMetadata() +.setMemberId("member-2") +.setClientId("client-2") +.setClientHost("host-2") +.setRebalanceTimeout(1000) +.setSessionTimeout(1500) +.setGroupInstanceId("group-instance-2") +.setSubscription(new byte[]{1, 2}) +.setAssignment(new byte[]{2, 3}) +); + +Record expectedRecord = new Record( +new ApiMessageAndVersion( +new GroupMetadataKey() +.setGroup("group-id"), +(short) 2), +new ApiMessageAndVersion( +new GroupMetadataValue() +.setProtocol("range") +.setProtocolType("consumer") +.setLeader("member-1") +.setGeneration(1) +.setCurrentStateTimestamp(time.milliseconds()) +.setMembers(expectedMembers), +(short) 3)); + +GenericGroup group = new GenericGroup( +new LogContext(), +"group-id", +GenericGroupState.PREPARING_REBALANCE, +time +); + +Map memberAssignments = new HashMap<>(); +expectedMembers.forEach(member -> { +memberAssignments.put(member.memberId(), member.assignment()); +group.add(new GenericGroupMember( +member.memberId(), +Optional.of(member.groupInstanceId()), +member.clientId(), +member.clientHost(), +member.rebalanceTimeout(), +member.sessionTimeout(), +"consumer", +Collections.singletonList(new Protocol( +"range", +member.subscription() +)), +member.assignment() +)); +}); + +group.initNextGeneration(); +Record groupMetadataRecord = RecordHelpers.newGroupMetadataRecord( 
+group, +memberAssignments, +
[GitHub] [kafka] clolov commented on a diff in pull request #13681: KAFKA-14133: Migrate ActiveTaskCreator mock in TaskManagerTest to Mockito
clolov commented on code in PR #13681: URL: https://github.com/apache/kafka/pull/13681#discussion_r1191109013 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -845,17 +847,16 @@ public void shouldRecycleTasksRemovedFromStateUpdater() { when(tasks.removePendingTaskToRecycle(task00.id())).thenReturn(taskId00Partitions); when(tasks.removePendingTaskToRecycle(task01.id())).thenReturn(taskId01Partitions); taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true); -expect(activeTaskCreator.createActiveTaskFromStandby(eq(task01), eq(taskId01Partitions), eq(consumer))) -.andStubReturn(task01Converted); -activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject()); -expectLastCall().once(); +when(activeTaskCreator.createActiveTaskFromStandby(Mockito.eq(task01), Mockito.eq(taskId01Partitions), Review Comment: ```suggestion when(activeTaskCreator.createActiveTaskFromStandby(task01, taskId01Partitions, ``` ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -879,13 +880,10 @@ public void shouldCloseTasksRemovedFromStateUpdater() { when(tasks.removePendingTaskToCloseClean(task00.id())).thenReturn(true); when(tasks.removePendingTaskToCloseClean(task01.id())).thenReturn(true); taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true); -activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject()); -expectLastCall().once(); -replay(activeTaskCreator); taskManager.checkStateUpdater(time.milliseconds(), noOpResetter); -verify(activeTaskCreator); +Mockito.verify(activeTaskCreator, times(1)).closeAndRemoveTaskProducerIfNeeded(any()); Review Comment: ```suggestion Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(any()); ``` ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -960,10 +958,8 @@ public void 
shouldHandleMultipleRemovedTasksFromStateUpdater() { when(stateUpdater.drainRemovedTasks()) .thenReturn(mkSet(taskToRecycle0, taskToRecycle1, taskToClose, taskToUpdateInputPartitions)); when(stateUpdater.restoresActiveTasks()).thenReturn(true); - expect(activeTaskCreator.createActiveTaskFromStandby(eq(taskToRecycle1), eq(taskId01Partitions), eq(consumer))) -.andStubReturn(convertedTask1); -activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject()); -expectLastCall().times(2); + when(activeTaskCreator.createActiveTaskFromStandby(Mockito.eq(taskToRecycle1), Mockito.eq(taskId01Partitions), Review Comment: ```suggestion when(activeTaskCreator.createActiveTaskFromStandby(taskToRecycle1, taskId01Partitions, ``` ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -845,17 +847,16 @@ public void shouldRecycleTasksRemovedFromStateUpdater() { when(tasks.removePendingTaskToRecycle(task00.id())).thenReturn(taskId00Partitions); when(tasks.removePendingTaskToRecycle(task01.id())).thenReturn(taskId01Partitions); taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true); -expect(activeTaskCreator.createActiveTaskFromStandby(eq(task01), eq(taskId01Partitions), eq(consumer))) -.andStubReturn(task01Converted); -activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject()); -expectLastCall().once(); +when(activeTaskCreator.createActiveTaskFromStandby(Mockito.eq(task01), Mockito.eq(taskId01Partitions), +Mockito.eq(consumer))).thenReturn(task01Converted); expect(standbyTaskCreator.createStandbyTaskFromActive(eq(task00), eq(taskId00Partitions))) .andStubReturn(task00Converted); -replay(activeTaskCreator, standbyTaskCreator); +replay(standbyTaskCreator); taskManager.checkStateUpdater(time.milliseconds(), noOpResetter); -verify(activeTaskCreator, standbyTaskCreator); +verify(standbyTaskCreator); +Mockito.verify(activeTaskCreator, times(1)).closeAndRemoveTaskProducerIfNeeded(any()); Review Comment: 
```suggestion Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(any()); ``` The default check of verify is times(1) as far as I know. ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -2162,11 +2140,12 @@ public Map prepareCommit() { // handleAssignment expect(standbyTaskCreator.createTasks(eq(taskId00Assignment))).andStubReturn(singleton(corruptedStandby)); -
[GitHub] [kafka] dajac commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java
dajac commented on code in PR #13663: URL: https://github.com/apache/kafka/pull/13663#discussion_r1191160971 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java: ## @@ -0,0 +1,967 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.coordinator.group.generic; + +import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; +import org.apache.kafka.common.message.JoinGroupResponseData; +import org.apache.kafka.common.message.ListGroupsResponseData; +import org.apache.kafka.common.message.SyncGroupResponseData; +import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.SchemaException; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; + +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; + +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.CompletingRebalance; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.Dead; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.Empty; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.PreparingRebalance; + +/** + * Java rewrite of {@link kafka.coordinator.group.GroupMetadata} that is used + * by the new group coordinator (KIP-848). Offset management will be handled + * by a different component. + * + * This class holds group metadata for a generic group. + */ +public class GenericGroup { + +/** + * Empty generation. + */ +public static final int NO_GENERATION = -1; + +/** + * Protocol with empty name. 
+ */ +public static final String NO_PROTOCOL_NAME = ""; + +/** + * No leader. + */ +public static final String NO_LEADER = ""; + +/** + * Delimiter used to join a randomly generated UUID + * with client id or group instance id. + */ +private static final String MEMBER_ID_DELIMITER = "-"; + +/** + * The slf4j log context, used to create new loggers. + */ +private final LogContext logContext; + +/** + * The slf4j logger. + */ +private final Logger log; + +/** + * The group id. + */ +private final String groupId; + +/** + * The initial group state. + */ +private final GenericGroupState initialState; + +/** + * The time. + */ +private final Time time; + +/** + * The lock used to synchronize the group. + */ +private final Lock lock = new ReentrantLock(); + +/** + * The current group state. + */ +private GenericGroupState state; + +/** + * The timestamp of when the group transitioned + * to its current state. + */ +private Optional currentStateTimestamp; + +/** + * The protocol type used for rebalance. + */ +private Optional protocolType = Optional.empty(); + +/** + * The protocol name used for rebalance. + */ +private Optional protocolName = Optional.empty(); + +/** + * The generation id. + */ +private int generationId = 0; + +/** + * The id of the group's leader. + */ +private Optional leaderId = Optional.empty(); + +/** + * The members of the group. + */ +private final Map members = new HashMap<>(); + +/** + * The static members of the group. + */ +private final Map staticMembers = new HashMap<>(); + +/** + * Members who have yet to (re)join the group + * during the join group phase. + */ +private final Set pendingMembers = new HashSet<>(); + +/** + *
[GitHub] [kafka] dengziming commented on pull request #13679: KAFKA-14291: KRaft controller should return right finalized features in ApiVersionResponse
dengziming commented on PR #13679: URL: https://github.com/apache/kafka/pull/13679#issuecomment-1543964126 I reran the failed tests locally and they passed, so I created a JIRA to track this problem: https://issues.apache.org/jira/browse/KAFKA-14989 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14989) Flaky test TransactionsTest.testFailureToFenceEpoch
[ https://issues.apache.org/jira/browse/KAFKA-14989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721762#comment-17721762 ] Deng Ziming commented on KAFKA-14989: - https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13679/6/testReport/junit/kafka.api/TransactionsTest/Build___JDK_11_and_Scala_2_13___testFailureToFenceEpoch_String__quorum_kraft/ > Flaky test TransactionsTest.testFailureToFenceEpoch > --- > > Key: KAFKA-14989 > URL: https://issues.apache.org/jira/browse/KAFKA-14989 > Project: Kafka > Issue Type: Improvement >Reporter: Deng Ziming >Priority: Minor > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14989) Flaky test TransactionsTest.testFailureToFenceEpoch
Deng Ziming created KAFKA-14989: --- Summary: Flaky test TransactionsTest.testFailureToFenceEpoch Key: KAFKA-14989 URL: https://issues.apache.org/jira/browse/KAFKA-14989 Project: Kafka Issue Type: Improvement Reporter: Deng Ziming -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] jlprat commented on pull request #13706: MINOR: clean up unused methods in core utils
jlprat commented on PR #13706: URL: https://github.com/apache/kafka/pull/13706#issuecomment-1543873607 Thanks @machi1990 I fixed the typo as well
[GitHub] [kafka] dajac merged pull request #13694: MINOR: add test tag for testDeadToDeadIllegalTransition
dajac merged PR #13694: URL: https://github.com/apache/kafka/pull/13694
[GitHub] [kafka] machi1990 commented on pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`
machi1990 commented on PR #12045: URL: https://github.com/apache/kafka/pull/12045#issuecomment-1543816986 > @machi1990 since you opened a PR to fix this flaky test, you might be familiar with this part of code. May I request you to review this PR please. Hey @divijvaidya I am new to Kafka and to this part of the code. It'll be good to get another round of reviews from committers since some of them have started to have a look at this PR. My attempt to fix the flaky test in https://github.com/apache/kafka/pull/13702 slightly modified the assertions, which was more of a quick win to stabilize the test, while this PR attempts to sort out the underlying issue with quota computation. I think it'll be good to get more eyes on the PR as suggested by https://github.com/apache/kafka/pull/12045#pullrequestreview-1004087364 and https://github.com/apache/kafka/pull/12045#pullrequestreview-959047597. What do you think?
[GitHub] [kafka] viktorsomogyi commented on pull request #13701: KAFKA-14978 ExactlyOnceWorkerSourceTask should remove parent metrics …
viktorsomogyi commented on PR #13701: URL: https://github.com/apache/kafka/pull/13701#issuecomment-1543791889 @vamossagar12 yes, trunk and backports to 3.5 and 3.4 have been merged already.
[GitHub] [kafka] viktorsomogyi merged pull request #13699: KAFKA-14978 ExactlyOnceWorkerSourceTask should remove parent metrics …
viktorsomogyi merged PR #13699: URL: https://github.com/apache/kafka/pull/13699
[GitHub] [kafka] viktorsomogyi merged pull request #13698: KAFKA-14978 ExactlyOnceWorkerSourceTask should remove parent metrics …
viktorsomogyi merged PR #13698: URL: https://github.com/apache/kafka/pull/13698
[GitHub] [kafka] Hangleton commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.
Hangleton commented on code in PR #13535: URL: https://github.com/apache/kafka/pull/13535#discussion_r1190991070 ## core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java: ## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.log.remote; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.OffsetOutOfRangeException; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.server.log.remote.storage.RemoteStorageException; +import org.apache.kafka.storage.internals.log.FetchDataInfo; +import org.apache.kafka.storage.internals.log.LogOffsetMetadata; +import org.apache.kafka.storage.internals.log.RemoteLogReadResult; +import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; + +import java.io.IOException; +import java.util.function.Consumer; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class RemoteLogReaderTest { Review Comment: Should we add a test for this use case: ``` minOneMessage = false hardMaxBytesLimit = false firstBatchSize > maxBytes? ``` or a combination thereof? Maybe, please let me know if I can help with a few unit tests. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-14975) Make TopicBasedRemoteLogMetadataManager methods wait for initialize to complete
[ https://issues.apache.org/jira/browse/KAFKA-14975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthew de Detrich updated KAFKA-14975: --- Parent: KAFKA-7739 Issue Type: Sub-task (was: Task) > Make TopicBasedRemoteLogMetadataManager methods wait for initialize to > complete > --- > > Key: KAFKA-14975 > URL: https://issues.apache.org/jira/browse/KAFKA-14975 > Project: Kafka > Issue Type: Sub-task >Reporter: Matthew de Detrich >Assignee: Matthew de Detrich >Priority: Major > > In the current implementation of TopicBasedRemoteLogMetadataManager, various > methods internally call > ensureInitializedAndNotClosed to ensure that the > TopicBasedRemoteLogMetadataManager is initialized. If > TopicBasedRemoteLogMetadataManager is not initialized then an exception will > be thrown. > This is not ideal behaviour: rather than just throwing an exception we > should instead try to wait until TopicBasedRemoteLogMetadataManager is > initialised (with a timeout). This is the behaviour users would expect, > and it's also what other parts of Kafka that use plugin based > systems (e.g. Kafka Connect) do.
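The "wait until initialized, with a timeout" behaviour requested above can be sketched roughly as follows. This is only an illustration and not the actual TopicBasedRemoteLogMetadataManager code: the class `InitGate` and its methods `markInitialized`, `close` and `awaitInitialized` are hypothetical names, and the real fix may be structured quite differently.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper (not a Kafka class): callers block until initialization
// completes instead of failing immediately when it has not finished yet.
final class InitGate {
    private final CountDownLatch initialized = new CountDownLatch(1);
    private volatile boolean closed = false;

    // Called once by the initialization thread when setup has finished.
    void markInitialized() {
        initialized.countDown();
    }

    void close() {
        closed = true;
    }

    // Waits up to the given timeout for initialization. Throws
    // IllegalStateException if closed, TimeoutException if the wait expires.
    void awaitInitialized(long timeout, TimeUnit unit)
            throws InterruptedException, TimeoutException {
        if (closed) {
            throw new IllegalStateException("Already closed");
        }
        if (!initialized.await(timeout, unit)) {
            throw new TimeoutException("Not initialized within " + timeout + " " + unit);
        }
        if (closed) {
            throw new IllegalStateException("Closed while waiting for initialization");
        }
    }
}
```

A public method would call `awaitInitialized(...)` first, in the place where an `ensureInitializedAndNotClosed`-style check sits today. The timeout value and whether it is configurable are open design questions that the ticket does not settle.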
[GitHub] [kafka] showuon commented on a diff in pull request #13517: KAFKA-14752: Kafka examples improvements - demo changes
showuon commented on code in PR #13517: URL: https://github.com/apache/kafka/pull/13517#discussion_r1190888533 ## examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java: ## @@ -16,29 +16,59 @@ */ package kafka.examples; -import org.apache.kafka.common.errors.TimeoutException; - import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +/** + * This example can be decomposed into the following stages: + * + * 1. Clean any topics left from previous runs. + * 2. Create a producer thread to send a set of records to topic1. + * 3. Create a consumer thread to fetch all previously sent records from topic1. + * + * If you are using IntelliJ IDEA, the above arguments should be put in `Modify Run Configuration - Program Arguments`. + * You can also set an output log file in `Modify Run Configuration - Modify options - Save console output to file` to + * record all the log output together. + */ public class KafkaConsumerProducerDemo { -public static void main(String[] args) throws InterruptedException { -boolean isAsync = args.length == 0 || !args[0].trim().equalsIgnoreCase("sync"); -CountDownLatch latch = new CountDownLatch(2); -Producer producerThread = new Producer( -"producer", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT, KafkaProperties.TOPIC, isAsync, null, false, 1, -1, latch); -producerThread.start(); - -Consumer consumerThread = new Consumer( -"consumer", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT, KafkaProperties.TOPIC, "DemoConsumer", Optional.empty(), false, 1, latch); -consumerThread.start(); - -if (!latch.await(5, TimeUnit.MINUTES)) { -throw new TimeoutException("Timeout after 5 minutes waiting for demo producer and consumer to finish"); -} +public static final String BOOTSTRAP_SERVERS = "localhost:9092"; +public static final String TOPIC_NAME = "my-topic"; +public static final String GROUP_NAME = "my-group"; + +public static void 
main(String[] args) { +try { +if (args.length == 0) { +Utils.printHelp("This example takes 2 parameters (i.e. 1 sync):%n" + +"- records: total number of records to send (required)%n" + +"- mode: pass 'sync' to send records synchronously (optional)"); +return; +} -consumerThread.shutdown(); -System.out.println("All finished!"); +int numRecords = Integer.parseInt(args[0]); +boolean isAsync = args.length == 1 || !args[1].trim().equalsIgnoreCase("sync"); + +// stage 1: clean any topics left from previous runs +Utils.recreateTopics(BOOTSTRAP_SERVERS, -1, TOPIC_NAME); +CountDownLatch latch = new CountDownLatch(2); + +// stage 2: produce records to topic1 +Producer producerThread = new Producer( +"producer", BOOTSTRAP_SERVERS, TOPIC_NAME, isAsync, null, false, numRecords, -1, latch); +producerThread.start(); + +// stage 3: consume records from topic1 +Consumer consumerThread = new Consumer( +"consumer", BOOTSTRAP_SERVERS, TOPIC_NAME, GROUP_NAME, Optional.empty(), false, numRecords, latch); +consumerThread.start(); + +if (!latch.await(5, TimeUnit.MINUTES)) { +Utils.printErr("Timeout after 5 minutes waiting for termination"); +//producerThread.shutdown(); +consumerThread.shutdown(); Review Comment: Why does it get commented out? 
## examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java: ## @@ -16,182 +16,90 @@ */ package kafka.examples; -import org.apache.kafka.clients.admin.Admin; -import org.apache.kafka.clients.admin.NewTopic; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.errors.TimeoutException; -import org.apache.kafka.common.errors.TopicExistsException; -import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; - -import java.util.Arrays; -import java.util.List; import java.util.Optional; -import java.util.Properties; -import java.util.Set; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; /** - * This exactly once demo driver takes 3 arguments: - * - partition: number of partitions for input/output topic - * - instances: number of instances - * - records: number of records - * An example argument list would be `6 3 5`. - * - * If you are using IntelliJ IDEA, the above arguments should be put in the configuration's `Program Arguments`. - * Also recommended to set an output log file by `Edit Configuration -> Logs -> Save console - * output to file`
[GitHub] [kafka] akatona84 commented on pull request #11565: KAFKA-13504: Retry connect internal topics' creation in case of InvalidReplicationFactorException
akatona84 commented on PR #11565: URL: https://github.com/apache/kafka/pull/11565#issuecomment-1543645307 thx. rebased again and did some adjustments. (this time I built it and ran my modified unit tests as well :) )
[GitHub] [kafka] showuon merged pull request #13516: KAFKA-14752: Kafka examples improvements - processor changes
showuon merged PR #13516: URL: https://github.com/apache/kafka/pull/13516
[GitHub] [kafka] showuon commented on pull request #13516: KAFKA-14752: Kafka examples improvements - processor changes
showuon commented on PR #13516: URL: https://github.com/apache/kafka/pull/13516#issuecomment-1543640638 Failed tests are unrelated:
```
Build / JDK 17 and Scala 2.13 / org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest.testMultipleServerMechanisms()
Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest.testRestartReplication()
Build / JDK 17 and Scala 2.13 / kafka.api.CustomQuotaCallbackTest.testCustomQuotaCallback()
Build / JDK 17 and Scala 2.13 / kafka.api.TransactionsTest.testFailureToFenceEpoch(String).quorum=kraft
Build / JDK 17 and Scala 2.13 / org.apache.kafka.tools.MetadataQuorumCommandTest.[5] Type=Raft-Combined, Name=testDescribeQuorumReplicationSuccessful, MetadataVersion=3.5-IV2, Security=PLAINTEXT
```
[GitHub] [kafka] showuon commented on a diff in pull request #13516: KAFKA-14752: Kafka examples improvements - processor changes
showuon commented on code in PR #13516: URL: https://github.com/apache/kafka/pull/13516#discussion_r1190880657 ## examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java: ## @@ -79,111 +86,114 @@ public ExactlyOnceMessageProcessor(final String inputTopic, @Override public void run() { -// Init transactions call should always happen first in order to clear zombie transactions from previous generation. -producer.initTransactions(); - -final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE); - -consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() { -@Override -public void onPartitionsRevoked(Collection partitions) { -printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions); -} - -@Override -public void onPartitionsAssigned(Collection partitions) { -printWithTxnId("Received partition assignment after rebalancing: " + partitions); -messageRemaining.set(messagesRemaining(consumer)); -} -}); - -int messageProcessed = 0; -while (messageRemaining.get() > 0) { -try { -ConsumerRecords records = consumer.poll(Duration.ofMillis(200)); -if (records.count() > 0) { -// Begin a new transaction session. -producer.beginTransaction(); -for (ConsumerRecord record : records) { -// Process the record and send to downstream. 
-ProducerRecord customizedRecord = transform(record); -producer.send(customizedRecord); +int processedRecords = 0; +long remainingRecords = Long.MAX_VALUE; +try { +// it is recommended to have a relatively short txn timeout in order to clear pending offsets faster +int transactionTimeoutMs = 10_000; +KafkaProducer producer = +new Producer(outputTopic, true, transactionalId, true, -1, transactionTimeoutMs, null).get(); + +// consumer must be in read_committed mode, which means it won't be able to read uncommitted data +boolean readCommitted = true; +KafkaConsumer consumer = new Consumer( +"processor-consumer", bootstrapServers, inputTopic, "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null) +.createKafkaConsumer(); + +// called first and once to fence zombies and abort any pending transaction +producer.initTransactions(); + +consumer.subscribe(singleton(inputTopic), this); + +Utils.printOut("Processing new records"); +while (!closed && remainingRecords > 0) { +try { +ConsumerRecords records = consumer.poll(ofMillis(200)); +if (!records.isEmpty()) { +// begin a new transaction session +producer.beginTransaction(); + +for (ConsumerRecord record : records) { +// process the record and send downstream +ProducerRecord newRecord = +new ProducerRecord<>(outputTopic, record.key(), record.value() + "-ok"); +producer.send(newRecord); +} + +// checkpoint the progress by sending offsets to group coordinator broker +// note that this API is only available for broker >= 2.5 + producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), consumer.groupMetadata()); + +// commit the transaction including offsets +producer.commitTransaction(); +processedRecords += records.count(); } - -Map offsets = consumerOffsets(); - -// Checkpoint the progress by sending offsets to group coordinator broker. -// Note that this API is only available for broker >= 2.5. -producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata()); - -// Finish the transaction. 
All sent records should be visible for consumption now. -producer.commitTransaction(); -messageProcessed += records.count(); +} catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException + | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) { Review Comment: Good to me. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the
[GitHub] [kafka] mimaison merged pull request #13170: MINOR: Remove unused methods in CoreUtils
mimaison merged PR #13170: URL: https://github.com/apache/kafka/pull/13170
[GitHub] [kafka] AB027PS commented on pull request #12331: KAFKA-1194: changes needed to run on Windows
AB027PS commented on PR #12331: URL: https://github.com/apache/kafka/pull/12331#issuecomment-1543611485 I work with Martin Pelak and he suggested that I add my change to this PR, but my change is unrelated to the rest of this PR. So this doesn't address the errors mentioned in the first comment if that's what you're asking. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13349) Allow Iterator.remove on KeyValueIterator
[ https://issues.apache.org/jira/browse/KAFKA-13349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721696#comment-17721696 ] Mahesh Madushan commented on KAFKA-13349: - Hello [~guozhang]. I'm looking into this issue and I'm new to the codebase. I'm wondering whether the range iterator mentioned here is a KeyValueIterator and, if we want to support remove, whether we should implement the remove method in all implementations of KeyValueIterator. Is my thought process correct? Thank you. > Allow Iterator.remove on KeyValueIterator > - > > Key: KAFKA-13349 > URL: https://issues.apache.org/jira/browse/KAFKA-13349 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Priority: Major > Labels: needs-kip, newbie++ > > Today Stream's state store's range iterator does not support `remove`. We > could consider adding such support for all the built-in state stores: > * RocksDB's native iterator does not support removal, but we can always do a > delete(key) concurrently while the iterator is open on the snapshot. > * In-Memory: straight forward implementation. > The benefit of that is then for range-and-delete truncation operation we do > not necessarily have to be cautious about concurrent modification exceptions. > This could also help GC with in-memory stores.
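For the in-memory case mentioned in the issue, the range-and-delete idea can be illustrated with a plain `java.util.TreeMap`, whose range-view iterators already support `remove`. This is a sketch only: it does not use Kafka Streams' `KeyValueIterator` or any state store class, and `RangeAndDelete.deleteRange` is a hypothetical name chosen for the example.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical illustration (not Kafka Streams code): delete every entry in
// [from, to] through the iterator itself, so no ConcurrentModificationException
// is raised while the range is being walked.
final class RangeAndDelete {
    private RangeAndDelete() { }

    static <K, V> int deleteRange(NavigableMap<K, V> store, K from, K to) {
        int removed = 0;
        // subMap returns a live view; removing via its iterator mutates the store.
        Iterator<Map.Entry<K, V>> it =
            store.subMap(from, true, to, true).entrySet().iterator();
        while (it.hasNext()) {
            it.next();
            it.remove();
            removed++;
        }
        return removed;
    }

    public static void main(String[] args) {
        NavigableMap<String, Integer> store = new TreeMap<>();
        store.put("a", 1);
        store.put("b", 2);
        store.put("c", 3);
        store.put("d", 4);
        int removed = deleteRange(store, "b", "c");
        System.out.println(removed + " removed, remaining keys: " + store.keySet());
    }
}
```

Whether every `KeyValueIterator` implementation should gain `remove`, which is the question raised in the comment, is a separate API decision that the issue labels as needing a KIP.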
[GitHub] [kafka] jlprat commented on pull request #13706: MINOR: clean up unused methods in core utils
jlprat commented on PR #13706: URL: https://github.com/apache/kafka/pull/13706#issuecomment-1543608786 @mimaison I took inspiration from your PR to clean up some more unused methods
[GitHub] [kafka] jlprat commented on a diff in pull request #13170: MINOR: Remove unused methods in CoreUtils
jlprat commented on code in PR #13170: URL: https://github.com/apache/kafka/pull/13170#discussion_r1190857226 ## core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala: ## @@ -223,7 +204,7 @@ class CoreUtilsTest extends Logging { val map = new ConcurrentHashMap[Int, AtomicInteger]().asScala implicit val executionContext = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(nThreads)) Review Comment: Looks good! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jlprat opened a new pull request, #13706: MINOR: clean up unused methods in core utils
jlprat opened a new pull request, #13706: URL: https://github.com/apache/kafka/pull/13706 Inspired by https://github.com/apache/kafka/pull/13170 I decided to check some other core utils classes and remove some unused methods.
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] mimaison commented on a diff in pull request #13170: MINOR: Remove unused methods in CoreUtils
mimaison commented on code in PR #13170: URL: https://github.com/apache/kafka/pull/13170#discussion_r1190855504 ## core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala: ## @@ -223,7 +204,7 @@ class CoreUtilsTest extends Logging { val map = new ConcurrentHashMap[Int, AtomicInteger]().asScala implicit val executionContext = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(nThreads)) Review Comment: I've had to use `ExecutionContextExecutorService` instead of `ExecutionContext` because we call `shutdownNow()` below.
[jira] [Updated] (KAFKA-14988) Upgrade scalaCollectionCompact to v2.9 for CVE-2022-36944
[ https://issues.apache.org/jira/browse/KAFKA-14988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya updated KAFKA-14988: - Fix Version/s: 3.6.0 > Upgrade scalaCollectionCompact to v2.9 for CVE-2022-36944 > - > > Key: KAFKA-14988 > URL: https://issues.apache.org/jira/browse/KAFKA-14988 > Project: Kafka > Issue Type: Improvement >Reporter: Divij Vaidya >Priority: Minor > Fix For: 3.6.0 > > > Current version of ScalaCollectionCompact library in trunk 2.6.0 suffers from > a critical [CVE-2022-36944|https://github.com/advisories/GHSA-8qv5-68g4-248j] > > The CVE does not impact Kafka as per > https://issues.apache.org/jira/browse/KAFKA-14267 and is fixed in > ScalaCollectionCompact v2.9 as per > [https://github.com/scala/scala-collection-compat/pull/569] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14988) Upgrade scalaCollectionCompact to v2.9 for CVE-2022-36944
[ https://issues.apache.org/jira/browse/KAFKA-14988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya updated KAFKA-14988: - Description: Current version of ScalaCollectionCompact library in trunk 2.6.0 suffers from a critical [CVE-2022-36944|https://github.com/advisories/GHSA-8qv5-68g4-248j] The CVE does not impact Kafka as per https://issues.apache.org/jira/browse/KAFKA-14267 (hence, not marking this as critical) and is fixed in ScalaCollectionCompact v2.9 as per [https://github.com/scala/scala-collection-compat/pull/569] was: Current version of ScalaCollectionCompact library in trunk 2.6.0 suffers from a critical [CVE-2022-36944|https://github.com/advisories/GHSA-8qv5-68g4-248j] The CVE does not impact Kafka as per https://issues.apache.org/jira/browse/KAFKA-14267 and is fixed in ScalaCollectionCompact v2.9 as per [https://github.com/scala/scala-collection-compat/pull/569] > Upgrade scalaCollectionCompact to v2.9 for CVE-2022-36944 > - > > Key: KAFKA-14988 > URL: https://issues.apache.org/jira/browse/KAFKA-14988 > Project: Kafka > Issue Type: Improvement >Reporter: Divij Vaidya >Priority: Minor > Fix For: 3.6.0 > > > Current version of ScalaCollectionCompact library in trunk 2.6.0 suffers from > a critical [CVE-2022-36944|https://github.com/advisories/GHSA-8qv5-68g4-248j] > > The CVE does not impact Kafka as per > https://issues.apache.org/jira/browse/KAFKA-14267 (hence, not marking this > as critical) and is fixed in ScalaCollectionCompact v2.9 as per > [https://github.com/scala/scala-collection-compat/pull/569] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-14988) Upgrade scalaCollectionCompact to v2.9 for CVE-2022-36944
[ https://issues.apache.org/jira/browse/KAFKA-14988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721691#comment-17721691 ] Divij Vaidya edited comment on KAFKA-14988 at 5/11/23 8:46 AM: --- [https://github.com/apache/kafka/pull/13673] was (Author: divijvaidya): Fixed by [https://github.com/apache/kafka/pull/13673] > Upgrade scalaCollectionCompact to v2.9 for CVE-2022-36944 > - > > Key: KAFKA-14988 > URL: https://issues.apache.org/jira/browse/KAFKA-14988 > Project: Kafka > Issue Type: Improvement >Reporter: Divij Vaidya >Priority: Minor > > Current version of ScalaCollectionCompact library in trunk 2.6.0 suffers from > a critical [CVE-2022-36944|https://github.com/advisories/GHSA-8qv5-68g4-248j] > > The CVE does not impact Kafka as per > https://issues.apache.org/jira/browse/KAFKA-14267 and is fixed in > ScalaCollectionCompact v2.9 as per > [https://github.com/scala/scala-collection-compat/pull/569] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14988) Upgrade scalaCollectionCompact to v2.9 for CVE-2022-36944
[ https://issues.apache.org/jira/browse/KAFKA-14988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721691#comment-17721691 ] Divij Vaidya commented on KAFKA-14988: -- Fixed by [https://github.com/apache/kafka/pull/13673] > Upgrade scalaCollectionCompact to v2.9 for CVE-2022-36944 > - > > Key: KAFKA-14988 > URL: https://issues.apache.org/jira/browse/KAFKA-14988 > Project: Kafka > Issue Type: Improvement >Reporter: Divij Vaidya >Priority: Minor > > Current version of ScalaCollectionCompact library in trunk 2.6.0 suffers from > a critical [CVE-2022-36944|https://github.com/advisories/GHSA-8qv5-68g4-248j] > > The CVE does not impact Kafka as per > https://issues.apache.org/jira/browse/KAFKA-14267 and is fixed in > ScalaCollectionCompact v2.9 as per > [https://github.com/scala/scala-collection-compat/pull/569] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] Hangleton commented on pull request #12331: KAFKA-1194: changes needed to run on Windows
Hangleton commented on PR #12331: URL: https://github.com/apache/kafka/pull/12331#issuecomment-1543590907 Hi Julius (@AB027PS) - just to clarify, this isn't the same as the `AccessDeniedException` addressed by this PR, right?
[GitHub] [kafka] divijvaidya commented on pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`
divijvaidya commented on PR #12045: URL: https://github.com/apache/kafka/pull/12045#issuecomment-1543589903 @machi1990 since you opened a PR to fix this flaky test, you might be familiar with this part of the code. Could you please review this PR?
[jira] [Created] (KAFKA-14988) Upgrade scalaCollectionCompact to v2.9 for CVE-2022-36944
Divij Vaidya created KAFKA-14988: Summary: Upgrade scalaCollectionCompact to v2.9 for CVE-2022-36944 Key: KAFKA-14988 URL: https://issues.apache.org/jira/browse/KAFKA-14988 Project: Kafka Issue Type: Improvement Reporter: Divij Vaidya Current version of ScalaCollectionCompact library in trunk 2.6.0 suffers from a critical [CVE-2022-36944|https://github.com/advisories/GHSA-8qv5-68g4-248j] The CVE does not impact Kafka as per https://issues.apache.org/jira/browse/KAFKA-14267 and is fixed in ScalaCollectionCompact v2.9 as per [https://github.com/scala/scala-collection-compat/pull/569]
[GitHub] [kafka] AB027PS commented on pull request #12331: KAFKA-1194: changes needed to run on Windows
AB027PS commented on PR #12331: URL: https://github.com/apache/kafka/pull/12331#issuecomment-1543556383 @Hangleton @cmccabe I encountered a bug while using KRaft where Kafka would mark .checkpoint files in the _cluster-metadata_0 folder for deletion and then fail to delete them because they are read-only. The root cause is that it tried to delete them with the `Files.deleteIfExists` method, which can't delete read-only files on Windows. I made changes to `KafkaMetadataLog.scala` and `Snapshots.java`, where I created a new method for this purpose. `path.toFile().setWritable(true)` works if there is no file at the path on Windows, but I'm not sure how it behaves on Linux, so I can create a new unit test for it or put it behind an isOsWindows check. Should I create a new PR for this?
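Editor's sketch of the workaround described in the comment above, not the code from the PR: clear the read-only flag before calling `Files.deleteIfExists`. The class name `ReadOnlyDelete` and the method `deleteIfExistsEvenIfReadOnly` are hypothetical, chosen only for illustration; the helper actually added to `KafkaMetadataLog.scala` and `Snapshots.java` is not shown in this thread and may differ.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper for illustration only; not the method from the PR.
public final class ReadOnlyDelete {
    private ReadOnlyDelete() {}

    /**
     * Deletes the file at the given path if it exists, first trying to make
     * it writable so that a read-only flag does not block the deletion.
     * Returns true if a file was deleted, false if there was nothing to delete.
     */
    public static boolean deleteIfExistsEvenIfReadOnly(Path path) throws IOException {
        if (Files.exists(path)) {
            // File.setWritable reports failure through its boolean result.
            // The result is ignored here on purpose: if the flag could not be
            // cleared, deleteIfExists below surfaces the real error.
            path.toFile().setWritable(true);
        }
        return Files.deleteIfExists(path);
    }
}
```

The `Files.exists` guard keeps the call a no-op for missing paths on every platform, which sidesteps the open question about how `setWritable` behaves on Linux without needing an OS check.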
[GitHub] [kafka] erikvanoosten commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given
erikvanoosten commented on code in PR #13678: URL: https://github.com/apache/kafka/pull/13678#discussion_r1190794344 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -1145,6 +1150,7 @@ public void onFailure(RuntimeException e) { if (commitException instanceof FencedInstanceIdException) { asyncCommitFenced.set(true); } +inFlightAsyncCommits.decrementAndGet(); Review Comment: I think it is not needed here. There is nothing that could fail with an exception.
[GitHub] [kafka] erikvanoosten commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given
erikvanoosten commented on code in PR #13678: URL: https://github.com/apache/kafka/pull/13678#discussion_r1190791004 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -1223,6 +1233,26 @@ public void maybeAutoCommitOffsetsAsync(long now) { } } +private boolean waitForPendingAsyncCommits(Timer timer) { Review Comment: Renamed the method in 1fa48f53f0e6f5a2a9821075fa053e01cba6b0b2. Also, see this comment: https://github.com/apache/kafka/pull/13678#discussion_r1190730213
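Editor's sketch of the pattern the two review comments above revolve around: a counter of in-flight asynchronous commits that is incremented when a request is sent, decremented in both the success and failure callbacks, and awaited (with a timeout) before a synchronous commit returns. This is a simplified, self-contained illustration, not the `ConsumerCoordinator` code. The class `InFlightTracker`, its method names, and the `poll` callback are hypothetical, and the real implementation uses Kafka's `Timer` and network client rather than `System.nanoTime()`.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of the in-flight counter pattern; not Kafka code.
public final class InFlightTracker {
    private final AtomicInteger inFlight = new AtomicInteger();

    /** Call when an asynchronous request is sent. */
    public void onSend() {
        inFlight.incrementAndGet();
    }

    /** Call from BOTH the success and the failure callback. */
    public void onComplete() {
        inFlight.decrementAndGet();
    }

    public int pending() {
        return inFlight.get();
    }

    /**
     * Drives progress by invoking {@code poll} until no requests are in
     * flight or the timeout elapses. Returns true if everything completed
     * in time, false on timeout.
     */
    public boolean awaitPending(long timeoutMs, Runnable poll) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (inFlight.get() > 0) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out with requests still outstanding
            }
            poll.run(); // stands in for polling the network client
        }
        return true;
    }
}
```

The point under review is that the decrement must run on every completion path; if a failure callback skipped it, `awaitPending` would spin until the timeout even though no request was still outstanding.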